diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 1360774..bfba04e 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -22,15 +22,7 @@ public class TaskLog { @GetMapping(path = "/test") public ResponseEntity Processing() throws InterruptedException { - SyncClosure closure = new SyncClosure() { - @Override - public void run(Status status) { - log.error("{} {}",getValue().toString(), getValue().getWorkers().size()); - } - }; - - var state = new State(); - closure.setValue(state); + StateServerFactory.getStateServer().useFsmState((fsmState)->{ log.error(fsmState.toString() ); return null; @@ -40,7 +32,7 @@ public class TaskLog { // state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId()); // state.getWorker().setTaskQueueSize(1); - StateServerFactory.getStateServer().readIndexState(true, closure); + final Response response = new Response(); response.Code = HttpStatus.OK; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 969eac1..aae79a4 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -97,7 +97,7 @@ public class StateMachine extends StateMachineAdapter { if (iter.done() != null) { // This task is applied by this node, get value from closure to avoid additional // parsing. - var closure = (SyncClosure)iter.done(); + var closure = (SyncClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 state = closure.getValue(); log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm); closure.success(state); @@ -109,7 +109,7 @@ public class StateMachine extends StateMachineAdapter { state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( data.array(), State.class.getName()); - log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); + log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm); } catch (CodecException e) { diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 9d0f091..e1cac63 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -185,7 +185,6 @@ public class StateServerFactory { task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state))); task.setDone(closure); // 确认所有数据 一致, 不需要加锁 StateServerFactory.getStateServer().getNode().apply(task); - } catch (CodecException e) { String errorMsg = "Fail to encode TaskState"; log.error(errorMsg, e);