流的实现在Libuv里占了很大的篇幅,是非常核心的逻辑。流的本质是封装了对文件描述符的操作,例如读、写,连接、监听。我们首先看看数据结构,流在Libuv里用uv_stream_s表示,继承于uv_handle_s。
struct uv_stream_s {
// uv_handle_s的字段
void* data;
// 所属事件循环
uv_loop_t* loop;
// handle类型
uv_handle_type type;
// 关闭handle时的回调
uv_close_cb close_cb;
// 用于插入事件循环的handle队列
void* handle_queue[2];
union {
int fd;
void* reserved[4];
} u;
// 用于插入事件循环的closing阶段
uv_handle_t* next_closing;
// 各种标记
unsigned int flags;
// 流拓展的字段
/*
户写入流的字节大小,流缓存用户的输入,
然后等到可写的时候才执行真正的写
*/
size_t write_queue_size;
// 分配内存的函数,内存由用户定义,用来保存读取的数据
uv_alloc_cb alloc_cb;
// 读回调
uv_read_cb read_cb;
// 连接请求对应的结构体
uv_connect_t *connect_req;
/*
关闭写端的时候,发送完缓存的数据,
执行shutdown_req的回调(shutdown_req在uv_shutdown的时候赋值)
*/
uv_shutdown_t *shutdown_req;
/*
流对应的IO观察者
*/
uv__io_t io_watcher;
// 缓存待写的数据,该字段用于插入队列
void* write_queue[2];
// 已经完成了数据写入的队列,该字段用于插入队列
void* write_completed_queue[2];
// 有连接到来并且完成三次握手后,执行的回调
uv_connection_cb connection_cb;
// 操作流时出错码
int delayed_error;
// accept返回的通信socket对应的文件描述
int accepted_fd;
// 同上,用于IPC时,缓存多个传递的文件描述符
void* queued_fds;
}
流的实现中,最核心的字段是IO观察者,其余的字段是和流的性质相关的。IO观察者封装了流对应的文件描述符和文件描述符事件触发时的回调。比如读一个流、写一个流、关闭一个流、连接一个流、监听一个流,在uv_stream_s中都有对应的字段去支持。但是本质上是靠IO观察者去驱动的。
1 读一个流,就是IO观察者中的文件描述符的可读事件触发时,执行用户的读回调。 2 写一个流,先把数据写到流中,等到IO观察者中的文件描述符可写事件触发时,执行真正的写入,并执行用户的写结束回调。 3 关闭一个流,就是IO观察者中的文件描述符可写事件触发时,就会执行关闭流的写端。如果流中还有数据没有写完,则先写完(比如发送)后再执行关闭操作,接着执行用户的回调。 4 连接流,比如作为客户端去连接服务器。就是IO观察者中的文件描述符可读事件触发时(比如建立三次握手成功),执行用户的回调。 5 监听流,就是IO观察者中的文件描述符可读事件触发时(比如有完成三次握手的连接),执行用户的回调。
下面我们看一下流的具体实现
在使用uv_stream_t之前需要首先初始化,我们看一下如何初始化一个流。
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
// 记录handle的类型
uv__handle_init(loop, (uv_handle_t*)stream, type);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
stream->close_cb = NULL;
stream->connection_cb = NULL;
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
stream->write_queue_size = 0;
/*
初始化IO观察者,把文件描述符(这里还没有,所以是-1)和
回调uv__stream_io记录在io_watcher上,fd的事件触发时,统一
由uv__stream_io函数处理,但也会有特殊情况(下面会讲到)
*/
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
初始化一个流的逻辑很简单明了,就是初始化相关的字段,需要注意的是初始化IO观察者时,设置的处理函数是uv__stream_io,后面我们会分析这个函数的具体逻辑。
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
// 还没有设置fd或者设置的同一个fd则继续,否则返回UV_EBUSY
if (!(stream->io_watcher.fd == -1 ||
stream->io_watcher.fd == fd))
return UV_EBUSY;
// 设置流的标记
stream->flags |= flags;
// 是TCP流则可以设置下面的属性
if (stream->type == UV_TCP) {
// 关闭nagle算法
if ((stream->flags & UV_HANDLE_TCP_NODELAY) &&
uv__tcp_nodelay(fd, 1))
return UV__ERR(errno);
/*
开启keepalive机制
*/
if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
uv__tcp_keepalive(fd, 1, 60)) {
return UV__ERR(errno);
}
}
/*
保存socket对应的文件描述符到IO观察者中,Libuv会在
Poll IO阶段监听该文件描述符
*/
stream->io_watcher.fd = fd;
return 0;
}
打开一个流,本质上就是给这个流关联一个文件描述符,后续的操作的时候都是基于这个文件描述符的,另外还有一些属性的设置。
我们在一个流上执行uv_read_start后,流的数据(如果有的话)就会通过read_cb回调源源不断地流向调用方。
int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
// 流已经关闭,不能读
if (stream->flags & UV_HANDLE_CLOSING)
return UV_EINVAL;
// 流不可读,说明可能是只写流
if (!(stream->flags & UV_HANDLE_READABLE))
return -ENOTCONN;
// 标记正在读
stream->flags |= UV_HANDLE_READING;
// 记录读回调,有数据的时候会执行这个回调
stream->read_cb = read_cb;
// 分配内存函数,用于存储读取的数据
stream->alloc_cb = alloc_cb;
// 注册等待读事件
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
// 激活handle,有激活的handle,事件循环不会退出
uv__handle_start(stream);
return 0;
}
执行uv_read_start本质上是给流对应的文件描述符在epoll中注册了一个等待可读事件,并记录相应的上下文,比如读回调函数,分配内存的函数。接着打上正在做读取操作的标记。当可读事件触发的时候,读回调就会被执行,除了读取数据,还有一个读操作就是停止读取。对应的函数是uv_read_stop。
int uv_read_stop(uv_stream_t* stream) {
// 是否正在执行读取操作,如果不是,则没有必要停止
if (!(stream->flags & UV_HANDLE_READING))
return 0;
// 清除正在读取的标记
stream->flags &= ~UV_HANDLE_READING;
// 撤销等待读事件
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
// 对写事件也不感兴趣,停掉handle。允许事件循环退出
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
stream->read_cb = NULL;
stream->alloc_cb = NULL;
return 0;
}
另外还有一个辅助函数,判断流是否设置了可读属性。
int uv_is_readable(const uv_stream_t* stream) {
return !!(stream->flags & UV_HANDLE_READABLE);
}
上面的函数只是注册和注销读事件,如果可读事件触发的时候,我们还需要自己去读取数据,我们看一下真正的读逻辑
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
int count;
int err;
int is_ipc;
// 清除读取部分标记
stream->flags &= ~UV_STREAM_READ_PARTIAL;
count = 32;
/*
流是Unix域类型并且用于IPC,Unix域不一定用于IPC,
用作IPC可以支持传递文件描述符
*/
is_ipc = stream->type == UV_NAMED_PIPE &&
((uv_pipe_t*) stream)->ipc;
// 设置了读回调,正在读,count大于0
while (stream->read_cb
&& (stream->flags & UV_STREAM_READING)
&& (count-- > 0)) {
buf = uv_buf_init(NULL, 0);
// 调用调用方提供的分配内存函数,分配内存承载数据
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
/*
不是IPC则直接读取数据到buf,否则用recvmsg读取数据
和传递的文件描述符(如果有的话)
*/
if (!is_ipc) {
do {
nread = read(uv__stream_fd(stream),
buf.base,
buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
/* ipc uses recvmsg */
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_controllen = sizeof(cmsg_space);
msg.msg_control = cmsg_space;
do {
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
// 读失败
if (nread < 0) {
// 读繁忙
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 执行读回调
stream->read_cb(stream, 0, &buf);
} else {
/* Error. User should call uv_close(). */
// 读失败
stream->read_cb(stream, -errno, &buf);
}
return;
} else if (nread == 0) {
// 读到结尾了
uv__stream_eof(stream, &buf);
return;
} else {
// 读成功,读取数据的长度
ssize_t buflen = buf.len;
/*
是IPC则解析读取的数据,把文件描述符解析出来,
放到stream的accepted_fd和queued_fds字段
*/
if (is_ipc) {
err = uv__stream_recv_cmsg(stream, &msg);
if (err != 0) {
stream->read_cb(stream, err, &buf);
return;
}
}
// 执行读回调
stream->read_cb(stream, nread, &buf);
}
}
}
uv_read除了可以读取一般的数据外,还支持读取传递的文件描述符。我们看一下描述符传递的原理。我们知道,父进程fork出子进程的时候,子进程是继承父进程的文件描述符列表的。我们看一下进程和文件描述符的关系。 fork之前如图5-1所示。
我们再看一下fork之后的结构如图5-2所示。
如果父进程或者子进程在fork之后创建了新的文件描述符,那父子进程间就不能共享了,假设父进程要把一个文件描述符传给子进程,那怎么办呢?根据进程和文件描述符的关系。传递文件描述符要做的事情,不仅仅是在子进程中新建一个fd,还要建立起fd->file->inode的关联,不过我们不需要关注这些,因为操作系统都帮我们处理了,我们只需要通过sendmsg把想传递的文件描述符发送给Unix域的另一端。Unix域另一端就可以通过recvmsg把文件描述符从数据中读取出来。接着使用uv__stream_recv_cmsg函数保存数据里解析出来的文件描述符。
static int uv__stream_recv_cmsg(uv_stream_t* stream,
struct msghdr* msg) {
struct cmsghdr* cmsg;
// 遍历msg
for (cmsg = CMSG_FIRSTHDR(msg);
cmsg != NULL;
cmsg = CMSG_NXTHDR(msg, cmsg)) {
char* start;
char* end;
int err;
void* pv;
int* pi;
unsigned int i;
unsigned int count;
pv = CMSG_DATA(cmsg);
pi = pv;
start = (char*) cmsg;
end = (char*) cmsg + cmsg->cmsg_len;
count = 0;
while (start + CMSG_LEN(count * sizeof(*pi)) < end)
count++;
for (i = 0; i < count; i++) {
/*
accepted_fd代表当前待处理的文件描述符,
如果已经有值则剩余描述符就通过uv__stream_queue_fd排队
如果还没有值则先赋值
*/
if (stream->accepted_fd != -1) {
err = uv__stream_queue_fd(stream, pi[i]);
} else {
stream->accepted_fd = pi[i];
}
}
}
return 0;
}
uv__stream_recv_cmsg会从数据中解析出一个个文件描述符存到stream中,第一个文件描述符保存在accepted_fd,剩下的使用uv__stream_queue_fd处理。
struct uv__stream_queued_fds_s {
unsigned int size;
unsigned int offset;
int fds[1];
};
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
uv__stream_queued_fds_t* queued_fds;
unsigned int queue_size;
// 原来的内存
queued_fds = stream->queued_fds;
// 没有内存,则分配
if (queued_fds == NULL) {
// 默认8个
queue_size = 8;
/*
一个元数据内存+多个fd的内存
(前面加*代表解引用后的值的类型所占的内存大小,
减一是因为uv__stream_queued_fds_t
结构体本身有一个空间)
*/
queued_fds = uv__malloc((queue_size - 1) *
sizeof(*queued_fds->fds) +
sizeof(*queued_fds));
if (queued_fds == NULL)
return UV_ENOMEM;
// 容量
queued_fds->size = queue_size;
// 已使用个数
queued_fds->offset = 0;
// 指向可用的内存
stream->queued_fds = queued_fds;
// 之前的内存用完了,扩容
} else if (queued_fds->size == queued_fds->offset) {
// 每次加8个
queue_size = queued_fds->size + 8;
queued_fds = uv__realloc(queued_fds,
(queue_size - 1) * sizeof(*queued_fds->fds) + sizeof(*queued_fds));
if (queued_fds == NULL)
return UV_ENOMEM;
// 更新容量大小
queued_fds->size = queue_size;
// 保存新的内存
stream->queued_fds = queued_fds;
}
/* Put fd in a queue */
// 保存fd
queued_fds->fds[queued_fds->offset++] = fd;
return 0;
}
内存结构如图5-3所示。
最后我们看一下读结束后的处理,
static void uv__stream_eof(uv_stream_t* stream,
const uv_buf_t* buf) {
// 打上读结束标记
stream->flags |= UV_STREAM_READ_EOF;
// 注销等待可读事件
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
// 没有注册等待可写事件则停掉handle,否则会影响事件循环的退出
if (!uv__io_active(&stream->io_watcher, POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
// 执行读回调
stream->read_cb(stream, UV_EOF, buf);
// 清除正在读标记
stream->flags &= ~UV_STREAM_READING;
}
我们看到,流结束的时候,首先注销等待可读事件,然后通过回调通知上层。
我们在流上执行uv_write就可以往流中写入数据。
int uv_write(
/*
一个写请求,记录了需要写入的数据和信息。
数据来自下面的const uv_buf_t bufs[]
*/
uv_write_t* req,
// 往哪个流写
uv_stream_t* handle,
// 需要写入的数据
const uv_buf_t bufs[],
// uv_buf_t个数
unsigned int nbufs,
// 写完后执行的回调
uv_write_cb cb
) {
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
}
uv_write是直接调用uv_write2。第四个参数是NULL。代表是一般的写数据,不传递文件描述符。
int uv_write2(uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
int empty_queue;
// 待发送队列是否为空
empty_queue = (stream->write_queue_size == 0);
// 构造一个写请求
uv__req_init(stream->loop, req, UV_WRITE);
// 写请求对应的回调
req->cb = cb;
// 写请求对应的流
req->handle = stream;
req->error = 0;
// 需要发送的文件描述符,也可以是NULL说明不需要发送文件描述符
req->send_handle = send_handle;
QUEUE_INIT(&req->queue);
// bufs指向待写的数据
req->bufs = req->bufsml;
// 复制调用方的数据过来
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
// buf个数
req->nbufs = nbufs;
// 当前写成功的buf索引,针对bufs数组
req->write_index = 0;
// 待写的数据大小 = 之前的大小 + 本次大小
stream->write_queue_size += uv__count_bufs(bufs, nbufs);
// 插入待写队列
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
// 非空说明正在连接,还不能写,比如TCP流
if (stream->connect_req) {
/* Still connecting, do nothing. */
}
else if (empty_queue) { // 当前待写队列为空,直接写
uv__write(stream);
}
else {
// 还有数据没有写完,注册等待写事件
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return 0;
}
uv_write2的主要逻辑就是封装一个写请求,插入到流的待写队列。然后根据当前流的情况。看是直接写入还是等待会再写入。关系图大致如图5-4所示。
uv_write2只是对写请求进行一些预处理,真正执行写的函数是uv__write
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
// 没有数据需要写
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
// 从哪里开始写
iov = (struct iovec*) &(req->bufs[req->write_index]);
// 还有多少没写
iovcnt = req->nbufs - req->write_index;
// 最多可以写多少
iovmax = uv__getiovmax();
// 取最小值
if (iovcnt > iovmax)
iovcnt = iovmax;
// 需要传递文件描述符
if (req->send_handle) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
union {
char data[64];
struct cmsghdr alias;
} scratch;
if (uv__is_closing(req->send_handle)) {
err = -EBADF;
goto error;
}
// 待发送的文件描述符
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
memset(&scratch, 0, sizeof(scratch));
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_flags = 0;
msg.msg_control = &scratch.alias;
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
*pi = fd_to_send;
}
do {
// 使用sendmsg函数发送文件描述符
n = sendmsg(uv__stream_fd(stream), &msg, 0);
}
while (n == -1 && errno == EINTR);
} else {
do {
// 写一个或者写批量写
if (iovcnt == 1) {
n = write(uv__stream_fd(stream),
iov[0].iov_base,
iov[0].iov_len);
} else {
n = writev(uv__stream_fd(stream), iov, iovcnt);
}
}
while (n == -1 && errno == EINTR);
}
// 写失败
if (n < 0) {
/*
不是写繁忙,则报错,
否则如果设置了同步写标记,则继续尝试写
*/
if (errno != EAGAIN &&
errno != EWOULDBLOCK &&
errno != ENOBUFS) {
err = -errno;
goto error;
} else if (stream->flags & UV_STREAM_BLOCKING) {
/* If this is a blocking stream, try again. */
goto start;
}
} else {
// 写成功
while (n >= 0) {
// 当前buf首地址
uv_buf_t* buf = &(req->bufs[req->write_index]);
// 当前buf的数据长度
size_t len = buf->len;
// 小于说明当前buf还没有写完(还没有被消费完)
if ((size_t)n < len) {
// 更新待写的首地址
buf->base += n;
// 更新待写的数据长度
buf->len -= n;
/*
更新待写队列的长度,这个队列是待写数据的
总长度,等于多个buf的和
*/
stream->write_queue_size -= n;
n = 0;
/*
还没写完,设置了同步写,则继续尝试写,
否则退出,注册待写事件
*/
if (stream->flags & UV_STREAM_BLOCKING) {
goto start;
} else {
break;
}
} else {
/*
当前buf的数据都写完了,则更新待写数据的的首
地址,即下一个buf,因为当前buf写完了
*/
req->write_index++;
// 更新n,用于下一个循环的计算
n -= len;
// 更新待写队列的长度
stream->write_queue_size -= len;
/*
等于最后一个buf了,说明待写队列的数据
都写完了
*/
if (req->write_index == req->nbufs) {
/*
释放buf对应的内存,并把请求插入写完成
队列,然后准备触发写完成回调
*/
uv__write_req_finish(req);
return;
}
}
}
}
/*
写成功,但是还没有写完,注册待写事件,
等待可写的时候继续写
*/
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
return;
// 写出错
error:
// 记录错误
req->error = err;
/*
释放内存,丢弃数据,插入写完成队列,
把IO观察者插入pending队列,等待pending阶段执行回调
*/
uv__write_req_finish(req);
// 注销待写事件
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
// 如果也没有注册等待可读事件,则把handle关闭
if (!uv__io_active(&stream->io_watcher, POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
我们看一下一个写请求结束后(成功或者失败),Libuv如何处理的。逻辑在uv__write_req_finish函数。
static void uv__write_req_finish(uv_write_t* req) {
uv_stream_t* stream = req->handle;
// 从待写队列中移除
QUEUE_REMOVE(&req->queue);
// 写成功,并且分配了额外的堆内存,则需要释放,见uv__write
if (req->error == 0) {
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}
// 插入写完成队列
QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
/*
把IO观察者插入pending队列,Libuv在处理pending阶段时,
会触发IO观察者的写事件
*/
uv__io_feed(stream->loop, &stream->io_watcher);
}
uv__write_req_finish的逻辑比较简单
1把节点从待写队列中移除 2 req->bufs != req->bufsml不相等说明分配了堆内存,需要自己释放 3并把请求插入写完成队列,把IO观察者插入pending队列,等待pending阶段执行回调,在pending节点会执行IO观察者的回调(uv__stream_io)。
我们看一下uv__stream_io如何处理的,下面是具体的处理逻辑。
// 可写事件触发
if (events & (POLLOUT | POLLERR | POLLHUP)) {
// 继续执行写
uv__write(stream);
// 处理写成功回调
uv__write_callbacks(stream);
// 待写队列空,注销等待可写事件,即不需要写了
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
我们只关注uv__write_callbacks。
static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
// 写完成队列非空
while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
q = QUEUE_HEAD(&stream->write_completed_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
uv__req_unregister(stream->loop, req);
// bufs的内存还没有被释放
if (req->bufs != NULL) {
// 更新待写队列的大小,即减去req对应的所有数据的大小
stream->write_queue_size -= uv__write_req_size(req);
/*
bufs默认指向bufsml,超过默认大小时,
bufs指向新申请的堆内存,所以需要释放
*/
if (req->bufs != req->bufsml)
uv__free(req->bufs);
req->bufs = NULL;
}
// 执行回调
if (req->cb)
req->cb(req, req->error);
}
}
uv__write_callbacks负责更新流的待写队列大小、释放额外申请的堆内存、执行每个写请求的回调。
// 关闭流的写端
int uv_shutdown(uv_shutdown_t* req,
uv_stream_t* stream,
uv_shutdown_cb cb) {
// 初始化一个关闭请求,关联的handle是stream
uv__req_init(stream->loop, req, UV_SHUTDOWN);
req->handle = stream;
// 关闭后执行的回调
req->cb = cb;
stream->shutdown_req = req;
// 设置正在关闭的标记
stream->flags |= UV_HANDLE_SHUTTING;
// 注册等待可写事件
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
return 0;
}
关闭流的写端就是相当于给流发送一个关闭请求,把请求挂载到流中,然后注册等待可写事件,在可写事件触发的时候就会执行关闭操作。在分析写流的章节中我们提到,可写事件触发的时候,会执行uvdrain注销等待可写事件,除此之外,uvdrain还做了一个事情,就是关闭流的写端。我们看看具体的逻辑。
static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;
int err;
// 撤销等待可写事件,因为没有数据需要写入了
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
// 设置了关闭写端,但是还没有关闭,则执行关闭写端
if ((stream->flags & UV_HANDLE_SHUTTING) &&
!(stream->flags & UV_HANDLE_CLOSING) &&
!(stream->flags & UV_HANDLE_SHUT)) {
req = stream->shutdown_req;
stream->shutdown_req = NULL;
// 清除标记
stream->flags &= ~UV_HANDLE_SHUTTING;
uv__req_unregister(stream->loop, req);
err = 0;
// 关闭写端
if (shutdown(uv__stream_fd(stream), SHUT_WR))
err = UV__ERR(errno);
// 标记已关闭写端
if (err == 0)
stream->flags |= UV_HANDLE_SHUT;
// 执行回调
if (req->cb != NULL)
req->cb(req, err);
}
}
通过调用shutdown关闭流的写端,比如TCP流发送完数据后可以关闭写端。但是仍然可以读。
void uv__stream_close(uv_stream_t* handle) {
unsigned int i;
uv__stream_queued_fds_t* queued_fds;
// 从事件循环中删除IO观察者,移出pending队列
uv__io_close(handle->loop, &handle->io_watcher);
// 停止读
uv_read_stop(handle);
// 停掉handle
uv__handle_stop(handle);
// 不可读、写
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
// 关闭非标准流的文件描述符
if (handle->io_watcher.fd != -1) {
/*
Don't close stdio file descriptors.
Nothing good comes from it.
*/
if (handle->io_watcher.fd > STDERR_FILENO)
uv__close(handle->io_watcher.fd);
handle->io_watcher.fd = -1;
}
// 关闭通信socket对应的文件描述符
if (handle->accepted_fd != -1) {
uv__close(handle->accepted_fd);
handle->accepted_fd = -1;
}
// 同上,这是在排队等待处理的文件描述符
if (handle->queued_fds != NULL) {
queued_fds = handle->queued_fds;
for (i = 0; i < queued_fds->offset; i++)
uv__close(queued_fds->fds[i]);
uv__free(handle->queued_fds);
handle->queued_fds = NULL;
}
}
关闭流就是把流注册在epoll的事件注销,关闭所持有的文件描述符。
连接流是针对TCP和Unix域的,所以我们首先介绍一下一些网络编程相关的内容,首先我们先要有一个socket。我们看Libuv中如何新建一个socket。
int uv__socket(int domain, int type, int protocol) {
int sockfd;
int err;
// 新建一个socket,并设置非阻塞和LOEXEC标记
sockfd = socket(domain, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
// 不触发SIGPIPE信号,比如对端已经关闭,本端又执行写
#if defined(SO_NOSIGPIPE)
{
int on = 1;
setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
}
#endif
return sockfd;
}
在Libuv中,socket的模式都是非阻塞的,uv__socket是Libuv中申请socket的函数,不过Libuv不直接调用该函数,而是封装了一下。
/*
1 获取一个新的socket fd
2 把fd保存到handle里,并根据flag进行相关设置
3 绑定到本机随意的地址(如果设置了该标记的话)
*/
static int new_socket(uv_tcp_t* handle,
int domain,
unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
int sockfd;
// 获取一个socket
sockfd = uv__socket(domain, SOCK_STREAM, 0);
// 设置选项和保存socket的文件描述符到IO观察者中
uv__stream_open((uv_stream_t*) handle, sockfd, flags);
// 设置了需要绑定标记UV_HANDLE_BOUND
if (flags & UV_HANDLE_BOUND) {
slen = sizeof(saddr);
memset(&saddr, 0, sizeof(saddr));
// 获取fd对应的socket信息,比如IP,端口,可能没有
getsockname(uv__stream_fd(handle),
(struct sockaddr*) &saddr,
&slen);
// 绑定到socket中,如果没有则绑定到系统随机选择的地址
bind(uv__stream_fd(handle),(struct sockaddr*) &saddr, slen);
}
return 0;
}
上面的代码就是在Libuv申请一个socket的逻辑,另外它还支持新建的socket,可以绑定到一个用户设置的,或者操作系统随机选择的地址。不过Libuv并不直接使用这个函数。而是又封装了一层。
// 如果流还没有对应的fd,则申请一个新的,如果有则修改流的配置
static int maybe_new_socket(uv_tcp_t* handle,
int domain,
unsigned long flags) {
struct sockaddr_storage saddr;
socklen_t slen;
// 已经有fd了
if (uv__stream_fd(handle) != -1) {
// 该流需要绑定到一个地址
if (flags & UV_HANDLE_BOUND) {
/*
流是否已经绑定到一个地址了。handle的flag是在
new_socket里设置的,如果有这个标记说明已经执行过绑定了,
直接更新flags就行。
*/
if (handle->flags & UV_HANDLE_BOUND) {
handle->flags |= flags;
return 0;
}
// 有fd,但是可能还没绑定到一个地址
slen = sizeof(saddr);
memset(&saddr, 0, sizeof(saddr));
// 获取socket绑定到的地址
if (getsockname(uv__stream_fd(handle),
(struct sockaddr*) &saddr,
&slen))
return UV__ERR(errno);
// 绑定过了socket地址,则更新flags就行
if ((saddr.ss_family == AF_INET6 &&
((struct sockaddr_in6*) &saddr)->sin6_port != 0) ||
(saddr.ss_family == AF_INET &&
((struct sockaddr_in*) &saddr)->sin_port != 0)) {
handle->flags |= flags;
return 0;
}
// 没绑定则绑定到随机地址,bind中实现
if (bind(uv__stream_fd(handle),
(struct sockaddr*) &saddr,
slen))
return UV__ERR(errno);
}
handle->flags |= flags;
return 0;
}
// 申请一个新的fd关联到流
return new_socket(handle, domain, flags);
}
maybe_new_socket函数的逻辑分支很多,主要如下 1 如果流还没有关联到fd,则申请一个新的fd关联到流上 2 如果流已经关联了一个fd。 如果流设置了绑定地址的标记,但是已经通过Libuv绑定了一个地址(Libuv会设置UV_HANDLE_BOUND标记,用户也可能是直接调bind函数绑定了)。则不需要再次绑定,更新flags就行。 如果流设置了绑定地址的标记,但是还没有通过Libuv绑定一个地址,这时候通过getsocketname判断用户是否自己通过bind函数绑定了一个地址,是的话则不需要再次执行绑定操作。否则随机绑定到一个地址。
以上两个函数的逻辑主要是申请一个socket和给socket绑定一个地址。下面我们开看一下连接流的实现。
int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
int err;
int r;
// 已经发起了connect了
if (handle->connect_req != NULL)
return UV_EALREADY;
// 申请一个socket和绑定一个地址,如果还没有的话
err = maybe_new_socket(handle, addr->sa_family,
UV_HANDLE_READABLE | UV_HANDLE_WRITABLE
if (err)
return err;
handle->delayed_error = 0;
do {
// 清除全局错误变量的值
errno = 0;
// 非阻塞发起三次握手
r = connect(uv__stream_fd(handle), addr, addrlen);
} while (r == -1 && errno == EINTR);
if (r == -1 && errno != 0) {
// 三次握手还没有完成
if (errno == EINPROGRESS)
; /* not an error */
else if (errno == ECONNREFUSED)
// 对方拒绝建立连接,延迟报错
handle->delayed_error = UV__ERR(errno);
else
// 直接报错
return UV__ERR(errno);
}
// 初始化一个连接型request,并设置某些字段
uv__req_init(handle->loop, req, UV_CONNECT);
req->cb = cb;
req->handle = (uv_stream_t*) handle;
QUEUE_INIT(&req->queue);
// 连接请求
handle->connect_req = req;
// 注册到Libuv观察者队列
uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
// 连接出错,插入pending队尾
if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher);
return 0;
}
连接流的逻辑,大致如下 1 申请一个socket,绑定一个地址。 2 根据给定的服务器地址,发起三次握手,非阻塞的,会直接返回继续执行,不会等到三次握手完成。 3 往流上挂载一个connect型的请求。 4 设置IO观察者感兴趣的事件为可写。然后把IO观察者插入事件循环的IO观察者队列。等待可写的时候时候(完成三次握手),就会执行cb回调。
可写事件触发时,会执行uv__stream_io,我们看一下具体的逻辑。
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
我们继续看uv__stream_connect。
static void uv__stream_connect(uv_stream_t* stream) {
int error;
uv_connect_t* req = stream->connect_req;
socklen_t errorsize = sizeof(int);
// 连接出错
if (stream->delayed_error) {
error = stream->delayed_error;
stream->delayed_error = 0;
} else {
// 还是需要判断一下是不是出错了
getsockopt(uv__stream_fd(stream),
SOL_SOCKET,
SO_ERROR,
&error,
&errorsize);
error = UV__ERR(error);
}
// 还没连接成功,先返回,等待下次可写事件的触发
if (error == UV__ERR(EINPROGRESS))
return;
// 清空
stream->connect_req = NULL;
uv__req_unregister(stream->loop, req);
/*
连接出错则注销之前注册的等待可写队列,
连接成功如果待写队列为空,也注销事件,有数据需要写的时候再注册
*/
if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
}
// 执行回调,通知上层连接结果
if (req->cb)
req->cb(req, error);
if (uv__stream_fd(stream) == -1)
return;
// 连接失败,清空待写的数据和执行写请求的回调(如果有的话)
if (error < 0) {
uv__stream_flush_write_queue(stream, UV_ECANCELED);
uv__write_callbacks(stream);
}
}
连接流的逻辑是 1发起非阻塞式连接 2 注册等待可写事件 3 可写事件触发时,把连接结果告诉调用方 4 连接成功则发送写队列的数据,连接失败则清除写队列的数据并执行每个写请求的回调(有的话)。
监听流是针对TCP或Unix域的,主要是把一个socket变成listen状态。并且设置一些属性。
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept = -1;
unsigned long flags;
int err;
if (tcp->delayed_error)
return tcp->delayed_error;
// 是否设置了不连续accept。默认是连续accept。
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val != NULL && atoi(val) != 0);
}
// 设置不连续accept
if (single_accept)
tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT;
flags = 0;
/*
可能还没有用于listen的fd,socket地址等。
这里申请一个socket和绑定到一个地址
(如果调listen之前没有调bind则绑定到随机地址)
*/
err = maybe_new_socket(tcp, AF_INET, flags);
if (err)
return err;
// 设置fd为listen状态
if (listen(tcp->io_watcher.fd, backlog))
return UV__ERR(errno);
// 建立连接后的业务回调
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
// 设置io观察者的回调,由epoll监听到连接到来时执行
tcp->io_watcher.cb = uv__server_io;
/*
插入观察者队列,这时候还没有增加到epoll,
Poll IO阶段再遍历观察者队列进行处理(epoll_ctl)
*/
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;
}
监听流的逻辑看起来很多,但是主要的逻辑是把流对的fd改成listen状态,这样流就可以接收连接请求了。接着设置连接到来时执行的回调。最后注册IO观察者到事件循环。等待连接到来。就会执行uvserver_io。uvserver_io再执行connection_cb。监听流和其它流有一个区别是,当IO观察者的事件触发时,监听流执行的回调是uvserver_io函数。而其它流是在uvstream_io里统一处理。我们看一下连接到来或者Unix域上有数据到来时的处理逻辑。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
int err;
stream = container_of(w, uv_stream_t, io_watcher);
// 注册等待可读事件
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
while (uv__stream_fd(stream) != -1) {
/*
通过accept拿到和客户端通信的fd,我们看到这个
fd和服务器的fd是不一样的
*/
err = uv__accept(uv__stream_fd(stream));
// 错误处理
if (err < 0) {
/*
uv__stream_fd(stream)对应的fd是非阻塞的,
返回这个错说明没有连接可用accept了,直接返回
*/
if (err == -EAGAIN || err == -EWOULDBLOCK)
return; /* Not an error. */
if (err == -ECONNABORTED)
continue;
// 进程的打开的文件描述符个数达到阈值,看是否有备用的
if (err == -EMFILE || err == -ENFILE) {
err = uv__emfile_trick(loop, uv__stream_fd(stream));
if (err == -EAGAIN || err == -EWOULDBLOCK)
break;
}
// 发生错误,执行回调
stream->connection_cb(stream, err);
continue;
}
// 记录拿到的通信socket对应的fd
stream->accepted_fd = err;
// 执行上传回调
stream->connection_cb(stream, 0);
/*
stream->accepted_fd为-1说明在回调connection_cb里已经消费
了 accepted_fd,否则先注销服务器在epoll中的fd的读事件,等
待消费后再注册,即不再处理请求了
*/
if (stream->accepted_fd != -1) {
/*
The user hasn't yet accepted called uv_accept()
*/
uv__io_stop(loop, &stream->io_watcher, POLLIN);
return;
}
/*
是TCP类型的流并且设置每次只accpet一个连接,则定时阻塞,
被唤醒后再accept,否则一直accept(如果用户在connect回
调里消费了accept_fd的话),定时阻塞用于多进程竞争处理连接
*/
if (stream->type == UV_TCP &&
(stream->flags & UV_TCP_SINGLE_ACCEPT)) {
struct timespec timeout = { 0, 1 };
nanosleep(&timeout, NULL);
}
}
}
我们看到连接到来时,Libuv会从已完成连接的队列中摘下一个节点,然后执行connection_cb回调。在connection_cb回调里,需要uv_accept消费accpet_fd。
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
int err;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
// 把文件描述符保存到client
err = uv__stream_open(client,
server->accepted_fd,
UV_STREAM_READABLE
| UV_STREAM_WRITABLE);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;
case UV_UDP:
err = uv_udp_open((uv_udp_t*) client,
server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
goto done;
}
break;
default:
return -EINVAL;
}
client->flags |= UV_HANDLE_BOUND;
done:
// 非空则继续放一个到accpet_fd中等待accept,用于文件描述符传递
if (server->queued_fds != NULL) {
uv__stream_queued_fds_t* queued_fds;
queued_fds = server->queued_fds;
// 把第一个赋值到accept_fd
server->accepted_fd = queued_fds->fds[0];
/*
offset减去一个单位,如果没有了,则释放内存,
否则需要把后面的往前挪,offset执行最后一个
*/
if (--queued_fds->offset == 0) {
uv__free(queued_fds);
server->queued_fds = NULL;
} else {
memmove(queued_fds->fds,
queued_fds->fds + 1,
queued_fds->offset * sizeof(*queued_fds->fds));
}
} else {
// 没有排队的fd了,则注册等待可读事件,等待accept新的fd
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, POLLIN);
}
return err;
}
client是用于和客户端进行通信的流,accept就是把accept_fd保存到client中,client就可以通过fd和对端进行通信了。消费完accept_fd后,如果还有待处理的fd的话,需要补充一个到accept_fd(针对Unix域),其它的继续排队等待处理,如果没有待处理的fd则注册等待可读事件,继续处理新的连接。
当我们不再需要一个流的时候,我们会首先调用uv_close关闭这个流,关闭流只是注销了事件和释放了文件描述符,调用uv_close之后,流对应的结构体就会被加入到closing队列,在closing阶段的时候,才会执行销毁流的操作,比如丢弃还没有写完成的数据,执行对应流的回调,我们看看销毁流的函数uv__stream_destroy。
void uv__stream_destroy(uv_stream_t* stream) {
// 正在连接,则执行回调
if (stream->connect_req) {
uv__req_unregister(stream->loop, stream->connect_req);
stream->connect_req->cb(stream->connect_req, -ECANCELED);
stream->connect_req = NULL;
}
// 丢弃待写的数据,如果有的话
uv__stream_flush_write_queue(stream, -ECANCELED);
// 处理写完成队列,这里是处理被丢弃的数据
uv__write_callbacks(stream);
// 正在关闭流,直接回调
if (stream->shutdown_req) {
uv__req_unregister(stream->loop, stream->shutdown_req);
stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
stream->shutdown_req = NULL;
}
}
我们看到,销毁流的时候,如果流中还有待写的数据,则会丢弃。我们看一下uv__stream_flush_write_queue和uv__write_callbacks。
void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
uv_write_t* req;
QUEUE* q;
while (!QUEUE_EMPTY(&stream->write_queue)) {
q = QUEUE_HEAD(&stream->write_queue);
QUEUE_REMOVE(q);
req = QUEUE_DATA(q, uv_write_t, queue);
// 把错误写到每个请求中
req->error = error;
QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
}
}
uv__stream_flush_write_queue丢弃待写队列中的请求,并直接插入写完成队列中。uv__write_callbacks是写完或者写出错时执行的函数,它逐个处理写完成队列中的节点,每个节点是一个写请求,执行它的回调,如何分配了堆内存,则释放内存。在写流章节已经分析,不再具体展开。
在流的实现中,读写等操作都只是注册事件到epoll,事件触发的时候,会执行统一的回调函数uv__stream_io。下面列一下该函数的代码,具体实现在其它章节已经分析。
static void uv__stream_io(uv_loop_t* loop,
uv__io_t* w,
unsigned int events) {
uv_stream_t* stream;
stream = container_of(w, uv_stream_t, io_watcher);
// 是连接流,则执行连接处理函数
if (stream->connect_req) {
uv__stream_connect(stream);
return;
}
/*
Ignore POLLHUP here. Even it it's set,
there may still be data to read.
*/
// 可读是触发,则执行读处理
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream);
// 读回调关闭了流
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
/* ¬¬
POLLHUP说明对端关闭了,即不会发生数据过来了。
如果流的模式是持续读,
1 如果只读取了部分(设置UV_STREAM_READ_PARTIAL),
并且没有读到结尾(没有设置UV_STREAM_READ_EOF),
则直接作读结束处理,
2 如果只读取了部分,上面的读回调执行了读结束操作,
则这里就不需要处理了
3 如果没有设置只读了部分,还没有执行读结束操作,
则不能作读结束操作,因为对端虽然关闭了,但是之
前的传过来的数据可能还没有被消费完
4 如果没有设置只读了部分,执行了读结束操作,那这
里也不需要处理
*/
if ((events & POLLHUP) &&
(stream->flags & UV_STREAM_READING) &&
(stream->flags & UV_STREAM_READ_PARTIAL) &&
!(stream->flags & UV_STREAM_READ_EOF)) {
uv_buf_t buf = { NULL, 0 };
uv__stream_eof(stream, &buf);
}
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
// 可写事件触发
if (events & (POLLOUT | POLLERR | POLLHUP)) {
// 写数据
uv__write(stream);
// 写完后做后置处理,释放内存,执行回调等
uv__write_callbacks(stream);
// 待写队列为空,则注销等待写事件
if (QUEUE_EMPTY(&stream->write_queue))
uv__drain(stream);
}
}