From c3b067cca5bc52e9c7df968218ef53c0707372e1 Mon Sep 17 00:00:00 2001
From: huangsimin <474420502@qq.com>
Date: Wed, 10 Aug 2022 15:55:31 +0800
Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=97=A7=E7=9A=84=E7=8A=B6?=
 =?UTF-8?q?=E6=80=81=E6=9C=BA=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../java/com/yuandian/dataflow/Server.java    |   6 +-
 .../dataflow/controller/MasterProcessor.java  |   5 +-
 .../dataflow/controller/PacketsProcessor.java |   9 +-
 .../controller/TransferLeaderProcessor.java   |  41 ---
 .../dataflow/statemachine/StateMachine.java   | 225 +++++-------
 .../dataflow/statemachine/StateServer.java    |   2 +
 .../statemachine/StateServerFactory.java      |   2 +
 .../statemachine/client/CounterClient.java    |   6 +-
 .../master}/MasterFactory.java                |   8 +-
 .../statemachine/messages/MessageUtils.java   |  31 ++
 .../statemachine/{ => messages}/Operate.java  |  31 +-
 .../statemachine/{ => messages}/Query.java    |   4 +-
 .../statemachine/messages/RaftReply.java      |  78 +++++
 .../statemachine/{ => state}/Peer.java        |   2 +-
 .../dataflow/statemachine/state/State.java    |   2 -
 .../statemachine/state/WorkerState.java       |   2 -
 .../statemachine_old/StateFactory.java        | 329 ------------------
 .../statemachine_old/StateMachine.java        | 282 ---------------
 .../closure/GenericClosure.java               |  59 ----
 .../statemachine_old/operate/OperateOld.java  | 108 ------
 .../rpc/OperateProcessor.java                 |  84 -----
 .../statemachine_old/rpc/RaftResponse.java    |  41 ---
 22 files changed, 223 insertions(+), 1134 deletions(-)
 delete mode 100644 src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
 rename src/main/java/com/yuandian/dataflow/{statemachine_old => statemachine/master}/MasterFactory.java (82%)
 create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java
 rename src/main/java/com/yuandian/dataflow/statemachine/{ => messages}/Operate.java (84%)
 rename src/main/java/com/yuandian/dataflow/statemachine/{ => messages}/Query.java (92%)
 create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java
 rename src/main/java/com/yuandian/dataflow/statemachine/{ => state}/Peer.java (93%)
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
 delete mode 100644 src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java

diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 28c322f..80e6d91 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -9,12 +9,12 @@ import org.slf4j.MarkerFactory;
 
 import com.alipay.sofa.jraft.JRaftUtils;
 import com.alipay.sofa.jraft.conf.Configuration;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
 import com.yuandian.dataflow.utils.Utils;
 
 import io.netty.util.internal.StringUtil;
 import javassist.ClassClassPath;
 import lombok.extern.slf4j.Slf4j;
+ 
 
  
 
@@ -48,8 +48,8 @@ public class Server {
         // var sprPort = sprPeers[2];
         log.info("{} {}", peeridstr, sprPort);
 
-        conf = JRaftUtils.getConfiguration(String.join(",", peers));
-        StateFactory.startStateServer(peeridstr, conf);
+        // conf = JRaftUtils.getConfiguration(String.join(",", peers));
+        // StateFactory.startStateServer(peeridstr, conf);
 
  
         
diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
index bc89d93..1292226 100644
--- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java
@@ -21,14 +21,11 @@ import com.yuandian.dataflow.statemachine.master.MasterContext;
 import com.yuandian.dataflow.statemachine.master.MasterExecute;
 import com.yuandian.dataflow.statemachine.state.State;
 import com.yuandian.dataflow.statemachine.state.WorkerState;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
 import com.yuandian.dataflow.utils.PacketsManager;
 import com.yuandian.dataflow.utils.Utils;
 
 import lombok.extern.slf4j.Slf4j;
+ 
 
 @Slf4j
 @MasterRegister
diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index e21e623..de5dddf 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -22,16 +22,12 @@ import com.yuandian.dataflow.proto.Processor.ProcessorResponse;
 import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
 import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
 import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
-import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
 
 import io.grpc.stub.StreamObserver;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+ 
 
 /**
  * description
@@ -54,8 +50,7 @@ public class PacketsProcessor extends ProcessorServerImplBase {
         super.allPackets(request, responseObserver);
 
         log.info("packets {}", request.getPacketsList().size());
-
-        
+ 
     }
  
   
diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
deleted file mode 100644
index 379a6c9..0000000
--- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.yuandian.dataflow.controller;
-
-import java.io.Serializable;
-
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.rpc.RpcContext;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- *  例子 强制转换leader
- */
-@Slf4j
-public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
-    
-    @Setter
-    @Getter
-    public static class LeaderRequest implements Serializable {
-         PeerId peer;
-    }
-    
-    @Override
-    public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
-        Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
-        rpcCtx.sendResponse(status);
-        log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
-    }
-
-    @Override
-    public String interest() {
-        return LeaderRequest.class.getName();
-    }
-    
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
index 179a90c..fe1b535 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
@@ -38,7 +38,12 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 
-import com.yuandian.dataflow.statemachine.Operate.OperateType;
+import com.yuandian.dataflow.statemachine.messages.MessageUtils;
+import com.yuandian.dataflow.statemachine.messages.Operate;
+import com.yuandian.dataflow.statemachine.messages.Query;
+import com.yuandian.dataflow.statemachine.messages.RaftReply;
+import com.yuandian.dataflow.statemachine.messages.Operate.OperateType;
+import com.yuandian.dataflow.statemachine.messages.RaftReply.Status;
 import com.yuandian.dataflow.statemachine.state.State;
 import com.yuandian.dataflow.statemachine.state.WorkerState;
 import com.yuandian.dataflow.statemachine_old.MasterFactory;
