fusen-render/sm.go
2023-07-29 21:58:11 +08:00

423 lines
9.0 KiB
Go

package fusenrender
import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"time"
"github.com/dgraph-io/badger/v3"
sm "github.com/lni/dragonboat/v4/statemachine"
)
// SMQueue is the queue state machine; its entries are kept in a Badger
// database.
type SMQueue struct {
// Count is the counter value read back from a snapshot by
// RecoverFromSnapshot.
Count uint64
// db is the Badger handle opened by NewSMQueue.
db *badger.DB
}
// Command is the JSON payload that SMQueue.Update decodes from a raft entry.
type Command struct {
// Name selects the operation; Update handles "enqueue" and "dequeue".
Name string `json:"name"`
// Group names the group to pop from; it is read by the "dequeue" operation.
Group *string `json:"group"`
// Item is the entry to store; it is read by the "enqueue" operation.
Item *QueueItem `json:"item"`
}
// QueueItem is a single queue entry together with the attributes used to
// order it.
type QueueItem struct {
	Group    string    `json:"group"`     // group name
	Priority uint32    `json:"priority"`  // processing priority
	CreateAt time.Time `json:"create_at"` // creation time, always UTC
	Data     any       `json:"data"`      // payload the entry operates on
}

// Encode serializes the item to JSON. On failure it returns a nil slice and
// the marshalling error.
func (item *QueueItem) Encode() ([]byte, error) {
	return json.Marshal(item)
}
// NewSMQueue opens (or creates) the Badger database at datapath using
// Badger's default options and returns an SMQueue backed by it.
//
// It panics if the database cannot be opened.
func NewSMQueue(datapath string) sm.IStateMachine {
	store, openErr := badger.Open(badger.DefaultOptions(datapath))
	if openErr != nil {
		panic(openErr)
	}

	queue := &SMQueue{db: store}
	return queue
}
// Lookup pops the first entry stored under the requested group and returns it
// as a *QueueItem.
//
// group must be a string holding the group name; any other type yields an
// error instead of a panic. When the group has no entries Lookup returns
// (nil, nil).
//
// If the first entry cannot be read or decoded, the failure is logged, the
// entry is still removed so it cannot block the group forever, and the error
// is returned to the caller with a nil item.
//
// NOTE(review): this deletes data outside of the replicated Update path.
// dragonboat documents Lookup as a query method — confirm that mutating the
// store here is intended.
func (s *SMQueue) Lookup(group interface{}) (item interface{}, err error) {
	name, ok := group.(string)
	if !ok {
		return nil, fmt.Errorf("lookup: group must be a string, got %T", group)
	}
	prefix := []byte(name + "_")

	var (
		found   *QueueItem
		readErr error
	)
	err = s.db.Update(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.DefaultIteratorOptions)
		defer it.Close()

		it.Seek(prefix)
		if !it.ValidForPrefix(prefix) {
			return nil
		}

		entry := it.Item()
		// Copy the key: the slice returned by Key() is owned by the iterator.
		key := entry.KeyCopy(nil)
		readErr = entry.Value(func(val []byte) error {
			decoded := &QueueItem{}
			if uerr := json.Unmarshal(val, decoded); uerr != nil {
				return uerr
			}
			found = decoded
			return nil
		})
		if readErr != nil {
			log.Println(readErr)
		}
		return txn.Delete(key)
	})
	if err != nil {
		return nil, err
	}
	if readErr != nil {
		return nil, fmt.Errorf("lookup: reading queue entry: %w", readErr)
	}
	if found == nil {
		// Return an untyped nil so callers can compare the result with nil.
		return nil, nil
	}
	return found, nil
}
// GetKey builds the Badger key under which item is stored:
// "<group>_<negated priority>_<creation time in unix seconds>".
//
// The priority is negated in uint32 arithmetic, so a larger priority produces
// a smaller number. The number is zero-padded to ten digits (the width of the
// largest uint32) so that the byte-wise ordering of keys matches the numeric
// ordering for every priority; for priorities 1..3294967296 the padded form is
// identical to the unpadded one.
//
// NOTE(review): priority 0 negates to 0 and therefore sorts ahead of every
// other priority. Items sharing group, priority and creation second map to the
// same key, so a later Set replaces the earlier one. A group name containing
// '_' can match another group's "<group>_" prefix. Confirm whether these are
// acceptable.
func (s *SMQueue) GetKey(item *QueueItem) []byte {
	rank := -item.Priority // wraps modulo 2^32: higher priority => smaller rank
	createdAt := item.CreateAt.UTC().Unix()
	return []byte(fmt.Sprintf("%s_%010d_%d", item.Group, rank, createdAt))
}
// Update applies a committed raft entry to the queue. e.Cmd must hold a
// JSON-encoded Command.
//
// Supported commands:
//   - "enqueue": stores cmd.Item under the key produced by GetKey and reports
//     the encoded size in result.Value.
//   - "dequeue": removes the first entry whose key starts with "<group>_" and
//     returns its stored bytes in result.Data. result.Data stays empty when
//     the group has no entries.
//
// An error is returned for malformed JSON, an unknown command name, a missing
// item/group, or a storage failure. A failed dequeue leaves the entry in
// place because the transaction is rolled back.
//
// NOTE(review): dragonboat may treat a non-nil error from Update as fatal for
// the node — confirm that rejecting malformed commands this way is intended.
func (s *SMQueue) Update(e sm.Entry) (result sm.Result, err error) {
	var cmd Command
	if err = json.Unmarshal(e.Cmd, &cmd); err != nil {
		return result, err
	}

	switch cmd.Name {
	case "enqueue":
		if cmd.Item == nil {
			// GetKey would dereference a nil item and panic.
			return result, fmt.Errorf("enqueue: missing item")
		}
		payload, encErr := cmd.Item.Encode()
		if encErr != nil {
			return result, encErr
		}
		key := s.GetKey(cmd.Item)
		err = s.db.Update(func(txn *badger.Txn) error {
			return txn.Set(key, payload)
		})
		return sm.Result{Value: uint64(len(payload))}, err

	case "dequeue":
		if cmd.Group == nil {
			return result, fmt.Errorf("dequeue: missing group")
		}
		prefix := []byte(*cmd.Group + "_")

		var data []byte
		err = s.db.Update(func(txn *badger.Txn) error {
			it := txn.NewIterator(badger.DefaultIteratorOptions)
			defer it.Close()

			it.Seek(prefix)
			if !it.ValidForPrefix(prefix) {
				return nil
			}

			entry := it.Item()
			// Key and value slices handed out by Badger are only valid while
			// the item/transaction is, so take copies before using them
			// for the delete and for the result returned to the caller.
			key := entry.KeyCopy(nil)
			val, readErr := entry.ValueCopy(nil)
			if readErr != nil {
				return readErr
			}
			data = val
			return txn.Delete(key)
		})
		if err != nil {
			return result, err
		}
		result.Data = data
		return result, nil

	default:
		return result, fmt.Errorf("unknown cmd type: %s", cmd.Name)
	}
}
// Snapshot layout shared by SaveSnapshot and RecoverFromSnapshot:
//
//	[8 bytes]  Count, little endian
//	repeated:  [4 bytes key length][4 bytes value length][key][value]
//
// All lengths are little-endian uint32 values.
const (
	snapshotCountSize = 8 // size of the leading Count field
	snapshotFrameSize = 8 // size of the two length fields preceding a record
)

