回滚测试

This commit is contained in:
huangsimin 2022-08-08 14:59:04 +08:00
parent 67dd6f4da2
commit 32debdd367
22 changed files with 1083 additions and 581 deletions

7
.vscode/launch.json vendored
View File

@ -4,6 +4,13 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0", "version": "0.2.0",
"configurations": [ "configurations": [
{
"type": "java",
"name": "Launch ProcessorServer",
"request": "launch",
"mainClass": "com.yuandian.dataflow.statemachine.grpc.ProcessorServer",
"projectName": "dataflow"
},
{ {
"type": "java", "type": "java",
"name": "Launch CounterClient", "name": "Launch CounterClient",

View File

@ -17,14 +17,14 @@ import com.google.protobuf.Any;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister; import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.PacketsManager;
import com.yuandian.dataflow.utils.Utils; import com.yuandian.dataflow.utils.Utils;

View File

@ -17,16 +17,16 @@ import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.rpc.RpcContext; import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor; import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Processor.PacketsRequest; import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest;
import com.yuandian.dataflow.proto.Processor.Response; import com.yuandian.dataflow.proto.Processor.ProcessorResponse;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase; import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.state.State;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import lombok.Getter; import lombok.Getter;
@ -43,17 +43,22 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@GrpcProcessor @GrpcProcessor
public class PacketsProcessor extends ProcessorServerImplBase { public class PacketsProcessor extends ProcessorServerImplBase {
@Override @Override
public void packetsProcessor(PacketsRequest request, StreamObserver<Response> responseObserver) { public void allPackets(PacketsProcessorRequest request, StreamObserver<ProcessorResponse> responseObserver) {
responseObserver.onNext( Response.newBuilder().build() ); var response = ProcessorResponse.newBuilder();
responseObserver.onNext( response.build() );
responseObserver.onCompleted(); responseObserver.onCompleted();
super.allPackets(request, responseObserver);
log.info("packets {}", request.getPacketsList().size());
} }
// @Setter // @Setter
// @Getter // @Getter
// public static class PacketsRequest implements Serializable { // public static class PacketsRequest implements Serializable {

View File

@ -18,7 +18,6 @@ import lombok.extern.slf4j.Slf4j;
* 例子 强制转换leader * 例子 强制转换leader
*/ */
@Slf4j @Slf4j
@GrpcProcessor
public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> { public class TransferLeaderProcessor implements RpcProcessor<TransferLeaderProcessor.LeaderRequest> {
@Setter @Setter

View File

@ -13,6 +13,9 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.rpc.context.AttributeContext.Peer;
import com.google.protobuf.*; import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.JsonFormat;
import com.yuandian.common.Config; import com.yuandian.common.Config;
@ -48,10 +51,12 @@ public class CollectPackets extends CollectPacketsServerImplBase {
return null; return null;
}); });
var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017); var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017);
// var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017); // var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017);
var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build();
Map<String, Function<Any, Object>> domap = getHandleFuncMap(); Map<String, Function<Any, Object>> domap = getHandleFuncMap();
try { try {
var stub = CollectPacketsServerGrpc.newBlockingStub(channel); var stub = CollectPacketsServerGrpc.newBlockingStub(channel);

File diff suppressed because it is too large Load Diff

View File

@ -24,38 +24,38 @@ public final class ProcessorServerGrpc {
private ProcessorServerGrpc() {} private ProcessorServerGrpc() {}
public static final String SERVICE_NAME = "com.yuandian.dataflow.proto.ProcessorServer"; public static final String SERVICE_NAME = "dataflow.ProcessorServer";
// Static method descriptors that strictly reflect the proto. // Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest, private static volatile io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod; com.yuandian.dataflow.proto.Processor.ProcessorResponse> getAllPacketsMethod;
@io.grpc.stub.annotations.RpcMethod( @io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "PacketsProcessor", fullMethodName = SERVICE_NAME + '/' + "AllPackets",
requestType = com.yuandian.dataflow.proto.Processor.PacketsRequest.class, requestType = com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.class,
responseType = com.yuandian.dataflow.proto.Processor.Response.class, responseType = com.yuandian.dataflow.proto.Processor.ProcessorResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) methodType = io.grpc.MethodDescriptor.MethodType.UNARY)
public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest, public static io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod() { com.yuandian.dataflow.proto.Processor.ProcessorResponse> getAllPacketsMethod() {
io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsRequest, com.yuandian.dataflow.proto.Processor.Response> getPacketsProcessorMethod; io.grpc.MethodDescriptor<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessorResponse> getAllPacketsMethod;
if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) { if ((getAllPacketsMethod = ProcessorServerGrpc.getAllPacketsMethod) == null) {
synchronized (ProcessorServerGrpc.class) { synchronized (ProcessorServerGrpc.class) {
if ((getPacketsProcessorMethod = ProcessorServerGrpc.getPacketsProcessorMethod) == null) { if ((getAllPacketsMethod = ProcessorServerGrpc.getAllPacketsMethod) == null) {
ProcessorServerGrpc.getPacketsProcessorMethod = getPacketsProcessorMethod = ProcessorServerGrpc.getAllPacketsMethod = getAllPacketsMethod =
io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Processor.PacketsRequest, com.yuandian.dataflow.proto.Processor.Response>newBuilder() io.grpc.MethodDescriptor.<com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest, com.yuandian.dataflow.proto.Processor.ProcessorResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.SERVER_STREAMING) .setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "PacketsProcessor")) .setFullMethodName(generateFullMethodName(SERVICE_NAME, "AllPackets"))
.setSampledToLocalTracing(true) .setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.PacketsRequest.getDefaultInstance())) com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller( .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
com.yuandian.dataflow.proto.Processor.Response.getDefaultInstance())) com.yuandian.dataflow.proto.Processor.ProcessorResponse.getDefaultInstance()))
.setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("PacketsProcessor")) .setSchemaDescriptor(new ProcessorServerMethodDescriptorSupplier("AllPackets"))
.build(); .build();
} }
} }
} }
return getPacketsProcessorMethod; return getAllPacketsMethod;
} }
/** /**
@ -108,20 +108,20 @@ public final class ProcessorServerGrpc {
/** /**
*/ */
public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request, public void allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response> responseObserver) { io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessorResponse> responseObserver) {
asyncUnimplementedUnaryCall(getPacketsProcessorMethod(), responseObserver); asyncUnimplementedUnaryCall(getAllPacketsMethod(), responseObserver);
} }
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() { @java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor()) return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod( .addMethod(
getPacketsProcessorMethod(), getAllPacketsMethod(),
asyncServerStreamingCall( asyncUnaryCall(
new MethodHandlers< new MethodHandlers<
com.yuandian.dataflow.proto.Processor.PacketsRequest, com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest,
com.yuandian.dataflow.proto.Processor.Response>( com.yuandian.dataflow.proto.Processor.ProcessorResponse>(
this, METHODID_PACKETS_PROCESSOR))) this, METHODID_ALL_PACKETS)))
.build(); .build();
} }
} }
@ -142,10 +142,10 @@ public final class ProcessorServerGrpc {
/** /**
*/ */
public void packetsProcessor(com.yuandian.dataflow.proto.Processor.PacketsRequest request, public void allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request,
io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response> responseObserver) { io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessorResponse> responseObserver) {
asyncServerStreamingCall( asyncUnaryCall(
getChannel().newCall(getPacketsProcessorMethod(), getCallOptions()), request, responseObserver); getChannel().newCall(getAllPacketsMethod(), getCallOptions()), request, responseObserver);
} }
} }
@ -165,10 +165,9 @@ public final class ProcessorServerGrpc {
/** /**
*/ */
public java.util.Iterator<com.yuandian.dataflow.proto.Processor.Response> packetsProcessor( public com.yuandian.dataflow.proto.Processor.ProcessorResponse allPackets(com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
com.yuandian.dataflow.proto.Processor.PacketsRequest request) { return blockingUnaryCall(
return blockingServerStreamingCall( getChannel(), getAllPacketsMethod(), getCallOptions(), request);
getChannel(), getPacketsProcessorMethod(), getCallOptions(), request);
} }
} }
@ -185,9 +184,17 @@ public final class ProcessorServerGrpc {
io.grpc.Channel channel, io.grpc.CallOptions callOptions) { io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ProcessorServerFutureStub(channel, callOptions); return new ProcessorServerFutureStub(channel, callOptions);
} }
/**
*/
public com.google.common.util.concurrent.ListenableFuture<com.yuandian.dataflow.proto.Processor.ProcessorResponse> allPackets(
com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest request) {
return futureUnaryCall(
getChannel().newCall(getAllPacketsMethod(), getCallOptions()), request);
}
} }
private static final int METHODID_PACKETS_PROCESSOR = 0; private static final int METHODID_ALL_PACKETS = 0;
private static final class MethodHandlers<Req, Resp> implements private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>, io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
@ -206,9 +213,9 @@ public final class ProcessorServerGrpc {
@java.lang.SuppressWarnings("unchecked") @java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) { public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) { switch (methodId) {
case METHODID_PACKETS_PROCESSOR: case METHODID_ALL_PACKETS:
serviceImpl.packetsProcessor((com.yuandian.dataflow.proto.Processor.PacketsRequest) request, serviceImpl.allPackets((com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest) request,
(io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.Response>) responseObserver); (io.grpc.stub.StreamObserver<com.yuandian.dataflow.proto.Processor.ProcessorResponse>) responseObserver);
break; break;
default: default:
throw new AssertionError(); throw new AssertionError();
@ -271,7 +278,7 @@ public final class ProcessorServerGrpc {
if (result == null) { if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME) serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier()) .setSchemaDescriptor(new ProcessorServerFileDescriptorSupplier())
.addMethod(getPacketsProcessorMethod()) .addMethod(getAllPacketsMethod())
.build(); .build();
} }
} }

