diff --git a/.vscode/launch.json b/.vscode/launch.json index db80756..979edf3 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,9 +10,10 @@ "request": "launch", "mainClass": "com.yuandian.dataflow.Server", "projectName": "dataflow", - "args": "2", + "args": ["2"], "preLaunchTask": "restart", "postDebugTask": "stopall", + } ] } \ No newline at end of file diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 7566731..384c8de 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -5,9 +5,10 @@ "label": "restart", "type": "shell", "command": "sh restart.sh", + "isBackground": true, "presentation": { "echo": true, - "reveal": "always", + "reveal": "silent", "focus": false, "panel": "shared", "showReuseMessage": true, diff --git a/restart.sh b/restart.sh index a51605a..39698a2 100755 --- a/restart.sh +++ b/restart.sh @@ -1,2 +1,3 @@ #! /bin/bash -sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh + +sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0 && sh start.sh diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 86232a4..b3556a2 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -22,8 +22,11 @@ public class Server { public static String peeridstr; public static String sprPort; public static Configuration conf ; + public static void main(String[] args) throws Exception { + + String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; @@ -39,6 +42,8 @@ public class Server { conf = JRaftUtils.getConfiguration(String.join(",", peers)); StateServerFactory.startStateServer(peeridstr, conf); + + // System.setProperty("server.port", sprPort); // ConfigurableApplicationContext app = SpringApplication.run(Server.class, args); // StateServerFactory.setAppCxt(app); diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index 2c40164..6afee1d 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -70,7 +70,6 @@ public class PacketsProcessor implements RpcProcessor { + + @Setter + @Getter + public static class LeaderRequest implements Serializable { + PeerId peer; + } + + @Override + public void handleRequest(RpcContext rpcCtx, LeaderRequest request) { + Status status = StateServerFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer); + rpcCtx.sendResponse(status); + log.debug("[TransferLeader] {} change leader to {}", status, request.peer); + } + + @Override + public String interest() { + return LeaderRequest.class.getName(); + } + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 28fef8e..b854fdc 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -14,8 +14,11 @@ import com.alipay.sofa.jraft.core.StateMachineAdapter; import com.alipay.sofa.jraft.entity.LeaderChangeContext; import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.yuandian.dataflow.Server; +import com.yuandian.dataflow.controller.TransferLeaderProcessor; import com.yuandian.dataflow.statemachine.closure.GenericClosure; import com.yuandian.dataflow.statemachine.operate.Operate; import com.yuandian.dataflow.statemachine.operate.Operate.OperateType; @@ -173,13 +176,12 @@ public class StateMachine extends StateMachineAdapter { public void onLeaderStop(final Status status) { log.debug("onLeaderStop {}", StateServerFactory.getCluster().getServerId()); this.leaderTerm.set(-1); - super.onLeaderStop(status); - // 判断是否Master线程还在跑, 如果存在则中断 if (MasterFactory.getMasterExecute().isAlive()) { MasterFactory.getMasterExecute().interrupt(); } + super.onLeaderStop(status); } @Override @@ -198,18 +200,19 @@ public class StateMachine extends StateMachineAdapter { MasterFactory.getMasterExecute().interrupt(); } + var ws = new WorkerState(StateServerFactory.getServerId()); log.debug("my: {} leader id {}", StateServerFactory.getServerId(), StateServerFactory.getLeaderId()); var op = new Operate(OperateType.PUT, ws); - - Operate.CallOperate(op, new GenericClosure() { + Operate.CallOperate(op, new GenericClosure() { @Override public void run(Status status) { log.info("{} {}", status, this.getResponse()); } }); - + + return; } catch (Exception e) { log.info("{}", e.toString()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java index 4ad2efd..4c67936 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java @@ -67,8 +67,19 @@ public class OperateProcessor implements RpcProcessor closure = new GenericClosure() { @Override public void run(Status status) { - rpcCtx.sendResponse(getResponse()); - log.info("{}", status); + + if(status.isOk()) { + log.info("{}", status); + rpcCtx.sendResponse(getResponse()); + return; + } + + + if(status.getRaftError() == RaftError.EPERM) { + //TODO: Not leader 需要转发 + log.info("{}", status); + } + } }; diff --git a/start.sh b/start.sh index 3746270..51c666e 100755 --- a/start.sh +++ b/start.sh @@ -1,19 +1,20 @@ #! /bin/bash screen -S raft-0 -X quit screen -S raft-1 -X quit -screen -S raft-2 -X quit +# screen -S raft-2 -X quit + + +sleep 5s + VERSION=1.0.0-SNAPSHOT - -sleep 1 - screen -dmS raft-0 -L java -jar target/dataflow-${VERSION}.jar 0 screen -dmS raft-1 -L java -jar target/dataflow-${VERSION}.jar 1 # screen -dmS raft-2 -L java -jar target/dataflow-${VERSION}.jar 2 sleep 1 -screen -S raft-0 -X logfile flush 0 -screen -S raft-1 -X logfile flush 0 +screen -S raft-0 -X logfile flush 0 +screen -S raft-1 -X logfile flush 0 # screen -S raft-2 -X logfile flush 0 \ No newline at end of file