删除旧的状态机代码

This commit is contained in:
huangsimin 2022-08-10 15:55:31 +08:00
parent 73cbcc9a0e
commit c3b067cca5
22 changed files with 223 additions and 1134 deletions

View File

@ -9,12 +9,12 @@ import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.conf.Configuration;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.utils.Utils;
import io.netty.util.internal.StringUtil;
import javassist.ClassClassPath;
import lombok.extern.slf4j.Slf4j;
@ -48,8 +48,8 @@ public class Server {
// var sprPort = sprPeers[2];
log.info("{} {}", peeridstr, sprPort);
conf = JRaftUtils.getConfiguration(String.join(",", peers));
StateFactory.startStateServer(peeridstr, conf);
// conf = JRaftUtils.getConfiguration(String.join(",", peers));
// StateFactory.startStateServer(peeridstr, conf);

View File

@ -21,14 +21,11 @@ import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.utils.PacketsManager;
import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@MasterRegister

View File

@ -22,16 +22,12 @@ import com.yuandian.dataflow.proto.Processor.ProcessorResponse;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* description
@ -54,8 +50,7 @@ public class PacketsProcessor extends ProcessorServerImplBase {
super.allPackets(request, responseObserver);
log.info("packets {}", request.getPacketsList().size());
}

View File

@ -1,41 +0,0 @@
package com.yuandian.dataflow.controller;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 例子 强制转换leader
*/
@Slf4j
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter
@Getter
public static class LeaderRequest implements Serializable {
PeerId peer;
}
@Override
public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
rpcCtx.sendResponse(status);
log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
}
@Override
public String interest() {
return LeaderRequest.class.getName();
}
}

View File

