From 20e50d142d1c385d19409cbd034777057d84ec1e Mon Sep 17 00:00:00 2001 From: huangsimin Date: Tue, 19 Jul 2022 00:04:59 +0800 Subject: [PATCH] finish --- .../java/com/yuandian/dataflow/Server.java | 3 +- .../yuandian/dataflow/controller/TaskLog.java | 12 ++++++- .../java/com/yuandian/dataflow/projo/Doc.java | 22 ++++-------- .../dataflow/statemachine/StateMachine.java | 36 +++++++++++++++---- .../statemachine/StateServerFactory.java | 10 ++++-- .../rpc/SyncConditionProcessor.java | 3 +- .../com/yuandian/dataflow}/MongodbTest.java | 2 +- .../statemachine/StateMachineTest.java | 6 ++-- 8 files changed, 61 insertions(+), 33 deletions(-) rename src/{main/java/com/yuandian/dataflow/grpc => test/java/com/yuandian/dataflow}/MongodbTest.java (98%) diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 7702caa..89ce643 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; import com.alipay.remoting.serialization.SerializerManager; import com.alipay.sofa.jraft.JRaftUtils; @@ -31,7 +32,7 @@ import lombok.extern.slf4j.Slf4j; * */ @Slf4j -@SpringBootApplication +@SpringBootApplication(exclude = {MongoAutoConfiguration.class}) @SpringBootConfiguration public class Server { diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index bfba04e..4fbf2c8 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -7,10 +7,13 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.state.State; +import com.yuandian.dataflow.statemachine.state.WorkerState; import lombok.var; import lombok.extern.slf4j.Slf4j; @@ -20,7 +23,7 @@ import lombok.extern.slf4j.Slf4j; public class TaskLog { @GetMapping(path = "/test") - public ResponseEntity Processing() throws InterruptedException { + public ResponseEntity Processing() throws InterruptedException, RemotingException { StateServerFactory.getStateServer().useFsmState((fsmState)->{ @@ -31,6 +34,13 @@ public class TaskLog { // state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId()); // state.getWorker().setTaskQueueSize(1); + + StateServerFactory.getStateServer().updateFsmState((fsmState)->{ + log.error(fsmState.toString() ); + fsmState.getWorkers().put(new PeerId(), new WorkerState()); + return fsmState; + }); + diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java index 502695f..e1389a1 100644 --- a/src/main/java/com/yuandian/dataflow/projo/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -2,27 +2,20 @@ package com.yuandian.dataflow.projo; -import java.io.Serializable; -import java.lang.annotation.Documented; import java.time.LocalDateTime; import org.bson.Document; -import org.bson.codecs.pojo.annotations.BsonCreator; -import org.bson.codecs.pojo.annotations.BsonDiscriminator; import org.bson.codecs.pojo.annotations.BsonProperty; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.util.JSONPObject; import lombok.Getter; import lombok.Setter; - - -@Getter @Setter -public final class Doc extends Document { +@Getter +public final class Doc extends Document { @JsonProperty("code") @BsonProperty("code") @@ -33,14 +26,11 @@ public final class Doc extends Document { public LocalDateTime TS; @JsonProperty("desc") - @BsonProperty("desc") + @BsonProperty("desc") public String Desc; @JsonProperty("data") - @BsonProperty("data") - public Document Data; - - - - + @BsonProperty("data") + public Document Data; + } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index aae79a4..ec27c09 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -80,11 +80,32 @@ public class StateMachine extends StateMachineAdapter { /** * Returns current value. 读取修改都在这个函数域内进行 + * @throws RemotingException + * @throws InterruptedException */ - public void updateState(Function dofunc) { + public void updateState(Function dofunc) throws InterruptedException, RemotingException { synchronized(this.state) { var newstate = dofunc.apply(this.state); - this.state = newstate; + var ss = StateServerFactory.getStateServer(); + if(!isLeader()) { + var request = new RequestState(); + request.setState(newstate); + var result = ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000); + log.info("{}", result); + return; + } + + // this.state = newstate; + if(newstate != null) { + var colsure = new SyncClosure() { + @Override + public void run(Status status) { + + } + }; + colsure.setValue(newstate); + StateServerFactory.getStateServer().applyState(newstate, colsure); + } } } @@ -98,19 +119,22 @@ public class StateMachine extends StateMachineAdapter { // This task is applied by this node, get value from closure to avoid additional // parsing. var closure = (SyncClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 - state = closure.getValue(); - log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm); + log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure); + synchronized(this.state) { + this.state = closure.getValue(); + } + closure.success(state); closure.run(Status.OK()); } else { // Have to parse FetchAddRequest from this user log. final ByteBuffer data = iter.getData(); try { - + synchronized(state) { state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( data.array(), State.class.getName()); log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); - + } } catch (CodecException e) { e.printStackTrace(); diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index e1cac63..edcc8aa 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -171,6 +171,10 @@ public class StateServerFactory { } + public void updateFsmState(Function dofunc) throws InterruptedException, RemotingException { + this.getFsm().updateState(dofunc); + } + public void applyState(State state, SyncClosure closure) { // 所有的提交都必须再leader进行 @@ -193,7 +197,7 @@ public class StateServerFactory { } } - public void applyWorkerState(WorkerState state, SyncClosure closure) { + public void applyWorkerState(WorkerState state, SyncClosure closure) { if (!ss.isLeader()) { ss.handlerNotLeaderError(closure); return; @@ -209,13 +213,13 @@ public class StateServerFactory { wmap.put(state.getPeerId(), state); try { final Task task = new Task(); - log.error("{}", fsmState); + closure.setValue(fsmState); task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState))); task.setDone(closure); StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 } catch (CodecException e) { String errorMsg = "Fail to encode TaskState"; - log.error(errorMsg, e); + log.error("{}:{}",errorMsg, e); closure.failure(errorMsg, PeerId.emptyPeer()); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java index fefff27..263a63d 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/SyncConditionProcessor.java @@ -19,6 +19,7 @@ import com.lmax.disruptor.WorkProcessor; import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.SyncClosure; import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer; +import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; import com.alipay.sofa.jraft.entity.PeerId; @@ -40,7 +41,7 @@ public class SyncConditionProcessor implements RpcProcessor { public void handleRequest(RpcContext rpcCtx, RequestCondition request) { log.info("request: {}", request); - final SyncClosure closure = new SyncClosure() { + final SyncClosure closure = new SyncClosure< State>() { @Override public void run(Status status) { rpcCtx.sendResponse(getResponse()); diff --git a/src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java b/src/test/java/com/yuandian/dataflow/MongodbTest.java similarity index 98% rename from src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java rename to src/test/java/com/yuandian/dataflow/MongodbTest.java index d250df0..73e1ed9 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java +++ b/src/test/java/com/yuandian/dataflow/MongodbTest.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.grpc; +package com.yuandian.dataflow; import com.mongodb.MongoClient; import com.mongodb.MongoCredential; diff --git a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java index 809cb28..329abfa 100644 --- a/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java +++ b/src/test/java/com/yuandian/dataflow/statemachine/StateMachineTest.java @@ -23,8 +23,7 @@ public class StateMachineTest { void testOnApply() throws InterruptedException, RemotingException { var rpcClient = new BoltRaftRpcFactory().createRpcClient(); rpcClient.init(new CliOptions()); - - + var fstate = new State(); var fdata = new RequestState(); fdata.setState(fstate); @@ -39,8 +38,7 @@ public class StateMachineTest { , 5000); log.info("{}", resp); } - - + int i = 0 ; while(true) {