From 7381a55174181c320a1c79ef8554cba15edc213b Mon Sep 17 00:00:00 2001
From: huangsimin <474420502@qq.com>
Date: Thu, 28 Jul 2022 16:01:34 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84closure=E5=A4=B1=E8=B4=A5?=
 =?UTF-8?q?=E7=9A=84=E8=BE=93=E5=85=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../dataflow/controller/PacketsProcessor.java | 56 +++++++++++--------
 .../statemachine/operate/Operate.java         |  9 ++-
 2 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index 69c6117..2163df5 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -57,36 +57,48 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
         // StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
         var resp = new RaftResponse<>();
         resp.setSuccess(true);
-        rpcCtx.sendResponse(resp);
+        rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
 
-        var ss = StateFactory.getStateServer();
-        log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
+        try {
+            log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
+            // TODO: request.packets 入库,回填, 告警 等操作
 
-        // 读状态 Closure<State> 里的 getValue<State>为 State的状态
-        ss.readIndexState(new GenericClosure<State>() {
+            
+
+        } finally { // 确保 更新 最终的任务状态给master.
+
+            // 读状态 Closure<State> 里的 getValue<State>为 State的状态
+            StateFactory.readIndexState(new GenericClosure<State>() {
+
+                @Override
+                public void run(Status status) {
+
+                    if (!status.isOk()) {
+                        log.error("失败 readIndexState {}", status);
+                    }
+
+                    // readIndexState 失败后也需要直接 更新自己状态
 
-            @Override
-            public void run(Status status) {
-                
-                if (status.isOk()) {
                     var state = this.getValue(); // 获取返回的状态
                     var ws = state.getWorkers().get(StateFactory.getServerId());
-                    ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size());
+                    ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
                     ws.setUpdateAt(Instant.now()); // 设置更新时间
+ 
+                    Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
+                            new GenericClosure<Operate>() {
+                                @Override
+                                public void run(Status status) {
+                                    if (!status.isOk()) {
+                                        log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
+                                    }
+                                }
+                            });
 
-                    // log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(),
-                    //         request.packets.size(), state.getWorkers().size());
-                    Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() {
-                        @Override
-                        public void run(Status status) {
-                            if (status.isOk()) {
-                                log.info("[{}] {}", StateFactory.getServerId(), resp);
-                            }
-                        }
-                    });
                 }
-            }
-        });
+            });
+        }
+        ;
+
     }
 
     @Override
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
index d385dd2..976af37 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
@@ -3,6 +3,8 @@ package com.yuandian.dataflow.statemachine.operate;
 import java.io.Serializable;
 
 import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.error.RaftError;
+import com.alipay.sofa.jraft.error.RaftException;
 import com.alipay.sofa.jraft.error.RemotingException;
 import com.alipay.sofa.jraft.rpc.InvokeCallback;
 import com.yuandian.dataflow.statemachine.StateFactory;
@@ -77,8 +79,8 @@ public class Operate implements Serializable {
 
                         @Override
                         public void complete(Object result, Throwable err) {
-                            log.info("Object result {}", result);
-                            //TODO: 解决回调的次序问题
+                            log.debug("Object result {}", result);
+                      
                             var resp = (RaftResponse<Operate>) result;
                             closure.setResponse(resp);
                             closure.success(resp.getValue());
@@ -87,7 +89,8 @@ public class Operate implements Serializable {
 
                     }, 5000);
         } catch (InterruptedException | RemotingException e) {
-            closure.failure("failure", null);
+            closure.failure(e.getMessage(), null);
+            closure.run(new Status(100000, "invokeAsync fail"));
             log.info("{}", e.toString());
         }