@ -38,7 +38,12 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine.messages.MessageUtils;
import com.yuandian.dataflow.statemachine.messages.Operate;
import com.yuandian.dataflow.statemachine.messages.Query;
import com.yuandian.dataflow.statemachine.messages.RaftReply;
import com.yuandian.dataflow.statemachine.messages.Operate.OperateType;
import com.yuandian.dataflow.statemachine.messages.RaftReply.Status;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
@ -74,9 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
@Slf4j
public class StateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
private State state = new State();
@ -95,10 +98,9 @@ public class StateMachine extends BaseStateMachine {
}
public Boolean isLeader() {
return leader.get();
return leader.get();
}
public Executor asyncExecutor = Executors.newFixedThreadPool(8);
/**
@ -114,7 +116,7 @@ public class StateMachine extends BaseStateMachine {
*/
@Override
public void initialize(RaftServer server, RaftGroupId groupId,
RaftStorage raftStorage) throws IOException {
RaftStorage raftStorage) throws IOException {
super.initialize(server, groupId, raftStorage);
this.storage.init(raftStorage);
load(storage.getLatestSnapshot());
@ -139,15 +141,14 @@ public class StateMachine extends BaseStateMachine {
*/
@Override
public long takeSnapshot() {
//get the last applied index
// get the last applied index
final TermIndex last = getLastAppliedTermIndex();
//create a file with a proper name to store the snapshot
final File snapshotFile =
storage.getSnapshotFile(last.getTerm(), last.getIndex());
// create a file with a proper name to store the snapshot
final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex());
//serialize the counter object and write it into the snapshot file
try {
// serialize the counter object and write it into the snapshot file
try {
ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile)));
out.writeObject(counter);
out.close();
@ -156,7 +157,7 @@ public class StateMachine extends BaseStateMachine {
+ "\", last applied index=" + last);
}
//return the index of the stored snapshot (which is the last applied one)
// return the index of the stored snapshot (which is the last applied one)
return last.getIndex();
}
@ -168,13 +169,13 @@ public class StateMachine extends BaseStateMachine {
* @throws IOException if any error happens during read from storage
*/
private long load(SingleFileSnapshotInfo snapshot) throws IOException {
//check the snapshot nullity
// check the snapshot nullity
if (snapshot == null) {
LOG.warn("The snapshot info is null.");
return RaftLog.INVALID_LOG_INDEX;
}
//check the existance of the snapshot file
// check the existance of the snapshot file
final File snapshotFile = snapshot.getFile().getPath().toFile();
if (!snapshotFile.exists()) {
LOG.warn("The snapshot file {} does not exist for snapshot {}",
@ -182,18 +183,17 @@ public class StateMachine extends BaseStateMachine {
return RaftLog.INVALID_LOG_INDEX;
}
//load the TermIndex object for the snapshot using the file name pattern of
// load the TermIndex object for the snapshot using the file name pattern of
// the snapshot
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
//read the file and cast it to the AtomicInteger and set the counter
// read the file and cast it to the AtomicInteger and set the counter
try (ObjectInputStream in = new ObjectInputStream(
new BufferedInputStream(new FileInputStream(snapshotFile)))) {
//set the last applied termIndex to the termIndex of the snapshot
// set the last applied termIndex to the termIndex of the snapshot
setLastAppliedTermIndex(last);
//read, cast and set the counter
// read, cast and set the counter
counter = JavaUtils.cast(in.readObject());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
@ -202,51 +202,37 @@ public class StateMachine extends BaseStateMachine {
return last.getIndex();
}
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
log.info("msg {}", groupMemberId.getPeerId());
StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader());
// super.notifyLeaderChanged(groupMemberId, newLeaderId);
asyncExecutor.execute(()->{
log.info("msg {}", groupMemberId.getPeerId());
if (StateServerFactory.getCurrentPeerId().getRaftPeerId() == null) {
StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString());
}
leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId, groupMemberId.getPeerId(), isLeader());
asyncExecutor.execute(() -> {
log.info("asyncExecutor");
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() ));
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId()));
try {
var reply = StateServerFactory.send(op);
log.info("{}", reply);
} catch (IOException e) {
e.printStackTrace();
log.info("{}", MessageUtils.<RaftReply>fromMessage(reply.getMessage()));
} catch (IOException | ClassNotFoundException e) {
log.error("{}",e.toString());
}
});
if (MasterFactory.getMasterExecute().isAlive())
if (MasterFactory.getMasterExecute().isAlive())
MasterFactory.getMasterExecute().interrupt();
if(isLeader())
if (isLeader())
MasterFactory.getMasterExecute().start();
}
/**
* Handle GET command, which used by clients to get the counter value.
*
@ -255,39 +241,33 @@ public class StateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture<Message> query(Message request) {
var data = request.getContent();
var inBytes = new ByteArrayInputStream( data.toByteArray());
try (var inObject = new ObjectInputStream(inBytes)) {
// log.info("applyTransaction {}", inObject.toString());
log.info("{}", request);
var op = (Query)inObject.readObject();
switch(op.getType()){
case GET_STATE:
try(var rlock = readLock()) {
var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() );
if(ws == null) {
return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
}
return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist"));
}
var reply = new RaftReply();
try {
var query = MessageUtils.<Query>fromMessage(request);
switch (query.getType()) {
case GET_STATE:
try (var rlock = readLock()) {
var ws = state.getWorkers().get(((WorkerState) query.getValue()).getPeerId());
if (ws == null) {
return CompletableFuture.completedFuture(Status.setError(reply, "Peerid is not exist"));
}
reply.setData(ws);
}
break;
default:
return CompletableFuture.completedFuture(
Message.valueOf("Invalid Command"));
Status.setError(reply, "Invalid Command");
return CompletableFuture.completedFuture(reply);
}
} catch (ClassNotFoundException | IOException e) {
e.printStackTrace();
log.error("{}", e.toString());
}
return CompletableFuture.completedFuture(
Message.valueOf("ok"));
return CompletableFuture.completedFuture(Status.setOK(reply));
}
/**
@ -298,68 +278,45 @@ public class StateMachine extends BaseStateMachine {
*/
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// log.info("applyTransaction");
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
//check if the command is valid
// String logData = entry.getStateMachineLogEntry().getLogData()
// .toString(Charset.defaultCharset());
// log.info("applyTransaction");
var reply = new RaftReply();
Operate op = null;
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
final long index = entry.getIndex();
try {
var data = entry.getStateMachineLogEntry().getLogData();
var inBytes = new ByteArrayInputStream( data.toByteArray());
var inObject = new ObjectInputStream(inBytes);
op = (Operate)inObject.readObject();
// log.info("applyTransaction {}", data);
inObject.close();
inBytes.close();
final var op = MessageUtils.<Operate>fromByteString(data);
try (var wlock = writeLock()) {
switch (op.getType()) {
case ALLOCATE_PACKETS:
break;
case PUT_WORKERSTATE:
var ws = (WorkerState) op.getValue();
if(ws.getPeerId().getRaftPeerId() == null) {
return CompletableFuture.completedFuture(Status.setError(reply, "RaftPeerId 为 null"));
}
state.getWorkers().put(ws.getPeerId(), ws);
break;
case REMOVE:
break;
default:
break;
}
// if (isLeader()) {
// log.info("{}: getType {} ", index, op.getType());
// }
log.info("{}: getType {} ", index, op.getType());
updateLastAppliedTermIndex(entry.getTerm(), index);
}
} catch (IOException | ClassNotFoundException e) {
log.info("{}", e.toString());
return CompletableFuture.completedFuture(Message.valueOf("错误op"));
}
// if (!logData.equals("INCREMENT")) {
// return CompletableFuture.completedFuture(
// Message.valueOf("Invalid Command"));
// }
//update the last applied term and index
final long index = entry.getIndex();
try(var wlock = writeLock()) {
switch(op.getType()) {
case ALLOCATE_PACKETS:
break;
case PUT_WORKERSTATE:
var ws = (WorkerState)op.getValue();
// log.info("applyTransaction {}", OperateType.PUT_WORKERSTATE);
state.getWorkers().put(ws.getPeerId() , ws);
break;
case REMOVE:
break;
default:
break;
}
updateLastAppliedTermIndex(entry.getTerm(), index);
return CompletableFuture.completedFuture(Status.setError(reply, "错误op"));
}
//return the new value of the counter to the client
final CompletableFuture<Message> f =
CompletableFuture.completedFuture(Message.valueOf("put ok"));
//if leader, log the incremented value and it's log index
if (isLeader()) {
log.info("{}: getType {}, state {}", index, op.getType(), state);
}
// log.info("applyTransaction {}", 6);
return f;
return CompletableFuture.completedFuture(Status.setOK(reply));
}
}

