From af8568c8cbce5aba078ca4f4f1ec8945ed524973 Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Tue, 14 Nov 2023 11:41:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=AF=A6=E7=BB=86=E7=9A=84?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BF=A1=E6=81=AF=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sm.go | 14 ++++++++------ sm_upate_handler.go | 6 +++--- websocket.go | 45 +++++++++++++++++++++++++++++++++++++++------ 3 files changed, 50 insertions(+), 15 deletions(-) diff --git a/sm.go b/sm.go index 815b8ed..6448be2 100644 --- a/sm.go +++ b/sm.go @@ -37,12 +37,14 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { queues: make(map[string]*PriorityQueue[QueueItem]), counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { - var m runtime.MemStats - runtime.ReadMemStats(&m) - allocMB := float64(m.Alloc) / 1024 / 1024 - // totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 - sysMB := float64(m.Sys) / 1024 / 1024 - log.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB) + if params.Value != 0 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + allocMB := float64(m.Alloc) / 1024 / 1024 + // totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 + sysMB := float64(m.Sys) / 1024 / 1024 + log.Printf("队列堆积数据: %d, Alloc = %.2f MB, Sys = %.2fMB\n", params.Value, allocMB, sysMB) + } }), } } diff --git a/sm_upate_handler.go b/sm_upate_handler.go index e52d272..e6cb9eb 100644 --- a/sm_upate_handler.go +++ b/sm_upate_handler.go @@ -8,8 +8,6 @@ import ( sm "github.com/lni/dragonboat/v4/statemachine" ) -var allPutCount int = 0 - // 结构体异步传递后, 执行的注册函数, 实际上就是update的handler var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { @@ -37,7 +35,9 @@ var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { Value: cmd.Item, }) - allPutCount++ + countPutPop.Use(func(item *CountPutPop) { + item.PutCount++ + }) var result sm.Result dequeueHandler.Notify(dequeueHandler.NULL) // 通知可以执行update diff --git a/websocket.go b/websocket.go index 6b627f6..a9b22b6 100644 --- a/websocket.go +++ b/websocket.go @@ -7,8 +7,10 @@ import ( "io" "log" "net/http" + "sync" "time" + "github.com/474420502/execute/triggered" "github.com/gorilla/websocket" "github.com/lni/dragonboat/v4" "github.com/lni/dragonboat/v4/client" @@ -30,9 +32,34 @@ func HttpListen(ns *dragonboat.NodeHost, port int) { http.ListenAndServe(fmt.Sprintf(":%d", port), nil) } -func queueHandler(w http.ResponseWriter, r *http.Request) { +type CountPutPop struct { + PutCount uint64 + PopCount uint64 + SelfCount map[string]uint64 + mu sync.Mutex +} - var count int +func (pp *CountPutPop) Use(do func(item *CountPutPop)) { + pp.mu.Lock() + defer pp.mu.Unlock() + do(pp) +} + +var countPutPop *CountPutPop = &CountPutPop{ + SelfCount: map[string]uint64{}, +} + +var logIntervalTimeHandler = triggered.RegisterExecute[*CountPutPop](func(params *triggered.Params[*CountPutPop]) { + params.Value.Use(func(item *CountPutPop) { + log.Printf("all pop: %d all put: %d\n", item.PutCount, item.PopCount) + for k, v := range item.SelfCount { + log.Printf("%s pop: %d\n", k, v) + } + }) + time.Sleep(time.Second * 15) +}) + +func queueHandler(w http.ResponseWriter, r *http.Request) { var err error conn, err := upgrader.Upgrade(w, r, nil) @@ -49,6 +76,10 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { log.Printf("%s 退出连接 当前unity处理机器数量 %d\n", raddr, DequeueHandler.RefCountAdd(-1)) }() + countPutPop.Use(func(item *CountPutPop) { + item.SelfCount[raddr.String()] = 0 + }) + for { item := <-PopChannel @@ -71,7 +102,7 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { } } - err = conn.SetWriteDeadline(time.Now().Add(time.Second * 4)) + err = conn.SetWriteDeadline(time.Now().Add(time.Second * 3)) if err != nil { log.Println(err) log.Println("重新回队") @@ -88,9 +119,11 @@ func queueHandler(w http.ResponseWriter, r *http.Request) { return } - count++ - log.Printf("count: %d all put: %d", count, allPutCount) - + countPutPop.Use(func(item *CountPutPop) { + item.PopCount++ + item.SelfCount[raddr.String()]++ + }) + logIntervalTimeHandler.Notify(countPutPop) // 打印消息 log.Printf("source: [%s] 数据 推送到unity [%s]\n", item.Source, raddr.String()) }