View File

@ -0,0 +1,32 @@
package com.yuandian.dataflow.statemachine;
import org.apache.ratis.protocol.RaftPeer;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class PeerId {
public PeerId(RaftPeer raftPeer, int processorPort) {
this.raftPeer = raftPeer;
this.processorPort = processorPort;
}
private RaftPeer raftPeer;
private int processorPort;
@Override
public boolean equals(Object arg0) {
return getRaftPeer().getId().toString() == ((PeerId)arg0).getRaftPeer().getId().toString();
}
@Override
public int hashCode() {
return getRaftPeer().getId().hashCode() ;
}
}

View File

@ -9,7 +9,7 @@ import javax.servlet.http.PushBuilder;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -24,7 +24,7 @@ public class Query implements Message,Serializable {
/** /**
* 同步WorkerState状态. * 同步WorkerState状态.
*/ */
GET_WORKER_STATE, GET_STATE,
} }
private Type type; private Type type;

View File

@ -38,8 +38,9 @@ import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.JavaUtils;
import com.yuandian.dataflow.statemachine.Operate.OperateType; import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State; import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -197,14 +198,35 @@ public class StateMachine extends BaseStateMachine {
@Override @Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) { public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
log.info("newLeaderId: {} groupMemberId: {}", newLeaderId , groupMemberId.getPeerId());
leader.set(newLeaderId == groupMemberId.getPeerId()); leader.set(newLeaderId == groupMemberId.getPeerId());
log.info("newLeaderId: {} groupMemberId: {} leader {}", newLeaderId , groupMemberId.getPeerId(), isLeader());
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(StateServerFactory.CurrentPeerId()));
try {
var reply = StateServerFactory.send(op);
log.info("{}", reply);
} catch (IOException e) {
e.printStackTrace();
}
if (MasterFactory.getMasterExecute().isAlive())
MasterFactory.getMasterExecute().interrupt();
if(isLeader())
MasterFactory.getMasterExecute().start();
super.notifyLeaderChanged(groupMemberId, newLeaderId); super.notifyLeaderChanged(groupMemberId, newLeaderId);
} }
/** /**
* Handle GET command, which used by clients to get the counter value. * Handle GET command, which used by clients to get the counter value.
* *
@ -221,7 +243,7 @@ public class StateMachine extends BaseStateMachine {
log.info("{}", request); log.info("{}", request);
var op = (Query)inObject.readObject(); var op = (Query)inObject.readObject();
switch(op.getType()){ switch(op.getType()){
case GET_WORKER_STATE: case GET_STATE:
try(var rlock = readLock()) { try(var rlock = readLock()) {
var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() ); var ws = state.getWorkers().get( ((WorkerState)op.getValue()).getPeerId() );
if(ws == null) { if(ws == null) {
@ -232,11 +254,11 @@ public class StateMachine extends BaseStateMachine {
default: default:
if (op.getType() == Query.Type.GET_WORKER_STATE ) {
return CompletableFuture.completedFuture( return CompletableFuture.completedFuture(
Message.valueOf("Invalid Command")); Message.valueOf("Invalid Command"));
}
break;
} }
} catch (ClassNotFoundException | IOException e) { } catch (ClassNotFoundException | IOException e) {
@ -272,13 +294,8 @@ public class StateMachine extends BaseStateMachine {
var inBytes = new ByteArrayInputStream( data.toByteArray()); var inBytes = new ByteArrayInputStream( data.toByteArray());
var inObject = new ObjectInputStream(inBytes); var inObject = new ObjectInputStream(inBytes);
// log.info("applyTransaction {}", inObject.toString());
op = (Operate)inObject.readObject(); op = (Operate)inObject.readObject();
// log.info("applyTransaction {}", data);
log.info("applyTransaction {}", data);
inObject.close(); inObject.close();
inBytes.close(); inBytes.close();
} catch (IOException | ClassNotFoundException e) { } catch (IOException | ClassNotFoundException e) {
@ -301,7 +318,7 @@ public class StateMachine extends BaseStateMachine {
case PUT_WORKERSTATE: case PUT_WORKERSTATE:
var ws = (WorkerState)op.getValue(); var ws = (WorkerState)op.getValue();
log.info("applyTransaction {}", 3); // log.info("applyTransaction {}", OperateType.PUT_WORKERSTATE);
state.getWorkers().put(ws.getPeerId() , ws); state.getWorkers().put(ws.getPeerId() , ws);
break; break;

View File

@ -18,9 +18,13 @@
package com.yuandian.dataflow.statemachine; package com.yuandian.dataflow.statemachine;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
// import org.apache.ratis.examples.common.Constants; // import org.apache.ratis.examples.common.Constants;
import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeer;
@ -28,13 +32,19 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.NetUtils; import org.apache.ratis.util.NetUtils;
import com.yuandian.dataflow.statemachine.grpc.ProcessorServer;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import lombok.Getter;
import lombok.Setter;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Scanner; import java.util.Scanner;
import java.util.UUID; import java.util.UUID;
@ -49,12 +59,24 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* Run this application three times with three different parameter set-up a * Run this application three times with three different parameter set-up a
* ratis cluster which maintain a counter value replicated in each server memory * ratis cluster which maintain a counter value replicated in each server memory
*/ */
@Getter
@Setter
public final class StateServer implements Closeable { public final class StateServer implements Closeable {
private final RaftServer server;
public static HashMap<PeerId, PeerId> activesPeers = new HashMap<>();
private final RaftClient raftClient;
private final RaftServer raftServer;
private final ProcessorServer processorServer;
private final PeerId peer;
public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77"); public static final UUID CLUSTER_GROUP_ID = UUID.fromString("79642d72-6166-742d-6461-7461666c6f77");
public StateServer(RaftPeer peer, ArrayList<RaftPeer> peers, File storageDir) throws IOException { public StateServer(RaftPeer curpeer, ArrayList<RaftPeer> peers) throws IOException {
final File storageDir = new File("./raftdata/" + curpeer.getId());
//create a property object //create a property object
RaftProperties properties = new RaftProperties(); RaftProperties properties = new RaftProperties();
@ -62,63 +84,85 @@ public final class StateServer implements Closeable {
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir)); RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
//set the port which server listen to in RaftProperty object //set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(peer.getAddress()).getPort(); final int port = NetUtils.createSocketAddr(curpeer.getAddress()).getPort();
GrpcConfigKeys.Server.setPort(properties, port); GrpcConfigKeys.Server.setPort(properties, port);
//create the counter state machine which hold the counter value //create the counter state machine which hold the counter value
StateMachine counterStateMachine = new StateMachine();
RaftGroup raftGroup = RaftGroup.valueOf( RaftGroup raftGroup = RaftGroup.valueOf(
RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers); RaftGroupId.valueOf(CLUSTER_GROUP_ID), peers);
StateMachine stateMachine = new StateMachine();
//create and start the Raft server //create and start the Raft server
this.server = RaftServer.newBuilder() this.raftServer = RaftServer.newBuilder()
.setGroup(raftGroup) .setGroup(raftGroup)
.setProperties(properties) .setProperties(properties)
.setServerId(peer.getId()) .setServerId(curpeer.getId())
.setStateMachine(counterStateMachine) .setStateMachine(stateMachine)
.build(); .build();
// create RaftClient
raftClient = buildClient(peers,raftGroup);
this.processorServer = new ProcessorServer();
this.processorServer.getGrpcServer().start();
this.peer = new PeerId(curpeer, this.processorServer.getGrpcServer().getPort());
} }
public void start() throws IOException { // block
server.start(); public void start() throws IOException, InterruptedException {
raftServer.start();
this.processorServer.getGrpcServer().awaitTermination();
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
server.close(); MasterFactory.getMasterExecute().interrupt();
raftServer.close();
this.processorServer.getGrpcServer().shutdown();
} }
public static void main(String[] args) throws IOException { private static RaftClient buildClient( ArrayList<RaftPeer> peers, RaftGroup raftGroup) {
RaftProperties raftProperties = new RaftProperties();
RaftClient.Builder builder = RaftClient.newBuilder()
.setProperties(raftProperties)
.setRaftGroup(raftGroup)
.setClientRpc(new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), raftProperties));
return builder.build();
}
public static void main(String[] args) throws IOException, InterruptedException {
if (args.length < 1) { if (args.length < 1) {
System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}"); System.err.println("Usage: java -cp *.jar org.apache.ratis.examples.counter.server.StateServer {serverIndex}");
System.err.println("{serverIndex} could be 1, 2 or 3"); System.err.println("{serverIndex} could be 1, 2 or 3");
System.exit(1); System.exit(1);
} }
var peers = new ArrayList<RaftPeer>(); StateServerFactory.startServer(args[0]);
String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
for (int i = 0; i < addresses.length; i++) { // var peers = new ArrayList<RaftPeer>();
var port = addresses[i].split(":")[1]; // String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
peers.add(RaftPeer.newBuilder().setId("" + port).setAddress(addresses[i]).build());
}
//find current peer object based on application parameter // for (int i = 0; i < addresses.length; i++) {
final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0])); // // var port = addresses[i].split(":")[1];
// peers.add(RaftPeer.newBuilder().setId("yd-" + args[0]).setAddress(addresses[i]).build());
// }
//start a counter server // //find current peer object based on application parameter
final File storageDir = new File("./raftdata/" + currentPeer.getId()); // final RaftPeer currentPeer = peers.get(Integer.parseInt(args[0]));
final StateServer stateServer = new StateServer(currentPeer, peers, storageDir);
stateServer.start(); // //start a counter server
// final File storageDir = new File("./raftdata/" + currentPeer.getId());
// final StateServer stateServer = new StateServer(currentPeer, peers, storageDir);
// stateServer.start();
//exit when any input entered // stateServer.close();
Scanner scanner = new Scanner(System.in, UTF_8.name());
scanner.nextLine();
stateServer.close();
} }
} }

