From d1429e811473146872bcea2a3aeafe9df8bfd49e Mon Sep 17 00:00:00 2001 From: huangsimin Date: Sat, 30 Jul 2022 04:07:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E5=BC=BA=E5=A4=A7=E7=9A=84vs?= =?UTF-8?q?code=20debug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 42 +++++-- .../dataflow/controller/MasterProcessor.java | 108 ++++++++++++++++ .../dataflow/controller/PacketsProcessor.java | 4 +- .../controller/TransferLeaderProcessor.java | 4 +- .../dataflow/statemachine/MasterFactory.java | 115 +++--------------- .../dataflow/statemachine/StateFactory.java | 29 ++++- .../dataflow/statemachine/StateMachine.java | 1 - .../MasterRegister.java} | 4 +- .../WorkerRegister.java} | 4 +- .../statemachine/master/MasterContext.java | 15 +++ .../statemachine/master/MasterExecute.java | 8 ++ .../statemachine/rpc/OperateProcessor.java | 4 +- 12 files changed, 217 insertions(+), 121 deletions(-) create mode 100644 src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java rename src/main/java/com/yuandian/dataflow/statemachine/{rpc/annotations/Master.java => annotations/MasterRegister.java} (82%) rename src/main/java/com/yuandian/dataflow/statemachine/{rpc/annotations/ProcessorRaft.java => annotations/WorkerRegister.java} (80%) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java diff --git a/.vscode/launch.json b/.vscode/launch.json index 6e17393..42668ae 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,29 +6,53 @@ "configurations": [ { "type": "java", - "name": "Launch OperateProcessor", + "name": "Raft-0", "request": "launch", - "mainClass": "com.yuandian.dataflow.statemachine.rpc.OperateProcessor", - "projectName": "dataflow" + "mainClass": "com.yuandian.dataflow.Server", + "projectName": "dataflow", + "args": [ + "0" + ], }, { "type": "java", - "name": "Launch StateFactory", + "name": "Raft-1", "request": "launch", - "mainClass": "com.yuandian.dataflow.statemachine.StateFactory", - "projectName": "dataflow" + "mainClass": "com.yuandian.dataflow.Server", + "projectName": "dataflow", + "args": [ + "1" + ], }, { "type": "java", - "name": "Launch Server", + "name": "Raft-2", "request": "launch", "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", "args": [ "2" ], - "preLaunchTask": "restart", - "postDebugTask": "stopall" + }, + + + // { + // "type": "java", + // "name": "Launch Server", + // "request": "launch", + // "mainClass": "com.yuandian.dataflow.Server", + // "projectName": "dataflow", + // "args": [ + // "2" + // ], + // "preLaunchTask": "restart", + // "postDebugTask": "stopall" + // } + ], + "compounds": [ + { + "name": "Rafts-Server", + "configurations": ["Raft-0","Raft-1", "Raft-2"] } ] } \ No newline at end of file diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java new file mode 100644 index 0000000..28c4f2e --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -0,0 +1,108 @@ +package com.yuandian.dataflow.controller; + +import java.time.Instant; +import java.util.List; + +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.InvokeCallback; +import com.google.protobuf.Any; +import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; +import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; +import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.statemachine.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine.closure.GenericClosure; +import com.yuandian.dataflow.statemachine.master.MasterContext; +import com.yuandian.dataflow.statemachine.master.MasterExecute; +import com.yuandian.dataflow.statemachine.operate.Operate; +import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; +import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@MasterRegister +public class MasterProcessor implements MasterExecute { + + private final int MAX_TASKS = 100; + + @Override + public void loop(MasterContext cxt) { + + var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); + log.debug("master({}) execute {}", StateFactory.getServerId(), + StateFactory.getRaftNode().listAlivePeers()); + if (alivePeers == null) { + return; + } + + // 读一致性 + StateFactory.readIndexState(new GenericClosure() { + + @Override + public void run(Status status) { + var state = this.getValue(); + // log.debug("masterExecute start {} {}", status, alivePeers); + alivePeers.forEach((peer) -> { + + if (state == null) { + log.error("readIndexState获取的状态为 {}", state); + return; + } + + WorkerState ws = state.getWorkers().get(peer); + if (ws == null) { + log.error("WorkerState获取的状态为 {}", ws); + return; + } + + var canTasks = MAX_TASKS - ws.getTaskQueueSize(); + log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); + if (canTasks <= 0) { + return; + } + ws.setUpdateAt(Instant.now()); + ws.setTaskQueueSize(MAX_TASKS); + + // 模拟发送包的数据到该节点上 + var request = new PacketsRequest(); + for (int i = 0; i < canTasks; i++) { + var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow + .newBuilder() + .setTableId(10086) + .build()); + request.getPackets().add(p); + } + + // 先提交 节点的 剩余能处理的任务数量. 然后再处理 + Operate.CallOperate( + new Operate(OperateType.PUT_WORKERSTATE, ws), + new GenericClosure() { + @Override + public void run(Status status) { + log.info("PacketsRequest run {}", status); + try { + StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, + new InvokeCallback() { + @Override + public void complete(Object result, Throwable err) { + if (err != null) { + // TODO: 如果错误, 需要让节点恢复任务处理的状态 + log.debug("{}", err); + } + log.debug("PacketsRequest: {}", result); + } + }, 5000); + } catch (InterruptedException | RemotingException e) { + log.info("error send packets {}", e.toString()); + } + } + }); + }); + } + + }); + } + +} diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 71b0969..9ac1163 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -17,11 +17,11 @@ import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.google.protobuf.Any; import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; @@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j -@ProcessorRaft +@WorkerRegister public class PacketsProcessor implements RpcProcessor { @Setter diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java index 3bb8f35..7f40148 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java @@ -7,7 +7,7 @@ import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.yuandian.dataflow.statemachine.StateFactory; -import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; +import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; import lombok.Getter; import lombok.Setter; @@ -18,7 +18,7 @@ import lombok.extern.slf4j.Slf4j; * 例子 强制转换leader */ @Slf4j -@ProcessorRaft +@WorkerRegister public class TransferLeaderProcessor implements RpcProcessor { @Setter diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java index 23731f0..8e4263a 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java @@ -18,6 +18,8 @@ import com.google.protobuf.Any; import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.statemachine.closure.GenericClosure; +import com.yuandian.dataflow.statemachine.master.MasterContext; +import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.state.State; @@ -40,113 +42,30 @@ import lombok.extern.slf4j.Slf4j; @Setter @ToString public class MasterFactory { - - public static final int MAX_TASKS = 100; - - public static class MasterContext { - AtomicBoolean isExit = new AtomicBoolean(false); - Object share; + + private static MasterExecute masterExecuteCls; + + public static void registerMasterLoop(MasterExecute masterExecute) { + if(masterExecuteCls != null) { + log.error("registerMasterLoop is registed"); + } + masterExecuteCls = masterExecute; } - - private static MasterContext cxt = new MasterContext(); - - - private static Consumer masterMainLoop; - - public static Thread masterExecute = new Thread(new Runnable() { + public static Thread masterExecuteThread = new Thread(new Runnable() { @Override public void run() { - try { - while (!cxt.isExit.get()) { - - masterMainLoop.accept(cxt); - - - var alivePeers = List.copyOf(StateFactory.getRaftNode().listAlivePeers()); - log.debug("master({}) execute {}", StateFactory.getServerId(), - StateFactory.getRaftNode().listAlivePeers()); - if (alivePeers != null) { - - // 读一致性 - StateFactory.readIndexState(new GenericClosure() { - - @Override - public void run(Status status) { - var state = this.getValue(); - // log.debug("masterExecute start {} {}", status, alivePeers); - alivePeers.forEach((peer) -> { - - if (state == null) { - log.error("readIndexState获取的状态为 {}", state); - return; - } - - WorkerState ws = state.getWorkers().get(peer); - if (ws == null) { - log.error("WorkerState获取的状态为 {}", ws); - return; - } - - var canTasks = MAX_TASKS - ws.getTaskQueueSize(); - log.info("剩余能处理的任务数量[{}] :{}", peer, canTasks); - if (canTasks <= 0) { - return; - } - ws.setUpdateAt(Instant.now()); - ws.setTaskQueueSize(MAX_TASKS); - - // 模拟发送包的数据到该节点上 - var request = new PacketsRequest(); - for (int i = 0; i < canTasks; i++) { - var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow - .newBuilder() - .setTableId(10086) - .build()); - request.getPackets().add(p); - } - - // 先提交 节点的 剩余能处理的任务数量. 然后再处理 - Operate.CallOperate( - new Operate(OperateType.PUT_WORKERSTATE, ws), - new GenericClosure() { - @Override - public void run(Status status) { - log.info("PacketsRequest run {}", status); - try { - StateFactory.rpcClientInvokeAsync(peer.getEndpoint(), request, - new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - if (err != null) { - // TODO: 如果错误, 需要让节点恢复任务处理的状态 - log.debug("{}", err); - } - log.debug("PacketsRequest: {}", result); - } - }, 5000); - } catch (InterruptedException | RemotingException e) { - log.info("error send packets {}", e.toString()); - } - } - }); - }); - } - - }); - } - - Thread.sleep(2000); - } - } catch (InterruptedException e) { - log.info("{}", e.toString()); + + MasterContext cxt = new MasterContext(); + while (!cxt.getIsExit().get()) { + masterExecuteCls.loop(cxt); } - + } }); public static Thread getMasterExecute() { - return masterExecute; + return masterExecuteThread; } public static void Init() { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java index b57e15f..fc6a750 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateFactory.java @@ -46,11 +46,13 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.yuandian.dataflow.statemachine.annotations.MasterRegister; +import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; import com.yuandian.dataflow.statemachine.closure.GenericClosure; +import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.utils.Utils; @@ -188,7 +190,11 @@ public class StateFactory { var clsName = traces[traces.length - 1].getClassName(); var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3)); log.info("获取 {} -> {} 下包的所有注解",clsName, packName ); - Set> scans = new Reflections(packName).getTypesAnnotatedWith(ProcessorRaft.class); + + var refl = new Reflections(packName); + Set> scans = refl.getTypesAnnotatedWith(WorkerRegister.class); + + scans.forEach((pRaftClass) -> { scansMap.put(pRaftClass.getName(), pRaftClass); }); @@ -202,6 +208,18 @@ public class StateFactory { log.info("{} {}", name, e.toString()); } }); + + refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass)->{ + try { + MasterExecute execute = (MasterExecute)pClass.getDeclaredConstructor().newInstance(); + MasterFactory.registerMasterLoop(execute); + + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e) { + log.info("{}", e.toString()); + + } + });; // 启动集群 node = cluster.start(); @@ -220,10 +238,15 @@ public class StateFactory { public void run(Status status, long index, byte[] reqCtx) { log.debug("readIndexState({}) {}", getServerId(), status); if (status.isOk()) { - // 回调失败 + closure.success(ss.fsm.getState()); closure.setValue(ss.fsm.getState()); + } else { + + // 提交同步 } + + // 回调失败 closure.run(status); } }); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 2499b34..9220fa9 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -90,7 +90,6 @@ public class StateMachine extends StateMachineAdapter { switch (op.getType()) { case PUT_WORKERSTATE: - WorkerState ws = op.getValue(); log.debug("PUT {}", ws.peerId); state.getWorkers().put(ws.peerId, ws); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java b/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java similarity index 82% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java rename to src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java index 200a627..968b73c 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/Master.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/annotations/MasterRegister.java @@ -4,7 +4,7 @@ * @author eson *2022年7月21日-14:27:49 */ -package com.yuandian.dataflow.statemachine.rpc.annotations; +package com.yuandian.dataflow.statemachine.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -19,6 +19,6 @@ import java.lang.annotation.Target; */ @Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) -public @interface Master { +public @interface MasterRegister { public int order() default 0 ; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java b/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java similarity index 80% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java rename to src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java index a28677b..e579111 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/annotations/ProcessorRaft.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/annotations/WorkerRegister.java @@ -4,7 +4,7 @@ * @author eson *2022年7月21日-14:27:49 */ -package com.yuandian.dataflow.statemachine.rpc.annotations; +package com.yuandian.dataflow.statemachine.annotations; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -19,5 +19,5 @@ import java.lang.annotation.Target; */ @Retention(RetentionPolicy.CLASS) @Target(ElementType.TYPE) -public @interface ProcessorRaft { +public @interface WorkerRegister { } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java new file mode 100644 index 0000000..f510a6e --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterContext.java @@ -0,0 +1,15 @@ +package com.yuandian.dataflow.statemachine.master; + +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +public class MasterContext { + private AtomicBoolean isExit = new AtomicBoolean(false); + private Object share; +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java new file mode 100644 index 0000000..d1542eb --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterExecute.java @@ -0,0 +1,8 @@ +package com.yuandian.dataflow.statemachine.master; + +/** + * Master的主线程循环 + */ +public interface MasterExecute { + void loop(MasterContext cxt); +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 93ed2e1..f440658 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -13,9 +13,9 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.yuandian.dataflow.statemachine.StateFactory; +import com.yuandian.dataflow.statemachine.annotations.WorkerRegister; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; -import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import javassist.ClassPath; @@ -31,7 +31,7 @@ import lombok.extern.slf4j.Slf4j; *2022年7月12日-11:10:54 */ @Slf4j -@ProcessorRaft +@WorkerRegister public class OperateProcessor implements RpcProcessor { /**