Java 语言中有一个 java.util.Concurrent 包提供了大量的并发工,而 Go 语言中也有这样的角色:sync包。
sync.Mutex 锁实现原理
sync.Mutext 是 Go 提供的一种同步原语,用于表达互斥,不过不推荐在业务中使用,根据 Go 的设计思想,应首先考虑通过传递消息(管道)来共享内存,而不是通过共享内存来传递消息
Mutex 结构体如下,只有两个变量,非常简单:
type Mutex struct {
state int32 // 状态,bitmap
sema uint32 // 信号量
}
state 字段是一个位图
- 第 0 位表示是否加锁
- 第 1 位表示是否已唤醒
- 第 2 位表示是否为饥饿模式
- 剩下的位则用于表示在这个 Mutex 上等待的 goroutine 数量
sema 是一个信号量,配合 sleep 原语和 notify 原因使用,具体实现是协程调用 runtime_SemacquireMutex
用于在信号量上等待信号量大于0,runtime_Semrelease
则会唤醒在这个信号量上等待的协程。
加锁的时候其实就是尝试将state的第一个位置成true,表示已经加锁,而解锁则是将其置回0再调用runtime_Semrelease
唤醒等待的协程。
不过之后的版本引入了饥饿模式,当有协程等待超过一毫秒之后,就会进入饥饿模式,所有占用CPU的协程不再尝试自旋获取锁,直接去等待队列尾端排队,锁会在释放后直接递交给等待队列中的第一个协程。当满足以下任一条件时就会退出饥饿模式:
- 等待时间小于 1ms
- 等待队列空了
不过需要注意的是,go 的 Mutex 并没有额外实现可重入的机制,所以当一个获取了锁再尝试去加锁时就会死锁。
package main
import "sync"
func main() {
mutex := sync.Mutex{}
mutex.Lock()
mutex.Lock() // panic,无法重入
}
> 输出:fatal error: all goroutines are asleep - deadlock!
在 Java语言中有重量级锁和轻量级锁的概念,而在 go 里面对应的则是在信号量上等待的饥饿模式,和自旋空等的普通模式。
饥饿模式和普通模式的切换,也是 go 在公平和性能中的一个权衡。
sync.RWMutex读写锁实现原理
RWMutex 提供的是一个读写锁的功能,同一时间可以有很多个读者在读,或者一个写者在写,适用于读多写少的场景;
RWMutex 的实现和 Mutex 一样,也是基于信号量和atomic指令来实现的;
数据结构:
type RWMutex struct {
w Mutex // 写者锁
writerSem uint32 // 写者信号量
readerSem uint32 // 读者信号量
readerCount int32 // 读者数量
readerWait int32 // 离开的写者数量
}
const rwmutexMaxReaders = 1 << 30 // 最大的读者数量,2^30 = 1073741824
大致的实现原理就是,读加锁时会先把读者数量加一,然后判断当前有没有协程尝试写入,没有的话就可以读了,如果有的话那么就会挂起,挂起是通过判断读者数量是否为负数来实现的,所以这里会限制读者数量不能超过rwmutexMaxReaders,这样才能保证读者数量能在尝试写入时一定能变成负值;在尝试加写锁时会先通过 Mutex 跟其他尝试写的协程竞争,然后才能进入临界区,进入之后就会把读者数量减去rwmutexMaxReaders变成负数,这样就不会有新增的读者,然后等待当前正在读的读者全部读完,最后一个读者读完之后会唤醒它,然后就可以开始写了,写者写完之后唤醒读者,大家你来我往,井井有条。
sync.Cond条件变量
Cond 是一个条件变量:
- Wait 等待
- Notify 唤醒一个等待的协程
- BroadCast 唤醒所有等待的协程
不过这个库在 Go 显得有的有点鸡肋,管道既可以实现信号量的功能,而且还顺带可以传输数据,具体实现的话底层和锁依赖的信号量一样的。
sync.WaitGroup 等待条件
waitGroup 类似于 Java 里面的 CountDownLatch类,可以让一个协程等待其他协程任务的完成,拥有三个方法:
- Add 申明要等待的数量
- Done 完成了一个任务,Add( -1)
- Wait 等待
底层实现也是通过 atomic 和信号量来实现的。
一个没有 WaitGroup 的例子:
wg := make(chan struct{})
size := 10
for i := 0; i < size; i++ {
go func(x int) {
println(x)
wg <- struct{}{}
}(i)
}
for i := 0; i < size; i++ {
<-wg
}
WaitGroup 版本:
wg := sync.WaitGroup{}
size := 10
wg.Add(size)
for i := 0; i < size; i++ {
go func(x int) {
println(x)
wg.Done()
}(i)
}
wg.Wait()
sync.Once实现原理
sync.Once 数据结构:
type Once struct {
done uint32 // 标记是否完成
m Mutex
}
sync.Once 可以保证提交给它的方法无论被调用多少次也只会执行一次,这个其实就是一个经典的单例模式的解决方案,两步判断,实现原理如下:
func (0 *Once)Do(f func){
if atomic.LoadUint32(&o.done) != 0 { // 判断是否已经被执行过
return
}
o.m.Lock() // 加锁尝试初始化
defer o.m.Unlock()
if o.done != 0 { // 可能在等待锁的时候已经被其他协程执行过了
return
}
f() // 执行
atomic.StoreUint32(&o.done, 1) // 标记已完成
}
sync.Map实现原理
数据结构
Go 语言中自带的 map 出于性能考虑并不是线程安全的,go 1.9 加入了 sync.Map 来提供并发场景下哈希表的高性能解决方案,维护了一个只读的副本,这样在只有查询的时候就不用加锁,数据结构如下:
type struct {
mu Mutex //
read atomic.Value // 只读数据
dirty map[interface{}]*entry // 动态数据
misses int // 未命中的数量
}
// read 的结构
type readOnly struct {
m map[interface{}]*entry
amended bool // true 表示 dirty 中后面添加了元素,如果 read 中查不到需要查一下 dirty 确认
}
type entry struct {
p unsafe.Pointer // *interface{} // 指向对应的存储元素
}
查询
读取数据时会先从 read 中读取,read 中读不到才会尝试去 dirty 读,任何对 dirty 的操作都会加锁,来保证数据的安全。如果 amended 不为 true 的话意味着 dirty 中的数据 read 全有,read 中查不到也就不会继续去 dirty 中查询了。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 穿透
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked() // 统计穿透 read 的数量
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
修改
修改数据时会尝试 CAS 直接修改 read 指向的节点,因为 read 和 dirty 指向的数据:entry 其实是同一份,所以修改是会生效的。
// 修改,不考虑添加的场景
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok{
for {
p := atomic.LoadPointer(&e.p)
//替换底层指向的值
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return // 修改成功
}
}
}
return
}
添加
添加数据时则会直接添加到 dirty 中,数据添加后虽然没有在 read 里出现,但是读是会但穿透次数频繁到一定程度后,就会把 dirty 升级为 read。
// 只考虑添加的场景,不考虑修改
func (m *Map) Store(key, value interface{}) {
m.mu.Lock()
if !read.amended { // 如果当前dirty和read数据一致,修改为这个标识符
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
m.mu.Unlock()
}
删除
因为 read 是只读的,所以不能直接删除,sync.Map 替换掉原有数据指向一个特殊的标记来表示数据已经被删除,同时会在 dirty 中删除这个 key,这样这个 key 就不会一直存在。
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // read 中没有找到,看看 dirty 中有没有
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) { // 用一个空指针表示删除了
return true
}
}
}
}
小结
sync.Map 的思想是用空间换时间,维护了一个只读的数据副本,对于读多写少的场景性能几乎和普通 map 一样,但是当写入和删除频繁时性能就比较差了,不过还是比用全局读写锁保护的map要好很多。
Java 里面的 ConcurrentHashMap 的思路则是锁粒度细分,不过 Go 社区里面也有用这种思路实现的开源库:https://github.com/orcaman/concurrent-map,如果写的场景也很频繁的话,这会是一个比 sync.Map 更好的选择。
sync.Pool对象池实现原理
我们在需要频繁创建和销毁对象时可以使用对象池来减少内存分配和GC的压力。
基本用法
pool := sync.Pool{New: NewObject} // 创建对象池同时提供创建函数
x := pool.Get() // 通过 Get 获取一个对象
pool.Put(x) // 使用完之后通过 Put 放回对象池
数据结构
type Pool struct {
local [P]poolLocal // 每个P(线程)私有的结构体
New func() interface{} // 创建函数
}
type poolLocalInternal struct {
private interface{}// 线程私有对象
shared poolChain // 共享双端队列
}
对象池的核心数据结构是一个长度为 Processor 数量即底层线程个数的数组,这样每个协程可以访问各自底层 P 的本地缓存来避免共享访问时加锁。
每个 P 都能缓存一个对象,同时所有 P 都会共享一个队列用来缓存公共的对象。
在 Go 语言早期共享对象池是直接用一个切片实现的队列 + 锁来实现的,在 Go 1.13 的时候优化成了使用无锁双端队列来避免加锁。``
获取对象流程
- 找到当前的 P 对应的 poolLocal
- poolLocal 中查看私有的那一份对象缓存在不在,如果在的话直接返回即可
- 去共享队列中尝试获取对象
- 如果共享队列中也没有,会遍历其他的 P 看看他们是否有缓存
- 如果所有地方都没有缓存,直接调用 New 创建一个