// SaveSnapshot writes the Count field followed by every key/value pair in the
// Badger database to w, using the layout described above.
//
// fc and done are not used: the state machine has no external files and the
// operation is not interruptible.
func (s *SMQueue) SaveSnapshot(w io.Writer,
	fc sm.ISnapshotFileCollection, done <-chan struct{}) error {
	var header [snapshotCountSize]byte
	binary.LittleEndian.PutUint64(header[:], s.Count)
	if _, err := w.Write(header[:]); err != nil {
		return err
	}

	// View owns the read-only transaction and discards it on every return
	// path, including write failures.
	return s.db.View(func(txn *badger.Txn) error {
		it := txn.NewIterator(badger.IteratorOptions{})
		defer it.Close()

		var frame [snapshotFrameSize]byte
		for it.Rewind(); it.Valid(); it.Next() {
			entry := it.Item()
			key := entry.Key() // only used before the iterator advances
			err := entry.Value(func(val []byte) error {
				binary.LittleEndian.PutUint32(frame[:4], uint32(len(key)))
				binary.LittleEndian.PutUint32(frame[4:], uint32(len(val)))
				for _, chunk := range [][]byte{frame[:], key, val} {
					if _, werr := w.Write(chunk); werr != nil {
						return werr
					}
				}
				return nil
			})
			if err != nil {
				return err
			}
		}
		return nil
	})
}

