diff --git a/.gitignore b/.gitignore
index 674db64..ffc6ed1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,7 +79,7 @@ local.properties
.classpath
# Annotation Processing
-.apt_generated
+.apt_generated*
.sts4-cache/
@@ -202,4 +202,4 @@ README.html
raftdata
-screenlog.*
\ No newline at end of file
+screenlog.*
diff --git a/.gitmodules b/.gitmodules
index 14b84f0..52f0e1a 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +1,4 @@
[submodule "src/main/proto"]
path = src/main/proto
url = http://git.yuandian.com/project/proto/dataflow
+ branch = v1.0.3
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 0000000..979edf3
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,19 @@
+{
+ // Use IntelliSense to learn about possible attributes.
+ // Hover to view descriptions of existing attributes.
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "type": "java",
+ "name": "Launch Server",
+ "request": "launch",
+ "mainClass": "com.yuandian.dataflow.Server",
+ "projectName": "dataflow",
+ "args": ["2"],
+ "preLaunchTask": "restart",
+ "postDebugTask": "stopall",
+
+ }
+ ]
+}
\ No newline at end of file
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..e0f15db
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+ "java.configuration.updateBuildConfiguration": "automatic"
+}
\ No newline at end of file
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
new file mode 100644
index 0000000..029d8f2
--- /dev/null
+++ b/.vscode/tasks.json
@@ -0,0 +1,34 @@
+{
+ "version": "2.0.0",
+ "tasks": [
+ {
+ "label": "restart",
+ "type": "shell",
+ "command": "sh restart.sh",
+ "isBackground": false,
+ "presentation": {
+ "echo": true,
+ "reveal": "always",
+ "focus": false,
+ "panel": "new",
+ "showReuseMessage": true,
+ "clear": false,
+ "close": true
+
+ },
+ },
+ {
+ "label": "stopall",
+ "type": "shell",
+ "command": "sh stop.sh",
+ "presentation": {
+ "echo": true,
+ "reveal": "always",
+ "focus": false,
+ "panel": "shared",
+ "close": true
+ },
+
+ }
+ ]
+}
\ No newline at end of file
diff --git a/assembly.xml b/assembly.xml
new file mode 100644
index 0000000..e1c5d6e
--- /dev/null
+++ b/assembly.xml
@@ -0,0 +1,44 @@
+
+
+ bin
+
+ dir
+ tar.gz
+
+ false
+
+
+ true
+ lib
+ runtime
+
+
+
+
+
+
+
+
+ 0664
+ ${project.build.directory}
+
+
+ *.jar
+
+
+
+
+
+
+
+ ${project.directory}
+ false
+
+
+ lib
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fc86a3e..05b5a17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,9 +11,9 @@
UTF-8
- 8
- 8
- 8
+ 11
+ 11
+ 11
3.20.1
1.7.4
@@ -21,12 +21,12 @@
2.3.0
1.32.3
1.7.36
- 1.3.10
- 2.7.0
- 3.12.11
+ 1.3.11
+ 2.7.1
+ 4.7.0
2.1.0
1.30
-
+ 1.2.11
1.0.4
@@ -47,13 +47,27 @@
${yuandian.common.config.version}
+
+
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+
+
+
+ ch.qos.logback
+ logback-core
+ ${logback.version}
+
org.slf4j
slf4j-api
${slf4j.version}
-
+
org.yaml
@@ -69,19 +83,28 @@
-
- org.mongodb
- mongo-java-driver
- ${mongo.driver.version}
-
-
-
+
+
+
+ org.mongodb
+ mongodb-driver-sync
+ ${mongo.driver.version}
+
+
+
+
+ org.reflections
+ reflections
+ 0.10.2
+
+
+
@@ -128,13 +151,14 @@
${grpc.version}
-
+
org.projectlombok
lombok
1.18.24
provided
+
@@ -191,12 +215,13 @@
os-maven-plugin
1.6.2
-
+
org.xolstice.maven.plugins
protobuf-maven-plugin
0.6.1
+
grpc-java
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
@@ -212,21 +237,77 @@
test-compile-custom
+
+
+
- org.springframework.boot
- spring-boot-maven-plugin
- ${spring.boot.version}
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 3.2.2
+
+
+
+ true
+ lib/
+ com.yuandian.dataflow.Server
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ copy-dependencies
+ prepare-package
+
+ copy-dependencies
+
+
+ false
+ false
+ true
+ ${project.build.directory}/lib
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ make-assembly
+ assembly
- repackage
+ single
+
+ false
+
+
+ com.yuandian.dataflow.Server
+
+
+
+ assembly.xml
+
+
+
+
+
+
+
org.apache.maven.plugins
maven-release-plugin
diff --git a/restart.sh b/restart.sh
new file mode 100755
index 0000000..8e0f02d
--- /dev/null
+++ b/restart.sh
@@ -0,0 +1,4 @@
+#! /bin/bash
+
+sh stop.sh & rm raftdata/ -rf && mvn package && truncate -s 0 screenlog.0
+sh start.sh
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index 9a0d35a..7b3723e 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -1,25 +1,15 @@
package com.yuandian.dataflow;
-import java.io.File;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.SpringBootConfiguration;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.slf4j.MarkerFactory;
import com.alipay.sofa.jraft.JRaftUtils;
-import com.alipay.sofa.jraft.Node;
-import com.alipay.sofa.jraft.RaftGroupService;
import com.alipay.sofa.jraft.conf.Configuration;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.NodeOptions;
-import com.yuandian.dataflow.statemachine.RaftClosure;
-import com.yuandian.dataflow.statemachine.StateMachine;
-import com.yuandian.dataflow.statemachine.StateServer;
-import com.yuandian.dataflow.statemachine.rpc.SyncDataProcessor;
+import com.yuandian.dataflow.statemachine.StateFactory;
-import lombok.var;
+
+import lombok.extern.slf4j.Slf4j;
@@ -28,39 +18,41 @@ import lombok.var;
* Hello world!
*
*/
-@SpringBootApplication
-@SpringBootConfiguration
+@Slf4j
public class Server {
- @Autowired
- public static Node node;
- public static RaftClosure done;
- private static StateServer stateServer;
-
- public static Node GetNode() {
- return node;
- }
-
- public static RaftClosure GetDone() {
- return done;
- }
+ public static String peeridstr;
+ public static String sprPort;
+ public static Configuration conf ;
-
- public static void main(String[] args) {
-
+ public static void main(String[] args) throws Exception {
+
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
- String[] sprPeers = new String[]{"3440","3441","3442"};
- var peeridstr = peers[ Integer.parseInt(args[0] )];
- var sprPort = sprPeers[Integer.parseInt(args[0] )];
+
+ var peeridx = Integer.parseInt(args[0]);
+ var peeridstr = peers[ peeridx ];
+
+ // var peeridstr = peers[2];
+ // var sprPort = sprPeers[2];
+ log.info("{} {}", peeridstr, sprPort);
- Configuration conf = JRaftUtils.getConfiguration("localhost:4440,localhost:4441,localhost:4442");
- stateServer = new StateServer(peeridstr, conf);
+ conf = JRaftUtils.getConfiguration(String.join(",", peers));
+ StateFactory.startStateServer(peeridstr, conf);
+
- System.setProperty("server.port", sprPort);
- var app = SpringApplication.run(Server.class, args);
- app.start();
+ // System.setProperty("server.port", sprPort);
+ // ConfigurableApplicationContext app = SpringApplication.run(Server.class, args);
+ // StateServerFactory.setAppCxt(app);
+ // app.addApplicationListener(new SpringReadyEvent());
+ // app.start();
+
}
+
+
+
+
+
}
diff --git a/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
new file mode 100644
index 0000000..b215770
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/controller/PacketsProcessor.java
@@ -0,0 +1,99 @@
+/**
+ * description
+ *
+ * @author eson
+ *2022年7月21日-13:48:01
+ */
+package com.yuandian.dataflow.controller;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.ArrayList;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.rpc.RpcContext;
+import com.alipay.sofa.jraft.rpc.RpcProcessor;
+import com.google.protobuf.Any;
+import com.yuandian.dataflow.statemachine.StateFactory;
+import com.yuandian.dataflow.statemachine.closure.GenericClosure;
+import com.yuandian.dataflow.statemachine.operate.Operate;
+import com.yuandian.dataflow.statemachine.operate.Operate.OperateType;
+import com.yuandian.dataflow.statemachine.rpc.RaftResponse;
+import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
+import com.yuandian.dataflow.statemachine.state.State;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * description
+ *
+ * @author eson
+ * 2022年7月21日-13:48:01
+ */
+
+@Slf4j
+@ProcessorRaft
+public class PacketsProcessor implements RpcProcessor {
+
+ @Setter
+ @Getter
+ public static class PacketsRequest implements Serializable {
+ private ArrayList packets = new ArrayList<>();
+ }
+
+ @Override
+ public void handleRequest(RpcContext rpcCtx, PacketsRequest request) {
+ // StateServerFactory.getStateServer().updateFsmStateAsync(s, onCompleted);
+ var resp = new RaftResponse<>();
+ resp.setSuccess(true);
+ rpcCtx.sendResponse(resp); // 因为jraft是异步. 收到数据直接返回
+
+ try {
+ log.debug("{} handler request.packets.size(): {}", StateFactory.getServerId(), request.packets.size());
+ // TODO: request.packets 入库,回填, 告警 等操作
+
+
+
+ } finally { // 确保 更新 最终的任务状态给master.
+
+ // 读状态 Closure 里的 getValue为 State的状态
+ StateFactory.readIndexState(new GenericClosure() {
+
+ @Override
+ public void run(Status status) {
+
+ if (!status.isOk()) {
+ log.error("失败 readIndexState {}", status);
+ }
+
+ // readIndexState 失败后也需要直接 更新自己状态
+
+ var state = this.getValue(); // 获取返回的状态
+ var ws = state.getWorkers().get(StateFactory.getServerId());
+ ws.setTaskQueueSize(ws.getTaskQueueSize() - request.packets.size()); // 减少 现有的任务数量
+ ws.setUpdateAt(Instant.now()); // 设置更新时间
+
+ Operate.CallOperate(new Operate(OperateType.PUT_WORKERSTATE, ws),
+ new GenericClosure() {
+ @Override
+ public void run(Status status) {
+ if (!status.isOk()) {
+ log.error("CallOperate [{}] {}", StateFactory.getServerId(), resp);
+ }
+ }
+ });
+
+ }
+ });
+ }
+ ;
+
+ }
+
+ @Override
+ public String interest() {
+ return PacketsRequest.class.getName();
+ }
+}
diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
deleted file mode 100644
index ed07bc8..0000000
--- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package com.yuandian.dataflow.controller;
-
-import java.nio.ByteBuffer;
-
-import com.alipay.sofa.jraft.Closure;
-import com.alipay.sofa.jraft.entity.Task;
-
-import com.google.protobuf.util.JsonFormat;
-import com.yuandian.dataflow.Server;
-import com.yuandian.dataflow.grpc.MongodbTest;
-import com.yuandian.dataflow.projo.Response;
-import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.commons.lang.ObjectUtils.Null;
-import org.apache.commons.logging.Log;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Controller;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import com.alipay.sofa.jraft.Node;
-
-@Slf4j
-@Controller
-public class TaskLog {
-
- // private static final Logger log = LoggerFactory.getLogger(TaskLog.class);
- private static Node node = Server.GetNode();
-
- @PostMapping(path = "/test")
- public ResponseEntity Processing(@RequestBody String json) {
-
- /*Task task = new Task();
-
- log.error(node.toString());
-
- RaftClosure done = new RaftClosure();
- task.setData(ByteBuffer.wrap("hello".getBytes()));
- task.setDone(done);
- Server.GetNode().apply(task);*/
-
- try {
- // 1、类型转换
- BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder();
- JsonFormat.parser().merge(json, builder);
- BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build();
-
- // 2、业务处理
-
- // 3、数据保存到 mongoDB
- MongodbTest.insertMsgToMongoDB(backtrackingFlow);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- Response response = new Response();
- response.Code = HttpStatus.OK;
- response.Message = HttpStatus.OK.toString();
- return new ResponseEntity(response, HttpStatus.OK);
- }
-
- @GetMapping(path = "/test2")
- public ResponseEntity MongodbTest(@RequestBody int status) {
-
- Response response = new Response();
- return new ResponseEntity(response, HttpStatus.OK);
- }
-}
diff --git a/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
new file mode 100644
index 0000000..3bb8f35
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/controller/TransferLeaderProcessor.java
@@ -0,0 +1,42 @@
+package com.yuandian.dataflow.controller;
+
+import java.io.Serializable;
+
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.rpc.RpcContext;
+import com.alipay.sofa.jraft.rpc.RpcProcessor;
+import com.yuandian.dataflow.statemachine.StateFactory;
+import com.yuandian.dataflow.statemachine.rpc.annotations.ProcessorRaft;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * 例子 强制转换leader
+ */
+@Slf4j
+@ProcessorRaft
+public class TransferLeaderProcessor implements RpcProcessor {
+
+ @Setter
+ @Getter
+ public static class LeaderRequest implements Serializable {
+ PeerId peer;
+ }
+
+ @Override
+ public void handleRequest(RpcContext rpcCtx, LeaderRequest request) {
+ Status status = StateFactory.getCluster().getRaftNode().transferLeadershipTo(request.peer);
+ rpcCtx.sendResponse(status);
+ log.debug("[TransferLeader] {} change leader to {}", status, request.peer);
+ }
+
+ @Override
+ public String interest() {
+ return LeaderRequest.class.getName();
+ }
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
index 45ce4ea..7030e64 100644
--- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
+++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
@@ -16,7 +16,6 @@ import java.util.stream.Collectors;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.common.Config;
-import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
@@ -26,10 +25,10 @@ import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServer
import com.yuandian.dataflow.proto.msgtype.*;
import io.grpc.ManagedChannelBuilder;
-import lombok.var;
+
import lombok.extern.slf4j.Slf4j;
-import org.springframework.http.*;
-import org.springframework.web.client.RestTemplate;
+// import org.springframework.http.*;
+// import org.springframework.web.client.RestTemplate;
/**
* description
@@ -81,7 +80,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis());
}
} catch (Exception e) {
- e.printStackTrace();
+ log.info("{}", e.toString());
} finally {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
}
@@ -99,7 +98,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
+ log.info("{}", e.toString());
}
return null;
});
@@ -110,7 +109,7 @@ public class CollectPackets extends CollectPacketsServerImplBase {
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
+ log.info("{}", e.toString());
}
return null;
});*/
@@ -120,18 +119,18 @@ public class CollectPackets extends CollectPacketsServerImplBase {
try {
var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class);
System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
+
+ // RestTemplate client = new RestTemplate();
+ // HttpHeaders headers = new HttpHeaders();
+ // headers.setContentType(MediaType.APPLICATION_JSON);
- RestTemplate client = new RestTemplate();
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
+ // HttpEntity