修改 新解析pack的写法

This commit is contained in:
eson 2022-06-20 18:15:58 +08:00
parent efd701cfe7
commit 6de583aebc

View File

@ -8,9 +8,13 @@ package com.yuandian.dataflow.grpc;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.yuandian.common.Config;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
@ -21,7 +25,10 @@ import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServer
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerStub;
import com.yuandian.dataflow.proto.msgtype.ApmBaseDataFlowOuterClass.ApmBaseDataFlow;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass.AppFlow;
import com.yuandian.dataflow.proto.msgtype.BussFlowMiddOuterClass.BussFlowMidd;
import com.yuandian.dataflow.proto.msgtype.BussFlowOrlOuterClass.BussFlowOrl;
import com.yuandian.dataflow.proto.msgtype.BussFlowWebOuterClass.BussFlowWeb;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
@ -45,6 +52,69 @@ public class CollectPackets extends CollectPacketsServerImplBase {
return null;
});
Map<String, Function<Any, Object>> domap = new HashMap();
System.out.println( Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() );
domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(ApmBaseDataFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowWeb.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(BussFlowWeb.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowMidd.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(BussFlowMidd.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowOrl.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(BussFlowOrl.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(AppFlow.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(AppFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
// private final ManagedChannelBuilder<?> managedChannelBuilder;
@ -67,17 +137,18 @@ public class CollectPackets extends CollectPacketsServerImplBase {
var stub = CollectPacketsServerGrpc.newBlockingStub(channel);
// stub.withCompression("snappy");
var response = stub.getPackets(Request.newBuilder().setVersion("0.1.0").build());
var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build());
while(true){
var iter = response.next();
Instant now = Instant.now();
var i = 0;
for(var p : iter.getPacketsList()) {
if(p.is(ApmBaseDataFlow.class)) {
var flow = p.unpack(ApmBaseDataFlow.class);
i ++;
}
var func = domap.get(p.getTypeUrl());
if(func != null) {
func.apply(p);
i++;
}
}
log.info("条数:{}, {}:ms",i,Duration.between(now, Instant.now()).toMillis());