版本
- go version 1.10.1
使用方法
var m sync.map
value,ok:=m.Load(key)
m.Store(key,value)
value,loaded,ok := m.LoadOrStore(key,value)
m.Delete(key)
m.Range(func(key,value) bool)
数据结构
/*
package: sync
file: map.go
line: 27
*/
type Map struct {
mu Mutex
read atomic.Value
dirty map[interface{}]*entry
misses int
}
/*
package: sync
file: map.go
line: 27
*/
type readOnly struct {
m map[interface{}]*entry
amended bool
}
/*
package: sync
file: map.go
line: 73
*/
type entry struct {
p unsafe.Pointer
}
/*
package: sync
file: map.go
line: 70
*/
var expunged = unsafe.Pointer(new(interface{}))
mu: Mutex锁,在对dirty进行操作的时候,需要上锁
read: atomic.Value对象,对readOnly对象进行原子读取和存储,对map进行操作时,优先操作read中的数据。
readOnly:维护一个只读的map和一个判断dirty是否有新值的字段。
dirty:保存最新数据的map,当从read中多次读取不到数据时,会将dirty提升成read。
misses:从read读取不到数据的次数,当dirty提升为read的时候置零。
entry:指针p保存传入value的地址
expunged:随机地址,当entry从dirty中删除的时候,将entry的p指向expunged
方法
Nwomap
/*
package: sync
file: map.go
line: 95
*/
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
newEntry传入一个interface对象,返回保存了interface对象指针的entry指针。
Load
/*
package: sync
file: map.go
line: 102
*/
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly) //step 1
e, ok := read.m[key]
if !ok && read.amended { //step 2
m.mu.Lock()
read, _ = m.read.Load().(readOnly) //step 3
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key] //step 4
m.missLocked()
}
m.mu.Unlock()
}
if !ok { //step 5
return nil, false
}
return e.load() //step 6
}
- 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
- 若在readOnly中不存在这个key,但是dirty中存在新的entry(不一定是key对应的entry,只是表示dirty中存在read中不存在的entry),则获取Mutex锁。
- 双检查,重复step1和step2。若获得锁之前key在dirty中存在,并且dirty被提升read,则这个时候key在read中存在,读取read中的entry并解锁。
- 若在read中还是不存在,则直接从dirty中读取entry,并且将miss数加1。若miss数大于等于dirty中的数据,则将dirty提升为read,将readOnly中的amended设为false,dirty设为nil,miss置零,然后解锁。
/*
package: sync
file: map.go
line: 343
*/
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
- 若key在dirty和read中都不存在,则返回空值和false。
- 从entry中读取存储的数据,若entry的p为空或者被标记为expunged,则表示该值已经被删除,则返回nil和false,否则返回对应的值和true。
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
Store
/*
package: sync
file: map.go
line: 136
*/
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly) //step 1
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock() //step 2
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok { //step 3
e.storeLocked(&value)
} else { //step 4
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
-
从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry,若存在则尝试直接将entry的p指向value。
func (e *entry) tryStore(i *interface{}) bool { p := atomic.LoadPointer(&e.p) //step a if p == expunged { return false } for { //step b if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } p = atomic.LoadPointer(&e.p) if p == expunged { return false } } }
a. 判断p是否被标记为expunged,若是则表示key已经在dirty中删除,不能直接在read中存储。
b. 循环尝试进行CAS操作将当前Value的值存储到entry中,除非key在dirty中被删除。 上锁然后进行双检查,若dirty被提升为read,并且key在新的read中存在,则将新的值存入entry中。中间会判断p是否被标记为expunged,若是则将key存到dirty中。
若在dirty中存在,则直接将值存到entry中。
-
若在dirty中也不存在,则创建新的entry存到dirty中。并且将read的amended设为ture,表示dirty中有新的数据。若dirty为nil,会将read的值赋值给新的dirty。
/* package: sync file: map.go line: 353 */ func (m *Map) dirtyLocked() { if m.dirty != nil { //step a return } read, _ := m.read.Load().(readOnly) //step b m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { //step c m.dirty[k] = e } } } /* package: sync file: map.go line: 367 */ func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
a. 若dirty不为nil,则直接返回。
b. 读出read,创建长度同read的map赋值给dirty。
c. 循环遍历read,若read中的entry的值为nil,则将p标记为expunged,若不为nil,则将entry赋值给dirty。因此,p只有在dirty在重新创建的时候才会被标记为expunged,并且对应的key不会在dirty中出现。所以,在step2的时候,若entry的p被标记为expunged了,那么dirty一定不为nil,并且key在dirty中不存在,需要将key加入到dirty中。
LoadOrStore
/*
package: sync
file: map.go
line: 203
*/
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
/*
package: sync
file: map.go
line: 243
*/
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p) //step 1
if p == expunged {
return nil, false, false
}
if p != nil { //step 2
return *(*interface{})(p), true, true
}
ic := i
for { //step 3
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
基本逻辑同Load方法,在read中找对应的entry,若存在则尝试读并写入,若不存在则上锁后依次在read、dirty中进行读取和写入。唯一区别则是若key在read中不存在,而在dirty中存在,会像Store方法一样,进行miss数增加和判断是否需要提升dirty的操作。
而在entry的tryLoadOrStore方法中
- 判断key是否被删除,若是则返回nil
- 若不为nil,则返回对应的值
- 若为nil,则循环尝试原子CAS操作将新的值存到entry中和进行step 1、step 2的判断
Delete
/*
package: sync
file: map.go
line: 271
*/
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly) //step 1
e, ok := read.m[key]
if !ok && read.amended { //step 2
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok { //step 3
e.delete()
}
}
/*
package: sync
file: map.go
line: 288
*/
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
- 从read中读取readOnly,并尝试从readOnly的map中寻找key对应的entry。
- 若在read中不存在,并且dirty中有新的数据,则上锁并进行双检查,若dirty中依然有新的数据,则直接调用delete方法删除dirty中的对应的key,无论在dirty中是否存在这个key。
- 若entry存在,则将entry中的p标记为nil。
Range
/*
package: sync
file: map.go
line: 310
*/
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly) //step 1
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m { //step 2
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
- 从read中读取readOnly,若dirty中有新数据,则立即提升dirty为read。
- 使用原生的range方法遍历read,若数据不为nil,则调用回调方法传入key和value,若回调方法返回false,则跳出循环。
总结
sync.map通过使用一个只读的read和一只写的dirty来保证读写时候的并发效率,只有在对read中不存在的key进行读写的时候,才会去上锁写dirty。
read和dirty通过使用entry进行数据共享,若一个key在read中存在,则只修改read中这个key对应的entry的值,就可以保证在dirty中数据同步。而不需要对map进行修改,保证了不会对read造成并发读写异常。比如delete操作,只是将read中的entry的p设置为nil,而不是将这个key删除。
在读取操作的时候,若key在read中多次不存在,而在dirty中有新数据(不管在dirty中是否存在),则会将dirty提升为read,保证读取的命中率,减少上锁次数。
在写操作的时候,若read不存在这个key,才会对dirty进行写,并且若dirty为空,会从read中复制不为空的数据。
sync.map适合在对大量重复key的读写操作的场景下使用,这个时候约等于都在对read进行读写,不需要上锁,效率非常高。但是不适合在对大量新增key的读写场景下使用,这个时候大部分操作都是在对dirty进行操作,几乎都需要上锁,而且由于大量miss和大量新增key的写,导致频繁的dirty提升和dirty复制,效率反而可能低于读写锁。