From d7cd8ed7580b2c985b3a1c78f2cb735e4be09503 Mon Sep 17 00:00:00 2001
From: huangsimin <eson.hsm@nonolive.com>
Date: Tue, 19 Jul 2022 18:26:56 +0800
Subject: [PATCH] =?UTF-8?q?TODO:=20=E5=90=84=E4=B8=AA=E8=8A=82=E7=82=B9?=
 =?UTF-8?q?=E7=9A=84=E9=94=80=E6=AF=81=E5=85=B3=E7=B3=BB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../java/com/yuandian/dataflow/Server.java    |  7 +-
 .../yuandian/dataflow/controller/TaskLog.java | 28 +++++---
 .../dataflow/statemachine/StateMachine.java   | 45 ++++++++-----
 .../statemachine/StateServerFactory.java      | 66 +++++++++++++++++--
 src/main/resources/logback.xml                |  4 +-
 .../java/com/yuandian/dataflow/AppTest.java   |  1 +
 start.sh                                      |  3 +-
 7 files changed, 115 insertions(+), 39 deletions(-)

diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 89ce643..4a49a81 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -38,8 +38,7 @@ public class Server {
  
     public static void main(String[] args) throws Exception {
     
-
-
+     
 
         String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
         String[] sprPeers = new String[]{"3440","3441","3442"};
@@ -54,12 +53,10 @@ public class Server {
 
         Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
    
-        StateServerFactory.InitStateServer(peeridstr, conf);
+        StateServerFactory.initStateServer(peeridstr, conf);
  
         System.setProperty("server.port", sprPort);
         var app = SpringApplication.run(Server.class, args);
         app.start();
-
-        
     }
 }
diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
index 216a65c..4a48904 100644
--- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
+++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
@@ -24,26 +24,36 @@ public class TaskLog {
 
 	@GetMapping(path = "/test")
 	public ResponseEntity<Response> Processing() throws InterruptedException, RemotingException {
- 
-
-		var ws = new WorkerState(new PeerId());
-		StateServerFactory.getStateServer().updateFsmWorkerState(ws);
-		final Response response = new Response();
+		// var ws = new WorkerState(new PeerId());
+		// StateServerFactory.getStateServer().updateFsmWorkerState(ws);
+		Response response = new Response();
 		StateServerFactory.getStateServer().useFsmState((fsmState)->{
 			log.debug(fsmState.toString() );
+			log.debug( StateServerFactory.getNode().getLeaderId().toString() );
 			response.Message = fsmState.toString();
 			return null;
 		});
-
-		
 		response.Code = HttpStatus.OK;
 		return new ResponseEntity<Response>(response, HttpStatus.OK);
 	}
 
 	@GetMapping(path = "/test2")
-	public ResponseEntity<Response> MongodbTest(@RequestParam int status) {
-
+	public ResponseEntity<Response> MongodbTest() {
 		Response response = new Response();
+		StateServerFactory.getStateServer().useFsmStateAsync((fsmState)->{
+			log.debug("{} {}", fsmState.toString());
+			// response.Message = fsmState.toString();
+			return null;
+		});
+
+		return new ResponseEntity<Response>(response, HttpStatus.OK);
+	}
+
+
+	@GetMapping(path = "/test3")
+	public ResponseEntity<Response> RemoveLeader() {
+		Response response = new Response();
+		StateServerFactory.getNode().shutdown();
 		return new ResponseEntity<Response>(response, HttpStatus.OK);
 	}
 }
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
index ede6e55..4d292da 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
@@ -111,7 +111,7 @@ public class StateMachine extends StateMachineAdapter {
 
 
     @Override
-    // @SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     public void onApply(final Iterator iter) {
         while (iter.hasNext()) {
 
@@ -131,7 +131,7 @@ public class StateMachine extends StateMachineAdapter {
                 final ByteBuffer data = iter.getData();
                 try {
                     synchronized(state) {
-                state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
+                    state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
                     data.array(), State.class.getName());
                     log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
                 }
@@ -153,20 +153,17 @@ public class StateMachine extends StateMachineAdapter {
 
     @Override
     public void onError(final RaftException e) {
-        log.debug("Raft error: {}", e, e);
+        log.error("Raft error: {}", e, e);
     }
 
     @Override
     public boolean onSnapshotLoad(final SnapshotReader reader) {
-
         return true;
     }
 
     @Override
     public void onLeaderStart(final long term) {
         this.leaderTerm.set(term);
-        super.onLeaderStart(term);
-
         try {
             updateState((state)->{
                 var ws = new WorkerState(StateServerFactory.getStateServer().getCluster().getServerId());
@@ -178,15 +175,31 @@ public class StateMachine extends StateMachineAdapter {
         } catch (RemotingException e) {
             e.printStackTrace();
         }
-
- 
-        return;
+        super.onLeaderStart(term);
     }
 
+    @Override
+    public void onLeaderStop(final Status status) {
+        this.leaderTerm.set(-1);
+        super.onLeaderStop(status);
+
+        try {
+            updateState((state)->{
+                state.getWorkers().remove( StateServerFactory.getServerId() );
+                return state;
+            });
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (RemotingException e) {
+            e.printStackTrace();
+        }
+    }
+ 
+
     @Override
     public void onStartFollowing(LeaderChangeContext ctx) {
    
-        super.onStartFollowing(ctx);
+        
         
         try {
             var ss = StateServerFactory.getStateServer();
@@ -204,15 +217,15 @@ public class StateMachine extends StateMachineAdapter {
         } catch (InterruptedException | RemotingException e) {
             e.printStackTrace();
         }
-     
+
+        super.onStartFollowing(ctx);
     }
 
-    @Override
-    public void onLeaderStop(final Status status) {
-        this.leaderTerm.set(-1);
-        super.onLeaderStop(status);
-    }
+    
 
+ 
+
+  
     public static void main(String[] args) throws InterruptedException, RemotingException {
         
     }
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
index 3e6338c..5f01dc3 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
@@ -58,13 +58,28 @@ public class StateServerFactory {
 
     private static StateServer ss;
  
-    public static void InitStateServer(String peerstr, Configuration conf) throws Exception {
+    public static void initStateServer(String peerstr, Configuration conf) throws Exception {
         if(ss != null) {
             throw new Exception("重复初始化 InitStateServer");
         }
         ss = new StateServerFactory.StateServer(peerstr, conf);
         log.debug("init peerid {}", ss.node.getNodeId().getPeerId());
+    }
 
+    public static PeerId getServerId() {
+        return ss.getCluster().getServerId();
+    }
+
+    public static Node getNode() {
+        return ss.getNode() ;
+    }
+
+    public static RpcClient getRpcClient() {
+        return ss.getRpcClient();
+    }
+
+    public static RaftGroupService getCluster() {
+        return ss.getCluster();
     }
 
     // 获取状态服务的对象
@@ -136,11 +151,11 @@ public class StateServerFactory {
             SyncClosure<State> closure = new SyncClosure<State>() {
                 @Override
                 public void run(Status status) {
-                    
+
                 }
              };
              
-             
+ 
             getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
                 @Override
                 public void run(Status status, long index, byte[] reqCtx) {
@@ -167,7 +182,7 @@ public class StateServerFactory {
                     });
 
                     synchronized(dofunc) {
-                        log.debug("dofunc notify");
+                        // log.debug("dofunc notify {}", getNode());
                         dofunc.notify();
                     }
                 }
@@ -175,9 +190,9 @@ public class StateServerFactory {
  
             try {
                 synchronized(dofunc) {
-                    log.debug("dofunc wait");
+                    // log.debug("dofunc wait");
                     dofunc.wait(5000);
-                    log.debug("dofunc unwait");
+                    // log.debug("dofunc unwait");
                 }
             } catch (InterruptedException e) {
                 e.printStackTrace();
@@ -186,6 +201,43 @@ public class StateServerFactory {
          
     }
 
+    public void useFsmStateAsync(Function<State, Void> dofunc) { 
+
+        SyncClosure<State> closure = new SyncClosure<State>() {
+            @Override
+            public void run(Status status) {
+                dofunc.apply(this.getValue());
+            }
+         };
+         
+        getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+
+                getFsm().useState((fsmState)->{ 
+                    if(status.isOk()){
+                        closure.setValue(fsmState);
+                        closure.success(fsmState);
+                        closure.run(Status.OK()); 
+                        return null;
+                    }
+
+                    readIndexExecutor.execute(() -> {
+                        if(isLeader()){
+                            log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode());
+                            applyState(fsmState, closure);
+                        }else {
+                            handlerNotLeaderError(closure);
+                        }
+                    });
+                    return null;
+                });
+            }
+        });
+ 
+    return ;
+    }
+
     /**
      * 同步更新 WorkerState
      * @param dofunc
@@ -194,6 +246,7 @@ public class StateServerFactory {
      */
     public void updateFsmWorkerState(WorkerState ws) throws InterruptedException, RemotingException {
 
+        // leader就直接提交
         if(isLeader()) {
             var closure = new SyncClosure<State>() {
                 @Override
@@ -208,6 +261,7 @@ public class StateServerFactory {
         }
 
         try {
+            // 非leader就 rpc请求
             var ss = StateServerFactory.getStateServer();
             var request = new RequestCondition();
             request.setWorkerState(ws);
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index fe5dcb1..4b84335 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -6,11 +6,11 @@
             %d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n
          </pattern>
       </encoder>
-      <filter class="ch.qos.logback.classic.filter.LevelFilter">
+      <!-- <filter class="ch.qos.logback.classic.filter.LevelFilter">
          <level>DEBUG</level>
          <onMatch>ACCEPT</onMatch>
          <onMismatch>DENY</onMismatch>
-       </filter>
+       </filter> -->
    </appender>
 
 
diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java
index 14a9c6b..0d6d478 100644
--- a/src/test/java/com/yuandian/dataflow/AppTest.java
+++ b/src/test/java/com/yuandian/dataflow/AppTest.java
@@ -41,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class AppTest {
 
+  
     @FunctionalInterface
     public interface FuncReturn {
         public float Execute();
diff --git a/start.sh b/start.sh
index 42bed0d..748c796 100755
--- a/start.sh
+++ b/start.sh
@@ -1,6 +1,6 @@
 screen -S raft-0 -X quit
 screen -S raft-1 -X quit
-screen -S raft-2 -X quit
+screen -S raft-2 -X quit 
 
 sleep 1
 
@@ -8,6 +8,7 @@ screen  -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
 screen  -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
 screen  -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
 
+sleep 1
 
 screen  -S raft-0 -X logfile flush 0
 screen  -S raft-1 -X logfile flush 0