This commit is contained in:
huangsimin 2022-07-18 03:07:52 +08:00
parent 5a50ce22ef
commit 493f3d4907
14 changed files with 188 additions and 109 deletions

View File

@ -192,7 +192,7 @@
<version>1.6.2</version>
</plugin>
<plugin>
<!-- <plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
@ -213,7 +213,7 @@
</goals>
</execution>
</executions>
</plugin>
</plugin> -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -15,10 +15,10 @@ import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateMachine;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
@ -54,29 +54,7 @@ public class Server {
Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
StateServerFactory.InitStateServer(peeridstr, conf);
// Thread printer = new Thread( new Runnable(){
// @Override
// public void run() {
// // TODO Auto-generated method stub
// while(true) {
// var state = StateServerFactory.getStateServer().getFsm().getState();
// log.info("{}", state);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
// }
// } );
// printer.start();
System.setProperty("server.port", sprPort);
var app = SpringApplication.run(Server.class, args);
app.start();

View File

@ -9,7 +9,7 @@ import org.springframework.web.bind.annotation.RequestParam;
import com.alipay.sofa.jraft.Status;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
@ -24,17 +24,17 @@ public class TaskLog {
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing() throws InterruptedException {
SyncDataClosure closure = new SyncDataClosure() {
SyncClosure<State> closure = new SyncClosure<State>() {
@Override
public void run(Status status) {
log.info(getState().toString());
log.error("{} {}",getValue().toString(), getValue().getWorkers().size());
}
};
var state = new State();
closure.setState(state);
closure.setValue(state);
log.info(StateServerFactory.getStateServer().getFsm().getState().toString() );
log.error(StateServerFactory.getStateServer().getFsm().getState().toString() );
// state.getWorker().setPeerId(StateServerFactory.getStateServer().getNode().getNodeId().getPeerId());
// state.getWorker().setTaskQueueSize(1);

View File

@ -19,7 +19,7 @@ import lombok.Setter;
@BsonDiscriminator
@Getter
@Setter
public final class Doc extends Document {

View File

@ -24,8 +24,8 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Utils;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
@ -70,8 +70,8 @@ public class StateMachine extends StateMachineAdapter {
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
var closure = (SyncDataClosure)iter.done();
state = closure.getState();
var closure = (SyncClosure<State>)iter.done();
state = closure.getValue();
log.info("SyncDataClosure(done) taskState:{} leaderTerm:{}",state, this.leaderTerm);
closure.success(state);
closure.run(Status.OK());

View File

@ -14,6 +14,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
@ -27,13 +28,15 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.rpc.RequestCondition;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.rpc.SyncStateProcessor;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@ -53,33 +56,23 @@ import lombok.extern.slf4j.Slf4j;
public class StateServerFactory {
private static StateServer ss;
// 必须初始化
public static void InitStateServer(String peerstr, Configuration conf) throws Exception {
if(ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateServerFactory.StateServer(peerstr, conf);
log.error("init peerid {}", ss.node.getNodeId().getPeerId());
ss.getNode().join();
var request = new RequestCondition();
request.setWorkerState( new WorkerState() );
request.getWorkerState().setPeerId( ss.cluster.getServerId() );
log.error("{}", ss.getNode().getNodeMetrics() );
ss.readIndexState(true, new SyncDataClosure() {
@Override
public void run(Status status) {
log.info("add peerid {}", getState());
var wstate = getState().getWorkers().get(ss.node.getNodeId().getPeerId());
if(wstate == null) {
wstate = new WorkerState();
getState().getWorkers().put(ss.node.getNodeId().getPeerId(), wstate);
log.info("update: {}", getState());
ss.applyState(getState(), new SyncDataClosure() {
@Override
public void run(Status status) {
log.info("{} add workers", ss.node.getNodeId().getPeerId());
}
} );
}
}
});
ResponseSM resp = (ResponseSM)ss.getRpcClient().invokeSync(ss.getNode().getLeaderId().getEndpoint(), request, 5000);
log.info("{}", resp);
}
// 获取状态服务的对象
@ -90,6 +83,10 @@ public class StateServerFactory {
@Getter
@Setter
public static class StateServer {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
@ -127,31 +124,29 @@ public class StateServerFactory {
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
cluster.getRpcServer().registerProcessor(new SyncDataProcessor());
cluster.getRpcServer().registerProcessor(new SyncStateProcessor());
node = cluster.start();
if(node.isLeader()) {
}
rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public State getState() {
public State getFSMState() {
return this.fsm.getState();
}
public void applyState(State state, SyncDataClosure closure) {
public void applyState(State state, SyncClosure<State> closure) {
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setState(state);
closure.setValue(state);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(state)));
task.setDone(closure);
@ -164,31 +159,52 @@ public class StateServerFactory {
}
}
public void readIndexState(final boolean readOnlySafe, final SyncDataClosure closure) {
if(!readOnlySafe){
closure.setState(getState());
closure.success(getState());
closure.run(Status.OK());
public void applyWorkerState(WorkerState state, SyncClosure<WorkerState> closure) {
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
var wmap = getFSMState().getWorkers();
var wstate = wmap.get(ss.node.getNodeId().getPeerId());
if(wstate == null) {
wstate = new WorkerState();
wmap.put(ss.node.getNodeId().getPeerId(), wstate);
log.error("update: {}", wmap.size());
ss.applyState(getFSMState(), new SyncClosure<State>() {
@Override
public void run(Status status) {
log.error("{} add workers", ss.node.getNodeId().getPeerId());
}
} );
}
}
public void readIndexState(final boolean readOnlySafe, final SyncClosure<State> closure) {
closure.setValue(getFSMState());
if(!readOnlySafe){
closure.success(getFSMState());
closure.run(Status.OK());
return;
}
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if(status.isOk()){
log.info("readIndex {}", getState());
closure.setState(getState());
closure.success(getState());
log.error("readIndex {}", getFSMState());
closure.success(getFSMState());
closure.run(Status.OK());
return;
}
readIndexExecutor.execute(() -> {
if(isLeader()){
log.info("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(getState(), closure);
log.error("Fail to get value with 'ReadIndex': {}, try to applying to the state machine.", status);
applyState(getFSMState(), closure);
}else {
handlerNotLeaderError(closure);
}
@ -197,8 +213,8 @@ public class StateServerFactory {
});
}
public SMResponse redirect() {
final SMResponse response = new SMResponse();
public ResponseSM redirect() {
final ResponseSM response = new ResponseSM();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
@ -209,7 +225,7 @@ public class StateServerFactory {
return response;
}
public void handlerNotLeaderError(final SyncDataClosure closure) {
public void handlerNotLeaderError(final SyncClosure closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
@ -220,13 +236,13 @@ public class StateServerFactory {
// return StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
return ThreadPoolUtil.newBuilder() //
.poolName("CounterPool") //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("CounterService", true)) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
@ -235,7 +251,7 @@ public class StateServerFactory {
public static void main(String[] args) throws InterruptedException, RemotingException {
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new SyncDataRequest(), 5000);
var resp = rpcClient.invokeSync(new Endpoint("localhost",4441), new RequestState(), 5000);
log.info("{}", resp);
}
}

View File

@ -3,7 +3,7 @@ package com.yuandian.dataflow.statemachine;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.ResponseSM;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
@ -18,21 +18,20 @@ import org.slf4j.LoggerFactory;
@Getter
@Setter
@ToString
public abstract class SyncDataClosure implements Closure {
public abstract class SyncClosure<T> implements Closure {
// 状态机的统一响应
private SMResponse response;
private ResponseSM response;
// 代表任务状态
private State state;
private T value;
public Object locksync = new Object();
public SyncDataClosure() {
public SyncClosure() {
}
public void failure(final String errorMsg, final PeerId redirect) {
final SMResponse response = new SMResponse();
final ResponseSM response = new ResponseSM();
response.setSuccess(false);
response.setMsg(errorMsg);
response.setRedirect(redirect);
@ -40,7 +39,7 @@ public abstract class SyncDataClosure implements Closure {
}
public void success(final State value) {
final SMResponse response = new SMResponse();
final ResponseSM response = new ResponseSM();
response.setState(value);
response.setSuccess(true);
setResponse(response);

View File

@ -0,0 +1,21 @@
package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
public class RequestCondition implements Serializable {
private static final long serialVersionUID = 1L;
private WorkerState workerState;
}

View File

@ -9,6 +9,7 @@ package com.yuandian.dataflow.statemachine.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.Getter;
@ -26,9 +27,9 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
public class SyncDataRequest implements Serializable {
public class RequestState implements Serializable {
private static final long serialVersionUID = 1L;
private State state;
}

View File

@ -26,7 +26,7 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
@ToString
public class SMResponse implements Serializable {
public class ResponseSM implements Serializable {
private static final long serialVersionUID = 1L;

View File

@ -0,0 +1,59 @@
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
package com.yuandian.dataflow.statemachine.rpc;
import java.nio.ByteBuffer;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncConditionProcessor implements RpcProcessor<RequestCondition> {
@Override
public void handleRequest(RpcContext rpcCtx, RequestCondition request) {
log.info("request: {}", request);
final SyncClosure closure = new SyncClosure() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
log.info("{}", status);
}
};
StateServerFactory.getStateServer().applyWorkerState(request.getWorkerState(), closure);
}
@Override
public String interest() {
return RequestState.class.getName();
}
}

View File

@ -16,8 +16,9 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
import com.yuandian.dataflow.statemachine.SyncDataClosure;
import com.yuandian.dataflow.statemachine.SyncClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
import com.yuandian.dataflow.statemachine.state.State;
import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.commons.lang.StringUtils;
@ -32,14 +33,14 @@ import lombok.extern.slf4j.Slf4j;
*2022年7月12日-11:10:54
*/
@Slf4j
public class SyncDataProcessor implements RpcProcessor<SyncDataRequest> {
public class SyncStateProcessor implements RpcProcessor<RequestState> {
@Override
public void handleRequest(RpcContext rpcCtx, SyncDataRequest request) {
public void handleRequest(RpcContext rpcCtx, RequestState request) {
log.info("request: {}", request);
final SyncDataClosure closure = new SyncDataClosure() {
final SyncClosure closure = new SyncClosure() {
@Override
public void run(Status status) {
rpcCtx.sendResponse(getResponse());
@ -52,7 +53,7 @@ public class SyncDataProcessor implements RpcProcessor<SyncDataRequest> {
@Override
public String interest() {
return SyncDataRequest.class.getName();
return RequestState.class.getName();
}

View File

@ -11,7 +11,7 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.Endpoint;
import com.yuandian.dataflow.statemachine.rpc.SMResponse;
import com.yuandian.dataflow.statemachine.rpc.SyncDataRequest;
import com.yuandian.dataflow.statemachine.rpc.RequestState;
import com.yuandian.dataflow.statemachine.state.State;
import lombok.var;
@ -26,7 +26,7 @@ public class StateMachineTest {
var fstate = new State();
var fdata = new SyncDataRequest();
var fdata = new RequestState();
fdata.setState(fstate);
var leader = new Endpoint("localhost",4441);
@ -45,7 +45,7 @@ public class StateMachineTest {
while(true) {
var state = new State();
var request = new SyncDataRequest(); // 创建请求
var request = new RequestState(); // 创建请求
request.setState(state); // 添加请求的参数
var wstate = state.getWorkers();

View File

@ -4,6 +4,10 @@ screen -S raft-2 -X quit
sleep 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
screen -dmS raft-0 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 0
screen -dmS raft-1 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 1
screen -dmS raft-2 -L java -jar target/dataflow-1.0.0-SNAPSHOT.jar 2
screen -S raft-0 -X logfile flush 1
screen -S raft-1 -X logfile flush 1
screen -S raft-2 -X logfile flush 1