完成状态机

This commit is contained in:
huangsimin 2022-07-13 13:33:00 +08:00
parent e8f990d883
commit aa7749f924
9 changed files with 169 additions and 130 deletions

View File

@ -14,7 +14,7 @@ import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
@ -34,14 +34,14 @@ public class Server {
@Autowired
public static Node node;
public static RaftClosure done;
public static SyncDataClosure done;
private static StateServer stateServer;
public static Node GetNode() {
return node;
}
public static RaftClosure GetDone() {
public static SyncDataClosure GetDone() {
return done;
}

View File

@ -2,14 +2,15 @@ package com.yuandian.dataflow.controller;
import java.nio.ByteBuffer;
import com.alibaba.nacos.api.naming.pojo.Cluster;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.Task;
import com.google.protobuf.util.JsonFormat;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcClient;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.grpc.MongodbTest;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import lombok.extern.slf4j.Slf4j;
@ -25,40 +26,41 @@ import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Node;
@Slf4j
@Controller
public class TaskLog {
// private static final Logger log = LoggerFactory.getLogger(TaskLog.class);
private static Node node = Server.GetNode();
@PostMapping(path = "/test")
public ResponseEntity<Response> Processing(@RequestBody String json) {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() {
/*Task task = new Task();
log.info("node.isLeader {} {} node.getNodeId() {}", node.getNodeState(), node.getLeaderId(), node.getNodeId());
if(node.isLeader()) {
// Task task = new Task();
log.error(node.toString());
// // 处理状态机
// RaftClosure done = new RaftClosure();
// task.setData(ByteBuffer.wrap("hello".getBytes()));
// task.setDone(done);
// log.error("{} {} {}",node, node.toString(), task);
// node.apply(task);
// log.error("{}", "RaftClosure");
RaftClosure done = new RaftClosure();
task.setData(ByteBuffer.wrap("hello".getBytes()));
task.setDone(done);
Server.GetNode().apply(task);*/
}
try {
// 1类型转换
BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder();
JsonFormat.parser().merge(json, builder);
BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build();
// 2业务处理
// 3数据保存到 mongoDB
MongodbTest.insertMsgToMongoDB(backtrackingFlow);
} catch (Exception e) {
e.printStackTrace();
}
Response response = new Response();
response.Code = HttpStatus.OK;
@ -67,7 +69,7 @@ public class TaskLog {
}
@GetMapping(path = "/test2")
public ResponseEntity<Response> MongodbTest(@RequestBody int status) {
public ResponseEntity<Response> MongodbTest(@RequestParam int status) {
Response response = new Response();
return new ResponseEntity<Response>(response, HttpStatus.OK);

View File

@ -1,27 +0,0 @@
package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RaftClosure implements Closure {
private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class);
@Override
public void run(Status status) {
LOG.info("Task completed with status"+status.getCode());
LOG.info("Task completed with "+status.getErrorMsg());
LOG.info("Task completed with "+status.getRaftError());
}
// @Override
// public void onCommitted() {
// System.out.println("Task onCommitted");
// }
}

View File

@ -20,7 +20,7 @@ import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.rpc.SyncData;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;

View File

@ -0,0 +1,45 @@
package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.TaskState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
@Getter
@Setter
@ToString
public abstract class SyncDataClosure implements Closure {
// 状态机的统一响应
private SMResponse response;
// 代表任务状态
private TaskState state;
protected void failure(final String errorMsg, final String redirect) {
final SMResponse response = new SMResponse();
response.setSuccess(false);
response.setMsg(errorMsg);
response.setRedirect(redirect);
setResponse(response);
}
protected void success(final TaskState value) {
final SMResponse response = new SMResponse();
response.setState(value);
response.setSuccess(true);
setResponse(response);
}
}

View File

@ -0,0 +1,36 @@
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
package com.yuandian.dataflow.statemachine.rpc;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
@Slf4j
@Getter
@Setter
@ToString
public class SMResponse {
private TaskState state;
private boolean success;
/**
* redirect peer id
*/
private String redirect;
private String msg;
}

View File

@ -29,76 +29,6 @@ public class SyncData implements Serializable {
private static final long serialVersionUID = 1L;
private long queueSize = 0;
// @Getter
// @Setter
// public class IncrementAndGetRequest implements Serializable {
// private long delta;
// }
// public class GetValueRequest implements Serializable {
// private static final long serialVersionUID = 9218253805003988802L;
// public GetValueRequest() {
// super();
// }
// }
// @Getter
// @Setter
// public class ValueResponse implements Serializable {
// private static final long serialVersionUID = -4220017686727146773L;
// private long value;
// private boolean success;
// /**
// * redirect peer id
// */
// private String redirect;
// private String errorMsg;
// }
// public class IncrementAndAddClosure implements Closure {
// // private CounterServer counterServer;
// private IncrementAndGetRequest request;
// private ValueResponse response;
// private Closure done; // 网络应答callback
// public IncrementAndAddClosure(CounterServer counterServer, IncrementAndGetRequest request, ValueResponse response,
// Closure done) {
// super();
// this.counterServer = counterServer;
// this.request = request;
// this.response = response;
// this.done = done;
// }
// @Override
// public void run(Status status) {
// // 返回应答给客户端
// if (this.done != null) {
// done.run(status);
// }
// }
// public IncrementAndGetRequest getRequest() {
// return this.request;
// }
// public void setRequest(IncrementAndGetRequest request) {
// this.request = request;
// }
// public ValueResponse getResponse() {
// return this.response;
// }
// }
private TaskState state;
}

View File

@ -6,8 +6,10 @@
*/
package com.yuandian.dataflow.statemachine.rpc;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import lombok.extern.slf4j.Slf4j;
@ -24,7 +26,16 @@ public class SyncDataProcessor implements RpcProcessor<SyncData> {
public void handleRequest(RpcContext rpcCtx, SyncData request) {
log.info("{}", rpcCtx);
log.info("{}", request);
rpcCtx.sendResponse(null); //
final SyncDataClosure closure = new SyncDataClosure() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
}
};
}
@Override

View File

@ -0,0 +1,42 @@
/**
* description
*
* @author eson
*2022年7月13日-09:11:26
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
/**
* 代表任务状态 暂时全局使用这个结构
*
* @author eson
*2022年7月13日-09:11:26
*/
@Slf4j
@Getter
@Setter
public class TaskState implements Serializable {
private static final long serialVersionUID = -1L;
// 节点的对应peerID
private PeerId peerId;
private long taskQueueSize;
public static void main(String[] args) {
}
}