0%

golang的sync-mutex

sync.mutex 作用:
sync.Mutex是Go标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在
Lock 方法的调用上,直到锁被释放。

初代mutext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Mutex struct {
key int32;
sema int32;
}

func xadd(val *int32, delta int32) (new int32) {
for {
v := *val;
if cas(val, v, v+delta) {
return v+delta;
}
}
panic("unreached")
}

func (m *Mutex) Lock() {
if xadd(&m.key, 1) == 1 {
// changed from 0 to 1; we hold lock
return;
}
sys.semacquire(&m.sema);
}

func (m *Mutex) Unlock() {
if xadd(&m.key, -1) == 0 {
// changed from 1 to 0; no contention
return;
}
sys.semrelease(&m.sema);
}

通过cas对 key 进行加一, 如果key的值是从0加到1, 则直接获得了锁。否则通过semacquire进行sleep, 被唤醒的时候就获得了锁。
2016年 Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。
现有的锁实现是很复杂的。粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync函数的调用、正常模式和饥饿模式的改变等

sync.mutex

在分析源代码之前, 我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。

  1. 当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。
  2. 而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。
  3. 同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。
    mutex的结构:
    1
    2
    3
    4
    type Mutex struct {
    state int32
    sema uint32
    }
    state 是一个共用的字段,第 0 个bit 表示是被 goroutine 所拥有加锁。第一个 bit 表示mux 是否被唤醒,
    也就是有某个唤醒的goroutine要尝试获取锁。第二个 bit 标记这个mutex状态, 值为1表明此锁已处于饥饿状态。

同时 goroutine也有自身的状态:有可能它是新来的goroutine,也有可能是被唤醒的goroutine,
可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。
也就是说在变更mutex状态的时候,也和当前goroutine的状态有关。

mutex 的状态

mutext 饥饿状态主要是为了保持锁的公平性,防止早运行的goroutine一直获取不到 锁,从而得不到执行。
互斥锁有两种状态:正常状态和饥饿状态:

  1. 正常状态:

        在正常状态下,所有等待的goroutine按照FIFO的顺序去正常等待获取锁。

        
    唤醒的goroutine不会直接拥有锁,而是去和正在运行的goroutime去竞争锁。
    goroutine竞争锁的拥有,新请求锁的goroutine具有优势,因为它正在CPU上执行, 而且可能有好几个.
    所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。

         如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。【当前goroutine 要设置改变锁的状态时候:要么设置为加锁,要么设置为饥饿模式。饥饿模式权限是为了让等待执行较长时间的goroutine获取到执行权限】。
    锁可能同时具有加锁和饥饿的标志,因为G1第一次获取到的时候会加锁,G2长时间等待后会标记为饥饿。G1解锁的时候,会解除加锁状态,会判断当前锁是不是具有饥饿锁,如果具有饥饿会通知G2执行,G2执行的时候会解除饥饿,同时设置锁为锁状态。
    也就是说,只有一个goroutine会获取到饥饿状态。
  2. 饥饿状态:

        在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。
        新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。
    如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:
        (1)它是队列中的最后一个;
        (2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。

        正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。

mutex之lock操作

如果:当goroutine 尝试加锁的时候,锁处于空闲状态。那么直接设置为加锁状态,或者进入自旋逻辑。
当尝试加锁的时候:

  1. 如果当前处于加锁,且是非饥饿状态,且未达到最大自旋次数。尝试标记自己需要被唤醒。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    // 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
    if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
    }
    runtime_doSpin()
    iter++ // 是计数器,标记自旋的次数
    old = m.state // old 是lock函数的内部变量,用来获取当前mutex的状态
    continue
    }
  2. 被唤醒执行下边的逻辑:【或者说不满足自旋后的操作】

        当不在自旋的时候,有可能被唤醒,有可能锁已经被解除,有可能锁处于饥饿状态。
    当前锁的可能状态:
    1
    2
    3
    4
    5
    // 1. 锁还没有被释放, 锁处于正常状态
    // 2. 锁还没有被释放, 锁处于饥饿状态
    // 3. 锁已经被释放, 锁处于正常状态
    // 4. 锁已经被释放, 锁处于饥饿状态
    // 5. 锁空闲状态
    当前协程尝试重置锁状态:
    1. 如果mutx 处于未饥饿状态.
      把mutx 设置为加锁。【可以已经从别的协程的饥饿锁状态回归,此时是未饥饿的,但是可能是有锁,也可能是无锁】
    2. 如果mutx 处于锁状态或者饥饿状态。
      增加获取锁的队列等待数。
    3. 如果当前协程处于饥饿状态下:
      说明锁状态是处于加锁状态或者饥饿状态。饥饿状态也是一种锁状态,优先级比锁状态高,以便当前goroutine能获取到优先执行权。
      如果mutx 处于加锁,且当前goroutine是饥饿状态,设置mutx为饥饿状态。【抢占锁】
      如果mutx 未加锁,说明是自己在尝试获取锁。【那么case 1 已经设置当前锁状态为 lock】
      1
      2
      3
      if starving && old&mutexLocked != 0 {
      new |= mutexStarving
      }
    4. 尝试使用cas 设置锁状态。【基于1, 2,3重置后的锁状态】
      设置成功:
      1. 如果老的锁状态【重置之前的状态】为 未加锁,不是饥饿。直接跳出循环【此时锁,已经被当前goroutine锁住】。执行业务逻辑。
      2. 添加自身到等待队列
        如果是新的添加到队尾,如果是老的添加到队头去
        1
        2
        queueLifo := waitStartTime != 0
        runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      3. 判断是否超过最大锁等待时间,超过设置自身为饥饿状态【下次循环的时候设置 锁状态为 饥饿状态。】,
      4. 重新获取最新的锁状态,如果是 饥饿状态,那么本身应该获取到.执行权限
        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        // 说明当前goroutine 要么是新的goroutine,要么是饥饿的goroutine或者未达到饥饿态的处于自旋中的goroutine.
        // 如果自身未达到饥饿状态的条件,会一直自旋,等待达到饥饿状态的时候,设置锁状态是饥饿状态,那么下次执行一定是自身。
        // unlokc 会解除饥饿状态。
        // 再次获取锁状态,如果是饥饿状态,那么一定是当前goroutine 设置的。因为当前goroutine获取到了执行权限,且在自旋过程中。
        // 而且 锁的状态变更是通过 CAS操作,只有一个执行权限的goroutine可以设置成功。
        old = m.state
        if old&mutexStarving != 0 {
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
        throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 非饥饿,或者自身处于队列第一个的时候。都要解除饥饿状态,以免死锁。
        if !starving || old>>mutexWaiterShift == 1 {
        // Exit starvation mode.
        // Critical to do it here and consider wait time.
        // Starvation mode is so inefficient, that two goroutines
        // can go lock-step infinitely once they switch mutex
        // to starvation mode.
        delta -= mutexStarving
        }
        // 解除饥饿状态,跳出循环,执行业务逻辑
        atomic.AddInt32(&m.state, delta)
        break
        }
        // 如果不是饥饿状态,重置自旋次数和唤醒状态不为真,让再次执行获取到唤醒状态时候执行。
        如果设置失败:
        重新循环

