```
All checks were successful
ci/woodpecker/push/my-first-workflow Pipeline was successful

feat(socket): 优化TCP连接处理性能

- 添加最小可读长度检查,避免无效Peek操作
- 修复数据部分解析逻辑,避免空切片分配

perf(utils): 优化并发哈希映射性能

- 将分段数量调整为CPU核心数
- 重写Range方法,移除channel和goroutine开销
- 添加原子标志控制遍历终止

perf(utils): 优化结构体序列化缓存机制

- 添加sync.Map缓存预处理结果
- 支持结构体、自定义类型、二进制类型分别缓存
- 减少重复反射
This commit is contained in:
昔念
2026-02-22 10:59:41 +08:00
parent 790bc21034
commit 1dc75b529d
5 changed files with 159 additions and 78 deletions

View File

@@ -4,7 +4,9 @@ import (
"blazing/cool"
"context"
"encoding/json"
"runtime"
"sync"
"sync/atomic"
"github.com/mhmtszr/concurrent-swiss-map/maphash"
@@ -35,7 +37,7 @@ type OptFunc[K comparable, V any] func(o *CsMap[K, V])
func New[K comparable, V any](options ...OptFunc[K, V]) *CsMap[K, V] {
m := CsMap[K, V]{
hasher: maphash.NewHasher[K]().Hash,
shardCount: 32,
shardCount: uint64(runtime.NumCPU()),
}
for _, option := range options {
option(&m)
@@ -204,18 +206,79 @@ type Tuple[K comparable, V any] struct {
Val V
}
// Range If the callback function returns true iteration will stop.
// -------------------------- 保留所有原有方法(无修改) --------------------------
// 注:以下方法和你的源码完全一致,仅省略实现(避免冗余)
// New/WithShardCount/WithCustomHasher/WithSize/getShard/Store/Delete/DeleteIf/
// Load/Has/Clear/Count/SetIfAbsent/SetIf/SetIfPresent/IsEmpty/MarshalJSON/UnmarshalJSON
// -------------------------- 核心优化Range 方法 --------------------------
// Range 同步遍历所有分段,无 channel/goroutine/context 开销,保留 panic 恢复和提前终止
// 回调签名完全兼容:返回 true 终止遍历
func (m *CsMap[K, V]) Range(f func(key K, value V) (stop bool)) {
ch := make(chan Tuple[K, V], m.Count())
// 1. 提前判空:回调为 nil 直接返回
if f == nil {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 2. 原子标志:控制是否终止遍历(替代 context
var stopFlag atomic.Bool
listenCompleted := m.listen(f, ch)
m.produce(ctx, ch)
listenCompleted.Wait()
// 3. 遍历所有分段(同步执行,无额外 goroutine
for i := range m.shards {
// 检测终止标志:提前退出,避免无效遍历
if stopFlag.Load() {
break
}
// 每个分段的遍历逻辑(带 panic 恢复,和原逻辑一致)
func(shardIdx int) {
// 保留原有的 panic 恢复逻辑
defer func() {
if err := recover(); err != nil {
cool.Logger.Error(context.TODO(), "csmap Range shard panic 错误:", err)
}
}()
shard := &m.shards[shardIdx]
// 加读锁(并发安全,和原逻辑一致)
shard.RLock()
defer shard.RUnlock() // 延迟释放,避免锁泄漏
// 跳过空分段:核心优化点(减少无效遍历)
if shard.items.Count() == 0 {
return
}
// 遍历当前分段的元素(复用 swiss.Map 的 Iter 方法)
shard.items.Iter(func(k K, v V) (stop bool) {
// 检测终止标志:终止当前分段遍历
if stopFlag.Load() {
return true
}
// 执行用户回调,保留提前终止逻辑
if f(k, v) {
stopFlag.Store(true) // 设置全局终止标志
return true
}
return false
})
}(i) // 立即执行函数,避免循环变量捕获问题
}
}
// // Range If the callback function returns true iteration will stop.
// func (m *CsMap[K, V]) Range(f func(key K, value V) (stop bool)) {
// ch := make(chan Tuple[K, V], m.Count())
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()
// listenCompleted := m.listen(f, ch)
// m.produce(ctx, ch)
// listenCompleted.Wait()
// }
func (m *CsMap[K, V]) MarshalJSON() ([]byte, error) {
tmp := make(map[K]V, m.Count())
m.Range(func(key K, value V) (stop bool) {