View File

@ -0,0 +1,44 @@
package com.yuandian.dataflow.statemachine;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
public class StateServerFactory {
public static StateServer stateServer;
public static void startServer(String sid) throws IOException, InterruptedException {
var peers = new ArrayList<RaftPeer>();
String[] addresses = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
for (int i = 0; i < addresses.length; i++) {
// var port = addresses[i].split(":")[1];
peers.add(RaftPeer.newBuilder().setId("yd-" + sid).setAddress(addresses[i]).build());
}
//find current peer object based on application parameter
final RaftPeer currentPeer = peers.get(Integer.parseInt(sid));
//start a counter server
final StateServer stateServer = new StateServer(currentPeer, peers);
stateServer.start();
stateServer.close();
}
public static PeerId CurrentPeerId() {
return stateServer.getPeer();
}
public static RaftClientReply send(Message msg) throws IOException {
return stateServer.getRaftClient().io().send(msg);
}
}

View File

@ -40,7 +40,7 @@ import org.springframework.cglib.proxy.CallbackFilter;
import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import com.yuandian.dataflow.statemachine.StateServer; import com.yuandian.dataflow.statemachine.StateServer;
import com.yuandian.dataflow.statemachine.Operate.OperateType; import com.yuandian.dataflow.statemachine.Operate.OperateType;
import com.yuandian.dataflow.statemachine_old.state.WorkerState; import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine.Operate; import com.yuandian.dataflow.statemachine.Operate;
import com.yuandian.dataflow.statemachine.Query; import com.yuandian.dataflow.statemachine.Query;
@ -92,6 +92,7 @@ public final class CounterClient {
var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(null)); var op = new Operate(OperateType.PUT_WORKERSTATE, new WorkerState(null));
var reply = raftClient.io().send(op); var reply = raftClient.io().send(op);
log.info("{}", reply); log.info("{}", reply);
executorService.submit(() -> raftClient.io().send(op)); executorService.submit(() -> raftClient.io().send(op));
latch.countDown(); latch.countDown();
@ -105,10 +106,11 @@ public final class CounterClient {
log.info("{}", Duration.between(now, Instant.now()).toMillis()); log.info("{}", Duration.between(now, Instant.now()).toMillis());
//send GET command and print the response //send GET command and print the response
var query = new Query(Query.Type.GET_WORKER_STATE, new WorkerState(null)); var query = new Query(Query.Type.GET_STATE, new WorkerState(null));
RaftClientReply count = raftClient.io().sendReadOnly(query); RaftClientReply count = raftClient.io().sendReadOnly(query);
String response = count.getMessage().getContent().toString(Charset.defaultCharset()); String response = count.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println(response); // System.out.println(response);
log.info("{}", response);
} }

