package fsm

import (
	"context"
	"encoding/gob"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"fusenapi/model/gmodel"
	"fusenapi/utils/auth"

	"github.com/hashicorp/raft"
	"gorm.io/gorm"
)

// StateCluster is a simple key-value store as an FSM.
// It maps user IDs to their UserState.
type StateCluster struct {
	mu     sync.Mutex           // guards store
	store  map[int64]*UserState // user ID -> state
	gdb    *gorm.DB             // database handle Apply uses to load users
	waiter *WaitCallback        // notified by Apply once a user's state is stored
	ra     *raft.Raft           // The consensus mechanism
}

// GetUserState returns the state stored for Userid.
//
// On a hit the stored value is returned directly. On a miss an "update"
// command is submitted through Raft so that Apply loads the user and stores
// the resulting state. Any error reported by Raft itself or returned by Apply
// is passed back to the caller.
func (fsm *StateCluster) GetUserState(Userid int64) (*UserState, error) {
	fsm.mu.Lock()
	us, ok := fsm.store[Userid]
	fsm.mu.Unlock()
	if ok {
		return us, nil
	}

	cmd := &command{
		Op:  "update",
		Key: Userid,
	}
	future := fsm.ra.Apply(cmd.Encode(), time.Minute)

	// Error must be consulted before Response: it blocks until the entry has
	// been applied (or has failed), and Response is only valid afterwards.
	if err := future.Error(); err != nil {
		return nil, err
	}
	if resp := future.Response(); resp != nil {
		if err, ok := resp.(error); ok {
			return nil, err
		}
		return nil, fmt.Errorf("unexpected apply response: %v", resp)
	}

	// The entry has been applied on this node, so the state should already
	// be in the store.
	fsm.mu.Lock()
	us, ok = fsm.store[Userid]
	fsm.mu.Unlock()
	if ok {
		return us, nil
	}

	// Fall back to the waiter in case the state shows up shortly after.
	if us := fsm.waiter.Wait(Userid, time.Second*5); us != nil {
		return us, nil
	}
	return nil, fmt.Errorf("timeout")
}

// Apply applies a Raft log entry to the key-value store.
//
// The only recognized operation is "update": it loads the user identified by
// the command key from the database, stores a fresh UserState for it and
// notifies the waiter. The returned value is nil on success and an error
// otherwise.
func (fsm *StateCluster) Apply(fsmlog *raft.Log) interface{} {
	var cmd command
	if err := cmd.Decode(fsmlog.Data); err != nil {
		return err
	}

	switch cmd.Op {
	case "update":
		fsm.mu.Lock()
		defer fsm.mu.Unlock()

		models := gmodel.NewAllModels(fsm.gdb)
		user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
		if err != nil {
			// Logged here as well as returned: the return value of Apply is
			// only delivered to the node that submitted the command.
			log.Println(err)
			return fmt.Errorf("loading user %d: %w", cmd.Key, err)
		}
		if user.PasswordHash == nil {
			// Dereferencing a nil hash would panic inside the FSM.
			return fmt.Errorf("user %d has no password hash", cmd.Key)
		}

		userState := &UserState{
			UserId:  cmd.Key,
			PwdHash: auth.StringToHash(*user.PasswordHash),
			Expired: time.Now(),
		}
		fsm.store[cmd.Key] = userState
		fsm.waiter.Done(userState)
	default:
		return fmt.Errorf("unrecognized command operation: %s", cmd.Op)
	}

	return nil
}

// Snapshot returns a snapshot of the key-value store.
//
// The map is copied under the lock so later changes to the store do not
// affect the snapshot; the UserState values themselves are shared.
func (fsm *StateCluster) Snapshot() (raft.FSMSnapshot, error) {
	fsm.mu.Lock()
	defer fsm.mu.Unlock()

	snapshot := kvStoreSnapshot{
		store: make(map[int64]*UserState, len(fsm.store)),
	}
	for k, v := range fsm.store {
		snapshot.store[k] = v
	}

	return &snapshot, nil
}

// Restore stores the key-value store to a previous state.
func (fsm *StateCluster) Restore(rc io.ReadCloser) error { var snapshot kvStoreSnapshot dec := gob.NewDecoder(rc) if err := dec.Decode(&snapshot); err != nil { return err } fsm.store = snapshot.store return nil } // kvStoreSnapshot represents a snapshot of the key-value store. type kvStoreSnapshot struct { store map[int64]*UserState } // Persist writes the snapshot to the provided sink. func (snapshot *kvStoreSnapshot) Persist(sink raft.SnapshotSink) error { // The example has been simplified. In a production environment, you would // need to handle this operation with more care. return nil } // Release is invoked when the Raft instance is finished with the snapshot. func (snapshot *kvStoreSnapshot) Release() { // Normally you would put any cleanup here. }