From 86e6effaa1ba842fe8930958ca64293921f06d53 Mon Sep 17 00:00:00 2001 From: eson <474420502@qq.com> Date: Tue, 1 Aug 2023 01:18:55 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20=E4=BF=AE=E6=94=B9=E5=88=B0=E6=9B=B4?= =?UTF-8?q?=E5=8A=A0=E4=BC=98=E9=9B=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 4 +++ main.go | 107 +++++++++++++++++++++++++++----------------------------- sm.go | 4 +-- 4 files changed, 57 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index 4495b03..c82ae61 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( require ( github.com/474420502/batchexecute v0.0.2 // indirect + github.com/474420502/execute v0.0.1 // indirect github.com/DataDog/zstd v1.4.5 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/VictoriaMetrics/metrics v1.18.1 // indirect diff --git a/go.sum b/go.sum index b2cc84a..a3876b9 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg= github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0= +github.com/474420502/execute v0.0.0-20230731165309-2d8b1969ccb7 h1:U0jTaGJGWzJM/mZSN6L8rBfACry8+xJeHRX9t99xxfE= +github.com/474420502/execute v0.0.0-20230731165309-2d8b1969ccb7/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= +github.com/474420502/execute v0.0.1 h1:DCC7RiS+R+fZyw9cpYgCmGfC5iyBljdfpolSN20yVWo= +github.com/474420502/execute v0.0.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/main.go b/main.go index 3d0d793..12eb396 100644 --- a/main.go +++ b/main.go @@ -12,7 +12,7 @@ import ( "syscall" "time" - "github.com/474420502/batchexecute" + "github.com/474420502/execute" "github.com/lni/dragonboat/v4" "github.com/lni/dragonboat/v4/config" "github.com/lni/dragonboat/v4/logger" @@ -22,58 +22,8 @@ func main() { } -func BatchFunc(nh *dragonboat.NodeHost) { - - BatchQueueExecute = batchexecute.NewBatchExecute[bool](func(params *bool) { - - cs := nh.GetNoOPSession(128) - for { - - cmd := Command{ - Name: "dequeue", - Group: "test", - } - - data, err := cmd.Encode() - if err != nil { - log.Println(err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - result, err := nh.SyncPropose(ctx, cs, data) - cancel() - if err != nil { - log.Println(err) - break - } else { - if len(result.Data) == 0 { - log.Println("wait 10 second") - var m runtime.MemStats - - runtime.ReadMemStats(&m) - - allocMB := float64(m.Alloc) / 1024 / 1024 - totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 - sysMB := float64(m.Sys) / 1024 / 1024 - - fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB) - time.Sleep(time.Second * 10) - break - } else { - var item QueueItem - err := item.Decode(result.Data) - if err != nil { - log.Println(err) - } - log.Println(item) - - } - } - } - - }) - -} +var BatchQueueExecute = execute.NewEventTriggeredExecute[bool]() +var Consumption execute.Event var addresses []string = []string{ "localhost:5500", @@ -156,18 +106,63 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat // RaftAddress is used to identify the NodeHost instance RaftAddress: nodeAddr, } + nh, err := dragonboat.NewNodeHost(nhc) if err != nil { panic(err) } + Consumption = BatchQueueExecute.RegisterExecute(func(*bool) { + cs := nh.GetNoOPSession(128) + for { + + cmd := Command{ + Name: "dequeue", + Group: "test", + } + + data, err := cmd.Encode() + if err != nil { + log.Println(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + result, err := nh.SyncPropose(ctx, cs, data) + cancel() + if err != nil { + log.Println(err) + break + } else { + if len(result.Data) == 0 { + log.Println("wait 10 second") + var m runtime.MemStats + + runtime.ReadMemStats(&m) + + allocMB := float64(m.Alloc) / 1024 / 1024 + totalAllocMB := float64(m.TotalAlloc) / 1024 / 1024 + sysMB := float64(m.Sys) / 1024 / 1024 + + fmt.Printf("Alloc = %.2f MB, TotalAlloc = %.2f MB, Sys = %.2f MB\n", allocMB, totalAllocMB, sysMB) + time.Sleep(time.Second * 10) + break + } else { + var item QueueItem + err := item.Decode(result.Data) + if err != nil { + log.Println(err) + } + log.Println(item) + + } + } + } + }, nil) + if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) os.Exit(1) } - BatchFunc(nh) - return nh - } diff --git a/sm.go b/sm.go index 62a45da..79b72a7 100644 --- a/sm.go +++ b/sm.go @@ -13,8 +13,6 @@ import ( sm "github.com/lni/dragonboat/v4/statemachine" ) -var BatchQueueExecute *batchexecute.BatchExecute[bool] - // SMQueue实现了StateMachine接口,用于管理基于PriorityQueue的消息队列 type SMQueue struct { // 所属的Shard ID @@ -133,7 +131,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { Value: cmd.Item, }) result.Value = uint64(len(d)) - go BatchQueueExecute.Execute(nil) // 通知可以执行update + BatchQueueExecute.Notify(Consumption, nil) // 通知可以执行update return result, err case "dequeue": var queue *PriorityQueue[QueueItem]