View File

@ -0,0 +1,102 @@
package com.yuandian.dataflow.statemachine.grpc;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.reflections.Reflections;
import com.yuandian.dataflow.proto.Processor.PacketsProcessorRequest;
import com.yuandian.dataflow.proto.Processor.ProcessorResponse;
import com.yuandian.dataflow.proto.ProcessorServerGrpc.ProcessorServerImplBase;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.MasterFactory;
import com.yuandian.dataflow.utils.Utils;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerServiceDefinition;
import io.grpc.stub.StreamObserver;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class ProcessorServer {
public static void main(String[] args) throws Exception {
// ServerBuilder builder = ServerBuilder.forPort(0);
// var server = builder.build().start();
// log.info("{}", server.getPort());
var server = new ProcessorServer();
server.grpcServer.awaitTermination();
}
private Server grpcServer;
public ProcessorServer() {
ServerBuilder builder = ServerBuilder.forPort(0);
// 扫描注解RaftProccessor 注册
var now = Instant.now();
HashMap<String, Class<?>> scansMap = new HashMap<>();
var traces = Thread.currentThread().getStackTrace();
var clsName = traces[traces.length - 1].getClassName();
var packName = clsName.substring(0, Utils.indexOfAsPossible(clsName, ".", 3));
log.info("获取 {} -> {} 下包的所有注解", clsName, packName);
var refl = new Reflections(packName);
Set<Class<?>> scans = refl.getTypesAnnotatedWith(GrpcProcessor.class);
scans.forEach((pRaftClass) -> {
scansMap.put(pRaftClass.getName(), pRaftClass);
});
log.info("扫描注解的时间(scan annotations cost time): {} ms", Duration.between(now, Instant.now()).toMillis());
scansMap.forEach((name, pRaftClass) -> {
try {
builder.addService( (BindableService)pRaftClass.getDeclaredConstructor().newInstance() );
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}",e.toString());
}
});
refl.getTypesAnnotatedWith(MasterRegister.class).forEach((pClass) -> {
try {
MasterExecute execute = (MasterExecute) pClass.getDeclaredConstructor().newInstance();
MasterFactory.registerMasterLoop(execute);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e) {
log.info("{}", e.toString());
}
});
grpcServer = builder.build();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
log.info("*** shutting down gRPC server since JVM is shutting down");
try {
grpcServer.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("{}", e.toString());
}
System.err.println("*** server shut down");
}
});
}
}

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine_old.master; package com.yuandian.dataflow.statemachine.master;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -12,9 +12,6 @@ public class MasterContext {
private AtomicBoolean isExit = new AtomicBoolean(false); private AtomicBoolean isExit = new AtomicBoolean(false);
private Duration lastLoopExecuteTime = Duration.ZERO; private Duration lastLoopExecuteTime = Duration.ZERO;
private Object share; private Object share;
public Boolean getIsExit() { public Boolean getIsExit() {

View File

@ -1,4 +1,4 @@
package com.yuandian.dataflow.statemachine_old.master; package com.yuandian.dataflow.statemachine.master;
/** /**
* Master的主线程循环 * Master的主线程循环

View File

@ -4,11 +4,11 @@
* @author eson * @author eson
*2022年7月13日-09:11:26 *2022年7月13日-09:11:26
*/ */
package com.yuandian.dataflow.statemachine_old.state; package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable; import java.io.Serializable;
import com.alipay.sofa.jraft.entity.PeerId;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -21,6 +21,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.util.HashMap; import java.util.HashMap;
import com.yuandian.dataflow.statemachine.PeerId;
/** /**
* 代表任务状态 暂时全局使用这个结构. 添加新增状态 * 代表任务状态 暂时全局使用这个结构. 添加新增状态
* *

View File

@ -4,12 +4,12 @@
* @author eson * @author eson
*2022年7月15日-10:04:00 *2022年7月15日-10:04:00
*/ */
package com.yuandian.dataflow.statemachine_old.state; package com.yuandian.dataflow.statemachine.state;
import java.io.Serializable; import java.io.Serializable;
import java.time.Instant; import java.time.Instant;
import com.alipay.sofa.jraft.entity.PeerId; import com.yuandian.dataflow.statemachine.PeerId;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -50,4 +50,6 @@ public class WorkerState implements Serializable {
this.peerId = peer; this.peerId = peer;
this.updateAt = Instant.now(); this.updateAt = Instant.now();
} }
} }

