This commit is contained in:
1
2026-01-21 14:51:10 +00:00
parent 5ef922278a
commit 97f70731b5
4 changed files with 111 additions and 47 deletions

View File

@@ -1,6 +1,7 @@
package csmap
import (
"blazing/cool"
"context"
"encoding/json"
"sync"
@@ -16,6 +17,7 @@ type CsMap[K comparable, V any] struct {
shards []shard[K, V]
shardCount uint64
size uint64
pool *ants.Pool
}
type HashShardPair[K comparable, V any] struct {
@@ -46,28 +48,29 @@ func New[K comparable, V any](options ...OptFunc[K, V]) *CsMap[K, V] {
for i := 0; i < int(m.shardCount); i++ {
m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}}
}
m.pool, _ = ants.NewPool(-1)
return &m
}
// Create creates *CsMap.
//
// Deprecated: New function should be used instead.
func Create[K comparable, V any](options ...func(options *CsMap[K, V])) *CsMap[K, V] {
m := CsMap[K, V]{
hasher: maphash.NewHasher[K]().Hash,
shardCount: 32,
}
for _, option := range options {
option(&m)
}
// // Create creates *CsMap.
// //
// // Deprecated: New function should be used instead.
// func Create[K comparable, V any](options ...func(options *CsMap[K, V])) *CsMap[K, V] {
// m := CsMap[K, V]{
// hasher: maphash.NewHasher[K]().Hash,
// shardCount: 32,
// }
// for _, option := range options {
// option(&m)
// }
m.shards = make([]shard[K, V], m.shardCount)
// m.shards = make([]shard[K, V], m.shardCount)
for i := 0; i < int(m.shardCount); i++ {
m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}}
}
return &m
}
// for i := 0; i < int(m.shardCount); i++ {
// m.shards[i] = shard[K, V]{items: swiss.NewMap[K, V](uint32((m.size / m.shardCount) + 1)), RWMutex: &sync.RWMutex{}}
// }
// return &m
// }
func WithShardCount[K comparable, V any](count uint64) func(csMap *CsMap[K, V]) {
return func(csMap *CsMap[K, V]) {
@@ -240,40 +243,41 @@ func (m *CsMap[K, V]) UnmarshalJSON(b []byte) error {
func (m *CsMap[K, V]) produce(ctx context.Context, ch chan Tuple[K, V]) {
var wg sync.WaitGroup
wg.Add(len(m.shards))
var producepool, _ = ants.NewPoolWithFuncGeneric(-1, func(i int) {
defer wg.Done()
shard := m.shards[i]
shard.RLock()
shard.items.Iter(func(k K, v V) (stop bool) {
select {
case <-ctx.Done():
return true
default:
ch <- Tuple[K, V]{Key: k, Val: v}
}
return false
})
shard.RUnlock()
})
for i := range m.shards {
producepool.Invoke(i)
}
go func(i int) {
defer wg.Done()
defer func() {
if err := recover(); err != nil { // 恢复 panicerr 为 panic 错误值
// 1. 打印错误信息
pool.Submit(func() {
cool.Logger.Error(context.TODO(), "panic 错误:", err)
}
}()
shard := m.shards[i]
shard.RLock()
shard.items.Iter(func(k K, v V) (stop bool) {
select {
case <-ctx.Done():
return true
default:
ch <- Tuple[K, V]{Key: k, Val: v}
}
return false
})
shard.RUnlock()
}(i)
}
go func() {
wg.Wait()
close(ch)
})
}()
}
var pool, _ = ants.NewPool(-1)
func (m *CsMap[K, V]) listen(f func(key K, value V) (stop bool), ch chan Tuple[K, V]) *sync.WaitGroup {
var wg sync.WaitGroup
wg.Add(1)
pool.Submit(func() {
m.pool.Submit(func() {
defer wg.Done()
for t := range ch {
if stop := f(t.Key, t.Val); stop {

View File

@@ -1,6 +1,6 @@
module blazing/logic
go 1.20
go 1.24.0
require (
github.com/antlabs/timer v0.1.4
@@ -19,6 +19,7 @@ require (
require (
github.com/alpacahq/alpacadecimal v0.0.8 // indirect
github.com/shirou/gopsutil/v4 v4.25.12 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
)
@@ -59,7 +60,7 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.22.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect

View File

@@ -94,6 +94,8 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/shirou/gopsutil/v4 v4.25.12 h1:e7PvW/0RmJ8p8vPGJH4jvNkOyLmbkXgXW4m6ZPic6CY=
github.com/shirou/gopsutil/v4 v4.25.12/go.mod h1:EivAfP5x2EhLp2ovdpKSozecVXn1TmuG7SMzs/Wh4PU=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
@@ -125,6 +127,7 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -2,9 +2,11 @@ package main
import (
"fmt"
"log"
"os"
"runtime"
"strings"
"time"
_ "github.com/gogf/gf/contrib/nosql/redis/v2"
"github.com/gogf/gf/v2/os/gcmd"
@@ -15,6 +17,8 @@ import (
"blazing/cool"
"github.com/shirou/gopsutil/v4/mem"
//"blazing/o/service"
"blazing/modules/base/service"
blservice "blazing/modules/player/service"
@@ -32,8 +36,9 @@ func PprofWeb() {
}
}
// signalHandlerForMain 主进程信号处理函数
func signalHandlerForMain(sig os.Signal) {
// cleanup 优雅清理资源,根据业务需求实现
func cleanup() {
log.Println("执行优雅清理资源...")
fight.Fightpool.ReleaseTimeout(0)
player.Mainplayer.Range(func(key uint32, value *player.Player) bool {
@@ -41,6 +46,12 @@ func signalHandlerForMain(sig os.Signal) {
return true
})
log.Println("资源清理完成,程序即将退出")
}
// signalHandlerForMain 主进程信号处理函数
func signalHandlerForMain(sig os.Signal) {
cleanup()
fmt.Println("MainProcess is shutting down due to signal:", sig.String())
}
@@ -71,7 +82,7 @@ func main() {
// if cool.Config.PortBL == 1 || cool.Config.PortBL == 2 { //只分析1服务器的
// go PprofWeb()
// }
go monitorMemAndQuit()
fmt.Println("Process start, pid:", os.Getpid())
gproc.AddSigHandlerShutdown(
@@ -82,6 +93,51 @@ func main() {
gproc.Listen()
}
const (
memThresholdRatio = 0.9 // 内存占用阈值70%
checkInterval = 3 * time.Second // 内存检测间隔,可按需调整
)
// 监控内存,超阈值则优雅退出程序
func monitorMemAndQuit() {
var memStats runtime.MemStats
for {
// 1. 获取系统总内存和可用内存
sysMem, err := mem.VirtualMemory()
if err != nil {
log.Printf("获取系统内存失败:%v\n", err)
time.Sleep(checkInterval)
continue
}
// 2. 获取Go进程当前堆内存占用进程实际使用的核心内存
runtime.ReadMemStats(&memStats)
procUsedMem := memStats.HeapInuse // 进程堆内存占用(字节)
sysTotalMem := sysMem.Total // 系统总内存(字节)
// 3. 计算进程内存占系统总内存的比例
usedRatio := float64(procUsedMem) / float64(sysTotalMem)
// 格式化输出MB方便查看
procUsedMB := procUsedMem / 1024 / 1024
sysTotalMB := sysTotalMem / 1024 / 1024
log.Printf("当前内存:进程占用%vMB / 系统总%vMB占比%.1f%%",
procUsedMB, sysTotalMB, usedRatio*100)
// 4. 超70%阈值,执行优雅退出
if usedRatio >= memThresholdRatio {
log.Fatalf("内存占比达%.1f%%超过70%阈值,程序开始退出", usedRatio*100)
// ########## 可选:这里添加你的优雅清理逻辑 ##########
// 如:关闭数据库连接、释放文件句柄、保存业务状态、推送退出告警等
cleanup()
// 退出程序返回非0码方便进程管理工具识别并重启
os.Exit(1)
}
time.Sleep(checkInterval)
}
}
// loadAccounts 从CSV文件加载账号信息
func loadAccounts() {
t1, _ := os.Getwd()