// RecoverFromSnapshot replaces the state of the queue with the contents of a
// snapshot produced by SaveSnapshot.
//
// The snapshot is read into memory and fully validated before the existing
// data is dropped, so a truncated or corrupt snapshot returns an error and
// leaves the database untouched. A snapshot that holds only the 8-byte Count
// restores Count and an empty queue.
//
// files and done are not used.
func (s *SMQueue) RecoverFromSnapshot(r io.Reader,
	files []sm.SnapshotFile,
	done <-chan struct{}) error {
	data, err := ioutil.ReadAll(r)
	if err != nil {
		return err
	}
	if len(data) < snapshotCountSize {
		return fmt.Errorf("recover snapshot: header needs %d bytes, got %d: %w",
			snapshotCountSize, len(data), io.ErrUnexpectedEOF)
	}
	count := binary.LittleEndian.Uint64(data[:snapshotCountSize])
	records := data[snapshotCountSize:]

	// First pass: validate the framing without touching the database.
	if err := walkSnapshotRecords(records, func(_, _ []byte) error { return nil }); err != nil {
		return fmt.Errorf("recover snapshot: %w", err)
	}

	if err := s.db.DropAll(); err != nil {
		return err
	}

	// A write batch avoids the size limit of a single transaction.
	batch := s.db.NewWriteBatch()
	defer batch.Cancel()
	if err := walkSnapshotRecords(records, batch.Set); err != nil {
		return err
	}
	if err := batch.Flush(); err != nil {
		return err
	}

	s.Count = count
	return nil
}