View File

@ -18,13 +18,13 @@ import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.google.protobuf.Any; import com.google.protobuf.Any;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import com.yuandian.dataflow.statemachine.master.MasterContext;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterContext;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -56,7 +56,7 @@ public class MasterFactory {
public static Thread masterExecuteThread = new Thread(new Runnable() { public static Thread masterExecuteThread = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
log.info("master execute");
MasterContext cxt = new MasterContext(); MasterContext cxt = new MasterContext();
while (!cxt.getIsExit()) { while (!cxt.getIsExit()) {
Instant now = Instant.now(); Instant now = Instant.now();
@ -72,8 +72,4 @@ public class MasterFactory {
return masterExecuteThread; return masterExecuteThread;
} }
public static void Init() {
}
} }

View File

@ -51,13 +51,13 @@ import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil; import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor; import com.yuandian.dataflow.statemachine.annotations.GrpcProcessor;
import com.yuandian.dataflow.statemachine.annotations.MasterRegister; import com.yuandian.dataflow.statemachine.annotations.MasterRegister;
import com.yuandian.dataflow.statemachine.master.MasterExecute;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.master.MasterExecute;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest; import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.utils.Utils; import com.yuandian.dataflow.utils.Utils;
import lombok.Getter; import lombok.Getter;

View File

@ -17,12 +17,12 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.error.RaftException; import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.yuandian.dataflow.statemachine.state.State;
import com.yuandian.dataflow.statemachine.state.WorkerState;
// import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest; // import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld; import com.yuandian.dataflow.statemachine_old.operate.OperateOld;
import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType; import com.yuandian.dataflow.statemachine_old.operate.OperateOld.OperateType;
import com.yuandian.dataflow.statemachine_old.state.State;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.Utils; import com.yuandian.dataflow.utils.Utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -93,7 +93,7 @@ public class StateMachine extends StateMachineAdapter {
case PUT_WORKERSTATE: case PUT_WORKERSTATE:
WorkerState opws = op.getValue(); WorkerState opws = op.getValue();
log.debug("PUT {}", opws.peerId); log.debug("PUT {}", opws.peerId);
state.getWorkers().put(opws.peerId, opws); // state.getWorkers().put(opws.peerId, opws);
if (closure != null) { if (closure != null) {
closure.success(op); closure.success(op);
closure.run(Status.OK()); closure.run(Status.OK());
@ -194,7 +194,7 @@ public class StateMachine extends StateMachineAdapter {
var ws = this.state.getWorkers().get(StateFactory.getServerId()); var ws = this.state.getWorkers().get(StateFactory.getServerId());
if (ws == null) { if (ws == null) {
ws = new WorkerState(StateFactory.getServerId()); // ws = new WorkerState(StateFactory.getServerId());
} }
// 更新当前WorkerState // 更新当前WorkerState
@ -242,12 +242,12 @@ public class StateMachine extends StateMachineAdapter {
// 更新当前WorkerState // 更新当前WorkerState
OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() { // OperateOld.CallOperate(new OperateOld(OperateType.PUT_WORKERSTATE, new WorkerState(StateFactory.getServerId())), new GenericClosure() {
@Override // @Override
public void run(Status status) { // public void run(Status status) {
log.debug("onStartFollowing update workerstate: {}", status); // log.debug("onStartFollowing update workerstate: {}", status);
} // }
}); // });
return; return;
} catch (Exception e) { } catch (Exception e) {
@ -267,14 +267,14 @@ public class StateMachine extends StateMachineAdapter {
public void onStopFollowing(LeaderChangeContext ctx) { public void onStopFollowing(LeaderChangeContext ctx) {
log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx); log.debug("onStopFollowing[{}] {}", StateFactory.getServerId(), ctx);
var ws = new WorkerState(StateFactory.getServerId()); // var ws = new WorkerState(StateFactory.getServerId());
var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws); // var op = new OperateOld(OperateType.PUT_WORKERSTATE, ws);
OperateOld.CallOperate(op, new GenericClosure() { // OperateOld.CallOperate(op, new GenericClosure() {
@Override // @Override
public void run(Status status) { // public void run(Status status) {
log.info("{} {}", status, this.getResponse()); // log.info("{} {}", status, this.getResponse());
} // }
}); // });
super.onStopFollowing(ctx); super.onStopFollowing(ctx);
} }

View File

@ -5,11 +5,11 @@ import java.io.Serializable;
import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RemotingException; import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback; import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import com.yuandian.dataflow.statemachine_old.StateFactory; import com.yuandian.dataflow.statemachine_old.StateFactory;
import com.yuandian.dataflow.statemachine_old.closure.GenericClosure; import com.yuandian.dataflow.statemachine_old.closure.GenericClosure;
import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor; import com.yuandian.dataflow.statemachine_old.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse; import com.yuandian.dataflow.statemachine_old.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine_old.state.WorkerState;
import com.yuandian.dataflow.utils.PacketsManager; import com.yuandian.dataflow.utils.PacketsManager;
import lombok.Data; import lombok.Data;