@@ -74,9 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 @Slf4j
 public class StateMachine extends BaseStateMachine {
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage();
-
+  private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
 
   private State state = new State();
 
@@ -95,10 +98,9 @@ public class StateMachine extends BaseStateMachine {
   }
 
   public Boolean isLeader() {
-      return leader.get();
+    return leader.get();
   }
 
-
   public Executor asyncExecutor = Executors.newFixedThreadPool(8);
 
   /**
@@ -114,7 +116,7 @@ public class StateMachine extends BaseStateMachine {
    */
   @Override
   public void initialize(RaftServer server, RaftGroupId groupId,
-                         RaftStorage raftStorage) throws IOException {
+      RaftStorage raftStorage) throws IOException {
     super.initialize(server, groupId, raftStorage);
     this.storage.init(raftStorage);
     load(storage.getLatestSnapshot());
@@ -139,15 +141,14 @@ public class StateMachine extends BaseStateMachine {
    */
   @Override
   public long takeSnapshot() {
-    //get the last applied index
+    // get the last applied index
     final TermIndex last = getLastAppliedTermIndex();
 
-    //create a file with a proper name to store the snapshot
-    final File snapshotFile =
-        storage.getSnapshotFile(last.getTerm(), last.getIndex());
+    // create a file with a proper name to store the snapshot
+    final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex());
 
-    //serialize the counter object and write it into the snapshot file
-    try  {
+    // serialize the counter object and write it into the snapshot file
+    try {
       ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile)));
       out.writeObject(counter);
       out.close();
@@ -156,7 +157,7 @@ public class StateMachine extends BaseStateMachine {
           + "\", last applied index=" + last);
     }
 
-    //return the index of the stored snapshot (which is the last applied one)
+    // return the index of the stored snapshot (which is the last applied one)
     return last.getIndex();
   }
 
@@ -168,13 +169,13 @@ public class StateMachine extends BaseStateMachine {
    * @throws IOException if any error happens during read from storage
    */
   private long load(SingleFileSnapshotInfo snapshot) throws IOException {
-    //check the snapshot nullity
+    // check the snapshot nullity
     if (snapshot == null) {
       LOG.warn("The snapshot info is null.");
       return RaftLog.INVALID_LOG_INDEX;
     }
 
-    //check the existance of the snapshot file
+    // check the existance of the snapshot file
     final File snapshotFile = snapshot.getFile().getPath().toFile();
     if (!snapshotFile.exists()) {
       LOG.warn("The snapshot file {} does not exist for snapshot {}",
@@ -182,18 +183,17 @@ public class StateMachine extends BaseStateMachine {
       return RaftLog.INVALID_LOG_INDEX;
     }
 
-    //load the TermIndex object for the snapshot using the file name pattern of
+    // load the TermIndex object for the snapshot using the file name pattern of
     // the snapshot
-    final TermIndex last =
-        SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
+    final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
 
-    //read the file and cast it to the AtomicInteger and set the counter
+    // read the file and cast it to the AtomicInteger and set the counter
     try (ObjectInputStream in = new ObjectInputStream(
         new BufferedInputStream(new FileInputStream(snapshotFile)))) {
-      //set the last applied termIndex to the termIndex of the snapshot
+      // set the last applied termIndex to the termIndex of the snapshot
       setLastAppliedTermIndex(last);
 
-      //read, cast and set the counter
+      // read, cast and set the counter
       counter = JavaUtils.cast(in.readObject());
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -202,51 +202,37 @@ public class StateMachine extends BaseStateMachine {
     return last.getIndex();
   }
 
- 
- 
   @Override
   public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
-     
-    log.info("msg {}", groupMemberId.getPeerId());
-    StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
-  
-    leader.set(newLeaderId == groupMemberId.getPeerId());
-    
-    log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId  , groupMemberId.getPeerId(), isLeader());
-    // super.notifyLeaderChanged(groupMemberId, newLeaderId);
 
-    asyncExecutor.execute(()->{
+    log.info("msg {}", groupMemberId.getPeerId());
+    if (StateServerFactory.getCurrentPeerId().getRaftPeerId() == null) {
+      StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
+    }
+
+    leader.set(newLeaderId == groupMemberId.getPeerId());
+
+    log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId, groupMemberId.getPeerId(), isLeader());
+
+    asyncExecutor.execute(() -> {
       log.info("asyncExecutor");
-      var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() )); 
+      var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId()));
       try {
         var reply = StateServerFactory.send(op);
-        log.info("{}", reply);
-      } catch (IOException e) {
-        e.printStackTrace();
+        log.info("{}",  MessageUtils.<RaftReply>fromMessage(reply.getMessage()));
+      } catch (IOException | ClassNotFoundException e) {
+        log.error("{}",e.toString());
       }
     });
 
-
-
-
- 
-
-
-
-    if (MasterFactory.getMasterExecute().isAlive())  
+    if (MasterFactory.getMasterExecute().isAlive())
       MasterFactory.getMasterExecute().interrupt();
 
-    if(isLeader())  
+    if (isLeader())
       MasterFactory.getMasterExecute().start();
- 
+
   }
 