View File

@ -33,6 +33,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils;
import com.yuandian.dataflow.statemachine.grpc.ProcessorServer;
import com.yuandian.dataflow.statemachine.state.Peer;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import io.netty.util.internal.StringUtil;
@ -111,6 +112,7 @@ public final class StateServer implements Closeable {
this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start();
this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort());
}

View File

@ -9,6 +9,8 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import com.yuandian.dataflow.statemachine.state.Peer;
public class StateServerFactory {
public static StateServer stateServer;

View File

@ -39,10 +39,10 @@ import org.springframework.cglib.proxy.CallbackFilter;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine.messages.Operate;
import com.yuandian.dataflow.statemachine.messages.Query;
import com.yuandian.dataflow.statemachine.messages.Operate.OperateType;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine.Operate;
import com.yuandian.dataflow.statemachine.Query;
import lombok.extern.slf4j.Slf4j;

View File

@ -4,7 +4,7 @@
* @author eson
*2022年7月20日-10:00:05
*/
package com.yuandian.dataflow.statemachine_old;
package com.yuandian.dataflow.statemachine.master;
import java.time.Duration;
import java.time.Instant;
@ -18,19 +18,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* Master主线程, 用于接收packets

View File

@ -0,0 +1,31 @@
package com.yuandian.dataflow.statemachine.messages;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@java.lang.SuppressWarnings("unchecked")
public class MessageUtils {
public static <T> T fromMessage(Message msg) throws IOException, ClassNotFoundException {
var inBytes = new ByteArrayInputStream(msg.getContent().toByteArray());
var inObject = new ObjectInputStream(inBytes);
T result = (T) inObject.readObject();
inObject.close();
inBytes.close();
return result;
}
public static <T> T fromByteString(ByteString msg) throws IOException, ClassNotFoundException {
var inBytes = new ByteArrayInputStream(msg.toByteArray());
var inObject = new ObjectInputStream(inBytes);
T result = (T) inObject.readObject();
inObject.close();
inBytes.close();
return result;
}
}

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine;
package com.yuandian.dataflow.statemachine.messages;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -63,27 +63,23 @@ public class Operate implements Message,Serializable {
this.value = value;
}
public Message toMessage() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject( this);
outputStream.close();
output.close();
// var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
// var inObject = new ObjectInputStream(inBytes);
// var a = (Operate)inObject.readObject();
// log.info("applyTransaction {}", a);
return Message.valueOf(output.toByteString());
} catch (IOException e) {
e.printStackTrace();
log.error("{}",e.toString());
}
return null;
}
@ -91,33 +87,16 @@ public class Operate implements Message,Serializable {
@Override
public ByteString getContent() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject(this);
outputStream.close();
output.close();
// var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() );
// var inObject = new ObjectInputStream(inBytes);
// var a = (Operate)inObject.readObject();
// log.info("applyTransaction {}", a);
return output.toByteString();
} catch (IOException e) {
e.printStackTrace();
log.error("{}",e.toString());
}
return null;
}
}

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine;
package com.yuandian.dataflow.statemachine.messages;
import java.io.IOException;
import java.io.ObjectOutputStream;
@ -44,7 +44,7 @@ public class Query implements Message,Serializable {
output.close();
return output.toByteString();
} catch (IOException e) {
e.printStackTrace();
log.error("{}",e.toString());
}
return null;
}

