diff --git a/fsm/main.go b/fsm/main.go index f0b35c4f..f4ed95c9 100644 --- a/fsm/main.go +++ b/fsm/main.go @@ -131,7 +131,6 @@ func StartNode(ServerID uint64, serverconfigs []*autoconfig.ConfigServer, gdb *g } return ss - } // func JoinCluster(ServerID string, LeaderAddress string, RaftBind string, gdb *gorm.DB) *StateCluster { @@ -208,44 +207,3 @@ func (us *UserState) Decode(data []byte) error { } return nil } - -type OperateType string - -const ( - OP_Update OperateType = "update" -) - -// Command is used for internal Command representation. -type Command struct { - Op OperateType - Key int64 - Value *UserState -} - -func (cmd *Command) Encode(do func(buf []byte) error) error { - var buf bytes.Buffer - - err := gob.NewEncoder(&buf).Encode(cmd) - if err != nil { - return err - } - - if do != nil { - err := do(buf.Bytes()) - if err != nil { - return err - } - } - - return nil -} - -func (cmd *Command) Decode(sbuf []byte) error { - var buf = bytes.NewBuffer(sbuf) - err := gob.NewDecoder(buf).Decode(cmd) - if err != nil { - // log.Panic(err) - return err - } - return nil -} diff --git a/fsm/shared_state.go b/fsm/shared_state.go new file mode 100644 index 00000000..57516193 --- /dev/null +++ b/fsm/shared_state.go @@ -0,0 +1,53 @@ +package fsm + +import ( + "context" + "log" + + "github.com/lni/dragonboat/v4" +) + +type SharedState struct { + shardID uint64 + replicaID uint64 + nh *dragonboat.NodeHost +} + +func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) { + + ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid) + if err != nil { + log.Println(err) + return nil, err + } + + if ius != nil { + return ius.(*UserState), nil + } + + // cmd := &Command{ + // Op: OP_Update, + // Key: Userid, + // } + + cs := ss.nh.GetNoOPSession(128) + + cmd := &CmdUpdate{UserId: Userid} + cmdBuf, err := FsPasser.PackToBytes(cmd) + if err != nil { + return nil, err + } + + result, err := ss.nh.SyncPropose(context.TODO(), cs, cmdBuf) + if err != nil { + return nil, err + } + + us = &UserState{} + err = us.Decode(result.Data) + if err != nil { + return nil, err + } + + return us, err +} diff --git a/fsm/fsm.go b/fsm/sm.go similarity index 59% rename from fsm/fsm.go rename to fsm/sm.go index b2697264..f9301e79 100644 --- a/fsm/fsm.go +++ b/fsm/sm.go @@ -3,16 +3,10 @@ package fsm import ( "context" "encoding/gob" - "fmt" "fusenapi/initalize" - "fusenapi/model/gmodel" - "fusenapi/utils/auth" "io" - "log" "sync" - "time" - "github.com/lni/dragonboat/v4" sm "github.com/lni/dragonboat/v4/statemachine" "gorm.io/gorm" ) @@ -39,48 +33,6 @@ func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine { } } -type SharedState struct { - shardID uint64 - replicaID uint64 - nh *dragonboat.NodeHost -} - -func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) { - - ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid) - if err != nil { - log.Println(err) - return nil, err - } - - if ius != nil { - return ius.(*UserState), nil - } - - cmd := &Command{ - Op: OP_Update, - Key: Userid, - } - - cs := ss.nh.GetNoOPSession(128) - err = cmd.Encode(func(buf []byte) error { - result, err := ss.nh.SyncPropose(context.TODO(), cs, buf) - if err != nil { - return err - } - - us = &UserState{} - err = us.Decode(result.Data) - if err != nil { - return err - } - - return nil - }) - - return us, err -} - // Lookup performs local lookup on the ExampleStateMachine instance. In this example, // we always return the Count value as a little endian binary encoded byte // slice. @@ -98,48 +50,10 @@ func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) // Update updates the object using the specified committed raft entry. func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) { - - var cmd Command - err = cmd.Decode(e.Cmd) - if err != nil { - return result, err - } - - switch cmd.Op { - case OP_Update: - s.mu.Lock() - defer s.mu.Unlock() - - if old, ok := s.store[cmd.Key]; ok { - if time.Since(old.UpdateAt) <= time.Second { - return - } - } - - // log.Println("update") - models := gmodel.NewAllModels(s.gdb) - user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key) - if err != nil { - log.Println(err) - } - userState := &UserState{ - UserId: cmd.Key, - PwdHash: auth.StringToHash(*user.PasswordHash), - UpdateAt: time.Now(), - } - s.store[cmd.Key] = userState - err = userState.Encode(func(b []byte) error { - e.Result.Data = b - result.Data = b - return nil - }) - - return result, err - - default: - return result, fmt.Errorf("unknonw cmd type: %s", cmd.Op) - } - + ctx := context.TODO() + ctx = context.WithValue(ctx, ctxEntry{}, &e) + ctx = context.WithValue(ctx, ctxSM{}, s) + return FsPasser.ExecuteWithBytes(ctx, e.Cmd) } // SaveSnapshot saves the current IStateMachine state into a snapshot using the diff --git a/fsm/sm_update_handler.go b/fsm/sm_update_handler.go new file mode 100644 index 00000000..f0fa1d8b --- /dev/null +++ b/fsm/sm_update_handler.go @@ -0,0 +1,65 @@ +package fsm + +import ( + "context" + "fusenapi/model/gmodel" + "fusenapi/utils/auth" + "log" + "time" + + "github.com/474420502/passer" + sm "github.com/lni/dragonboat/v4/statemachine" +) + +type CmdUpdate struct { + UserId int64 +} + +// 上下文传递 +type ( + ctxSM struct{} + // ctxUserState struct{} + ctxEntry struct{} +) + +// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler +var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] { + + fsPasser := passer.NewPasser[sm.Result]() + fsPasser.RegisterPasser(&CmdUpdate{}, func(ctx context.Context, obj any) (sm.Result, error) { + var result sm.Result + + cmd := obj.(*CmdUpdate) + s := ctx.Value(ctxSM{}).(*FsStateMachine) + e := ctx.Value(ctxEntry{}).(*sm.Entry) + + if old, ok := s.store[cmd.UserId]; ok { + if time.Since(old.UpdateAt) <= time.Second { + return result, nil + } + } + + // log.Println("update") + models := gmodel.NewAllModels(s.gdb) + user, err := models.FsUser.FindUserById(context.TODO(), cmd.UserId) + if err != nil { + log.Println(err) + } + + userState := &UserState{ + UserId: cmd.UserId, + PwdHash: auth.StringToHash(*user.PasswordHash), + UpdateAt: time.Now(), + } + s.store[cmd.UserId] = userState + + err = userState.Encode(func(b []byte) error { + e.Result.Data = b + result.Data = b + return nil + }) + return result, err + }) + + return fsPasser +}() diff --git a/fsm/waitcallback.go b/fsm/waitcallback.go deleted file mode 100644 index a03fe1ab..00000000 --- a/fsm/waitcallback.go +++ /dev/null @@ -1,57 +0,0 @@ -package fsm - -import ( - "sync" - "time" -) - -type condAndState struct { - cond *sync.Cond - state *UserState -} - -type WaitCallback struct { - Waiter sync.Map - mu sync.Mutex -} - -func NewWaitCallback() *WaitCallback { - return &WaitCallback{} -} - -func (w *WaitCallback) Done(us *UserState) { - if v, ok := w.Waiter.Load(us.UserId); ok { - w.mu.Lock() - defer w.mu.Unlock() - cas := v.(*condAndState) - cas.state = us - cas.cond.Broadcast() - } -} - -func (w *WaitCallback) Wait(key int64, timeout time.Duration) *UserState { - cas := &condAndState{ - cond: sync.NewCond(&w.mu), - state: nil, - } - - v, loaded := w.Waiter.LoadOrStore(key, cas) - if loaded { - cas = v.(*condAndState) - } - - done := make(chan struct{}) - go func() { - w.mu.Lock() - defer w.mu.Unlock() - cas.cond.Wait() - close(done) - }() - - select { - case <-done: - return cas.state - case <-time.After(timeout): - return nil - } -} diff --git a/go.mod b/go.mod index 71dd9c62..1f76d5aa 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module fusenapi go 1.20 require ( + github.com/474420502/passer v0.0.1 github.com/SebastiaanKlippert/go-wkhtmltopdf v1.9.0 github.com/aws/aws-sdk-go v1.44.295 github.com/bwmarrin/snowflake v0.3.0 @@ -62,7 +63,7 @@ require ( github.com/valyala/histogram v1.2.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect golang.org/x/crypto v0.11.0 // indirect - golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect + golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect diff --git a/go.sum b/go.sum index 2519f840..e5035777 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/474420502/passer v0.0.1 h1:ZWnt7hpFzsYDV7LHSEyLvLUvW5mRxrnDmgFdIl17q3w= +github.com/474420502/passer v0.0.1/go.mod h1:MmnnrF9d51sPkFzdRq2pQtxQKqyjburVM1LjMbOCezE= github.com/474420502/random v0.4.1 h1:HUUyLXRWMijVb7CJoEC16f0aFQOW25Lkr80Mut6PoKU= github.com/474420502/requests v1.40.0 h1:VDuLxSG/3IGBvMfjPV8+o7s1l5mOwLAgfo5Og6vMAJw= github.com/474420502/requests v1.40.0/go.mod h1:2SCVzim0ONFYG09g/GrM7RTeJIC6qTyZfnohsjnG5C8= @@ -391,6 +393,7 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -468,6 +471,7 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= @@ -616,8 +620,9 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691 h1:/yRP+0AN7mf5DkD3BAI6TOFnd51gEoDEb8o35jIFtgw= +golang.org/x/exp v0.0.0-20230728194245-b0cb94b80691/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4= @@ -947,6 +952,7 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/cheggaaa/pb.v1 v1.0.28 h1:n1tBJnnK2r7g9OW2btFH91V92STTUevLXYFb8gy9EMk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= diff --git a/server/resource/internal/handler/resourceinfohandler.go b/server/resource/internal/handler/resourceinfohandler.go index 14b39d98..b358e31e 100644 --- a/server/resource/internal/handler/resourceinfohandler.go +++ b/server/resource/internal/handler/resourceinfohandler.go @@ -15,7 +15,7 @@ func ResourceInfoHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var req types.ResourceInfoReq - userinfo, err := basic.RequestParse(w, r, svcCtx, &req) + userinfo, err := basic.RequestParse(w, r, svcCtx.SharedState, &req) if err != nil { return } diff --git a/server/resource/internal/svc/servicecontext.go b/server/resource/internal/svc/servicecontext.go index f4b7039e..69f90fb9 100644 --- a/server/resource/internal/svc/servicecontext.go +++ b/server/resource/internal/svc/servicecontext.go @@ -3,6 +3,7 @@ package svc import ( "errors" "fmt" + "fusenapi/fsm" "fusenapi/server/resource/internal/config" "net/http" @@ -17,7 +18,8 @@ import ( ) type ServiceContext struct { - Config config.Config + Config config.Config + SharedState *fsm.SharedState MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen