diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java new file mode 100644 index 0000000..def7133 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java @@ -0,0 +1,287 @@ +package com.yuandian.dataflow.proto.decode; + + +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author User + * + */ +public class ApmBaseDataFlow extends PacketBase { + private static Logger logger = LoggerFactory.getLogger(ApmBaseDataFlow.class); + public static int SIZE = 156;//88;// + /** + * 抓包口 + */ + public long probeIf; + + + //四元组 + /** + * 请求端口 + */ + public int requestPort; + /** + * 响应端口 + */ + public int responsePort; + /** + * 请求IP + */ + public long requestIp; + /** + * 响应Ip + */ + public long responseIp; + + /** + * 源mac + */ + public long srcMac; + + /** + * 目的mac + */ + public long dstMac; + + /** + * 链路编号 + */ + public long vlanId; + + public long tvSec; + public long tvUsec; + + /** + * 开始时间 + */ + public long startTm; + + /** + * 总字节数 + */ + public long totalBytes; + + /** + * 总包数 + */ + public long totalPackets; + /** + * 总丢包数 + */ + public long totalDropPackets; + /** + * 重传延时 + */ + public long retranTimeDelay; + /** + * 客户端rtt + */ + public long clientRtt; + + /** + * 服务端Rtt + */ + public long serverRtt; + + + /** + * 用户响应时间 + */ + public long userResponseTime; + /** + * 服务响应时间 + */ + public long serverResponseTime; + /** + * tcp回话连接失败数 + */ + public long conFail; + + /** + * 会话重置数 + */ +// public long reset; + + /** + * 服务端总字节数 + */ + public long bytesIn; + /** + * 客户端总字节数 + */ + public long bytesOut; + /** + * 探针推送时间 + */ + public long timeFlag; + + /** + * 结束时间 + */ + public long endTm; + + /** + * 结束时间微秒 + */ + public long endTmUsec; + + /** + * 总响应数 + */ + public long responNum; + /** + * 客户端零窗口数 + */ + public long csWindow; + /** + * 服务端零窗口数 + */ + public long scWindow; + /** + * 客户端重置数 + */ + public long csReset; + /** + * 服务端重置数 + */ + public long scReset; + /** + * 客户端重传数 + */ + public long csRetran; + /** + * 服务端重传数 + */ + public long scRetran; + /** + * 会话建立时间 + */ + public long connSetupTm; + /** + * 新建会话数 + */ + public long newSession; + + public long csAlert; + + public long scAlert; + + public String protocal; + + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + ApmBaseDataFlow oneData = new ApmBaseDataFlow(); + oneData.setPacketHeader(header); + + oneData.probeIf = data.getLong(4); + + + oneData.requestPort = data.getInt( 2); + + oneData.responsePort = data.getInt( 2); + + oneData.requestIp = data.getLong(4); + + oneData.responseIp = data.getLong(4); + + + oneData.srcMac = data.getLong(6); + + oneData.dstMac = data.getLong(6); + + oneData.vlanId = data.getLong(4); + + + oneData.tvSec = data.getLong(4); + + oneData.tvUsec = data.getLong(4); + + +// oneData.startTm = data.getLong(8); + + + oneData.totalBytes = data.getLong(4); + + oneData.totalPackets = data.getLong(4); + + oneData.totalDropPackets = data.getLong(4); + + oneData.retranTimeDelay = data.getLong(4); + + oneData.clientRtt = data.getLong(4); + + //服务端Rtt + oneData.serverRtt = data.getLong(4); + + oneData.userResponseTime = data.getLong(4); + + oneData.serverResponseTime = data.getLong(4); + + oneData.conFail = data.getLong(4); + + +// oneData.reset = data.getLong(4); + + oneData.bytesIn = data.getLong(4); + + oneData.bytesOut = data.getLong(4); + + oneData.timeFlag = data.getLong(4); + + + oneData.endTm = data.getLong(4); + + oneData.endTmUsec = data.getLong(4); + + + oneData.responNum = data.getLong(4); + + oneData.csWindow = data.getLong(4); + + oneData.scWindow = data.getLong(4); + + oneData.csReset = data.getLong(4); + + oneData.scReset = data.getLong(4); + + oneData.csRetran = data.getLong(4); + + oneData.scRetran = data.getLong(4); + + oneData.connSetupTm = data.getLong(4); + + oneData.newSession = data.getLong(4); + + oneData.csAlert = data.getLong(4); + + oneData.scAlert = data.getLong(4); + + + oneData.protocal = new String(data.array()); + + return oneData; + } + + @Override + public String[] getData() { + return null; + } + + @Override + public int getInterfaceNumber() { + return 0; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java new file mode 100644 index 0000000..ee96ef9 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java @@ -0,0 +1,101 @@ +package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; + + +public class AppFlow extends PacketBase { + public static int SIZE = 104; + public long srcIp; + public int srcPort; + public long dstIp; + public int dstPort; + public long startTvSec; + public long startTvUsec; + public long lastTvSec; + public long lastTvUsec; + public long endTvSec; + public long endTvUsec; + public int inputPackets; + public int outputPackets; + public int inputBytes; + public int outputBytes; + public String protocaol; + public int appId; + public int appGroupId; + public int probeIf; + public int appStyle; + public long timeFlag; + public int vlanId; + public int mplsLable; + public int tos; + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) + throws Exception { + AppFlow nlf = new AppFlow(); + nlf.srcIp = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.srcPort = ByteUtil.getInteger(data, offset, 2); + offset += 2; + nlf.dstIp = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.dstPort = ByteUtil.getInteger(data, offset, 2); + offset += 2; + nlf.startTvSec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.startTvUsec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.lastTvSec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.lastTvUsec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.endTvSec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.endTvUsec = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.inputPackets = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.outputPackets = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.inputBytes = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.outputBytes = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.protocaol = ByteUtil.getString(data, offset, 20); + offset += 20; + nlf.appId = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.appGroupId = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.probeIf = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.appStyle = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.timeFlag = ByteUtil.getLong(data, offset, 4); + offset += 4; + nlf.vlanId = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.mplsLable = ByteUtil.getInteger(data, offset, 4); + offset += 4; + nlf.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; + + return nlf; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + + @Override + public String[] getData() { + return null; + } + + @Override + public int getInterfaceNumber() { + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java new file mode 100644 index 0000000..fd7757b --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java @@ -0,0 +1,98 @@ +package com.yuandian.dataflow.proto.decode; + + + +/** + * @author User + * + */ +public class BacktrackingFlow extends PacketBase{ + public static int SIZE = 108; + //tuple StatisTuple10 56 10元组信息 + private long macSrc; //源MAC 8 + private long macDst; //目的MAC 8 + private long ipSrc; //源IP 8 + private long ipDst; //目的IP 8 + private long portSrc; //源端口,如果没有,为-1 + private long portDst; //目标端口,如果没有,为-1 + private int l3Proto; //第三层协议ID,如果没有,为-1 + private int l4Proto; //第四层协议ID,如果没有,为-1 + private int tos; //Tos,如果没有,为-1 + private int vlanId; //vlan ID,如果没有,为-1 + + private long bytes; // 8 字节总数 + private long packets; // 8 数据包总数 + private long tcpSp; // 8 tcp同步包数 + private long tcpScpn;// 8 tcp同步确认包数 + private long tcpSrp; // 8 tcp同步重置包数 + private long appId; // 4 appID + private long appGroupId;// 4 app组ID + private long mplsLabel;// 4 + + + + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BacktrackingFlow backFlow = new BacktrackingFlow(); + backFlow.macSrc = ByteUtil.getLong(data, offset, 8); //源MAC 8 + offset+=8; + backFlow.macDst = ByteUtil.getLong(data, offset, 8); //目的MAC 8 + offset+=8; + backFlow.ipSrc = ByteUtil.getLong(data, offset, 8); //源IP 8 + offset+=8; + backFlow.ipDst = ByteUtil.getLong(data, offset, 8); //目的IP 8 + offset+=8; + backFlow.portSrc = ByteUtil.getInteger(data, offset, 4); //源端口,如果没有,为-1 + offset+=4; + backFlow.portDst = ByteUtil.getInteger(data, offset, 4); //目标端口,如果没有,为-1 + offset+=4; + backFlow.l3Proto = ByteUtil.getInteger(data, offset, 4); //第三层协议ID,如果没有,为-1 + offset+=4; + backFlow.l4Proto = ByteUtil.getInteger(data, offset, 4); //第四层协议ID,如果没有,为-1 + offset+=4; + backFlow.tos = ByteUtil.getInteger(data, offset, 4); //Tos,如果没有,为-1 + offset+=4; + backFlow.vlanId = ByteUtil.getInteger(data, offset, 4); //vlan ID,如果没有,为-1 + offset+=4; + + backFlow.bytes = ByteUtil.getLong(data, offset, 8); // 8 字节总数 + offset+=8; + backFlow.packets = ByteUtil.getLong(data, offset, 8); // 8 数据包总数 + offset+=8; + backFlow.tcpSp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步包数 + offset+=8; + backFlow.tcpScpn = ByteUtil.getLong(data, offset, 8);// 8 tcp同步确认包数 + offset+=8; + backFlow.tcpSrp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步重置包数 + offset+=8; + backFlow.appId = ByteUtil.getInteger(data, offset, 4); // 4 appID + offset+=4; + backFlow.appGroupId = ByteUtil.getInteger(data, offset, 4);// 4 app组ID + offset+=4; + backFlow.mplsLabel = ByteUtil.getInteger(data, offset, 4);// 4 + offset+=4; + + return backFlow; + } + + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String[] getData() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java new file mode 100644 index 0000000..4064dc1 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java @@ -0,0 +1,82 @@ +package com.yuandian.dataflow.proto.decode; + + + +public class BasicTrafficFlow extends PacketBase { + public static int SIZE = 56; + public long capPort; + public int requestPort; + public int responsePort; + public long requestIp; + public long responseIp; + public long startTime; + public long totalBytes; + public long totalPackets; + public long spackets64; + public long spackets128; + public long spackets256; + public long spackets512; + public long spackets1024; + public long spackets; + public long sendTime; + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BasicTrafficFlow btf = new BasicTrafficFlow(); + btf.setPacketHeader(header); + + btf.capPort = ByteUtil.getLong(data, offset, 4); + offset += 4; + +// btf.requestPort = ByteUtil.getInteger(data, offset, 2); + offset += 2; +// btf.responsePort = ByteUtil.getInteger(data, offset, 2); + offset += 2; +// btf.requestIp = ByteUtil.getLong(data, offset, 4); + offset += 4; +// btf.responseIp = ByteUtil.getLong(data, offset, 4); + offset += 4; + + btf.startTime = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.totalBytes = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.totalPackets = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets64 = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets128 = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets256 = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets512 = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets1024 = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.spackets = ByteUtil.getLong(data, offset, 4); + offset += 4; + btf.sendTime = ByteUtil.getLong(data, offset, 4); + offset += 4; + + return btf; + } + @Override + public String[] getData() { + // TODO Auto-generated method stub + return null; + } + @Override + public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; + } + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; + } + + + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java new file mode 100644 index 0000000..d8e5f6f --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java @@ -0,0 +1,29 @@ +package com.yuandian.dataflow.proto.decode; + + +public class BusinessBodyData { + + public String relvanceDataId; + public long request_ip; + public int request_port; + public long response_ip; + public int response_port; + + public long start_tv_sec;//开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + + public String resource_code; + public String noParameterRecognition; + public String originalRecognition; + public String requestCookie; + + public String requestBodyContext; + public String responseBodyContext; + + public int filterId; + public String business_detail_mesg; + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java new file mode 100644 index 0000000..21731ec --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java @@ -0,0 +1,244 @@ +package com.yuandian.dataflow.proto.decode; + +import com.ud.module.common.SystemConfigManager; + + + +public class BussFlowDb extends PacketBase { + private int UNIT_DATA_LENGTH = 3362; + + // redis资源归并,处理服务资源发现是需要设置识别串(正则表达式) + public String redisRegex; + + public String id; //id + public int msg_len; //消息长度 + public int msg_type; //消息类型 + public long src_mac; + public long dst_mac; + public int protocol; //协议名 + public String session_serial_number;//会话序列号 + public String buss_type;//业务服务资源编码(C_01) + public long request_ip;//Web客户端IP + public int request_port;//Web客户端端口 + public long response_ip;//Web服务器IP + public int response_port;//Web服务器端口 + public long start_tv_sec;//Web开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + public String disc_resource_ident; //web:url mid:apiBody db:sql + public String name; //web:操作系统 midd:interfaceName db:db_name + //web midd + public String session_id;//sessionid + public int request_msg_length; //请求报文长度 + public String request_msg_detail; //请求报文详情 + public int response_msg_length; //响应报文长度 + public String response_msg_detail; //响应报文详情 + //web段 + public String reter_url; + public String x_requested_with; + public long req_method; //请求方式 + public String content_type; //请求类型 + public String accept; //jieshou + public int req_cookie_leng;//请求cookie报文长度 + public String req_cookie_detail;//请求cookie报文详情 + public long t_intodb_time; + public int load_or_step; //0: 页面 1:加载项 2:非web段数据 + public String business_detail_mesg; + public String bussiness_key_mesg; //关键字 格式:key=val|key=val.... + public int isUncomplete; //组包是否完全 0:组包完整 1:不完整 + public int deal_state=1; + public int server_res_code; + public long server_response_time; + public long client_translate_time; + public String browser; + public int server_start_tv_sec; + public long server_start_tv_usec; + public int server_end_tv_sec; + public long server_end_tv_usec; + public String probe_ip; + public long probe_if; + public long server_translate_time; + public long time_flag; + public String base_code; + public String ori_sql; + public String reserved; + public long bytes_in; + public long bytes_out; + public int package_in; + public int package_out; + public String dataId; + public int filterId; + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return UNIT_DATA_LENGTH; + } + + @SuppressWarnings("unchecked") + public BussFlowDb Parse(PacketHeader header,ByteBuffer data) + throws Exception { + BussFlowDb buss = new BussFlowDb(); +// buss.m_Header = header; + + buss.msg_type = header.getTableID(); +// offset += 4; +// buss.msg_version = ByteUtil.getInteger(data, offset, 1); +// offset += 1; +// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); +// offset += 4; +// buss.msg_len = header.getMsg_len(); +// offset += 4; + +// UNIT_DATA_LENGTH=msg_len; + buss.src_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; + buss.dst_mac = ByteUtil.getLong(data, offset, 8); + if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ + return null; + } + offset += 8; +// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.retran_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.reset_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.protocol=ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.bytes_out = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.bytes_in = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.package_out = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.package_in = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.session_serial_number = ByteUtil.getString(data, offset, 24); + offset += 24; +// buss.recog_status = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.probe_if=ByteUtil.getInteger(data, offset,4); + offset += 4; +// buss.channel=ByteUtil.getString(data, offset, 24); + offset+=24; + buss.name = ByteUtil.getString(data, offset, 64); + offset+=64; + + buss.request_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.request_port=ByteUtil.getInteger(data, offset,4); + offset += 4; + buss.response_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.response_port=ByteUtil.getInteger(data, offset,4); + offset += 4; +// buss.deal_state=ByteUtil.getInteger(data, offset,8); + offset += 8; + buss.server_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.client_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.server_response_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.start_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.end_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.time_flag=ByteUtil.getInteger(data, offset, 4); + offset += 4; + + int sqlLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int detailMsgLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.request_msg_length = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.response_msg_length = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int reservedLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + + buss.ori_sql=ByteUtil.getString(data, offset,sqlLen); + offset += sqlLen; + buss.business_detail_mesg=ByteUtil.getString(data, offset, detailMsgLen); + offset += detailMsgLen; + buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length); + offset += buss.request_msg_length; + buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length); + offset += buss.response_msg_length; + buss.reserved = ByteUtil.getString(data, offset, reservedLen); + offset += reservedLen; + + /*if(SystemConfigManager.projectName.contains("GDYDBPM")){ + + boolean exist=false; + if(SystemConfigManager.reduceFlag){ + + redisTemplate = (RedisTemplate) ApplicationContextUtil.getBean("redisTemplate"); + exist = reduceByKey(buss); + } + + //不存在就加入缓存 + if(!exist)return buss; + //存在就返回空 + return null; + }*/ + + return buss; + } + + + public String getBussType() { + return buss_type; + } + + @Override + public String[] getData() { + String data[] = new String[45]; + return data; + } + + @Override + public int getInterfaceNumber() { + return 0; + } + + /** + * 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作 + * @param buss + * @return 存在返回true 不存在返回false + */ + /*private boolean reduceByKey(BussFlowDb buss) { + String coding = CodingGeneration.md5GenCoding2(buss.ori_sql); + String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg; + + //分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置 + if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){ + redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS); + return false; + } + + return true; + }*/ +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java new file mode 100644 index 0000000..1fbd515 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java @@ -0,0 +1,132 @@ +package com.yuandian.dataflow.proto.decode; + +import java.util.Date; + + + + +public class BussFlowExternal extends PacketBase { + private final int UNIT_DATA_LENGTH =871; + + @Override + public int getUnitPacketLength() { + return UNIT_DATA_LENGTH; + } + + public BussFlowExternal Parse(PacketHeader header,ByteBuffer data) + throws Exception { + if (offset + UNIT_DATA_LENGTH > data.length) + throw new Exception("data length error!"); + BussFlowExternal buss = new BussFlowExternal(); +// buss.m_Header = header; + + buss.probe_if=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.channel=ByteUtil.getString(data, offset,24); + offset += 24; + buss.interfaceid=ByteUtil.getString(data, offset,100); + offset += 100; + buss.systemName=ByteUtil.getString(data, offset,32); + offset += 32; + buss.net_type=ByteUtil.getString(data, offset,6); + offset += 6; + buss.net_segment = ByteUtil.getString(data, offset, 5); + offset += 5; + buss.session_id=ByteUtil.getString(data, offset,80); + offset += 80; + buss.phoneid=ByteUtil.getString(data, offset,12); + offset += 12; + + buss.request_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.request_port=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.response_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.response_port=ByteUtil.getLong(data, offset,4); + offset += 4; + //开始时间 + buss.start_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.end_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + + buss.deal_state=ByteUtil.getInteger(data, offset,4); + offset += 4; + buss.server_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.server_response_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.begin_url=ByteUtil.getString(data, offset,100); + offset += 100; + buss.operating_sytem=ByteUtil.getString(data, offset,20); + offset += 20; + buss.server_res_code=ByteUtil.getInteger(data, offset,2); + offset += 2; + buss.browser=ByteUtil.getString(data, offset,30); + offset += 30; + buss.business_detail_mesg=ByteUtil.getString(data, offset,200); + offset += 200; + buss.business_involve_msg = ByteUtil.getString(data, offset,200); + offset += 200; + isUncomplete = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.time_flag=ByteUtil.getLong(data, offset, 4); + offset += 4; + return buss; + } + + public String id; + public Long probe_if;//接口号 + public String channel;//营业厅渠道:前台或者分析服务器给出,渠道标识 + public String systemName; //外部系统名称 + public String interfaceid;// 业务接口编码 + public String session_id;//sessionid + public String phoneid;//受理手机号码 + public String net_type; + public String net_segment;//网段标识(客户-web) + public Long request_ip;//Web客户端IP + public Long request_port;//Web客户端端口 + public Long response_ip;//Web服务器IP + public Long response_port;//Web服务器端口 + + public Long start_tv_sec;//Web开始时间秒 + public Long start_tv_usec;//开始时间微秒 + public Long end_tv_sec;//结束时间秒 + public Long end_tv_usec;//结束时间微妙 + + public Integer deal_state;//Web操作成功/失败标识1成功0失败 + public Long server_translate_time;//Web服务器传输耗时 + public Long server_response_time;//Web服务器响应时间 + public String begin_url;//url + public String operating_sytem;//操作系统 + public Integer server_res_code; //Web系统返回码 + public String browser;//浏览器 + public String business_detail_mesg;//要获取的指标 + public Date insert_time;//插入时间 + public String business_involve_msg; //要关联的指标 + public Integer isUncomplete; + public Long time_flag; + + + @Override + public String[] getData() { + String data[] = new String[37]; + return data; + } + + @Override + public int getInterfaceNumber() { + + return Integer.parseInt(probe_if.toString()); + } + + + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java new file mode 100644 index 0000000..afc31e3 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java @@ -0,0 +1,267 @@ +package com.yuandian.dataflow.proto.decode; + +import java.util.concurrent.TimeUnit; + + +import com.ud.module.common.ApplicationContextUtil; +import com.ud.module.common.SystemConfigManager; + +import com.ud.module.util.code.CodingGeneration; + +public class BussFlowMidd extends PacketBase { + private int UNIT_DATA_LENGTH =3569; + + public String id; //id + public long src_mac; + public long dst_mac; + public int msg_len; //消息长度 + public int msg_type; //消息类型 + public int protocol; //协议名 + public String session_serial_number;//会话序列号 + public String buss_type;//业务服务资源编码(C_01) + public String net_segment;//网段标识(客户-web) + public long request_ip;//Web客户端IP + public int request_port;//Web客户端端口 + public long response_ip;//Web服务器IP + public int response_port;//Web服务器端口 + public long start_tv_sec;//Web开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + public String disc_resource_ident; //web:url mid:apiBody db:sql + public String name; //web:操作系统 midd:interfaceName db:db_name + //web midd + public String session_id;//sessionid + public int request_msg_length; //请求报文长度 + public String request_msg_detail; //请求报文详情 + public int response_msg_length; //响应报文长度 + public String response_msg_detail; //响应报文详情 + //web段 + public String reter_url; + public String x_requested_with; + public long req_method; //请求方式 + public String content_type; //请求类型 + public String accept; //jieshou + public int req_cookie_leng;//请求cookie报文长度 + public String req_cookie_detail;//请求cookie报文详情 + public long t_intodb_time; + public int load_or_step; //0: 页面 1:加载项 2:非web段数据 + public String business_detail_mesg; + public String bussiness_key_mesg; //关键字 格式:key=val|key=val.... + public int isUncomplete; //组包是否完全 0:组包完整 1:不完整 + public int deal_state=1; + public int server_res_code; + public long server_response_time; + public long client_translate_time; + public String browser; + public int server_start_tv_sec; + public long server_start_tv_usec; + public int server_end_tv_sec; + public long server_end_tv_usec; + public String probe_ip; + public int probe_if; + public long server_translate_time; + public long time_flag; + public String channel; + public String base_code; + public String ori_api; + public String remain_data; + public long bytes_in; + public long bytes_out; + public int package_in; + public int package_out; + public String dataId; + public int filterId; + + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return UNIT_DATA_LENGTH; + } + + public BussFlowMidd Parse(PacketHeader header,ByteBuffer data) + throws Exception { + BussFlowMidd buss = new BussFlowMidd(); +// buss.m_Header = header; + + buss.msg_type = header.getTableID(); +// offset += 4; +// buss.msg_version = ByteUtil.getInteger(data, offset, 1); +// offset += 1; +// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); +// offset += 4; +// buss.msg_len = ByteUtil.getInteger(data, offset, 4); +// offset += 4; + +// UNIT_DATA_LENGTH=msg_len; + buss.src_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; + buss.dst_mac = ByteUtil.getLong(data, offset, 8); + if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){ + return null; + } + offset += 8; +// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.retran_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.reset_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.protocol=ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.bytes_out = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.bytes_in = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.package_out = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.package_in = ByteUtil.getInteger(data, offset, 4); + offset += 4; + + buss.session_serial_number = ByteUtil.getString(data, offset, 24); + offset += 24; +// buss.recog_status = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.req_method = ByteUtil.getLong(data, offset, 4); +// offset += 4; +// buss.content_type = ByteUtil.getString(data, offset, 40); +// offset += 40; +// buss.accept = ByteUtil.getString(data, offset, 40); +// offset += 40; + buss.probe_if=ByteUtil.getInteger(data, offset,4); + offset += 4; +// buss.buss_type =ByteUtil.getString(data, offset,10); +// offset += 10; + buss.channel=ByteUtil.getString(data, offset,24); + offset += 24; +// buss.session_id=ByteUtil.getString(data, offset,36); +// offset += 36; +// buss.net_segment=ByteUtil.getString(data, offset,6); +// offset += 6; + buss.request_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.request_port=ByteUtil.getInteger(data, offset,4); + offset += 4; + buss.response_ip=ByteUtil.getLong(data, offset,4); + offset += 4; + buss.response_port=ByteUtil.getInteger(data, offset,4); + offset += 4; + //开始时间 + buss.start_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.end_tv_sec=ByteUtil.getLong(data, offset,4); + offset +=4; + buss.end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; +// buss.deal_state=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_res_code = ByteUtil.getInteger(data, offset, 2); + offset +=2; + buss.server_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + + buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + + buss.server_response_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.client_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.time_flag=ByteUtil.getInteger(data, offset, 4); + offset +=4; + int detailMsgLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int keyMsgLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int apiLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.request_msg_length = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.response_msg_length = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int remainLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + + buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); + offset += detailMsgLen; + buss.bussiness_key_mesg = ByteUtil.getString(data, offset, keyMsgLen); + offset += keyMsgLen; + buss.ori_api=ByteUtil.getString(data, offset, apiLen); + offset += apiLen; + buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length); + offset += buss.request_msg_length; + buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length); + offset += buss.response_msg_length; +// buss.remain_data = ByteUtil.getString(data, offset, remainLen); + offset += remainLen; + + + + /*if(SystemConfigManager.projectName.contains("GDYDBPM")){ + boolean exist=false; + if(SystemConfigManager.reduceFlag){ + + redisTemplate = (RedisTemplate) ApplicationContextUtil.getBean("redisTemplate"); + exist = reduceByKey(buss); + } + + //不存在就加入缓存 + if(!exist)return buss; + //存在就返回空 + return null; + }*/ + + return buss; + } + + public String getBussType() { + return buss_type; + } + + @Override + public String[] getData() { + String data[] = new String[35];//贵阳 + + return data; + } + + @Override + public int getInterfaceNumber() { + + return 0; + } + + /** + * 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作 + * @param buss + * @return + */ + /*private boolean reduceByKey(BussFlowMidd buss) { + String coding = CodingGeneration.md5GenCoding2(buss.ori_api); + String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg; + + //分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置 + if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){ + redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS); + return false; + } + + return true; + }*/ +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java new file mode 100644 index 0000000..b41a7db --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java @@ -0,0 +1,111 @@ +package com.yuandian.dataflow.proto.decode; + + + +public class BussFlowOrl extends PacketBase { + + public int msg_type; + public int msg_version; + public int msg_seq; + public int msg_len; + public long request_mac; + public long response_mac; + public long request_ip; + public int request_port; + public long response_ip; + public int response_port; + public int probeif; + public int protocol; + public long start_tv_sec;//Web开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + public int req_len; + public int res_len; + public int busi_msg_len; + public int key_msg_len; + public int detail_msg_len; + public int remain_len; + public String business_code; + public String sessionid; + public String req_data; + public String res_data; + public String busi_msg; + public String busi_key_msg; + public String busi_detail_msg; + public String remain_data; + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + BussFlowOrl orl = new BussFlowOrl(); + orl.msg_type = header.getNow_type(); + orl.request_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; + orl.response_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; + orl.request_ip = ByteUtil.getLong(data, offset, 4); + offset += 4; + orl.request_port = ByteUtil.getInteger(data, offset, 4); + offset += 4; + orl.response_ip = ByteUtil.getLong(data, offset, 4); + offset += 4; + orl.response_port = ByteUtil.getInteger(data, offset, 4); + offset += 4; + orl.probeif = ByteUtil.getInteger(data, offset, 4); + offset += 4; + orl.protocol = ByteUtil.getInteger(data, offset, 4); + offset += 4; + orl.start_tv_sec = ByteUtil.getLong(data, offset, 4); + offset += 4; + orl.start_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; + orl.end_tv_sec = ByteUtil.getLong(data, offset, 4); + offset += 4; + orl.end_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; + int req_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int res_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int busi_msg_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int key_msg_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int detail_msg_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int remain_len = ByteUtil.getInteger(data, offset, 4); + offset += 4; + orl.business_code = ByteUtil.getString(data, offset, 32); + offset += 32; + orl.sessionid = ByteUtil.getString(data, offset, 80); + offset += 80; + orl.req_data = ByteUtil.getString(data, offset, req_len); + offset += req_len; + orl.res_data = ByteUtil.getString(data, offset, res_len); + offset += res_len; + orl.busi_msg = ByteUtil.getString(data, offset, busi_msg_len); + offset += busi_msg_len; + orl.busi_key_msg = ByteUtil.getString(data, offset, key_msg_len); + offset += key_msg_len; + orl.busi_detail_msg = ByteUtil.getString(data, offset, detail_msg_len); + offset += detail_msg_len; + orl.remain_data = ByteUtil.getString(data, offset, remain_len); + offset += remain_len; + return orl; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + + @Override + public String[] getData() { + return new String[0]; + } + + @Override + public int getInterfaceNumber() { + return 0; + } +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java new file mode 100644 index 0000000..c7b2e99 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java @@ -0,0 +1,362 @@ +package com.yuandian.dataflow.proto.decode; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + + +import com.ud.module.common.Constants; +import com.ud.module.common.SystemConfigManager; +import com.yuandian.dataflow.proto.decode.PacketBase; +import com.yuandian.dataflow.proto.decode.PacketHeader; + +import com.ud.module.util.flow.JXYDBPMDetailMesgUtils; +import com.ud.module.util.flow.URLUtil; + + +public class BussFlowWeb extends PacketBase { + private Integer UNIT_DATA_LENGTH =6104; + + private static int i = 1; + @Override + public int getUnitPacketLength() { + return UNIT_DATA_LENGTH; + } + + @SuppressWarnings("unchecked") + public BussFlowWeb Parse(PacketHeader header,ByteBuffer data) + throws Exception { + i++; + BussFlowWeb buss = new BussFlowWeb(); +// buss.m_Header = header; + + buss.msg_type = header.getTableID(); +// offset += 4; +// buss.msg_version = ByteUtil.getInteger(data, offset, 1); +// offset += 1; +// buss.msg_seq = ByteUtil.getInteger(data, offset, 4); +// offset += 4; +// buss.msg_len = ByteUtil.getInteger(data, offset, 4); +// offset += 4; + + buss.src_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; + buss.dst_mac = ByteUtil.getLong(data, offset, 8); + offset += 8; +// buss.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.retran_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.reset_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.protocol=ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.bytes_out = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.bytes_in = ByteUtil.getLong(data, offset, 4); + offset += 4; + buss.package_out = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.package_in = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.session_serial_number = ByteUtil.getString(data, offset, 24); //5268109200 + offset += 24; +// buss.recog_status = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.req_method = ByteUtil.getLong(data, offset, 4); //1 + offset += 4; + buss.content_type = ByteUtil.getString(data, offset, 40); + offset += 40; + buss.accept = ByteUtil.getString(data, offset, 40); //image/png, image/svg+xml, image/*;q=0.8 + offset += 40; + buss.probe_if=ByteUtil.getInteger(data, offset,4); + offset += 4; + buss.channel=ByteUtil.getString(data, offset,24); + offset += 24; + buss.session_id=ByteUtil.getString(data, offset,80); //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728 + offset += 80; + buss.request_ip=ByteUtil.getLong(data, offset,4); //3232235850 + offset += 4; + buss.request_port=ByteUtil.getInteger(data, offset,4); //3309 + offset += 4; + buss.response_ip=ByteUtil.getLong(data, offset,4); //1971882754 + offset += 4; + buss.response_port=ByteUtil.getInteger(data, offset,4); //80 + offset += 4; + //开始时间 + buss.start_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697 + offset +=4; + buss.start_tv_usec=ByteUtil.getLong(data, offset,4); //459461 + offset +=4; + //结束时间 + buss.end_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697 + offset +=4; + buss.end_tv_usec=ByteUtil.getLong(data, offset,4); //459490 + offset +=4; + + buss.server_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + + buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + //结束时间 + buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4); + offset +=4; + buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4); + offset +=4; + + buss.server_response_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.client_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; +// buss.locate_server_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; +// buss.locate_server_response_time=ByteUtil.getLong(data, offset,8); + offset += 8; +// buss.locate_client_translate_time=ByteUtil.getLong(data, offset,8); + offset += 8; + buss.x_requested_with = ByteUtil.getString(data, offset, 20); + offset += 20; + buss.operating_system=ByteUtil.getString(data, offset,20); //Windows NT 6.1 + offset += 20; + buss.server_res_code=ByteUtil.getInteger(data, offset,2); + offset += 2; + buss.browser=ByteUtil.getString(data, offset,30); + offset += 30; + buss.isUncomplete = ByteUtil.getInteger(data, offset, 4); + offset += 4; + buss.time_flag=ByteUtil.getInteger(data, offset, 4); + offset += 4; + int detailMsgLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int keyMsgLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + int reqMsgLeng= ByteUtil.getInteger(data, offset, 4); + buss.request_msg_length =reqMsgLeng; + offset += 4; + int resMsgLength=ByteUtil.getInteger(data, offset, 4); + buss.response_msg_length = resMsgLength; + offset += 4; + int cookieLength=ByteUtil.getInteger(data, offset, 4); + buss.req_cookie_leng=cookieLength; + offset +=4; + int url_len=ByteUtil.getInteger(data, offset, 4); + offset +=4; + int refer_len=ByteUtil.getInteger(data, offset, 4); + offset += 4; + int remainLen = ByteUtil.getInteger(data, offset, 4); + offset += 4; + +// if (SystemConfigManager.systemCode.equals("JXYDBPM")) { +// buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(ByteUtil.getString(data, offset,detailMsgLen)); +// } else { +// buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); +// } + buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen); + offset += detailMsgLen; + buss.bussiness_key_mesg = ByteUtil.getString(data, offset,keyMsgLen); + offset += keyMsgLen; + buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, reqMsgLeng); + offset += reqMsgLeng; + buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset,resMsgLength); + offset +=resMsgLength; + buss.req_cookie_detail = ByteUtil.getString(data, offset,cookieLength); + offset += cookieLength; + buss.ori_url=ByteUtil.getString(data, offset,url_len); // /images/index/220130.gif + if(buss.ori_url!=null && !"".equals(buss.ori_url.trim())){ + String result = buss.ori_url.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", ""); + if(isMessyCode(result)){//如果中文乱码 + String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值 + String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", ""); + if(isMessyCode(result2)){//判断是否还乱码 + buss.ori_url=""; + }else{ + buss.ori_url=withoutValue; + } + }; + } + offset += url_len; + buss.reter_url = ByteUtil.getString(data, offset, refer_len); // http://www.10086.cn/gd/index_200_200.html + offset += refer_len; + buss.remain_data = ByteUtil.getString(data, offset, remainLen); + offset += remainLen; + /*if(SystemConfigManager.projectName.contains("GDYDBPM")){ + boolean exist=false; + if(SystemConfigManager.reduceFlag){ + + redisTemplate = (RedisTemplate) ApplicationContextUtil.getBean("redisTemplate"); + exist = reduceByKey(buss); + } + + //不存在就加入缓存 + if(!exist)return buss; + //存在就返回空 + return null; + } + */ + + return buss; + } + + + /** + * 如果已经存在这个key,则说明是重复数据,不再进行后续操作 + * @param buss + * @return + */ + /*private boolean reduceByKey(BussFlowWeb buss) { + + + String coding = CodingGeneration.md5GenCoding2(buss.ori_url); + String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg; + + //分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置 + + boolean flag=redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+""); + + if(flag){ + redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS); + //原来不存在这条数据 + return false; + } + + return true; + }*/ + + /** + * 判断字符串是否是乱码 + * + * @param strName 字符串 + * @return 是否是乱码 + */ + + public static boolean isMessyCode(String strName) { + Pattern p = Pattern.compile("\\s*|\t*|\r*|\n*"); + Matcher m = p.matcher(strName); + String after = m.replaceAll("").replace("-", "").replace("|", ""); + String temp = after.replaceAll("\\p{P}", ""); + char[] ch = temp.trim().toCharArray(); + float chLength = 0 ; + float count = 0; + for (int i = 0; i < ch.length; i++) { + char c = ch[i]; + if (!Character.isLetterOrDigit(c)) { + if (!isChinese(c)) { + count = count + 1; + } + chLength++; + } + } + float result = count / chLength ; + if (result > 0.4) { + return true; + } else { + return false; + } + } + + private static boolean isChinese(char c) { + Character.UnicodeBlock ub = Character.UnicodeBlock.of(c); + if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS + || ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS + || ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A + || ub == Character.UnicodeBlock.GENERAL_PUNCTUATION + || ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION + || ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) { + return true; + } + return false; + } + + public String id; //id + public int msg_len; //消息长度 + public int msg_type; //消息类型 + public long src_mac; + public long dst_mac; + public int protocol; //协议名 + public String session_serial_number;//会话序列号 + public String buss_type;//业务服务资源编码(C_01) +// public String net_segment;//网段标识(客户-web) + public long request_ip;//Web客户端IP + public int request_port;//Web客户端端口 + public long response_ip;//Web服务器IP + public int response_port;//Web服务器端口 + public long start_tv_sec;//Web开始时间秒 + public long start_tv_usec;//开始时间毫秒 + public long end_tv_sec;//结束时间秒 + public long end_tv_usec;//结束时间微妙 + public String disc_resource_ident; //web:url mid:apiBody db:sql + public String operating_system; //web:操作系统 midd:interfaceName db:db_name + //web midd + public String session_id;//sessionid + public int request_msg_length; //请求报文长度 + public String request_msg_detail; //请求报文详情 + public int response_msg_length; //响应报文长度 + public String response_msg_detail; //响应报文详情 + //web段 + public String reter_url; + public String x_requested_with; + public long req_method; //请求方式 + public String content_type; //请求类型 + public String accept; //jieshou + public int req_cookie_leng;//请求cookie报文长度 + public String req_cookie_detail;//请求cookie报文详情 + public long t_intodb_time; + public int load_or_step; //0: 页面 1:加载项 2:非web段数据 + public String business_detail_mesg; + public String bussiness_key_mesg; //关键字 格式:key=val|key=val.... + public int isUncomplete; //组包是否完全 0:组包完整 1:不完整 + public int deal_state=1; + public int server_res_code; + public long server_response_time; + public long client_translate_time; + public String browser; + public int server_start_tv_sec; + public long server_start_tv_usec; + public int server_end_tv_sec; + public long server_end_tv_usec; + public String probe_ip; + public int probe_if; + public long server_translate_time; + public long time_flag; + public String channel; + public String base_code; + public String ori_url; + public String remain_data; + public long bytes_in; + public long bytes_out; + public int package_in; + public int package_out; + + public String dataId; + + public int filterId; + + public String whiteCharacter; + + // tokenId + public String tokenId; + // 判断是否是首端资源( 2 ) + public Integer segmentId; + + @Override + public String[] getData() { + String data[] = new String[38]; + return data; + } + + public String getBussType() { + return buss_type; + } + + + @Override + public int getInterfaceNumber() { + return 0; + } +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java b/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java new file mode 100644 index 0000000..6f062a7 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java @@ -0,0 +1,379 @@ +package com.yuandian.dataflow.proto.decode; + + + +import java.io.Serializable; +import java.util.Date; + + + +/** + * 移植zjm流量信息接收类 + * Leehr + * + */ +public class DataBaseModel extends PacketBase implements Serializable{ + + + private static final long serialVersionUID = 1L; + + private static final String split=";"; + + private long mac_src; //源MAC + private long mac_dst; //目标MAC + private long ip_src; //源IP + private long ip_dst; //目标IP + private long port_src; //源端口,如果没有,为-1 + private long port_dst; //目标端口,如果没有,为-1 + private long l3_proto; //第三层协议ID,如果没有,为-1 + private long l4_proto; //第四层协议ID,如果没有,为-1 + private long tos; //Tos,一个字节,如果没有,为-1 + private long vlan_id; //vlan ID,如果没有,为-1 + + private long bytes; //字节总数 + private long packets; //数据包总数 + + private long packets_syn; //TCP同步包数 + private long packets_syn_ack; //TCP同步确认包数 + private long packets_syn_rst; //TCP同步重置包数 + + private long timestamp; //时间戳,秒 + + private long appid;//应用ID + private long app_group_id; + private long mpls_label; + + private long pkts_syn_rx; //tcp同步包,接收 + private long pkts_syn_ack_rx;//tcp同步确认包,接收 + private long pkts_syn_rst_rx; //tcp同步重置包,接收 + private long pkts_fin;//tcp终止包,接收 + private long pkts_rst;//tcp重置包,接收 + + private long bytes_rx;//字节收 + private long packets_rx;//数据包收 + + private long probe_time_sec; + private Date probe_time; + private Date createTime; + + public String getKey(){ + StringBuffer strb=new StringBuffer(); + strb.append(timestamp).append(split); + strb.append(mac_src).append(split); + strb.append(mac_dst).append(split); + strb.append(ip_src).append(split); + strb.append(ip_dst).append(split); + strb.append(port_src).append(split); + strb.append(port_dst).append(split); + strb.append(l3_proto).append(split); + strb.append(l4_proto).append(split); + strb.append(tos).append(split); + strb.append(vlan_id).append(split); + strb.append(appid); + return strb.toString(); + } + + public DataBaseModel Parse(byte[] data, int offset)throws Exception { + + if (offset + 108 > data.length) + throw new Exception("data length error!"); + + DataBaseModel db = new DataBaseModel(); + db.mac_src = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.mac_dst = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.ip_src = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.ip_dst = ByteUtil.getLong(data, offset, 8); + offset+=8; + + db.port_src = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.port_dst = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.l3_proto = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.l4_proto = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.tos = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset+=4; + db.bytes = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.packets = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.packets_syn = ByteUtil.getLong(data, offset, 8); + + offset+=8; + db.packets_syn_ack = ByteUtil.getLong(data, offset, 8); + offset+=8; + db.packets_syn_rst = ByteUtil.getLong(data, offset, 8); + offset+=8; + /** + * 2015-12-10txy 新增3个解析 + */ + db.appid=ByteUtil.getLong(data, offset, 4); + offset+=4; + db.app_group_id=ByteUtil.getLong(data, offset, 4); + offset+=4; + db.mpls_label=ByteUtil.getLong(data, offset, 4); + offset+=4; + return db; + } + + public String toString(){ + StringBuffer strb = new StringBuffer(); + strb.append(timestamp).append(","); + strb.append(mac_src).append(","); + strb.append(mac_dst).append(","); + strb.append(ip_src).append(","); + strb.append(ip_dst).append(","); + strb.append(port_src).append(","); + strb.append(port_dst).append(","); + strb.append(l3_proto).append(","); + strb.append(l4_proto).append(","); + strb.append(tos).append(","); + strb.append(vlan_id).append(","); + strb.append(appid).append(","); + strb.append(packets_syn).append(","); + strb.append(pkts_syn_rx).append(","); + strb.append(packets_syn_ack).append(","); + strb.append(pkts_syn_ack_rx).append(","); + strb.append(packets_syn_rst).append(","); + strb.append(pkts_syn_rst_rx).append(","); + strb.append(pkts_fin).append(","); + strb.append(pkts_rst).append(","); + strb.append(bytes).append(","); + strb.append(packets).append(","); + strb.append(bytes_rx).append(","); + strb.append(packets_rx).append(","); + strb.append(app_group_id).append(","); + strb.append(mpls_label).append("\n"); + return strb.toString(); + } + + + public long getProbeTimeSec() { + return probe_time_sec; + } + + public void setProbe_time_sec(long probe_time_sec) { + this.probe_time_sec = probe_time_sec; + } + + public Date getProbe_time() { + return probe_time; + } + + public void setProbe_time(Date probe_time) { + this.probe_time = probe_time; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public long getTimestamp() { + return timestamp; + } + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + public long getMac_src() { + return mac_src; + } + public void setMac_src(long macSrc) { + mac_src = macSrc; + } + public long getMac_dst() { + return mac_dst; + } + public void setMac_dst(long macDst) { + mac_dst = macDst; + } + public long getIp_src() { + return ip_src; + } + public void setIp_src(long ipSrc) { + ip_src = ipSrc; + } + public long getIp_dst() { + return ip_dst; + } + public void setIp_dst(long ipDst) { + ip_dst = ipDst; + } + public long getPort_src() { + return port_src; + } + public void setPort_src(long portSrc) { + port_src = portSrc; + } + public long getPort_dst() { + return port_dst; + } + public void setPort_dst(long portDst) { + port_dst = portDst; + } + public long getL3_proto() { + return l3_proto; + } + public void setL3_proto(long l3Proto) { + l3_proto = l3Proto; + } + public long getL4_proto() { + return l4_proto; + } + public void setL4_proto(long l4Proto) { + l4_proto = l4Proto; + } + public long getTos() { + return tos; + } + public void setTos(long tos) { + this.tos = tos; + } + public long getVlan_id() { + return vlan_id; + } + public void setVlan_id(long vlanId) { + vlan_id = vlanId; + } + public long getAppid() { + return appid; + } + public void setAppid(long appid) { + this.appid = appid; + } + + public long getPkts_syn_rx() { + return pkts_syn_rx; + } + public void setPkts_syn_rx(long pktsSynRx) { + pkts_syn_rx = pktsSynRx; + } + + public long getPkts_syn_ack_rx() { + return pkts_syn_ack_rx; + } + public void setPkts_syn_ack_rx(long pktsSynAckRx) { + pkts_syn_ack_rx = pktsSynAckRx; + } + + public long getPkts_syn_rst_rx() { + return pkts_syn_rst_rx; + } + public void setPkts_syn_rst_rx(long pktsSynRstRx) { + pkts_syn_rst_rx = pktsSynRstRx; + } + public long getPkts_fin() { + return pkts_fin; + } + public void setPkts_fin(long pktsFin) { + pkts_fin = pktsFin; + } + public long getPkts_rst() { + return pkts_rst; + } + public void setPkts_rst(long pktsRst) { + pkts_rst = pktsRst; + } + public long getBytes() { + return bytes; + } + public void setBytes(long bytes) { + this.bytes = bytes; + } + public long getPackets() { + return packets; + } + public void setPackets(long packets) { + this.packets = packets; + } + public long getBytes_rx() { + return bytes_rx; + } + public void setBytes_rx(long bytesRx) { + bytes_rx = bytesRx; + } + public long getPackets_rx() { + return packets_rx; + } + public void setPackets_rx(long packetsRx) { + packets_rx = packetsRx; + } + + public long getPackets_syn() { + return packets_syn; + } + + public void setPackets_syn(long packetsSyn) { + packets_syn = packetsSyn; + } + + public long getPackets_syn_ack() { + return packets_syn_ack; + } + + public void setPackets_syn_ack(long packetsSynAck) { + packets_syn_ack = packetsSynAck; + } + + public long getPackets_syn_rst() { + return packets_syn_rst; + } + + public void setPackets_syn_rst(long packetsSynRst) { + packets_syn_rst = packetsSynRst; + } + + public long getApp_group_id() { + return app_group_id; + } + + public void setApp_group_id(long app_group_id) { + this.app_group_id = app_group_id; + } + + public long getMpls_label() { + return mpls_label; + } + + public void setMpls_label(long mpls_label) { + this.mpls_label = mpls_label; + } + + public static String getSplit() { + return split; + } + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) + throws Exception { + return null; + } + + @Override + public int getUnitPacketLength() { + return 0; + } + + @Override + public String[] getData() { + return null; + } + + @Override + public int getInterfaceNumber() { + return 0; + } + + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java new file mode 100644 index 0000000..9dffdc2 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java @@ -0,0 +1,160 @@ +package com.yuandian.dataflow.proto.decode; + + + +public class DataFlow extends PacketBase { + private final int UNIT_DATA_LENGTH = 157;//贵阳 + public String id; + + public Integer msg_type; //消息类型 + public Integer msg_version; //消息版本 + public Integer msg_seq; //序列号 + public Integer msg_len; //消息长度 + public Integer probe_if; //接口号 + public Long timestamp; //时间戳 + public Long mac_src; //源物理地址 + public Long mac_dst; //目的物理地址 + public Integer vlan_id; //vlan_id + public Long l3_proto; //l3层协议 + public Long l4_proto; //l4层协议 + public Integer tos; //tos + public Integer retran_count; //重传次数 + public Integer reset_count; //重置次数 + public Integer zerowin_count; //零窗口次数 + public Integer protocol; //协议名 + public Long seq; + public Long ack; + public Integer recog_status; //识别类型标识 + public Long bytes; //总字节 + public Long packets; //总包数 + public Integer start_tv_sec;//Web开始时间秒 + public Long start_tv_usec;//开始时间毫秒 + public Integer end_tv_sec;//结束时间秒 + public Long end_tv_usec;//结束时间微妙 + public Integer server_start_tv_sec;//服务器响应开始时间秒 + public Long server_start_tv_usec;//服务器响应开始时间毫秒 + public Integer server_end_tv_sec;//服务器响应结束时间秒 + public Long server_end_tv_usec;//服务器响应结束时间微妙 + + public Long server_response_time;//Web服务器响应时间 + public Long client_translate_time;//Web客户端传输耗时 + public Long server_translate_time;//Web服务器传输耗时 + + public Long bytes_in; + public Long bytes_out; + public Long packets_in; + public Long packets_out; + public Long ip_src; //源IP + public Long ip_dst; //目的IP + public Long port_src; //源端口 + public Long port_dst; //目的端口 + public Long probeIP; //探针IP + + public Long intodb_time; + public Long count = 1L; + + + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception { + if (offset + UNIT_DATA_LENGTH > data.length) + throw new Exception("data length error!"); + DataFlow dflow = new DataFlow(); +// dflow.m_Header = header; + + dflow.msg_type = header.getTableID(); +// offset += 4; +// dflow.msg_version = ByteUtil.getInteger(data, offset, 1); +// offset += 1; +// dflow.msg_seq = ByteUtil.getInteger(data, offset, 4); +// offset += 4; +// dflow.msg_len = ByteUtil.getInteger(data, offset, 4); +// offset += 4; + dflow.ip_src = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.ip_dst = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.port_src = ByteUtil.getLong(data, offset, 2); + offset += 2; +// dflow.port_dst = ByteUtil.getLong(data, offset, 2); + offset += 2; + dflow.probe_if = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.timestamp = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.mac_src = ByteUtil.getLong(data, offset, 8); + offset += 8; +// dflow.mac_dst = ByteUtil.getLong(data, offset, 8); + offset += 8; +// dflow.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.l3_proto = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.l4_proto = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.retran_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.reset_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.zerowin_count = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.protocol=ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.seq = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.ack = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.recog_status = ByteUtil.getInteger(data, offset, 4); + offset += 4; + dflow.bytes_in = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.bytes_out = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.packets_in = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.packets_out = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.bytes = ByteUtil.getLong(data, offset, 4); + offset += 4; + dflow.packets = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.start_tv_sec = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.start_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.end_tv_sec = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.end_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.server_start_tv_sec = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.server_start_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.server_end_tv_sec = ByteUtil.getInteger(data, offset, 4); + offset += 4; +// dflow.server_end_tv_usec = ByteUtil.getLong(data, offset, 4); + offset += 4; +// dflow.server_translate_time = ByteUtil.getLong(data, offset, 8); + offset += 8; +// dflow.client_translate_time = ByteUtil.getLong(data, offset, 8); + offset += 8; +// dflow.server_response_time = ByteUtil.getLong(data, offset, 8); + offset += 8; + return dflow; + } + @Override + public int getUnitPacketLength() { + return UNIT_DATA_LENGTH; + } + @Override + public String[] getData() { + String data[] = new String[45]; + return data; + } + @Override + public int getInterfaceNumber() { + return Integer.parseInt(probe_if.toString()); + } +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java new file mode 100644 index 0000000..3eb2d91 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java @@ -0,0 +1,27 @@ +package com.yuandian.dataflow.proto.decode; +import java.nio.ByteBuffer; + +public abstract class PacketBase { + + protected PacketHeader m_Header = null; + + public PacketHeader getPacketHead(){ + return m_Header; + } + + public void setPacketHeader(PacketHeader header) + { + this.m_Header = header; + } + + //抽象方法,每个数据操作类实现不一样 + public abstract PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception; + + public abstract int getUnitPacketLength(); + + public abstract String[] getData(); + + public abstract int getInterfaceNumber(); + + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java new file mode 100644 index 0000000..56f2a4d --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java @@ -0,0 +1,55 @@ +package com.yuandian.dataflow.proto.decode; + + + +public class PacketHeader { + + private int m_TableID = -1; //数据类型 + private int m_RecCount = 0; //数据总条数 + private int msg_len; //数据报文总长度,60010端口为压缩后长度 + private long timestamp; //60010端口发送数据时间 + private int probe_if; //60010端口抓包口 + private int umcompr_len;//60010端口数据报文压缩前长度 + private int now_type; //记录25类型数据数据当前数据类型 22 23 24 + + public int getTableID() { + return m_TableID; + } + public int getMsg_len() { + return msg_len; + } + public int getRecCount() { + return m_RecCount; + } + public long getTimestamp() { + return timestamp ; + } + public int getProbe_id(){ + return probe_if; + } + public int getUmcompr_len(){ + return umcompr_len; + } + + public PacketHeader(byte[] data) throws Exception { + if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!"); + this.m_TableID = ByteUtil.getInteger(data, 0, 4);//22 + this.m_RecCount = ByteUtil.getInteger(data, 4, 4);//4000 + this.msg_len = ByteUtil.getInteger(data, 8, 4); + } + + public void parseNextHeader_60010(byte[] data) throws Exception { + if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!"); + this.timestamp = ByteUtil.getLong(data, 0, 4); + this.probe_if = ByteUtil.getInteger(data, 4, 4); + this.umcompr_len = ByteUtil.getInteger(data, 8, 4); + } + + public void setNow_type(int now_type) { + this.now_type = now_type; + } + + public int getNow_type(){ + return now_type; + } +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java new file mode 100644 index 0000000..d141de5 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java @@ -0,0 +1,156 @@ +/** + * + */ +package com.yuandian.dataflow.proto.decode; + +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; + +/** + * @author l + * 网络性能流 + */ +public class QoeFlow extends PacketBase { + private String qoe_Flow_Id; + public static int SIZE = 38*4; +// 字段类型 字段 原始类型 字节数 说明 +public int srcIp; //uint32_t 4 源ip地址 +public int dstIp; //uint32_t 4 目的ip地址 +public int stvSec; //uint32_t 4 开始时间秒 +public int stvUsec; //uint32_t 4 开始时间微秒 +public int ltvSec; //uint32_t 4 最后更新时间秒 +public int ltvUsec; //uint32_t 4 最后更新时间微秒 +public int dst2ResponNum; //uint32_t 4 响应总量 +public int dst2Fast; //uint32_t 4 +public int dst2FastExpected; //uint32_t 4 +public int dst2ExpectedDegrated; //uint32_t 4 +public int dst2DegratedService; //uint32_t 4 +public int dst2ServiceAvailability; //uint32_t 4 +public int dst2ResponTimeout; //uint32_t 4 响应超时数 +public int dst2ResponSuccess; //uint32_t 4 响应成功数 +public int dst2ResponFail; //uint32_t 4 响应失败数 +public int dst2ResponPeek; //uint32_t 4 峰值响应时间 +public int dst2ResponAverage; //uint32_t 4 响应时间均值 +public int csWindow; //uint32_t 4 +public int scWindow; //uint32_t 4 +public int csReset; //uint32_t 4 +public int scReset; //uint32_t 4 +public int csRetran; //uint32_t 4 +public int scRetran; //uint32_t 4 +public int appId; //uint32_t 4 Aphid +public int appGroupId; //uint32_t 4 app组id +public int probeIf; //uint32_t 4 探针接口id +public int appStyle; //uint32_t 4 +public int timeFlag; //uint32_t 4 发送时间戳 +public int connSetupTm; //uint32_t 4 链接建立时间 +public int dataTransferTm; //uint32_t 4 数据传输时间 +public int retransDelayTm; //uint32_t 4 数据重传时延 +public int networkInbound; //uint32_t 4 网络响应时间(c->s) +public int networkOutbound; //uint32_t 4 网络响应时间(s->c) +public int newSession; //uint32_t 4 新会话数 +public int userEvents; //uint32_t 4 用户事件 +public int serverEvents; //uint32_t 4 服务事件 +public int connSetupPeek; //uint32_t 4 连接建立时间峰值 +public int vlanId; //uint32_t 4 + + + +@Override +public QoeFlow Parse(PacketHeader header,ByteBuffer data) throws Exception { + QoeFlow qoeFlow = new QoeFlow(); + qoeFlow.srcIp=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dstIp=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.stvSec=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.stvUsec=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.ltvSec=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.ltvUsec=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponNum=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2Fast=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2FastExpected=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ExpectedDegrated=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2DegratedService=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ServiceAvailability=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponTimeout=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponSuccess=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponFail=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponPeek=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dst2ResponAverage=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.csWindow=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.scWindow=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.csReset=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.scReset=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.csRetran=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.scRetran=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.appId=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.appGroupId=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.probeIf=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.appStyle=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.timeFlag=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.connSetupTm=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.dataTransferTm=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.retransDelayTm=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.networkInbound=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.networkOutbound=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.newSession=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.userEvents=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.serverEvents=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.connSetupPeek=ByteUtil.getInteger(data, offset, 4); + offset += 4; + qoeFlow.vlanId=ByteUtil.getInteger(data, offset, 4); + offset += 4; + return qoeFlow; +} +@Override +public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; +} +@Override +public String[] getData() { + // TODO Auto-generated method stub + return null; +} +@Override +public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; +} + +} diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java new file mode 100644 index 0000000..b87bc14 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java @@ -0,0 +1,114 @@ +package com.yuandian.dataflow.proto.decode; + +import java.util.Date; + + + +public class SstFlow extends PacketBase{ + + public static int SIZE = 108; + + public long mac_src; //源MAC + public long mac_dst; //目标MAC + public long ip_src; //源IP + public long ip_dst; //目标IP + public int port_src; //源端口,如果没有,为-1 + public int port_dst; //目标端口,如果没有,为-1 + public int l3_proto; //第三层协议ID,如果没有,为-1 + public int l4_proto; //第四层协议ID,如果没有,为-1 + public int tos; //Tos,一个字节,如果没有,为-1 + public int vlan_id; //vlan ID,如果没有,为-1 + + public long bytes; //字节总数 + public long packets; //数据包总数 + + public long packets_syn; //TCP同步包数 + public long packets_syn_ack; //TCP同步确认包数 + public long packets_syn_rst; //TCP同步重置包数 + + public long timestamp; //时间戳,秒 + + public long appid;//应用ID + public long app_group_id; + public int mpls_label; + + public long pkts_syn_rx; //tcp同步包,接收 + public long pkts_syn_ack_rx;//tcp同步确认包,接收 + public long pkts_syn_rst_rx; //tcp同步重置包,接收 + public long pkts_fin;//tcp终止包,接收 + public long pkts_rst;//tcp重置包,接收 + + public long bytes_rx;//字节收 + public long packets_rx;//数据包收 + + public long probe_time_sec; + public Date probe_time; + public Date createTime; + + public int probe_if; + + @Override + public PacketBase Parse(PacketHeader header,ByteBuffer data) + throws Exception { + SstFlow sstFlow = new SstFlow(); + sstFlow.mac_src = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.mac_dst = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.ip_src = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.ip_dst = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.port_src = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.port_dst = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.l3_proto = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.l4_proto = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.tos = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.vlan_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; + + sstFlow.bytes = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.packets = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.packets_syn = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.packets_syn_ack = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.packets_syn_rst = ByteUtil.getLong(data, offset, 8); + offset += 8; + sstFlow.appid = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.app_group_id = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.mpls_label = ByteUtil.getInteger(data, offset, 4); + offset += 4; + sstFlow.timestamp = header.getTimestamp(); + sstFlow.probe_if = header.getProbe_id(); + return sstFlow; + } + + @Override + public int getUnitPacketLength() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String[] getData() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getInterfaceNumber() { + // TODO Auto-generated method stub + return 0; + } + +} diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java index be88c3d..d88f01a 100644 --- a/src/main/java/com/yuandian/dataflow/Server.java +++ b/src/main/java/com/yuandian/dataflow/Server.java @@ -100,10 +100,10 @@ public class Server { done = new RaftClosure(); System.setProperty("server.port", sprPort); - SpringApplication.run(Server.class, args); - + var app = SpringApplication.run(Server.class, args); + app.start(); - // node.shutdown(done); + node.shutdown(done); } } diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java index 3d472f3..e77eda4 100644 --- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java +++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java @@ -6,7 +6,7 @@ import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.entity.Task; import com.yuandian.dataflow.Server; -import com.yuandian.dataflow.entity.Response; +import com.yuandian.dataflow.projo.Response; import com.yuandian.dataflow.statemachine.RaftClosure; import lombok.extern.slf4j.Slf4j; @@ -38,13 +38,8 @@ public class TaskLog { Task task = new Task(); - - - - - log.error(node.toString()); - + log.error(node.toString()); RaftClosure done = new RaftClosure(); task.setData(ByteBuffer.wrap("hello".getBytes())); @@ -55,8 +50,9 @@ public class TaskLog { Response response = new Response(); - response.code = HttpStatus.OK; - response.message = HttpStatus.OK.toString(); + + response.Code = HttpStatus.OK; + response.Message = HttpStatus.OK.toString(); return new ResponseEntity(response, HttpStatus.OK); } diff --git a/src/main/java/com/yuandian/dataflow/master/Header.java b/src/main/java/com/yuandian/dataflow/master/Header.java new file mode 100644 index 0000000..ec0aa64 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/master/Header.java @@ -0,0 +1,72 @@ +package com.yuandian.dataflow.master; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; + +import com.yuandian.dataflow.utils.PacketHeader; + +import lombok.Cleanup; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * Header + */ +@Slf4j + +public class Header { + + public static void main(String[] args) throws Exception { + ArrayList threads = new ArrayList() ; + + for(int i = 0 ; i < 100 ; i++) { + Thread thread = new Thread(() -> { + + try { + var addr = new InetSocketAddress("192.168.1.248", 60001); + @Cleanup + var sock = new Socket(); + sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存 + sock.setSoTimeout(1000 * 30); + // 设置超时 + sock.connect(addr, 10 * 1000); + var in = new DataInputStream(sock.getInputStream()); + var out = new DataOutputStream(sock.getOutputStream()); + // 发送验证字符串 + out.write("public".getBytes()); + // var buf = ByteBuffer.wrap( in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN); + log.error("{}", PacketHeader.PacketCode(in)); + var pheader = new PacketHeader(in); + + // buf = ByteBuffer.wrap(in.readNBytes(12)).order(ByteOrder.LITTLE_ENDIAN); + log.error("{}", pheader); + } catch (Exception e) { + e.printStackTrace(); + } + }) ; + threads.add(thread); + thread.start(); + } + + threads.forEach((t)->{ + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + + + } +} diff --git a/src/main/java/com/yuandian/dataflow/entity/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java similarity index 95% rename from src/main/java/com/yuandian/dataflow/entity/Doc.java rename to src/main/java/com/yuandian/dataflow/projo/Doc.java index e6c7c01..888adfc 100644 --- a/src/main/java/com/yuandian/dataflow/entity/Doc.java +++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java @@ -1,4 +1,4 @@ -package com.yuandian.dataflow.entity; +package com.yuandian.dataflow.projo; diff --git a/src/main/java/com/yuandian/dataflow/entity/Response.java b/src/main/java/com/yuandian/dataflow/projo/Response.java similarity index 52% rename from src/main/java/com/yuandian/dataflow/entity/Response.java rename to src/main/java/com/yuandian/dataflow/projo/Response.java index 6c1f6b5..8936a92 100644 --- a/src/main/java/com/yuandian/dataflow/entity/Response.java +++ b/src/main/java/com/yuandian/dataflow/projo/Response.java @@ -1,21 +1,14 @@ -package com.yuandian.dataflow.entity; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import lombok.Getter; -import lombok.Setter; + package com.yuandian.dataflow.projo; import org.springframework.http.HttpStatus; -@Getter -@Setter +import com.fasterxml.jackson.annotation.JsonProperty; + public class Response { @JsonProperty("code") - public HttpStatus code; + public HttpStatus Code; @JsonProperty("message") - public String message ; + public String Message; @JsonProperty("data") - public Object data ; - - + public Object Data; } diff --git a/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java b/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java new file mode 100644 index 0000000..c658772 --- /dev/null +++ b/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java @@ -0,0 +1,55 @@ +/** + * description + * + * @author eson + *2022年6月07日-11:09:21 + */ +package com.yuandian.dataflow.utils; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * description + * + * @author eson + *2022年6月07日-11:09:21 + */ +@Getter +@Setter +@ToString +public class PacketHeader { + + private int tableID = -1; //数据类型 + private int recCount = 0; //数据总条数 + private int msgLen; //数据报文总长度,60010端口为压缩后长度 + private long timestamp; //60010端口发送数据时间 + private int probeIf; //60010端口抓包口 + private int umcomprLen;//60010端口数据报文压缩前长度 + private int nowType; //记录25类型数据数据当前数据类型 22 23 24 + + public PacketHeader(DataInputStream data) throws Exception { + this.tableID = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//22 + this.recCount = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//4000 + this.msgLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); + } + + public void parseNextHeader_60010(DataInputStream data) throws Exception { + this.timestamp = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getLong(); + this.probeIf =ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); + this.umcomprLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt(); + } + + public static int PacketCode(DataInputStream data) throws IOException { + var buf = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN); + return buf.getInt(); + } + + +} diff --git a/src/main/proto/scheduler.proto b/src/main/proto/scheduler.proto deleted file mode 100644 index d96bc51..0000000 --- a/src/main/proto/scheduler.proto +++ /dev/null @@ -1,26 +0,0 @@ -syntax = "proto3"; - -package com.yuandian.dataflow.rpc; -// option java_package = "com.yuandian.dataflow.rpc"; - -// option java_outer_classname = "com.yuandian.dataflow.rpc"; -option java_multiple_files = false;//以非外部类模式生成 - -service DataFlow { - rpc Update (State) returns (Response); -} - -message State { - map QueueMap = 1; - -} - -message QueueState { - int32 Size = 1; -} - -message Response { - int32 Code = 1; - string Message = 2; - bytes Data = 3; -} \ No newline at end of file diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java index 6a9af2a..1198286 100644 --- a/src/test/java/com/yuandian/dataflow/AppTest.java +++ b/src/test/java/com/yuandian/dataflow/AppTest.java @@ -25,7 +25,7 @@ import org.springframework.expression.spel.ast.FunctionReference; import com.mongodb.MongoClient; import com.mongodb.client.model.InsertManyOptions; -import com.yuandian.dataflow.entity.Doc; +import com.yuandian.dataflow.projo.Doc; import io.netty.handler.codec.dns.DatagramDnsQuery; import lombok.Cleanup;