diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index 35579fc..2c3acfb 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -76,7 +76,7 @@ public class Server { - String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; + /*String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"}; String[] sprPeers = new String[]{"3440","3441","3442"}; @@ -107,11 +107,13 @@ public class Server { node = cluster.start(); done = new RaftClosure(); - System.setProperty("server.port", sprPort); + System.setProperty("server.port", sprPort);*/ + + System.setProperty("server.port", "3440"); var app = SpringApplication.run(Server.class, args); app.start(); - node.shutdown(done); + // node.shutdown(done); } } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 0fb58ee..ed07bc8 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -5,9 +5,11 @@ import java.nio.ByteBuffer; import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.entity.Task; +import com.google.protobuf.util.JsonFormat; import com.yuandian.dataflow.Server; +import com.yuandian.dataflow.grpc.MongodbTest; import com.yuandian.dataflow.projo.Response; -import com.yuandian.dataflow.statemachine.RaftClosure; +import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; import lombok.extern.slf4j.Slf4j; @@ -32,20 +34,33 @@ public class TaskLog { // private static final Logger log = LoggerFactory.getLogger(TaskLog.class); private static Node node = Server.GetNode(); - @GetMapping(path = "/test") - public ResponseEntity Processing(@RequestBody int status) { + @PostMapping(path = "/test") + public ResponseEntity Processing(@RequestBody String json) { - Task task = new Task(); + /*Task task = new Task(); log.error(node.toString()); RaftClosure done = new RaftClosure(); task.setData(ByteBuffer.wrap("hello".getBytes())); task.setDone(done); - Server.GetNode().apply(task); + Server.GetNode().apply(task);*/ + + try { + // 1、类型转换 + BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder(); + JsonFormat.parser().merge(json, builder); + BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build(); + + // 2、业务处理 + + // 3、数据保存到 mongoDB + MongodbTest.insertMsgToMongoDB(backtrackingFlow); + } catch (Exception e) { + e.printStackTrace(); + } Response response = new Response(); - response.Code = HttpStatus.OK; response.Message = HttpStatus.OK.toString(); return new ResponseEntity(response, HttpStatus.OK); diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index dd41e53..45ce4ea 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -2,150 +2,248 @@ * description * * @author eson - *2022年6月09日-16:29:17 + * 2022年6月09日-16:29:17 */ package com.yuandian.dataflow.grpc; import java.time.Duration; import java.time.Instant; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.stream.Collectors; -import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; +import com.google.protobuf.*; +import com.google.protobuf.util.JsonFormat; import com.yuandian.common.Config; +import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.proto.CollectPacketsServerGrpc; - + import com.yuandian.dataflow.proto.Base.Request; -import com.yuandian.dataflow.proto.Base.Response; import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerImplBase; -import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerStub; -import com.yuandian.dataflow.proto.msgtype.ApmBaseDataFlowOuterClass.ApmBaseDataFlow; -import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass.AppFlow; -import com.yuandian.dataflow.proto.msgtype.BussFlowMiddOuterClass.BussFlowMidd; -import com.yuandian.dataflow.proto.msgtype.BussFlowOrlOuterClass.BussFlowOrl; -import com.yuandian.dataflow.proto.msgtype.BussFlowWebOuterClass.BussFlowWeb; +import com.yuandian.dataflow.proto.msgtype.*; import io.grpc.ManagedChannelBuilder; -import io.grpc.stub.StreamObserver; import lombok.var; import lombok.extern.slf4j.Slf4j; +import org.springframework.http.*; +import org.springframework.web.client.RestTemplate; /** * description * * @author eson - *2022年6月09日-16:29:17 + * 2022年6月09日-16:29:17 */ @Slf4j public class CollectPackets extends CollectPacketsServerImplBase { - + public static void main(String[] args) throws Exception { - Config.UseConfig( (cnf) -> { - var tid = cnf.get("test_id"); - log.info("config {}",tid); - return null; - }); + Config.UseConfig((cnf) -> { + var tid = cnf.get("test_id"); + log.info("config {}", tid); + return null; + }); - Map> domap = new HashMap(); - - log.info("{}", Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() ); - log.info("{}", AppFlow.class.hashCode() ); - - domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{ + var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017); + // var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017); + var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build(); + + Map> domap = getHandleFuncMap(); try { - p.unpack(ApmBaseDataFlow.class); - } catch (InvalidProtocolBufferException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - }); + var stub = CollectPacketsServerGrpc.newBlockingStub(channel); + var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build()); - - domap.put(Any.pack(BussFlowWeb.newBuilder().build()).getTypeUrl(), (p)->{ - try { - var msg = p.unpack(BussFlowWeb.class); - } catch (InvalidProtocolBufferException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - }); + // todo 优先处理 回溯分析的 其他的流先忽略 + String typeUrl = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow.getDefaultInstance()).getTypeUrl(); - domap.put(Any.pack(BussFlowMidd.newBuilder().build()).getTypeUrl(), (p)->{ - try { - p.unpack(BussFlowMidd.class); - } catch (InvalidProtocolBufferException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - }); + int count = 10; + while (count-- > 0) { + var iter = response.next(); + Instant now = Instant.now(); + var i = 0; + for (var p : iter.getPacketsList()) { + if (typeUrl.equals(p.getTypeUrl())) { + var func = domap.get(p.getTypeUrl()); + if (func != null) { + func.apply(p); + i++; + } + } else { + log.info("不是回溯分析的流,暂时忽略: {}", p.getTypeUrl()); + } + } - - domap.put(Any.pack(BussFlowOrl.newBuilder().build()).getTypeUrl(), (p)->{ - try { - var msg = p.unpack(BussFlowOrl.class); - } catch (InvalidProtocolBufferException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - }); - - domap.put(Any.pack(AppFlow.newBuilder().build()).getTypeUrl(), (p)->{ - try { - p.unpack(AppFlow.class); - } catch (InvalidProtocolBufferException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - return null; - }); - - - // private final ManagedChannelBuilder managedChannelBuilder; - // private final CollectPacketsServerStub blockingStub; - - // private final ManagedChannel channel; - // public Client(String name, int port) { - - // } - // public void shutdown() throws InterruptedException { - // channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - // } - // public void sayHi(String name){ - // MyThing.Request request = MyThing.Request.newBuilder().setName(name).build(); - // MyThing.Response response = blockingStub.sayHi(request); - // System.out.println(response.getName()); - // } - var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017); - var channel = managedChannelBuilder.usePlaintext().build(); - var stub = CollectPacketsServerGrpc.newBlockingStub(channel); - // stub.withCompression("snappy"); - - var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build()); - - while(true){ - var iter = response.next(); - Instant now = Instant.now(); - var i = 0; - for(var p : iter.getPacketsList()) { - var func = domap.get(p.getTypeUrl()); - if(func != null) { - func.apply(p); - i++; + log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis()); } + } catch (Exception e) { + e.printStackTrace(); + } finally { + channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); } - log.info("条数:{}, {}:ms",i,Duration.between(now, Instant.now()).toMillis()); } + + // todo 待优化 + private static HashMap> getHandleFuncMap() { + return new HashMap>() { + { + /*put(Any.pack(ApmBaseDataFlowOuterClass.ApmBaseDataFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(ApmBaseDataFlowOuterClass.ApmBaseDataFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(AppFlowOuterClass.AppFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(AppFlowOuterClass.AppFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + });*/ + + // todo 优先对接 回溯分析 的流,保存数据到 mongodb + put(Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class); + System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + RestTemplate client = new RestTemplate(); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + HttpEntity requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers); + String url = "http://localhost:3440/test"; + ResponseEntity response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class); + + System.out.println("result:" + response.getBody()); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + + /*put(Any.pack(BusinessBodyDataOuterClass.BusinessBodyData.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BusinessBodyDataOuterClass.BusinessBodyData.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(BussFlowDbOuterClass.BussFlowDb.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BussFlowDbOuterClass.BussFlowDb.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(BussFlowExternalOuterClass.BussFlowExternal.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BussFlowExternalOuterClass.BussFlowExternal.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(BussFlowMiddOuterClass.BussFlowMidd.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BussFlowMiddOuterClass.BussFlowMidd.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(BussFlowOrlOuterClass.BussFlowOrl.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(BussFlowOrlOuterClass.BussFlowOrl.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(DataFlowOuterClass.DataFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(DataFlowOuterClass.DataFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(QoeFlowOuterClass.QoeFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(QoeFlowOuterClass.QoeFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(SstFlowOuterClass.SstFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(SstFlowOuterClass.SstFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + }); + put(Any.pack(UsrFlowOuterClass.UsrFlow.getDefaultInstance()).getTypeUrl(), (p) -> { + try { + var result = p.unpack(UsrFlowOuterClass.UsrFlow.class); + // System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result)); + + // MongodbTest.insertMsgToMongoDB(result); + } catch (InvalidProtocolBufferException e) { + e.printStackTrace(); + } + return null; + });*/ + } + }; + } + + private static String convert(com.google.protobuf.ByteString byteString) { + ArrayList res = new ArrayList<>(); + byteString.forEach(byteStr -> { + res.add(String.valueOf(byteStr & 0xff)); + }); + return res.stream().collect(Collectors.joining(".")); } } diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPacketsServer.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPacketsServer.java new file mode 100644 index 0000000..d459dd6 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPacketsServer.java @@ -0,0 +1,82 @@ +/* + * Copyright 2015 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.yuandian.dataflow.grpc; + +import com.google.protobuf.Any; +import com.yuandian.dataflow.proto.Base; +import com.yuandian.dataflow.proto.CollectPacketsServerGrpc; +import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass; +import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + */ +public class CollectPacketsServer extends CollectPacketsServerGrpc.CollectPacketsServerImplBase { + private static final Logger logger = Logger.getLogger(CollectPacketsServer.class.getName()); + + @Override + public void getPackets(Base.Request request, StreamObserver responseObserver) { + for (int i = 0; i < 1; i++) { + responseObserver.onNext(Base.Response.newBuilder() + .setCode(200) + .setMessage("测试一波") + .addPackets( + Any.pack( + BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder() + .setTableId(10086) + .build() + ) + ) + .build()); + System.out.println("CollectPacketsServer: " + i); + } + responseObserver.onCompleted(); + } + + //private Server server; + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + Server server = ServerBuilder.forPort(60017) + .addService(new CollectPacketsServer()) + .build() + .start(); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + server.awaitTermination(); + } +} diff --git a/src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java b/src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java new file mode 100644 index 0000000..d250df0 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/grpc/MongodbTest.java @@ -0,0 +1,86 @@ +package com.yuandian.dataflow.grpc; + +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; + +public class MongodbTest { + + public static void insertMsgToMongoDB(T obj) { + try { + ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017); + List addrs = new ArrayList<>(); + addrs.add(serverAddress); + + MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray()); + List credentials = new ArrayList<>(); + credentials.add(credential); + + MongoClient mongoClient = new MongoClient(addrs, credentials); + + MongoDatabase db = mongoClient.getDatabase("yd-base"); + + // todo 修改名字 + MongoCollection collection = db.getCollection("lxy-test"); + + collection.insertOne(obj2Doc(obj)); + + System.err.println("insert success"); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static Document obj2Doc(T obj) throws Exception { + Document doc = new Document(); + Field[] fields = obj.getClass().getDeclaredFields(); + for (Field field : fields) { + String varName = field.getName(); + boolean accessFlag = field.isAccessible(); + if (!accessFlag) { + field.setAccessible(true); + } + Object param = field.get(obj); + if (param == null) { + continue; + } else if (param instanceof Integer) { + int value = ((Integer) param).intValue(); + doc.put(varName, value); + } else if (param instanceof String) { + String value = (String) param; + doc.put(varName, value); + } else if (param instanceof Double) { + double value = ((Double) param).doubleValue(); + doc.put(varName, value); + } else if (param instanceof Float) { + float value = ((Float) param).floatValue(); + doc.put(varName, value); + } else if (param instanceof Long) { + long value = ((Long) param).longValue(); + doc.put(varName, value); + } else if (param instanceof Boolean) { + boolean value = ((Boolean) param).booleanValue(); + doc.put(varName, value); + } + field.setAccessible(accessFlag); + } + return doc; + } + + public static T doc2Obj(Document doc, Class clazz) throws Exception { + T obj = clazz.newInstance(); + for (String key : doc.keySet()) { + Field field = clazz.getDeclaredField(key); + field.setAccessible(true); + field.set(obj, doc.get(key)); + } + return obj; + } +}