From da8af186c85147b7747e7c55210b099963e1663f Mon Sep 17 00:00:00 2001 From: eson <9673575+githubcontent@user.noreply.gitee.com> Date: Tue, 1 Aug 2023 19:49:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=86=E5=A4=87=E8=B5=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 +-- go.sum | 6 ++---- main.go | 4 ++-- sm.go | 8 ++++---- start_test.go | 1 + 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index b63d143..276d283 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module fusenrender go 1.20 require ( - github.com/474420502/batchexecute v0.0.2 - github.com/474420502/execute v0.0.3 + github.com/474420502/execute v0.1.1 github.com/google/uuid v1.3.0 github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 diff --git a/go.sum b/go.sum index 724417b..1681bf9 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg= -github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0= -github.com/474420502/execute v0.0.3 h1:kmVaUG/LQis0vXiLsr0WS0pCpHjtsqb1gy9fbuH8oTk= -github.com/474420502/execute v0.0.3/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= +github.com/474420502/execute v0.1.1 h1:lMG/f/NOSScD10Yyqkazd2uAgW8Ogj0ZLG/Pm7lsYE8= +github.com/474420502/execute v0.1.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= diff --git a/main.go b/main.go index 844bbdd..f98ca03 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool]) } } } -}, nil) +}) var addresses []string = []string{ "localhost:5500", @@ -165,7 +165,7 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat } // 设置共享的参数 - Consumption.SetShared(nh) + Consumption.WithShared(nh) if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) diff --git a/sm.go b/sm.go index fdeab75..059e3e3 100644 --- a/sm.go +++ b/sm.go @@ -86,9 +86,9 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine { queues: make(map[string]*PriorityQueue[QueueItem]), counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { - log.Printf("queue remain: %d\n", *params.Value) + log.Printf("queue remain: %d\n", params.Value) time.Sleep(time.Second * 5) - }, nil), + }), } } @@ -133,7 +133,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { Value: cmd.Item, }) result.Value = uint64(len(d)) - Consumption.Notify(nil) // 通知可以执行update + Consumption.Notify(Consumption.NULL) // 通知可以执行update return result, err case "dequeue": var queue *PriorityQueue[QueueItem] @@ -166,7 +166,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) { queue.Empty() size := queue.Size() - s.counter.Notify(&size) + s.counter.Notify(size) // log.Println("queue remain:", queue.Size()) } diff --git a/start_test.go b/start_test.go index 969e211..46bde4d 100644 --- a/start_test.go +++ b/start_test.go @@ -89,6 +89,7 @@ func TestStartNodeB(t *testing.T) { // this goroutine makes a linearizable read every 10 second. it returns the // Count value maintained in IStateMachine. see datastore.go for details. cs := nh.GetNoOPSession(128) + ticker := time.NewTicker(10 * time.Millisecond) for { select {