From 97f70731b5397eca33d61e1ae271c8ff0b602d13 Mon Sep 17 00:00:00 2001 From: 1 <1@72wo.cn> Date: Wed, 21 Jan 2026 14:51:10 +0000 Subject: [PATCH] 1 --- .../concurrent_swiss_map.go | 88 ++++++++++--------- logic/go.mod | 5 +- logic/go.sum | 3 + logic/main.go | 62 ++++++++++++- 4 files changed, 111 insertions(+), 47 deletions(-) diff --git a/common/utils/concurrent-swiss-map/concurrent_swiss_map.go b/common/utils/concurrent-swiss-map/concurrent_swiss_map.go index c584f6a48..8620ece36 100644 --- a/common/utils/concurrent-swiss-map/concurrent_swiss_map.go +++ b/common/utils/concurrent-swiss-map/concurrent_swiss_map.go @@ -1,6 +1,7 @@ package csmap import ( + "blazing/cool" "context" "encoding/json" "sync" @@ -16,6 +17,7 @@ type CsMap[K comparable, V any] struct { shards []shard[K, V] shardCount uint64 size uint64 + pool *ants.Pool } type HashShardPair[K comparable, V any] struct { @@ -46,28 +48,29 @@ func New[K comparable, V any](options ...OptFunc[K, V]) *CsMap[K, V] { for i := 0; i < int(m.shardCount); i++ { m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}} } + m.pool, _ = ants.NewPool(-1) return &m } -// Create creates *CsMap. -// -// Deprecated: New function should be used instead. -func Create[K comparable, V any](options ...func(options *CsMap[K, V])) *CsMap[K, V] { - m := CsMap[K, V]{ - hasher: maphash.NewHasher[K]().Hash, - shardCount: 32, - } - for _, option := range options { - option(&m) - } +// // Create creates *CsMap. +// // +// // Deprecated: New function should be used instead. +// func Create[K comparable, V any](options ...func(options *CsMap[K, V])) *CsMap[K, V] { +// m := CsMap[K, V]{ +// hasher: maphash.NewHasher[K]().Hash, +// shardCount: 32, +// } +// for _, option := range options { +// option(&m) +// } - m.shards = make([]shard[K, V], m.shardCount) +// m.shards = make([]shard[K, V], m.shardCount) - for i := 0; i < int(m.shardCount); i++ { - m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}} - } - return &m -} +// for i := 0; i < int(m.shardCount); i++ { +// m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}} +// } +// return &m +// } func WithShardCount[K comparable, V any](count uint64) func(csMap *CsMap[K, V]) { return func(csMap *CsMap[K, V]) { @@ -240,40 +243,41 @@ func (m *CsMap[K, V]) UnmarshalJSON(b []byte) error { func (m *CsMap[K, V]) produce(ctx context.Context, ch chan Tuple[K, V]) { var wg sync.WaitGroup wg.Add(len(m.shards)) - - var producepool, _ = ants.NewPoolWithFuncGeneric(-1, func(i int) { - defer wg.Done() - - shard := m.shards[i] - shard.RLock() - shard.items.Iter(func(k K, v V) (stop bool) { - select { - case <-ctx.Done(): - return true - default: - ch <- Tuple[K, V]{Key: k, Val: v} - } - return false - }) - shard.RUnlock() - }) - for i := range m.shards { - producepool.Invoke(i) - } + go func(i int) { + defer wg.Done() + defer func() { + if err := recover(); err != nil { // 恢复 panic,err 为 panic 错误值 + // 1. 打印错误信息 - pool.Submit(func() { + cool.Logger.Error(context.TODO(), "panic 错误:", err) + + } + }() + shard := m.shards[i] + shard.RLock() + shard.items.Iter(func(k K, v V) (stop bool) { + select { + case <-ctx.Done(): + return true + default: + ch <- Tuple[K, V]{Key: k, Val: v} + } + return false + }) + shard.RUnlock() + }(i) + } + go func() { wg.Wait() close(ch) - }) + }() } -var pool, _ = ants.NewPool(-1) - func (m *CsMap[K, V]) listen(f func(key K, value V) (stop bool), ch chan Tuple[K, V]) *sync.WaitGroup { var wg sync.WaitGroup wg.Add(1) - pool.Submit(func() { + m.pool.Submit(func() { defer wg.Done() for t := range ch { if stop := f(t.Key, t.Val); stop { diff --git a/logic/go.mod b/logic/go.mod index 6fc3443ff..6b647c219 100644 --- a/logic/go.mod +++ b/logic/go.mod @@ -1,6 +1,6 @@ module blazing/logic -go 1.20 +go 1.24.0 require ( github.com/antlabs/timer v0.1.4 @@ -19,6 +19,7 @@ require ( require ( github.com/alpacahq/alpacadecimal v0.0.8 // indirect + github.com/shirou/gopsutil/v4 v4.25.12 // indirect github.com/shopspring/decimal v1.3.1 // indirect github.com/valyala/fastrand v1.1.0 // indirect ) @@ -59,7 +60,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.30.0 // indirect + golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.22.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect diff --git a/logic/go.sum b/logic/go.sum index 953dce145..b4814d697 100644 --- a/logic/go.sum +++ b/logic/go.sum @@ -94,6 +94,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= +github.com/shirou/gopsutil/v4 v4.25.12 h1:e7PvW/0RmJ8p8vPGJH4jvNkOyLmbkXgXW4m6ZPic6CY= +github.com/shirou/gopsutil/v4 v4.25.12/go.mod h1:EivAfP5x2EhLp2ovdpKSozecVXn1TmuG7SMzs/Wh4PU= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -125,6 +127,7 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/logic/main.go b/logic/main.go index b6a6ede4a..34ea9d602 100644 --- a/logic/main.go +++ b/logic/main.go @@ -2,9 +2,11 @@ package main import ( "fmt" + "log" "os" "runtime" "strings" + "time" _ "github.com/gogf/gf/contrib/nosql/redis/v2" "github.com/gogf/gf/v2/os/gcmd" @@ -15,6 +17,8 @@ import ( "blazing/cool" + "github.com/shirou/gopsutil/v4/mem" + //"blazing/o/service" "blazing/modules/base/service" blservice "blazing/modules/player/service" @@ -32,8 +36,9 @@ func PprofWeb() { } } -// signalHandlerForMain 主进程信号处理函数 -func signalHandlerForMain(sig os.Signal) { +// cleanup 优雅清理资源,根据业务需求实现 +func cleanup() { + log.Println("执行优雅清理资源...") fight.Fightpool.ReleaseTimeout(0) player.Mainplayer.Range(func(key uint32, value *player.Player) bool { @@ -41,6 +46,12 @@ func signalHandlerForMain(sig os.Signal) { return true }) + log.Println("资源清理完成,程序即将退出") +} + +// signalHandlerForMain 主进程信号处理函数 +func signalHandlerForMain(sig os.Signal) { + cleanup() fmt.Println("MainProcess is shutting down due to signal:", sig.String()) } @@ -71,7 +82,7 @@ func main() { // if cool.Config.PortBL == 1 || cool.Config.PortBL == 2 { //只分析1服务器的 // go PprofWeb() // } - + go monitorMemAndQuit() fmt.Println("Process start, pid:", os.Getpid()) gproc.AddSigHandlerShutdown( @@ -82,6 +93,51 @@ func main() { gproc.Listen() } +const ( + memThresholdRatio = 0.9 // 内存占用阈值70% + checkInterval = 3 * time.Second // 内存检测间隔,可按需调整 +) + +// 监控内存,超阈值则优雅退出程序 +func monitorMemAndQuit() { + var memStats runtime.MemStats + for { + // 1. 获取系统总内存和可用内存 + sysMem, err := mem.VirtualMemory() + if err != nil { + log.Printf("获取系统内存失败:%v\n", err) + time.Sleep(checkInterval) + continue + } + + // 2. 获取Go进程当前堆内存占用(进程实际使用的核心内存) + runtime.ReadMemStats(&memStats) + procUsedMem := memStats.HeapInuse // 进程堆内存占用(字节) + sysTotalMem := sysMem.Total // 系统总内存(字节) + + // 3. 计算进程内存占系统总内存的比例 + usedRatio := float64(procUsedMem) / float64(sysTotalMem) + // 格式化输出(MB),方便查看 + procUsedMB := procUsedMem / 1024 / 1024 + sysTotalMB := sysTotalMem / 1024 / 1024 + + log.Printf("当前内存:进程占用%vMB / 系统总%vMB,占比%.1f%%", + procUsedMB, sysTotalMB, usedRatio*100) + + // 4. 超70%阈值,执行优雅退出 + if usedRatio >= memThresholdRatio { + log.Fatalf("内存占比达%.1f%%,超过70%阈值,程序开始退出", usedRatio*100) + // ########## 可选:这里添加你的优雅清理逻辑 ########## + // 如:关闭数据库连接、释放文件句柄、保存业务状态、推送退出告警等 + cleanup() + // 退出程序,返回非0码(方便进程管理工具识别并重启) + os.Exit(1) + } + + time.Sleep(checkInterval) + } +} + // loadAccounts 从CSV文件加载账号信息 func loadAccounts() { t1, _ := os.Getwd()