diff --git a/design_com.puml b/design_com.puml new file mode 100644 index 0000000..2abd880 --- /dev/null +++ b/design_com.puml @@ -0,0 +1,43 @@ +@startuml 数据流组件图 + +package "探针组" { + [探针1] - grpc1 + [探针2] - grpc2 + [探针3] - grpc3 + [探针4] - grpc4 +} + +node "数据流" { + grpc1 --> [数据流1] + grpc2 --> [数据流2] + grpc3 --> [数据流3] + grpc4 --> [数据流4] +} + +node "配置中心" { + nacos -left-> 数据流 + nacos -left-> 探针组 + nacos -down-> 大数据计算 +} + +database "mongodb" { + node "mongod-ori-master" + node "mongod-ori-slave" + + node "mongod-bmp-master" + node "mongod-bmp-slave" + + 数据流 -down-> mongodb +} + + +node "大数据计算" { + [计算任务1] + [计算任务2] + [计算任务3] + [计算任务4] +} + +mongodb -> 大数据计算 + +@enduml \ No newline at end of file diff --git a/design_main.puml b/design_main.puml new file mode 100644 index 0000000..a985938 --- /dev/null +++ b/design_main.puml @@ -0,0 +1,26 @@ +@startuml 时序图 + +配置中心 <- 探针组: 读取配置 +数据流 -> 配置中心: 读取配置 + +group 数据流处理过程 + 探针组 -> 数据流: 数据流jraft(master) 通过 grpc protobuf数据 + 数据流 -> 数据流: 分配数据到jraft节点处理 + 数据流 -> 数据流: jraft状态机做状态同步 + 数据流 -> 数据流: 根据数据类型做处理 + 数据流 -> 配置中心: 更新配置 + 数据流 -> 数据存储: 入库 mongodb +end group + + +group 大数据计算过程 + 配置中心 -> 大数据计算: 读取相关配置 + 数据存储 <- 大数据计算: 读取需要的数据 + 大数据计算 -> 大数据计算: 计算需求 + 大数据计算 -> 配置中心: 更新配置 + 大数据计算 -> 数据存储: 提炼后的数据入库 +end group + + + +@enduml \ No newline at end of file diff --git a/pom.xml b/pom.xml index e454114..fc86a3e 100644 --- a/pom.xml +++ b/pom.xml @@ -197,7 +197,6 @@ protobuf-maven-plugin 0.6.1 - grpc-java com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} @@ -213,7 +212,6 @@ test-compile-custom - @@ -263,6 +261,7 @@ true + diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 0492355..9967ad7 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -49,14 +49,14 @@ public class CollectPackets extends CollectPacketsServerImplBase { Config.UseConfig( (cnf) -> { var tid = cnf.get("test_id"); - log.info("{}",tid); + log.info("config {}",tid); return null; }); Map> domap = new HashMap(); - System.out.println( Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() ); - System.out.println( AppFlow.class.hashCode() ); + log.info("{}", Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() ); + log.info("{}", AppFlow.class.hashCode() ); domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{ try { @@ -131,7 +131,7 @@ public class CollectPackets extends CollectPacketsServerImplBase { // MyThing.Response response = blockingStub.sayHi(request); // System.out.println(response.getName()); // } - var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 50051); + var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60010); var channel = managedChannelBuilder.usePlaintext().build(); var stub = CollectPacketsServerGrpc.newBlockingStub(channel); // stub.withCompression("snappy"); diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 0ce9599..1a5cfba 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -3,7 +3,7 @@ class="ch.qos.logback.core.ConsoleAppender"> - %d{yyyyMMdd HH:mm:ss.SSS} %-5level%thread(%file:%line): %msg%n + %d{yyyyMMdd HH:mm:ss.SSS} %-5level%thread\(%file:%line\): %msg%n