// walkSnapshotRecords decodes the length-prefixed records in data and calls fn
// for each key/value pair. The slices passed to fn alias data.
func walkSnapshotRecords(data []byte, fn func(key, value []byte) error) error {
	for len(data) > 0 {
		if len(data) < snapshotFrameSize {
			return fmt.Errorf("record header truncated: %w", io.ErrUnexpectedEOF)
		}
		// Use uint64 so the sum below cannot overflow on 32-bit platforms.
		keyLen := uint64(binary.LittleEndian.Uint32(data[:4]))
		valLen := uint64(binary.LittleEndian.Uint32(data[4:snapshotFrameSize]))
		data = data[snapshotFrameSize:]

		if uint64(len(data)) < keyLen+valLen {
			return fmt.Errorf("record body truncated: %w", io.ErrUnexpectedEOF)
		}
		key := data[:keyLen]
		value := data[keyLen : keyLen+valLen]
		data = data[keyLen+valLen:]

		if err := fn(key, value); err != nil {
			return err
		}
	}
	return nil
}
// Close closes the underlying Badger database so its files and directory lock
// are released. It is a no-op when no database is attached.
//
// Note that the Close method is not guaranteed to be called, as a node can
// crash at any time.
func (s *SMQueue) Close() error {
	if s.db == nil {
		return nil
	}
	return s.db.Close()
}
// import (
// "encoding/json"
// "fmt"
// "io"
// "log"
// "sync"
// "time"
// "github.com/dgraph-io/badger/v3"
// "github.com/hashicorp/raft"
// )
// type SMQueue struct {
// mu sync.Mutex
// queue *Queue
// ra *raft.Raft
// }
// func NewQueueFSM(datapath string) *SMQueue {
// q, err := NewQueue(datapath)
// if err != nil {
// panic(err)
// }
// return &SMQueue{
// queue: q,
// }
// }
// const (
// Enqueue raft.LogType = 20
// Dequeue raft.LogType = 21
// )
// func (q *SMQueue) LogInfo() {
// log.Println(q.ra.GetConfiguration().Configuration(), q.ra.State())
// }
// func (q *SMQueue) Put(item *QueueItem) error {
// data, err := item.Encode()
// if err != nil {
// return err
// }
// return q.PutJsonString(data)
// }
// func (q *SMQueue) PutJsonString(item []byte) error {
// task := ApplyTask{
// Name: "enqueue",
// Object: item,
// }
// d, err := task.Encode()
// if err != nil {
// return err
// }
// future := q.ra.Apply(d, time.Second*15)
// if future.Error() != nil {
// return future.Error()
// }
// resp := future.Response()
// if resp == nil {
// return nil
// }
// return resp.(error)
// }
// func (q *SMQueue) Pop(group string) (*QueueItem, error) {
// task := ApplyTask{
// Name: "dequeue",
// Object: []byte(group),
// }
// d, err := task.Encode()
// if err != nil {
// return nil, err
// }
// ierr := q.ra.Apply(d, time.Second*15)
// if ierr.Error() != nil {
// return nil, ierr.Error()
// }
// switch v := ierr.Response().(type) {
// case error:
// return nil, v
// case *QueueItem:
// return v, nil
// default:
// return nil, fmt.Errorf("unknown %v", v)
// }
// }
// func (q *SMQueue) Apply(log *raft.Log) interface{} {
// leader, id := q.ra.LeaderWithID()
// q.mu.Lock()
// defer q.mu.Unlock()
// var task ApplyTask
// err := task.Decode(log.Data)
// if err != nil {
// return err
// }
// switch task.Name {
// case "enqueue":
// var item QueueItem
// if err := json.Unmarshal(task.Object, &item); err != nil {
// return err
// }
// return q.queue.Enqueue(&item)
// case "dequeue":
// // log.Data 传入group
// item, err := q.queue.Dequeue(string(task.Object))
// if err != nil {
// return err
// }
// return item
// default:
// return fmt.Errorf("unknown type: %v", log.Type)
// }
// }
// type QueueSnapshot struct {
// Items []*QueueItem
// }
// // Persist writes the snapshot to the provided sink.
// func (snapshot *QueueSnapshot) Persist(sink raft.SnapshotSink) error {
// // The example has been simplified. In a production environment, you would
// // need to handle this operation with more care.
// return nil
// }
// // Release is invoked when the Raft instance is finished with the snapshot.
// func (snapshot *QueueSnapshot) Release() {
// // Normally you would put any cleanup here.
// }
// // Snapshot 返回队列快照
// func (fsm *SMQueue) Snapshot() (raft.FSMSnapshot, error) {
// var items []*QueueItem
// // 使用 Badger 读取所有队列项
// fsm.queue.db.View(func(txn *badger.Txn) error {
// opts := badger.DefaultIteratorOptions
// opts.PrefetchValues = false // 只需要key
// it := txn.NewIterator(opts)
// defer it.Close()
// for it.Rewind(); it.Valid(); it.Next() {
// err := it.Item().Value(func(val []byte) error {
// item := &QueueItem{}
// err := json.Unmarshal(val, item)
// if err != nil {
// return err
// }
// items = append(items, item)
// return nil
// })
// if err != nil {
// log.Println(err)
// }
// }
// return nil
// })
// snapshot := &QueueSnapshot{Items: items}
// return snapshot, nil
// }
// // Restore 恢复队列状态
// func (fsm *SMQueue) Restore(rc io.ReadCloser) error {
// snapshot := &QueueSnapshot{}
// if err := json.NewDecoder(rc).Decode(snapshot); err != nil {
// return err
// }
// // 用快照数据重建队列
// fsm.queue.db.Update(func(txn *badger.Txn) error {
// for _, item := range snapshot.Items {
// val, err := item.Encode()
// if err != nil {
// log.Println(err)
// continue
// }
// if err := txn.Set(fsm.queue.GetKey(item), val); err != nil {
// return err
// }
// }
// return nil
// })
// return nil
// }
// func waitForCluster(ra *raft.Raft) {
// ticker := time.NewTicker(500 * time.Millisecond)
// defer ticker.Stop()
// for range ticker.C {
// state := ra.State()
// if state == raft.Leader || state == raft.Follower {
// log.Println("Raft cluster is running")
// return
// } else {
// log.Println("Still waiting for the cluster to start...")
// }
// }
// }