View File

@ -0,0 +1,78 @@
package com.yuandian.dataflow.statemachine.messages;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import javax.servlet.http.PushBuilder;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
public class RaftReply implements Message, Serializable {
private Status code; // 状态码
private String message; // 返回字符串信息说明
private Object data; // object 必须 继承 Serializable
public RaftReply() {
}
@ToString
public enum Status {
OK(0), ERROR(1);
private final int code;
private Status(int code) {
this.code = code;
}
public static RaftReply setOK( RaftReply reply ) {
reply.setCode(OK);
reply.setData(OK.toString());
return reply;
}
public static RaftReply setError( RaftReply reply ) {
reply.setCode(ERROR);
reply.setData(ERROR.toString());
return reply;
}
public static RaftReply setError( RaftReply reply, String error) {
reply.setCode(ERROR);
reply.setData(ERROR.toString());
return reply;
}
}
@Override
public ByteString getContent() {
try {
var output = ByteString.newOutput();
var outputStream = new ObjectOutputStream(output);
outputStream.writeObject(this);
outputStream.close();
output.close();
return output.toByteString();
} catch (IOException e) {
log.error("{}",e.toString());
}
return null;
}
}

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine;
package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;

View File

@ -21,8 +21,6 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import com.yuandian.dataflow.statemachine.Peer;
/**
* 代表任务状态 暂时全局使用这个结构. 添加新增状态
*

View File

@ -9,8 +9,6 @@ package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable;
import java.time.Instant;
import com.yuandian.dataflow.statemachine.Peer;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

View File

@ -1,329 +0,0 @@
/**
* description
*
* @author eson
*2022年7月12日-13:36:24
*/
package com.yuandian.dataflow.statemachine_old;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.reflections.ReflectionUtils;
import org.reflections.Reflections;
import com.alipay.remoting.NamedThreadFactory;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.ReadIndexStatus;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.utils.Utils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
* 2022年7月12日-13:36:24
*/
@Slf4j
public class StateFactory {
private static StateServer ss;
private static ReentrantLock lockReadIndex = new ReentrantLock();
public static void startStateServer(String peerstr, Configuration conf) throws Exception {
if (ss != null) {
throw new Exception("重复初始化 InitStateServer");
}
ss = new StateFactory.StateServer(peerstr, conf);
}
public static boolean isLeader() {
return ss.node.isLeader();
}
public static PeerId getLeaderId() {
return ss.node.getLeaderId();
}
public static PeerId getServerId() {
return ss.cluster.getServerId();
}
public static Node getNode() {
return ss.node;
}
public static Node getRaftNode() {
return ss.cluster.getRaftNode();
}
public static RpcClient getRpcClient() {
return ss.getRpcClient();
}
public static RaftGroupService getCluster() {
return ss.getCluster();
}
// 获取状态服务的对象
public static StateServer getStateServer() {
return ss;
}
public static void readIndexState(GenericClosure closure) {
ss.readIndexState(closure);
}
public static void applyOperate(OperateOld op, GenericClosure closure) {
ss.applyOperate(op, closure);
}
public static void rpcClientInvokeAsync(final Endpoint endpoint, final Object request,
final InvokeCallback callback, final long timeoutMs)
throws InterruptedException, RemotingException {
ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs)
throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs);
}
public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx,
final long timeoutMs) throws InterruptedException, RemotingException {
return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs);
}
@Getter
@Setter
public static class StateServer {
RpcClient rpcClient;
private Node node;
private RaftGroupService cluster;
private StateMachine fsm;
private String groupId = "dataflow";
private Executor readIndexExecutor = createReadIndexExecutor();
public StateServer(String addr, Configuration conf) {
// String[] peers = new
// String[]{"localhost:4440","localhost:4441","localhost:4442"};
// String[] sprPeers = new String[]{"3440","3441","3442"};
// var peeridstr = peers[Integer.parseInt(serverId)];
// var sprPort = sprPeers[Integer.parseInt(args[0])];
// String groupId = "jraft";
// conf =
// JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
PeerId serverId = JRaftUtils.getPeerId(addr);
int port = serverId.getPort();
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setElectionTimeoutMs(1000);
nodeOptions.setSnapshotLogIndexMargin(3600);
nodeOptions.setInitialConf(conf);
File RaftDataFile = new File(String.format("./raftdata/%d", port));
log.info("mkdirs: {}", RaftDataFile.mkdirs());
nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port));
nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port));
nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port));
fsm = new StateMachine(); // 状态实例初始化
nodeOptions.setFsm(fsm);
cluster = new RaftGroupService(groupId, serverId, nodeOptions);
// 扫描注解RaftProccessor 注册
var now = Instant.now();
HashMap<String, Class<?>> scansMap = new HashMap<>();
var traces = Thread.currentThread().getStackTrace();
var clsName = traces[traces.length - 1].getClassName();
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
var refl = new Reflections(packName);
Set<Class<?>> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis());
scansMap.forEach((name, pRaftClass) -> {
try {
cluster.getRpcServer()
.registerProcessor((RpcProcessor<?>) pRaftClass.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{} {}", name, e.toString());
}
});
refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> {
try {
MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance();
MasterFactory.registerMasterLoop(execute);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});
;
// 启动集群
node = cluster.start();
rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端.
rpcClient.init(new CliOptions()); // 初始化
}
public boolean isLeader() {
return this.fsm.isLeader();
}
public void readIndexState(GenericClosure closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
// log.debug("readIndexState({}) {}", getServerId(), status);
if (status.isOk()) {
closure.success(ss.fsm.getState());
closure.setValue(ss.fsm.getState());
closure.run(status);
return ;
}
// 回调失败
// 提交同步
log.info("status not ok");
// readIndexExecutor.execute(()->{
// if(isLeader()) {
// Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure);
// } else {
// handlerNotLeaderError(closure);
// }
// });
}
});
}
public void applyOperate(OperateOld op, GenericClosure closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
return;
}
try {
closure.setValue(op);
final Task task = new Task();
task.setData(
ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op)));
task.setDone(closure); // 确认所有数据 一致, 不需要加锁
StateFactory.getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode Operate";
log.debug("{} {}", errorMsg, e);
closure.failure(errorMsg, PeerId.emptyPeer());
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}
public RaftResponse redirect() {
final RaftResponse response = new RaftResponse ();
response.setSuccess(false);
if (this.node != null) {
final PeerId leader = this.node.getLeaderId();
if (leader != null) {
response.setRedirect(leader);
}
}
return response;
}
public void handlerNotLeaderError(final GenericClosure closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
private Executor createReadIndexExecutor() {
return ThreadPoolUtil.newBuilder() //
.poolName("ReadIndexPool") //
.enableMetric(true) //
.coreThreads(4) //
.maximumThreads(4) //
.keepAliveSeconds(60L) //
.workQueue(new SynchronousQueue<>()) //
.threadFactory(new NamedThreadFactory("ReadIndexService", true)) //
.rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) //
.build();
}
}
public static void main(String[] args) throws InterruptedException, RemotingException {
ReflectionUtils.get(ReflectionUtils.SuperClass.of(State.class));
var rpcClient = new BoltRaftRpcFactory().createRpcClient();
rpcClient.init(new CliOptions());
var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000);
log.info("{}", resp);
}
}

