diff --git a/common/utils/concurrent_map.go b/common/utils/concurrent_map.go new file mode 100644 index 000000000..e12e3ccc2 --- /dev/null +++ b/common/utils/concurrent_map.go @@ -0,0 +1,370 @@ +package utils + +import ( + "encoding/json" + "fmt" + "sync" +) + +var SHARD_COUNT = 32 + +type Stringer interface { + fmt.Stringer + comparable +} + +// A "thread" safe map of type string:Anything. +// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards. +type ConcurrentMap[K comparable, V any] struct { + shards []*ConcurrentMapShared[K, V] + sharding func(key K) uint32 +} + +// A "thread" safe string to anything map. +type ConcurrentMapShared[K comparable, V any] struct { + items map[K]V + sync.RWMutex // Read Write mutex, guards access to internal map. +} + +func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { + m := ConcurrentMap[K, V]{ + sharding: sharding, + shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT), + } + for i := 0; i < SHARD_COUNT; i++ { + m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)} + } + return m +} + +// Creates a new concurrent map. +func New[V any]() ConcurrentMap[string, V] { + return create[string, V](fnv32) +} + +// Creates a new concurrent map. +func NewStringer[K Stringer, V any]() ConcurrentMap[K, V] { + return create[K, V](strfnv32[K]) +} + +// Creates a new concurrent map. +func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { + return create[K, V](sharding) +} + +// GetShard returns shard under given key +func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] { + return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)] +} + +func (m ConcurrentMap[K, V]) MSet(data map[K]V) { + for key, value := range data { + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() + } +} + +// Sets the given value under the specified key. +func (m ConcurrentMap[K, V]) Set(key K, value V) { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + shard.items[key] = value + shard.Unlock() +} + +// Callback to return new element to be inserted into the map +// It is called while lock is held, therefore it MUST NOT +// try to access other keys in same map, as it can lead to deadlock since +// Go sync.RWLock is not reentrant +type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V + +// Insert or Update - updates existing element or inserts a new one using UpsertCb +func (m ConcurrentMap[K, V]) Upsert(key K, value V, cb UpsertCb[V]) (res V) { + shard := m.GetShard(key) + shard.Lock() + v, ok := shard.items[key] + res = cb(ok, v, value) + shard.items[key] = res + shard.Unlock() + return res +} + +// Sets the given value under the specified key if no value was associated with it. +func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool { + // Get map shard. + shard := m.GetShard(key) + shard.Lock() + _, ok := shard.items[key] + if !ok { + shard.items[key] = value + } + shard.Unlock() + return !ok +} + +// Get retrieves an element from map under given key. +func (m ConcurrentMap[K, V]) Get(key K) (V, bool) { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // Get item from shard. + val, ok := shard.items[key] + shard.RUnlock() + return val, ok +} + +// Count returns the number of elements within the map. +func (m ConcurrentMap[K, V]) Count() int { + count := 0 + for i := 0; i < SHARD_COUNT; i++ { + shard := m.shards[i] + shard.RLock() + count += len(shard.items) + shard.RUnlock() + } + return count +} + +// Looks up an item under specified key +func (m ConcurrentMap[K, V]) Has(key K) bool { + // Get shard + shard := m.GetShard(key) + shard.RLock() + // See if element is within shard. + _, ok := shard.items[key] + shard.RUnlock() + return ok +} + +// Remove removes an element from the map. +func (m ConcurrentMap[K, V]) Remove(key K) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + delete(shard.items, key) + shard.Unlock() +} + +// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held +// If returns true, the element will be removed from the map +type RemoveCb[K any, V any] func(key K, v V, exists bool) bool + +// RemoveCb locks the shard containing the key, retrieves its current value and calls the callback with those params +// If callback returns true and element exists, it will remove it from the map +// Returns the value returned by the callback (even if element was not present in the map) +func (m ConcurrentMap[K, V]) RemoveCb(key K, cb RemoveCb[K, V]) bool { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + v, ok := shard.items[key] + remove := cb(key, v, ok) + if remove && ok { + delete(shard.items, key) + } + shard.Unlock() + return remove +} + +// Pop removes an element from the map and returns it +func (m ConcurrentMap[K, V]) Pop(key K) (v V, exists bool) { + // Try to get shard. + shard := m.GetShard(key) + shard.Lock() + v, exists = shard.items[key] + delete(shard.items, key) + shard.Unlock() + return v, exists +} + +// IsEmpty checks if map is empty. +func (m ConcurrentMap[K, V]) IsEmpty() bool { + return m.Count() == 0 +} + +// Used by the Iter & IterBuffered functions to wrap two variables together over a channel, +type Tuple[K comparable, V any] struct { + Key K + Val V +} + +// Iter returns an iterator which could be used in a for range loop. +// +// Deprecated: using IterBuffered() will get a better performence +func (m ConcurrentMap[K, V]) Iter() <-chan Tuple[K, V] { + chans := snapshot(m) + ch := make(chan Tuple[K, V]) + go fanIn(chans, ch) + return ch +} + +// IterBuffered returns a buffered iterator which could be used in a for range loop. +func (m ConcurrentMap[K, V]) IterBuffered() <-chan Tuple[K, V] { + chans := snapshot(m) + total := 0 + for _, c := range chans { + total += cap(c) + } + ch := make(chan Tuple[K, V], total) + go fanIn(chans, ch) + return ch +} + +// Clear removes all items from map. +func (m ConcurrentMap[K, V]) Clear() { + for item := range m.IterBuffered() { + m.Remove(item.Key) + } +} + +// Returns a array of channels that contains elements in each shard, +// which likely takes a snapshot of `m`. +// It returns once the size of each buffered channel is determined, +// before all the channels are populated using goroutines. +func snapshot[K comparable, V any](m ConcurrentMap[K, V]) (chans []chan Tuple[K, V]) { + //When you access map items before initializing. + if len(m.shards) == 0 { + panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`) + } + chans = make([]chan Tuple[K, V], SHARD_COUNT) + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + // Foreach shard. + for index, shard := range m.shards { + go func(index int, shard *ConcurrentMapShared[K, V]) { + // Foreach key, value pair. + shard.RLock() + chans[index] = make(chan Tuple[K, V], len(shard.items)) + wg.Done() + for key, val := range shard.items { + chans[index] <- Tuple[K, V]{key, val} + } + shard.RUnlock() + close(chans[index]) + }(index, shard) + } + wg.Wait() + return chans +} + +// fanIn reads elements from channels `chans` into channel `out` +func fanIn[K comparable, V any](chans []chan Tuple[K, V], out chan Tuple[K, V]) { + wg := sync.WaitGroup{} + wg.Add(len(chans)) + for _, ch := range chans { + go func(ch chan Tuple[K, V]) { + for t := range ch { + out <- t + } + wg.Done() + }(ch) + } + wg.Wait() + close(out) +} + +// Items returns all items as map[string]V +func (m ConcurrentMap[K, V]) Items() map[K]V { + tmp := make(map[K]V) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + + return tmp +} + +// Iterator callbacalled for every key,value found in +// maps. RLock is held for all calls for a given shard +// therefore callback sess consistent view of a shard, +// but not across the shards +type IterCb[K comparable, V any] func(key K, v V) + +// Callback based iterator, cheapest way to read +// all elements in a map. +func (m ConcurrentMap[K, V]) IterCb(fn IterCb[K, V]) { + for idx := range m.shards { + shard := (m.shards)[idx] + shard.RLock() + for key, value := range shard.items { + fn(key, value) + } + shard.RUnlock() + } +} + +// Keys returns all keys as []string +func (m ConcurrentMap[K, V]) Keys() []K { + count := m.Count() + ch := make(chan K, count) + go func() { + // Foreach shard. + wg := sync.WaitGroup{} + wg.Add(SHARD_COUNT) + for _, shard := range m.shards { + go func(shard *ConcurrentMapShared[K, V]) { + // Foreach key, value pair. + shard.RLock() + for key := range shard.items { + ch <- key + } + shard.RUnlock() + wg.Done() + }(shard) + } + wg.Wait() + close(ch) + }() + + // Generate keys + keys := make([]K, 0, count) + for k := range ch { + keys = append(keys, k) + } + return keys +} + +// Reviles ConcurrentMap "private" variables to json marshal. +func (m ConcurrentMap[K, V]) MarshalJSON() ([]byte, error) { + // Create a temporary map, which will hold all item spread across shards. + tmp := make(map[K]V) + + // Insert items to temporary map. + for item := range m.IterBuffered() { + tmp[item.Key] = item.Val + } + return json.Marshal(tmp) +} +func strfnv32[K fmt.Stringer](key K) uint32 { + return fnv32(key.String()) +} + +func fnv32(key string) uint32 { + hash := uint32(2166136261) + const prime32 = uint32(16777619) + keyLength := len(key) + for i := 0; i < keyLength; i++ { + hash *= prime32 + hash ^= uint32(key[i]) + } + return hash +} + +// Reverse process of Marshal. +func (m *ConcurrentMap[K, V]) UnmarshalJSON(b []byte) (err error) { + tmp := make(map[K]V) + + // Unmarshal into a single map. + if err := json.Unmarshal(b, &tmp); err != nil { + return err + } + + // foreach key,value pair in temporary map insert into our concurrent map. + for key, val := range tmp { + m.Set(key, val) + } + return nil +} diff --git a/logic/service/space/space.go b/logic/service/space/space.go index 32e61e91d..71a75d39c 100644 --- a/logic/service/space/space.go +++ b/logic/service/space/space.go @@ -13,12 +13,13 @@ import ( // Space 针对Player的并发安全map,键为uint32类型 type Space struct { mu sync.RWMutex // 读写锁,读多写少场景更高效 - data map[uint32]common.PlayerI // 存储玩家数据的map,键为玩家ID + data utils.ConcurrentMap[uint32, common.PlayerI]// 存储玩家数据的map,键为玩家ID CanRefresh bool //是否能够刷怪 ID uint32 // 地图ID Name string //地图名称 DefaultPos model.Pos //默认位置DefaultPos - Positions map[uint32]model.Pos //从上一个地图跳转后默认位置 + + Positions *utils.SyncMap[uint32, model.Pos] //从上一个地图跳转后默认位置 } // NewSyncMap 创建一个新的玩家同步map