diff --git a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java index 2eac689..62a583f 100644 --- a/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java +++ b/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java @@ -8,9 +8,13 @@ package com.yuandian.dataflow.grpc; import java.time.Duration; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; import com.yuandian.common.Config; import com.yuandian.dataflow.proto.CollectPacketsServerGrpc; @@ -21,7 +25,10 @@ import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServer import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerStub; import com.yuandian.dataflow.proto.msgtype.ApmBaseDataFlowOuterClass.ApmBaseDataFlow; - +import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass.AppFlow; +import com.yuandian.dataflow.proto.msgtype.BussFlowMiddOuterClass.BussFlowMidd; +import com.yuandian.dataflow.proto.msgtype.BussFlowOrlOuterClass.BussFlowOrl; +import com.yuandian.dataflow.proto.msgtype.BussFlowWebOuterClass.BussFlowWeb; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @@ -45,6 +52,69 @@ public class CollectPackets extends CollectPacketsServerImplBase { return null; }); + Map> domap = new HashMap(); + + + + System.out.println( Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() ); + + + domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{ + try { + p.unpack(ApmBaseDataFlow.class); + } catch (InvalidProtocolBufferException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + }); + + + domap.put(Any.pack(BussFlowWeb.newBuilder().build()).getTypeUrl(), (p)->{ + try { + p.unpack(BussFlowWeb.class); + } catch (InvalidProtocolBufferException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + }); + + domap.put(Any.pack(BussFlowMidd.newBuilder().build()).getTypeUrl(), (p)->{ + try { + p.unpack(BussFlowMidd.class); + } catch (InvalidProtocolBufferException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + }); + + + domap.put(Any.pack(BussFlowOrl.newBuilder().build()).getTypeUrl(), (p)->{ + try { + p.unpack(BussFlowOrl.class); + } catch (InvalidProtocolBufferException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + }); + + domap.put(Any.pack(AppFlow.newBuilder().build()).getTypeUrl(), (p)->{ + try { + p.unpack(AppFlow.class); + } catch (InvalidProtocolBufferException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return null; + }); + + + + + // private final ManagedChannelBuilder managedChannelBuilder; @@ -67,17 +137,18 @@ public class CollectPackets extends CollectPacketsServerImplBase { var stub = CollectPacketsServerGrpc.newBlockingStub(channel); // stub.withCompression("snappy"); - var response = stub.getPackets(Request.newBuilder().setVersion("0.1.0").build()); + var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build()); while(true){ var iter = response.next(); Instant now = Instant.now(); var i = 0; for(var p : iter.getPacketsList()) { - if(p.is(ApmBaseDataFlow.class)) { - var flow = p.unpack(ApmBaseDataFlow.class); - i ++; - } + var func = domap.get(p.getTypeUrl()); + if(func != null) { + func.apply(p); + i++; + } } log.info("条数:{}, {}:ms",i,Duration.between(now, Instant.now()).toMillis());