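The best way to learn how to use a package is to start by reading its comments.
Introduction
Most Go developers will be familiar with sync.WaitGroup. When we start several goroutines and want to wait until all of them have finished before doing something else, WaitGroup is the tool for the job. For example: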
g := sync.WaitGroup{}
// Tell the WaitGroup how many goroutines it will wait for
g.Add(2)
// func1
go func() {
	// do something
	// Signal completion explicitly
	g.Done()
}()
// func2
go func() {
	// do something
	g.Done()
}()
// The main goroutine blocks here until func1 and func2 have both finished
g.Wait()
It is very convenient to use, and its source code is quite short.
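For readers who want something they can run, here is a minimal sketch of the same pattern as a complete program. The helper name sumSquares and the workload are made up for illustration. It also shows two common habits: calling Add before the go statement, and calling Done through defer.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares computes the squares of nums concurrently and returns their sum.
// It is a hypothetical helper for illustration, not part of the sync package.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make([]int, len(nums))

	for i, n := range nums {
		wg.Add(1) // register the goroutine before starting it
		go func(i, n int) {
			defer wg.Done() // runs even if the work returns early
			results[i] = n * n
		}(i, n)
	}

	wg.Wait() // block until every goroutine has called Done

	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // prints 30
}
```

Each goroutine writes to its own slice index, so no extra locking is needed, and the results are only read after Wait has returned.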
The doc comment on the sync.WaitGroup type reads:
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
The overall structure of the code:
type WaitGroup struct {}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {}
func (wg *WaitGroup) Add(delta int) {}
func (wg *WaitGroup) Done() {}
func (wg *WaitGroup) Wait() {}
Let's look at each of them in turn.
WaitGroup struct
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
	noCopy noCopy
	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}
noCopy
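The comment already gives a brief description of the usage, so I won't repeat it here. The last sentence deserves attention: a WaitGroup must not be copied by value after first use, which is what the noCopy field in the struct is for. You can look up the details of how noCopy works; its comment and definition in the Go source are as follows: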
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}
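It is worth studying when you have time; here is a brief introduction. noCopy is an empty struct defined in the sync package. When we don't want a struct to be copied by value, we can add a noCopy field to its definition. Note that noCopy does not cause a compile error when the struct is copied; the copy is only reported by go vet.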
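The same trick can be sketched in our own code. The example below is an assumption-laden illustration: the Counter type is hypothetical, and the local noCopy type is a re-creation, since the one in sync is unexported. The Lock and Unlock methods are there because, as far as I know, the copylocks check in go vet looks for them.

```go
package main

import "fmt"

// noCopy is a local re-creation of the idea used in the sync package.
// It has no fields; the Lock and Unlock methods exist only so that the
// copylocks check in `go vet` treats any struct containing it as a lock.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Counter is a hypothetical type that should only be used through a pointer.
type Counter struct {
	noCopy noCopy
	n      int
}

// Inc increments the counter.
func (c *Counter) Inc() { c.n++ }

// Value returns the current count.
func (c *Counter) Value() int { return c.n }

func main() {
	c := &Counter{}
	c.Inc()
	c.Inc()
	fmt.Println(c.Value()) // prints 2

	// Uncommenting the next line still compiles, but `go vet` is expected
	// to complain that the assignment copies a lock value.
	// bad := *c
}
```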
state1
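Apart from noCopy, this is the only field in the WaitGroup struct. It is 12 bytes long: an array of three uint32 values of four bytes each. As the comment explains, it is divided as follows:
- The 8 aligned bytes form a 64-bit state value in two parts: the high 32 bits hold the counter modified by Add and Done, and the low 32 bits hold the waiter count, that is, the number of goroutines blocked in Wait
- The remaining 4 bytes store the semaphore used to park and wake goroutines; runtime_Semrelease and runtime_Semacquire operate on it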
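The layout of the 64-bit state value can be sketched with plain integer arithmetic. The function names pack, unpack, and addDelta are my own; only the shift expressions follow the quoted source.

```go
package main

import "fmt"

// pack builds the 64-bit state word: counter in the high 32 bits,
// waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits a state word the same way Add and Wait do.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

// addDelta applies a (possibly negative) delta to the counter half only,
// mirroring the expression uint64(delta)<<32 used in Add.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	s := pack(2, 1)     // counter = 2, one goroutine waiting
	s = addDelta(s, -1) // what a single Done does
	c, w := unpack(s)
	fmt.Println(c, w) // prints 1 1
}
```

Because the delta is shifted into the high half before the addition, a negative delta wraps around in unsigned arithmetic without disturbing the waiter count in the low half.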
Now let's read the functions one by one.
The state function
// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}
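This function returns two things: a pointer to the 64-bit state (the two counters) and a pointer to the semaphore. The if condition is the interesting part: it checks whether the starting address of state1 is divisible by 8, that is, whether state1 is 8-byte aligned. The two counters are related and are updated together by 64-bit atomic operations, which require 8-byte alignment, and 32-bit compilers do not guarantee it. If state1 is aligned, the first 8 bytes hold the counters and state1[2] holds the semaphore. Otherwise state1 is only 4-byte aligned, so the 8 bytes starting at state1[1] are aligned and hold the counters, and state1[0] holds the semaphore.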
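To see the alignment rule at work, the sketch below applies the same test to two views of a four-word buffer that start 4 bytes apart. The names split and isAligned8 are mine, and the slice-to-array-pointer conversion assumes Go 1.17 or later.

```go
package main

import (
	"fmt"
	"unsafe"
)

// split picks an 8-byte-aligned pair of words for the state and returns
// the leftover word for the semaphore, following the same rule as state().
func split(s *[3]uint32) (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(s))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&s[0])), &s[2]
	}
	return (*uint64)(unsafe.Pointer(&s[1])), &s[0]
}

// isAligned8 reports whether p sits on an 8-byte boundary.
func isAligned8(p *uint64) bool {
	return uintptr(unsafe.Pointer(p))%8 == 0
}

func main() {
	var buf [4]uint32
	// The two views start 4 bytes apart, so exactly one of them begins
	// on an 8-byte boundary.
	views := []*[3]uint32{(*[3]uint32)(buf[0:3]), (*[3]uint32)(buf[1:4])}
	for _, s := range views {
		statep, _ := split(s)
		fmt.Println(isAligned8(statep)) // prints true for both views
	}
}
```

Whichever view happens to be misaligned, the state pointer still lands on an 8-byte boundary, which is the point of reserving 12 bytes instead of 8.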
Add and Done
Done is essentially just a call to Add, so we will focus on Add.
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
Here is the source of Add, with explanations in the comments:
// Add adds delta, which may be negative, to the WaitGroup counter.
// delta may be positive or negative; a negative delta is what Done uses
// If the counter becomes zero, all goroutines blocked on Wait are released.
// When the counter reaches zero, every waiting goroutine is released
// If the counter goes negative, Add panics.
// A negative counter makes Add panic; it indicates misuse or a concurrency bug
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait.
// An Add that raises the counter from zero must happen before Wait; the code below checks for Add racing with Wait
// Calls with a negative delta, or calls with a positive delta that start when
// the counter is greater than zero, may happen at any time.
// Done calls, and Add calls made while the counter is already positive, may run concurrently
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// Call Add before the go statement that starts the goroutine, or before the event to be waited for is created
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// To reuse a WaitGroup, new Add calls must come after the previous Wait has returned
// See the WaitGroup example.
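func (wg *WaitGroup) Add(delta int) {
	// Get pointers to the state (counter and waiter count) and the semaphore
	statep, semap := wg.state()
	if race.Enabled {
		// Race detector support; can be ignored for now
	}
	// AddUint64 atomically adds its argument to *statep and returns the new value;
	// the new value is used below.
	// This atomically updates the high 32 bits of the state, i.e. the Add/Done counter
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// v is the Add/Done counter
	v := int32(state >> 32)
	// w is the waiter count
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// Race detector support; can be ignored for now
	}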
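	// v < 0 means Add and Done were misused, as the doc comment warns: panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// w != 0 means some goroutine has already reached Wait;
	// delta > 0 && v == int32(delta) means this Add raised the counter from zero.
	// In other words, Wait and Add were called concurrently, or in the wrong order
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// v > 0: the counter is still positive (Add was called, or some goroutines have not finished);
	// w == 0: no goroutine has reached Wait yet.
	// Either way there is nothing to wake, so just return
	if v > 0 || w == 0 {
		return
	}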
	// At this point v == 0 and w != 0: every goroutine has called Done,
	// and at least one goroutine is blocked in Wait
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	// With v == 0, a new call to Wait no longer blocks (see the Wait code
	// below), so if *statep (the current value) differs from state (the value
	// returned by the atomic add), another Add must have changed it in the
	// meantime. This call is a Done whose waiters have not been released yet,
	// so a new Add at this point means Wait and Add ran concurrently
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	// All Done calls are in: release the semaphore once per waiter
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
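The v < 0 check is easy to trigger on purpose. The sketch below uses a made-up helper, doneWithoutAdd, which calls Done on a fresh WaitGroup and recovers the panic so the message can be printed.

```go
package main

import (
	"fmt"
	"sync"
)

// doneWithoutAdd calls Done on a fresh WaitGroup and returns the panic
// message, to show the "v < 0" check in Add firing.
func doneWithoutAdd() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Done() // the counter goes from 0 to -1, so Add(-1) panics
	return "no panic"
}

func main() {
	fmt.Println(doneWithoutAdd()) // prints "sync: negative WaitGroup counter"
}
```

In real code this panic usually means Done was called more often than Add accounted for; recovering from it, as done here for demonstration, is not a fix.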
The Wait function
See the comments in the code:
// Wait blocks until the WaitGroup counter is zero.
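func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		// Race detector support; can be ignored for now
	}
	for {
		// Atomically load the state
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// v == 0 means every Done has been called, so there is no need to block
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				// Race detector support; can be ignored for now
			}
			return
		}
		// Increment waiters count.
		// CompareAndSwap first checks that *statep still equals state, i.e. that
		// no other goroutine has modified it; if so, it stores state+1 in *statep.
		// If not, the loop retries with a fresh load
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Race detector support; can be ignored for now
			}
			// Block on the semaphore
			runtime_Semacquire(semap)
			// Add resets *statep to zero before releasing the semaphore,
			// so a non-zero value here means another Add has run since
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				// Race detector support; can be ignored for now
			}
			return
		}
	}
}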
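The waiter count exists because several goroutines may call Wait on the same WaitGroup. The sketch below is only an illustration of that behavior; releaseAll and its variable names are my own.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseAll starts n goroutines that all call Wait on the same WaitGroup,
// then drops the counter to zero and reports how many of them returned.
func releaseAll(n int) int32 {
	var work sync.WaitGroup    // the WaitGroup under observation
	var waiters sync.WaitGroup // only used to collect the waiting goroutines
	var woken int32

	work.Add(1)
	for i := 0; i < n; i++ {
		waiters.Add(1)
		go func() {
			defer waiters.Done()
			// Blocks while the counter is positive; a goroutine that gets
			// here after the counter reached zero returns immediately.
			work.Wait()
			atomic.AddInt32(&woken, 1)
		}()
	}

	work.Done()    // counter hits zero: every blocked waiter is released
	waiters.Wait() // make sure all of them have returned from Wait
	return atomic.LoadInt32(&woken)
}

func main() {
	fmt.Println(releaseAll(5)) // prints 5
}
```

Whether a given goroutine actually blocks or sees the counter already at zero depends on scheduling, but in both cases it returns from Wait, so the count is always n.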
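Semaphore operations
Semaphores are probably familiar to everyone. The doc comments of the two functions are reproduced below; dig deeper if you are interested, but they are not discussed further here.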
runtime_Semrelease
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
runtime_Semacquire
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
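Summary
We have now read all of the code. WaitGroup has very little code but is very practical. It is worth rereading and thinking over, because there are many details to learn from, such as using atomic addition and constantly checking for unexpected concurrent use.
Concurrency is comparatively hard to reason about, yet it is exactly what we need to pay special attention to, especially for a beginner like me who came to Go from doing everything in PHP and has fallen into quite a few pits along the way. Pointers from more experienced readers are very welcome 🙏
Below is a simple diagram summarizing the WaitGroup flow: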