From 7795362d5a1b42eb8bc73fa5a3e45cf61fc4ced2 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Wed, 27 Jul 2022 23:37:40 +0800 Subject: [PATCH] =?UTF-8?q?TODO:=20=E8=A7=A3=E5=86=B3=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=B8=8D=E5=88=B0=E8=BE=BE=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/tasks.json | 16 +++-- pom.xml | 6 +- restart.sh | 3 +- .../dataflow/controller/PacketsProcessor.java | 64 +++++++++---------- .../java/com/yuandian/dataflow/projo/Doc.java | 5 +- .../dataflow/statemachine/MasterFactory.java | 16 +++-- .../dataflow/statemachine/StateMachine.java | 1 - .../statemachine/StateServerFactory.java | 31 ++++----- .../statemachine/operate/Operate.java | 2 +- .../statemachine/rpc/OperateProcessor.java | 3 +- .../statemachine/rpc/RaftResponse.java | 1 - .../java/com/yuandian/dataflow/AppTest.java | 1 + .../statemachine/StateMachineTest.java | 2 +- start.sh | 7 +- 14 files changed, 72 insertions(+), 86 deletions(-) diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 384c8de..029d8f2 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -5,24 +5,30 @@ "label": "restart", "type": "shell", "command": "sh restart.sh", - "isBackground": true, + "isBackground": false, "presentation": { "echo": true, - "reveal": "silent", + "reveal": "always", "focus": false, - "panel": "shared", + "panel": "new", "showReuseMessage": true, "clear": false, "close": true - } + + }, }, { "label": "stopall", "type": "shell", "command": "sh stop.sh", "presentation": { + "echo": true, + "reveal": "always", + "focus": false, + "panel": "shared", "close": true - } + }, + } ] } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 480ac89..2d4aaf4 100644 --- a/pom.xml +++ b/pom.xml @@ -11,9 +11,9 @@ UTF-8 - 8 - 8 - 8 + 11 + 11 + 11 3.20.1 1.7.4 diff --git a/restart.sh b/restart.sh index 39698a2..8e0f02d 100755 --- a/restart.sh +++ b/restart.sh @@ -1,3 +1,4 @@ #! /bin/bash -sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh +sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 +sh start.sh diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 6afee1d..6c1b3fb 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -23,7 +23,7 @@ import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor; - + import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft; import com.yuandian.dataflow.statemachine.state.State; @@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j; * description * * @author eson - *2022年7月21日-13:48:01 + * 2022年7月21日-13:48:01 */ @Slf4j @@ -51,45 +51,39 @@ public class PacketsProcessor implements RpcProcessor(); + resp.setSuccess(true); + rpcCtx.sendResponse(resp); - var resp = new RaftResponse<>(); - resp.setSuccess(true); - rpcCtx.sendResponse(resp); - - var ss = StateServerFactory.getStateServer(); - log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); + var ss = StateServerFactory.getStateServer(); + log.debug("{} handler request.packets.size(): {}", StateServerFactory.getServerId(), request.packets.size()); - - ss.readIndexState( new GenericClosure() { + ss.readIndexState(new GenericClosure() { - @Override - public void run(Status status) { - log.debug("status {}", status); - if(status.isOk()) { - var state = this.getValue(); - var ws = state.getWorkers().get(StateServerFactory.getServerId()); + @Override + public void run(Status status) { + log.debug("status {}", status); + if (status.isOk()) { + var state = this.getValue(); + var ws = state.getWorkers().get(StateServerFactory.getServerId()); - ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size()); - ws.setUpdateAt(Instant.now()); - - log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size()); - - Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure() { - @Override - public void run(Status status) { - if(status.isOk()) { - log.info("{}", resp); - } + ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); + ws.setUpdateAt(Instant.now()); + + log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), + request.packets.size(), state.getWorkers().size()); + Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure() { + @Override + public void run(Status status) { + if (status.isOk()) { + log.info("{}", resp); } - }); - } - + } + }); } - - } ); - - + } + }); } @Override diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java index 3d073fd..6ee2faa 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -52,10 +52,7 @@ public final class Doc { @BsonProperty("responseIp") public int responseIp ; - - - - + public static void main(String[] args) { MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017"); CodecProvider pojoCodecProvider = PojoCodecProvider.builder().register("com.yuandian.dataflow.projo").build(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java index 9263085..67725eb 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java @@ -62,16 +62,16 @@ public class MasterFactory { WorkerState ws = state.getWorkers().get(peer); if (ws != null) { // - var canDealTasks = MAX_TASKS - ws.getTaskQueueSize(); - log.debug("cap :{} peer: {}", canDealTasks, peer); - if (canDealTasks <= 0) { + var canTasks = MAX_TASKS - ws.getTaskQueueSize(); + log.debug("cap :{} peer: {}", canTasks, peer); + if (canTasks <= 0) { return; } ws.setUpdateAt(Instant.now()); ws.setTaskQueueSize(MAX_TASKS); var request = new PacketsRequest(); - for (int i = 0; i < canDealTasks; i++) { + for (int i = 0; i < canTasks; i++) { var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow .newBuilder() .setTableId(10086) @@ -82,14 +82,16 @@ public class MasterFactory { Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure() { @Override public void run(Status status) { - - log.info("{}", status); + log.info("PacketsRequest run {}", status); try { ss.getRpcClient().invokeAsync(peer.getEndpoint(), request, new InvokeCallback() { @Override public void complete(Object result, Throwable err) { - log.info("PacketsRequest: {}", result); + if(err != null) { + log.debug("{}", err); + } + log.debug("PacketsRequest: {}", result); } }, 5000); } catch (InterruptedException | RemotingException e) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index b854fdc..0270fba 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -211,7 +211,6 @@ public class StateMachine extends StateMachineAdapter { log.info("{} {}", status, this.getResponse()); } }); - return; } catch (Exception e) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 7521b7e..caccbe2 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -7,6 +7,7 @@ package com.yuandian.dataflow.statemachine; import java.io.File; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.Set; import java.util.concurrent.Executor; @@ -141,7 +142,7 @@ public class StateServerFactory { nodeOptions.setInitialConf(conf); File RaftDataFile = new File(String.format("./raftdata/%d", port) ); - log.info("{}",RaftDataFile.mkdirs()); + log.info("mkdirs: {}",RaftDataFile.mkdirs()); nodeOptions.setLogUri( String.format("./raftdata/%d/logs", port) ); nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); @@ -150,18 +151,14 @@ public class StateServerFactory { nodeOptions.setFsm(fsm); cluster = new RaftGroupService(groupId, serverId, nodeOptions); - - - + Set> scans = new Reflections("com.yuandian.dataflow").getTypesAnnotatedWith(ProcessorRaft.class); scans.forEach((pRaftClass)->{ try { - cluster.getRpcServer().registerProcessor((RpcProcessor) pRaftClass.newInstance()); - } catch (InstantiationException e) { + cluster.getRpcServer().registerProcessor((RpcProcessor) pRaftClass.getDeclaredConstructor().newInstance()); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e) { log.info("{}", e.toString()); - } catch (IllegalAccessException e) { - log.info("{}", e.toString()); - } + } }); node = cluster.start(); @@ -219,11 +216,9 @@ public class StateServerFactory { closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } } - - - - public RaftResponse redirect() { - final RaftResponse response = new RaftResponse(); + + public RaftResponse redirect() { + final RaftResponse response = new RaftResponse(); response.setSuccess(false); if (this.node != null) { final PeerId leader = this.node.getLeaderId(); @@ -234,16 +229,12 @@ public class StateServerFactory { return response; } - public void handlerNotLeaderError(final GenericClosure closure) { + public void handlerNotLeaderError(final GenericClosure closure) { closure.failure("Not leader.", redirect().getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); } - - + private Executor createReadIndexExecutor() { - // final StoreEngineOptions opts = new StoreEngineOptions(); - // return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); - return ThreadPoolUtil.newBuilder() // .poolName("ReadIndexPool") // .enableMetric(true) // diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java index df2cf67..3b822af 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java @@ -71,7 +71,7 @@ public class Operate implements Serializable { @Override public void complete(Object result, Throwable err) { - log.info("{}", result); + log.info("Object result {}", result); //TODO: 解决回调的次序问题 var resp = (RaftResponse) result; closure.setResponse(resp); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 4c67936..86aae22 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -73,8 +73,7 @@ public class OperateProcessor implements RpcProcessor implements Serializable { private T value; - private boolean success; /** * redirect peer id diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java index 88cdb2f..e5ba148 100644 --- a/src/test/java/com/yuandian/dataflow/AppTest.java +++ b/src/test/java/com/yuandian/dataflow/AppTest.java @@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j; */ @DisplayName("AppTest") @Slf4j +@var public class AppTest { diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index 5e117a0..4bb07f5 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -13,7 +13,7 @@ import com.yuandian.dataflow.statemachine.rpc.RaftResponse; import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine.state.State; -import lombok.var; + import lombok.extern.slf4j.Slf4j; @Slf4j diff --git a/start.sh b/start.sh index 51c666e..0ffea46 100755 --- a/start.sh +++ b/start.sh @@ -3,17 +3,14 @@ screen -S raft-0 -X quit screen -S raft-1 -X quit # screen -S raft-2 -X quit - - - -sleep 5s +sleep 1s VERSION=1.0.0-SNAPSHOT screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0 screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1 # screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 -sleep 1 +sleep 0.5s screen -S raft-0 -X logfile flush 0 screen -S raft-1 -X logfile flush 0