diff --git a/pom.xml b/pom.xml
index d35d4da..480ac89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
1.7.36
1.3.11
2.7.1
- 3.12.11
+ 4.7.0
2.1.0
1.30
1.2.11
@@ -83,11 +83,15 @@
-
- org.mongodb
- mongo-java-driver
- ${mongo.driver.version}
-
+
+
+
+
+ org.mongodb
+ mongodb-driver-sync
+ ${mongo.driver.version}
+
+
diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
index a35b520..14edd27 100644
--- a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -18,9 +18,9 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.google.protobuf.Any;
import com.yuandian.dataflow.statemachine.StateServerFactory;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
@@ -52,37 +52,38 @@ public class PacketsProcessor implements RpcProcessor{
-
- var ws = state.getWorkers().get(StateServerFactory.getServerId());
- ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
- ws.setUpdateAt(Instant.now());
+ ss.readIndexState( new GenericClosure() {
- log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
-
- Operate.CallOperate(new Operate(OperateType.PUT,ws), new OperateClosure() {
- @Override
- public void run(Status status) {
- var resp = this.getResponse();
- if(status.isOk()) {
- resp.setSuccess(true);
- log.info("{}", resp);
- } else {
- resp.setSuccess(false);
+ @Override
+ public void run(Status status) {
+
+ var state = this.getValue();
+ var ws = state.getWorkers().get(StateServerFactory.getServerId());
+ ws.setTaskQueueSize( ws.getTaskQueueSize() - request.packets.size());
+ ws.setUpdateAt(Instant.now());
+
+ log.debug("workerState taskQueueSize: {} psize: {} state {}", ws.getTaskQueueSize(), request.packets.size(), state.getWorkers().size());
+
+ Operate.CallOperate(new Operate(OperateType.PUT,ws), new GenericClosure() {
+ @Override
+ public void run(Status status) {
+ var resp = this.getResponse();
+ resp.setMsg(rpcCtx.getRemoteAddress());
+ if(status.isOk()) {
+ resp.setSuccess(true);
+ log.info("{}", resp);
+ } else {
+ resp.setSuccess(false);
+ }
+ rpcCtx.sendResponse(resp);
}
-
- rpcCtx.sendResponse(resp);
- }
- });
- });
+ });
+ }
+
+ } );
}
diff --git a/src/main/java/com/yuandian/dataflow/projo/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java
index e1389a1..3d073fd 100644
--- a/src/main/java/com/yuandian/dataflow/projo/Doc.java
+++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java
@@ -4,33 +4,66 @@ package com.yuandian.dataflow.projo;
import java.time.LocalDateTime;
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonValue;
import org.bson.Document;
+import org.bson.codecs.configuration.CodecProvider;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.codecs.pojo.PojoCodecProvider;
+import static com.mongodb.MongoClientSettings.getDefaultCodecRegistry;
+import static org.bson.codecs.configuration.CodecRegistries.fromProviders;
+import static org.bson.codecs.configuration.CodecRegistries.fromRegistries;
import org.bson.codecs.pojo.annotations.BsonProperty;
+import org.bson.conversions.Bson;
+import org.bson.types.ObjectId;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
import lombok.Getter;
import lombok.Setter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
@Setter
@Getter
-public final class Doc extends Document {
+@ToString
+public final class Doc {
- @JsonProperty("code")
- @BsonProperty("code")
- public int Code ;
+ @BsonProperty("retryPackets")
+ public int retryPackets ;
- @JsonProperty("ts")
- @BsonProperty("ts")
- public LocalDateTime TS;
+ @BsonProperty("serverResponseTime")
+ public int serverResponseTime ;
- @JsonProperty("desc")
- @BsonProperty("desc")
- public String Desc;
+ @BsonProperty("requestBytes")
+ public int requestBytes ;
- @JsonProperty("data")
- @BsonProperty("data")
- public Document Data;
+ @BsonProperty("businessName")
+ public String businessName ;
+
+ @BsonProperty("responseIp")
+ public int responseIp ;
+
+
+
+
+ public static void main(String[] args) {
+ MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
+ CodecProvider pojoCodecProvider = PojoCodecProvider.builder().register("com.yuandian.dataflow.projo").build();
+ CodecRegistry pojoCodecRegistry = fromRegistries(getDefaultCodecRegistry(),fromProviders(pojoCodecProvider));
+
+ MongoDatabase oriDatabase = mgo.getDatabase("ori-database").withCodecRegistry(pojoCodecRegistry);
+ MongoCollection db = oriDatabase.getCollection("network_performace_flow_2022072400", Doc.class);
+ log.debug("{}", db.countDocuments( new BsonDocument("serverResponseTime", new BsonDocument("$gt", new BsonInt64(2083478517) )) ));
+ }
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java
new file mode 100644
index 0000000..9263085
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/MasterFactory.java
@@ -0,0 +1,124 @@
+/**
+ * description
+ *
+ * @author eson
+ *2022年7月20日-10:00:05
+ */
+package com.yuandian.dataflow.statemachine;
+
+import java.time.Instant;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.error.RemotingException;
+import com.alipay.sofa.jraft.rpc.InvokeCallback;
+import com.google.protobuf.Any;
+import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
+import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine.operate.Operate;
+import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine.state.State;
+import com.yuandian.dataflow.statemachine.state.WorkerState;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.var;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Master主线程, 用于接收packets
+ *
+ * @author eson
+ * 2022年7月20日-10:00:05
+ */
+@Slf4j
+@Getter
+@Setter
+@ToString
+public class MasterFactory {
+
+ public static final int MAX_TASKS = 100;
+
+ public static Thread masterExecute = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ log.debug("master execute {}", StateServerFactory.getServerId());
+ var alivePeers = StateServerFactory.getRaftNode().listAlivePeers();
+ log.debug("master execute {}", StateServerFactory.getRaftNode().listAlivePeers());
+ if (alivePeers != null) {
+
+ var ss = StateServerFactory.getStateServer();
+ // 读一致性
+ ss.readIndexState( new GenericClosure() {
+
+ @Override
+ public void run(Status status) {
+ var state = this.getValue();
+
+ alivePeers.forEach((peer) -> {
+ WorkerState ws = state.getWorkers().get(peer);
+ if (ws != null) {
+ //
+ var canDealTasks = MAX_TASKS - ws.getTaskQueueSize();
+ log.debug("cap :{} peer: {}", canDealTasks, peer);
+ if (canDealTasks <= 0) {
+ return;
+ }
+ ws.setUpdateAt(Instant.now());
+ ws.setTaskQueueSize(MAX_TASKS);
+
+ var request = new PacketsRequest();
+ for (int i = 0; i < canDealTasks; i++) {
+ var p = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow
+ .newBuilder()
+ .setTableId(10086)
+ .build());
+ request.getPackets().add(p);
+ }
+
+ Operate.CallOperate(new Operate(OperateType.PUT, ws), new GenericClosure() {
+ @Override
+ public void run(Status status) {
+
+ log.info("{}", status);
+ try {
+ ss.getRpcClient().invokeAsync(peer.getEndpoint(), request,
+ new InvokeCallback() {
+ @Override
+ public void complete(Object result, Throwable err) {
+ log.info("PacketsRequest: {}", result);
+ }
+ }, 5000);
+ } catch (InterruptedException | RemotingException e) {
+ log.info("error send packets {}", e.toString());
+ }
+ }
+ });
+ }
+ });
+ }
+
+ } );
+ }
+
+ Thread.sleep(5000);
+ }
+ } catch (InterruptedException e) {
+ log.info("{}", e.toString());
+ }
+
+ }
+ });
+
+ public static Thread getMasterExecute() {
+ return masterExecute;
+ }
+
+ public static void Init() {
+
+ }
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
index 3a75347..cf3b6c1 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateMachine.java
@@ -16,13 +16,13 @@ import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor.OperateRequest;
import com.yuandian.dataflow.statemachine.state.State;
-import com.yuandian.dataflow.statemachine.state.StateFactory;
import com.yuandian.dataflow.statemachine.state.WorkerState;
import lombok.var;
@@ -66,12 +66,11 @@ public class StateMachine extends StateMachineAdapter {
while (iter.hasNext()) {
Operate op = null;
- OperateClosure closure = null;
+ GenericClosure closure = null;
if (iter.done() != null) {
// This task is applied by this node, get value from closure to avoid additional
// parsing.
- closure = (OperateClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
- // log.info("SyncDataClosure(done) taskState:{} leaderTerm:{} {}",this.state, this.leaderTerm, closure);
+ closure = (GenericClosure)iter.done(); // 只支持单一个State. 全状态机只支持一种提交
op = closure.getValue();
} else {
@@ -79,11 +78,7 @@ public class StateMachine extends StateMachineAdapter {
final ByteBuffer data = iter.getData();
try {
- op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(
- data.array(),Operate.class.getName());
- // log.info("SyncDataClosure(null) taskState:{} leaderTerm:{}", state, this.leaderTerm);
-
-
+ op = SerializerManager.getSerializer(SerializerManager.Hessian2).deserialize(data.array(),Operate.class.getName());
} catch (CodecException e) {
log.info("{}", e.toString());
}
@@ -101,8 +96,6 @@ public class StateMachine extends StateMachineAdapter {
}
break;
case REMOVE:
-
-
if(closure != null) {
closure.success(op);
closure.run(Status.OK());
@@ -139,33 +132,39 @@ public class StateMachine extends StateMachineAdapter {
@Override
public void onLeaderStart(final long term) {
- log.debug("onLeaderStart {}", StateServerFactory.getCluster().getServerId());
+ log.debug("onLeaderStart {}", StateServerFactory.getServerId());
this.leaderTerm.set(term);
- if(StateFactory.getMasterExecute().isAlive()) {
- StateFactory.getMasterExecute().interrupt();
+ // 判断是否Master线程还在跑, 如果存在则中断
+ if(MasterFactory.getMasterExecute().isAlive()) {
+ MasterFactory.getMasterExecute().interrupt();
}
var ss = StateServerFactory.getStateServer();
- ss.readIndexState((state)->{
- var ws = state.getWorkers().get( StateServerFactory.getServerId() );
- if(ws == null) {
- ws = new WorkerState(StateServerFactory.getServerId());
- // state.getWorkers().put(ss.getCluster().getServerId(), ws);
- }
+ ss.readIndexState( new GenericClosure() {
- Operate op = new Operate(OperateType.PUT, ws);
- ss.applyOperate(op, new OperateClosure() {
- @Override
- public void run(Status status) {
- log.debug("master update workerstate: {}", status);
+ @Override
+ public void run(Status status) {
+
+ var ws = state.getWorkers().get( StateServerFactory.getServerId() );
+ if(ws == null) {
+ ws = new WorkerState(StateServerFactory.getServerId());
}
- });
-
+
+ Operate op = new Operate(OperateType.PUT, ws);
+ ss.applyOperate(op, new GenericClosure() {
+ @Override
+ public void run(Status status) {
+ log.debug("master update workerstate: {}", status);
+ }
+ });
+ }
+
});
- StateFactory.getMasterExecute().start();
-
+ // 当成为master时候 必须启动
+ MasterFactory.getMasterExecute().start();
+
super.onLeaderStart(term);
}
@@ -175,8 +174,9 @@ public class StateMachine extends StateMachineAdapter {
this.leaderTerm.set(-1);
super.onLeaderStop(status);
- if(StateFactory.getMasterExecute().isAlive()) {
- StateFactory.getMasterExecute().interrupt();
+ // 判断是否Master线程还在跑, 如果存在则中断
+ if(MasterFactory.getMasterExecute().isAlive()) {
+ MasterFactory.getMasterExecute().interrupt();
}
@@ -196,18 +196,17 @@ public class StateMachine extends StateMachineAdapter {
log.debug("[onStartFollowing] {} {}", ctx, StateServerFactory.getCluster().getServerId());
try {
- if(StateFactory.getMasterExecute().isAlive()) {
- StateFactory.getMasterExecute().interrupt();
+ // 判断是否Master线程还在跑, 如果存在则中断
+ if(MasterFactory.getMasterExecute().isAlive()) {
+ MasterFactory.getMasterExecute().interrupt();
}
-
- var ss = StateServerFactory.getStateServer();
- var ws = new WorkerState(ss.getCluster().getServerId());
-
+
+ var ws = new WorkerState(StateServerFactory.getServerId());
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
- Operate.CallOperate(op, new OperateClosure() {
+ Operate.CallOperate(op, new GenericClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
@@ -237,7 +236,7 @@ public class StateMachine extends StateMachineAdapter {
log.debug("my: {} leader id {}",StateServerFactory.getServerId(), StateServerFactory.getLeaderId());
var op = new Operate(OperateType.PUT, ws);
- Operate.CallOperate(op, new OperateClosure() {
+ Operate.CallOperate(op, new GenericClosure() {
@Override
public void run(Status status) {
log.info("{} {}", status, this.getResponse());
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
index 66cad64..c5d724a 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/StateServerFactory.java
@@ -39,6 +39,7 @@ import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
@@ -73,7 +74,7 @@ public class StateServerFactory {
public static boolean isLeader() {
- return ss.getNode().isLeader() ;
+ return ss.node.isLeader() ;
}
@@ -82,11 +83,15 @@ public class StateServerFactory {
}
public static PeerId getServerId() {
- return ss.getCluster().getServerId();
+ return ss.cluster.getServerId();
}
public static Node getNode() {
- return ss.getNode() ;
+ return ss.node ;
+ }
+
+ public static Node getRaftNode() {
+ return ss.cluster.getRaftNode() ;
}
public static RpcClient getRpcClient() {
@@ -179,12 +184,13 @@ public class StateServerFactory {
}
}
- public void readIndexState(Consumer dofunc) {
+ public void readIndexState(GenericClosure closure) {
getNode().readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
if( status.isOk()) {
- dofunc.accept(ss.fsm.getState());
+ closure.success(ss.fsm.getState());
+ closure.run(status);
}
}
} );
@@ -192,7 +198,7 @@ public class StateServerFactory {
- public void applyOperate(Operate op, OperateClosure closure) {
+ public void applyOperate(Operate op, GenericClosure closure) {
// 所有的提交都必须再leader进行
if (!ss.isLeader()) {
ss.handlerNotLeaderError(closure);
@@ -227,7 +233,7 @@ public class StateServerFactory {
return response;
}
- public void handlerNotLeaderError(final OperateClosure closure) {
+ public void handlerNotLeaderError(final GenericClosure closure) {
closure.failure("Not leader.", redirect().getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java
similarity index 67%
rename from src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java
rename to src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java
index 69a10e7..002bed1 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/OperateClosure.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/closure/GenericClosure.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.statemachine;
+package com.yuandian.dataflow.statemachine.closure;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
@@ -19,31 +19,28 @@ import org.slf4j.LoggerFactory;
@Getter
@Setter
@ToString
-public abstract class OperateClosure implements Closure {
+public abstract class GenericClosure implements Closure {
// 状态机的统一响应
- private RaftResponse response;
+ private RaftResponse response;
// 代表任务状态
- private Operate value;
+ private T value;
-
-
-
- public OperateClosure() {
+ public GenericClosure() {
}
public void failure(final String errorMsg, final PeerId redirect) {
- final RaftResponse response = new RaftResponse();
+ final RaftResponse response = new RaftResponse();
response.setSuccess(false);
response.setMsg(errorMsg);
response.setRedirect(redirect);
setResponse(response);
}
- public void success(final Operate value) {
- final RaftResponse response = new RaftResponse();
- response.setOperate(value);
+ public void success(final T value) {
+ final RaftResponse response = new RaftResponse();
+ response.setValue(value);
response.setSuccess(true);
setResponse(response);
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java b/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java
new file mode 100644
index 0000000..e49a805
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/statemachine/closure/StateClosure.java
@@ -0,0 +1,5 @@
+package com.yuandian.dataflow.statemachine.closure;
+
+public class StateClosure {
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
index bab840a..0194b1a 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/operate/Operate.java
@@ -4,8 +4,8 @@ import java.io.Serializable;
import com.alipay.sofa.jraft.error.RemotingException;
import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.rpc.OperateProcessor;
import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
import com.yuandian.dataflow.statemachine.state.WorkerState;
@@ -45,14 +45,21 @@ public class Operate implements Serializable {
return;
};
- public static void CallOperate(Operate op, OperateClosure closure) {
+ /**
+ * 调用操作设置
+ * @param op 传入的操作类
+ * @param closure 回调函数. Operate为返回值
+ */
+ public static void CallOperate(Operate op, GenericClosure closure) {
var ss = StateServerFactory.getStateServer();
- if (StateServerFactory.isLeader()) {
+ // 如果是leader 就直接提交
+ if (StateServerFactory.isLeader()) {
ss.applyOperate(op, closure);
return;
}
+ // 非leader 转发请求 统一有leader处理
var request = new OperateProcessor.OperateRequest();
request.setOperate(op);
@@ -64,14 +71,14 @@ public class Operate implements Serializable {
@Override
public void complete(Object result, Throwable err) {
log.info("{}", result);
- var resp = (RaftResponse) result;
+ //TODO: 解决回调的次序问题
+ var resp = (RaftResponse) result;
closure.setResponse(resp);
- closure.success(resp.getOperate());
+ closure.success(resp.getValue());
}
}, 5000);
} catch (InterruptedException | RemotingException e) {
- // TODO Auto-generated catch block
closure.failure("failure", null);
log.info("{}", e.toString());
}
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java
index b8b4c8a..96a825a 100644
--- a/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java
+++ b/src/main/java/com/yuandian/dataflow/statemachine/rpc/OperateProcessor.java
@@ -17,8 +17,8 @@ import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.yuandian.dataflow.statemachine.StateServerFactory;
-import com.yuandian.dataflow.statemachine.OperateClosure;
import com.yuandian.dataflow.statemachine.StateServerFactory.StateServer;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
import com.yuandian.dataflow.statemachine.operate.Operate;
import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
import com.yuandian.dataflow.statemachine.state.State;
@@ -64,7 +64,7 @@ public class OperateProcessor implements RpcProcessor implements Serializable {
private static final long serialVersionUID = 1L;
- private Operate operate;
+ private T value;
private boolean success;
diff --git a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java b/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java
deleted file mode 100644
index 65e5cb2..0000000
--- a/src/main/java/com/yuandian/dataflow/statemachine/state/StateFactory.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * description
- *
- * @author eson
- *2022年7月20日-10:00:05
- */
-package com.yuandian.dataflow.statemachine.state;
-
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.nacos.shaded.io.grpc.netty.shaded.io.netty.handler.codec.http.HttpContentEncoder.Result;
-import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.error.RemotingException;
-import com.alipay.sofa.jraft.rpc.InvokeCallback;
-import com.google.protobuf.Any;
-import com.yuandian.dataflow.controller.PacketsProcessor.PacketsRequest;
-import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
-import com.yuandian.dataflow.statemachine.StateServerFactory;
-import com.yuandian.dataflow.statemachine.operate.Operate;
-import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
-import com.yuandian.dataflow.statemachine.OperateClosure;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.var;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * description
- *
- * @author eson
- * 2022年7月20日-10:00:05
- */
-@Slf4j
-@Getter
-@Setter
-@ToString
-public class StateFactory {
-
- @Getter
- @Setter
- @ToString
- public static class PeerIdCap {
- private PeerId peer;
- private long cap;
-
- public PeerIdCap(PeerId pid, long cap) {
- this.peer = pid;
- this.cap = cap;
- }
- }
-
- public static Thread masterExecute = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- while (true) {
- log.debug("master execute {}", StateServerFactory.getServerId());
- var alivePeers = StateServerFactory.getCluster().getRaftNode().listAlivePeers();
- log.debug("master execute {}", StateServerFactory.getCluster().getRaftNode().listAlivePeers());
- if (alivePeers != null) {
-
- var ss = StateServerFactory.getStateServer();
- // var state = ss.getFsm().getState();
-
- ss.readIndexState((state) -> {
- alivePeers.forEach((peer) -> {
- WorkerState ws = state.getWorkers().get(peer);
- if (ws != null) {
- var cap = 100 - ws.getTaskQueueSize();
- log.debug("cap :{} peer: {}", cap, peer);
- if (cap <= 0) {
- return;
- }
- ws.setUpdateAt(Instant.now());
- ws.setTaskQueueSize(100);
-
- var request = new PacketsRequest();
- for (int i = 0; i < cap; i++) {
- var p = Any.pack(
- BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
- .setTableId(10086)
- .build());
- request.getPackets().add(p);
- }
-
- var op = new Operate(OperateType.PUT, ws);
-
- Operate.CallOperate(op, new OperateClosure() {
- @Override
- public void run(Status status) {
-
- log.info("{}", status);
-
- try {
- ss.getRpcClient().invokeAsync(peer.getEndpoint(),
- request, new InvokeCallback() {
-
- @Override
- public void complete(Object result, Throwable err) {
- log.info("{}", result);
- }
-
- }, 5000);
- } catch (InterruptedException | RemotingException e) {
- log.info("error send packets {}", e.toString());
- }
- }
- });
-
- }
- });
-
- });
-
- // ss.applyState(state, new SyncClosure() {
- // public void run(Status status) {
- // log.debug("{}", status);
- // };
- // });
-
- }
-
- Thread.sleep(5000);
- }
- } catch (InterruptedException e) {
- log.info("{}", e.toString());
- }
-
- }
- });
-
- public static Thread getMasterExecute() {
- return masterExecute;
- }
-
- public static void Init() {
-
- }
-
-}
diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java
index 1117839..88cdb2f 100644
--- a/src/test/java/com/yuandian/dataflow/AppTest.java
+++ b/src/test/java/com/yuandian/dataflow/AppTest.java
@@ -23,7 +23,8 @@ import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
// import org.springframework.expression.spel.ast.FunctionReference;
-import com.mongodb.MongoClient;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
import com.mongodb.client.model.InsertManyOptions;
import com.yuandian.dataflow.projo.Doc;
@@ -122,75 +123,77 @@ public class AppTest {
}
}
- @Test
- public void Mongodb() throws InterruptedException {
+ // @Test
+ // public void Mongodb() throws InterruptedException {
- ArrayList execs = new ArrayList<>();
+ // ArrayList execs = new ArrayList<>();
- final Metric metric = new Metric();
- metric.start();
- for (int c = 0; c < 10; c++) {
- Thread exec = new Thread(() -> {
+ // final Metric metric = new Metric();
+ // metric.start();
+ // for (int c = 0; c < 10; c++) {
+ // Thread exec = new Thread(() -> {
- @Cleanup
- MongoClient mgo = new MongoClient("localhost", 27017);
+ // @Cleanup
+ // MongoClient mgo = MongoClients.create("mongodb://yuandian:yuandian123@192.168.1.113:27017");
+
+
- log.info("msg");
+ // log.info("msg");
- long LoopNumber = 5;
- long BatchSize = 20000;
+ // long LoopNumber = 5;
+ // long BatchSize = 20000;
- var db = mgo.getDatabase("yuandian");
- var cltdoc = db.getCollection("doc");
+ // var db = mgo.getDatabase("yuandian");
+ // var cltdoc = db.getCollection("doc");
- for (int n = 0; n < LoopNumber; n++) {
+ // for (int n = 0; n < LoopNumber; n++) {
- metric.push(() -> {
+ // metric.push(() -> {
- List documents = new ArrayList<>();
- Random r = new Random();
+ // List documents = new ArrayList<>();
+ // Random r = new Random();
- for (int i = 0; i < BatchSize; i++) {
+ // for (int i = 0; i < BatchSize; i++) {
- var doc = new Doc();
- var datadoc = new Document();
+ // var doc = new Doc();
+ // var datadoc = new Document();
- doc.append("code", r.nextInt(100));
- doc.append("desc", "desc");
- doc.append("ts", Instant.now());
+ // doc.append("code", r.nextInt(100));
+ // doc.append("desc", "desc");
+ // doc.append("ts", Instant.now());
- for (int ii = 0; ii < 24; ii++) {
- UUID uid = UUID.randomUUID();
- datadoc
- .append(uid.toString(), uid.toString());
- }
+ // for (int ii = 0; ii < 24; ii++) {
+ // UUID uid = UUID.randomUUID();
+ // datadoc
+ // .append(uid.toString(), uid.toString());
+ // }
- doc.append("data", datadoc);
- documents.add(doc);
- }
+ // doc.append("data", datadoc);
+ // documents.add(doc);
+ // }
- var opt = new InsertManyOptions();
- cltdoc.insertMany(documents, opt);
- return BatchSize;
- });
- }
- });
- exec.start();
- execs.add(exec);
- }
- ;
+ // var opt = new InsertManyOptions();
+ // cltdoc.insertMany(documents, opt);
+ // return BatchSize;
+ // });
+ // }
+ // });
+ // exec.start();
+ // execs.add(exec);
+ // }
+ // ;
- execs.forEach((e) -> {
- try {
- e.join();
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- });
+ // execs.forEach((e) -> {
+ // try {
+ // e.join();
+ // } catch (InterruptedException e1) {
+ // e1.printStackTrace();
+ // }
+ // });
- metric.close();
+ // metric.close();
- }
+ // }
diff --git a/src/test/java/com/yuandian/dataflow/MongodbTest.java b/src/test/java/com/yuandian/dataflow/MongodbTest.java
index 7822bd7..424ed9e 100644
--- a/src/test/java/com/yuandian/dataflow/MongodbTest.java
+++ b/src/test/java/com/yuandian/dataflow/MongodbTest.java
@@ -1,6 +1,6 @@
package com.yuandian.dataflow;
-import com.mongodb.MongoClient;
+
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
@@ -18,73 +18,73 @@ import java.util.List;
public class MongodbTest {
public static void insertMsgToMongoDB(T obj) {
- try {
- ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
- List addrs = new ArrayList<>();
- addrs.add(serverAddress);
+ // try {
+ // ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
+ // List addrs = new ArrayList<>();
+ // addrs.add(serverAddress);
- MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
- List credentials = new ArrayList<>();
- credentials.add(credential);
+ // MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
+ // List credentials = new ArrayList<>();
+ // credentials.add(credential);
- MongoClient mongoClient = new MongoClient(addrs, credentials);
+ // MongoClient mongoClient = new MongoClient(addrs, credentials);
- MongoDatabase db = mongoClient.getDatabase("yd-base");
+ // MongoDatabase db = mongoClient.getDatabase("yd-base");
- // todo 修改名字
- MongoCollection collection = db.getCollection("lxy-test");
+ // // todo 修改名字
+ // MongoCollection collection = db.getCollection("lxy-test");
- collection.insertOne(obj2Doc(obj));
+ // collection.insertOne(obj2Doc(obj));
- System.err.println("insert success");
- } catch (Exception e) {
- log.info("{}", e.toString());
- }
- }
+ // System.err.println("insert success");
+ // } catch (Exception e) {
+ // log.info("{}", e.toString());
+ // }
+ // }
- public static Document obj2Doc(T obj) throws Exception {
- Document doc = new Document();
- Field[] fields = obj.getClass().getDeclaredFields();
- for (Field field : fields) {
- String varName = field.getName();
- boolean accessFlag = field.isAccessible();
- if (!accessFlag) {
- field.setAccessible(true);
- }
- Object param = field.get(obj);
- if (param == null) {
- continue;
- } else if (param instanceof Integer) {
- int value = ((Integer) param).intValue();
- doc.put(varName, value);
- } else if (param instanceof String) {
- String value = (String) param;
- doc.put(varName, value);
- } else if (param instanceof Double) {
- double value = ((Double) param).doubleValue();
- doc.put(varName, value);
- } else if (param instanceof Float) {
- float value = ((Float) param).floatValue();
- doc.put(varName, value);
- } else if (param instanceof Long) {
- long value = ((Long) param).longValue();
- doc.put(varName, value);
- } else if (param instanceof Boolean) {
- boolean value = ((Boolean) param).booleanValue();
- doc.put(varName, value);
- }
- field.setAccessible(accessFlag);
- }
- return doc;
- }
+ // public static Document obj2Doc(T obj) throws Exception {
+ // Document doc = new Document();
+ // Field[] fields = obj.getClass().getDeclaredFields();
+ // for (Field field : fields) {
+ // String varName = field.getName();
+ // boolean accessFlag = field.isAccessible();
+ // if (!accessFlag) {
+ // field.setAccessible(true);
+ // }
+ // Object param = field.get(obj);
+ // if (param == null) {
+ // continue;
+ // } else if (param instanceof Integer) {
+ // int value = ((Integer) param).intValue();
+ // doc.put(varName, value);
+ // } else if (param instanceof String) {
+ // String value = (String) param;
+ // doc.put(varName, value);
+ // } else if (param instanceof Double) {
+ // double value = ((Double) param).doubleValue();
+ // doc.put(varName, value);
+ // } else if (param instanceof Float) {
+ // float value = ((Float) param).floatValue();
+ // doc.put(varName, value);
+ // } else if (param instanceof Long) {
+ // long value = ((Long) param).longValue();
+ // doc.put(varName, value);
+ // } else if (param instanceof Boolean) {
+ // boolean value = ((Boolean) param).booleanValue();
+ // doc.put(varName, value);
+ // }
+ // field.setAccessible(accessFlag);
+ // }
+ // return doc;
+ // }
- public static T doc2Obj(Document doc, Class clazz) throws Exception {
- T obj = clazz.newInstance();
- for (String key : doc.keySet()) {
- Field field = clazz.getDeclaredField(key);
- field.setAccessible(true);
- field.set(obj, doc.get(key));
- }
- return obj;
+ // public static T doc2Obj(Document doc, Class clazz) throws Exception {
+ // T obj = clazz.newInstance();
+ // for (String key : doc.keySet()) {
+ // Field field = clazz.getDeclaredField(key);
+ // field.setAccessible(true);
+ // field.set(obj, doc.get(key));
+ // }
+ // return obj;
}
}