信号量是并发编程中常见的同步机制,在标准库的并发原语中使用频繁,比如 Mutex、WaitGroup 等,这些并发原语的实现都有信号量的影子,所以我们很有必要学好弄清楚信号量的实现原理,做到“知其然,更要知其所以然”,我们才能有更多的“武器”去应对实际面临的业务场景问题。
今天我们就来搞定信号量,通过这篇文章你能掌握:
维基百科上是这样解释信号量的:
信号量的概念是计算机科学家 Dijkstra (Dijkstra 算法的发明者)提出来的,广泛应用在不同的操作系统中。系统中,会给每一个进程一个信号量,代表每个进程当前的状态,未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。
下文用 G 代表 goroutine。
通俗点解释就是,信号量通常使用一个整型变量 S 表示一组资源,当 G 完成对此信号量的等待(wait)时,S 就减 1,当 G 完成对此信号量的释放(release)时,S 就加 1。当计数值为 0 的时候,G 调用 wait 等待该信号量会阻塞,除非 S 又大于 0,等待的 G 才会解除阻塞,成功返回。
举个例子,假如图书馆有 10 本《Go 语言编程之旅》,有 1 万个人都想读这本书,“僧多粥少”。所以,图书馆管理员先会让这 1 万个人进行登记,按照登记的顺序,借阅此书。如果书全部被借走,那么,其他想看此书的人就需要等待,如果有人还书了,图书馆管理员就会通知下一位同学来借阅这本书。这里的资源是《Go 语言编程之旅》这十本书,想读此书的同学就是 goroutine,图书管理员就是信号量。
从上面的解释中我们可以得知什么是信号量,其实信号量就是一种变量或者抽象数据类型,用于控制并发系统中多个进程对公共资源的访问,访问具有原子性。信号量主要分为两类:
信号量定义有两个操作 P 和 V,P 操作是减少信号量的计数值,而 V 操作是增加信号量的计数值。
通常初始化时,将信号量 S 指定数值为 n,就像是一个有 n 个资源的池子。P 操作相当于请求资源,如果资源可用,就立即返回;如果没有资源或者不够,那么,G 会阻塞等待。V 操作会释放持有的资源,把资源返还给信号量。
信号量的值除了初始化的操作以外,只能由 P/V 操作改变。
我们一般用信号量保护一组资源,比如数据库连接池、几个打印机资源等等。如果信号量蜕变成二值信号量,那么,它的 P/V 就和互斥锁的 Lock/Unlock 一样了。
在看 Go 源码时,我们经常能够看到下面这几个关于信号量的函数:
func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
这几个函数就是信号量的 PV 操作,遗憾的是,它是 Go 运行时内部使用的,并没有封装暴露成一个对外的信号量并发原语,我们没有办法使用。不过没关系,Go 在它的扩展包中提供了信号量 semaphore,不过这个信号量的类型名并不叫 Semaphore,而是叫 Weighted。
这是一个带权重的信号量,接下来我们就重点分析一下这个库。
Weighted 的实现思路:使用互斥锁 + List 实现的。互斥锁实现其它字段的保护,而 List 实现了一个等待队列,等待者的通知是通过 Channel 的通知机制实现的。
Weighted 主要包括两个结构体和几个常用方法。
type Weighted struct {
size int64 // 最大资源个数,初始化的时候指定
cur int64 // 计数器,当前已使用资源数
mu sync.Mutex // 互斥锁,对字段保护
waiters list.List // 等待者列表,当前处于阻塞等待的请求者 goroutine
}
每个字段的含义见代码注释,其中 waiters 存储的数据是 waiter 对象,waiter 数据结构如下:
type waiter struct {
n int64 // 调用者申请的资源数
ready chan<- struct{} // 当调用者可以获取到信号量资源时, close chan,调用者便会收到通知,成功返回
}
字段含义见注释。
这里提下初始化资源数方法 NewWeighted,很简单:
// 创建资源数为 n 的信号量
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
1.阻塞获取资源的方法 -- Acquire(),源码如下:
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
// 有可用资源,直接返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 程序执行到这里说明无足够资源使用
if n > s.size {
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 资源不足,构造 waiter,将其加入到等待队列
// ready channel 用于通知阻塞的调用者有资源可用,由释放资源的 goroutine 负责 close,起到消息通知的作用
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w) // 加入到等待队列
s.mu.Unlock()
// 调用者陷入 select 阻塞,除非收到外部 ctx 的取消信号或者被通知有资源可用
select {
case <-ctx.Done(): // 收到外面的控制信号
err := ctx.Err()
s.mu.Lock()
select {
case <-ready: // 再次确认是否可能是被唤醒的,如果被唤醒了则忽略控制信号,返回 nil 表示成功
err = nil
default: // 收到控制信息且还没有获取到资源,就直接将原来添加的 waiter 删除掉
isFront := s.waiters.Front() == elem // 当前 waiter 是否是链表头元素
s.waiters.Remove(elem) // 删除 waiter
if isFront && s.size > s.cur { // 如果是链表头元素且有资源可用则尝试唤醒链表第一个等待的 waiter
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready: // 消息通知,请求资源的 goroutine 被释放资源的 goroutine 唤醒了
return nil
}
}
详细说明可以看注释,Acquire() 相当于 P 操作,可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。可以通过第一个参数 Context 增加超时或者 cancel 的机制。如果正常获取了资源,就返回 nil;否则,就返回 ctx.Err(),信号量不改变。
2.非阻塞获取资源的方法 -- TryAcquire,源码如下:
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
这个方法比较简单,非阻塞地获取指定数量的资源,如果当前没有空闲资源,就直接返回 false。
3.通知等待者 notifyWaiters,源码如下:
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front() // 获取队头元素
if next == nil { // 队列里没有元素
break
}
w := next.Value.(waiter)
if s.size-s.cur < w.n { // 资源不满足请求者的要求
break
}
s.cur += w.n // 增加已用资源
s.waiters.Remove(next)
close(w.ready) // 关闭 ready channel,用于通知调用者 goroutine 已经获取到资源,继续运行
}
}
通过 for 循环从链表头部开始依次遍历链表中的所有 waiter,并更新计数器 weighted.cur,同时将其从链表中移除,直到遇到空闲资源小于 waiter.n 为止。
仔细分析,我们会发现,notifyWaiters 方法是按照 FIFO 方式唤醒调用者。这样做的目的是为了避免调用者出现“饿死”的情况,当释放 10 个资源的时候,如果第一个等待者需要 11 个资源,那么,队列中的所有等待者都会继续等待,即使有的等待者只需要 1 个资源,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。
4.释放占用的资源 -- Release(),源码如下:
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n // 释放占用资源数
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters() // 唤醒等待请求资源的 goroutine
s.mu.Unlock()
}
Release() 相当于 V 操作,可以将 n 个资源释放,返还给信号量。
现在我们知道了信号量的实现原理,针对实际业务场景中又该如何使用呢?我们举个 worker pool 的例子,也是官网提供的:考拉兹猜想。
“考拉兹猜想”说的是:对于每一个正整数,如果它是奇数,则对它乘 3 再加 1,如果它是偶数,则对它除以 2,如此循环,最终都能够得到 1。
我们的例子需要实现的是,对于给出的正整数,计算循环多少次之后能得到 1,代码如下:
func main() {
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker 数量
sem = semaphore.NewWeighted(int64(maxWorkers)) // 信号量
out = make([]int, 32) // 任务数
)
ctx := context.TODO()
for i := range out {
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
}
// 等待所有的任务执行完成,也可以通过 WaitGroup 实现
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}
上面的代码创建数量与 CPU 核数相同的 worker,假设是 4, 相当于池子里只有 4 个资源可用,每个 worker 处理完一个整数,才能处理下一个,相当于控制住了并发数量。
输出:
[0 1 7 2 5 8 16 3 19 6 14 9 9 17 17 4 12 20 20 7 7 15 15 10 23 10 111 18 18 18 106 5]
阅读完源码之后,会发现使用 semaphore 过程中一不小心就会导致错误,比如:如果请求的资源数比最大的资源数还大,那么,调用者可能永远被阻塞;调用 Release() 方法时,可以传递任意的整数。但如果传递一个比请求到的数量大的错误的数值,程序就会 panic;如果传递一个负数,会导致资源永久被持有,等等。
使用时有哪些常犯的错误:
使用一项技术,保证不出错的前提是正确地使用它,对于信号量来说也是一样,所以使用信号量是应该格外小心,确保正确地传递参数,请求多少资源,就释放多少资源。
本篇文章详细介绍了什么是信号量,什么是 PV 操作,官方扩展包 semaphore 实现原理,剖析了实际场景中的例子,以及使用信号量时的注意事项,相信你已经掌握了信号量。
除了官方扩展包 semaphore 的实现方式外,还有别的办法可以实现信号量,你还知道哪些方式可以实现吗?
可以在评论区留言,期待与大家一起探讨!
本文由哈喽比特于2年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/5zedcA-3lYw5qqie7qUR9A
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。