To understand how to use a package, start by reading its comments.

Introduction

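sync.WaitGroup should be familiar to most of us. When we start several goroutines and want to wait until all of them have finished before doing something else, WaitGroup is the tool for the job. For example: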

g := sync.WaitGroup{}
// Set the number of goroutines to wait for
g.Add(2)
// func1
go func() {
	// do something
	// Explicitly signal completion
	g.Done()
}()

// func2
go func() {
	// do something
	g.Done()
}()
// The main goroutine blocks here until func1 and func2 have both finished
g.Wait()

It is very convenient to use, and its source code is quite short.
The doc comment on sync.WaitGroup reads:

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
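The pattern described in this comment can be sketched as a small runnable program. The helper name squares and its inputs are made up for illustration; only the Add, Done, and Wait calls come from the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// squares is a hypothetical helper: it computes n*n for each input in its
// own goroutine and uses a WaitGroup to wait for all of them.
func squares(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup
	for i, n := range inputs {
		wg.Add(1) // Add runs in the launching goroutine, before the go statement
		go func(i, n int) {
			defer wg.Done()    // Done runs even if the body returns early
			results[i] = n * n // each goroutine writes only its own slot
		}(i, n)
	}
	wg.Wait() // blocks until the counter is back to zero
	return results
}

func main() {
	fmt.Println(squares([]int{1, 2, 3})) // prints [1 4 9]
}
```

Calling Add in the loop body rather than inside the goroutine guarantees the counter is already raised by the time Wait runs.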

The overall structure of the code:

type WaitGroup struct {}
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {}
func (wg *WaitGroup) Add(delta int) {}
func (wg *WaitGroup) Done() {}
func (wg *WaitGroup) Wait() {}

Let's go through them one by one and see what is inside.

WaitGroup struct

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}

noCopy

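The comment already gives a brief description of the usage, so I won't repeat it here. The last sentence is worth noting: a WaitGroup must not be copied by value after first use. That is what the noCopy field in the struct is for. You can look up how noCopy works in detail; its comment and definition in the Go source are as follows: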

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

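It is worth studying when you have time; here is a short introduction. noCopy is an empty struct defined in the sync package. When we don't want a struct to be copied by value, we can add a noCopy field to its definition. Note that copying such a struct does not cause a compile error; the copy is only reported by go vet.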
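As a rough sketch of how the same trick can be applied to our own types: go vet's copylocks check flags copies of any value whose type has pointer-receiver Lock and Unlock methods, so a local marker type with those two no-op methods is enough. The Counter type below is hypothetical, and the local noCopy is a re-declaration for illustration, not the unexported one from sync.

```go
package main

import "fmt"

// noCopy is a local marker type modeled on the one in the sync package.
// The no-op Lock and Unlock methods are what go vet's copylocks check looks for.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Counter is a hypothetical type that must not be copied after first use.
type Counter struct {
	noCopy noCopy
	n      int
}

// Inc increments the counter and returns the new value.
func (c *Counter) Inc() int {
	c.n++
	return c.n
}

func main() {
	c := &Counter{}
	c.Inc()
	fmt.Println(c.Inc()) // prints 2

	// Writing "d := *c" here would still compile,
	// but go vet would report that the assignment copies a lock value.
}
```

Since noCopy is an empty struct it carries no data; it exists only for static analysis.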

state1

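This is the only data-carrying field in the WaitGroup struct. It is 12 bytes long: three uint32 elements of four bytes each. As the comment explains, it is divided as follows:

  • The 8 bytes that are 64-bit aligned form the state, which is split in two: the high 32 bits hold the counter changed by Add and Done, and the low 32 bits hold the number of goroutines blocked in Wait
  • The remaining 4 bytes hold the semaphore used to block and wake goroutines; runtime_Semrelease and runtime_Semacquire operate on it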
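To make the layout of the 64-bit state concrete, here is a minimal sketch of the packing arithmetic. The helper names pack, unpack, and addDelta are invented for this example; the shifts mirror the expressions used in Add and Wait further down.

```go
package main

import "fmt"

// pack builds a state word: counter in the high 32 bits, waiters in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// unpack splits a state word the same way Add and Wait do.
func unpack(state uint64) (counter int32, waiters uint32) {
	return int32(state >> 32), uint32(state)
}

// addDelta applies delta to the counter half only. A negative delta wraps
// around in unsigned arithmetic, which decrements the high 32 bits and
// leaves the low 32 bits untouched.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

func main() {
	state := pack(2, 1)         // two pending goroutines, one waiter
	state = addDelta(state, -1) // what Done does
	c, w := unpack(state)
	fmt.Printf("counter=%d waiters=%d\n", c, w) // prints counter=1 waiters=1
}
```

Keeping both numbers in one word is what lets a single atomic operation observe the counter and the waiter count together.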

Now let's read the functions one by one.

The state function

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}

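This function returns two pointers: one to the 64-bit state holding the two counters, and one to the semaphore. The if condition is the interesting part. It checks whether the starting address of state1 is divisible by 8, that is, whether state1 is 8-byte aligned. If it is, the first 8 bytes hold the state and the last 4 bytes hold the semaphore. Otherwise the address is only 4-byte aligned, so state1[1] is the 8-byte-aligned position: the state lives in the last 8 bytes and state1[0] holds the semaphore. The two counters are related, so they are kept together in one 64-bit word that can be read and updated with a single atomic operation, and 64-bit atomic operations require 64-bit alignment, which 32-bit compilers do not guarantee.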
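The alignment logic can be tried out on a stand-in type. The sketch below copies the branch from state() onto a hypothetical demo struct (not the real WaitGroup) and checks that the returned state pointer is 8-byte aligned whichever branch is taken. Elements of a []demo slice are 12 bytes apart, so neighbouring elements start at different alignments.

```go
package main

import (
	"fmt"
	"unsafe"
)

// demo is a hypothetical stand-in with the same 12-byte layout as state1.
type demo struct {
	state1 [3]uint32
}

// state mirrors the branch in WaitGroup.state: pick the 8-byte-aligned
// part of the array for the 64-bit state, and the leftover word for the semaphore.
func (d *demo) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&d.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&d.state1)), &d.state1[2]
	}
	return (*uint64)(unsafe.Pointer(&d.state1[1])), &d.state1[0]
}

// allAligned reports whether state() yields an 8-byte-aligned pointer
// for each of n consecutive demo values.
func allAligned(n int) bool {
	ds := make([]demo, n) // 12-byte stride, so start addresses alternate mod 8
	for i := range ds {
		statep, _ := ds[i].state()
		if uintptr(unsafe.Pointer(statep))%8 != 0 {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(allAligned(4)) // prints true
}
```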

The Add and Done functions

Done is essentially just a call to Add, so we will focus on Add.

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Here is the source of Add; the explanations are in the comments.

// Add adds delta, which may be negative, to the WaitGroup counter.
// Add may be given a positive or a negative delta; a negative delta is what Done uses
// If the counter becomes zero, all goroutines blocked on Wait are released.
// When the counter reaches zero, every goroutine blocked in Wait is released
// If the counter goes negative, Add panics.
// A negative counter makes Add panic; it indicates misuse or a concurrency bug
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait.
// An Add that starts from a zero counter must happen before Wait; the code below checks for Add running concurrently with Wait
// Calls with a negative delta, or calls with a positive delta that start when
// the counter is greater than zero, may happen at any time.
// Done calls, and Add calls made while the counter is already positive, may run concurrently
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// So Add should be called before the statement that starts the goroutine or other event being waited for
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// To reuse a WaitGroup, new Add calls must come after all previous Wait calls have returned
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
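	// Get pointers to the state (the two counters) and the semaphore
	statep, semap := wg.state()
	if race.Enabled {
		// Used by the race detector; can be ignored for now
	}
	// AddUint64 atomically adds its argument to *statep and returns the new value;
	// note the new value, it is used below.
	// delta is shifted into the high 32 bits, i.e. the counter changed by Add and Done
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// v is the counter maintained by Add and Done
	v := int32(state >> 32)
	// w is the number of goroutines blocked in Wait
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// Used by the race detector; can be ignored for now
	}
	// v < 0 means Add and Done were misused, as the doc comment says: panic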
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
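	// w != 0 means some goroutine has already entered Wait; delta > 0 && v == int32(delta)
	// means this Add has just raised the counter from zero.
	// So Wait and Add were called concurrently, or in the wrong order
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// v > 0: the counter is still positive, so some goroutines have not called Done yet;
	// w == 0: no goroutine is blocked in Wait.
	// In either case there is nobody to wake up, so just return
	if v > 0 || w == 0 {
		return
	}
	// Here v == 0 and w != 0: every goroutine has called Done and there are waiters to wake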
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
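	// With v == 0 all goroutines have called Done, so a Wait that starts now returns
	// without blocking (see the Wait code below). state is the value returned by
	// our atomic add; if *statep (the current value) no longer equals it, the state
	// was modified again in the meantime, which can only be another Add.
	// This call was the final Done and the pending Wait has not returned yet,
	// so a new Add means Wait and Add ran concurrently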
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	*statep = 0
	// All Done calls have completed; release the semaphore once per waiter
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}
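The v < 0 check is easy to observe from user code. The sketch below calls Done on a fresh WaitGroup and recovers the resulting panic; the function name negativeCounterPanic is made up for the example, and the exact message text is taken from the source listing above.

```go
package main

import (
	"fmt"
	"sync"
)

// negativeCounterPanic calls Done without a matching Add and returns the
// recovered panic message, or "" if no panic occurred.
func negativeCounterPanic() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	var wg sync.WaitGroup
	wg.Done() // counter goes from 0 to -1, so Add(-1) panics
	return ""
}

func main() {
	fmt.Println(negativeCounterPanic())
}
```

In real code this panic usually points to a Done that was called twice, or an Add that was skipped on some code path.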

The Wait function

See the comments in the code:

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		// Used by the race detector; can be ignored for now
	}
	for {
		// Atomically load the state
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// v == 0 means every Done has been called, so there is no need to block
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				// Used by the race detector; can be ignored for now
			}
			return
		}
		// Increment waiters count.
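		// CompareAndSwap first checks that *statep still equals state, i.e. that no
		// other goroutine has changed it, and if so stores state+1 (one more waiter)
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Used by the race detector; can be ignored for now
			}
			// Block on the semaphore
			runtime_Semacquire(semap)
			// Add reset *statep to zero before releasing the semaphore; a nonzero value here means Add has run again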
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				// Used by the race detector; can be ignored for now
			}
			return
		}
	}
}
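Because Add releases the semaphore once per registered waiter, several goroutines can block in Wait on the same WaitGroup and all of them are woken by the final Done. Here is a small sketch of that behaviour; releaseWaiters and the second WaitGroup named finished are hypothetical helpers used only to collect the result.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseWaiters starts n goroutines that all Wait on the same WaitGroup,
// then calls Done once and reports how many of them returned from Wait.
func releaseWaiters(n int) int {
	var wg sync.WaitGroup
	wg.Add(1) // the single event everyone waits for; done before any Wait starts

	var released int32
	var finished sync.WaitGroup // only used to collect the waiters
	finished.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer finished.Done()
			wg.Wait() // blocks, or returns at once if the counter is already zero
			atomic.AddInt32(&released, 1)
		}()
	}

	wg.Done() // counter reaches zero: every blocked waiter is woken
	finished.Wait()
	return int(atomic.LoadInt32(&released))
}

func main() {
	fmt.Println(releaseWaiters(3)) // prints 3
}
```

A waiter that reaches Wait after the counter is already zero takes the v == 0 fast path and never touches the semaphore.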

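Semaphore operations

Semaphores are probably familiar to everyone. Here are the doc comments of the two functions; feel free to dig deeper if you are interested, we won't discuss them further here.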
runtime_Semrelease

// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

runtime_Semacquire

// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
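These runtime functions cannot be called from user code, but their acquire and release semantics can be approximated with a buffered channel. The sketch below is only an analogy under that assumption, not how the runtime implements semaphores; the sema type and the wakeAll helper are invented for illustration. The release loop has the same shape as the one at the end of Add.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sema is a toy counting semaphore backed by a buffered channel.
type sema chan struct{}

// release adds one token, waking one goroutine blocked in acquire (if any).
func (s sema) release() { s <- struct{}{} }

// acquire blocks until a token is available and then consumes it.
func (s sema) acquire() { <-s }

// wakeAll parks n goroutines on the semaphore, releases it n times,
// and returns how many goroutines were woken.
func wakeAll(n int) int {
	s := make(sema, n) // capacity n so that release never blocks here
	var woken int32
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			s.acquire()
			atomic.AddInt32(&woken, 1)
		}()
	}
	for w := n; w != 0; w-- { // one release per waiter, as in Add
		s.release()
	}
	wg.Wait()
	return int(atomic.LoadInt32(&woken))
}

func main() {
	fmt.Println(wakeAll(3)) // prints 3
}
```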

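Summary

We have now read all of the code. WaitGroup really is short, yet very practical. I recommend reading it several times and thinking it over, because there are many details worth learning from, such as doing the addition atomically and constantly checking for illegal concurrent use.
Concurrency is harder to reason about, but it is exactly where we need to be most careful. This is especially true for a beginner like me who came to Go from PHP and has stepped into quite a few pitfalls. Pointers from more experienced readers are very welcome 🙏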

Below is a simple diagram summarizing the WaitGroup flow:
[Figure: waitGroup]

References