From 493f3d4907d4b45d9297a3895f4ec6b39e0dcf59 Mon Sep 17 00:00:00 2001 From: huangsimin Date: Mon, 18 Jul 2022 03:07:52 +0800 Subject: [PATCH] TODO --- pom.xml | 4 +- .../java/com/yuandian/dataflow/Server.java | 26 +--- .../yuandian/dataflow/controller/TaskLog.java | 10 +- .../java/com/yuandian/dataflow/projo/Doc.java | 2 +- .../dataflow/statemachine/StateMachine.java | 8 +- .../statemachine/StateServerFactory.java | 114 ++++++++++-------- ...{SyncDataClosure.java => SyncClosure.java} | 17 ++- .../statemachine/rpc/RequestCondition.java | 21 ++++ ...SyncDataRequest.java => RequestState.java} | 5 +- .../rpc/{SMResponse.java => ResponseSM.java} | 2 +- .../rpc/SyncConditionProcessor.java | 59 +++++++++ ...Processor.java => SyncStateProcessor.java} | 13 +- .../statemachine/StateMachineTest.java | 6 +- start.sh | 10 +- 14 files changed, 188 insertions(+), 109 deletions(-) rename src/main/java/com/yuandian/dataflow/statemachine/{SyncDataClosure.java => SyncClosure.java} (70%) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestCondition.java rename src/main/java/com/yuandian/dataflow/statemachine/rpc/{SyncDataRequest.java => RequestState.java} (85%) rename src/main/java/com/yuandian/dataflow/statemachine/rpc/{SMResponse.java => ResponseSM.java} (93%) create mode 100644 src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java rename src/main/java/com/yuandian/dataflow/statemachine/rpc/{SyncDataProcessor.java => SyncStateProcessor.java} (77%) diff --git a/pom.xml b/pom.xml index fc86a3e..384412f 100644 --- a/pom.xml +++ b/pom.xml @@ -192,7 +192,7 @@ 1.6.2 - + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index e1fbba3..7702caa 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -15,10 +15,10 @@ import com.alipay.sofa.jraft.RaftGroupService; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.NodeOptions; -import com.yuandian.dataflow.statemachine.SyncDataClosure; +import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.StateMachine; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; +import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -54,29 +54,7 @@ public class Server { Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); StateServerFactory.InitStateServer(peeridstr, conf); - - // Thread printer = new Thread( new Runnable(){ - - // @Override - // public void run() { - // // TODO Auto-generated method stub - // while(true) { - // var state = StateServerFactory.getStateServer().getFsm().getState(); - // log.info("{}", state); - // try { - // Thread.sleep(1000); - // } catch (InterruptedException e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - // } - // } - - // } ); - // printer.start(); - - System.setProperty("server.port", sprPort); var app = SpringApplication.run(Server.class, args); app.start(); diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 59ddb33..443ab54 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -9,7 +9,7 @@ import org.springframework.web.bind.annotation.RequestParam; import com.alipay.sofa.jraft.Status; import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncDataClosure; +import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.state.State; import lombok.var; @@ -24,17 +24,17 @@ public class TaskLog { @GetMapping(path = "/test") public ResponseEntity Processing() throws InterruptedException { - SyncDataClosure closure = new SyncDataClosure() { + SyncClosure closure = new SyncClosure() { @Override public void run(Status status) { - log.info(getState().toString()); + log.error("{} {}",getValue().toString(), getValue().getWorkers().size()); } }; var state = new State(); - closure.setState(state); + closure.setValue(state); - log.info(StateServerFactory.getStateServer().getFsm().getState().toString() ); + log.error(StateServerFactory.getStateServer().getFsm().getState().toString() ); // state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId()); // state.getWorker().setTaskQueueSize(1); diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java index 8e56325..502695f 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -19,7 +19,7 @@ import lombok.Setter; -@BsonDiscriminator + @Getter @Setter public final class Doc extends Document { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 9fc88e3..1741024 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -24,8 +24,8 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.Utils; -import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; +import com.yuandian.dataflow.statemachine.rpc.ResponseSM; +import com.yuandian.dataflow.statemachine.rpc.RequestState; import com.yuandian.dataflow.statemachine.state.State; import lombok.var; @@ -70,8 +70,8 @@ public class StateMachine extends StateMachineAdapter { if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional // parsing. - var closure = (SyncDataClosure)iter.done(); - state = closure.getState(); + var closure = (SyncClosure)iter.done(); + state = closure.getValue(); log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm); closure.success(state); closure.run(Status.OK()); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index a811f2a..2945937 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -14,6 +14,7 @@ import java.util.concurrent.ThreadPoolExecutor; import com.alipay.remoting.NamedThreadFactory; import com.alipay.remoting.exception.CodecException; + import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; @@ -27,13 +28,15 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RpcClient; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.BytesUtil; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; -import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor; +import com.yuandian.dataflow.statemachine.rpc.ResponseSM; +import com.yuandian.dataflow.statemachine.rpc.RequestCondition; +import com.yuandian.dataflow.statemachine.rpc.RequestState; +import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; @@ -53,33 +56,23 @@ import lombok.extern.slf4j.Slf4j; public class StateServerFactory { private static StateServer ss; - // 必须初始化 + public static void InitStateServer(String peerstr, Configuration conf) throws Exception { if(ss != null) { throw new Exception("重复初始化 InitStateServer"); } ss = new StateServerFactory.StateServer(peerstr, conf); + log.error("init peerid {}", ss.node.getNodeId().getPeerId()); + + ss.getNode().join(); + var request = new RequestCondition(); + request.setWorkerState( new WorkerState() ); + request.getWorkerState().setPeerId( ss.cluster.getServerId() ); + log.error("{}", ss.getNode().getNodeMetrics() ); + - - ss.readIndexState(true, new SyncDataClosure() { - @Override - public void run(Status status) { - log.info("add peerid {}", getState()); - var wstate = getState().getWorkers().get(ss.node.getNodeId().getPeerId()); - if(wstate == null) { - wstate = new WorkerState(); - getState().getWorkers().put(ss.node.getNodeId().getPeerId(), wstate); - log.info("update: {}", getState()); - ss.applyState(getState(), new SyncDataClosure() { - @Override - public void run(Status status) { - log.info("{} add workers", ss.node.getNodeId().getPeerId()); - } - } ); - } - } - - }); + ResponseSM resp = (ResponseSM)ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + log.info("{}", resp); } // 获取状态服务的对象 @@ -90,6 +83,10 @@ public class StateServerFactory { @Getter @Setter public static class StateServer { + + RpcClient rpcClient; + + private Node node; private RaftGroupService cluster; @@ -127,31 +124,29 @@ public class StateServerFactory { nodeOptions.setFsm(fsm); cluster = new RaftGroupService(groupId, serverId, nodeOptions); - cluster.getRpcServer().registerProcessor(new SyncDataProcessor()); + cluster.getRpcServer().registerProcessor(new SyncStateProcessor()); node = cluster.start(); - if(node.isLeader()) { - - } - + rpcClient = new BoltRaftRpcFactory().createRpcClient(); + rpcClient.init(new CliOptions()); } public boolean isLeader() { return this.fsm.isLeader(); } - public State getState() { + public State getFSMState() { return this.fsm.getState(); } - public void applyState(State state, SyncDataClosure closure) { + public void applyState(State state, SyncClosure closure) { if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); return; } try { - closure.setState(state); + closure.setValue(state); final Task task = new Task(); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); task.setDone(closure); @@ -164,31 +159,52 @@ public class StateServerFactory { } } - public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) { - - if(!readOnlySafe){ - closure.setState(getState()); - closure.success(getState()); - closure.run(Status.OK()); + public void applyWorkerState(WorkerState state, SyncClosure closure) { + if (!ss.isLeader()) { + ss.handlerNotLeaderError(closure); return; } + var wmap = getFSMState().getWorkers(); + var wstate = wmap.get(ss.node.getNodeId().getPeerId()); + if(wstate == null) { + wstate = new WorkerState(); + wmap.put(ss.node.getNodeId().getPeerId(), wstate); + log.error("update: {}", wmap.size()); + ss.applyState(getFSMState(), new SyncClosure() { + @Override + public void run(Status status) { + log.error("{} add workers", ss.node.getNodeId().getPeerId()); + } + } ); + } + } + + public void readIndexState(final boolean readOnlySafe, final SyncClosure closure) { + + closure.setValue(getFSMState()); + if(!readOnlySafe){ + + closure.success(getFSMState()); + closure.run(Status.OK()); + return; + } + getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { @Override public void run(Status status, long index, byte[] reqCtx) { if(status.isOk()){ - log.info("readIndex {}", getState()); - closure.setState(getState()); - closure.success(getState()); + log.error("readIndex {}", getFSMState()); + closure.success(getFSMState()); closure.run(Status.OK()); return; } readIndexExecutor.execute(() -> { if(isLeader()){ - log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); - applyState(getState(), closure); + log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); + applyState(getFSMState(), closure); }else { handlerNotLeaderError(closure); } @@ -197,8 +213,8 @@ public class StateServerFactory { }); } - public SMResponse redirect() { - final SMResponse response = new SMResponse(); + public ResponseSM redirect() { + final ResponseSM response = new ResponseSM(); response.setSuccess(false); if (this.node != null) { final PeerId leader = this.node.getLeaderId(); @@ -209,7 +225,7 @@ public class StateServerFactory { return response; } - public void handlerNotLeaderError(final SyncDataClosure closure) { + public void handlerNotLeaderError(final SyncClosure closure) { closure.failure("Not leader.", redirect().getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); } @@ -220,13 +236,13 @@ public class StateServerFactory { // return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads()); return ThreadPoolUtil.newBuilder() // - .poolName("CounterPool") // + .poolName("ReadIndexPool") // .enableMetric(true) // .coreThreads(4) // .maximumThreads(4) // .keepAliveSeconds(60L) // .workQueue(new SynchronousQueue<>()) // - .threadFactory(new NamedThreadFactory("CounterService", true)) // + .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // .build(); } @@ -235,7 +251,7 @@ public class StateServerFactory { public static void main(String[] args) throws InterruptedException, RemotingException { var rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); - var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncDataRequest(), 5000); + var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestState(), 5000); log.info("{}", resp); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java similarity index 70% rename from src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java rename to src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java index 3d0b891..f19f155 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/SyncDataClosure.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/SyncClosure.java @@ -3,7 +3,7 @@ package com.yuandian.dataflow.statemachine; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.entity.PeerId; -import com.yuandian.dataflow.statemachine.rpc.SMResponse; +import com.yuandian.dataflow.statemachine.rpc.ResponseSM; import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; @@ -18,21 +18,20 @@ import org.slf4j.LoggerFactory; @Getter @Setter @ToString -public abstract class SyncDataClosure implements Closure { +public abstract class SyncClosure implements Closure { // 状态机的统一响应 - private SMResponse response; + private ResponseSM response; // 代表任务状态 - private State state; + private T value; - public Object locksync = new Object(); - - public SyncDataClosure() { + + public SyncClosure() { } public void failure(final String errorMsg, final PeerId redirect) { - final SMResponse response = new SMResponse(); + final ResponseSM response = new ResponseSM(); response.setSuccess(false); response.setMsg(errorMsg); response.setRedirect(redirect); @@ -40,7 +39,7 @@ public abstract class SyncDataClosure implements Closure { } public void success(final State value) { - final SMResponse response = new SMResponse(); + final ResponseSM response = new ResponseSM(); response.setState(value); response.setSuccess(true); setResponse(response); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestCondition.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestCondition.java new file mode 100644 index 0000000..2c67e0c --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestCondition.java @@ -0,0 +1,21 @@ +package com.yuandian.dataflow.statemachine.rpc; + +import java.io.Serializable; + +import com.yuandian.dataflow.statemachine.state.WorkerState; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +@ToString +public class RequestCondition implements Serializable { + + private static final long serialVersionUID = 1L; + + private WorkerState workerState; +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestState.java similarity index 85% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestState.java index 70e7228..f9fb594 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataRequest.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/RequestState.java @@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc; import java.io.Serializable; import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.entity.PeerId; import com.yuandian.dataflow.statemachine.state.State; import lombok.Getter; @@ -26,9 +27,9 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class SyncDataRequest implements Serializable { +public class RequestState implements Serializable { private static final long serialVersionUID = 1L; + private State state; - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java similarity index 93% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java index 8e28092..5e4694f 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SMResponse.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/ResponseSM.java @@ -26,7 +26,7 @@ import lombok.extern.slf4j.Slf4j; @Getter @Setter @ToString -public class SMResponse implements Serializable { +public class ResponseSM implements Serializable { private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java new file mode 100644 index 0000000..89ca91d --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java @@ -0,0 +1,59 @@ +/** + * description + * + * @author eson + *2022年7月12日-11:10:54 + */ +package com.yuandian.dataflow.statemachine.rpc; + +import java.nio.ByteBuffer; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; +import com.yuandian.dataflow.statemachine.StateServerFactory; +import com.yuandian.dataflow.statemachine.SyncClosure; +import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.alipay.sofa.jraft.entity.PeerId; + +import org.apache.commons.lang.StringUtils; + +import lombok.var; +import lombok.extern.slf4j.Slf4j; + +/** + * description + * + * @author eson + *2022年7月12日-11:10:54 + */ +@Slf4j +public class SyncConditionProcessor implements RpcProcessor { + + @Override + public void handleRequest(RpcContext rpcCtx, RequestCondition request) { + + log.info("request: {}", request); + final SyncClosure closure = new SyncClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getResponse()); + log.info("{}", status); + } + }; + + StateServerFactory.getStateServer().applyWorkerState(request.getWorkerState(), closure); + } + + @Override + public String interest() { + return RequestState.class.getName(); + } + + + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java similarity index 77% rename from src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java rename to src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java index 0e1f4ae..ade1afd 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncDataProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncStateProcessor.java @@ -16,8 +16,9 @@ import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.yuandian.dataflow.statemachine.StateServerFactory; -import com.yuandian.dataflow.statemachine.SyncDataClosure; +import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.yuandian.dataflow.statemachine.state.State; import com.alipay.sofa.jraft.entity.PeerId; import org.apache.commons.lang.StringUtils; @@ -32,14 +33,14 @@ import lombok.extern.slf4j.Slf4j; *2022年7月12日-11:10:54 */ @Slf4j -public class SyncDataProcessor implements RpcProcessor { +public class SyncStateProcessor implements RpcProcessor { @Override - public void handleRequest(RpcContext rpcCtx, SyncDataRequest request) { + public void handleRequest(RpcContext rpcCtx, RequestState request) { log.info("request: {}", request); - - final SyncDataClosure closure = new SyncDataClosure() { + + final SyncClosure closure = new SyncClosure() { @Override public void run(Status status) { rpcCtx.sendResponse(getResponse()); @@ -52,7 +53,7 @@ public class SyncDataProcessor implements RpcProcessor { @Override public String interest() { - return SyncDataRequest.class.getName(); + return RequestState.class.getName(); } diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index 1384ebe..d4dcc53 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -11,7 +11,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; import com.alipay.sofa.jraft.util.Endpoint; import com.yuandian.dataflow.statemachine.rpc.SMResponse; -import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest; +import com.yuandian.dataflow.statemachine.rpc.RequestState; import com.yuandian.dataflow.statemachine.state.State; import lombok.var; @@ -26,7 +26,7 @@ public class StateMachineTest { var fstate = new State(); - var fdata = new SyncDataRequest(); + var fdata = new RequestState(); fdata.setState(fstate); var leader = new Endpoint("localhost",4441); @@ -45,7 +45,7 @@ public class StateMachineTest { while(true) { var state = new State(); - var request = new SyncDataRequest(); // 创建请求 + var request = new RequestState(); // 创建请求 request.setState(state); // 添加请求的参数 var wstate = state.getWorkers(); diff --git a/start.sh b/start.sh index 66d3c5d..003f772 100755 --- a/start.sh +++ b/start.sh @@ -4,6 +4,10 @@ screen -S raft-2 -X quit sleep 2 -screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0 -screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1 -screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2 \ No newline at end of file +screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0 +screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1 +screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2 + +screen -S raft-0 -X logfile flush 1 +screen -S raft-1 -X logfile flush 1 +screen -S raft-2 -X logfile flush 1 \ No newline at end of file