TODO: 更好设计

This commit is contained in:
huangsimin 2022-07-18 17:54:37 +08:00
parent 18d329910d
commit ad78dd2613
3 changed files with 54 additions and 42 deletions

View File

@ -31,9 +31,9 @@ public class TaskLog {
var state = new State();
closure.setValue(state);
StateServerFactory.getStateServer().useState((fsmState)->{
StateServerFactory.getStateServer().useFsmState((fsmState)->{
log.error(fsmState.toString() );
return fsmState;
return null;
});

View File

@ -64,9 +64,9 @@ public class StateMachine extends StateMachineAdapter {
/**
* Returns current value. 只有Get 操作状态由协议流程决定 Apply
*/
// public State getState() {
// return state;
// }
public State getState() {
return state;
}
/**
@ -106,10 +106,12 @@ public class StateMachine extends StateMachineAdapter {
// Have to parse FetchAddRequest from this user log.
final ByteBuffer data = iter.getData();
try {
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
state = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(), State.class.getName());
log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
} catch (CodecException e) {
e.printStackTrace();
}
@ -147,7 +149,7 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
// TODO Auto-generated method stub
super.onStartFollowing(ctx);
var ss = StateServerFactory.getStateServer();
@ -162,7 +164,7 @@ public class StateMachine extends StateMachineAdapter {
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(node.getLeaderId().getEndpoint(), request, 5000);
log.error("{}", resp);
} catch (InterruptedException | RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

View File

@ -126,48 +126,54 @@ public class StateServerFactory {
}
public void useState(Function<State, Void> dofunc) {
this.fsm.useState((fsmState)->{
public void useFsmState(Function<State, Void> dofunc) {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
}
};
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.error("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
getFsm().useState((fsmState)->{
if(status.isOk()){
log.error("readIndex {}", fsmState);
closure.success(fsmState);
closure.run(Status.OK());
dofunc.apply(fsmState);
return null;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure);
}else {
handlerNotLeaderError(closure);
}
});
return null;
});
}
});
return null;
});
return ;
}
public void applyState(State state, SyncClosure<State> closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
@ -177,7 +183,7 @@ public class StateServerFactory {
closure.setValue(state);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state)));
task.setDone(closure);
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) {
@ -194,7 +200,9 @@ public class StateServerFactory {
return;
}
useState((fsmState)->{
useFsmState((fsmState)->{
var wmap = fsmState.getWorkers();
var wstate = wmap.get(state.getPeerId());
@ -205,16 +213,18 @@ public class StateServerFactory {
log.error("{}", fsmState);
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(fsmState)));
task.setDone(closure);
StateServerFactory.getStateServer().getNode().apply(task);
StateServerFactory.getStateServer().getNode().apply(task); // 提交数据
} catch (CodecException e) {
String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
} else {
closure.success(fsmState);
closure.run(Status.OK());
}
return fsmState;
return null;
});
@ -225,13 +235,13 @@ public class StateServerFactory {
public void readIndexState(final boolean readOnlySafe, final SyncClosure<State> closure) {
useState((fsmState)->{
useFsmState((fsmState)->{
closure.setValue(fsmState);
if(!readOnlySafe){
closure.success(fsmState);
closure.run(Status.OK());
return fsmState;
return null;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@ -256,7 +266,7 @@ public class StateServerFactory {
}
});
return fsmState;
return null;
});