lock代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
func (m *Mutex) Lock() {
// 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
// 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 标记本goroutine的等待时间
var waitStartTime int64
// 本goroutine是否已经处于饥饿状态
starving := false
// 本goroutine是否已唤醒
awoke := false

// 自旋次数
iter := 0

// 复制锁的当前状态
old := m.state

for {
// 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
// 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。
// 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}

// 到了这一步, state的状态可能是:
// 1. 锁还没有被释放,锁处于正常状态
// 2. 锁还没有被释放, 锁处于饥饿状态
// 3. 锁已经被释放, 锁处于正常状态
// 4. 锁已经被释放, 锁处于饥饿状态
//
// 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
// new 复制 state的当前状态, 用来设置新的状态
// old 是锁当前的状态
new := old

// 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
// 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 将等待队列的等待者的数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}

// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
// 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}

// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
// 总之state的新状态不再是woken状态.
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}

// 通过CAS设置new state值.
// 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
// 那么当前goroutine已经获取了锁的拥有权,返回
if old&(mutexLocked|mutexStarving) == 0 {
break
}

// 设置/计算本goroutine的等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}

// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
// 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
// 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
runtime_SemacquireMutex(&m.sema, queueLifo)

// sleep之后,此goroutine被唤醒
// 计算当前goroutine是否已经处于饥饿状态.
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 得到当前的锁状态
old = m.state

// 如果当前的state已经是饥饿状态
// 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine
if old&mutexStarving != 0 {

// 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
// 那么state是一个非法状态
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}

// 当前goroutine用来设置锁,并将等待的goroutine数减1.
delta := int32(mutexLocked - 1<<mutexWaiterShift)

// 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态,
// 那么我们需要把锁的state状态设置为正常模式.
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
delta -= mutexStarving
}

// 设置新state, 因为已经获得了锁,退出、返回
atomic.AddInt32(&m.state, delta)
break
}

// 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
awoke = true
iter = 0
} else { // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
old = m.state
}
}
}

sync.unlock

重点:锁定的Mutex与特定的goroutine没有关联,允许一个goroutine锁定一个Mutex,然后安排另一个goroutine解锁它.
但是如果没有执行lock,而去unlock会异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (m *Mutex) Unlock() {
// 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}

// 释放了锁,还得需要通知其它等待者
// 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁
// 锁如果处于正常状态,
// new state如果是正常状态
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 将等待的goroutine数减一,并设置woken标识
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
// 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
// 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
// 新来的goroutine不会把锁抢过去,在达到一定次数的时候如果还没获取到锁,那么会设置锁为 饥饿状态,从而回去到执行权限。
runtime_Semrelease(&m.sema, true)
}
}

资料:

https://colobu.com/2018/12/18/dive-into-sync-mutex/
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/
https://www.ququ123.top/2022/04/golang_sync_mutex_principle/
https://gohandbook1.haohongfan.com/docs/sync-chapter/2021-04-01-mutex/