完善closure失败的输入

This commit is contained in:
huangsimin 2022-07-28 16:01:34 +08:00
parent 11ed9ceb61
commit 7381a55174
2 changed files with 40 additions and 25 deletions

View File

@ -57,36 +57,48 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
// StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted); // StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
var resp = new RaftResponse<>(); var resp = new RaftResponse<>();
resp.setSuccess(true); resp.setSuccess(true);
rpcCtx.sendResponse(resp); rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
var ss = StateFactory.getStateServer(); try {
log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size()); log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
// TODO: request.packets 入库,回填, 告警 等操作
// 读状态 Closure<State> 里的 getValue<State> State的状态
ss.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
if (status.isOk()) { } finally { // 确保 更新 最终的任务状态给master.
// 读状态 Closure<State> 里的 getValue<State> State的状态
StateFactory.readIndexState(new GenericClosure<State>() {
@Override
public void run(Status status) {
if (!status.isOk()) {
log.error("失败 readIndexState {}", status);
}
// readIndexState 失败后也需要直接 更新自己状态
var state = this.getValue(); // 获取返回的状态 var state = this.getValue(); // 获取返回的状态
var ws = state.getWorkers().get(StateFactory.getServerId()); var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
ws.setUpdateAt(Instant.now()); // 设置更新时间 ws.setUpdateAt(Instant.now()); // 设置更新时间
// log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
// request.packets.size(), state.getWorkers().size()); new GenericClosure<Operate>() {
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws), new GenericClosure<Operate>() { @Override
@Override public void run(Status status) {
public void run(Status status) { if (!status.isOk()) {
if (status.isOk()) { log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
log.info("[{}] {}", StateFactory.getServerId(), resp); }
} }
} });
});
} }
} });
}); }
;
} }
@Override @Override

View File

@ -3,6 +3,8 @@ package com.yuandian.dataflow.statemachine.operate;
import java.io.Serializable; import java.io.Serializable;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.StateFactory; import com.yuandian.dataflow.statemachine.StateFactory;
@ -77,8 +79,8 @@ public class Operate implements Serializable {
@Override @Override
public void complete(Object result, Throwable err) { public void complete(Object result, Throwable err) {
log.info("Object result {}", result); log.debug("Object result {}", result);
//TODO: 解决回调的次序问题
var resp = (RaftResponse<Operate>) result; var resp = (RaftResponse<Operate>) result;
closure.setResponse(resp); closure.setResponse(resp);
closure.success(resp.getValue()); closure.success(resp.getValue());
@ -87,7 +89,8 @@ public class Operate implements Serializable {
}, 5000); }, 5000);
} catch (InterruptedException | RemotingException e) { } catch (InterruptedException | RemotingException e) {
closure.failure("failure", null); closure.failure(e.getMessage(), null);
closure.run(new Status(100000, "invokeAsync fail"));
log.info("{}", e.toString()); log.info("{}", e.toString());
} }