添加节点分布式

This commit is contained in:
eson
2023-07-25 19:32:51 +08:00
parent a248c6cbeb
commit 7200531c27
27 changed files with 633 additions and 58 deletions

135
fsm/fsm.go Normal file
View File

@@ -0,0 +1,135 @@
package fsm
import (
"context"
"encoding/gob"
"fmt"
"fusenapi/model/gmodel"
"fusenapi/utils/auth"
"io"
"log"
"sync"
"time"
"github.com/hashicorp/raft"
"gorm.io/gorm"
)
// StateCluster is a simple key-value store as an FSM.
// It caches per-user state (keyed by user id) and replicates cache loads
// through a hashicorp/raft log so every node converges on the same map.
type StateCluster struct {
	// mu guards store.
	mu sync.Mutex
	// store maps user id -> cached state; filled in Apply.
	store map[int64]*UserState
	// gdb is the database handle used by Apply to load users.
	gdb *gorm.DB
	// waiter lets GetUserState block until Apply publishes a state.
	waiter *WaitCallback
	ra *raft.Raft // The consensus mechanism
}
// GetUserState returns the cached state for Userid, loading it through a
// replicated raft "update" command on a cache miss.
//
// Fixes over the original:
//   - future.Error() is checked before future.Response(): the raft API
//     requires Error() first (it blocks until the entry is applied), and
//     Response() was previously type-asserted with an unchecked .(error)
//     that could panic on a non-error response;
//   - the store is re-checked after the apply completes — by that point
//     Apply has already stored the state (and may have fired the waiter
//     Done before Wait registered), so the old code could time out for
//     5 s even though the state was present.
func (fsm *StateCluster) GetUserState(Userid int64) (*UserState, error) {
	fsm.mu.Lock()
	if us, ok := fsm.store[Userid]; ok {
		fsm.mu.Unlock()
		return us, nil
	}
	fsm.mu.Unlock()

	// Cache miss: replicate an "update" command so every node loads the
	// same state through Apply.
	cmd := &command{
		Op:  "update",
		Key: Userid,
	}
	future := fsm.ra.Apply(cmd.Encode(), time.Minute)
	if err := future.Error(); err != nil {
		return nil, err
	}
	// Apply's return value is surfaced here: nil on success, error otherwise.
	if resp := future.Response(); resp != nil {
		if err, ok := resp.(error); ok {
			return nil, err
		}
		return nil, fmt.Errorf("unexpected apply response: %v", resp)
	}

	// The apply has been committed locally, so the state is usually in the
	// store already.
	fsm.mu.Lock()
	if us, ok := fsm.store[Userid]; ok {
		fsm.mu.Unlock()
		return us, nil
	}
	fsm.mu.Unlock()

	// Fall back to waiting for Apply to publish the state via the waiter.
	if us := fsm.waiter.Wait(Userid, time.Second*5); us != nil {
		return us, nil
	}
	return nil, fmt.Errorf("timeout")
}
// Apply applies a Raft log entry to the key-value store. It is invoked by
// the raft library for every committed log entry; its return value becomes
// the ApplyFuture's Response() seen by GetUserState.
func (fsm *StateCluster) Apply(fsmlog *raft.Log) interface{} {
	var cmd command
	if err := cmd.Decode(fsmlog.Data); err != nil {
		return err
	}
	switch cmd.Op {
	case "update":
		fsm.mu.Lock()
		defer fsm.mu.Unlock()
		models := gmodel.NewAllModels(fsm.gdb)
		user, err := models.FsUser.FindUserById(context.TODO(), cmd.Key)
		if err != nil {
			// BUG FIX: the original only logged the error and then
			// dereferenced *user.PasswordHash anyway, crashing the node
			// whenever the user lookup failed. Propagate the error instead.
			log.Println(err)
			return err
		}
		if user.PasswordHash == nil {
			// Guard the pointer dereference below.
			return fmt.Errorf("user %d has no password hash", cmd.Key)
		}
		userState := &UserState{
			UserId:  cmd.Key,
			PwdHash: auth.StringToHash(*user.PasswordHash),
			Expired: time.Now(),
		}
		fsm.store[cmd.Key] = userState
		// Wake any GetUserState call blocked on this user id.
		fsm.waiter.Done(userState)
		return nil
	default:
		return fmt.Errorf("unrecognized command operation: %s", cmd.Op)
	}
}
// Snapshot returns a point-in-time copy of the in-memory user-state map,
// wrapped as a raft.FSMSnapshot. The map is cloned under the lock so the
// snapshot stays isolated from concurrent Apply calls.
func (fsm *StateCluster) Snapshot() (raft.FSMSnapshot, error) {
	fsm.mu.Lock()
	defer fsm.mu.Unlock()

	cloned := make(map[int64]*UserState, len(fsm.store))
	for id, state := range fsm.store {
		cloned[id] = state
	}
	return &kvStoreSnapshot{store: cloned}, nil
}
// Restore replaces the FSM's state with one previously written by a
// snapshot (the read side of the raft snapshot mechanism).
// NOTE(review): kvStoreSnapshot has only an unexported field, which gob
// cannot (de)serialize, and Persist currently writes no data — confirm the
// snapshot wire format before relying on restore.
func (fsm *StateCluster) Restore(rc io.ReadCloser) error {
	var snapshot kvStoreSnapshot
	if err := gob.NewDecoder(rc).Decode(&snapshot); err != nil {
		return err
	}
	fsm.store = snapshot.store
	return nil
}
// kvStoreSnapshot represents a point-in-time copy of the user-state map
// handed out by StateCluster.Snapshot.
// NOTE(review): the store field is unexported, so gob (used in Restore)
// cannot serialize it — the snapshot format needs work before snapshots
// can actually round-trip.
type kvStoreSnapshot struct {
	store map[int64]*UserState
}