-  
-
- 
-
-  
-
   /**
    * Handle GET command, which used by clients to get the counter value.
    *
@@ -255,39 +241,33 @@ public class StateMachine extends BaseStateMachine {
    */
   @Override
   public CompletableFuture<Message> query(Message request) {
-    var data = request.getContent();
-    
-    var inBytes = new ByteArrayInputStream( data.toByteArray());
-    try (var inObject = new ObjectInputStream(inBytes)) {
-      // log.info("applyTransaction {}", inObject.toString());
-      log.info("{}", request);
-      var op = (Query)inObject.readObject();
-      switch(op.getType()){
-        case GET_STATE:
-          try(var rlock = readLock()) {
-            var ws =  state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() );
-            if(ws == null) {
-              return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
-            }
-            return  CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
-          }
 
+    var reply = new RaftReply();
  
+    try {
+      var query = MessageUtils.<Query>fromMessage(request);
+      switch (query.getType()) {
+        case GET_STATE:
+          try (var rlock = readLock()) {
+            var ws = state.getWorkers().get(((WorkerState) query.getValue()).getPeerId());
+            if (ws == null) {
+              return CompletableFuture.completedFuture(Status.setError(reply, "Peerid is not exist"));
+            }
+            reply.setData(ws); 
+          }
+          break;
         default:
-        
-          return CompletableFuture.completedFuture(
-              Message.valueOf("Invalid Command"));
-  
-         
+    
+          Status.setError(reply, "Invalid Command");
+          return CompletableFuture.completedFuture(reply);
+
       }
 
     } catch (ClassNotFoundException | IOException e) {
-      e.printStackTrace();
+      log.error("{}", e.toString());
     }
-
-    return CompletableFuture.completedFuture(
-        Message.valueOf("ok"));
  
+    return CompletableFuture.completedFuture(Status.setOK(reply));
   }
 
   /**
@@ -298,68 +278,45 @@ public class StateMachine extends BaseStateMachine {
    */
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
-    // log.info("applyTransaction"); 
-    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
-  
- 
-    //check if the command is valid
-    // String logData = entry.getStateMachineLogEntry().getLogData()
-    //     .toString(Charset.defaultCharset());
+    // log.info("applyTransaction");
+    var reply = new RaftReply();
 
-   
-    Operate op = null;
+    final RaftProtos.LogEntryProto entry = trx.getLogEntry();
+    final long index = entry.getIndex();
+  
     try {
-      
+
       var data = entry.getStateMachineLogEntry().getLogData();
-      
-      var inBytes = new ByteArrayInputStream( data.toByteArray());
-      var inObject = new ObjectInputStream(inBytes);
-      op = (Operate)inObject.readObject();
-      // log.info("applyTransaction {}", data);
-      inObject.close();
-      inBytes.close();
+      final var op = MessageUtils.<Operate>fromByteString(data);
+      try (var wlock = writeLock()) {
+  
+        switch (op.getType()) {
+          case ALLOCATE_PACKETS:
+            break;
+          case PUT_WORKERSTATE:
+            var ws = (WorkerState) op.getValue();
+            if(ws.getPeerId().getRaftPeerId() == null) {
+              return CompletableFuture.completedFuture(Status.setError(reply, "RaftPeerId 为 null"));
+            }
+            state.getWorkers().put(ws.getPeerId(), ws);
+            break;
+          case REMOVE:
+            break;
+          default:
+            break;
+        }
+        // if (isLeader()) {
+        //   log.info("{}: getType   {} ", index, op.getType());
+        // }
+        log.info("{}: getType   {} ", index, op.getType());
+        updateLastAppliedTermIndex(entry.getTerm(), index);
+      }
+ 
     } catch (IOException | ClassNotFoundException e) {
       log.info("{}", e.toString());
-      return CompletableFuture.completedFuture(Message.valueOf("错误op"));
-    }
-
-    // if (!logData.equals("INCREMENT")) {
-    //   return CompletableFuture.completedFuture(
-    //       Message.valueOf("Invalid Command"));
-    // }
-    //update the last applied term and index
-    final long index = entry.getIndex();
-    
-    try(var wlock = writeLock()) {
-     
-      switch(op.getType()) {
-        case ALLOCATE_PACKETS:
-          break;
-        case PUT_WORKERSTATE:
-        
-          var ws = (WorkerState)op.getValue();
-          // log.info("applyTransaction {}", OperateType.PUT_WORKERSTATE);
-          state.getWorkers().put(ws.getPeerId() , ws);
-          
-          break;
-        case REMOVE:
-          break;
-        default:
-          break;
-      }
-      updateLastAppliedTermIndex(entry.getTerm(), index);
+      return CompletableFuture.completedFuture(Status.setError(reply, "错误op"));
     }
  
-    //return the new value of the counter to the client
-    final CompletableFuture<Message> f =
-        CompletableFuture.completedFuture(Message.valueOf("put ok"));
-
-    //if leader, log the incremented value and it's log index
-    if (isLeader()) {
-      log.info("{}: getType   {}, state {}", index, op.getType(), state);
-    }
-     
-    // log.info("applyTransaction {}", 6);
-    return f;
+    return CompletableFuture.completedFuture(Status.setOK(reply));
   }
 }
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
index 728a28c..769cf73 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java
@@ -33,6 +33,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.NetUtils;
 
 import com.yuandian.dataflow.statemachine.grpc.ProcessorServer;
+import com.yuandian.dataflow.statemachine.state.Peer;
 import com.yuandian.dataflow.statemachine_old.MasterFactory;
 
 import io.netty.util.internal.StringUtil;
@@ -111,6 +112,7 @@ public final class StateServer implements Closeable {
    
       this.processorServer = new ProcessorServer();
       this.processorServer.getGrpcServer().start();
+
       this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort());
   }
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
index 9ec41bc..d94e2ec 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
@@ -9,6 +9,8 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeer;
 
