From 761a8dd664f293b2234a0c694617143dab2e70bd Mon Sep 17 00:00:00 2001
From: huangsimin <eson.hsm@nonolive.com>
Date: Thu, 21 Jul 2022 16:32:48 +0800
Subject: [PATCH] =?UTF-8?q?TODO:=20=E6=9B=B4=E6=96=B0=E7=8A=B6=E6=80=81?=
 =?UTF-8?q?=E7=9A=84=20=E9=87=8D=E6=96=B0=E8=AE=BE=E8=AE=A1=20=E9=81=BF?=
 =?UTF-8?q?=E5=85=8D=E8=87=AA=E9=94=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../dataflow/controller/PacketsProcessor.java |  1 +
 .../statemachine/StateServerFactory.java      |  4 +-
 .../statemachine/state/StateFactory.java      | 80 +++++++++++--------
 src/main/resources/logback.xml                |  6 +-
 4 files changed, 54 insertions(+), 37 deletions(-)

diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index c8a3162..ff00dc9 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -45,6 +45,7 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
             log.info("{}",request.packets.size());
             var resp = new ResponseSM();
             resp.setMsg(rpcCtx.getRemoteAddress());
+ 
             rpcCtx.sendResponse(resp);
     }
 
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
index eca31a3..4f47e9e 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
@@ -126,6 +126,7 @@ public class StateServerFactory {
         int port = serverId.getPort();
 
         NodeOptions nodeOptions = new NodeOptions();
+        
         nodeOptions.setElectionTimeoutMs(1000);
         nodeOptions.setSnapshotLogIndexMargin(3600);
         nodeOptions.setInitialConf(conf);
@@ -140,7 +141,8 @@ public class StateServerFactory {
         nodeOptions.setFsm(fsm);
 
         cluster = new RaftGroupService(groupId, serverId, nodeOptions);
-
+        
+        
          
         Set<Class<?>>  scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class);
         scans.forEach((pRaftClass)->{
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java
index 6d9c165..0c21e5c 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java
@@ -6,13 +6,16 @@
  */
 package com.yuandian.dataflow.statemachine.state;
 
- 
+import java.time.Instant;
+
 import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
+import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.error.RemotingException;
 import com.google.protobuf.Any;
 import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
 import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
 import com.yuandian.dataflow.statemachine.StateServerFactory;
+import com.yuandian.dataflow.statemachine.SyncClosure;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -43,47 +46,58 @@ public class StateFactory {
                     if (alivePeers != null) {
                         var ss = StateServerFactory.getStateServer();
                         StateServerFactory.getStateServer().useFsmStateAsync((state) -> {
-                            synchronized(alivePeers){
-                            alivePeers.forEach((peer) -> {
-                                var ws = state.getWorkers().get(peer);
-                                if (ws != null) {
-                                    var cap = 100 - ws.getTaskQueueSize();
-                                    if (cap > 0) {
-                                        log.debug("{}", cap);
-                                        var request = new PacketsRequest();
-                                        for(int i = 0; i < cap ; i++ ) {
-                                            var p = Any.pack(
-                                                BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
-                                                        .setTableId(10086)
-                                                        .build()
-                                                );
+                            synchronized (alivePeers) {
+                                alivePeers.forEach((peer) -> {
+                                    WorkerState ws = state.getWorkers().get(peer);
+                                    if (ws != null) {
+                                        var cap = 100 - ws.getTaskQueueSize();
+                                        if (cap > 0) {
+                                            log.debug("{}", cap);
+                                            var request = new PacketsRequest();
+                                            for (int i = 0; i < cap; i++) {
+                                                var p = Any.pack(
+                                                        BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
+                                                                .setTableId(10086)
+                                                                .build());
 
                                                 request.getPackets().add(p);
 
-                                        }
-                          
-                                        try {
-                                            var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(), request, 5000);
-                                            log.info("{}", result);
-                                        } catch (InterruptedException e) {
-                                            // TODO Auto-generated catch block
-                                            e.printStackTrace();
-                                        } catch (RemotingException e) {
-                                            // TODO Auto-generated catch block
-                                            e.printStackTrace();
+                                            }
+                                            
+                                            try {
+                                                var result = ss.getRpcClient().invokeSync(ws.getPeerId().getEndpoint(),
+                                                        request, 5000);
+                                                log.info("{}", result);
+                                                ws.setUpdateAt( Instant.now() );
+                                                ws.setTaskQueueSize(ws.getTaskQueueSize() - cap);
+
+                                                
+
+                                            } catch (InterruptedException e) {
+                                                // TODO Auto-generated catch block
+                                                e.printStackTrace();
+                                            } catch (RemotingException e) {
+                                                // TODO Auto-generated catch block
+                                                e.printStackTrace();
+                                            }
                                         }
                                     }
-                                }
-                                alivePeers.notifyAll(); 
-                            });
-                        }
-                            
+
+                                    ss.applyState(state, new SyncClosure<State>() {
+                                        public void run(Status status) {
+                                            log.debug("{}", status);
+                                        };
+                                    } );
+                                    alivePeers.notifyAll();
+                                });
+                            }
+
                             return null;
                         });
-                        synchronized(alivePeers){
+                        synchronized (alivePeers) {
                             alivePeers.wait(5000);
                         }
-                        
+
                     }
 
                     Thread.sleep(2000);
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 77df777..e67372b 100644
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -20,11 +20,11 @@
    </appender>
 
 
-   <!-- <root level="info">
+   <!-- <root level="debug">
       <appender-ref ref="CONSOLE" />
    </root> -->
    
-   <logger name="com.yuandian.dataflow" level="debug">
-      <appender-ref ref="CONSOLE" />
+   <logger name="com.yuandian.dataflow" level="debug|info">
+      <appender-ref ref="CONSOLE"/>
   </logger>
 </configuration>
\ No newline at end of file