View File

@ -1,282 +0,0 @@
package com.yuandian.dataflow.statemachine_old;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j;
/**
* Counter state machine.
*
* @author boyan (boyan@alibaba-inc.com)
*
* 2018-Apr-09 4:52:31 PM
*/
@Slf4j
public class StateMachine extends StateMachineAdapter {
// private static final Logger LOG =
// LoggerFactory.getLogger(StateMachine.class);
/**
* State value 全局使用的唯一状态
*/
private State state = new State();
/**
* Leader term
*/
private final AtomicLong leaderTerm = new AtomicLong(-1);
public boolean isLeader() {
return this.leaderTerm.get() > 0;
}
/**
* Returns current value. 只有Get 操作状态由协议流程决定 Apply
*/
public State getState() {
return state;
}
@Override
@SuppressWarnings("unchecked")
public void onApply(final Iterator iter) {
while (iter.hasNext()) {
OperateOld op = null;
GenericClosure closure = null;
if (iter.done() != null) {
// leader可以直接从 回调closure里提取operate
closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = (OperateOld)closure.getValue();
} else {
// 非leader 需要从getData反序列化出来后处理
final ByteBuffer data = iter.getData();
try {
op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
data.array(),
OperateOld.class.getName());
} catch (CodecException e) {
log.info("{}", e.toString());
}
}
if (op == null) {
log.error("op 为 {}. 存在错误, 可能版本不一致", op);
continue;
}
switch (op.getType()) {
case PUT_WORKERSTATE:
WorkerState opws = op.getValue();
log.debug("PUT {}", opws.peerId);
// state.getWorkers().put(opws.peerId, opws);
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
break;
case ALLOCATE_PACKETS:
List<PeerId> alivePeers = op.getValue();
PeerId[] peers = new PeerId[alivePeers.size()];
alivePeers.toArray(peers);
// 统计 每个节点还有多少任务容量
var isNext = false;
var canTasks = new int[alivePeers.size()];
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
WorkerState ws = state.getWorkers().get(peer);
if (ws == null) {
log.error("WorkerState获取的状态为 {}", ws);
continue;
}
var can = OperateOld.MAX_TASKS - ws.getTaskQueueSize();
canTasks[i] = can;
if(!isNext) {
isNext = true;
}
}
if(!isNext) {
break;
}
// log.info("size: {}", Operate.packetsManager.size());
// 统计每个节点发送多少任务
var allocTasks = Utils.allocationTasks(OperateOld.packetsManager.size(), canTasks);
if(closure != null) {
closure.setValue(allocTasks);
}
for(int i = 0; i < peers.length; i++) {
var peer = peers[i];
if(allocTasks[i] <= 0) {
continue;
}
WorkerState ws = state.getWorkers().get(peer);
ws.setUpdateAt(Instant.now());
ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]);
// log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize());
}
break;
// case GET_STATE:
// closure.setValue(this.state);
// log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex());
// break;
case REMOVE:
break;
default:
break;
}
if (closure != null) {
closure.success(op);
closure.run(Status.OK());
}
iter.next();
}
}
@Override
public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
return;
}
@Override
public void onError(final RaftException e) {
log.debug("Raft error: {}", e, e);
}
@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
return true;
}
@Override
public void onLeaderStart(final long term) {
log.debug("onLeaderStart[{}]", StateFactory.getServerId());
this.leaderTerm.set(term);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
var ws = this.state.getWorkers().get(StateFactory.getServerId());
if (ws == null) {
// ws = new WorkerState(StateFactory.getServerId());
}
// 更新当前WorkerState
StateFactory.applyOperate(new OperateOld(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () {
@Override
public void run(Status status) {
log.debug("master update workerstate: {}", status);
}
});
// 当成为master时候 必须启动
MasterFactory.getMasterExecute().start();
super.onLeaderStart(term);
}
@Override
public void onLeaderStop(final Status status) {
log.debug("onLeaderStop[{}]", StateFactory.getServerId());
this.leaderTerm.set(-1);
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
super.onLeaderStop(status);
}
@Override
public void onShutdown() {
log.info("onShutdown[{}]",StateFactory.getServerId());
super.onShutdown();
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
log.debug("onStartFollowing[{}]] {}", StateFactory.getServerId(),ctx);
try {
// 判断是否Master线程还在跑, 如果存在则中断
if (MasterFactory.getMasterExecute().isAlive()) {
MasterFactory.getMasterExecute().interrupt();
}
// 在startFollowing不能使用 readIndexState
// 更新当前WorkerState
// OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
// @Override
// public void run(Status status) {
// log.debug("onStartFollowing update workerstate: {}", status);
// }
// });
return;
} catch (Exception e) {
log.info("{}", e.toString());
}
super.onStartFollowing(ctx);
}
@Override
public void onConfigurationCommitted(Configuration conf) {
super.onConfigurationCommitted(conf);
}
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
// var ws = new WorkerState(StateFactory.getServerId());
// var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws);
// OperateOld.CallOperate(op, new GenericClosure() {
// @Override
// public void run(Status status) {
// log.info("{} {}", status, this.getResponse());
// }
// });
super.onStopFollowing(ctx);
}
}