+import com.yuandian.dataflow.statemachine.state.Peer;
+
 public class StateServerFactory {
     public static StateServer stateServer;
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
index 9fe6e64..64faaec 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java
@@ -39,10 +39,10 @@ import org.springframework.cglib.proxy.CallbackFilter;
 
 import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
 import com.yuandian.dataflow.statemachine.StateServer;
-import com.yuandian.dataflow.statemachine.Operate.OperateType;
+import com.yuandian.dataflow.statemachine.messages.Operate;
+import com.yuandian.dataflow.statemachine.messages.Query;
+import com.yuandian.dataflow.statemachine.messages.Operate.OperateType;
 import com.yuandian.dataflow.statemachine.state.WorkerState;
-import com.yuandian.dataflow.statemachine.Operate;
-import com.yuandian.dataflow.statemachine.Query;
 
 import lombok.extern.slf4j.Slf4j;
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java
similarity index 82%
rename from src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
rename to src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java
index d647256..a28c88e 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java
@@ -4,7 +4,7 @@
  * @author eson
  *2022年7月20日-10:00:05
  */
-package com.yuandian.dataflow.statemachine_old;
+package com.yuandian.dataflow.statemachine.master;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -18,19 +18,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.google.protobuf.Any;
 // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
 import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
-import com.yuandian.dataflow.statemachine.master.MasterContext;
-import com.yuandian.dataflow.statemachine.master.MasterExecute;
 import com.yuandian.dataflow.statemachine.state.State;
 import com.yuandian.dataflow.statemachine.state.WorkerState;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
 import lombok.extern.slf4j.Slf4j;
+ 
 
 /**
  * Master主线程, 用于接收packets
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java
new file mode 100644
index 0000000..3d8f24f
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java
@@ -0,0 +1,31 @@
+package com.yuandian.dataflow.statemachine.messages;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+@java.lang.SuppressWarnings("unchecked")
+
+public class MessageUtils {
+    
+     public static  <T> T fromMessage(Message msg) throws IOException, ClassNotFoundException {
+        var inBytes = new ByteArrayInputStream(msg.getContent().toByteArray());
+        var inObject = new ObjectInputStream(inBytes);
+        T result = (T) inObject.readObject();
+        inObject.close();
+        inBytes.close();
+        return result;
+    }
+
+    public static  <T> T fromByteString(ByteString msg) throws IOException, ClassNotFoundException {
+        var inBytes = new ByteArrayInputStream(msg.toByteArray());
+        var inObject = new ObjectInputStream(inBytes);
+        T result = (T) inObject.readObject();
+        inObject.close();
+        inBytes.close();
+        return result;
+    }
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java
similarity index 84%
rename from src/main/java/com/yuandian/dataflow/statemachine/Operate.java
rename to src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java
index cf42896..6279b2b 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine.messages;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -63,27 +63,23 @@ public class Operate implements Message,Serializable {
         this.value = value;
     }
 
+  
+
     public Message toMessage() {
         try {
- 
             var output = ByteString.newOutput();
             var outputStream = new ObjectOutputStream(output);
             outputStream.writeObject( this);
             outputStream.close();
             output.close();
 
-
             // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
             // var inObject = new ObjectInputStream(inBytes);
-   
-            
             // var a = (Operate)inObject.readObject();
             // log.info("applyTransaction {}", a);
-
-
             return Message.valueOf(output.toByteString());
         } catch (IOException  e) {
-            e.printStackTrace();
+            log.error("{}",e.toString());
         }
         return null;
     }
@@ -91,33 +87,16 @@ public class Operate implements Message,Serializable {
     @Override
     public ByteString getContent() {
         try {
- 
             var output = ByteString.newOutput();
             var outputStream = new ObjectOutputStream(output);
             outputStream.writeObject(this);
             outputStream.close();
             output.close();
-
-
-            // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
-            // var inObject = new ObjectInputStream(inBytes);
-   
-            
-            // var a = (Operate)inObject.readObject();
-            // log.info("applyTransaction {}", a);
-
-
             return  output.toByteString();
         } catch (IOException  e) {
-            e.printStackTrace();
+            log.error("{}",e.toString());
         }
         return null;
     }
-
- 
- 
-
- 
-
  
 }
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Query.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java
similarity index 92%
rename from src/main/java/com/yuandian/dataflow/statemachine/Query.java
rename to src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java
index 622e109..c082191 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/Query.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine.messages;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
@@ -44,7 +44,7 @@ public class Query implements Message,Serializable {
             output.close();
             return  output.toByteString();
         } catch (IOException  e) {
-            e.printStackTrace();
+            log.error("{}",e.toString());
         }
         return null;
     }
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java
new file mode 100644
index 0000000..429b01c
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java
@@ -0,0 +1,78 @@
+package com.yuandian.dataflow.statemachine.messages;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import javax.servlet.http.PushBuilder;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Getter
+@Setter
+@ToString
+public class RaftReply   implements Message, Serializable   {
+
+    private Status code; // 状态码
+    private String message; // 返回字符串信息说明
+    private Object data; // object 必须 继承 Serializable
+
+    public RaftReply() {
+
+    }
+
+    @ToString
+    public enum Status {
+        OK(0), ERROR(1);
+
+        private final int code;
+        private Status(int code) {
+            this.code = code;
+        }
+
+        public static RaftReply setOK( RaftReply reply ) {
+            reply.setCode(OK);
+            reply.setData(OK.toString());
+            return reply;
+        }
+
+        public static RaftReply setError( RaftReply reply ) {
+            reply.setCode(ERROR);
+            reply.setData(ERROR.toString());
+            return reply;
+        }
+
+        public static RaftReply setError( RaftReply reply, String error) {
+            reply.setCode(ERROR);
+            reply.setData(ERROR.toString());
+            return reply;
+        }
+    }
+
+
+ 
+    @Override
+    public ByteString getContent() {
+        try {
+            var output = ByteString.newOutput();
+            var outputStream = new ObjectOutputStream(output);
+            outputStream.writeObject(this);
+            outputStream.close();
+            output.close();
+            return output.toByteString();
+        } catch (IOException e) {
+            log.error("{}",e.toString());
+        }
+        return null;
+    }
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java
similarity index 93%
rename from src/main/java/com/yuandian/dataflow/statemachine/Peer.java
rename to src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java
index c399ac4..cc324f9 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine.state;
 
 import java.io.Serializable;
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java
index 0b41804..977b9d0 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java
@@ -21,8 +21,6 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.util.HashMap;
 
-import com.yuandian.dataflow.statemachine.Peer;
-
 /**
  * 代表任务状态 暂时全局使用这个结构. 添加新增状态
  *
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java
index 249bda1..7090e2b 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java
@@ -9,8 +9,6 @@ package com.yuandian.dataflow.statemachine.state;
 import java.io.Serializable;
 import java.time.Instant;
 
-import com.yuandian.dataflow.statemachine.Peer;
-
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
deleted file mode 100644
index ad306f4..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * description
- *
- * @author eson
- *2022年7月12日-13:36:24
- */
-package com.yuandian.dataflow.statemachine_old;
-
-import java.io.File;
-import java.lang.reflect.InvocationTargetException;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
-
-import org.reflections.ReflectionUtils;
-import org.reflections.Reflections;
-
-import com.alipay.remoting.NamedThreadFactory;
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
-import com.alipay.sofa.jraft.JRaftUtils;
-import com.alipay.sofa.jraft.Node;
-import com.alipay.sofa.jraft.RaftGroupService;
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.entity.ReadIndexStatus;
-import com.alipay.sofa.jraft.entity.Task;
-import com.alipay.sofa.jraft.error.RaftError;
-import com.alipay.sofa.jraft.error.RemotingException;
-import com.alipay.sofa.jraft.option.CliOptions;
-import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.alipay.sofa.jraft.rpc.InvokeContext;
-import com.alipay.sofa.jraft.rpc.RpcClient;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
-import com.alipay.sofa.jraft.util.BytesUtil;
-import com.alipay.sofa.jraft.util.Endpoint;
-import com.alipay.sofa.jraft.util.ThreadPoolUtil;
-import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
-import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
-import com.yuandian.dataflow.statemachine.master.MasterExecute;
-import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
-import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
-import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
-import com.yuandian.dataflow.utils.Utils;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * description
- *
- * @author eson
- *         2022年7月12日-13:36:24
- */
-@Slf4j
-public class StateFactory {
-
-    private static StateServer ss;
-    private static ReentrantLock lockReadIndex = new ReentrantLock();
-
-    public static void startStateServer(String peerstr, Configuration conf) throws Exception {
-        if (ss != null) {
-            throw new Exception("重复初始化 InitStateServer");
-        }
-        ss = new StateFactory.StateServer(peerstr, conf);
-    }
-
-    public static boolean isLeader() {
-        return ss.node.isLeader();
-    }
-
-    public static PeerId getLeaderId() {
-        return ss.node.getLeaderId();
-    }
-
-    public static PeerId getServerId() {
-        return ss.cluster.getServerId();
-    }
-
-    public static Node getNode() {
-        return ss.node;
-    }
-
-    public static Node getRaftNode() {
-        return ss.cluster.getRaftNode();
-    }
-
-    public static RpcClient getRpcClient() {
-        return ss.getRpcClient();
-    }
-
-    public static RaftGroupService getCluster() {
-        return ss.getCluster();
-    }
-
-    // 获取状态服务的对象
-    public static StateServer getStateServer() {
-        return ss;
-    }
-
-    public static void readIndexState(GenericClosure closure) {
-        ss.readIndexState(closure);
-    }
-
-    public static void applyOperate(OperateOld op, GenericClosure closure) {
-        ss.applyOperate(op, closure);
-    }
-
-    public static void rpcClientInvokeAsync(final Endpoint endpoint, final Object request,
-            final InvokeCallback callback, final long timeoutMs)
-            throws InterruptedException, RemotingException {
-        ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs);
-    }
-
-    public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
-            throws InterruptedException, RemotingException {
-        return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs);
-    }
-
-    public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
-            final long timeoutMs) throws InterruptedException, RemotingException {
-        return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs);
-    }
-
-    @Getter
-    @Setter
-    public static class StateServer {
-
-        RpcClient rpcClient;
-
-        private Node node;
-        private RaftGroupService cluster;
-        private StateMachine fsm;
-
-        private String groupId = "dataflow";
-        private Executor readIndexExecutor = createReadIndexExecutor();
-
-        public StateServer(String addr, Configuration conf) {
-            // String[] peers = new
-            // String[]{"localhost:4440","localhost:4441","localhost:4442"};
-            // String[] sprPeers = new String[]{"3440","3441","3442"};
-
-            // var peeridstr = peers[Integer.parseInt(serverId)];
-            // var sprPort = sprPeers[Integer.parseInt(args[0])];
-
-            // String groupId = "jraft";
-
-            // conf =
-            // JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
-
-            PeerId serverId = JRaftUtils.getPeerId(addr);
-            int port = serverId.getPort();
-
-            NodeOptions nodeOptions = new NodeOptions();
-
-            nodeOptions.setElectionTimeoutMs(1000);
-            nodeOptions.setSnapshotLogIndexMargin(3600);
-            nodeOptions.setInitialConf(conf);
-
-            File RaftDataFile = new File(String.format("./raftdata/%d", port));
-            log.info("mkdirs: {}", RaftDataFile.mkdirs());
-
-            nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port));
-            nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
-            nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
-            fsm = new StateMachine(); // 状态实例初始化
-            nodeOptions.setFsm(fsm);
-
-            cluster = new RaftGroupService(groupId, serverId, nodeOptions);
-
-            // 扫描注解RaftProccessor 注册
-            var now = Instant.now();
-            HashMap<String, Class<?>> scansMap = new HashMap<>();
-            var traces = Thread.currentThread().getStackTrace();
-            var clsName = traces[traces.length - 1].getClassName();
-            var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
-            log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
-
-            var refl = new Reflections(packName);
-            Set<Class<?>> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class);
-
-            scans.forEach((pRaftClass) -> {
-                scansMap.put(pRaftClass.getName(), pRaftClass);
-            });
-            log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis());
-            scansMap.forEach((name, pRaftClass) -> {
-                try {
-                    cluster.getRpcServer()
-                            .registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
-                } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
-                        | InvocationTargetException | NoSuchMethodException | SecurityException e) {
-                    log.info("{} {}", name, e.toString());
-                }
-            });
-
-            refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> {
-                try {
-                    MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance();
-                    MasterFactory.registerMasterLoop(execute);
-
-                } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
-                        | InvocationTargetException | NoSuchMethodException | SecurityException e) {
-                    log.info("{}", e.toString());
-
-                }
-            });
-            ;
-
-            // 启动集群
-            node = cluster.start();
-
-            rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端.
-            rpcClient.init(new CliOptions()); // 初始化
-        }
-
-        public boolean isLeader() {
-            return this.fsm.isLeader();
-        }
-
-        public void readIndexState(GenericClosure closure) {
-
-            getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
-                @Override
-                public void run(Status status, long index, byte[] reqCtx) {
-                    // log.debug("readIndexState({}) {}", getServerId(), status);
-                    if (status.isOk()) {
-                        closure.success(ss.fsm.getState());
-                        closure.setValue(ss.fsm.getState());
-                        closure.run(status);
-                        return ;
-                    } 
-                    // 回调失败
-                    // 提交同步
- 
-                    log.info("status not ok");
-                    // readIndexExecutor.execute(()->{
-                    //     if(isLeader()) {
-                    //         Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure);
-                    //     } else {
-                    //         handlerNotLeaderError(closure);
-                    //     }  
-                    // });               
-                }
-            });
-
-        }
-
-        public void applyOperate(OperateOld op, GenericClosure closure) {
-            // 所有的提交都必须再leader进行
-            if (!ss.isLeader()) {
-                ss.handlerNotLeaderError(closure);
-                return;
-            }
-
-            try {
-                closure.setValue(op);
-                final Task task = new Task();
-                task.setData(
-                        ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
-                task.setDone(closure); // 确认所有数据 一致, 不需要加锁
-                StateFactory.getNode().apply(task);
-               
-            } catch (CodecException e) {
-                String errorMsg = "Fail to encode Operate";
-                log.debug("{} {}", errorMsg, e);
-                closure.failure(errorMsg, PeerId.emptyPeer());
-                closure.run(new Status(RaftError.EINTERNAL, errorMsg));
-            }
-        }
-
-        public   RaftResponse  redirect() {
-            final RaftResponse response = new RaftResponse ();
-            response.setSuccess(false);
-            if (this.node != null) {
-                final PeerId leader = this.node.getLeaderId();
-                if (leader != null) {
-                    response.setRedirect(leader);
-                }
-            }
-            return response;
-        }
-
-        public   void handlerNotLeaderError(final GenericClosure  closure) {
-            closure.failure("Not leader.", redirect().getRedirect());
-            closure.run(new Status(RaftError.EPERM, "Not leader"));
-        }
-
-        private Executor createReadIndexExecutor() {
-            return ThreadPoolUtil.newBuilder() //
-                    .poolName("ReadIndexPool") //
-                    .enableMetric(true) //
-                    .coreThreads(4) //
-                    .maximumThreads(4) //
-                    .keepAliveSeconds(60L) //
-                    .workQueue(new SynchronousQueue<>()) //
-                    .threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
-                    .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
-                    .build();
-        }
-    }
-
-    public static void main(String[] args) throws InterruptedException, RemotingException {
-
-        ReflectionUtils.get(ReflectionUtils.SuperClass.of(State.class));
-
-        var rpcClient = new BoltRaftRpcFactory().createRpcClient();
-        rpcClient.init(new CliOptions());
-        var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000);
-        log.info("{}", resp);
-    }
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
deleted file mode 100644
index d642e43..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java
+++ /dev/null
@@ -1,282 +0,0 @@
-package com.yuandian.dataflow.statemachine_old;
-
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
-import com.alipay.sofa.jraft.Closure;
-import com.alipay.sofa.jraft.Iterator;
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.core.StateMachineAdapter;
-import com.alipay.sofa.jraft.entity.LeaderChangeContext;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.error.RaftException;
-import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
-import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
-import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
-// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
-import com.yuandian.dataflow.utils.Utils;
-
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Counter state machine.
- *
- * @author boyan (boyan@alibaba-inc.com)
- *
- *         2018-Apr-09 4:52:31 PM
- */
-@Slf4j
-public class StateMachine extends StateMachineAdapter {
-
-    // private static final Logger LOG =
-    // LoggerFactory.getLogger(StateMachine.class);
-
-    /**
-     * State value 全局使用的唯一状态
-     */
-    private State state = new State();
-    /**
-     * Leader term
-     */
-    private final AtomicLong leaderTerm = new AtomicLong(-1);
-
-    public boolean isLeader() {
-        return this.leaderTerm.get() > 0;
-    }
-
-    /**
-     * Returns current value. 只有Get 操作状态由协议流程决定 Apply
-     */
-    public State getState() {
-        return state;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void onApply(final Iterator iter) {
-        while (iter.hasNext()) {
-
-            OperateOld op = null;
-            GenericClosure closure = null;
-            if (iter.done() != null) {
-                // leader可以直接从 回调closure里提取operate
-                closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
-                op = (OperateOld)closure.getValue();
-            } else {
-                 // 非leader 需要从getData反序列化出来后处理
-                final ByteBuffer data = iter.getData();
-                try {
-                    op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
-                            data.array(),
-                            OperateOld.class.getName());
-                } catch (CodecException e) {
-                    log.info("{}", e.toString());
-                }
-            }
-
-            if (op == null) {
-                log.error("op 为 {}. 存在错误, 可能版本不一致", op);
-                continue;
-            }
-
-            switch (op.getType()) {
-
-                case PUT_WORKERSTATE:
-                    WorkerState opws = op.getValue();
-                    log.debug("PUT {}", opws.peerId);
-                    // state.getWorkers().put(opws.peerId, opws);
-                    if (closure != null) {
-                        closure.success(op);
-                        closure.run(Status.OK());
-                    }
-                    break;
-                case ALLOCATE_PACKETS:
-                    List<PeerId> alivePeers = op.getValue();
-                    PeerId[] peers = new PeerId[alivePeers.size()]; 
-                    alivePeers.toArray(peers);
-    
-                    // 统计 每个节点还有多少任务容量
-                    var isNext = false;
-                    var canTasks = new int[alivePeers.size()];
-                    for(int i = 0; i < peers.length; i++) {
-                        var peer = peers[i];
-                        WorkerState ws = state.getWorkers().get(peer);
-                        if (ws == null) {
-                            log.error("WorkerState获取的状态为 {}", ws);
-                            continue;
-                        }
-                        var can =  OperateOld.MAX_TASKS - ws.getTaskQueueSize();
-                        canTasks[i] = can;
-                        if(!isNext) {
-                            isNext = true;
-                        }
-                    }
-
-                    if(!isNext) {
-                        break;
-                    }
-
-                    // log.info("size: {}", Operate.packetsManager.size());
-                    // 统计每个节点发送多少任务
-                    var allocTasks = Utils.allocationTasks(OperateOld.packetsManager.size(), canTasks);
-                    if(closure != null) {
-                        closure.setValue(allocTasks);
-                    }
-                    for(int i = 0; i < peers.length; i++) {
-                        var peer = peers[i];
-                        if(allocTasks[i] <= 0) {
-                            continue;
-                        } 
-                
-                        WorkerState ws = state.getWorkers().get(peer);
-                        ws.setUpdateAt(Instant.now());
-                        ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
-                        // log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize());
-                    }
-
-                    break;
-                // case GET_STATE:     
-                //     closure.setValue(this.state);
-                //     log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
-                //     break;
-                case REMOVE:
-                    break;
-                default:
-                    break;
-
-            }
-
-            if (closure != null) {
-                closure.success(op);
-                closure.run(Status.OK());
-            }
-           
-
-            iter.next();
-        }
-    }
-
-    @Override
-    public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
-        return;
-    }
-
-    @Override
-    public void onError(final RaftException e) {
-        log.debug("Raft error: {}", e, e);
-    }
-
-    @Override
-    public boolean onSnapshotLoad(final SnapshotReader reader) {
-        return true;
-    }
-
-    @Override
-    public void onLeaderStart(final long term) {
-        log.debug("onLeaderStart[{}]", StateFactory.getServerId());
-        this.leaderTerm.set(term);
-
-        // 判断是否Master线程还在跑, 如果存在则中断
-        if (MasterFactory.getMasterExecute().isAlive()) {
-            MasterFactory.getMasterExecute().interrupt();
-        }
-
-  
-                
-        var ws = this.state.getWorkers().get(StateFactory.getServerId());
-        if (ws == null) {
-            // ws = new WorkerState(StateFactory.getServerId());
-        }
-
-        // 更新当前WorkerState
-        StateFactory.applyOperate(new OperateOld(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
-            @Override
-            public void run(Status status) {
-                log.debug("master update workerstate: {}", status);
-            }
-        });
-          
-
-        // 当成为master时候 必须启动
-        MasterFactory.getMasterExecute().start();
-        super.onLeaderStart(term);
-    }
-
-    @Override
-    public void onLeaderStop(final Status status) {
-        log.debug("onLeaderStop[{}]", StateFactory.getServerId());
-        this.leaderTerm.set(-1);
-        // 判断是否Master线程还在跑, 如果存在则中断
-        if (MasterFactory.getMasterExecute().isAlive()) {
-            MasterFactory.getMasterExecute().interrupt();
-        }
-        super.onLeaderStop(status);
-    }
-
-    @Override
-    public void onShutdown() {
-        log.info("onShutdown[{}]",StateFactory.getServerId());
-        super.onShutdown();
-    }
-
-    @Override
-    public void onStartFollowing(LeaderChangeContext ctx) {
-        log.debug("onStartFollowing[{}]] {}", StateFactory.getServerId(),ctx);
-        try {
-
-            // 判断是否Master线程还在跑, 如果存在则中断
-            if (MasterFactory.getMasterExecute().isAlive()) {
-                MasterFactory.getMasterExecute().interrupt();
-            }
-
-            // 在startFollowing不能使用 readIndexState
-
-           
-            // 更新当前WorkerState
-            // OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
-            //     @Override
-            //     public void run(Status status) {
-            //         log.debug("onStartFollowing update workerstate: {}", status);
-            //     }
-            // });
- 
-            return;
-        } catch (Exception e) {
-            log.info("{}", e.toString());
-        }
-
-        super.onStartFollowing(ctx);
-    }
-
-    @Override
-    public void onConfigurationCommitted(Configuration conf) {
-        super.onConfigurationCommitted(conf);
-    }
-
-    @Override
-    
-    public void onStopFollowing(LeaderChangeContext ctx) {
-        log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
-
-        // var ws = new WorkerState(StateFactory.getServerId());
-        // var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws);
-        // OperateOld.CallOperate(op, new GenericClosure() {
-        //     @Override
-        //     public void run(Status status) {
-        //         log.info("{} {}", status, this.getResponse());
-        //     }
-        // });
-
-        super.onStopFollowing(ctx);
-    }
-
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
deleted file mode 100644
index c828f97..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.yuandian.dataflow.statemachine_old.closure;
-
-import com.alipay.sofa.jraft.Closure;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-@Getter
-@Setter
-@ToString
-public  abstract  class  GenericClosure   implements Closure {
- 
- 
-	// 状态机的统一响应
-	private RaftResponse  response;
-	// 代表任务状态 
-	private Object value;
-
-    public <T> T getValue() {
-        if(this.value == null) {
-            return null;
-        }
-        return (T)this.value;
-    }
-
-    public GenericClosure() {
-  
-    }
-
-	/**
-     * 错误的时候返回错误信息.  自动装配response
-	 * @param errorMsg
-	 * @param redirect
-	 */
-	public void failure(final String errorMsg, final PeerId redirect) {
-        final RaftResponse response = new RaftResponse();
-        response.setSuccess(false);
-        response.setMsg(errorMsg);
-        log.error("{}", errorMsg);
-        response.setRedirect(redirect);
-        setResponse(response);
-    }
-
-    /**
-     * 成功时调用该方法. 自动装配response
-     * @param value
-     */
-    public void success(final Object value) {
-        final RaftResponse  response = new RaftResponse ();
-        response.setValue(value);
-        response.setSuccess(true);
-        setResponse(response);
-    }
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
deleted file mode 100644
index fb4eae4..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package com.yuandian.dataflow.statemachine_old.operate;
-
-import java.io.Serializable;
-
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.error.RemotingException;
-import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.yuandian.dataflow.statemachine.state.WorkerState;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor;
-import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
-import com.yuandian.dataflow.utils.PacketsManager;
-
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * 操作
- * 
- * @author eson
- */
-@Slf4j
-@Data
-public class OperateOld implements Serializable {
-
-    private static int DEFAULT_ASYNC_TIMEOUT = 5000;
-    public static final int MAX_TASKS = 1000;
-    public static PacketsManager packetsManager = new PacketsManager();
-
-    public static enum OperateType {
-        /**
-         * 同步WorkerState状态. 
-         */
-        PUT_WORKERSTATE, 
-
-        /**
-         * 分配packets
-         */
-        ALLOCATE_PACKETS, 
-
- 
-        /**
-         *  暂无想法
-         */
-        REMOVE;
-    }
-
-    private OperateType type;
-    private Object value;
-
- 
-
-    public OperateOld(OperateType t, Object v) {
-        this.type = t;
-        this.value = v;
-    }
-
-    @java.lang.SuppressWarnings("unchecked")
-    public <T> T getValue() {
-        return (T) this.value;
-    };
-
-    public <T> void setValue(T value) {
-        this.value = value;
-        return;
-    };
-
-    /**
-     * 调用操作设置
-     * @param op 传入的操作类
-     * @param closure 回调函数. Operate为返回值
-     */
-    @java.lang.SuppressWarnings("unchecked")
-    public static void CallOperate(OperateOld op, GenericClosure closure) {
-        // log.debug("CallOperate Value {}", op.<WorkerState>getValue());
-        // 如果是leader 就直接提交
-        if (StateFactory.isLeader()) { 
-            StateFactory.applyOperate(op, closure);
-            return;
-        }
-
-        // 非leader 转发请求 统一有leader处理
-        var request = new OperateProcessor.OperateRequest();
-        request.setOperate(op);
-        try {
-            StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() {
-                        @Override
-                        public void complete(Object result, Throwable err) {
-                            // log.debug("Object result {}", result);
-                            var resp = (RaftResponse) result;
-                            closure.setResponse(resp);
-                            closure.success(resp.getValue());
-                            closure.run(Status.OK());
-                        }
-
-                    }, DEFAULT_ASYNC_TIMEOUT);
-        } catch (InterruptedException | RemotingException e) {
-            closure.failure(e.getMessage(), null);
-            closure.run(new Status(100000, "invokeAsync fail"));
-            log.info("{}", e.toString());
-        }
-
-    }
-
-
-  
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
deleted file mode 100644
index 8a22d8b..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * description
- *
- * @author eson
- *2022年7月12日-11:10:54
- */
-package com.yuandian.dataflow.statemachine_old.rpc;
-
-import java.io.Serializable;
-
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.error.RaftError;
-import com.alipay.sofa.jraft.rpc.RpcContext;
-import com.alipay.sofa.jraft.rpc.RpcProcessor;
-import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
-import com.yuandian.dataflow.statemachine_old.StateFactory;
-import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
-import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
-
-import javassist.ClassPath;
- 
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * description
- *
- * @author eson
- *2022年7月12日-11:10:54
- */
-@Slf4j
- 
-public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
-
-    /**
-     * 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加
-     *
-     * @author eson
-     *2022年7月11日-16:01:07
-    */
-    @Getter
-    @Setter
-    @ToString
-    public static class OperateRequest  implements Serializable {
-
-        private static final long serialVersionUID = 1L;
-    
-        private OperateOld operate; 
-    }
-
-
-    @Override
-    public void handleRequest(RpcContext rpcCtx, OperateRequest  request) {
-   
-        // log.info("request: {}", request);
- 
-        final GenericClosure  closure = new GenericClosure () {
-            @Override
-            public void run(Status status) {
-                
-                if(status.isOk()) {
-                    // log.info("{}", status);
-                    rpcCtx.sendResponse(getResponse());
-                    return;
-                }
- 
-                if(status.getRaftError() == RaftError.EPERM) {
-                    //TODO: Not leader 需要转发
-                    log.info("{}", status);
-                } 
-                
-            }
-        };
-    
-        StateFactory.getStateServer().applyOperate(request.getOperate(), closure); 
-    }
-
-    @Override
-    public String interest() {
-        return OperateRequest.class.getName();
-    }
-}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java
deleted file mode 100644
index ce3b494..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * description
- *
- * @author eson
- *2022年7月13日-09:07:22
- */
-package com.yuandian.dataflow.statemachine_old.rpc;
-
-import java.io.Serializable;
-
-import com.alipay.sofa.jraft.entity.PeerId;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * description
- *
- * @author eson
- *2022年7月13日-09:07:22
- */
-@Slf4j
-@Getter
-@Setter
-@ToString
-public class RaftResponse  implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private Object value;
-
-    private boolean           success;
-    /**
-     * redirect peer id
-     */
-    private PeerId            redirect;
-
-    private String            msg;
-}