diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 28c322f..80e6d91 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -9,12 +9,12 @@ import org.slf4j.MarkerFactory; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.conf.Configuration; -import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.utils.Utils; import io.netty.util.internal.StringUtil; import javassist.ClassClassPath; import lombok.extern.slf4j.Slf4j; + @@ -48,8 +48,8 @@ public class Server { // var sprPort = sprPeers[2]; log.info("{} {}", peeridstr, sprPort); - conf = JRaftUtils.getConfiguration(String.join(",", peers)); - StateFactory.startStateServer(peeridstr, conf); + // conf = JRaftUtils.getConfiguration(String.join(",", peers)); + // StateFactory.startStateServer(peeridstr, conf); diff --git a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java index bc89d93..1292226 100644 --- a/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/MasterProcessor.java @@ -21,14 +21,11 @@ import com.yuandian.dataflow.statemachine.master.MasterContext; import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; -import com.yuandian.dataflow.statemachine_old.StateFactory; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.Utils; import lombok.extern.slf4j.Slf4j; + @Slf4j @MasterRegister diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java index e21e623..de5dddf 100644 --- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java +++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java @@ -22,16 +22,12 @@ import com.yuandian.dataflow.proto.Processor.ProcessorResponse; import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine_old.StateFactory; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import io.grpc.stub.StreamObserver; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; + /** * description @@ -54,8 +50,7 @@ public class PacketsProcessor extends ProcessorServerImplBase { super.allPackets(request, responseObserver); log.info("packets {}", request.getPacketsList().size()); - - + } diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java deleted file mode 100644 index 379a6c9..0000000 --- a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.yuandian.dataflow.controller; - -import java.io.Serializable; - -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.rpc.RpcContext; -import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; -import com.yuandian.dataflow.statemachine_old.StateFactory; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - - -/** - * 例子 强制转换leader - */ -@Slf4j -public class TransferLeaderProcessor implements RpcProcessor { - - @Setter - @Getter - public static class LeaderRequest implements Serializable { - PeerId peer; - } - - @Override - public void handleRequest(RpcContext rpcCtx, LeaderRequest request) { - Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer); - rpcCtx.sendResponse(status); - log.debug("[TransferLeader] {} change leader to {}", status, request.peer); - } - - @Override - public String interest() { - return LeaderRequest.class.getName(); - } - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java index 179a90c..fe1b535 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java @@ -38,7 +38,12 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; -import com.yuandian.dataflow.statemachine.Operate.OperateType; +import com.yuandian.dataflow.statemachine.messages.MessageUtils; +import com.yuandian.dataflow.statemachine.messages.Operate; +import com.yuandian.dataflow.statemachine.messages.Query; +import com.yuandian.dataflow.statemachine.messages.RaftReply; +import com.yuandian.dataflow.statemachine.messages.Operate.OperateType; +import com.yuandian.dataflow.statemachine.messages.RaftReply.Status; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; import com.yuandian.dataflow.statemachine_old.MasterFactory; @@ -74,9 +79,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ @Slf4j public class StateMachine extends BaseStateMachine { - private final SimpleStateMachineStorage storage = - new SimpleStateMachineStorage(); - + private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private State state = new State(); @@ -95,10 +98,9 @@ public class StateMachine extends BaseStateMachine { } public Boolean isLeader() { - return leader.get(); + return leader.get(); } - public Executor asyncExecutor = Executors.newFixedThreadPool(8); /** @@ -114,7 +116,7 @@ public class StateMachine extends BaseStateMachine { */ @Override public void initialize(RaftServer server, RaftGroupId groupId, - RaftStorage raftStorage) throws IOException { + RaftStorage raftStorage) throws IOException { super.initialize(server, groupId, raftStorage); this.storage.init(raftStorage); load(storage.getLatestSnapshot()); @@ -139,15 +141,14 @@ public class StateMachine extends BaseStateMachine { */ @Override public long takeSnapshot() { - //get the last applied index + // get the last applied index final TermIndex last = getLastAppliedTermIndex(); - //create a file with a proper name to store the snapshot - final File snapshotFile = - storage.getSnapshotFile(last.getTerm(), last.getIndex()); + // create a file with a proper name to store the snapshot + final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex()); - //serialize the counter object and write it into the snapshot file - try { + // serialize the counter object and write it into the snapshot file + try { ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(snapshotFile))); out.writeObject(counter); out.close(); @@ -156,7 +157,7 @@ public class StateMachine extends BaseStateMachine { + "\", last applied index=" + last); } - //return the index of the stored snapshot (which is the last applied one) + // return the index of the stored snapshot (which is the last applied one) return last.getIndex(); } @@ -168,13 +169,13 @@ public class StateMachine extends BaseStateMachine { * @throws IOException if any error happens during read from storage */ private long load(SingleFileSnapshotInfo snapshot) throws IOException { - //check the snapshot nullity + // check the snapshot nullity if (snapshot == null) { LOG.warn("The snapshot info is null."); return RaftLog.INVALID_LOG_INDEX; } - //check the existance of the snapshot file + // check the existance of the snapshot file final File snapshotFile = snapshot.getFile().getPath().toFile(); if (!snapshotFile.exists()) { LOG.warn("The snapshot file {} does not exist for snapshot {}", @@ -182,18 +183,17 @@ public class StateMachine extends BaseStateMachine { return RaftLog.INVALID_LOG_INDEX; } - //load the TermIndex object for the snapshot using the file name pattern of + // load the TermIndex object for the snapshot using the file name pattern of // the snapshot - final TermIndex last = - SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); + final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); - //read the file and cast it to the AtomicInteger and set the counter + // read the file and cast it to the AtomicInteger and set the counter try (ObjectInputStream in = new ObjectInputStream( new BufferedInputStream(new FileInputStream(snapshotFile)))) { - //set the last applied termIndex to the termIndex of the snapshot + // set the last applied termIndex to the termIndex of the snapshot setLastAppliedTermIndex(last); - //read, cast and set the counter + // read, cast and set the counter counter = JavaUtils.cast(in.readObject()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e); @@ -202,51 +202,37 @@ public class StateMachine extends BaseStateMachine { return last.getIndex(); } - - @Override public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { - - log.info("msg {}", groupMemberId.getPeerId()); - StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString()); - - leader.set(newLeaderId == groupMemberId.getPeerId()); - - log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader()); - // super.notifyLeaderChanged(groupMemberId, newLeaderId); - asyncExecutor.execute(()->{ + log.info("msg {}", groupMemberId.getPeerId()); + if (StateServerFactory.getCurrentPeerId().getRaftPeerId() == null) { + StateServerFactory.getCurrentPeerId().setRaftPeerId(groupMemberId.getPeerId().toString()); + } + + leader.set(newLeaderId == groupMemberId.getPeerId()); + + log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId, groupMemberId.getPeerId(), isLeader()); + + asyncExecutor.execute(() -> { log.info("asyncExecutor"); - var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId() )); + var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.getCurrentPeerId())); try { var reply = StateServerFactory.send(op); - log.info("{}", reply); - } catch (IOException e) { - e.printStackTrace(); + log.info("{}", MessageUtils.fromMessage(reply.getMessage())); + } catch (IOException | ClassNotFoundException e) { + log.error("{}",e.toString()); } }); - - - - - - - - if (MasterFactory.getMasterExecute().isAlive()) + if (MasterFactory.getMasterExecute().isAlive()) MasterFactory.getMasterExecute().interrupt(); - if(isLeader()) + if (isLeader()) MasterFactory.getMasterExecute().start(); - + } - - - - - - /** * Handle GET command, which used by clients to get the counter value. * @@ -255,39 +241,33 @@ public class StateMachine extends BaseStateMachine { */ @Override public CompletableFuture query(Message request) { - var data = request.getContent(); - - var inBytes = new ByteArrayInputStream( data.toByteArray()); - try (var inObject = new ObjectInputStream(inBytes)) { - // log.info("applyTransaction {}", inObject.toString()); - log.info("{}", request); - var op = (Query)inObject.readObject(); - switch(op.getType()){ - case GET_STATE: - try(var rlock = readLock()) { - var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() ); - if(ws == null) { - return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist")); - } - return CompletableFuture.completedFuture(Message.valueOf("Peerid is not exist")); - } + var reply = new RaftReply(); + try { + var query = MessageUtils.fromMessage(request); + switch (query.getType()) { + case GET_STATE: + try (var rlock = readLock()) { + var ws = state.getWorkers().get(((WorkerState) query.getValue()).getPeerId()); + if (ws == null) { + return CompletableFuture.completedFuture(Status.setError(reply, "Peerid is not exist")); + } + reply.setData(ws); + } + break; default: - - return CompletableFuture.completedFuture( - Message.valueOf("Invalid Command")); - - + + Status.setError(reply, "Invalid Command"); + return CompletableFuture.completedFuture(reply); + } } catch (ClassNotFoundException | IOException e) { - e.printStackTrace(); + log.error("{}", e.toString()); } - - return CompletableFuture.completedFuture( - Message.valueOf("ok")); + return CompletableFuture.completedFuture(Status.setOK(reply)); } /** @@ -298,68 +278,45 @@ public class StateMachine extends BaseStateMachine { */ @Override public CompletableFuture applyTransaction(TransactionContext trx) { - // log.info("applyTransaction"); - final RaftProtos.LogEntryProto entry = trx.getLogEntry(); - - - //check if the command is valid - // String logData = entry.getStateMachineLogEntry().getLogData() - // .toString(Charset.defaultCharset()); + // log.info("applyTransaction"); + var reply = new RaftReply(); - - Operate op = null; + final RaftProtos.LogEntryProto entry = trx.getLogEntry(); + final long index = entry.getIndex(); + try { - + var data = entry.getStateMachineLogEntry().getLogData(); - - var inBytes = new ByteArrayInputStream( data.toByteArray()); - var inObject = new ObjectInputStream(inBytes); - op = (Operate)inObject.readObject(); - // log.info("applyTransaction {}", data); - inObject.close(); - inBytes.close(); + final var op = MessageUtils.fromByteString(data); + try (var wlock = writeLock()) { + + switch (op.getType()) { + case ALLOCATE_PACKETS: + break; + case PUT_WORKERSTATE: + var ws = (WorkerState) op.getValue(); + if(ws.getPeerId().getRaftPeerId() == null) { + return CompletableFuture.completedFuture(Status.setError(reply, "RaftPeerId 为 null")); + } + state.getWorkers().put(ws.getPeerId(), ws); + break; + case REMOVE: + break; + default: + break; + } + // if (isLeader()) { + // log.info("{}: getType {} ", index, op.getType()); + // } + log.info("{}: getType {} ", index, op.getType()); + updateLastAppliedTermIndex(entry.getTerm(), index); + } + } catch (IOException | ClassNotFoundException e) { log.info("{}", e.toString()); - return CompletableFuture.completedFuture(Message.valueOf("错误op")); - } - - // if (!logData.equals("INCREMENT")) { - // return CompletableFuture.completedFuture( - // Message.valueOf("Invalid Command")); - // } - //update the last applied term and index - final long index = entry.getIndex(); - - try(var wlock = writeLock()) { - - switch(op.getType()) { - case ALLOCATE_PACKETS: - break; - case PUT_WORKERSTATE: - - var ws = (WorkerState)op.getValue(); - // log.info("applyTransaction {}", OperateType.PUT_WORKERSTATE); - state.getWorkers().put(ws.getPeerId() , ws); - - break; - case REMOVE: - break; - default: - break; - } - updateLastAppliedTermIndex(entry.getTerm(), index); + return CompletableFuture.completedFuture(Status.setError(reply, "错误op")); } - //return the new value of the counter to the client - final CompletableFuture f = - CompletableFuture.completedFuture(Message.valueOf("put ok")); - - //if leader, log the incremented value and it's log index - if (isLeader()) { - log.info("{}: getType {}, state {}", index, op.getType(), state); - } - - // log.info("applyTransaction {}", 6); - return f; + return CompletableFuture.completedFuture(Status.setOK(reply)); } } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java index 728a28c..769cf73 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServer.java @@ -33,6 +33,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.NetUtils; import com.yuandian.dataflow.statemachine.grpc.ProcessorServer; +import com.yuandian.dataflow.statemachine.state.Peer; import com.yuandian.dataflow.statemachine_old.MasterFactory; import io.netty.util.internal.StringUtil; @@ -111,6 +112,7 @@ public final class StateServer implements Closeable { this.processorServer = new ProcessorServer(); this.processorServer.getGrpcServer().start(); + this.peer.setProcessorPort(this.processorServer.getGrpcServer().getPort()); } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java index 9ec41bc..d94e2ec 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java @@ -9,6 +9,8 @@ import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeer; +import com.yuandian.dataflow.statemachine.state.Peer; + public class StateServerFactory { public static StateServer stateServer; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java index 9fe6e64..64faaec 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/client/CounterClient.java @@ -39,10 +39,10 @@ import org.springframework.cglib.proxy.CallbackFilter; import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import com.yuandian.dataflow.statemachine.StateServer; -import com.yuandian.dataflow.statemachine.Operate.OperateType; +import com.yuandian.dataflow.statemachine.messages.Operate; +import com.yuandian.dataflow.statemachine.messages.Query; +import com.yuandian.dataflow.statemachine.messages.Operate.OperateType; import com.yuandian.dataflow.statemachine.state.WorkerState; -import com.yuandian.dataflow.statemachine.Operate; -import com.yuandian.dataflow.statemachine.Query; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java similarity index 82% rename from src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java rename to src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java index d647256..a28c88e 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/MasterFactory.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/master/MasterFactory.java @@ -4,7 +4,7 @@ * @author eson *2022年7月20日-10:00:05 */ -package com.yuandian.dataflow.statemachine_old; +package com.yuandian.dataflow.statemachine.master; import java.time.Duration; import java.time.Instant; @@ -18,19 +18,15 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.google.protobuf.Any; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; -import com.yuandian.dataflow.statemachine.master.MasterContext; -import com.yuandian.dataflow.statemachine.master.MasterExecute; import com.yuandian.dataflow.statemachine.state.State; import com.yuandian.dataflow.statemachine.state.WorkerState; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import lombok.Getter; import lombok.Setter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; + /** * Master主线程, 用于接收packets diff --git a/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java new file mode 100644 index 0000000..3d8f24f --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/MessageUtils.java @@ -0,0 +1,31 @@ +package com.yuandian.dataflow.statemachine.messages; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +@java.lang.SuppressWarnings("unchecked") + +public class MessageUtils { + + public static T fromMessage(Message msg) throws IOException, ClassNotFoundException { + var inBytes = new ByteArrayInputStream(msg.getContent().toByteArray()); + var inObject = new ObjectInputStream(inBytes); + T result = (T) inObject.readObject(); + inObject.close(); + inBytes.close(); + return result; + } + + public static T fromByteString(ByteString msg) throws IOException, ClassNotFoundException { + var inBytes = new ByteArrayInputStream(msg.toByteArray()); + var inObject = new ObjectInputStream(inBytes); + T result = (T) inObject.readObject(); + inObject.close(); + inBytes.close(); + return result; + } +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java similarity index 84% rename from src/main/java/com/yuandian/dataflow/statemachine/Operate.java rename to src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java index cf42896..6279b2b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Operate.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/Operate.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine.messages; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -63,27 +63,23 @@ public class Operate implements Message,Serializable { this.value = value; } + + public Message toMessage() { try { - var output = ByteString.newOutput(); var outputStream = new ObjectOutputStream(output); outputStream.writeObject( this); outputStream.close(); output.close(); - // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() ); // var inObject = new ObjectInputStream(inBytes); - - // var a = (Operate)inObject.readObject(); // log.info("applyTransaction {}", a); - - return Message.valueOf(output.toByteString()); } catch (IOException e) { - e.printStackTrace(); + log.error("{}",e.toString()); } return null; } @@ -91,33 +87,16 @@ public class Operate implements Message,Serializable { @Override public ByteString getContent() { try { - var output = ByteString.newOutput(); var outputStream = new ObjectOutputStream(output); outputStream.writeObject(this); outputStream.close(); output.close(); - - - // var inBytes = new ByteArrayInputStream( output.toByteString().toByteArray() ); - // var inObject = new ObjectInputStream(inBytes); - - - // var a = (Operate)inObject.readObject(); - // log.info("applyTransaction {}", a); - - return output.toByteString(); } catch (IOException e) { - e.printStackTrace(); + log.error("{}",e.toString()); } return null; } - - - - - - } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Query.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java similarity index 92% rename from src/main/java/com/yuandian/dataflow/statemachine/Query.java rename to src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java index 622e109..c082191 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Query.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/Query.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine.messages; import java.io.IOException; import java.io.ObjectOutputStream; @@ -44,7 +44,7 @@ public class Query implements Message,Serializable { output.close(); return output.toByteString(); } catch (IOException e) { - e.printStackTrace(); + log.error("{}",e.toString()); } return null; } diff --git a/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java b/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java new file mode 100644 index 0000000..429b01c --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/statemachine/messages/RaftReply.java @@ -0,0 +1,78 @@ +package com.yuandian.dataflow.statemachine.messages; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import javax.servlet.http.PushBuilder; + +import org.apache.ratis.protocol.Message; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Getter +@Setter +@ToString +public class RaftReply implements Message, Serializable { + + private Status code; // 状态码 + private String message; // 返回字符串信息说明 + private Object data; // object 必须 继承 Serializable + + public RaftReply() { + + } + + @ToString + public enum Status { + OK(0), ERROR(1); + + private final int code; + private Status(int code) { + this.code = code; + } + + public static RaftReply setOK( RaftReply reply ) { + reply.setCode(OK); + reply.setData(OK.toString()); + return reply; + } + + public static RaftReply setError( RaftReply reply ) { + reply.setCode(ERROR); + reply.setData(ERROR.toString()); + return reply; + } + + public static RaftReply setError( RaftReply reply, String error) { + reply.setCode(ERROR); + reply.setData(ERROR.toString()); + return reply; + } + } + + + + @Override + public ByteString getContent() { + try { + var output = ByteString.newOutput(); + var outputStream = new ObjectOutputStream(output); + outputStream.writeObject(this); + outputStream.close(); + output.close(); + return output.toByteString(); + } catch (IOException e) { + log.error("{}",e.toString()); + } + return null; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java similarity index 93% rename from src/main/java/com/yuandian/dataflow/statemachine/Peer.java rename to src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java index c399ac4..cc324f9 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/Peer.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/Peer.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.statemachine; +package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java index 0b41804..977b9d0 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/State.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/State.java @@ -21,8 +21,6 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.HashMap; -import com.yuandian.dataflow.statemachine.Peer; - /** * 代表任务状态 暂时全局使用这个结构. 添加新增状态 * diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java index 249bda1..7090e2b 100644 --- a/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java +++ b/src/main/java/com/yuandian/dataflow/statemachine/state/WorkerState.java @@ -9,8 +9,6 @@ package com.yuandian.dataflow.statemachine.state; import java.io.Serializable; import java.time.Instant; -import com.yuandian.dataflow.statemachine.Peer; - import lombok.Getter; import lombok.Setter; import lombok.ToString; diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java deleted file mode 100644 index ad306f4..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateFactory.java +++ /dev/null @@ -1,329 +0,0 @@ -/** - * description - * - * @author eson - *2022年7月12日-13:36:24 - */ -package com.yuandian.dataflow.statemachine_old; - -import java.io.File; -import java.lang.reflect.InvocationTargetException; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; - -import org.reflections.ReflectionUtils; -import org.reflections.Reflections; - -import com.alipay.remoting.NamedThreadFactory; -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.JRaftUtils; -import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.closure.ReadIndexClosure; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.entity.ReadIndexStatus; -import com.alipay.sofa.jraft.entity.Task; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.error.RemotingException; -import com.alipay.sofa.jraft.option.CliOptions; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.alipay.sofa.jraft.rpc.InvokeContext; -import com.alipay.sofa.jraft.rpc.RpcClient; -import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory; -import com.alipay.sofa.jraft.util.BytesUtil; -import com.alipay.sofa.jraft.util.Endpoint; -import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; -import com.yuandian.dataflow.statemachine.annotations.MasterRegister; -import com.yuandian.dataflow.statemachine.master.MasterExecute; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; -import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest; -import com.yuandian.dataflow.utils.Utils; - -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - -/** - * description - * - * @author eson - * 2022年7月12日-13:36:24 - */ -@Slf4j -public class StateFactory { - - private static StateServer ss; - private static ReentrantLock lockReadIndex = new ReentrantLock(); - - public static void startStateServer(String peerstr, Configuration conf) throws Exception { - if (ss != null) { - throw new Exception("重复初始化 InitStateServer"); - } - ss = new StateFactory.StateServer(peerstr, conf); - } - - public static boolean isLeader() { - return ss.node.isLeader(); - } - - public static PeerId getLeaderId() { - return ss.node.getLeaderId(); - } - - public static PeerId getServerId() { - return ss.cluster.getServerId(); - } - - public static Node getNode() { - return ss.node; - } - - public static Node getRaftNode() { - return ss.cluster.getRaftNode(); - } - - public static RpcClient getRpcClient() { - return ss.getRpcClient(); - } - - public static RaftGroupService getCluster() { - return ss.getCluster(); - } - - // 获取状态服务的对象 - public static StateServer getStateServer() { - return ss; - } - - public static void readIndexState(GenericClosure closure) { - ss.readIndexState(closure); - } - - public static void applyOperate(OperateOld op, GenericClosure closure) { - ss.applyOperate(op, closure); - } - - public static void rpcClientInvokeAsync(final Endpoint endpoint, final Object request, - final InvokeCallback callback, final long timeoutMs) - throws InterruptedException, RemotingException { - ss.getRpcClient().invokeAsync(endpoint, request, callback, timeoutMs); - } - - public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final long timeoutMs) - throws InterruptedException, RemotingException { - return ss.getRpcClient().invokeSync(endpoint, request, timeoutMs); - } - - public static Object rpcClientInvokeSync(final Endpoint endpoint, final Object request, final InvokeContext ctx, - final long timeoutMs) throws InterruptedException, RemotingException { - return ss.getRpcClient().invokeSync(endpoint, request, ctx, timeoutMs); - } - - @Getter - @Setter - public static class StateServer { - - RpcClient rpcClient; - - private Node node; - private RaftGroupService cluster; - private StateMachine fsm; - - private String groupId = "dataflow"; - private Executor readIndexExecutor = createReadIndexExecutor(); - - public StateServer(String addr, Configuration conf) { - // String[] peers = new - // String[]{"localhost:4440","localhost:4441","localhost:4442"}; - // String[] sprPeers = new String[]{"3440","3441","3442"}; - - // var peeridstr = peers[Integer.parseInt(serverId)]; - // var sprPort = sprPeers[Integer.parseInt(args[0])]; - - // String groupId = "jraft"; - - // conf = - // JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442"); - - PeerId serverId = JRaftUtils.getPeerId(addr); - int port = serverId.getPort(); - - NodeOptions nodeOptions = new NodeOptions(); - - nodeOptions.setElectionTimeoutMs(1000); - nodeOptions.setSnapshotLogIndexMargin(3600); - nodeOptions.setInitialConf(conf); - - File RaftDataFile = new File(String.format("./raftdata/%d", port)); - log.info("mkdirs: {}", RaftDataFile.mkdirs()); - - nodeOptions.setLogUri(String.format("./raftdata/%d/logs", port)); - nodeOptions.setRaftMetaUri(String.format("./raftdata/%d/rafts", port)); - nodeOptions.setSnapshotUri(String.format("./raftdata/%d/snaps", port)); - fsm = new StateMachine(); // 状态实例初始化 - nodeOptions.setFsm(fsm); - - cluster = new RaftGroupService(groupId, serverId, nodeOptions); - - // 扫描注解RaftProccessor 注册 - var now = Instant.now(); - HashMap> scansMap = new HashMap<>(); - var traces = Thread.currentThread().getStackTrace(); - var clsName = traces[traces.length - 1].getClassName(); - var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3)); - log.info("获取 {} -> {} 下包的所有注解", clsName, packName); - - var refl = new Reflections(packName); - Set> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class); - - scans.forEach((pRaftClass) -> { - scansMap.put(pRaftClass.getName(), pRaftClass); - }); - log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis()); - scansMap.forEach((name, pRaftClass) -> { - try { - cluster.getRpcServer() - .registerProcessor((RpcProcessor) pRaftClass.getDeclaredConstructor().newInstance()); - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException - | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{} {}", name, e.toString()); - } - }); - - refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> { - try { - MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance(); - MasterFactory.registerMasterLoop(execute); - - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException - | InvocationTargetException | NoSuchMethodException | SecurityException e) { - log.info("{}", e.toString()); - - } - }); - ; - - // 启动集群 - node = cluster.start(); - - rpcClient = new BoltRaftRpcFactory().createRpcClient(); // 创建rpc客户端. - rpcClient.init(new CliOptions()); // 初始化 - } - - public boolean isLeader() { - return this.fsm.isLeader(); - } - - public void readIndexState(GenericClosure closure) { - - getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() { - @Override - public void run(Status status, long index, byte[] reqCtx) { - // log.debug("readIndexState({}) {}", getServerId(), status); - if (status.isOk()) { - closure.success(ss.fsm.getState()); - closure.setValue(ss.fsm.getState()); - closure.run(status); - return ; - } - // 回调失败 - // 提交同步 - - log.info("status not ok"); - // readIndexExecutor.execute(()->{ - // if(isLeader()) { - // Operate.CallOperate(new Operate(OperateType.GET_STATE, null), closure); - // } else { - // handlerNotLeaderError(closure); - // } - // }); - } - }); - - } - - public void applyOperate(OperateOld op, GenericClosure closure) { - // 所有的提交都必须再leader进行 - if (!ss.isLeader()) { - ss.handlerNotLeaderError(closure); - return; - } - - try { - closure.setValue(op); - final Task task = new Task(); - task.setData( - ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(op))); - task.setDone(closure); // 确认所有数据 一致, 不需要加锁 - StateFactory.getNode().apply(task); - - } catch (CodecException e) { - String errorMsg = "Fail to encode Operate"; - log.debug("{} {}", errorMsg, e); - closure.failure(errorMsg, PeerId.emptyPeer()); - closure.run(new Status(RaftError.EINTERNAL, errorMsg)); - } - } - - public RaftResponse redirect() { - final RaftResponse response = new RaftResponse (); - response.setSuccess(false); - if (this.node != null) { - final PeerId leader = this.node.getLeaderId(); - if (leader != null) { - response.setRedirect(leader); - } - } - return response; - } - - public void handlerNotLeaderError(final GenericClosure closure) { - closure.failure("Not leader.", redirect().getRedirect()); - closure.run(new Status(RaftError.EPERM, "Not leader")); - } - - private Executor createReadIndexExecutor() { - return ThreadPoolUtil.newBuilder() // - .poolName("ReadIndexPool") // - .enableMetric(true) // - .coreThreads(4) // - .maximumThreads(4) // - .keepAliveSeconds(60L) // - .workQueue(new SynchronousQueue<>()) // - .threadFactory(new NamedThreadFactory("ReadIndexService", true)) // - .rejectedHandler(new ThreadPoolExecutor.AbortPolicy()) // - .build(); - } - } - - public static void main(String[] args) throws InterruptedException, RemotingException { - - ReflectionUtils.get(ReflectionUtils.SuperClass.of(State.class)); - - var rpcClient = new BoltRaftRpcFactory().createRpcClient(); - rpcClient.init(new CliOptions()); - var resp = rpcClient.invokeSync(new Endpoint("localhost", 4441), new OperateRequest(), 5000); - log.info("{}", resp); - } -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java deleted file mode 100644 index d642e43..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/StateMachine.java +++ /dev/null @@ -1,282 +0,0 @@ -package com.yuandian.dataflow.statemachine_old; - -import java.nio.ByteBuffer; -import java.time.Instant; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import com.alipay.remoting.exception.CodecException; -import com.alipay.remoting.serialization.SerializerManager; -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.Iterator; -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.core.StateMachineAdapter; -import com.alipay.sofa.jraft.entity.LeaderChangeContext; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.error.RaftException; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; -import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; -import com.yuandian.dataflow.statemachine.state.State; -import com.yuandian.dataflow.statemachine.state.WorkerState; -// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; -import com.yuandian.dataflow.utils.Utils; - -import lombok.extern.slf4j.Slf4j; - -/** - * Counter state machine. - * - * @author boyan (boyan@alibaba-inc.com) - * - * 2018-Apr-09 4:52:31 PM - */ -@Slf4j -public class StateMachine extends StateMachineAdapter { - - // private static final Logger LOG = - // LoggerFactory.getLogger(StateMachine.class); - - /** - * State value 全局使用的唯一状态 - */ - private State state = new State(); - /** - * Leader term - */ - private final AtomicLong leaderTerm = new AtomicLong(-1); - - public boolean isLeader() { - return this.leaderTerm.get() > 0; - } - - /** - * Returns current value. 只有Get 操作状态由协议流程决定 Apply - */ - public State getState() { - return state; - } - - @Override - @SuppressWarnings("unchecked") - public void onApply(final Iterator iter) { - while (iter.hasNext()) { - - OperateOld op = null; - GenericClosure closure = null; - if (iter.done() != null) { - // leader可以直接从 回调closure里提取operate - closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交 - op = (OperateOld)closure.getValue(); - } else { - // 非leader 需要从getData反序列化出来后处理 - final ByteBuffer data = iter.getData(); - try { - op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize( - data.array(), - OperateOld.class.getName()); - } catch (CodecException e) { - log.info("{}", e.toString()); - } - } - - if (op == null) { - log.error("op 为 {}. 存在错误, 可能版本不一致", op); - continue; - } - - switch (op.getType()) { - - case PUT_WORKERSTATE: - WorkerState opws = op.getValue(); - log.debug("PUT {}", opws.peerId); - // state.getWorkers().put(opws.peerId, opws); - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } - break; - case ALLOCATE_PACKETS: - List alivePeers = op.getValue(); - PeerId[] peers = new PeerId[alivePeers.size()]; - alivePeers.toArray(peers); - - // 统计 每个节点还有多少任务容量 - var isNext = false; - var canTasks = new int[alivePeers.size()]; - for(int i = 0; i < peers.length; i++) { - var peer = peers[i]; - WorkerState ws = state.getWorkers().get(peer); - if (ws == null) { - log.error("WorkerState获取的状态为 {}", ws); - continue; - } - var can = OperateOld.MAX_TASKS - ws.getTaskQueueSize(); - canTasks[i] = can; - if(!isNext) { - isNext = true; - } - } - - if(!isNext) { - break; - } - - // log.info("size: {}", Operate.packetsManager.size()); - // 统计每个节点发送多少任务 - var allocTasks = Utils.allocationTasks(OperateOld.packetsManager.size(), canTasks); - if(closure != null) { - closure.setValue(allocTasks); - } - for(int i = 0; i < peers.length; i++) { - var peer = peers[i]; - if(allocTasks[i] <= 0) { - continue; - } - - WorkerState ws = state.getWorkers().get(peer); - ws.setUpdateAt(Instant.now()); - ws.setTaskQueueSize(ws.getTaskQueueSize() + allocTasks[i]); - // log.info("剩余能处理的任务数量[{}] :{}", peer, Operate.MAX_TASKS - ws.getTaskQueueSize()); - } - - break; - // case GET_STATE: - // closure.setValue(this.state); - // log.info("GET_STATE value={} at logIndex={}", this.state, iter.getIndex()); - // break; - case REMOVE: - break; - default: - break; - - } - - if (closure != null) { - closure.success(op); - closure.run(Status.OK()); - } - - - iter.next(); - } - } - - @Override - public void onSnapshotSave(final SnapshotWriter writer, final Closure done) { - return; - } - - @Override - public void onError(final RaftException e) { - log.debug("Raft error: {}", e, e); - } - - @Override - public boolean onSnapshotLoad(final SnapshotReader reader) { - return true; - } - - @Override - public void onLeaderStart(final long term) { - log.debug("onLeaderStart[{}]", StateFactory.getServerId()); - this.leaderTerm.set(term); - - // 判断是否Master线程还在跑, 如果存在则中断 - if (MasterFactory.getMasterExecute().isAlive()) { - MasterFactory.getMasterExecute().interrupt(); - } - - - - var ws = this.state.getWorkers().get(StateFactory.getServerId()); - if (ws == null) { - // ws = new WorkerState(StateFactory.getServerId()); - } - - // 更新当前WorkerState - StateFactory.applyOperate(new OperateOld(OperateType.PUT_WORKERSTATE, ws), new GenericClosure () { - @Override - public void run(Status status) { - log.debug("master update workerstate: {}", status); - } - }); - - - // 当成为master时候 必须启动 - MasterFactory.getMasterExecute().start(); - super.onLeaderStart(term); - } - - @Override - public void onLeaderStop(final Status status) { - log.debug("onLeaderStop[{}]", StateFactory.getServerId()); - this.leaderTerm.set(-1); - // 判断是否Master线程还在跑, 如果存在则中断 - if (MasterFactory.getMasterExecute().isAlive()) { - MasterFactory.getMasterExecute().interrupt(); - } - super.onLeaderStop(status); - } - - @Override - public void onShutdown() { - log.info("onShutdown[{}]",StateFactory.getServerId()); - super.onShutdown(); - } - - @Override - public void onStartFollowing(LeaderChangeContext ctx) { - log.debug("onStartFollowing[{}]] {}", StateFactory.getServerId(),ctx); - try { - - // 判断是否Master线程还在跑, 如果存在则中断 - if (MasterFactory.getMasterExecute().isAlive()) { - MasterFactory.getMasterExecute().interrupt(); - } - - // 在startFollowing不能使用 readIndexState - - - // 更新当前WorkerState - // OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() { - // @Override - // public void run(Status status) { - // log.debug("onStartFollowing update workerstate: {}", status); - // } - // }); - - return; - } catch (Exception e) { - log.info("{}", e.toString()); - } - - super.onStartFollowing(ctx); - } - - @Override - public void onConfigurationCommitted(Configuration conf) { - super.onConfigurationCommitted(conf); - } - - @Override - - public void onStopFollowing(LeaderChangeContext ctx) { - log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx); - - // var ws = new WorkerState(StateFactory.getServerId()); - // var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws); - // OperateOld.CallOperate(op, new GenericClosure() { - // @Override - // public void run(Status status) { - // log.info("{} {}", status, this.getResponse()); - // } - // }); - - super.onStopFollowing(ctx); - } - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java b/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java deleted file mode 100644 index c828f97..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/closure/GenericClosure.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.yuandian.dataflow.statemachine_old.closure; - -import com.alipay.sofa.jraft.Closure; -import com.alipay.sofa.jraft.entity.PeerId; -import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Getter -@Setter -@ToString -public abstract class GenericClosure implements Closure { - - - // 状态机的统一响应 - private RaftResponse response; - // 代表任务状态 - private Object value; - - public T getValue() { - if(this.value == null) { - return null; - } - return (T)this.value; - } - - public GenericClosure() { - - } - - /** - * 错误的时候返回错误信息. 自动装配response - * @param errorMsg - * @param redirect - */ - public void failure(final String errorMsg, final PeerId redirect) { - final RaftResponse response = new RaftResponse(); - response.setSuccess(false); - response.setMsg(errorMsg); - log.error("{}", errorMsg); - response.setRedirect(redirect); - setResponse(response); - } - - /** - * 成功时调用该方法. 自动装配response - * @param value - */ - public void success(final Object value) { - final RaftResponse response = new RaftResponse (); - response.setValue(value); - response.setSuccess(true); - setResponse(response); - } -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java b/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java deleted file mode 100644 index fb4eae4..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/operate/OperateOld.java +++ /dev/null @@ -1,108 +0,0 @@ -package com.yuandian.dataflow.statemachine_old.operate; - -import java.io.Serializable; - -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.error.RemotingException; -import com.alipay.sofa.jraft.rpc.InvokeCallback; -import com.yuandian.dataflow.statemachine.state.WorkerState; -import com.yuandian.dataflow.statemachine_old.StateFactory; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor; -import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; -import com.yuandian.dataflow.utils.PacketsManager; - -import lombok.Data; -import lombok.extern.slf4j.Slf4j; - -/** - * 操作 - * - * @author eson - */ -@Slf4j -@Data -public class OperateOld implements Serializable { - - private static int DEFAULT_ASYNC_TIMEOUT = 5000; - public static final int MAX_TASKS = 1000; - public static PacketsManager packetsManager = new PacketsManager(); - - public static enum OperateType { - /** - * 同步WorkerState状态. - */ - PUT_WORKERSTATE, - - /** - * 分配packets - */ - ALLOCATE_PACKETS, - - - /** - * 暂无想法 - */ - REMOVE; - } - - private OperateType type; - private Object value; - - - - public OperateOld(OperateType t, Object v) { - this.type = t; - this.value = v; - } - - @java.lang.SuppressWarnings("unchecked") - public T getValue() { - return (T) this.value; - }; - - public void setValue(T value) { - this.value = value; - return; - }; - - /** - * 调用操作设置 - * @param op 传入的操作类 - * @param closure 回调函数. Operate为返回值 - */ - @java.lang.SuppressWarnings("unchecked") - public static void CallOperate(OperateOld op, GenericClosure closure) { - // log.debug("CallOperate Value {}", op.getValue()); - // 如果是leader 就直接提交 - if (StateFactory.isLeader()) { - StateFactory.applyOperate(op, closure); - return; - } - - // 非leader 转发请求 统一有leader处理 - var request = new OperateProcessor.OperateRequest(); - request.setOperate(op); - try { - StateFactory.rpcClientInvokeAsync(StateFactory.getLeaderId().getEndpoint(), request, new InvokeCallback() { - @Override - public void complete(Object result, Throwable err) { - // log.debug("Object result {}", result); - var resp = (RaftResponse) result; - closure.setResponse(resp); - closure.success(resp.getValue()); - closure.run(Status.OK()); - } - - }, DEFAULT_ASYNC_TIMEOUT); - } catch (InterruptedException | RemotingException e) { - closure.failure(e.getMessage(), null); - closure.run(new Status(100000, "invokeAsync fail")); - log.info("{}", e.toString()); - } - - } - - - -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java deleted file mode 100644 index 8a22d8b..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/OperateProcessor.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * description - * - * @author eson - *2022年7月12日-11:10:54 - */ -package com.yuandian.dataflow.statemachine_old.rpc; - -import java.io.Serializable; - -import com.alipay.sofa.jraft.Status; -import com.alipay.sofa.jraft.error.RaftError; -import com.alipay.sofa.jraft.rpc.RpcContext; -import com.alipay.sofa.jraft.rpc.RpcProcessor; -import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; -import com.yuandian.dataflow.statemachine_old.StateFactory; -import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; -import com.yuandian.dataflow.statemachine_old.operate.OperateOld; - -import javassist.ClassPath; - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; - -/** - * description - * - * @author eson - *2022年7月12日-11:10:54 - */ -@Slf4j - -public class OperateProcessor implements RpcProcessor { - - /** - * 同步状态时, 需要用的结构类. 新增的状态可以在 Operate结构里添加 - * - * @author eson - *2022年7月11日-16:01:07 - */ - @Getter - @Setter - @ToString - public static class OperateRequest implements Serializable { - - private static final long serialVersionUID = 1L; - - private OperateOld operate; - } - - - @Override - public void handleRequest(RpcContext rpcCtx, OperateRequest request) { - - // log.info("request: {}", request); - - final GenericClosure closure = new GenericClosure () { - @Override - public void run(Status status) { - - if(status.isOk()) { - // log.info("{}", status); - rpcCtx.sendResponse(getResponse()); - return; - } - - if(status.getRaftError() == RaftError.EPERM) { - //TODO: Not leader 需要转发 - log.info("{}", status); - } - - } - }; - - StateFactory.getStateServer().applyOperate(request.getOperate(), closure); - } - - @Override - public String interest() { - return OperateRequest.class.getName(); - } -} diff --git a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java b/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java deleted file mode 100644 index ce3b494..0000000 --- a/src/main/java/com/yuandian/dataflow/statemachine_old/rpc/RaftResponse.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * description - * - * @author eson - *2022年7月13日-09:07:22 - */ -package com.yuandian.dataflow.statemachine_old.rpc; - -import java.io.Serializable; - -import com.alipay.sofa.jraft.entity.PeerId; - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; - -/** - * description - * - * @author eson - *2022年7月13日-09:07:22 - */ -@Slf4j -@Getter -@Setter -@ToString -public class RaftResponse implements Serializable { - - private static final long serialVersionUID = 1L; - - private Object value; - - private boolean success; - /** - * redirect peer id - */ - private PeerId redirect; - - private String msg; -}