开源项目Workflow中有个重要的基础模块: 代码仅300行的C语言线程池。
本文会伴随源码分析,而逻辑完备、对称无差别的特点于第3部分开始 欢迎跳阅, 或直接到Github主页上围观代码
https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c
作为目前Github上炙手可热的异步调度引擎 Workflow有一个大招是: 计算通信融为一体。 而计算的核心:Executor调度器, 就是基于这个线程池实现的。 可以说,一个通用而高效的线程池, 是我们写C/C++代码时离不开的基础模块。
thrdpool代码位置在src/kernel/, 不仅可以直接拿来使用, 同时也适合阅读学习。
而更重要的,秉承Workflow项目本身 一贯的严谨极简的作风, 这个thrdpool代码极致简洁, 实现逻辑上亦非常完备, 结构精巧,处处严谨, 复杂的并发处理依然可以对称无差别, 不得不让我惊叹:妙啊!!!
你可能会很好奇, 线程池还能写出什么别致的新思路吗? 先列出一些,你们细品:
特点1
:创建完线程池后,无需记录任何线程id或对象,线程池可以通过一个等一个的方式优雅地去结束所有线程; 也就是说,每一个线程都是对等的特点2
:线程任务可以由另一个线程任务调起;甚至线程池正在被销毁时也可以提交下一个任务;(这很重要,因为线程本身很可能是不知道线程池的状态的; 即,每一个任务也是对等的特点3
:同理,线程任务也可以销毁这个线程池;(非常完整~ 每一种行为也是对等的,包括destroy我真的迫不及待为大家深层解读一下, 这个我愿称之为“逻辑完备”的线程池。
第一部分我先梳理一些基本内容梳理, 有基础的小伙伴可以直接跳过。 如果有不准确的地方,欢迎大家指正交流~
Question: 为什么需要线程池? 其实思路不仅对线程池, 对任何有限资源的调度管理都是类似的。
我们知道, 通过pthread或者std::thread创建线程, 就可以实现多线程并发执行我们的代码。
但是CPU的核数是固定的, 所以真正并行执行的最大值也是固定的, 过多的线程除了频繁创建产生overhead以外, 还会导致对系统资源进行争抢, 这些都是不必要的浪费。
因此我们可以管理有限个线程, 循环且合理地利用它们。♻️
那么线程池一般包含哪些内容呢?
接下来我们看看Workflow的thrdpool是怎么做的。
以下共7步常用思路, 足以让我们把代码飞快过一遍。
我们打开thrdpool.h
,只需关注这三个:
// 创建线程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把任务交给线程池的入口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool);
即,我们如何描述一个交给线程池的任务。
struct thrdpool_task
{
void (*routine)(void *); // 函数指针
void *context; // 上下文
};
struct __thrdpool
{
struct list_head task_queue;// 任务队列
size_t nthreads; // 线程个数
size_t stacksize; // 构造线程时的参数
pthread_t tid; // 运行期间记录的是个zero值
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_key_t key;
pthread_cond_t *terminate;
};
没有一个多余,每一个成员都很到位:
以上各个成员的用途, 好像说了,又好像没说, 是因为几乎每一个成员都值得深挖一下, 所以我们记住它们, 后面看代码的时候就会豁然开朗!
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t *pool;
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
// 去掉了其他代码,但是注意到刚才的tid和terminate的赋值
memset(&pool->tid, 0, sizeof (pthread_t));
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
...
这里可以看到
__thrdpool_create_threads()
里边
最关键的,就是循环创建nthreads个线程。
while (pool->nthreads < nthreads)
{
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
...
所以我们在上一步知道了,
每个线程执行的是__thrdpool_routine()
不难想象,它会不停从队列拿任务出来执行:
static void *__thrdpool_routine(void *arg)
{
...
while (1)
{
// 1. 从队列里拿一个任务出来,没有就等待
pthread_mutex_lock(&pool->mutex);
while (!pool->terminate && list_empty(&pool->task_queue))
pthread_cond_wait(&pool->cond, &pool->mutex);
// 2. 线程池结束的标志位,记住它,先跳过
if (pool->terminate)
break;
// 3. 如果能走到这里,恭喜你,拿到了任务~
entry = list_entry(*pos, struct __thrdpool_task_entry, list);
list_del(*pos);
// 4. 先解锁
pthread_mutex_unlock(&pool->mutex);
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
// 5. 再执行
task_routine(task_context);
// 6. 这里也先记住它,意思是线程池里的线程可以销毁线程池
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
... // 后面还有魔法,留下一章解读~~~
刚才看到的__thrdpool_routine()
就是线程的核心函数了,
它可以和谁关联起来呢?
可以和接口thrdpool_schedule()
关联上
我们说过,线程池上有个队列管理任务:
我们已经看过消费者了,来看看生产者的代码:
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
entry->task = *task;
pthread_mutex_lock(&pool->mutex);
// 添加到队列里
list_add_tail(&entry->list, &pool->task_queue);
// 叫醒在等待的线程
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
说到这里,特点2
就非常清晰了:
开篇说的特点2
是说,
”线程任务可以由另一个线程任务调起”。
只要对队列的管理做得好, 显然消费者所执行的函数里也可以生产
对于线程池来说就是比如销毁的情况。
接口thrdpool_destroy()
实现非常简单:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
...
// 1. 内部会设置pool->terminate,并叫醒所有等在队列拿任务的线程
__thrdpool_terminate(in_pool, pool);
// 2. 把队列里还没有执行的任务都拿出来,通过pending返回给用户
list_for_each_safe(pos, tmp, &pool->task_queue)
{
entry = list_entry(pos, struct __thrdpool_task_entry, list);
list_del(pos);
if (pending)
pending(&entry->task);
... // 后面就是销毁各种内存,同样有魔法~
在退出的时候,
我们那些已经提交但是还没有被执行的任务
是绝对不能就这么扔掉了的,
于是我们可以传入一个pending()
函数, **上层可以做自己的回收、回调、
或任何保证上层逻辑完备的事情。**
设计的完整性,无处不在。
接下来我们就可以跟着我们的核心问题, 针对性地看看每个特点都是怎么实现的。
这里提出一个问题: 线程池要退出,如何结束所有线程?
一般线程池的实现都是 需要记录下所有的线程id, 或者thread对象, 以便于我们去用join方法等待它们结束。
不严格地用join收拾干净会有什么问题? 最直观的,模块退出时很可能会报内存泄漏
但是我们刚才看, pool里并没有记录所有的tid呀? 正如开篇说的, pool上只有一个tid,而且还是个空的值。
而特点1
正是Workflow thrdpool的答案:
无需记录所有线程,我可以让线程挨个自动退出、且一个等待下一个,最终达到调用完thrdpool_destroy()后内存回收干净的目的。
这里先给一个简单的图, 假设发起destroy的人是main线程, 我们如何做到一个等一个退出:
外部线程,比如main,发起destroy
步骤如下:
是不是非常有意思!!! 非常优雅的做法!!!
所以我们会发现, 其实大家不太需要知道太多信息, 只需要知道我要负责的上一个人。
当然每一步都是非常严谨的, 结合刚才跳过的第一段魔法感受一下:
static void *__thrdpool_routine(void *arg)
{
while (1)
{ // 1.注意这里还持有锁
pthread_mutex_lock(&pool->mutex);
... // 等着队列拿任务出来
// 2. 这既是标识位,也是发起销毁的那个人所等待的condition
if (pool->terminate)
break;
... // 执行拿到的任务
}
/* One thread joins another. Don't need to keep all thread IDs. */
// 3. 把线程池上记录的那个tid拿下来,我来负责上一人
tid = pool->tid;
// 4. 把我自己记录到线程池上,下一个人来负责我
pool->tid = pthread_self();
// 5. 每个人都减1,最后一个人负责叫醒发起detroy的人
if (--pool->nthreads == 0)
pthread_cond_signal(pool->terminate);
// 6. 这里可以解锁进行等待了
pthread_mutex_unlock(&pool->mutex);
// 7. 只有第一个人拿到0值
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
// 8. 只要不0值,我就要负责等上一个结束才能退
pthread_join(tid, NULL);
return NULL; // 9. 退出,干干净净~
}
在第二部分我们看过源码, 只要队列管理得好, 线程任务里提交下一个任务是完全OK的。
这很合理。
那么问题来了,
特点1
又说,我们每个线程,
是不需要知道太多线程池的状态和信息的。
而线程池的销毁是个过程,
如果在这个过程间提交任务会怎么样呢?
因此特点2
的一个重要解读是:
线程池被销毁时也可以提交下一个任务。
而且刚才提过,
还没有被执行的任务,
可以通过我们传入的pending()函数拿回来。
简单看看销毁时的严谨做法:
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex);
// 1. 加锁设置标志位,之后的添加任务不会被执行,但可以pending拿到
pool->terminate = &term;
// 2. 广播所有等待的消费者
pthread_cond_broadcast(&pool->cond);
if (in_pool) // 3. 这里的魔法等下讲>_<~
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
// 4. 如果还有线程没有退完,我会等,注意这里是while
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 5.同样地等待打算退出的上一个人
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL);
}
既然线程任务可以做任何事情, 理论上,线程任务也可以销毁线程池❓
作为一个逻辑完备的线程池, 大胆一点, 我们把问号去掉。
而且,销毁并不会结束当前任务, 它会等这个任务执行完。
想象一下,刚才的__thrdpool_routine(), while(1)里拿出来的那个任务, 做的事情竟然是发起thrdpool_destroy()...
把上面的图大胆改一下:
我们让一个routine来destroy线程池
如果发起销毁的人, 是我们自己内部的线程, 那么我们就不是等n个,而是等n-1, 少了一个外部线程等待我们。 如何实现才能让这些逻辑都完美融合呢? 我们把刚才跳过的三段魔法串起来看看。
如果发起销毁的人是线程池内部的线程, 那么它具有较强的自我管理意识
(因为前面说了,会等它这个任务执行完) 而我们可以放心大胆地pthread_detach, 无需任何人join它等待它结束。
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
...
// 每个由线程池创建的线程都设置了一个key,由此判断是否是in_pool
if (in_pool)
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
一定是发起销毁的那个人。 所以这里用in_pool来判断是否是外部的人:
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
// 已经调用完第一段,且挨个pending(未执行的task)了
// 销毁其他内部分配的内存
...
// 如果不是内部线程发起的销毁,要负责回收线程池内存
if (!in_pool)
free(pool);
}
那现在不是main线程发起的销毁呢? 发起的销毁的那个内部线程, 怎么能保证我可以在最后关头 把所有资源回收干净、调free(pool)、 最后功成身退呢?
在前面阅读源码第5步,其实我们看过, __thrdpool_routine()**里有free的地方。**
于是现在三段魔法终于串起来了。
static void *__thrdpool_routine(void *arg)
{
while (1)
{ // ...
task_routine(task_context); // 如果routine里做的事情,是销毁线程池...
// 注意这个时候,其他内存都已经被destroy里清掉了,万万不可以再用什么mutex、cond
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
...
非常重要的一点, 由于并发,我们是不知道谁先操作的。 假设我们稍微改一改这个顺序, 就又是另一番逻辑。
比如我作为一个内部线程, 在routine()里调用destroy()期间, 发现还有线程没有执行完, 我就要等在我的terminate上, 待最后看到nthreads==0的那个人叫醒我。
然后,我的代码继续执行, 函数栈就会从destroy()回到routine(), 也就是上面那几行, 再然后就可以free(pool); 由于这时候我已经放飞自我detach了, 于是一切顺利结束。
你看,无论如何都可以完美地销毁线程池:
并发是复杂多变的,代码是简洁统一的
是不是太妙了! 我写到这里已经要感动哭了!
这个线程池只有两个文件:
thrdpool.h
和 thrdpool.c
,
而且只依赖内核的数据结构list.h
。
我们把它拿出来玩,自己写一段代码:
void my_routine(void *context)
{
// 我们要执行的函数
printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context); );
}
void my_pending(const struct thrdpool_task *task)
{
// 线程池销毁后,没执行的任务会到这里
printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context););
}
int main()
{
thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创建
struct thrdpool_task task;
unsigned long long i;
for (i = 0; i < 5; i++)
{
task.routine = &my_routine;
task.context = reinterpret_cast<void *>(i);
thrdpool_schedule(&task, thrd_pool); // 调用
}
getchar(); // 卡住主线程,按回车继续
thrdpool_destroy(&my_pending, thrd_pool); // 结束
return 0;
}
再打印几行log,直接编译就可以跑起来:
妈妈再也不用担心我的C语言作业
简单程度堪比大一上学期C语言作业。
最后谈谈感受。
看完之后我很后悔为什么没有早点看 为什么不早点就可以获得知识的感觉, 并且在浅层看懂之际, 我知道自己肯定没有完全理解到里边的精髓, 毕竟我不能深刻地理解到 设计者当时对并发的构思和模型上的选择。
我只能说, 没有十多年顶级的系统调用和并发编程的功底 写不出这样的代码, 没有极致的审美与对品控的偏执 也写不出这样的代码。
并发编程有很多说道, 就正如退出这个这么简单的事情, 想要做到退出时回收干净却很难。 如果说你写业务逻辑自己管线程, 退出什么的sleep(1)都无所谓, 但如果说做框架的人 不能把自己的框架做得完美无暇逻辑自洽 就难免让人感觉差点意思。
而这个thrdpool,它作为一个线程池, 是如此的逻辑完备, 用最对称简洁的方式去面对复杂的并发。
再次让我深深地感到震撼: 我们身边那些原始的、底层的、基础的代码, 还有很多新思路, 还可以写得如此美。
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/kwYyrRLZK7J-v9IRHGJWRw
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。