Golang 基础之并发基本同步原语
sync.Cond
条件变量
Cond 类型原型
1
2
3
4
5
6
7
8
9
10
type Cond struct {
// L 是在观察或改变状态时保持的
L Locker
// 包含过滤或未导出的字段
}
func NewCond ( l Locker ) * Cond
func ( c * Cond ) Broadcast ()
func ( c * Cond ) Signal ()
func ( c * Cond ) Wait ()
Cond 实现了一个条件变量,用于等待或宣布事件发生时 goroutine 的交汇点。 在这个定义中,“事件”是指两个或更多的 goroutine 之间的任何信号,仅指事件发生了,不包含其他任何信息。 通常,你可能想要在收到某个 goroutine 信号前令其处于等待状态。
sync.Cond
是 Go 语言中用于协调多个 goroutine 的条件变量,常用于在共享资源状态变化时通知等待的 goroutine。以下是实际使用案例及详细说明:
案例:生产者-消费者模型
生产者向队列中添加数据,消费者从队列中取出数据。当队列为空时,消费者等待;当队列有数据时,生产者通知消费者。
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items [] int
cond * sync . Cond
}
func NewQueue () * Queue {
q := & Queue {
items : [] int {},
cond : sync . NewCond ( & sync . Mutex {}),
}
return q
}
// 生产者:向队列添加数据并通知消费者
func ( q * Queue ) Add ( item int ) {
q . cond . L . Lock ()
defer q . cond . L . Unlock ()
q . items = append ( q . items , item )
fmt . Printf ( "Produced: %d\n" , item )
q . cond . Signal () // 通知一个等待的消费者
}
// 消费者:从队列取出数据(队列空时阻塞等待)
func ( q * Queue ) Get () int {
q . cond . L . Lock ()
defer q . cond . L . Unlock ()
for len ( q . items ) == 0 {
fmt . Println ( "Consumer waiting..." )
q . cond . Wait () // 释放锁并阻塞,唤醒后重新加锁
}
item := q . items [ 0 ]
q . items = q . items [ 1 :]
fmt . Printf ( "Consumed: %d\n" , item )
return item
}
func main () {
q := NewQueue ()
// 启动消费者
go func () {
for {
q . Get ()
time . Sleep ( 500 * time . Millisecond ) // 模拟处理耗时
}
}()
// 生产者生产数据
for i := 1 ; i <= 5 ; i ++ {
q . Add ( i )
time . Sleep ( 200 * time . Millisecond ) // 模拟生产间隔
}
time . Sleep ( 3 * time . Second ) // 等待程序运行
}
关键点解释
条件变量初始化
通过 sync.NewCond(&sync.Mutex{})
创建条件变量,底层需要一个互斥锁。
生产者逻辑
加锁修改共享资源(队列)。
添加数据后调用 Signal()
通知一个 等待的消费者(或 Broadcast()
通知所有消费者)。
消费者逻辑
加锁后检查条件(队列是否为空)。
如果条件不满足,调用 Wait()
:
释放锁 ,允许其他 goroutine 修改共享资源。
阻塞直到被 Signal()
或 Broadcast()
唤醒。
唤醒后重新加锁 ,继续检查条件(需用 for
循环防止虚假唤醒)。
使用场景
资源池管理
当 goroutine 需要等待资源(如数据库连接)可用时,条件变量可以高效协调资源的分配。
事件触发
多个 goroutine 等待某个事件(如系统初始化完成),事件发生后通过 Broadcast()
通知所有等待者。
任务调度
工作协程在任务队列为空时等待,主协程添加任务后唤醒工作协程。
注意事项
避免虚假唤醒 :始终在 for
循环中检查条件,而不是 if
语句。
锁的粒度 :确保在调用 Wait()
、Signal()
或 Broadcast()
时持有锁。
性能优化 :Signal()
唤醒一个等待者,Broadcast()
唤醒所有等待者,根据场景选择。
通过合理使用 sync.Cond
,可以高效解决复杂的 goroutine 同步问题。
sync.Cond
在 Go 中是一个强大的工具,用于在多个 goroutine 之间高效协调共享资源的状态变化。除了经典的生产者-消费者模型 ,它还可以解决许多复杂的同步问题。以下是更多实际应用场景及代码示例:
1. 资源池管理(连接池、对象池)
当资源(如数据库连接)数量有限时,sync.Cond
可以协调资源的分配。如果资源暂时不可用,请求的 goroutine 可以阻塞等待,直到资源被释放。
代码示例:数据库连接池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type ConnectionPool struct {
connections [] * DBConn
cond * sync . Cond
maxSize int
}
func NewConnectionPool ( maxSize int ) * ConnectionPool {
return & ConnectionPool {
cond : sync . NewCond ( & sync . Mutex {}),
maxSize : maxSize ,
}
}
// 获取连接(如果池为空则阻塞等待)
func ( p * ConnectionPool ) Acquire () * DBConn {
p . cond . L . Lock ()
defer p . cond . L . Unlock ()
// 等待直到池中有可用连接
for len ( p . connections ) == 0 {
fmt . Println ( "Waiting for connection..." )
p . cond . Wait ()
}
// 取出一个连接
conn := p . connections [ 0 ]
p . connections = p . connections [ 1 :]
return conn
}
// 释放连接(归还到池中)
func ( p * ConnectionPool ) Release ( conn * DBConn ) {
p . cond . L . Lock ()
defer p . cond . L . Unlock ()
// 如果池未满,归还连接并通知等待者
if len ( p . connections ) < p . maxSize {
p . connections = append ( p . connections , conn )
p . cond . Signal () // 通知一个等待的 goroutine
}
}
场景说明 :
当连接池为空时,Acquire()
调用 Wait()
阻塞,直到其他协程释放连接并调用 Signal()
。
释放连接时,如果池未满,归还后通知等待的协程。
2. 任务调度(工作池模式)
多个工作协程从任务队列中获取任务执行,当队列为空时,协程阻塞等待新任务的到来。
代码示例:动态任务分发
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
type TaskQueue struct {
tasks [] func ()
cond * sync . Cond
}
func NewTaskQueue () * TaskQueue {
return & TaskQueue {
cond : sync . NewCond ( & sync . Mutex {}),
}
}
// 工作协程:不断处理任务
func ( q * TaskQueue ) Worker ( id int ) {
for {
q . cond . L . Lock ()
for len ( q . tasks ) == 0 {
q . cond . Wait () // 等待新任务
}
task := q . tasks [ 0 ]
q . tasks = q . tasks [ 1 :]
q . cond . L . Unlock ()
fmt . Printf ( "Worker %d processing task\n" , id )
task () // 执行任务
}
}
// 添加任务并唤醒一个工作协程
func ( q * TaskQueue ) AddTask ( task func ()) {
q . cond . L . Lock ()
defer q . cond . L . Unlock ()
q . tasks = append ( q . tasks , task )
q . cond . Signal () // 通知一个等待的 Worker
}
func main () {
q := NewTaskQueue ()
// 启动 3 个工作协程
for i := 1 ; i <= 3 ; i ++ {
go q . Worker ( i )
}
// 添加 5 个任务
for i := 1 ; i <= 5 ; i ++ {
taskID := i
q . AddTask ( func () {
fmt . Printf ( "Task %d done\n" , taskID )
})
}
time . Sleep ( 1 * time . Second )
}
场景说明 :
工作协程在任务队列为空时阻塞,避免空转消耗 CPU。
添加任务时唤醒一个空闲协程,实现高效的任务调度。
3. 事件触发(等待初始化完成)
多个协程需要等待某个初始化操作完成后才能继续执行(如配置加载、服务启动)。
代码示例:等待系统初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
type System struct {
isReady bool
cond * sync . Cond
}
func NewSystem () * System {
return & System {
cond : sync . NewCond ( & sync . Mutex {}),
}
}
// 初始化完成时调用,通知所有等待者
func ( s * System ) MarkReady () {
s . cond . L . Lock ()
defer s . cond . L . Unlock ()
s . isReady = true
s . cond . Broadcast () // 唤醒所有等待的协程
}
// 其他协程调用此方法等待初始化完成
func ( s * System ) WaitUntilReady () {
s . cond . L . Lock ()
defer s . cond . L . Unlock ()
for ! s . isReady {
s . cond . Wait () // 阻塞直到 isReady 为 true
}
fmt . Println ( "System is ready!" )
}
func main () {
sys := NewSystem ()
// 启动 3 个协程等待系统就绪
for i := 0 ; i < 3 ; i ++ {
go func ( id int ) {
sys . WaitUntilReady ()
fmt . Printf ( "Goroutine %d starts working\n" , id )
}( i )
}
// 模拟初始化耗时
time . Sleep ( 2 * time . Second )
sys . MarkReady ()
time . Sleep ( 1 * time . Second )
}
场景说明 :
多个协程调用 WaitUntilReady()
阻塞,直到主协程调用 MarkReady()
。
使用 Broadcast()
一次性唤醒所有等待的协程。
4. 限流控制(并发请求限制)
限制同时处理的请求数量,当达到阈值时,新请求阻塞等待,直到有资源释放。
代码示例:并发请求限流器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type RateLimiter struct {
current int
max int
cond * sync . Cond
}
func NewRateLimiter ( maxConcurrent int ) * RateLimiter {
return & RateLimiter {
cond : sync . NewCond ( & sync . Mutex {}),
max : maxConcurrent ,
}
}
// 请求进入时调用,如果超过限制则阻塞
func ( r * RateLimiter ) Acquire () {
r . cond . L . Lock ()
defer r . cond . L . Unlock ()
for r . current >= r . max {
r . cond . Wait () // 等待直到有资源释放
}
r . current ++
}
// 请求完成时释放资源
func ( r * RateLimiter ) Release () {
r . cond . L . Lock ()
defer r . cond . L . Unlock ()
r . current --
r . cond . Signal () // 通知一个等待的请求
}
func main () {
limiter := NewRateLimiter ( 2 ) // 允许同时处理 2 个请求
for i := 1 ; i <= 5 ; i ++ {
go func ( id int ) {
limiter . Acquire ()
defer limiter . Release ()
fmt . Printf ( "Request %d starts\n" , id )
time . Sleep ( 1 * time . Second )
fmt . Printf ( "Request %d done\n" , id )
}( i )
}
time . Sleep ( 5 * time . Second )
}
场景说明 :
当并发请求数超过 max
时,新请求阻塞在 Acquire()
。
请求完成后调用 Release()
,唤醒一个等待的请求。
5. 多阶段任务协调
多个协程需要按特定顺序执行任务(如阶段 A 完成后,阶段 B 才能开始)。
代码示例:阶段任务协调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type Coordinator struct {
phase int
cond * sync . Cond
}
func NewCoordinator () * Coordinator {
return & Coordinator {
cond : sync . NewCond ( & sync . Mutex {}),
}
}
// 等待直到进入目标阶段
func ( c * Coordinator ) WaitForPhase ( targetPhase int ) {
c . cond . L . Lock ()
defer c . cond . L . Unlock ()
for c . phase < targetPhase {
c . cond . Wait ()
}
}
// 推进到下一阶段并通知所有等待者
func ( c * Coordinator ) NextPhase () {
c . cond . L . Lock ()
defer c . cond . L . Unlock ()
c . phase ++
c . cond . Broadcast () // 唤醒所有等待此阶段的协程
}
func main () {
coord := NewCoordinator ()
// 阶段 1 的协程
go func () {
coord . WaitForPhase ( 1 )
fmt . Println ( "Phase 1 tasks running" )
}()
// 阶段 2 的协程
go func () {
coord . WaitForPhase ( 2 )
fmt . Println ( "Phase 2 tasks running" )
}()
time . Sleep ( 1 * time . Second )
coord . NextPhase () // 推进到阶段 1
time . Sleep ( 1 * time . Second )
coord . NextPhase () // 推进到阶段 2
time . Sleep ( 1 * time . Second )
}
总结
sync.Cond
的核心应用场景是 需要等待某个条件成立时的高效阻塞与唤醒 ,常见于以下模式:
资源争用 :连接池、对象池、限流器。
任务协调 :生产者-消费者、工作池、多阶段任务。
事件通知 :系统初始化、状态变更。
关键注意事项 :
始终在 for
循环中检查条件(避免虚假唤醒)。
确保在调用 Wait()
、Signal()
、Broadcast()
时持有锁。
根据场景选择 Signal()
(唤醒一个)或 Broadcast()
(唤醒所有)。
合理使用 sync.Cond
可以避免轮询(如 time.Sleep
)带来的性能损耗,使并发控制更加高效和优雅。
检查队列是否为空的条件要用 for 循环来包裹 q.cond.Wait()?
在 Go 的 sync.Cond
中,使用 for
循环包裹 q.cond.Wait()
是为了避免虚假唤醒(Spurious Wakeup)和 条件状态可能在唤醒后再次失效 的问题。这是条件变量(Condition Variable)的经典设计模式,原因如下:
1. 虚假唤醒(Spurious Wakeup)
操作系统底层的条件变量实现(如 POSIX 的 pthread_cond_wait
)允许没有明确调用 Signal
/Broadcast
时,Wait
也可能被唤醒 。这种现象称为“虚假唤醒”。
即使没有其他协程通知,Wait()
也可能意外返回。如果此时用 if
检查条件,会导致协程在条件未满足时继续执行,引发逻辑错误。
例如:
消费者协程因虚假唤醒从 Wait()
返回,但此时队列仍然是空的。
若用 if
检查,消费者会错误地尝试从空队列取数据,导致 panic。
2. 共享状态的竞争
即使没有虚假唤醒,条件可能在唤醒后再次失效 。例如:
队列中有 1 个元素,消费者 A 和 B 同时在 Wait()
。
生产者调用 Signal()
唤醒消费者 A。
消费者 A 被唤醒,但在它重新获取锁之前,另一个消费者 C 突然抢到锁并取走了元素。
此时队列再次为空,消费者 A 必须重新检查条件(len(q.items) == 0
),否则会尝试从空队列取数据。
如果使用 if
,消费者 A 会直接执行后续逻辑,导致错误;而 for
会强制它重新检查条件。
3. 设计规范
Go 的 sync.Cond
文档明确要求:调用 Wait
时,必须在循环中检查条件 。这是条件变量的标准使用模式。
Because the first thing a Wait
does is unlock the Locker
, it is typical for the caller to have no knowledge of whether the condition is true at the time of the Wait
. The condition must be rechecked after the Wait
returns.
—— Go 官方文档
代码对比
错误写法(使用 if
)
1
2
3
4
5
6
7
q . cond . L . Lock ()
if len ( q . items ) == 0 { // ❌ 使用 if
q . cond . Wait ()
}
// 执行时可能条件已不满足!
item := q . items [ 0 ]
...
正确写法(使用 for
)
1
2
3
4
5
6
7
q . cond . L . Lock ()
for len ( q . items ) == 0 { // ✅ 使用 for
q . cond . Wait ()
}
// 确保条件满足后才执行
item := q . items [ 0 ]
...
总结
for
循环 确保每次从 Wait()
返回后重新检查条件 ,防止虚假唤醒或竞争导致的逻辑错误。
这是条件变量的通用最佳实践,不仅适用于 Go,也适用于其他语言(如 C++、Java)。
在你的生产者-消费者模型中,for
循环保证了消费者协程只有在队列确实有数据时才会继续执行 ,从而避免对空队列操作引发 panic。