逻辑跑通

This commit is contained in:
huangsimin 2022-07-19 10:15:54 +08:00
parent 20e50d142d
commit 6f743686a1
5 changed files with 55 additions and 32 deletions

View File

@ -27,7 +27,7 @@ public class TaskLog {
StateServerFactory.getStateServer().useFsmState((fsmState)->{ StateServerFactory.getStateServer().useFsmState((fsmState)->{
log.error(fsmState.toString() ); log.debug(fsmState.toString() );
return null; return null;
}); });
@ -36,14 +36,11 @@ public class TaskLog {
// state.getWorker().setTaskQueueSize(1); // state.getWorker().setTaskQueueSize(1);
StateServerFactory.getStateServer().updateFsmState((fsmState)->{ StateServerFactory.getStateServer().updateFsmState((fsmState)->{
log.error(fsmState.toString() ); log.debug(fsmState.toString() );
fsmState.getWorkers().put(new PeerId(), new WorkerState()); fsmState.getWorkers().put(new PeerId(), new WorkerState());
return fsmState; return fsmState;
}); });
final Response response = new Response(); final Response response = new Response();
response.Code = HttpStatus.OK; response.Code = HttpStatus.OK;
response.Message = "ok"; response.Message = "ok";

View File

@ -44,12 +44,12 @@ public class Header {
var out = new DataOutputStream(sock.getOutputStream()); var out = new DataOutputStream(sock.getOutputStream());
// 发送验证字符串 // 发送验证字符串
// out.write("public".getBytes()); // out.write("public".getBytes());
// log.error("{}", PacketHeader.PacketCode(in)); // log.debug("{}", PacketHeader.PacketCode(in));
// var pheader = new PacketHeader(in); // var pheader = new PacketHeader(in);
// log.error("{}", pheader); // log.debug("{}", pheader);
// var pbase = PacketBase.createPacketBase(pheader); // var pbase = PacketBase.createPacketBase(pheader);
// log.error("{}",pbase); // log.debug("{}",pbase);
// //60010流需要解压 // //60010流需要解压
@ -74,7 +74,7 @@ public class Header {
// var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); // var length = ByteBuffer.wrap(in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
// if (length <= 13) { // if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]"); // log.debug("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常"); // throw new Exception("数据解析异常");
// } // }
@ -90,7 +90,7 @@ public class Header {
// if (length <= 13) { // if (length <= 13) {
// log.error("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]"); // log.debug("[{}:{}][数据总条数:{}][当前解析第{}条][length<13][探针发包有问题]");
// throw new Exception("数据解析异常"); // throw new Exception("数据解析异常");
// } // }

View File

@ -153,7 +153,7 @@ public class StateMachine extends StateMachineAdapter {
@Override @Override
public void onError(final RaftException e) { public void onError(final RaftException e) {
log.error("Raft error: {}", e, e); log.debug("Raft error: {}", e, e);
} }
@Override @Override
@ -167,30 +167,41 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(term); this.leaderTerm.set(term);
super.onLeaderStart(term); super.onLeaderStart(term);
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState();
ws.setPeerId(ss.getCluster().getServerId());
final SyncClosure<State> closure = new SyncClosure< State>() {
@Override
public void run(Status status) {
log.debug("leader set WorkerState {} ", status);
}
};
ss.applyWorkerState(ws, closure);
return;
} }
@Override @Override
public void onStartFollowing(LeaderChangeContext ctx) { public void onStartFollowing(LeaderChangeContext ctx) {
super.onStartFollowing(ctx); super.onStartFollowing(ctx);
var ss = StateServerFactory.getStateServer();
var node = ss.getNode();
try { try {
var ss = StateServerFactory.getStateServer();
var ws = new WorkerState();
ws.setPeerId(ss.getCluster().getServerId());
var request = new RequestCondition(); var request = new RequestCondition();
request.setWorkerState( new WorkerState() ); request.setWorkerState(ws);
request.getWorkerState().setPeerId( ss.getCluster().getServerId() ); log.debug("my: {} leader id {}",ss.getCluster().getServerId(), ss.getNode().getLeaderId());
log.error("------------ leader id {}", node.getLeaderId());
ResponseSM resp; ResponseSM resp;
resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(node.getLeaderId().getEndpoint(), request, 5000); resp = (ResponseSM)StateServerFactory.getStateServer().getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
log.error("{}", resp); if(resp == null) {
log.error("{} set WorkerState is error", resp);
}
log.debug("WorkerState is {}", resp);
return;
} catch (InterruptedException | RemotingException e) { } catch (InterruptedException | RemotingException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@Override @Override

View File

@ -62,7 +62,7 @@ public class StateServerFactory {
throw new Exception("重复初始化 InitStateServer"); throw new Exception("重复初始化 InitStateServer");
} }
ss = new StateServerFactory.StateServer(peerstr, conf); ss = new StateServerFactory.StateServer(peerstr, conf);
log.error("init peerid {}", ss.node.getNodeId().getPeerId()); log.debug("init peerid {}", ss.node.getNodeId().getPeerId());
} }
@ -143,7 +143,7 @@ public class StateServerFactory {
getFsm().useState((fsmState)->{ getFsm().useState((fsmState)->{
if(status.isOk()){ if(status.isOk()){
log.error("readIndex {}", fsmState); log.debug("readIndex {}", fsmState);
closure.success(fsmState); closure.success(fsmState);
closure.run(Status.OK()); closure.run(Status.OK());
dofunc.apply(fsmState); dofunc.apply(fsmState);
@ -152,7 +152,7 @@ public class StateServerFactory {
readIndexExecutor.execute(() -> { readIndexExecutor.execute(() -> {
if(isLeader()){ if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", getNode());
applyState(fsmState, closure); applyState(fsmState, closure);
}else { }else {
handlerNotLeaderError(closure); handlerNotLeaderError(closure);
@ -176,6 +176,8 @@ public class StateServerFactory {
} }
public void applyState(State state, SyncClosure<State> closure) { public void applyState(State state, SyncClosure<State> closure) {
// 所有的提交都必须再leader进行 // 所有的提交都必须再leader进行
if (!ss.isLeader()) { if (!ss.isLeader()) {
@ -191,7 +193,7 @@ public class StateServerFactory {
StateServerFactory.getStateServer().getNode().apply(task); StateServerFactory.getStateServer().getNode().apply(task);
} catch (CodecException e) { } catch (CodecException e) {
String errorMsg = "Fail to encode TaskState"; String errorMsg = "Fail to encode TaskState";
log.error(errorMsg, e); log.debug(errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer()); closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg)); closure.run(new Status(RaftError.EINTERNAL, errorMsg));
} }
@ -219,7 +221,7 @@ public class StateServerFactory {
StateServerFactory.getStateServer().getNode().apply(task); // 提交数据 StateServerFactory.getStateServer().getNode().apply(task); // 提交数据
} catch (CodecException e) { } catch (CodecException e) {
String errorMsg = "Fail to encode TaskState"; String errorMsg = "Fail to encode TaskState";
log.error("{}:{}",errorMsg, e); log.debug("{}:{}",errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer()); closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg)); closure.run(new Status(RaftError.EINTERNAL, errorMsg));
} }
@ -252,7 +254,7 @@ public class StateServerFactory {
public void run(Status status, long index, byte[] reqCtx) { public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){ if(status.isOk()){
log.error("readIndex {}", fsmState); log.debug("readIndex {}", fsmState);
closure.success(fsmState); closure.success(fsmState);
closure.run(Status.OK()); closure.run(Status.OK());
return; return;
@ -260,7 +262,7 @@ public class StateServerFactory {
readIndexExecutor.execute(() -> { readIndexExecutor.execute(() -> {
if(isLeader()){ if(isLeader()){
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(fsmState, closure); applyState(fsmState, closure);
}else { }else {
handlerNotLeaderError(closure); handlerNotLeaderError(closure);
@ -310,6 +312,8 @@ public class StateServerFactory {
} }
} }
public static void main(String[] args) throws InterruptedException, RemotingException { public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient(); var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions()); rpcClient.init(new CliOptions());

View File

@ -6,8 +6,19 @@
%d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n %d{yyyyMMdd HH:mm:ss.SSS} %level %thread\(%file:%line\): %msg%n
</pattern> </pattern>
</encoder> </encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>DEBUG</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender> </appender>
<root level="INFO">
<!-- <root level="INFO">
<appender-ref ref="CONSOLE" /> <appender-ref ref="CONSOLE" />
</root> </root> -->
<logger name="com.yuandian.dataflow" level="debug">
<appender-ref ref="CONSOLE" />
</logger>
</configuration> </configuration>