Node.js属于单线程事件循环架构,该事件循环由Libuv的uv_run函数实现,在该函数中执行while循环,然后不断地处理各个阶段(phase)的事件回调。事件循环的处理相当于一个消费者,消费由各种代码产生的任务。Node.js初始化完成后就开始陷入该事件循环中,事件循环的结束也就意味着Node.js的结束。下面看一下事件循环的核心代码。
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
// 在uv_run之前要先提交任务到loop
r = uv__loop_alive(loop);
// 事件循环没有任务执行,即将退出,设置一下当前循环的时间
if (!r)
uv__update_time(loop);
// 没有任务需要处理或者调用了uv_stop则退出事件循环
while (r != 0 && loop->stop_flag == 0) {
// 更新loop的time字段
uv__update_time(loop);
// 执行超时回调
uv__run_timers(loop);
/*
执行pending回调,ran_pending代表pending队列是否为空,
即没有节点可以执行
*/
ran_pending = uv__run_pending(loop);
// 继续执行各种队列
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
/*
执行模式是UV_RUN_ONCE时,如果没有pending节点,
才会阻塞式Poll IO,默认模式也是
*/
if ((mode == UV_RUN_ONCE && !ran_pending) ||
mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
// Poll IO timeout是epoll_wait的超时时间
uv__io_poll(loop, timeout);
// 处理check阶段
uv__run_check(loop);
// 处理close阶段
uv__run_closing_handles(loop);
/*
还有一次执行超时回调的机会,因为uv__io_poll可能是因为
定时器超时返回的。
*/
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
/*
只执行一次,退出循环,UV_RUN_NOWAIT表示在Poll IO阶段
不会阻塞并且循环只执行一次
*/
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
// 是因为调用了uv_stop退出的,重置flag
if (loop->stop_flag != 0)
loop->stop_flag = 0;
/*
返回是否还有活跃的任务(handle或request),
业务代表可以再次执行uv_run
*/
return r;
}
Libuv分为几个阶段,下面从先到后,分别分析各个阶段的相关代码。
Libuv中,定时器阶段是第一个被处理的阶段。定时器是以最小堆实现的,最快过期的节点是根节点。Libuv在每次事件循环开始的时候都会缓存当前的时间,在每一轮的事件循环中,使用的都是这个缓存的时间,必要的时候Libuv会显式更新这个时间,因为获取时间需要调用操作系统提供的接口,而频繁调用系统调用会带来一定的耗时,缓存时间可以减少操作系统的调用,提高性能。Libuv缓存了当前最新的时间后,就执行uv__run_timers,该函数就是遍历最小堆,找出当前超时的节点。因为堆的性质是父节点肯定比孩子小。并且根节点是最小的,所以如果一个根节点,它没有超时,则后面的节点也不会超时。对于超时的节点就执行它的回调。我们看一下具体的逻辑。
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
// 遍历二叉堆
for (;;) {
// 找出最小的节点
heap_node = heap_min(timer_heap(loop));
// 没有则退出
if (heap_node == NULL)
break;
// 通过结构体字段找到结构体首地址
handle = container_of(heap_node, uv_timer_t, heap_node);
// 最小的节点都没有超市,则后面的节点也不会超时
if (handle->timeout > loop->time)
break;
// 删除该节点
uv_timer_stop(handle);
/*
重试插入二叉堆,如果需要的话(设置了repeat,比如
setInterval)
*/
uv_timer_again(handle);
// 执行回调
handle->timer_cb(handle);
}
}
执行完回调后,还有两个关键的操作,第一就是stop,第二就是again。stop的逻辑很简单,就是把handle从二叉堆中删除,并且修改handle的状态。那么again又是什么呢?again是为了支持setInterval这种场景,如果handle设置了repeat标记,则该handle在超时后,每repeat的时间后,就会继续执行超时回调。对于setInterval,就是超时时间是x,每x的时间后,执行回调。这就是Node.js里定时器的底层原理。但Node.js不是每次调setTimeout/setInterval的时候都往最小堆插入一个节点,Node.js里,只有一个关于uv_timer_s的handle,它在JS层维护了一个数据结构,每次计算出最早到期的节点,然后修改handle的超时时间,具体在定时器章节讲解。 另外timer阶段和Poll IO阶段也有一些联系,因为Poll IO可能会导致主线程阻塞,为了保证主线程可以尽快执行定时器的回调,Poll IO不能一直阻塞,所以这时候,阻塞的时长就是最快到期的定时器节点的时长(具体可参考libuv core.c中的uv_backend_timeout函数)。
官网对pending阶段的解释是在上一轮的Poll IO阶段没有执行的IO回调,会在下一轮循环的pending阶段被执行。从源码来看,Poll IO阶段处理任务时,在某些情况下,如果当前执行的操作失败需要执行回调通知调用方一些信息,该回调函数不会立刻执行,而是在下一轮事件循环的pending阶段执行(比如写入数据成功,或者TCP连接失败时回调C++层),我们先看pending阶段的处理。
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;
if (QUEUE_EMPTY(&loop->pending_queue))
return 0;
// 把pending_queue队列的节点移到pq,即清空了pending_queue
QUEUE_MOVE(&loop->pending_queue, &pq);
// 遍历pq队列
while (!QUEUE_EMPTY(&pq)) {
// 取出当前第一个需要处理的节点,即pq.next
q = QUEUE_HEAD(&pq);
// 把当前需要处理的节点移出队列
QUEUE_REMOVE(q);
/*
重置一下prev和next指针,因为这时候这两个指针是
指向队列中的两个节点
*/
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
return 1;
}
pending阶段的处理逻辑就是把pending队列里的节点逐个执行。我们看一下pending队列的节点是如何生产出来的。
void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (QUEUE_EMPTY(&w->pending_queue))
QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}
Libuv通过uv__io_feed函数生产pending任务,从Libuv的代码中我们看到IO错误的时候会调这个函数(如tcp.c的uv__tcp_connect函数)。
if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher);
在写入数据成功后(比如TCP、UDP),也会往pending队列插入一个节点,等待回调。比如发送数据成功后执行的代码(udp.c的uv__udp_sendmsg函数)
// 发送完移出写队列
QUEUE_REMOVE(&req->queue);
// 加入写完成队列
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
/*
有节点数据写完了,把IO观察者插入pending队列,
pending阶段执行回调
*/
uv__io_feed(handle->loop, &handle->io_watcher);
最后关闭IO的时候(如关闭一个TCP连接)会从pending队列移除对应的节点,因为已经关闭了,自然就不需要执行回调。
void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
uv__io_stop(loop,
w,
POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
QUEUE_REMOVE(&w->pending_queue);
}
prepare,check,idle是Libuv事件循环中属于比较简单的一个阶段,它们的实现是一样的(见loop-watcher.c)。本节只讲解prepare阶段,我们知道Libuv中分为handle和request,而prepare阶段的任务是属于handle类型。这意味着除非我们显式移除,否则prepare阶段的节点在每次事件循环中都会被执行。下面我们先看看怎么使用它。
void prep_cb(uv_prepare_t *handle) {
printf("Prep callback\n");
}
int main() {
uv_prepare_t prep;
// 初始化一个handle,uv_default_loop是事件循环的核心结构体
uv_prepare_init(uv_default_loop(), &prep);
// 注册handle的回调
uv_prepare_start(&prep, prep_cb);
// 开始事件循环
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
return 0;
}
执行main函数,Libuv就会在prepare阶段执行回调prep_cb。我们分析一下这个过程。
int uv_prepare_init(uv_loop_t* loop, uv_prepare_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_PREPARE);
handle->prepare_cb = NULL;
return 0;
}
init函数主要是做一些初始化操作。我们继续要看start函数。
int uv_prepare_start(uv_prepare_t* handle, uv_prepare_cb cb) {
// 如果已经执行过start函数则直接返回
if (uv__is_active(handle)) return 0;
if (cb == NULL) return UV_EINVAL;
QUEUE_INSERT_HEAD(&handle->loop->prepare_handles,
&handle->queue);
handle->prepare_cb = cb;
uv__handle_start(handle);
return 0;
}
uv_prepare_start函数主要的逻辑主要是设置回调,把handle插入loop的prepare_handles队列,prepare_handles队列保存了prepare阶段的任务。在事件循环的prepare阶段会逐个执行里面的节点的回调。然后我们看看Libuv在事件循环的prepare阶段是如何处理的。
void uv__run_prepare(uv_loop_t* loop) {
uv_prepare_t* h;
QUEUE queue;
QUEUE* q;
/*
把该类型对应的队列中所有节点摘下来挂载到queue变量,
相当于清空prepare_handles队列,因为如果直接遍历
prepare_handles队列,在执行回调的时候一直往prepare_handles
队列加节点,会导致下面的while循环无法退出。
先移除的话,新插入的节点在下一轮事件循环才会被处理。
*/
QUEUE_MOVE(&loop->prepare_handles, &queue);
// 遍历队列,执行每个节点里面的函数
while (!QUEUE_EMPTY(&queue)) {
// 取下当前待处理的节点,即队列的头
q = QUEUE_HEAD(&queue);
/*
取得该节点对应的整个结构体的基地址,
即通过结构体成员取得结构体首地址
*/
h = QUEUE_DATA(q, uv_prepare_t, queue);
// 把该节点移出当前队列
QUEUE_REMOVE(q);
// 重新插入原来的队列
QUEUE_INSERT_TAIL(&loop->prepare_handles, q);
// 执行回调函数
h->prepare_cb(h);
}
}
uv__run_prepare函数的逻辑很简单,但是有一个重点的地方就是执行完每一个节点,Libuv会把该节点重新插入队列中,所以prepare(包括idle、check)阶段的节点在每一轮事件循环中都会被执行。而像定时器、pending、closing阶段的节点是一次性的,被执行后就会从队列里删除。 我们回顾一开始的测试代码。因为它设置了Libuv的运行模式是默认模式。而prepare队列又一直有一个handle节点,所以它是不会退出的。它会一直执行回调。那如果我们要退出怎么办呢?或者说不要执行prepare队列的某个节点了。我们只需要stop一下就可以了。
int uv_prepare_stop(uv_prepare_t* handle) {
if (!uv__is_active(handle)) return 0;
// 把handle从prepare队列中移除,但还挂载到handle_queue中
QUEUE_REMOVE(&handle->queue);
// 清除active标记位并且减去loop中handle的active数
uv__handle_stop(handle);
return 0;
}
stop函数和start函数是相反的作用,这就是Node.js中prepare、check、idle阶段的原理。
Poll IO是Libuv非常重要的一个阶段,文件IO、网络IO、信号处理等都在这个阶段处理,这也是最复杂的一个阶段。处理逻辑在core.c的uv__io_poll这个函数,这个函数比较复杂,我们分开分析。在开始分析Poll IO之前,先了解一下它相关的一些数据结构。 1 IO观察者uv__io_t。这个结构体是Poll IO阶段核心结构体。它主要是保存了IO相关的文件描述符、回 调、感兴趣的事件等信息。 2 watcher_queue观察者队列。所有需要Libuv处理的IO观察者都挂载在这个队列里,Libuv在Poll IO阶段会逐个处理。
下面我们开始分析Poll IO阶段。先看第一段逻辑。
// 没有IO观察者,则直接返回
if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
return;
}
// 遍历IO观察者队列
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
// 取出当前头节点
q = QUEUE_HEAD(&loop->watcher_queue);
// 脱离队列
QUEUE_REMOVE(q);
// 初始化(重置)节点的前后指针
QUEUE_INIT(q);
// 通过结构体成功获取结构体首地址
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
// 设置当前感兴趣的事件
e.events = w->pevents;
/*
这里使用了fd字段,事件触发后再通过fd从watchs
字段里找到对应的IO观察者,没有使用ptr指向IO观察者的方案
*/
e.data.fd = w->fd;
// 如果w->events初始化的时候为0,则新增,否则修改
if (w->events == 0)
op = EPOLL_CTL_ADD;
else
op = EPOLL_CTL_MOD;
// 修改epoll的数据
epoll_ctl(loop->backend_fd, op, w->fd, &e)
// 记录当前加到epoll时的状态
w->events = w->pevents;
}
第一步首先遍历IO观察者,修改epoll的数据。然后准备进入等待。
psigset = NULL;
if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
sigemptyset(&sigset);
sigaddset(&sigset, SIGPROF);
psigset = &sigset;
}
/*
http://man7.org/Linux/man-pages/man2/epoll_wait.2.html
pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
ready = epoll_wait(epfd, &events, maxevents, timeout);
pthread_sigmask(SIG_SETMASK, &origmask, NULL);
即屏蔽SIGPROF信号,避免SIGPROF信号唤醒epoll_wait,但是却没
有就绪的事件
*/
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
psigset);
// epoll可能阻塞,这里需要更新事件循环的时间
uv__update_time(loop) ```
epoll_wait可能会引起主线程阻塞,所以wait返回后需要更新当前的时间,否则在使用的时候时间差会比较大,因为Libuv会在每轮时间循环开始的时候缓存当前时间这个值。其它地方直接使用,而不是每次都去获取。下面我们接着看epoll返回后的处理(假设有事件触发)。
// 保存epoll_wait返回的一些数据,maybe_resize申请空间的时候+2了
loop->watchers[loop->nwatchers] = (void*) events;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
for (i = 0; i < nfds; i++) {
// 触发的事件和文件描述符
pe = events + i;
fd = pe->data.fd;
// 根据fd获取IO观察者,见上面的图
w = loop->watchers[fd];
// 会其它回调里被删除了,则从epoll中删除
if (w == NULL) {
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
continue;
}
if (pe->events != 0) {
/*
用于信号处理的IO观察者感兴趣的事件触发了,
即有信号发生。
*/
if (w == &loop->signal_io_watcher)
have_signals = 1;
else
// 一般的IO观察者则执行回调
w->cb(loop, w, pe->events);
nevents++;
}
}
// 有信号发生,触发回调
if (have_signals != 0)
loop->signal_io_watcher.cb(loop,
&loop->signal_io_watcher,
POLLIN);
上面的代码处理IO事件并执行IO观察者里的回调,但是有一个特殊的地方就是信号处理的IO观察者需要单独判断,它是一个全局的IO观察者,和一般动态申请和销毁的IO观察者不一样,它是存在于Libuv运行的整个生命周期。这就是Poll IO的整个过程。
close是Libuv每轮事件循环中最后的一个阶段。uv_close用于关闭一个handle,并且执行一个回调。uv_close产生的任务会插入到close阶段的队列,然后在close阶段被处理。我们看一下uv_close函数的实现。
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
// 正在关闭,但是还没执行回调等后置操作
handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;
switch (handle->type) {
case UV_PREPARE:
uv__prepare_close((uv_prepare_t*)handle);
break;
case UV_CHECK:
uv__check_close((uv_check_t*)handle);
break;
...
default:
assert(0);
}
uv__make_close_pending(handle);
}
uv_close设置回调和状态,然后根据handle类型调对应的close函数,一般就是stop这个handle,解除IO观察者注册的事件,从事件循环的handle队列移除该handle等等,比如prepare的close函数只是把handle从队列中移除。
void uv__prepare_close(uv_prepare_t* handle) {
uv_prepare_stop(handle);
}
int uv_prepare_stop(uv_prepare__t* handle) {
QUEUE_REMOVE(&handle->queue);
uv__handle_stop(handle);
return 0;
}
根据不同的handle做不同的处理后,接着执行uv__make_close_pending往close队列追加节点。
// 头插法插入closing队列,在closing阶段被执行
void uv__make_close_pending(uv_handle_t* handle) {
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}
然后在close阶段逐个处理。我们看一下close阶段的处理逻辑
// 执行closing阶段的的回调
static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;
p = loop->closing_handles;
loop->closing_handles = NULL;
while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}
// 执行closing阶段的回调
static void uv__finish_close(uv_handle_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;
...
uv__handle_unref(handle);
// 从handle队列里移除
QUEUE_REMOVE(&handle->handle_queue);
if (handle->close_cb) {
handle->close_cb(handle);
}
}
uv__run_closing_handles会逐个执行每个任务节点的回调。
Libuv通过uv__loop_alive函数判断事件循环是否还需要继续执行。我们看看这个函数的定义。
static int uv__loop_alive(const uv_loop_t* loop) {
return uv__has_active_handles(loop) ||
uv__has_active_reqs(loop) ||
loop->closing_handles != NULL;
}
为什么会有一个closing_handle的判断呢?从uv_run的代码来看,执行完close阶段后,会立刻执行uv__loop_alive,正常来说,close阶段的队列是空的,但是如果我们在close回调里又往close队列新增了一个节点,而该节点不会在本轮的close阶段被执行,这样会导致执行完close阶段,但是close队列依然有节点,如果直接退出,则无法执行对应的回调。 我们看到有三种情况,Libuv认为事件循环是存活的。如果我们控制这三种条件就可以控制事件循环的的退出。我们通过一个例子理解一下这个过程。
const timeout = setTimeout(() => {
console.log('never console')
}, 5000);
timeout.unref();
上面的代码中,setTimeout的回调是不会执行的。除非超时时间非常短,短到第一轮事件循环的时候就到期了,否则在第一轮事件循环之后,由于unref的影响,事件循环直接退出了。unref影响的就是handle这个条件。这时候事件循环代码如下。
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
// ...
// uv__loop_alive返回false,直接跳出while,从而退出事件循环
r = uv__loop_alive(loop);
}