View File

@ -1,59 +0,0 @@
package com.yuandian.dataflow.statemachine_old.closure;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
@ToString
public abstract class GenericClosure implements Closure {
// 状态机的统一响应
private RaftResponse response;
// 代表任务状态
private Object value;
public <T> T getValue() {
if(this.value == null) {
return null;
}
return (T)this.value;
}
public GenericClosure() {
}
/**
* 错误的时候返回错误信息. 自动装配response
* @param errorMsg
* @param redirect
*/
public void failure(final String errorMsg, final PeerId redirect) {
final RaftResponse response = new RaftResponse();
response.setSuccess(false);
response.setMsg(errorMsg);
log.error("{}", errorMsg);
response.setRedirect(redirect);
setResponse(response);
}
/**
* 成功时调用该方法. 自动装配response
* @param value
*/
public void success(final Object value) {
final RaftResponse response = new RaftResponse ();
response.setValue(value);
response.setSuccess(true);
setResponse(response);
}
}

View File

@ -1,108 +0,0 @@
package com.yuandian.dataflow.statemachine_old.operate;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.utils.PacketsManager;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* 操作
*
* @author eson
*/
@Slf4j
@Data
public class OperateOld implements Serializable {
private static int DEFAULT_ASYNC_TIMEOUT = 5000;
public static final int MAX_TASKS = 1000;
public static PacketsManager packetsManager = new PacketsManager();
public static enum OperateType {
/**
* 同步WorkerState状态.
*/
PUT_WORKERSTATE,
/**
* 分配packets
*/
ALLOCATE_PACKETS,
/**
* 暂无想法
*/
REMOVE;
}
private OperateType type;
private Object value;
public OperateOld(OperateType t, Object v) {
this.type = t;
this.value = v;
}
@java.lang.SuppressWarnings("unchecked")
public <T> T getValue() {
return (T) this.value;
};
public <T> void setValue(T value) {
this.value = value;
return;
};
/**
* 调用操作设置
* @param op 传入的操作类
* @param closure 回调函数. Operate为返回值
*/
@java.lang.SuppressWarnings("unchecked")
public static void CallOperate(OperateOld op, GenericClosure closure) {
// log.debug("CallOperate Value {}", op.<WorkerState>getValue());
// 如果是leader 就直接提交
if (StateFactory.isLeader()) {
StateFactory.applyOperate(op, closure);
return;
}
// 非leader 转发请求 统一有leader处理
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
try {
StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() {
@Override
public void complete(Object result, Throwable err) {
// log.debug("Object result {}", result);
var resp = (RaftResponse) result;
closure.setResponse(resp);
closure.success(resp.getValue());
closure.run(Status.OK());
}
}, DEFAULT_ASYNC_TIMEOUT);
} catch (InterruptedException | RemotingException e) {
closure.failure(e.getMessage(), null);
closure.run(new Status(100000, "invokeAsync fail"));
log.info("{}", e.toString());
}
}
}

