所有涉读写操作必须一体在onApply上完成.

This commit is contained in:
huangsimin 2022-08-03 14:41:23 +08:00
parent 69ac1d6317
commit d184fd014a
6 changed files with 34 additions and 18 deletions

View File

@ -65,7 +65,7 @@ public class MasterProcessor implements MasterExecute {
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
cxt.sleep(5000);
} else {
log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
// log.debug("master({}) execute {} packets: {}", StateFactory.getServerId(), alivePeers, packetsManager.size());
}
@ -77,12 +77,11 @@ public class MasterProcessor implements MasterExecute {
// 等待全部反馈后才能进入下次循环
CountDownLatch latch = new CountDownLatch(alivePeers.size());
// 读一致性
StateFactory.readIndexState(new GenericClosure() {
@Override
public void run(Status status) {
var state = StateFactory.getStateServer().getFsm().getState();
// log.debug("masterExecute start {} {}", status, alivePeers);
var state = this.<State>getValue();
// var state = this.<State>getValue();
if (state == null) {
log.error("readIndexState获取的状态为 {}", state);
return;
@ -142,7 +141,6 @@ public class MasterProcessor implements MasterExecute {
@Override
public void complete(Object result, Throwable err) {
latch.countDown();
log.debug("countDown {}", latch.getCount());
if (err != null) {
// TODO: 如果错误, 需要让节点恢复任务处理的状态
log.debug("{}", err);
@ -159,9 +157,9 @@ public class MasterProcessor implements MasterExecute {
}
}
});
try {
latch.await(DEFAULT_ASYNC_TIMEOUT, TimeUnit.MILLISECONDS);

View File

@ -58,22 +58,22 @@ public class PacketsProcessor implements RpcProcessor<PacketsProcessor.PacketsRe
rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
try {
log.info("response {} ms", Duration.between(now, Instant.now()).toMillis()); // 返回response的时间
// log.info("response {} ms", Duration.between(now, Instant.now()).toMillis()); // 返回response的时间
// TODO: request.packets 入库,回填, 告警 等操作
Thread.sleep(ThreadLocalRandom.current().nextLong(100, 500));
Thread.sleep(ThreadLocalRandom.current().nextLong(10, 100));
} catch (InterruptedException e) {
log.info(e.toString());
} finally { // 确保 更新 最终的任务状态给master.
// 读状态 Closure<State> 里的 getValue<State> State的状态
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
ws.setUpdateAt(Instant.now()); // 设置更新时间
// 读状态 Closure<State> 里的 getValue<State> State的状态
var state = StateFactory.getStateServer().getFsm().getState() ; // 获取返回的状态
var ws = state.getWorkers().get(StateFactory.getServerId());
ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
ws.setUpdateAt(Instant.now()); // 设置更新时间
Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),

View File

@ -61,7 +61,8 @@ public class MasterFactory {
while (!cxt.getIsExit()) {
Instant now = Instant.now();
masterExecuteCls.loop(cxt);
log.info("master loop execute time: {} ms",Duration.between(now, Instant.now()).toMillis());
cxt.setLastLoopExecuteTime(Duration.between(now, Instant.now()));
// log.info("master loop execute time: {} ms",Duration.between(now, Instant.now()).toMillis());
}
}

View File

@ -277,8 +277,9 @@ public class StateFactory {
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateFactory.getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode WorkerState";
String errorMsg = "Fail to encode Operate";
log.debug("{} {}", errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));

View File

@ -97,7 +97,7 @@ public class StateMachine extends StateMachineAdapter {
break;
case GET_STATE:
closure.setValue(this.state);
// log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
break;
case REMOVE:

View File

@ -1,5 +1,6 @@
package com.yuandian.dataflow.statemachine.master;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
@ -9,6 +10,11 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MasterContext {
private AtomicBoolean isExit = new AtomicBoolean(false);
private Duration lastLoopExecuteTime = Duration.ZERO;
private Object share;
public Boolean getIsExit() {
@ -27,6 +33,16 @@ public class MasterContext {
this.share = share;
}
public void setLastLoopExecuteTime(Duration lastLoopExecuteTime) {
this.lastLoopExecuteTime = lastLoopExecuteTime;
}
public Duration getLastLoopExecuteTime() {
return lastLoopExecuteTime;
}
public void sleep(long millis) {
try {
Thread.sleep(millis);