完成状态机的读写

This commit is contained in:
huangsimin 2022-07-15 01:00:22 +08:00
parent 886a5ffe1a
commit dce906a7eb
4 changed files with 46 additions and 44 deletions

View File

@ -34,10 +34,6 @@ import lombok.extern.slf4j.Slf4j;
@SpringBootConfiguration @SpringBootConfiguration
public class Server { public class Server {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {

View File

@ -1,14 +1,12 @@
package com.yuandian.dataflow.controller; package com.yuandian.dataflow.controller;
import java.nio.ByteBuffer; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import com.alibaba.nacos.api.naming.pojo.Cluster; import com.alipay.sofa.jraft.Status;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.StateServerFactory; import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure; import com.yuandian.dataflow.statemachine.SyncDataClosure;
@ -17,23 +15,6 @@ import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.var; import lombok.var;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils.Null;
import org.apache.commons.logging.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.Status;
@Slf4j @Slf4j
@Controller @Controller
public class TaskLog { public class TaskLog {
@ -41,23 +22,35 @@ public class TaskLog {
@GetMapping(path = "/test") @GetMapping(path = "/test")
public ResponseEntity<Response> Processing() { public ResponseEntity<Response> Processing() throws InterruptedException {
// var state = StateServerFactory.getStateServer().getFsm().getTaskState(); // var state = StateServerFactory.getStateServer().getFsm().getTaskState();
var c = new SyncDataClosure() {
SyncDataClosure closure = new SyncDataClosure() {
@Override @Override
public void run(Status status) { public void run(Status status) {
log.info(getTaskState().toString()); synchronized(lockObject) {
lockObject.notify();
}
} }
}; };
StateServerFactory.getStateServer().readIndexState(true, c ); var state = new TaskState();
state.setTaskQueueSize(1);
closure.setTaskState(state);
StateServerFactory.getStateServer().readIndexState(true, closure);
synchronized(closure.lockObject) {
closure.lockObject.wait();
}
Response response = new Response();
final Response response = new Response();
response.Code = HttpStatus.OK; response.Code = HttpStatus.OK;
response.Message = "OK"; response.Message = closure.getTaskState().toString();
return new ResponseEntity<Response>(response, HttpStatus.OK); return new ResponseEntity<Response>(response, HttpStatus.OK);
} }

View File

@ -140,6 +140,8 @@ public class StateServerFactory {
public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) { public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) {
if(!readOnlySafe){ if(!readOnlySafe){
closure.setTaskState(getTaskState());
closure.success(getTaskState()); closure.success(getTaskState());
closure.run(Status.OK()); closure.run(Status.OK());
return; return;
@ -149,14 +151,21 @@ public class StateServerFactory {
@Override @Override
public void run(Status status, long index, byte[] reqCtx) { public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){ if(status.isOk()){
if(closure.getTaskState() != null){
applyState(closure.getTaskState(), closure);
} else {
closure.setTaskState(getTaskState());
closure.success(getTaskState()); closure.success(getTaskState());
closure.run(Status.OK()); closure.run(Status.OK());
}
return; return;
} }
readIndexExecutor.execute(() -> { readIndexExecutor.execute(() -> {
if(isLeader()){ if(isLeader()){
log.debug("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status); log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(getTaskState(), closure); applyState(getTaskState(), closure);
}else { }else {
handlerNotLeaderError(closure); handlerNotLeaderError(closure);

View File

@ -25,6 +25,12 @@ public abstract class SyncDataClosure implements Closure {
// 代表任务状态 // 代表任务状态
private TaskState taskState; private TaskState taskState;
public Object lockObject = new Object();
public SyncDataClosure() {
}
public void failure(final String errorMsg, final PeerId redirect) { public void failure(final String errorMsg, final PeerId redirect) {
final SMResponse response = new SMResponse(); final SMResponse response = new SMResponse();
@ -41,6 +47,4 @@ public abstract class SyncDataClosure implements Closure {
setResponse(response); setResponse(response);
} }
} }