修改状态机的写法
This commit is contained in:
209
shared/main.go
Normal file
209
shared/main.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"fusenapi/utils/autoconfig"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/lni/dragonboat/v4"
|
||||
"github.com/lni/dragonboat/v4/config"
|
||||
"github.com/lni/dragonboat/v4/logger"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func test1() {
|
||||
// log.SetFlags(log.Llongfile)
|
||||
|
||||
// fsm := StartNode("fs1", "localhost:5500", nil, initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"))
|
||||
|
||||
// time.Sleep(time.Second * 5)
|
||||
|
||||
// for i := 0; i < 30; i++ {
|
||||
// go log.Println(fsm.GetUserState(39))
|
||||
// }
|
||||
|
||||
// log.Println(fsm.GetUserState(39))
|
||||
|
||||
// select {}
|
||||
}
|
||||
|
||||
var addresses []string = []string{
|
||||
"localhost:5500",
|
||||
"localhost:5501",
|
||||
"localhost:5502",
|
||||
}
|
||||
|
||||
var shardID uint64 = 128
|
||||
|
||||
func StartNode(ServerID uint64, serverconfigs []*autoconfig.ConfigServer, gdb *gorm.DB) *SharedState {
|
||||
|
||||
// addr := "localhost"
|
||||
|
||||
// addr = fmt.Sprintf("%s:%d", addr, port)
|
||||
|
||||
// https://github.com/golang/go/issues/17393
|
||||
if runtime.GOOS == "darwin" {
|
||||
signal.Ignore(syscall.Signal(0xd))
|
||||
}
|
||||
initialMembers := make(map[uint64]string)
|
||||
|
||||
// when joining a new node which is not an initial members, the initialMembers
|
||||
// map should be empty.
|
||||
// when restarting a node that is not a member of the initial nodes, you can
|
||||
// leave the initialMembers to be empty. we still populate the initialMembers
|
||||
// here for simplicity.
|
||||
|
||||
for _, v := range serverconfigs {
|
||||
// key is the ReplicaID, ReplicaID is not allowed to be 0
|
||||
// value is the raft address
|
||||
|
||||
initialMembers[v.ReplicaId] = fmt.Sprintf("%s:%d", v.Host, v.Port-2000)
|
||||
}
|
||||
|
||||
// for simplicity, in this example program, addresses of all those 3 initial
|
||||
// raft members are hard coded. when address is not specified on the command
|
||||
// line, we assume the node being launched is an initial raft member.
|
||||
|
||||
var nodeAddr = initialMembers[ServerID]
|
||||
|
||||
fmt.Fprintf(os.Stdout, "node address: %s\n", nodeAddr)
|
||||
// change the log verbosity
|
||||
logger.GetLogger("raft").SetLevel(logger.ERROR)
|
||||
logger.GetLogger("rsm").SetLevel(logger.WARNING)
|
||||
logger.GetLogger("transport").SetLevel(logger.WARNING)
|
||||
logger.GetLogger("grpc").SetLevel(logger.WARNING)
|
||||
// config for raft node
|
||||
// See GoDoc for all available options
|
||||
rc := config.Config{
|
||||
// ShardID and ReplicaID of the raft node
|
||||
ReplicaID: uint64(ServerID),
|
||||
ShardID: shardID,
|
||||
|
||||
ElectionRTT: 10,
|
||||
|
||||
HeartbeatRTT: 1,
|
||||
CheckQuorum: true,
|
||||
|
||||
SnapshotEntries: 10,
|
||||
|
||||
CompactionOverhead: 5,
|
||||
}
|
||||
datadir := filepath.Join(
|
||||
"shared-state",
|
||||
fmt.Sprintf("node%d", ServerID))
|
||||
|
||||
nhc := config.NodeHostConfig{
|
||||
|
||||
WALDir: datadir,
|
||||
// NodeHostDir is where everything else is stored.
|
||||
NodeHostDir: datadir,
|
||||
// RTTMillisecond is the average round trip time between NodeHosts (usually
|
||||
// on two machines/vms), it is in millisecond. Such RTT includes the
|
||||
// processing delays caused by NodeHosts, not just the network delay between
|
||||
// two NodeHost instances.
|
||||
RTTMillisecond: 200,
|
||||
// RaftAddress is used to identify the NodeHost instance
|
||||
RaftAddress: nodeAddr,
|
||||
}
|
||||
nh, err := dragonboat.NewNodeHost(nhc)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := nh.StartReplica(initialMembers, false, NewFsStateMachine, rc); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "failed to add cluster, %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ss := &SharedState{
|
||||
shardID: shardID,
|
||||
replicaID: ServerID,
|
||||
nh: nh,
|
||||
}
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
// func JoinCluster(ServerID string, LeaderAddress string, RaftBind string, gdb *gorm.DB) *StateCluster {
|
||||
|
||||
// fsm := StartNode(ServerID, RaftBind, gdb)
|
||||
|
||||
// configFuture := fsm.ra.GetConfiguration()
|
||||
// if err := configFuture.Error(); err != nil {
|
||||
// log.Fatalf("failed to get raft configuration: %v", err)
|
||||
// }
|
||||
|
||||
// for _, srv := range configFuture.Configuration().Servers {
|
||||
// if srv.ID == raft.ServerID(ServerID) && srv.Address == raft.ServerAddress(LeaderAddress) {
|
||||
// if future := fsm.ra.RemoveServer(srv.ID, 0, 0); future.Error() != nil {
|
||||
// log.Fatalf("Error removing existing server [%s]: %v", ServerID, future.Error())
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
// f := fsm.ra.AddVoter(raft.ServerID(ServerID), raft.ServerAddress(RaftBind), 0, 0)
|
||||
// if f.Error() != nil {
|
||||
// log.Fatalf("Error adding voter: %v", f.Error())
|
||||
// }
|
||||
|
||||
// return fsm
|
||||
// }
|
||||
|
||||
func waitForCluster(ra *raft.Raft) {
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
state := ra.State()
|
||||
if state == raft.Leader || state == raft.Follower {
|
||||
log.Println("Raft cluster is running")
|
||||
return
|
||||
} else {
|
||||
log.Println("Still waiting for the cluster to start...")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// var gdb *gorm.DB = initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest")
|
||||
|
||||
type UserState struct {
|
||||
UpdateAt time.Time
|
||||
UserId int64
|
||||
PwdHash uint64
|
||||
}
|
||||
|
||||
func (us *UserState) Encode(do func([]byte) error) error {
|
||||
|
||||
var buf bytes.Buffer
|
||||
err := gob.NewEncoder(&buf).Encode(us)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if do != nil {
|
||||
err := do(buf.Bytes())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (us *UserState) Decode(data []byte) error {
|
||||
buf := bytes.NewBuffer(data)
|
||||
err := gob.NewDecoder(buf).Decode(us)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
7
shared/main_test.go
Normal file
7
shared/main_test.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package shared
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestMain(t *testing.T) {
|
||||
test1()
|
||||
}
|
||||
48
shared/shared_state.go
Normal file
48
shared/shared_state.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/lni/dragonboat/v4"
|
||||
)
|
||||
|
||||
type SharedState struct {
|
||||
shardID uint64
|
||||
replicaID uint64
|
||||
nh *dragonboat.NodeHost
|
||||
}
|
||||
|
||||
func (ss *SharedState) GetUserState(Userid int64) (us *UserState, err error) {
|
||||
|
||||
ius, err := ss.nh.SyncRead(context.TODO(), ss.shardID, Userid)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ius != nil {
|
||||
return ius.(*UserState), nil
|
||||
}
|
||||
|
||||
cs := ss.nh.GetNoOPSession(128)
|
||||
|
||||
cmd := &CmdUpdate{UserId: Userid}
|
||||
cmdBuf, err := FsPasser.PackToBytes(cmd)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := ss.nh.SyncPropose(context.TODO(), cs, cmdBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
us = &UserState{}
|
||||
err = us.Decode(result.Data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return us, err
|
||||
}
|
||||
91
shared/sm.go
Normal file
91
shared/sm.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"fusenapi/initalize"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// FsStateMachine is a simple key-value store as an FSM.
|
||||
type FsStateMachine struct {
|
||||
shardID uint64
|
||||
replicaID uint64
|
||||
|
||||
mu sync.Mutex
|
||||
store map[int64]*UserState
|
||||
|
||||
gdb *gorm.DB
|
||||
// waiter *WaitCallback
|
||||
// ra *raft.Raft // The consensus mechanism
|
||||
}
|
||||
|
||||
func NewFsStateMachine(shardID uint64, replicaID uint64) sm.IStateMachine {
|
||||
return &FsStateMachine{
|
||||
shardID: shardID,
|
||||
replicaID: replicaID,
|
||||
store: make(map[int64]*UserState),
|
||||
gdb: initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"),
|
||||
}
|
||||
}
|
||||
|
||||
// Lookup performs local lookup on the ExampleStateMachine instance. In this example,
|
||||
// we always return the Count value as a little endian binary encoded byte
|
||||
// slice.
|
||||
func (s *FsStateMachine) Lookup(query interface{}) (item interface{}, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
userid := query.(int64)
|
||||
if us, ok := s.store[userid]; ok {
|
||||
return us, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Update updates the object using the specified committed raft entry.
|
||||
func (s *FsStateMachine) Update(e sm.Entry) (result sm.Result, err error) {
|
||||
ctx := context.TODO()
|
||||
ctx = context.WithValue(ctx, ctxEntry{}, &e)
|
||||
ctx = context.WithValue(ctx, ctxSM{}, s)
|
||||
return FsPasser.ExecuteWithBytes(ctx, e.Cmd)
|
||||
}
|
||||
|
||||
// SaveSnapshot saves the current IStateMachine state into a snapshot using the
|
||||
// specified io.Writer object.
|
||||
func (s *FsStateMachine) SaveSnapshot(w io.Writer,
|
||||
fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
|
||||
// as shown above, the only state that can be saved is the Count variable
|
||||
// there is no external file in this IStateMachine example, we thus leave
|
||||
// the fc untouched
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
return gob.NewEncoder(w).Encode(&s.store)
|
||||
}
|
||||
|
||||
// RecoverFromSnapshot recovers the state using the provided snapshot.
|
||||
func (s *FsStateMachine) RecoverFromSnapshot(r io.Reader,
|
||||
files []sm.SnapshotFile,
|
||||
done <-chan struct{}) error {
|
||||
// restore the Count variable, that is the only state we maintain in this
|
||||
// example, the input files is expected to be empty
|
||||
|
||||
err := gob.NewDecoder(r).Decode(&s.store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the IStateMachine instance. There is nothing for us to cleanup
|
||||
// or release as this is a pure in memory data store. Note that the Close
|
||||
// method is not guaranteed to be called as node can crash at any time.
|
||||
func (s *FsStateMachine) Close() error { return nil }
|
||||
65
shared/sm_update_handler.go
Normal file
65
shared/sm_update_handler.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package shared
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fusenapi/model/gmodel"
|
||||
"fusenapi/utils/auth"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/474420502/passer"
|
||||
sm "github.com/lni/dragonboat/v4/statemachine"
|
||||
)
|
||||
|
||||
type CmdUpdate struct {
|
||||
UserId int64
|
||||
}
|
||||
|
||||
// 上下文传递
|
||||
type (
|
||||
ctxSM struct{}
|
||||
// ctxUserState struct{}
|
||||
ctxEntry struct{}
|
||||
)
|
||||
|
||||
// 结构体异步传递后, 执行的注册函数, 实际上就是update的handler
|
||||
var FsPasser *passer.Passer[sm.Result] = func() *passer.Passer[sm.Result] {
|
||||
|
||||
fsPasser := passer.NewPasser[sm.Result]()
|
||||
fsPasser.RegisterPasser(&CmdUpdate{}, func(ctx context.Context, obj any) (sm.Result, error) {
|
||||
var result sm.Result
|
||||
|
||||
cmd := obj.(*CmdUpdate)
|
||||
s := ctx.Value(ctxSM{}).(*FsStateMachine)
|
||||
e := ctx.Value(ctxEntry{}).(*sm.Entry)
|
||||
|
||||
if old, ok := s.store[cmd.UserId]; ok {
|
||||
if time.Since(old.UpdateAt) <= time.Second {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
|
||||
// log.Println("update")
|
||||
models := gmodel.NewAllModels(s.gdb)
|
||||
user, err := models.FsUser.FindUserById(context.TODO(), cmd.UserId)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
userState := &UserState{
|
||||
UserId: cmd.UserId,
|
||||
PwdHash: auth.StringToHash(*user.PasswordHash),
|
||||
UpdateAt: time.Now(),
|
||||
}
|
||||
s.store[cmd.UserId] = userState
|
||||
|
||||
err = userState.Encode(func(b []byte) error {
|
||||
e.Result.Data = b
|
||||
result.Data = b
|
||||
return nil
|
||||
})
|
||||
return result, err
|
||||
})
|
||||
|
||||
return fsPasser
|
||||
}()
|
||||
Reference in New Issue
Block a user