修复一部分fsm

This commit is contained in:
2023-07-30 18:50:27 +08:00
parent 12f85aecd7
commit 47ae46c46d
4 changed files with 589 additions and 98 deletions

View File

@@ -11,125 +11,152 @@ import (
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/lni/dragonboat/v4"
sm "github.com/lni/dragonboat/v4/statemachine"
"gorm.io/gorm"
)
// StateCluster is a simple key-value store as an FSM.
type StateCluster struct {
mu sync.Mutex
shardID uint64
replicaID uint64
nh *dragonboat.NodeHost
mu sync.Mutex
store map[int64]*UserState
gdb *gorm.DB
waiter *WaitCallback
ra *raft.Raft // The consensus mechanism
gdb *gorm.DB
// waiter *WaitCallback
// ra *raft.Raft // The consensus mechanism
}
func (fsm *StateCluster) GetUserState(Userid int64) (*UserState, error) {
func (fsm *StateCluster) GetUserState(Userid int64) (us *UserState, err error) {
fsm.mu.Lock()
if us, ok := fsm.store[Userid]; ok {
// log.Println("exists")
fsm.mu.Unlock()
return us, nil
ius, err := fsm.nh.SyncRead(context.TODO(), fsm.shardID, Userid)
if err != nil {
log.Println(err)
return nil, err
}
fsm.mu.Unlock()
// log.Println("apply")
cmd := &command{
Op: "update",
if ius != nil {
return ius.(*UserState), nil
}
cmd := &Command{
Op: OP_Update,
Key: Userid,
}
future := fsm.ra.Apply(cmd.Encode(), time.Minute)
// log.Println(future.Index())
cs := fsm.nh.GetNoOPSession(128)
err = cmd.Encode(func(buf []byte) error {
result, err := fsm.nh.SyncPropose(context.TODO(), cs, buf)
if err != nil {
return err
}
if future.Response() != nil {
return nil, future.Response().(error)
}
us = &UserState{}
err = us.Decode(result.Data)
if err != nil {
return err
}
if us := fsm.waiter.Wait(Userid, time.Second*5); us != nil {
return us, nil
} else {
return nil, fmt.Errorf("timeout")
}
return nil
})
return us, err
}
// Apply applies a Raft log entry to the key-value store.
func (fsm *StateCluster) Apply(fsmlog *raft.Log) interface{} {
var cmd command
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
// we always return the Count value as a little endian binary encoded byte
// slice.
func (s *StateCluster) Lookup(query interface{}) (item interface{}, err error) {
s.mu.Lock()
defer s.mu.Unlock()
err := cmd.Decode(fsmlog.Data)
userid := query.(int64)
if us, ok := s.store[userid]; ok {
return us, nil
}
return nil, nil
}
// Update updates the object using the specified committed raft entry.
func (s *StateCluster) Update(e sm.Entry) (result sm.Result, err error) {
var cmd Command
err = cmd.Decode(e.Cmd)
if err != nil {
return err
return result, err
}
switch cmd.Op {
case "update":
fsm.mu.Lock()
defer fsm.mu.Unlock()
case OP_Update:
s.mu.Lock()
defer s.mu.Unlock()
if old, ok := s.store[cmd.Key]; ok {
if time.Since(old.UpdateAt) <= time.Second {
return
}
}
// log.Println("update")
models := gmodel.NewAllModels(fsm.gdb)
models := gmodel.NewAllModels(s.gdb)
user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
if err != nil {
log.Println(err)
}
userState := &UserState{
UserId: cmd.Key,
PwdHash: auth.StringToHash(*user.PasswordHash),
Expired: time.Now(),
UserId: cmd.Key,
PwdHash: auth.StringToHash(*user.PasswordHash),
UpdateAt: time.Now(),
}
fsm.store[cmd.Key] = userState
fsm.waiter.Done(userState)
s.store[cmd.Key] = userState
err = userState.Encode(func(b []byte) error {
e.Result.Data = b
result.Data = b
return nil
})
return result, err
default:
return fmt.Errorf("unrecognized command operation: %s", cmd.Op)
return result, fmt.Errorf("unknonw cmd type: %s", cmd.Op)
}
return nil
}
// Snapshot returns a snapshot of the key-value store.
func (fsm *StateCluster) Snapshot() (raft.FSMSnapshot, error) {
fsm.mu.Lock()
defer fsm.mu.Unlock()
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
// specified io.Writer object.
func (s *StateCluster) SaveSnapshot(w io.Writer,
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
// as shown above, the only state that can be saved is the Count variable
// there is no external file in this IStateMachine example, we thus leave
// the fc untouched
snapshot := kvStoreSnapshot{
store: make(map[int64]*UserState),
}
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range fsm.store {
snapshot.store[k] = v
}
return &snapshot, nil
return gob.NewEncoder(w).Encode(&s.store)
}
// Restore stores the key-value store to a previous state.
func (fsm *StateCluster) Restore(rc io.ReadCloser) error {
var snapshot kvStoreSnapshot
dec := gob.NewDecoder(rc)
if err := dec.Decode(&snapshot); err != nil {
// RecoverFromSnapshot recovers the state using the provided snapshot.
func (s *StateCluster) RecoverFromSnapshot(r io.Reader,
files []sm.SnapshotFile,
done <-chan struct{}) error {
// restore the Count variable, that is the only state we maintain in this
// example, the input files is expected to be empty
err := gob.NewDecoder(r).Decode(&s.store)
if err != nil {
return err
}
fsm.store = snapshot.store
return nil
}
// kvStoreSnapshot represents a snapshot of the key-value store.
type kvStoreSnapshot struct {
store map[int64]*UserState
}
// Persist writes the snapshot to the provided sink.
func (snapshot *kvStoreSnapshot) Persist(sink raft.SnapshotSink) error {
// The example has been simplified. In a production environment, you would
// need to handle this operation with more care.
return nil
}
// Release is invoked when the Raft instance is finished with the snapshot.
func (snapshot *kvStoreSnapshot) Release() {
// Normally you would put any cleanup here.
}
// Close closes the IStateMachine instance. There is nothing for us to cleanup
// or release as this is a pure in memory data store. Note that the Close
// method is not guaranteed to be called as node can crash at any time.
func (s *StateCluster) Close() error { return nil }

View File

@@ -3,15 +3,23 @@ package fsm
import (
"bytes"
"encoding/gob"
"flag"
"fmt"
"fusenapi/initalize"
"fusenapi/utils/autoconfig"
"log"
"net"
"os"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/hashicorp/raft"
"github.com/lni/dragonboat"
"github.com/lni/dragonboat/config"
"github.com/lni/dragonboat/logger"
"gorm.io/gorm"
)
@@ -31,13 +39,108 @@ func test1() {
select {}
}
var addresses []string = []string{
"localhost:5500",
"localhost:5501",
"localhost:5502",
}
func StartNode(replicaID uint64, exampleShardID uint64, addr string, gdb *gorm.DB) *dragonboat.NodeHost {
// addr := "localhost"
// addr = fmt.Sprintf("%s:%d", addr, port)
flag.Parse()
if len(addr) == 0 && replicaID != 1 && replicaID != 2 && replicaID != 3 {
fmt.Fprintf(os.Stderr, "node id must be 1, 2 or 3 when address is not specified\n")
os.Exit(1)
}
// https://github.com/golang/go/issues/17393
if runtime.GOOS == "darwin" {
signal.Ignore(syscall.Signal(0xd))
}
initialMembers := make(map[uint64]string)
// when joining a new node which is not an initial members, the initialMembers
// map should be empty.
// when restarting a node that is not a member of the initial nodes, you can
// leave the initialMembers to be empty. we still populate the initialMembers
// here for simplicity.
for idx, v := range addresses {
// key is the ReplicaID, ReplicaID is not allowed to be 0
// value is the raft address
initialMembers[uint64(idx+1)] = v
}
// for simplicity, in this example program, addresses of all those 3 initial
// raft members are hard coded. when address is not specified on the command
// line, we assume the node being launched is an initial raft member.
var nodeAddr = initialMembers[uint64(replicaID)]
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
// change the log verbosity
logger.GetLogger("raft").SetLevel(logger.ERROR)
logger.GetLogger("rsm").SetLevel(logger.WARNING)
logger.GetLogger("transport").SetLevel(logger.WARNING)
logger.GetLogger("grpc").SetLevel(logger.WARNING)
// config for raft node
// See GoDoc for all available options
rc := config.Config{
// ShardID and ReplicaID of the raft node
ReplicaID: uint64(replicaID),
ShardID: exampleShardID,
ElectionRTT: 10,
HeartbeatRTT: 1,
CheckQuorum: true,
SnapshotEntries: 10,
CompactionOverhead: 5,
}
datadir := filepath.Join(
"example-data",
"queue-data",
fmt.Sprintf("node%d", replicaID))
nhc := config.NodeHostConfig{
WALDir: datadir,
// NodeHostDir is where everything else is stored.
NodeHostDir: datadir,
// RTTMillisecond is the average round trip time between NodeHosts (usually
// on two machines/vms), it is in millisecond. Such RTT includes the
// processing delays caused by NodeHosts, not just the network delay between
// two NodeHost instances.
RTTMillisecond: 200,
// RaftAddress is used to identify the NodeHost instance
RaftAddress: nodeAddr,
}
nh, err := dragonboat.NewNodeHost(nhc)
if err != nil {
panic(err)
}
if err := nh.StartReplica(initialMembers, false, NewSMQueue, rc); err != nil {
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
os.Exit(1)
}
return nh
}
// StartNode 启动节点
func StartNode(ServerID string, RaftBind string, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *StateCluster {
func StartNode1(ServerID string, RaftBind string, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *StateCluster {
fsm := &StateCluster{
store: make(map[int64]*UserState),
waiter: NewWaitCallback(),
gdb: gdb,
store: make(map[int64]*UserState),
gdb: gdb,
}
var retainSnapshotCount = 2
@@ -156,41 +259,70 @@ func waitForCluster(ra *raft.Raft) {
// var gdb *gorm.DB = initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest")
type UserState struct {
Expired time.Time
UserId int64
PwdHash uint64
UpdateAt time.Time
UserId int64
PwdHash uint64
}
func (us *UserState) Encode() []byte {
var buf = bytes.NewBuffer(nil)
err := gob.NewEncoder(buf).Encode(us)
func (us *UserState) Encode(do func([]byte) error) error {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(us)
if err != nil {
log.Panic(err)
return nil
return err
}
return buf.Bytes()
if do != nil {
err := do(buf.Bytes())
if err != nil {
return err
}
}
return nil
}
// command is used for internal command representation.
type command struct {
Op string
func (us *UserState) Decode(data []byte) error {
buf := bytes.NewBuffer(data)
err := gob.NewDecoder(buf).Decode(us)
if err != nil {
return err
}
return nil
}
type OperateType string
const (
OP_Update OperateType = "update"
)
// Command is used for internal Command representation.
type Command struct {
Op OperateType
Key int64
Value *UserState
}
func (cmd *command) Encode() []byte {
var buf = bytes.NewBuffer(nil)
err := gob.NewEncoder(buf).Encode(cmd)
func (cmd *Command) Encode(do func(buf []byte) error) error {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(cmd)
if err != nil {
log.Panic(err)
return nil
return err
}
return buf.Bytes()
if do != nil {
err := do(buf.Bytes())
if err != nil {
return err
}
}
return nil
}
func (cmd *command) Decode(sbuf []byte) error {
func (cmd *Command) Decode(sbuf []byte) error {
var buf = bytes.NewBuffer(sbuf)
err := gob.NewDecoder(buf).Decode(cmd)
if err != nil {