// Persist writes the snapshot to the provided sink.
//
// The example has been simplified: no state is written yet. Per the
// raft.FSMSnapshot contract, Persist must call sink.Close() on success
// (or sink.Cancel() on error); the original returned without doing
// either, so the snapshot sink was never finalized.
// TODO(review): serialize snapshot.store here and decode it in Restore.
func (snapshot *kvStoreSnapshot) Persist(sink raft.SnapshotSink) error {
	return sink.Close()
}

// Release is invoked when the Raft instance is finished with the snapshot.
// Nothing to clean up: the snapshot owns only a garbage-collected map.
func (snapshot *kvStoreSnapshot) Release() {}

157
fsm/main.go Normal file
View File

@@ -0,0 +1,157 @@
package fsm
import (
"bytes"
"encoding/gob"
"fmt"
"fusenapi/initalize"
"log"
"net"
"os"
"time"
"github.com/hashicorp/raft"
"gorm.io/gorm"
)
// test1 boots a single-node cluster against a live MySQL instance and
// hammers GetUserState concurrently. Manual smoke test, not CI-safe
// (hard-coded DSN; blocks forever on the final select).
func test1() {
	log.SetFlags(log.Llongfile)
	fsm := StartNode("fs1", "localhost:5500", initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest"))
	time.Sleep(time.Second * 5)
	for i := 0; i < 30; i++ {
		// BUG FIX: `go log.Println(fsm.GetUserState(39))` evaluated
		// GetUserState in the *calling* goroutine (a goroutine's arguments
		// are evaluated before it starts), so the 30 calls ran
		// sequentially. Wrapping in a closure actually exercises
		// concurrency.
		go func() {
			log.Println(fsm.GetUserState(39))
		}()
	}
	log.Println(fsm.GetUserState(39))
	select {}
}
// StartNode starts a single-node raft cluster bound to RaftBind, using
// gdb as the backing database, and returns the state machine once a
// leader has been elected. Snapshots go under /tmp/raftdir/<ServerID>;
// log and stable stores are in-memory.
func StartNode(ServerID string, RaftBind string, gdb *gorm.DB) *StateCluster {
	cluster := &StateCluster{
		store:  make(map[int64]*UserState),
		waiter: NewWaitCallback(),
		gdb:    gdb,
	}

	const retainSnapshotCount = 2
	raftDir := fmt.Sprintf("/tmp/raftdir/%s", ServerID)

	// Raft configuration: this node's identity.
	config := raft.DefaultConfig()
	config.LocalID = raft.ServerID(ServerID)

	// TCP transport for raft communication.
	tcpAddr, err := net.ResolveTCPAddr("tcp", RaftBind)
	if err != nil {
		panic(err)
	}
	transport, err := raft.NewTCPTransport(RaftBind, tcpAddr, 3, 30*time.Second, os.Stderr)
	if err != nil {
		panic(err)
	}

	// File-backed snapshot store so raft can truncate its log.
	snapshots, err := raft.NewFileSnapshotStore(raftDir, retainSnapshotCount, os.Stderr)
	if err != nil {
		panic(fmt.Errorf("file snapshot store: %s", err))
	}

	// In-memory log and stable stores (state is lost on restart).
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()

	cluster.ra, err = raft.NewRaft(config, cluster, logStore, stableStore, snapshots, transport)
	if err != nil {
		panic(err)
	}

	// Bootstrap a one-server cluster with ourselves as the only voter.
	bootstrap := raft.Configuration{
		Servers: []raft.Server{
			{
				Suffrage: raft.Voter,
				ID:       config.LocalID,
				Address:  transport.LocalAddr(),
			},
		},
	}
	if err := cluster.ra.BootstrapCluster(bootstrap).Error(); err != nil {
		// Bootstrapping an already-bootstrapped cluster errors; log and
		// carry on, matching the original behavior.
		log.Println(err)
	}

	waitForLeader(cluster.ra)
	return cluster
}
// waitForLeader blocks until this node observes itself becoming leader,
// logging a progress message every 10 seconds while it waits.
func waitForLeader(ra *raft.Raft) {
	leadership := ra.LeaderCh()
	for {
		select {
		case becameLeader := <-leadership:
			if becameLeader {
				return
			}
			// Leadership lost (or never gained) — keep waiting.
		case <-time.After(10 * time.Second):
			log.Println("Still waiting for the leader...")
		}
	}
}
// var gdb *gorm.DB = initalize.InitMysql("fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest")
// UserState is the per-user entry replicated through the raft FSM.
type UserState struct {
	// Expired is set to time.Now() when the entry is created in Apply —
	// despite the name it records the load time; TODO(review) confirm the
	// intended expiry semantics.
	Expired time.Time
	UserId  int64
	// PwdHash is auth.StringToHash of the user's stored password hash.
	PwdHash uint64
}

// Encode gob-serializes the state. It panics on a gob encoding failure,
// which is a programmer error: UserState is always encodable.
// (The unreachable `return nil` after log.Panic was removed —
// log.Panic never returns.)
func (us *UserState) Encode() []byte {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(us); err != nil {
		log.Panic(err)
	}
	return buf.Bytes()
}
// command is used for internal command representation: it is what
// GetUserState serializes into the raft log and Apply decodes back out.
type command struct {
	Op  string // operation name; only "update" is recognized by Apply
	Key int64  // user id the operation targets
	// Value is never set by GetUserState nor read by Apply in the current
	// flow; it appears to be reserved for future commands.
	Value *UserState
}

// Encode gob-serializes the command for raft.Apply. It panics on a gob
// encoding failure, which is a programmer error: command is always
// encodable. (The unreachable `return nil` after log.Panic was removed —
// log.Panic never returns.)
func (cmd *command) Encode() []byte {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(cmd); err != nil {
		log.Panic(err)
	}
	return buf.Bytes()
}

// Decode gob-deserializes a raft log payload into cmd, returning any
// decoding error to the caller.
func (cmd *command) Decode(sbuf []byte) error {
	return gob.NewDecoder(bytes.NewBuffer(sbuf)).Decode(cmd)
}

7
fsm/main_test.go Normal file
View File

@@ -0,0 +1,7 @@
package fsm
import "testing"
// TestNode runs the single-node smoke test end to end (live MySQL plus a
// raft node; see test1).
//
// BUG FIX: renamed from TestMain — the testing package reserves the name
// TestMain for `func TestMain(m *testing.M)`, and `go test` fails with
// "wrong signature for TestMain" when it takes *testing.T.
// NOTE(review): test1 never returns (it ends in `select {}`), so this
// test blocks until the test timeout — consider gating it behind a flag.
func TestNode(t *testing.T) {
	test1()
}

57
fsm/waitcallback.go Normal file
View File

@@ -0,0 +1,57 @@
package fsm
import (
"sync"
"time"
)
// condAndState pairs a condition variable with the state value it guards;
// one entry per user id lives in WaitCallback.Waiter.
type condAndState struct {
	cond *sync.Cond // signaled by Done when state becomes available
	state *UserState // nil until Done publishes a value
}
// WaitCallback lets callers block (Wait) until an asynchronously produced
// *UserState is published (Done) for a given user id.
type WaitCallback struct {
	// Waiter maps user id (int64) -> *condAndState.
	Waiter sync.Map
	// mu backs every condAndState's sync.Cond and guards its state field.
	mu sync.Mutex
}
// NewWaitCallback returns a ready-to-use WaitCallback. (The zero value
// would work as well: both fields have usable zero values.)
func NewWaitCallback() *WaitCallback {
	return &WaitCallback{}
}
// Done publishes the freshly-loaded state for us.UserId and wakes every
// goroutine currently blocked in Wait on that key. A Done with no
// registered waiter is a no-op.
func (w *WaitCallback) Done(us *UserState) {
	v, ok := w.Waiter.Load(us.UserId)
	if !ok {
		return
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	entry := v.(*condAndState)
	entry.state = us
	entry.cond.Broadcast()
}
// Wait blocks until Done publishes a state for key, or until timeout
// elapses, whichever comes first. It returns the published state, or nil
// on timeout.
//
// Fixes over the original:
//   - the predicate (cas.state) is checked under w.mu before and between
//     cond.Wait calls, so a Done that fires before the helper goroutine
//     reaches cond.Wait is no longer lost, and spurious wakeups are
//     handled correctly (sync.Cond requires waiting in a loop);
//   - on timeout the helper goroutine is woken and exits instead of
//     blocking in cond.Wait forever (goroutine leak).
//
// NOTE(review): like the original, the Waiter map entry is never removed,
// so entries accumulate one per key — confirm whether that is acceptable.
func (w *WaitCallback) Wait(key int64, timeout time.Duration) *UserState {
	cas := &condAndState{
		cond:  sync.NewCond(&w.mu),
		state: nil,
	}
	if v, loaded := w.Waiter.LoadOrStore(key, cas); loaded {
		cas = v.(*condAndState)
	}

	var timedOut bool // guarded by w.mu
	done := make(chan struct{})
	go func() {
		w.mu.Lock()
		defer w.mu.Unlock()
		for cas.state == nil && !timedOut {
			cas.cond.Wait()
		}
		close(done)
	}()

	select {
	case <-done:
		w.mu.Lock()
		defer w.mu.Unlock()
		return cas.state
	case <-time.After(timeout):
		w.mu.Lock()
		timedOut = true
		w.mu.Unlock()
		// Wake the helper goroutine so it can observe timedOut and exit.
		cas.cond.Broadcast()
		return nil
	}
}