新改动
This commit is contained in:
42
fsm/main.go
42
fsm/main.go
@@ -131,7 +131,6 @@ func StartNode(ServerID uint64, serverconfigs []*autoconfig.ConfigServer, gdb *g
|
||||
}
|
||||
|
||||
return ss
|
||||
|
||||
}
|
||||
|
||||
// func JoinCluster(ServerID string, LeaderAddress string, RaftBind string, gdb *gorm.DB) *StateCluster {
|
||||
@@ -208,44 +207,3 @@ func (us *UserState) Decode(data []byte) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type OperateType string
|
||||
|
||||
const (
|
||||
OP_Update OperateType = "update"
|
||||
)
|
||||
|
||||
// Command is used for internal Command representation.
|
||||
type Command struct {
|
||||
Op OperateType
|
||||
Key int64
|
||||
Value *UserState
|
||||
}
|
||||
|
||||
func (cmd *Command) Encode(do func(buf []byte) error) error {
|
||||
var buf bytes.Buffer
|
||||
|
||||
err := gob.NewEncoder(&buf).Encode(cmd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if do != nil {
|
||||
err := do(buf.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) Decode(sbuf []byte) error {
|
||||
var buf = bytes.NewBuffer(sbuf)
|
||||
err := gob.NewDecoder(buf).Decode(cmd)
|
||||
if err != nil {
|
||||
// log.Panic(err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
53
fsm/shared_state.go
Normal file
53
fsm/shared_state.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package fsm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/lni/dragonboat/v4"
|
||||
)
|
||||
|
||||
type SharedState struct {
|
||||
shardID uint64
|
||||
replicaID uint64
|
||||
nh *dragonboat.NodeHost
|
||||
}
|
||||
|
||||
func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
|
||||
|
||||
ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ius != nil {
|
||||
return ius.(*UserState), nil
|
||||
}
|
||||
|
||||
// cmd := &Command{
|
||||
// Op: OP_Update,
|
||||
// Key: Userid,
|
||||
// }
|
||||
|
||||
cs := ss.nh.GetNoOPSession(128)
|
||||
|
||||
cmd := &CmdUpdate{UserId: Userid}
|
||||
cmdBuf, err := FsPasser.PackToBytes(cmd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := ss.nh.SyncPropose(context.TODO(), cs, cmdBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
us = &UserState{}
|
||||
err = us.Decode(result.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return us, err
|
||||
}
|
||||
@@ -3,16 +3,10 @@ package fsm
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"fusenapi/initalize"
|
||||
"fusenapi/model/gmodel"
|
||||
"fusenapi/utils/auth"
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lni/dragonboat/v4"
|
||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
@@ -39,48 +33,6 @@ func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
|
||||
}
|
||||
}
|
||||
|
||||
type SharedState struct {
|
||||
shardID uint64
|
||||
replicaID uint64
|
||||
nh *dragonboat.NodeHost
|
||||
}
|
||||
|
||||
func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
|
||||
|
||||
ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ius != nil {
|
||||
return ius.(*UserState), nil
|
||||
}
|
||||
|
||||
cmd := &Command{
|
||||
Op: OP_Update,
|
||||
Key: Userid,
|
||||
}
|
||||
|
||||
cs := ss.nh.GetNoOPSession(128)
|
||||
err = cmd.Encode(func(buf []byte) error {
|
||||
result, err := ss.nh.SyncPropose(context.TODO(), cs, buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
us = &UserState{}
|
||||
err = us.Decode(result.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return us, err
|
||||
}
|
||||
|
||||
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
|
||||
// we always return the Count value as a little endian binary encoded byte
|
||||
// slice.
|
||||
@@ -98,48 +50,10 @@ func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error)
|
||||
|
||||
// Update updates the object using the specified committed raft entry.
|
||||
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
|
||||
|
||||
var cmd Command
|
||||
err = cmd.Decode(e.Cmd)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
switch cmd.Op {
|
||||
case OP_Update:
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if old, ok := s.store[cmd.Key]; ok {
|
||||
if time.Since(old.UpdateAt) <= time.Second {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// log.Println("update")
|
||||
models := gmodel.NewAllModels(s.gdb)
|
||||
user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
userState := &UserState{
|
||||
UserId: cmd.Key,
|
||||
PwdHash: auth.StringToHash(*user.PasswordHash),
|
||||
UpdateAt: time.Now(),
|
||||
}
|
||||
s.store[cmd.Key] = userState
|
||||
err = userState.Encode(func(b []byte) error {
|
||||
e.Result.Data = b
|
||||
result.Data = b
|
||||
return nil
|
||||
})
|
||||
|
||||
return result, err
|
||||
|
||||
default:
|
||||
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Op)
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
ctx = context.WithValue(ctx, ctxEntry{}, &e)
|
||||
ctx = context.WithValue(ctx, ctxSM{}, s)
|
||||
return FsPasser.ExecuteWithBytes(ctx, e.Cmd)
|
||||
}
|
||||
|
||||
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
|
||||
65
fsm/sm_update_handler.go
Normal file
65
fsm/sm_update_handler.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package fsm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fusenapi/model/gmodel"
|
||||
"fusenapi/utils/auth"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/474420502/passer"
|
||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||
)
|
||||
|
||||
type CmdUpdate struct {
|
||||
UserId int64
|
||||
}
|
||||
|
||||
// 上下文传递
|
||||
type (
|
||||
ctxSM struct{}
|
||||
// ctxUserState struct{}
|
||||
ctxEntry struct{}
|
||||
)
|
||||
|
||||
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
|
||||
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
||||
|
||||
fsPasser := passer.NewPasser[sm.Result]()
|
||||
fsPasser.RegisterPasser(&CmdUpdate{}, func(ctx context.Context, obj any) (sm.Result, error) {
|
||||
var result sm.Result
|
||||
|
||||
cmd := obj.(*CmdUpdate)
|
||||
s := ctx.Value(ctxSM{}).(*FsStateMachine)
|
||||
e := ctx.Value(ctxEntry{}).(*sm.Entry)
|
||||
|
||||
if old, ok := s.store[cmd.UserId]; ok {
|
||||
if time.Since(old.UpdateAt) <= time.Second {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
|
||||
// log.Println("update")
|
||||
models := gmodel.NewAllModels(s.gdb)
|
||||
user, err := models.FsUser.FindUserById(context.TODO(), cmd.UserId)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
userState := &UserState{
|
||||
UserId: cmd.UserId,
|
||||
PwdHash: auth.StringToHash(*user.PasswordHash),
|
||||
UpdateAt: time.Now(),
|
||||
}
|
||||
s.store[cmd.UserId] = userState
|
||||
|
||||
err = userState.Encode(func(b []byte) error {
|
||||
e.Result.Data = b
|
||||
result.Data = b
|
||||
return nil
|
||||
})
|
||||
return result, err
|
||||
})
|
||||
|
||||
return fsPasser
|
||||
}()
|
||||
@@ -1,57 +0,0 @@
|
||||
package fsm
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type condAndState struct {
|
||||
cond *sync.Cond
|
||||
state *UserState
|
||||
}
|
||||
|
||||
type WaitCallback struct {
|
||||
Waiter sync.Map
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewWaitCallback() *WaitCallback {
|
||||
return &WaitCallback{}
|
||||
}
|
||||
|
||||
func (w *WaitCallback) Done(us *UserState) {
|
||||
if v, ok := w.Waiter.Load(us.UserId); ok {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
cas := v.(*condAndState)
|
||||
cas.state = us
|
||||
cas.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WaitCallback) Wait(key int64, timeout time.Duration) *UserState {
|
||||
cas := &condAndState{
|
||||
cond: sync.NewCond(&w.mu),
|
||||
state: nil,
|
||||
}
|
||||
|
||||
v, loaded := w.Waiter.LoadOrStore(key, cas)
|
||||
if loaded {
|
||||
cas = v.(*condAndState)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
cas.cond.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return cas.state
|
||||
case <-time.After(timeout):
|
||||
return nil
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user