dataflow/src/main/java/com/yuandian/dataflow/grpc/CollectPackets.java
2022-07-05 18:23:30 +08:00

152 lines
4.7 KiB
Java

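/**
* gRPC client entry point that requests packet batches from the CollectPackets
* server and unpacks the protobuf payloads it recognises.
*
* @author eson
* 2022-06-09 16:29:17
*/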
package com.yuandian.dataflow.grpc;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.yuandian.common.Config;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.Base.Request;
import com.yuandian.dataflow.proto.Base.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerImplBase;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerStub;
import com.yuandian.dataflow.proto.msgtype.ApmBaseDataFlowOuterClass.ApmBaseDataFlow;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass.AppFlow;
import com.yuandian.dataflow.proto.msgtype.BussFlowMiddOuterClass.BussFlowMidd;
import com.yuandian.dataflow.proto.msgtype.BussFlowOrlOuterClass.BussFlowOrl;
import com.yuandian.dataflow.proto.msgtype.BussFlowWebOuterClass.BussFlowWeb;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
/**
* description
*
* @author eson
*2022年6月09日-16:29:17
*/
@Slf4j
public class CollectPackets extends CollectPacketsServerImplBase {
public static void main(String[] args) throws Exception {
Config.UseConfig( (cnf) -> {
var tid = cnf.get("test_id");
log.info("config {}",tid);
return null;
});
Map<String, Function<Any, Object>> domap = new HashMap();
log.info("{}", Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() );
log.info("{}", AppFlow.class.hashCode() );
domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(ApmBaseDataFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowWeb.newBuilder().build()).getTypeUrl(), (p)->{
try {
var msg = p.unpack(BussFlowWeb.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowMidd.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(BussFlowMidd.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(BussFlowOrl.newBuilder().build()).getTypeUrl(), (p)->{
try {
var msg = p.unpack(BussFlowOrl.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(AppFlow.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(AppFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
// private final ManagedChannelBuilder<?> managedChannelBuilder;
// private final CollectPacketsServerStub blockingStub;
// private final ManagedChannel channel;
// public Client(String name, int port) {
// }
// public void shutdown() throws InterruptedException {
// channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
// }
// public void sayHi(String name){
// MyThing.Request request = MyThing.Request.newBuilder().setName(name).build();
// MyThing.Response response = blockingStub.sayHi(request);
// System.out.println(response.getName());
// }
var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017);
var channel = managedChannelBuilder.usePlaintext().build();
var stub = CollectPacketsServerGrpc.newBlockingStub(channel);
// stub.withCompression("snappy");
var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build());
while(true){
var iter = response.next();
Instant now = Instant.now();
var i = 0;
for(var p : iter.getPacketsList()) {
var func = domap.get(p.getTypeUrl());
if(func != null) {
func.apply(p);
i++;
}
}
log.info("条数:{}, {}:ms",i,Duration.between(now, Instant.now()).toMillis());
}
}
}