准备走

This commit is contained in:
eson 2023-08-01 19:49:00 +08:00
parent 128a3df07c
commit da8af186c8
5 changed files with 10 additions and 12 deletions

3
go.mod
View File

@ -3,8 +3,7 @@ module fusenrender
go 1.20 go 1.20
require ( require (
github.com/474420502/batchexecute v0.0.2 github.com/474420502/execute v0.1.1
github.com/474420502/execute v0.0.3
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be github.com/lni/dragonboat/v4 v4.0.0-20230709075559-54497b9553be
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4

6
go.sum
View File

@ -1,9 +1,7 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/474420502/batchexecute v0.0.2 h1:gTMyUh2x6HpV7/+nOLj4qFRO/l5/B/eg7dXsHxLJpbg= github.com/474420502/execute v0.1.1 h1:lMG/f/NOSScD10Yyqkazd2uAgW8Ogj0ZLG/Pm7lsYE8=
github.com/474420502/batchexecute v0.0.2/go.mod h1:IWazO1QTaB5LyWwMxSqIX/6g/UXwwpnqk0AVM5j24J0= github.com/474420502/execute v0.1.1/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc=
github.com/474420502/execute v0.0.3 h1:kmVaUG/LQis0vXiLsr0WS0pCpHjtsqb1gy9fbuH8oTk=
github.com/474420502/execute v0.0.3/go.mod h1:3/9VeQxxZx5lAqz1UWEolgp0OK3hautW9jhMyReuWZc=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=

View File

@ -72,7 +72,7 @@ var Consumption = triggered.RegisterExecute(func(params *triggered.Params[bool])
} }
} }
} }
}, nil) })
var addresses []string = []string{ var addresses []string = []string{
"localhost:5500", "localhost:5500",
@ -165,7 +165,7 @@ func StartNode(replicaID uint64, exampleShardID uint64, addr string) *dragonboat
} }
// 设置共享的参数 // 设置共享的参数
Consumption.SetShared(nh) Consumption.WithShared(nh)
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil { if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err) fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)

8
sm.go
View File

@ -86,9 +86,9 @@ func NewSMQueue(shardID uint64, replicaID uint64) sm.IStateMachine {
queues: make(map[string]*PriorityQueue[QueueItem]), queues: make(map[string]*PriorityQueue[QueueItem]),
counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) { counter: triggered.RegisterExecute[int64](func(params *triggered.Params[int64]) {
log.Printf("queue remain: %d\n", *params.Value) log.Printf("queue remain: %d\n", params.Value)
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
}, nil), }),
} }
} }
@ -133,7 +133,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
Value: cmd.Item, Value: cmd.Item,
}) })
result.Value = uint64(len(d)) result.Value = uint64(len(d))
Consumption.Notify(nil) // 通知可以执行update Consumption.Notify(Consumption.NULL) // 通知可以执行update
return result, err return result, err
case "dequeue": case "dequeue":
var queue *PriorityQueue[QueueItem] var queue *PriorityQueue[QueueItem]
@ -166,7 +166,7 @@ func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
queue.Empty() queue.Empty()
size := queue.Size() size := queue.Size()
s.counter.Notify(&size) s.counter.Notify(size)
// log.Println("queue remain:", queue.Size()) // log.Println("queue remain:", queue.Size())
} }

View File

@ -89,6 +89,7 @@ func TestStartNodeB(t *testing.T) {
// this goroutine makes a linearizable read every 10 second. it returns the // this goroutine makes a linearizable read every 10 second. it returns the
// Count value maintained in IStateMachine. see datastore.go for details. // Count value maintained in IStateMachine. see datastore.go for details.
cs := nh.GetNoOPSession(128) cs := nh.GetNoOPSession(128)
ticker := time.NewTicker(10 * time.Millisecond) ticker := time.NewTicker(10 * time.Millisecond)
for { for {
select { select {