View File

@ -1,84 +0,0 @@
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
package com.yuandian.dataflow.statemachine_old.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import javassist.ClassPath;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月12日-11:10:54
*/
@Slf4j
public class OperateProcessor implements RpcProcessor<OperateProcessor.OperateRequest> {
/**
* 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加
*
* @author eson
*2022年7月11日-16:01:07
*/
@Getter
@Setter
@ToString
public static class OperateRequest implements Serializable {
private static final long serialVersionUID = 1L;
private OperateOld operate;
}
@Override
public void handleRequest(RpcContext rpcCtx, OperateRequest request) {
// log.info("request: {}", request);
final GenericClosure closure = new GenericClosure () {
@Override
public void run(Status status) {
if(status.isOk()) {
// log.info("{}", status);
rpcCtx.sendResponse(getResponse());
return;
}
if(status.getRaftError() == RaftError.EPERM) {
//TODO: Not leader 需要转发
log.info("{}", status);
}
}
};
StateFactory.getStateServer().applyOperate(request.getOperate(), closure);
}
@Override
public String interest() {
return OperateRequest.class.getName();
}
}

View File

@ -1,41 +0,0 @@
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
package com.yuandian.dataflow.statemachine_old.rpc;
import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年7月13日-09:07:22
*/
@Slf4j
@Getter
@Setter
@ToString
public class RaftResponse implements Serializable {
private static final long serialVersionUID = 1L;
private Object value;
private boolean success;
/**
* redirect peer id
*/
private PeerId redirect;
private String msg;
}