feat: 回溯分析流的简单处理

This commit is contained in:
linxianying 2022-07-11 10:44:16 +08:00
parent ae89fb4caf
commit 9e899c8c28
5 changed files with 400 additions and 117 deletions

View File

@ -76,7 +76,7 @@ public class Server {
String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
/*String[] peers = new String[]{"localhost:4440","localhost:4441","localhost:4442"};
String[] sprPeers = new String[]{"3440","3441","3442"};
@ -107,11 +107,13 @@ public class Server {
node = cluster.start();
done = new RaftClosure();
System.setProperty("server.port", sprPort);
System.setProperty("server.port", sprPort);*/
System.setProperty("server.port", "3440");
var app = SpringApplication.run(Server.class, args);
app.start();
node.shutdown(done);
// node.shutdown(done);
}
}

View File

@ -5,9 +5,11 @@ import java.nio.ByteBuffer;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.entity.Task;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.dataflow.Server;
import com.yuandian.dataflow.grpc.MongodbTest;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.statemachine.RaftClosure;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import lombok.extern.slf4j.Slf4j;
@ -32,20 +34,33 @@ public class TaskLog {
// private static final Logger log = LoggerFactory.getLogger(TaskLog.class);
private static Node node = Server.GetNode();
@GetMapping(path = "/test")
public ResponseEntity<Response> Processing(@RequestBody int status) {
@PostMapping(path = "/test")
public ResponseEntity<Response> Processing(@RequestBody String json) {
Task task = new Task();
/*Task task = new Task();
log.error(node.toString());
RaftClosure done = new RaftClosure();
task.setData(ByteBuffer.wrap("hello".getBytes()));
task.setDone(done);
Server.GetNode().apply(task);
Server.GetNode().apply(task);*/
try {
// 1类型转换
BacktrackingFlowOuterClass.BacktrackingFlow.Builder builder = BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder();
JsonFormat.parser().merge(json, builder);
BacktrackingFlowOuterClass.BacktrackingFlow backtrackingFlow = builder.build();
// 2业务处理
// 3数据保存到 mongoDB
MongodbTest.insertMsgToMongoDB(backtrackingFlow);
} catch (Exception e) {
e.printStackTrace();
}
Response response = new Response();
response.Code = HttpStatus.OK;
response.Message = HttpStatus.OK.toString();
return new ResponseEntity<Response>(response, HttpStatus.OK);

View File

@ -2,150 +2,248 @@
* description
*
* @author eson
*2022年6月09日-16:29:17
* 2022年6月09日-16:29:17
*/
package com.yuandian.dataflow.grpc;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import com.yuandian.common.Config;
import com.yuandian.dataflow.projo.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.Base.Request;
import com.yuandian.dataflow.proto.Base.Response;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerImplBase;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc.CollectPacketsServerStub;
import com.yuandian.dataflow.proto.msgtype.ApmBaseDataFlowOuterClass.ApmBaseDataFlow;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass.AppFlow;
import com.yuandian.dataflow.proto.msgtype.BussFlowMiddOuterClass.BussFlowMidd;
import com.yuandian.dataflow.proto.msgtype.BussFlowOrlOuterClass.BussFlowOrl;
import com.yuandian.dataflow.proto.msgtype.BussFlowWebOuterClass.BussFlowWeb;
import com.yuandian.dataflow.proto.msgtype.*;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.var;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
/**
* description
*
* @author eson
*2022年6月09日-16:29:17
* 2022年6月09日-16:29:17
*/
@Slf4j
public class CollectPackets extends CollectPacketsServerImplBase {
public static void main(String[] args) throws Exception {
Config.UseConfig( (cnf) -> {
var tid = cnf.get("test_id");
log.info("config {}",tid);
return null;
});
Config.UseConfig((cnf) -> {
var tid = cnf.get("test_id");
log.info("config {}", tid);
return null;
});
Map<String, Function<Any, Object>> domap = new HashMap();
log.info("{}", Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl() );
log.info("{}", AppFlow.class.hashCode() );
domap.put(Any.pack(ApmBaseDataFlow.newBuilder().build()).getTypeUrl(), (p)->{
var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017);
// var managedChannelBuilder = ManagedChannelBuilder.forAddress("127.0.0.1", 60017);
var channel = managedChannelBuilder.maxInboundMessageSize(Integer.MAX_VALUE).usePlaintext().build();
Map<String, Function<Any, Object>> domap = getHandleFuncMap();
try {
p.unpack(ApmBaseDataFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
var stub = CollectPacketsServerGrpc.newBlockingStub(channel);
var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build());
domap.put(Any.pack(BussFlowWeb.newBuilder().build()).getTypeUrl(), (p)->{
try {
var msg = p.unpack(BussFlowWeb.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
// todo 优先处理 回溯分析的 其他的流先忽略
String typeUrl = Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow.getDefaultInstance()).getTypeUrl();
domap.put(Any.pack(BussFlowMidd.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(BussFlowMidd.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
int count = 10;
while (count-- > 0) {
var iter = response.next();
Instant now = Instant.now();
var i = 0;
for (var p : iter.getPacketsList()) {
if (typeUrl.equals(p.getTypeUrl())) {
var func = domap.get(p.getTypeUrl());
if (func != null) {
func.apply(p);
i++;
}
} else {
log.info("不是回溯分析的流,暂时忽略: {}", p.getTypeUrl());
}
}
domap.put(Any.pack(BussFlowOrl.newBuilder().build()).getTypeUrl(), (p)->{
try {
var msg = p.unpack(BussFlowOrl.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
domap.put(Any.pack(AppFlow.newBuilder().build()).getTypeUrl(), (p)->{
try {
p.unpack(AppFlow.class);
} catch (InvalidProtocolBufferException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
});
// private final ManagedChannelBuilder<?> managedChannelBuilder;
// private final CollectPacketsServerStub blockingStub;
// private final ManagedChannel channel;
// public Client(String name, int port) {
// }
// public void shutdown() throws InterruptedException {
// channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
// }
// public void sayHi(String name){
// MyThing.Request request = MyThing.Request.newBuilder().setName(name).build();
// MyThing.Response response = blockingStub.sayHi(request);
// System.out.println(response.getName());
// }
var managedChannelBuilder = ManagedChannelBuilder.forAddress("10.1.1.117", 60017);
var channel = managedChannelBuilder.usePlaintext().build();
var stub = CollectPacketsServerGrpc.newBlockingStub(channel);
// stub.withCompression("snappy");
var response = stub.getPackets(Request.newBuilder().setVersion("v1.0.0").build());
while(true){
var iter = response.next();
Instant now = Instant.now();
var i = 0;
for(var p : iter.getPacketsList()) {
var func = domap.get(p.getTypeUrl());
if(func != null) {
func.apply(p);
i++;
log.info("次序:{} 条数: {}, {}:ms", count, i, Duration.between(now, Instant.now()).toMillis());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
}
log.info("条数:{}, {}:ms",i,Duration.between(now, Instant.now()).toMillis());
}
// todo 待优化
private static HashMap<String, Function<Any, Object>> getHandleFuncMap() {
return new HashMap<String, Function<Any, Object>>() {
{
/*put(Any.pack(ApmBaseDataFlowOuterClass.ApmBaseDataFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(ApmBaseDataFlowOuterClass.ApmBaseDataFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(AppFlowOuterClass.AppFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(AppFlowOuterClass.AppFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});*/
// todo 优先对接 回溯分析 的流保存数据到 mongodb
put(Any.pack(BacktrackingFlowOuterClass.BacktrackingFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BacktrackingFlowOuterClass.BacktrackingFlow.class);
System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
RestTemplate client = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<Object> requestEntity = new HttpEntity<>(JsonFormat.printer().print(result), headers);
String url = "http://localhost:3440/test";
ResponseEntity<Response> response = client.exchange(url, HttpMethod.POST, requestEntity, Response.class);
System.out.println("result:" + response.getBody());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
/*put(Any.pack(BusinessBodyDataOuterClass.BusinessBodyData.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BusinessBodyDataOuterClass.BusinessBodyData.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(BussFlowDbOuterClass.BussFlowDb.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BussFlowDbOuterClass.BussFlowDb.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(BussFlowExternalOuterClass.BussFlowExternal.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BussFlowExternalOuterClass.BussFlowExternal.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(BussFlowMiddOuterClass.BussFlowMidd.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BussFlowMiddOuterClass.BussFlowMidd.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(BussFlowOrlOuterClass.BussFlowOrl.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(BussFlowOrlOuterClass.BussFlowOrl.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(DataFlowOuterClass.DataFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(DataFlowOuterClass.DataFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(QoeFlowOuterClass.QoeFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(QoeFlowOuterClass.QoeFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(SstFlowOuterClass.SstFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(SstFlowOuterClass.SstFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});
put(Any.pack(UsrFlowOuterClass.UsrFlow.getDefaultInstance()).getTypeUrl(), (p) -> {
try {
var result = p.unpack(UsrFlowOuterClass.UsrFlow.class);
// System.out.println(result.getClass().toString() + " ,val: " + JsonFormat.printer().print(result));
// MongodbTest.insertMsgToMongoDB(result);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
});*/
}
};
}
private static String convert(com.google.protobuf.ByteString byteString) {
ArrayList<String> res = new ArrayList<>();
byteString.forEach(byteStr -> {
res.add(String.valueOf(byteStr & 0xff));
});
return res.stream().collect(Collectors.joining("."));
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2015 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yuandian.dataflow.grpc;
import com.google.protobuf.Any;
import com.yuandian.dataflow.proto.Base;
import com.yuandian.dataflow.proto.CollectPacketsServerGrpc;
import com.yuandian.dataflow.proto.msgtype.AppFlowOuterClass;
import com.yuandian.dataflow.proto.msgtype.BacktrackingFlowOuterClass;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/
public class CollectPacketsServer extends CollectPacketsServerGrpc.CollectPacketsServerImplBase {
private static final Logger logger = Logger.getLogger(CollectPacketsServer.class.getName());
@Override
public void getPackets(Base.Request request, StreamObserver<Base.Response> responseObserver) {
for (int i = 0; i < 1; i++) {
responseObserver.onNext(Base.Response.newBuilder()
.setCode(200)
.setMessage("测试一波")
.addPackets(
Any.pack(
BacktrackingFlowOuterClass.BacktrackingFlow.newBuilder()
.setTableId(10086)
.build()
)
)
.build());
System.out.println("CollectPacketsServer: " + i);
}
responseObserver.onCompleted();
}
//private Server server;
/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(60017)
.addService(new CollectPacketsServer())
.build()
.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
server.awaitTermination();
}
}

View File

@ -0,0 +1,86 @@
package com.yuandian.dataflow.grpc;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
public class MongodbTest {
public static <T> void insertMsgToMongoDB(T obj) {
try {
ServerAddress serverAddress = new ServerAddress("192.168.1.113", 27017);
List<ServerAddress> addrs = new ArrayList<>();
addrs.add(serverAddress);
MongoCredential credential = MongoCredential.createScramSha1Credential("yd-rw", "admin", "yuandian123".toCharArray());
List<MongoCredential> credentials = new ArrayList<>();
credentials.add(credential);
MongoClient mongoClient = new MongoClient(addrs, credentials);
MongoDatabase db = mongoClient.getDatabase("yd-base");
// todo 修改名字
MongoCollection<Document> collection = db.getCollection("lxy-test");
collection.insertOne(obj2Doc(obj));
System.err.println("insert success");
} catch (Exception e) {
e.printStackTrace();
}
}
public static <T> Document obj2Doc(T obj) throws Exception {
Document doc = new Document();
Field[] fields = obj.getClass().getDeclaredFields();
for (Field field : fields) {
String varName = field.getName();
boolean accessFlag = field.isAccessible();
if (!accessFlag) {
field.setAccessible(true);
}
Object param = field.get(obj);
if (param == null) {
continue;
} else if (param instanceof Integer) {
int value = ((Integer) param).intValue();
doc.put(varName, value);
} else if (param instanceof String) {
String value = (String) param;
doc.put(varName, value);
} else if (param instanceof Double) {
double value = ((Double) param).doubleValue();
doc.put(varName, value);
} else if (param instanceof Float) {
float value = ((Float) param).floatValue();
doc.put(varName, value);
} else if (param instanceof Long) {
long value = ((Long) param).longValue();
doc.put(varName, value);
} else if (param instanceof Boolean) {
boolean value = ((Boolean) param).booleanValue();
doc.put(varName, value);
}
field.setAccessible(accessFlag);
}
return doc;
}
public static <T> T doc2Obj(Document doc, Class<T> clazz) throws Exception {
T obj = clazz.newInstance();
for (String key : doc.keySet()) {
Field field = clazz.getDeclaredField(key);
field.setAccessible(true);
field.set(obj, doc.get(key));
}
return obj;
}
}