TODO: 了解探针的接收数据过程
This commit is contained in:
parent
3dadd8c4e8
commit
6e7a35b76d
|
@ -0,0 +1,287 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author User
|
||||
*
|
||||
*/
|
||||
public class ApmBaseDataFlow extends PacketBase {
|
||||
private static Logger logger = LoggerFactory.getLogger(ApmBaseDataFlow.class);
|
||||
public static int SIZE = 156;//88;//
|
||||
/**
|
||||
* 抓包口
|
||||
*/
|
||||
public long probeIf;
|
||||
|
||||
|
||||
//四元组
|
||||
/**
|
||||
* 请求端口
|
||||
*/
|
||||
public int requestPort;
|
||||
/**
|
||||
* 响应端口
|
||||
*/
|
||||
public int responsePort;
|
||||
/**
|
||||
* 请求IP
|
||||
*/
|
||||
public long requestIp;
|
||||
/**
|
||||
* 响应Ip
|
||||
*/
|
||||
public long responseIp;
|
||||
|
||||
/**
|
||||
* 源mac
|
||||
*/
|
||||
public long srcMac;
|
||||
|
||||
/**
|
||||
* 目的mac
|
||||
*/
|
||||
public long dstMac;
|
||||
|
||||
/**
|
||||
* 链路编号
|
||||
*/
|
||||
public long vlanId;
|
||||
|
||||
public long tvSec;
|
||||
public long tvUsec;
|
||||
|
||||
/**
|
||||
* 开始时间
|
||||
*/
|
||||
public long startTm;
|
||||
|
||||
/**
|
||||
* 总字节数
|
||||
*/
|
||||
public long totalBytes;
|
||||
|
||||
/**
|
||||
* 总包数
|
||||
*/
|
||||
public long totalPackets;
|
||||
/**
|
||||
* 总丢包数
|
||||
*/
|
||||
public long totalDropPackets;
|
||||
/**
|
||||
* 重传延时
|
||||
*/
|
||||
public long retranTimeDelay;
|
||||
/**
|
||||
* 客户端rtt
|
||||
*/
|
||||
public long clientRtt;
|
||||
|
||||
/**
|
||||
* 服务端Rtt
|
||||
*/
|
||||
public long serverRtt;
|
||||
|
||||
|
||||
/**
|
||||
* 用户响应时间
|
||||
*/
|
||||
public long userResponseTime;
|
||||
/**
|
||||
* 服务响应时间
|
||||
*/
|
||||
public long serverResponseTime;
|
||||
/**
|
||||
* tcp回话连接失败数
|
||||
*/
|
||||
public long conFail;
|
||||
|
||||
/**
|
||||
* 会话重置数
|
||||
*/
|
||||
// public long reset;
|
||||
|
||||
/**
|
||||
* 服务端总字节数
|
||||
*/
|
||||
public long bytesIn;
|
||||
/**
|
||||
* 客户端总字节数
|
||||
*/
|
||||
public long bytesOut;
|
||||
/**
|
||||
* 探针推送时间
|
||||
*/
|
||||
public long timeFlag;
|
||||
|
||||
/**
|
||||
* 结束时间
|
||||
*/
|
||||
public long endTm;
|
||||
|
||||
/**
|
||||
* 结束时间微秒
|
||||
*/
|
||||
public long endTmUsec;
|
||||
|
||||
/**
|
||||
* 总响应数
|
||||
*/
|
||||
public long responNum;
|
||||
/**
|
||||
* 客户端零窗口数
|
||||
*/
|
||||
public long csWindow;
|
||||
/**
|
||||
* 服务端零窗口数
|
||||
*/
|
||||
public long scWindow;
|
||||
/**
|
||||
* 客户端重置数
|
||||
*/
|
||||
public long csReset;
|
||||
/**
|
||||
* 服务端重置数
|
||||
*/
|
||||
public long scReset;
|
||||
/**
|
||||
* 客户端重传数
|
||||
*/
|
||||
public long csRetran;
|
||||
/**
|
||||
* 服务端重传数
|
||||
*/
|
||||
public long scRetran;
|
||||
/**
|
||||
* 会话建立时间
|
||||
*/
|
||||
public long connSetupTm;
|
||||
/**
|
||||
* 新建会话数
|
||||
*/
|
||||
public long newSession;
|
||||
|
||||
public long csAlert;
|
||||
|
||||
public long scAlert;
|
||||
|
||||
public String protocal;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
ApmBaseDataFlow oneData = new ApmBaseDataFlow();
|
||||
oneData.setPacketHeader(header);
|
||||
|
||||
oneData.probeIf = data.getLong(4);
|
||||
|
||||
|
||||
oneData.requestPort = data.getInt( 2);
|
||||
|
||||
oneData.responsePort = data.getInt( 2);
|
||||
|
||||
oneData.requestIp = data.getLong(4);
|
||||
|
||||
oneData.responseIp = data.getLong(4);
|
||||
|
||||
|
||||
oneData.srcMac = data.getLong(6);
|
||||
|
||||
oneData.dstMac = data.getLong(6);
|
||||
|
||||
oneData.vlanId = data.getLong(4);
|
||||
|
||||
|
||||
oneData.tvSec = data.getLong(4);
|
||||
|
||||
oneData.tvUsec = data.getLong(4);
|
||||
|
||||
|
||||
// oneData.startTm = data.getLong(8);
|
||||
|
||||
|
||||
oneData.totalBytes = data.getLong(4);
|
||||
|
||||
oneData.totalPackets = data.getLong(4);
|
||||
|
||||
oneData.totalDropPackets = data.getLong(4);
|
||||
|
||||
oneData.retranTimeDelay = data.getLong(4);
|
||||
|
||||
oneData.clientRtt = data.getLong(4);
|
||||
|
||||
//服务端Rtt
|
||||
oneData.serverRtt = data.getLong(4);
|
||||
|
||||
oneData.userResponseTime = data.getLong(4);
|
||||
|
||||
oneData.serverResponseTime = data.getLong(4);
|
||||
|
||||
oneData.conFail = data.getLong(4);
|
||||
|
||||
|
||||
// oneData.reset = data.getLong(4);
|
||||
|
||||
oneData.bytesIn = data.getLong(4);
|
||||
|
||||
oneData.bytesOut = data.getLong(4);
|
||||
|
||||
oneData.timeFlag = data.getLong(4);
|
||||
|
||||
|
||||
oneData.endTm = data.getLong(4);
|
||||
|
||||
oneData.endTmUsec = data.getLong(4);
|
||||
|
||||
|
||||
oneData.responNum = data.getLong(4);
|
||||
|
||||
oneData.csWindow = data.getLong(4);
|
||||
|
||||
oneData.scWindow = data.getLong(4);
|
||||
|
||||
oneData.csReset = data.getLong(4);
|
||||
|
||||
oneData.scReset = data.getLong(4);
|
||||
|
||||
oneData.csRetran = data.getLong(4);
|
||||
|
||||
oneData.scRetran = data.getLong(4);
|
||||
|
||||
oneData.connSetupTm = data.getLong(4);
|
||||
|
||||
oneData.newSession = data.getLong(4);
|
||||
|
||||
oneData.csAlert = data.getLong(4);
|
||||
|
||||
oneData.scAlert = data.getLong(4);
|
||||
|
||||
|
||||
oneData.protocal = new String(data.array());
|
||||
|
||||
return oneData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
101
src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java
Normal file
101
src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java
Normal file
|
@ -0,0 +1,101 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
|
||||
public class AppFlow extends PacketBase {
|
||||
public static int SIZE = 104;
|
||||
public long srcIp;
|
||||
public int srcPort;
|
||||
public long dstIp;
|
||||
public int dstPort;
|
||||
public long startTvSec;
|
||||
public long startTvUsec;
|
||||
public long lastTvSec;
|
||||
public long lastTvUsec;
|
||||
public long endTvSec;
|
||||
public long endTvUsec;
|
||||
public int inputPackets;
|
||||
public int outputPackets;
|
||||
public int inputBytes;
|
||||
public int outputBytes;
|
||||
public String protocaol;
|
||||
public int appId;
|
||||
public int appGroupId;
|
||||
public int probeIf;
|
||||
public int appStyle;
|
||||
public long timeFlag;
|
||||
public int vlanId;
|
||||
public int mplsLable;
|
||||
public int tos;
|
||||
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
AppFlow nlf = new AppFlow();
|
||||
nlf.srcIp = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.srcPort = ByteUtil.getInteger(data, offset, 2);
|
||||
offset += 2;
|
||||
nlf.dstIp = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.dstPort = ByteUtil.getInteger(data, offset, 2);
|
||||
offset += 2;
|
||||
nlf.startTvSec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.startTvUsec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.lastTvSec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.lastTvUsec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.endTvSec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.endTvUsec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.inputPackets = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.outputPackets = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.inputBytes = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.outputBytes = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.protocaol = ByteUtil.getString(data, offset, 20);
|
||||
offset += 20;
|
||||
nlf.appId = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.appGroupId = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.probeIf = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.appStyle = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.timeFlag = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.vlanId = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.mplsLable = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
nlf.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
return nlf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* @author User
|
||||
*
|
||||
*/
|
||||
public class BacktrackingFlow extends PacketBase{
|
||||
public static int SIZE = 108;
|
||||
//tuple StatisTuple10 56 10元组信息
|
||||
private long macSrc; //源MAC 8
|
||||
private long macDst; //目的MAC 8
|
||||
private long ipSrc; //源IP 8
|
||||
private long ipDst; //目的IP 8
|
||||
private long portSrc; //源端口,如果没有,为-1
|
||||
private long portDst; //目标端口,如果没有,为-1
|
||||
private int l3Proto; //第三层协议ID,如果没有,为-1
|
||||
private int l4Proto; //第四层协议ID,如果没有,为-1
|
||||
private int tos; //Tos,如果没有,为-1
|
||||
private int vlanId; //vlan ID,如果没有,为-1
|
||||
|
||||
private long bytes; // 8 字节总数
|
||||
private long packets; // 8 数据包总数
|
||||
private long tcpSp; // 8 tcp同步包数
|
||||
private long tcpScpn;// 8 tcp同步确认包数
|
||||
private long tcpSrp; // 8 tcp同步重置包数
|
||||
private long appId; // 4 appID
|
||||
private long appGroupId;// 4 app组ID
|
||||
private long mplsLabel;// 4
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
BacktrackingFlow backFlow = new BacktrackingFlow();
|
||||
backFlow.macSrc = ByteUtil.getLong(data, offset, 8); //源MAC 8
|
||||
offset+=8;
|
||||
backFlow.macDst = ByteUtil.getLong(data, offset, 8); //目的MAC 8
|
||||
offset+=8;
|
||||
backFlow.ipSrc = ByteUtil.getLong(data, offset, 8); //源IP 8
|
||||
offset+=8;
|
||||
backFlow.ipDst = ByteUtil.getLong(data, offset, 8); //目的IP 8
|
||||
offset+=8;
|
||||
backFlow.portSrc = ByteUtil.getInteger(data, offset, 4); //源端口,如果没有,为-1
|
||||
offset+=4;
|
||||
backFlow.portDst = ByteUtil.getInteger(data, offset, 4); //目标端口,如果没有,为-1
|
||||
offset+=4;
|
||||
backFlow.l3Proto = ByteUtil.getInteger(data, offset, 4); //第三层协议ID,如果没有,为-1
|
||||
offset+=4;
|
||||
backFlow.l4Proto = ByteUtil.getInteger(data, offset, 4); //第四层协议ID,如果没有,为-1
|
||||
offset+=4;
|
||||
backFlow.tos = ByteUtil.getInteger(data, offset, 4); //Tos,如果没有,为-1
|
||||
offset+=4;
|
||||
backFlow.vlanId = ByteUtil.getInteger(data, offset, 4); //vlan ID,如果没有,为-1
|
||||
offset+=4;
|
||||
|
||||
backFlow.bytes = ByteUtil.getLong(data, offset, 8); // 8 字节总数
|
||||
offset+=8;
|
||||
backFlow.packets = ByteUtil.getLong(data, offset, 8); // 8 数据包总数
|
||||
offset+=8;
|
||||
backFlow.tcpSp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步包数
|
||||
offset+=8;
|
||||
backFlow.tcpScpn = ByteUtil.getLong(data, offset, 8);// 8 tcp同步确认包数
|
||||
offset+=8;
|
||||
backFlow.tcpSrp = ByteUtil.getLong(data, offset, 8); // 8 tcp同步重置包数
|
||||
offset+=8;
|
||||
backFlow.appId = ByteUtil.getInteger(data, offset, 4); // 4 appID
|
||||
offset+=4;
|
||||
backFlow.appGroupId = ByteUtil.getInteger(data, offset, 4);// 4 app组ID
|
||||
offset+=4;
|
||||
backFlow.mplsLabel = ByteUtil.getInteger(data, offset, 4);// 4
|
||||
offset+=4;
|
||||
|
||||
return backFlow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
public class BasicTrafficFlow extends PacketBase {
|
||||
public static int SIZE = 56;
|
||||
public long capPort;
|
||||
public int requestPort;
|
||||
public int responsePort;
|
||||
public long requestIp;
|
||||
public long responseIp;
|
||||
public long startTime;
|
||||
public long totalBytes;
|
||||
public long totalPackets;
|
||||
public long spackets64;
|
||||
public long spackets128;
|
||||
public long spackets256;
|
||||
public long spackets512;
|
||||
public long spackets1024;
|
||||
public long spackets;
|
||||
public long sendTime;
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
BasicTrafficFlow btf = new BasicTrafficFlow();
|
||||
btf.setPacketHeader(header);
|
||||
|
||||
btf.capPort = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
// btf.requestPort = ByteUtil.getInteger(data, offset, 2);
|
||||
offset += 2;
|
||||
// btf.responsePort = ByteUtil.getInteger(data, offset, 2);
|
||||
offset += 2;
|
||||
// btf.requestIp = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// btf.responseIp = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
btf.startTime = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.totalBytes = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.totalPackets = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets64 = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets128 = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets256 = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets512 = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets1024 = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.spackets = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
btf.sendTime = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
return btf;
|
||||
}
|
||||
@Override
|
||||
public String[] getData() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
public class BusinessBodyData {
|
||||
|
||||
public String relvanceDataId;
|
||||
public long request_ip;
|
||||
public int request_port;
|
||||
public long response_ip;
|
||||
public int response_port;
|
||||
|
||||
public long start_tv_sec;//开始时间秒
|
||||
public long start_tv_usec;//开始时间毫秒
|
||||
public long end_tv_sec;//结束时间秒
|
||||
public long end_tv_usec;//结束时间微妙
|
||||
|
||||
public String resource_code;
|
||||
public String noParameterRecognition;
|
||||
public String originalRecognition;
|
||||
public String requestCookie;
|
||||
|
||||
public String requestBodyContext;
|
||||
public String responseBodyContext;
|
||||
|
||||
public int filterId;
|
||||
public String business_detail_mesg;
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,244 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import com.ud.module.common.SystemConfigManager;
|
||||
|
||||
|
||||
|
||||
public class BussFlowDb extends PacketBase {
|
||||
private int UNIT_DATA_LENGTH = 3362;
|
||||
|
||||
// redis资源归并,处理服务资源发现是需要设置识别串(正则表达式)
|
||||
public String redisRegex;
|
||||
|
||||
public String id; //id
|
||||
public int msg_len; //消息长度
|
||||
public int msg_type; //消息类型
|
||||
public long src_mac;
|
||||
public long dst_mac;
|
||||
public int protocol; //协议名
|
||||
public String session_serial_number;//会话序列号
|
||||
public String buss_type;//业务服务资源编码(C_01)
|
||||
public long request_ip;//Web客户端IP
|
||||
public int request_port;//Web客户端端口
|
||||
public long response_ip;//Web服务器IP
|
||||
public int response_port;//Web服务器端口
|
||||
public long start_tv_sec;//Web开始时间秒
|
||||
public long start_tv_usec;//开始时间毫秒
|
||||
public long end_tv_sec;//结束时间秒
|
||||
public long end_tv_usec;//结束时间微妙
|
||||
public String disc_resource_ident; //web:url mid:apiBody db:sql
|
||||
public String name; //web:操作系统 midd:interfaceName db:db_name
|
||||
//web midd
|
||||
public String session_id;//sessionid
|
||||
public int request_msg_length; //请求报文长度
|
||||
public String request_msg_detail; //请求报文详情
|
||||
public int response_msg_length; //响应报文长度
|
||||
public String response_msg_detail; //响应报文详情
|
||||
//web段
|
||||
public String reter_url;
|
||||
public String x_requested_with;
|
||||
public long req_method; //请求方式
|
||||
public String content_type; //请求类型
|
||||
public String accept; //jieshou
|
||||
public int req_cookie_leng;//请求cookie报文长度
|
||||
public String req_cookie_detail;//请求cookie报文详情
|
||||
public long t_intodb_time;
|
||||
public int load_or_step; //0: 页面 1:加载项 2:非web段数据
|
||||
public String business_detail_mesg;
|
||||
public String bussiness_key_mesg; //关键字 格式:key=val|key=val....
|
||||
public int isUncomplete; //组包是否完全 0:组包完整 1:不完整
|
||||
public int deal_state=1;
|
||||
public int server_res_code;
|
||||
public long server_response_time;
|
||||
public long client_translate_time;
|
||||
public String browser;
|
||||
public int server_start_tv_sec;
|
||||
public long server_start_tv_usec;
|
||||
public int server_end_tv_sec;
|
||||
public long server_end_tv_usec;
|
||||
public String probe_ip;
|
||||
public long probe_if;
|
||||
public long server_translate_time;
|
||||
public long time_flag;
|
||||
public String base_code;
|
||||
public String ori_sql;
|
||||
public String reserved;
|
||||
public long bytes_in;
|
||||
public long bytes_out;
|
||||
public int package_in;
|
||||
public int package_out;
|
||||
public String dataId;
|
||||
public int filterId;
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return UNIT_DATA_LENGTH;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public BussFlowDb Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
BussFlowDb buss = new BussFlowDb();
|
||||
// buss.m_Header = header;
|
||||
|
||||
buss.msg_type = header.getTableID();
|
||||
// offset += 4;
|
||||
// buss.msg_version = ByteUtil.getInteger(data, offset, 1);
|
||||
// offset += 1;
|
||||
// buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
// buss.msg_len = header.getMsg_len();
|
||||
// offset += 4;
|
||||
|
||||
// UNIT_DATA_LENGTH=msg_len;
|
||||
buss.src_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
buss.dst_mac = ByteUtil.getLong(data, offset, 8);
|
||||
if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
|
||||
return null;
|
||||
}
|
||||
offset += 8;
|
||||
// buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.retran_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.reset_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.protocol=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_out = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_in = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_out = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_in = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.session_serial_number = ByteUtil.getString(data, offset, 24);
|
||||
offset += 24;
|
||||
// buss.recog_status = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.probe_if=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
// buss.channel=ByteUtil.getString(data, offset, 24);
|
||||
offset+=24;
|
||||
buss.name = ByteUtil.getString(data, offset, 64);
|
||||
offset+=64;
|
||||
|
||||
buss.request_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.request_port=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_port=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
// buss.deal_state=ByteUtil.getInteger(data, offset,8);
|
||||
offset += 8;
|
||||
buss.server_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.client_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.server_response_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.time_flag=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
int sqlLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.request_msg_length = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.response_msg_length = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int reservedLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
buss.ori_sql=ByteUtil.getString(data, offset,sqlLen);
|
||||
offset += sqlLen;
|
||||
buss.business_detail_mesg=ByteUtil.getString(data, offset, detailMsgLen);
|
||||
offset += detailMsgLen;
|
||||
buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length);
|
||||
offset += buss.request_msg_length;
|
||||
buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length);
|
||||
offset += buss.response_msg_length;
|
||||
buss.reserved = ByteUtil.getString(data, offset, reservedLen);
|
||||
offset += reservedLen;
|
||||
|
||||
/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
|
||||
|
||||
boolean exist=false;
|
||||
if(SystemConfigManager.reduceFlag){
|
||||
|
||||
redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
|
||||
exist = reduceByKey(buss);
|
||||
}
|
||||
|
||||
//不存在就加入缓存
|
||||
if(!exist)return buss;
|
||||
//存在就返回空
|
||||
return null;
|
||||
}*/
|
||||
|
||||
return buss;
|
||||
}
|
||||
|
||||
|
||||
public String getBussType() {
|
||||
return buss_type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
String data[] = new String[45];
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作
|
||||
* @param buss
|
||||
* @return 存在返回true 不存在返回false
|
||||
*/
|
||||
/*private boolean reduceByKey(BussFlowDb buss) {
|
||||
String coding = CodingGeneration.md5GenCoding2(buss.ori_sql);
|
||||
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
|
||||
|
||||
//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
|
||||
if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
|
||||
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}*/
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
|
||||
|
||||
public class BussFlowExternal extends PacketBase {
|
||||
private final int UNIT_DATA_LENGTH =871;
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return UNIT_DATA_LENGTH;
|
||||
}
|
||||
|
||||
public BussFlowExternal Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
if (offset + UNIT_DATA_LENGTH > data.length)
|
||||
throw new Exception("data length error!");
|
||||
BussFlowExternal buss = new BussFlowExternal();
|
||||
// buss.m_Header = header;
|
||||
|
||||
buss.probe_if=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.channel=ByteUtil.getString(data, offset,24);
|
||||
offset += 24;
|
||||
buss.interfaceid=ByteUtil.getString(data, offset,100);
|
||||
offset += 100;
|
||||
buss.systemName=ByteUtil.getString(data, offset,32);
|
||||
offset += 32;
|
||||
buss.net_type=ByteUtil.getString(data, offset,6);
|
||||
offset += 6;
|
||||
buss.net_segment = ByteUtil.getString(data, offset, 5);
|
||||
offset += 5;
|
||||
buss.session_id=ByteUtil.getString(data, offset,80);
|
||||
offset += 80;
|
||||
buss.phoneid=ByteUtil.getString(data, offset,12);
|
||||
offset += 12;
|
||||
|
||||
buss.request_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.request_port=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_port=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
//开始时间
|
||||
buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
|
||||
buss.deal_state=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
buss.server_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.server_response_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.begin_url=ByteUtil.getString(data, offset,100);
|
||||
offset += 100;
|
||||
buss.operating_sytem=ByteUtil.getString(data, offset,20);
|
||||
offset += 20;
|
||||
buss.server_res_code=ByteUtil.getInteger(data, offset,2);
|
||||
offset += 2;
|
||||
buss.browser=ByteUtil.getString(data, offset,30);
|
||||
offset += 30;
|
||||
buss.business_detail_mesg=ByteUtil.getString(data, offset,200);
|
||||
offset += 200;
|
||||
buss.business_involve_msg = ByteUtil.getString(data, offset,200);
|
||||
offset += 200;
|
||||
isUncomplete = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.time_flag=ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
return buss;
|
||||
}
|
||||
|
||||
public String id;
|
||||
public Long probe_if;//接口号
|
||||
public String channel;//营业厅渠道:前台或者分析服务器给出,渠道标识
|
||||
public String systemName; //外部系统名称
|
||||
public String interfaceid;// 业务接口编码
|
||||
public String session_id;//sessionid
|
||||
public String phoneid;//受理手机号码
|
||||
public String net_type;
|
||||
public String net_segment;//网段标识(客户-web)
|
||||
public Long request_ip;//Web客户端IP
|
||||
public Long request_port;//Web客户端端口
|
||||
public Long response_ip;//Web服务器IP
|
||||
public Long response_port;//Web服务器端口
|
||||
|
||||
public Long start_tv_sec;//Web开始时间秒
|
||||
public Long start_tv_usec;//开始时间微秒
|
||||
public Long end_tv_sec;//结束时间秒
|
||||
public Long end_tv_usec;//结束时间微妙
|
||||
|
||||
public Integer deal_state;//Web操作成功/失败标识1成功0失败
|
||||
public Long server_translate_time;//Web服务器传输耗时
|
||||
public Long server_response_time;//Web服务器响应时间
|
||||
public String begin_url;//url
|
||||
public String operating_sytem;//操作系统
|
||||
public Integer server_res_code; //Web系统返回码
|
||||
public String browser;//浏览器
|
||||
public String business_detail_mesg;//要获取的指标
|
||||
public Date insert_time;//插入时间
|
||||
public String business_involve_msg; //要关联的指标
|
||||
public Integer isUncomplete;
|
||||
public Long time_flag;
|
||||
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
String data[] = new String[37];
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
|
||||
return Integer.parseInt(probe_if.toString());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,267 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
import com.ud.module.common.ApplicationContextUtil;
|
||||
import com.ud.module.common.SystemConfigManager;
|
||||
|
||||
import com.ud.module.util.code.CodingGeneration;
|
||||
|
||||
public class BussFlowMidd extends PacketBase {
|
||||
private int UNIT_DATA_LENGTH =3569;
|
||||
|
||||
public String id; //id
|
||||
public long src_mac;
|
||||
public long dst_mac;
|
||||
public int msg_len; //消息长度
|
||||
public int msg_type; //消息类型
|
||||
public int protocol; //协议名
|
||||
public String session_serial_number;//会话序列号
|
||||
public String buss_type;//业务服务资源编码(C_01)
|
||||
public String net_segment;//网段标识(客户-web)
|
||||
public long request_ip;//Web客户端IP
|
||||
public int request_port;//Web客户端端口
|
||||
public long response_ip;//Web服务器IP
|
||||
public int response_port;//Web服务器端口
|
||||
public long start_tv_sec;//Web开始时间秒
|
||||
public long start_tv_usec;//开始时间毫秒
|
||||
public long end_tv_sec;//结束时间秒
|
||||
public long end_tv_usec;//结束时间微妙
|
||||
public String disc_resource_ident; //web:url mid:apiBody db:sql
|
||||
public String name; //web:操作系统 midd:interfaceName db:db_name
|
||||
//web midd
|
||||
public String session_id;//sessionid
|
||||
public int request_msg_length; //请求报文长度
|
||||
public String request_msg_detail; //请求报文详情
|
||||
public int response_msg_length; //响应报文长度
|
||||
public String response_msg_detail; //响应报文详情
|
||||
//web段
|
||||
public String reter_url;
|
||||
public String x_requested_with;
|
||||
public long req_method; //请求方式
|
||||
public String content_type; //请求类型
|
||||
public String accept; //jieshou
|
||||
public int req_cookie_leng;//请求cookie报文长度
|
||||
public String req_cookie_detail;//请求cookie报文详情
|
||||
public long t_intodb_time;
|
||||
public int load_or_step; //0: 页面 1:加载项 2:非web段数据
|
||||
public String business_detail_mesg;
|
||||
public String bussiness_key_mesg; //关键字 格式:key=val|key=val....
|
||||
public int isUncomplete; //组包是否完全 0:组包完整 1:不完整
|
||||
public int deal_state=1;
|
||||
public int server_res_code;
|
||||
public long server_response_time;
|
||||
public long client_translate_time;
|
||||
public String browser;
|
||||
public int server_start_tv_sec;
|
||||
public long server_start_tv_usec;
|
||||
public int server_end_tv_sec;
|
||||
public long server_end_tv_usec;
|
||||
public String probe_ip;
|
||||
public int probe_if;
|
||||
public long server_translate_time;
|
||||
public long time_flag;
|
||||
public String channel;
|
||||
public String base_code;
|
||||
public String ori_api;
|
||||
public String remain_data;
|
||||
public long bytes_in;
|
||||
public long bytes_out;
|
||||
public int package_in;
|
||||
public int package_out;
|
||||
public String dataId;
|
||||
public int filterId;
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return UNIT_DATA_LENGTH;
|
||||
}
|
||||
|
||||
public BussFlowMidd Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
BussFlowMidd buss = new BussFlowMidd();
|
||||
// buss.m_Header = header;
|
||||
|
||||
buss.msg_type = header.getTableID();
|
||||
// offset += 4;
|
||||
// buss.msg_version = ByteUtil.getInteger(data, offset, 1);
|
||||
// offset += 1;
|
||||
// buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
// buss.msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
|
||||
// UNIT_DATA_LENGTH=msg_len;
|
||||
buss.src_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
buss.dst_mac = ByteUtil.getLong(data, offset, 8);
|
||||
if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
|
||||
return null;
|
||||
}
|
||||
offset += 8;
|
||||
// buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.retran_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.reset_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.protocol=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_out = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_in = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_out = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_in = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
buss.session_serial_number = ByteUtil.getString(data, offset, 24);
|
||||
offset += 24;
|
||||
// buss.recog_status = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.req_method = ByteUtil.getLong(data, offset, 4);
|
||||
// offset += 4;
|
||||
// buss.content_type = ByteUtil.getString(data, offset, 40);
|
||||
// offset += 40;
|
||||
// buss.accept = ByteUtil.getString(data, offset, 40);
|
||||
// offset += 40;
|
||||
buss.probe_if=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
// buss.buss_type =ByteUtil.getString(data, offset,10);
|
||||
// offset += 10;
|
||||
buss.channel=ByteUtil.getString(data, offset,24);
|
||||
offset += 24;
|
||||
// buss.session_id=ByteUtil.getString(data, offset,36);
|
||||
// offset += 36;
|
||||
// buss.net_segment=ByteUtil.getString(data, offset,6);
|
||||
// offset += 6;
|
||||
buss.request_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.request_port=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_ip=ByteUtil.getLong(data, offset,4);
|
||||
offset += 4;
|
||||
buss.response_port=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
//开始时间
|
||||
buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
// buss.deal_state=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_res_code = ByteUtil.getInteger(data, offset, 2);
|
||||
offset +=2;
|
||||
buss.server_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
|
||||
buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
|
||||
buss.server_response_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.client_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.time_flag=ByteUtil.getInteger(data, offset, 4);
|
||||
offset +=4;
|
||||
int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int keyMsgLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int apiLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.request_msg_length = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.response_msg_length = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int remainLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
|
||||
offset += detailMsgLen;
|
||||
buss.bussiness_key_mesg = ByteUtil.getString(data, offset, keyMsgLen);
|
||||
offset += keyMsgLen;
|
||||
buss.ori_api=ByteUtil.getString(data, offset, apiLen);
|
||||
offset += apiLen;
|
||||
buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length);
|
||||
offset += buss.request_msg_length;
|
||||
buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length);
|
||||
offset += buss.response_msg_length;
|
||||
// buss.remain_data = ByteUtil.getString(data, offset, remainLen);
|
||||
offset += remainLen;
|
||||
|
||||
|
||||
|
||||
/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
|
||||
boolean exist=false;
|
||||
if(SystemConfigManager.reduceFlag){
|
||||
|
||||
redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
|
||||
exist = reduceByKey(buss);
|
||||
}
|
||||
|
||||
//不存在就加入缓存
|
||||
if(!exist)return buss;
|
||||
//存在就返回空
|
||||
return null;
|
||||
}*/
|
||||
|
||||
return buss;
|
||||
}
|
||||
|
||||
public String getBussType() {
|
||||
return buss_type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
String data[] = new String[35];//贵阳
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作
|
||||
* @param buss
|
||||
* @return
|
||||
*/
|
||||
/*private boolean reduceByKey(BussFlowMidd buss) {
|
||||
String coding = CodingGeneration.md5GenCoding2(buss.ori_api);
|
||||
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
|
||||
|
||||
//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
|
||||
if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
|
||||
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}*/
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
public class BussFlowOrl extends PacketBase {
|
||||
|
||||
public int msg_type;
|
||||
public int msg_version;
|
||||
public int msg_seq;
|
||||
public int msg_len;
|
||||
public long request_mac;
|
||||
public long response_mac;
|
||||
public long request_ip;
|
||||
public int request_port;
|
||||
public long response_ip;
|
||||
public int response_port;
|
||||
public int probeif;
|
||||
public int protocol;
|
||||
public long start_tv_sec;//Web开始时间秒
|
||||
public long start_tv_usec;//开始时间毫秒
|
||||
public long end_tv_sec;//结束时间秒
|
||||
public long end_tv_usec;//结束时间微妙
|
||||
public int req_len;
|
||||
public int res_len;
|
||||
public int busi_msg_len;
|
||||
public int key_msg_len;
|
||||
public int detail_msg_len;
|
||||
public int remain_len;
|
||||
public String business_code;
|
||||
public String sessionid;
|
||||
public String req_data;
|
||||
public String res_data;
|
||||
public String busi_msg;
|
||||
public String busi_key_msg;
|
||||
public String busi_detail_msg;
|
||||
public String remain_data;
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
BussFlowOrl orl = new BussFlowOrl();
|
||||
orl.msg_type = header.getNow_type();
|
||||
orl.request_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
orl.response_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
orl.request_ip = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.request_port = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.response_ip = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.response_port = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.probeif = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.protocol = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.start_tv_sec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.start_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.end_tv_sec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.end_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
int req_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int res_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int busi_msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int key_msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int detail_msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int remain_len = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
orl.business_code = ByteUtil.getString(data, offset, 32);
|
||||
offset += 32;
|
||||
orl.sessionid = ByteUtil.getString(data, offset, 80);
|
||||
offset += 80;
|
||||
orl.req_data = ByteUtil.getString(data, offset, req_len);
|
||||
offset += req_len;
|
||||
orl.res_data = ByteUtil.getString(data, offset, res_len);
|
||||
offset += res_len;
|
||||
orl.busi_msg = ByteUtil.getString(data, offset, busi_msg_len);
|
||||
offset += busi_msg_len;
|
||||
orl.busi_key_msg = ByteUtil.getString(data, offset, key_msg_len);
|
||||
offset += key_msg_len;
|
||||
orl.busi_detail_msg = ByteUtil.getString(data, offset, detail_msg_len);
|
||||
offset += detail_msg_len;
|
||||
orl.remain_data = ByteUtil.getString(data, offset, remain_len);
|
||||
offset += remain_len;
|
||||
return orl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
return new String[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,362 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
||||
import com.ud.module.common.Constants;
|
||||
import com.ud.module.common.SystemConfigManager;
|
||||
import com.yuandian.dataflow.proto.decode.PacketBase;
|
||||
import com.yuandian.dataflow.proto.decode.PacketHeader;
|
||||
|
||||
import com.ud.module.util.flow.JXYDBPMDetailMesgUtils;
|
||||
import com.ud.module.util.flow.URLUtil;
|
||||
|
||||
|
||||
public class BussFlowWeb extends PacketBase {
|
||||
private Integer UNIT_DATA_LENGTH =6104;
|
||||
|
||||
private static int i = 1;
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return UNIT_DATA_LENGTH;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public BussFlowWeb Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
i++;
|
||||
BussFlowWeb buss = new BussFlowWeb();
|
||||
// buss.m_Header = header;
|
||||
|
||||
buss.msg_type = header.getTableID();
|
||||
// offset += 4;
|
||||
// buss.msg_version = ByteUtil.getInteger(data, offset, 1);
|
||||
// offset += 1;
|
||||
// buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
// buss.msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
|
||||
buss.src_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
buss.dst_mac = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
// buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.retran_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.reset_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.protocol=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_out = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.bytes_in = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_out = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.package_in = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.session_serial_number = ByteUtil.getString(data, offset, 24); //5268109200
|
||||
offset += 24;
|
||||
// buss.recog_status = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.req_method = ByteUtil.getLong(data, offset, 4); //1
|
||||
offset += 4;
|
||||
buss.content_type = ByteUtil.getString(data, offset, 40);
|
||||
offset += 40;
|
||||
buss.accept = ByteUtil.getString(data, offset, 40); //image/png, image/svg+xml, image/*;q=0.8
|
||||
offset += 40;
|
||||
buss.probe_if=ByteUtil.getInteger(data, offset,4);
|
||||
offset += 4;
|
||||
buss.channel=ByteUtil.getString(data, offset,24);
|
||||
offset += 24;
|
||||
buss.session_id=ByteUtil.getString(data, offset,80); //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728
|
||||
offset += 80;
|
||||
buss.request_ip=ByteUtil.getLong(data, offset,4); //3232235850
|
||||
offset += 4;
|
||||
buss.request_port=ByteUtil.getInteger(data, offset,4); //3309
|
||||
offset += 4;
|
||||
buss.response_ip=ByteUtil.getLong(data, offset,4); //1971882754
|
||||
offset += 4;
|
||||
buss.response_port=ByteUtil.getInteger(data, offset,4); //80
|
||||
offset += 4;
|
||||
//开始时间
|
||||
buss.start_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697
|
||||
offset +=4;
|
||||
buss.start_tv_usec=ByteUtil.getLong(data, offset,4); //459461
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.end_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697
|
||||
offset +=4;
|
||||
buss.end_tv_usec=ByteUtil.getLong(data, offset,4); //459490
|
||||
offset +=4;
|
||||
|
||||
buss.server_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
|
||||
buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
//结束时间
|
||||
buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
|
||||
offset +=4;
|
||||
buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
|
||||
offset +=4;
|
||||
|
||||
buss.server_response_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.client_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
// buss.locate_server_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
// buss.locate_server_response_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
// buss.locate_client_translate_time=ByteUtil.getLong(data, offset,8);
|
||||
offset += 8;
|
||||
buss.x_requested_with = ByteUtil.getString(data, offset, 20);
|
||||
offset += 20;
|
||||
buss.operating_system=ByteUtil.getString(data, offset,20); //Windows NT 6.1
|
||||
offset += 20;
|
||||
buss.server_res_code=ByteUtil.getInteger(data, offset,2);
|
||||
offset += 2;
|
||||
buss.browser=ByteUtil.getString(data, offset,30);
|
||||
offset += 30;
|
||||
buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
buss.time_flag=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int keyMsgLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int reqMsgLeng= ByteUtil.getInteger(data, offset, 4);
|
||||
buss.request_msg_length =reqMsgLeng;
|
||||
offset += 4;
|
||||
int resMsgLength=ByteUtil.getInteger(data, offset, 4);
|
||||
buss.response_msg_length = resMsgLength;
|
||||
offset += 4;
|
||||
int cookieLength=ByteUtil.getInteger(data, offset, 4);
|
||||
buss.req_cookie_leng=cookieLength;
|
||||
offset +=4;
|
||||
int url_len=ByteUtil.getInteger(data, offset, 4);
|
||||
offset +=4;
|
||||
int refer_len=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
int remainLen = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
// if (SystemConfigManager.systemCode.equals("JXYDBPM")) {
|
||||
// buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(ByteUtil.getString(data, offset,detailMsgLen));
|
||||
// } else {
|
||||
// buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
|
||||
// }
|
||||
buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
|
||||
offset += detailMsgLen;
|
||||
buss.bussiness_key_mesg = ByteUtil.getString(data, offset,keyMsgLen);
|
||||
offset += keyMsgLen;
|
||||
buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, reqMsgLeng);
|
||||
offset += reqMsgLeng;
|
||||
buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset,resMsgLength);
|
||||
offset +=resMsgLength;
|
||||
buss.req_cookie_detail = ByteUtil.getString(data, offset,cookieLength);
|
||||
offset += cookieLength;
|
||||
buss.ori_url=ByteUtil.getString(data, offset,url_len); // /images/index/220130.gif
|
||||
if(buss.ori_url!=null && !"".equals(buss.ori_url.trim())){
|
||||
String result = buss.ori_url.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
|
||||
if(isMessyCode(result)){//如果中文乱码
|
||||
String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值
|
||||
String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
|
||||
if(isMessyCode(result2)){//判断是否还乱码
|
||||
buss.ori_url="";
|
||||
}else{
|
||||
buss.ori_url=withoutValue;
|
||||
}
|
||||
};
|
||||
}
|
||||
offset += url_len;
|
||||
buss.reter_url = ByteUtil.getString(data, offset, refer_len); // http://www.10086.cn/gd/index_200_200.html
|
||||
offset += refer_len;
|
||||
buss.remain_data = ByteUtil.getString(data, offset, remainLen);
|
||||
offset += remainLen;
|
||||
/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
|
||||
boolean exist=false;
|
||||
if(SystemConfigManager.reduceFlag){
|
||||
|
||||
redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
|
||||
exist = reduceByKey(buss);
|
||||
}
|
||||
|
||||
//不存在就加入缓存
|
||||
if(!exist)return buss;
|
||||
//存在就返回空
|
||||
return null;
|
||||
}
|
||||
*/
|
||||
|
||||
return buss;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 如果已经存在这个key,则说明是重复数据,不再进行后续操作
|
||||
* @param buss
|
||||
* @return
|
||||
*/
|
||||
/*private boolean reduceByKey(BussFlowWeb buss) {
|
||||
|
||||
|
||||
String coding = CodingGeneration.md5GenCoding2(buss.ori_url);
|
||||
String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
|
||||
|
||||
//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
|
||||
|
||||
boolean flag=redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"");
|
||||
|
||||
if(flag){
|
||||
redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
|
||||
//原来不存在这条数据
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}*/
|
||||
|
||||
/**
|
||||
* 判断字符串是否是乱码
|
||||
*
|
||||
* @param strName 字符串
|
||||
* @return 是否是乱码
|
||||
*/
|
||||
|
||||
public static boolean isMessyCode(String strName) {
|
||||
Pattern p = Pattern.compile("\\s*|\t*|\r*|\n*");
|
||||
Matcher m = p.matcher(strName);
|
||||
String after = m.replaceAll("").replace("-", "").replace("|", "");
|
||||
String temp = after.replaceAll("\\p{P}", "");
|
||||
char[] ch = temp.trim().toCharArray();
|
||||
float chLength = 0 ;
|
||||
float count = 0;
|
||||
for (int i = 0; i < ch.length; i++) {
|
||||
char c = ch[i];
|
||||
if (!Character.isLetterOrDigit(c)) {
|
||||
if (!isChinese(c)) {
|
||||
count = count + 1;
|
||||
}
|
||||
chLength++;
|
||||
}
|
||||
}
|
||||
float result = count / chLength ;
|
||||
if (result > 0.4) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isChinese(char c) {
|
||||
Character.UnicodeBlock ub = Character.UnicodeBlock.of(c);
|
||||
if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS
|
||||
|| ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS
|
||||
|| ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A
|
||||
|| ub == Character.UnicodeBlock.GENERAL_PUNCTUATION
|
||||
|| ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION
|
||||
|| ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public String id; //id
|
||||
public int msg_len; //消息长度
|
||||
public int msg_type; //消息类型
|
||||
public long src_mac;
|
||||
public long dst_mac;
|
||||
public int protocol; //协议名
|
||||
public String session_serial_number;//会话序列号
|
||||
public String buss_type;//业务服务资源编码(C_01)
|
||||
// public String net_segment;//网段标识(客户-web)
|
||||
public long request_ip;//Web客户端IP
|
||||
public int request_port;//Web客户端端口
|
||||
public long response_ip;//Web服务器IP
|
||||
public int response_port;//Web服务器端口
|
||||
public long start_tv_sec;//Web开始时间秒
|
||||
public long start_tv_usec;//开始时间毫秒
|
||||
public long end_tv_sec;//结束时间秒
|
||||
public long end_tv_usec;//结束时间微妙
|
||||
public String disc_resource_ident; //web:url mid:apiBody db:sql
|
||||
public String operating_system; //web:操作系统 midd:interfaceName db:db_name
|
||||
//web midd
|
||||
public String session_id;//sessionid
|
||||
public int request_msg_length; //请求报文长度
|
||||
public String request_msg_detail; //请求报文详情
|
||||
public int response_msg_length; //响应报文长度
|
||||
public String response_msg_detail; //响应报文详情
|
||||
//web段
|
||||
public String reter_url;
|
||||
public String x_requested_with;
|
||||
public long req_method; //请求方式
|
||||
public String content_type; //请求类型
|
||||
public String accept; //jieshou
|
||||
public int req_cookie_leng;//请求cookie报文长度
|
||||
public String req_cookie_detail;//请求cookie报文详情
|
||||
public long t_intodb_time;
|
||||
public int load_or_step; //0: 页面 1:加载项 2:非web段数据
|
||||
public String business_detail_mesg;
|
||||
public String bussiness_key_mesg; //关键字 格式:key=val|key=val....
|
||||
public int isUncomplete; //组包是否完全 0:组包完整 1:不完整
|
||||
public int deal_state=1;
|
||||
public int server_res_code;
|
||||
public long server_response_time;
|
||||
public long client_translate_time;
|
||||
public String browser;
|
||||
public int server_start_tv_sec;
|
||||
public long server_start_tv_usec;
|
||||
public int server_end_tv_sec;
|
||||
public long server_end_tv_usec;
|
||||
public String probe_ip;
|
||||
public int probe_if;
|
||||
public long server_translate_time;
|
||||
public long time_flag;
|
||||
public String channel;
|
||||
public String base_code;
|
||||
public String ori_url;
|
||||
public String remain_data;
|
||||
public long bytes_in;
|
||||
public long bytes_out;
|
||||
public int package_in;
|
||||
public int package_out;
|
||||
|
||||
public String dataId;
|
||||
|
||||
public int filterId;
|
||||
|
||||
public String whiteCharacter;
|
||||
|
||||
// tokenId
|
||||
public String tokenId;
|
||||
// 判断是否是首端资源( 2 )
|
||||
public Integer segmentId;
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
String data[] = new String[38];
|
||||
return data;
|
||||
}
|
||||
|
||||
public String getBussType() {
|
||||
return buss_type;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,379 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* 移植zjm流量信息接收类
|
||||
* Leehr
|
||||
*
|
||||
*/
|
||||
public class DataBaseModel extends PacketBase implements Serializable{
|
||||
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private static final String split=";";
|
||||
|
||||
private long mac_src; //源MAC
|
||||
private long mac_dst; //目标MAC
|
||||
private long ip_src; //源IP
|
||||
private long ip_dst; //目标IP
|
||||
private long port_src; //源端口,如果没有,为-1
|
||||
private long port_dst; //目标端口,如果没有,为-1
|
||||
private long l3_proto; //第三层协议ID,如果没有,为-1
|
||||
private long l4_proto; //第四层协议ID,如果没有,为-1
|
||||
private long tos; //Tos,一个字节,如果没有,为-1
|
||||
private long vlan_id; //vlan ID,如果没有,为-1
|
||||
|
||||
private long bytes; //字节总数
|
||||
private long packets; //数据包总数
|
||||
|
||||
private long packets_syn; //TCP同步包数
|
||||
private long packets_syn_ack; //TCP同步确认包数
|
||||
private long packets_syn_rst; //TCP同步重置包数
|
||||
|
||||
private long timestamp; //时间戳,秒
|
||||
|
||||
private long appid;//应用ID
|
||||
private long app_group_id;
|
||||
private long mpls_label;
|
||||
|
||||
private long pkts_syn_rx; //tcp同步包,接收
|
||||
private long pkts_syn_ack_rx;//tcp同步确认包,接收
|
||||
private long pkts_syn_rst_rx; //tcp同步重置包,接收
|
||||
private long pkts_fin;//tcp终止包,接收
|
||||
private long pkts_rst;//tcp重置包,接收
|
||||
|
||||
private long bytes_rx;//字节收
|
||||
private long packets_rx;//数据包收
|
||||
|
||||
private long probe_time_sec;
|
||||
private Date probe_time;
|
||||
private Date createTime;
|
||||
|
||||
public String getKey(){
|
||||
StringBuffer strb=new StringBuffer();
|
||||
strb.append(timestamp).append(split);
|
||||
strb.append(mac_src).append(split);
|
||||
strb.append(mac_dst).append(split);
|
||||
strb.append(ip_src).append(split);
|
||||
strb.append(ip_dst).append(split);
|
||||
strb.append(port_src).append(split);
|
||||
strb.append(port_dst).append(split);
|
||||
strb.append(l3_proto).append(split);
|
||||
strb.append(l4_proto).append(split);
|
||||
strb.append(tos).append(split);
|
||||
strb.append(vlan_id).append(split);
|
||||
strb.append(appid);
|
||||
return strb.toString();
|
||||
}
|
||||
|
||||
public DataBaseModel Parse(byte[] data, int offset)throws Exception {
|
||||
|
||||
if (offset + 108 > data.length)
|
||||
throw new Exception("data length error!");
|
||||
|
||||
DataBaseModel db = new DataBaseModel();
|
||||
db.mac_src = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.mac_dst = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.ip_src = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.ip_dst = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
|
||||
db.port_src = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.port_dst = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.l3_proto = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.l4_proto = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset+=4;
|
||||
db.bytes = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.packets = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.packets_syn = ByteUtil.getLong(data, offset, 8);
|
||||
|
||||
offset+=8;
|
||||
db.packets_syn_ack = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
db.packets_syn_rst = ByteUtil.getLong(data, offset, 8);
|
||||
offset+=8;
|
||||
/**
|
||||
* 2015-12-10txy 新增3个解析
|
||||
*/
|
||||
db.appid=ByteUtil.getLong(data, offset, 4);
|
||||
offset+=4;
|
||||
db.app_group_id=ByteUtil.getLong(data, offset, 4);
|
||||
offset+=4;
|
||||
db.mpls_label=ByteUtil.getLong(data, offset, 4);
|
||||
offset+=4;
|
||||
return db;
|
||||
}
|
||||
|
||||
public String toString(){
|
||||
StringBuffer strb = new StringBuffer();
|
||||
strb.append(timestamp).append(",");
|
||||
strb.append(mac_src).append(",");
|
||||
strb.append(mac_dst).append(",");
|
||||
strb.append(ip_src).append(",");
|
||||
strb.append(ip_dst).append(",");
|
||||
strb.append(port_src).append(",");
|
||||
strb.append(port_dst).append(",");
|
||||
strb.append(l3_proto).append(",");
|
||||
strb.append(l4_proto).append(",");
|
||||
strb.append(tos).append(",");
|
||||
strb.append(vlan_id).append(",");
|
||||
strb.append(appid).append(",");
|
||||
strb.append(packets_syn).append(",");
|
||||
strb.append(pkts_syn_rx).append(",");
|
||||
strb.append(packets_syn_ack).append(",");
|
||||
strb.append(pkts_syn_ack_rx).append(",");
|
||||
strb.append(packets_syn_rst).append(",");
|
||||
strb.append(pkts_syn_rst_rx).append(",");
|
||||
strb.append(pkts_fin).append(",");
|
||||
strb.append(pkts_rst).append(",");
|
||||
strb.append(bytes).append(",");
|
||||
strb.append(packets).append(",");
|
||||
strb.append(bytes_rx).append(",");
|
||||
strb.append(packets_rx).append(",");
|
||||
strb.append(app_group_id).append(",");
|
||||
strb.append(mpls_label).append("\n");
|
||||
return strb.toString();
|
||||
}
|
||||
|
||||
|
||||
public long getProbeTimeSec() {
|
||||
return probe_time_sec;
|
||||
}
|
||||
|
||||
public void setProbe_time_sec(long probe_time_sec) {
|
||||
this.probe_time_sec = probe_time_sec;
|
||||
}
|
||||
|
||||
public Date getProbe_time() {
|
||||
return probe_time;
|
||||
}
|
||||
|
||||
public void setProbe_time(Date probe_time) {
|
||||
this.probe_time = probe_time;
|
||||
}
|
||||
|
||||
public Date getCreateTime() {
|
||||
return createTime;
|
||||
}
|
||||
|
||||
public void setCreateTime(Date createTime) {
|
||||
this.createTime = createTime;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
public void setTimestamp(long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
public long getMac_src() {
|
||||
return mac_src;
|
||||
}
|
||||
public void setMac_src(long macSrc) {
|
||||
mac_src = macSrc;
|
||||
}
|
||||
public long getMac_dst() {
|
||||
return mac_dst;
|
||||
}
|
||||
public void setMac_dst(long macDst) {
|
||||
mac_dst = macDst;
|
||||
}
|
||||
public long getIp_src() {
|
||||
return ip_src;
|
||||
}
|
||||
public void setIp_src(long ipSrc) {
|
||||
ip_src = ipSrc;
|
||||
}
|
||||
public long getIp_dst() {
|
||||
return ip_dst;
|
||||
}
|
||||
public void setIp_dst(long ipDst) {
|
||||
ip_dst = ipDst;
|
||||
}
|
||||
public long getPort_src() {
|
||||
return port_src;
|
||||
}
|
||||
public void setPort_src(long portSrc) {
|
||||
port_src = portSrc;
|
||||
}
|
||||
public long getPort_dst() {
|
||||
return port_dst;
|
||||
}
|
||||
public void setPort_dst(long portDst) {
|
||||
port_dst = portDst;
|
||||
}
|
||||
public long getL3_proto() {
|
||||
return l3_proto;
|
||||
}
|
||||
public void setL3_proto(long l3Proto) {
|
||||
l3_proto = l3Proto;
|
||||
}
|
||||
public long getL4_proto() {
|
||||
return l4_proto;
|
||||
}
|
||||
public void setL4_proto(long l4Proto) {
|
||||
l4_proto = l4Proto;
|
||||
}
|
||||
public long getTos() {
|
||||
return tos;
|
||||
}
|
||||
public void setTos(long tos) {
|
||||
this.tos = tos;
|
||||
}
|
||||
public long getVlan_id() {
|
||||
return vlan_id;
|
||||
}
|
||||
public void setVlan_id(long vlanId) {
|
||||
vlan_id = vlanId;
|
||||
}
|
||||
public long getAppid() {
|
||||
return appid;
|
||||
}
|
||||
public void setAppid(long appid) {
|
||||
this.appid = appid;
|
||||
}
|
||||
|
||||
public long getPkts_syn_rx() {
|
||||
return pkts_syn_rx;
|
||||
}
|
||||
public void setPkts_syn_rx(long pktsSynRx) {
|
||||
pkts_syn_rx = pktsSynRx;
|
||||
}
|
||||
|
||||
public long getPkts_syn_ack_rx() {
|
||||
return pkts_syn_ack_rx;
|
||||
}
|
||||
public void setPkts_syn_ack_rx(long pktsSynAckRx) {
|
||||
pkts_syn_ack_rx = pktsSynAckRx;
|
||||
}
|
||||
|
||||
public long getPkts_syn_rst_rx() {
|
||||
return pkts_syn_rst_rx;
|
||||
}
|
||||
public void setPkts_syn_rst_rx(long pktsSynRstRx) {
|
||||
pkts_syn_rst_rx = pktsSynRstRx;
|
||||
}
|
||||
public long getPkts_fin() {
|
||||
return pkts_fin;
|
||||
}
|
||||
public void setPkts_fin(long pktsFin) {
|
||||
pkts_fin = pktsFin;
|
||||
}
|
||||
public long getPkts_rst() {
|
||||
return pkts_rst;
|
||||
}
|
||||
public void setPkts_rst(long pktsRst) {
|
||||
pkts_rst = pktsRst;
|
||||
}
|
||||
public long getBytes() {
|
||||
return bytes;
|
||||
}
|
||||
public void setBytes(long bytes) {
|
||||
this.bytes = bytes;
|
||||
}
|
||||
public long getPackets() {
|
||||
return packets;
|
||||
}
|
||||
public void setPackets(long packets) {
|
||||
this.packets = packets;
|
||||
}
|
||||
public long getBytes_rx() {
|
||||
return bytes_rx;
|
||||
}
|
||||
public void setBytes_rx(long bytesRx) {
|
||||
bytes_rx = bytesRx;
|
||||
}
|
||||
public long getPackets_rx() {
|
||||
return packets_rx;
|
||||
}
|
||||
public void setPackets_rx(long packetsRx) {
|
||||
packets_rx = packetsRx;
|
||||
}
|
||||
|
||||
public long getPackets_syn() {
|
||||
return packets_syn;
|
||||
}
|
||||
|
||||
public void setPackets_syn(long packetsSyn) {
|
||||
packets_syn = packetsSyn;
|
||||
}
|
||||
|
||||
public long getPackets_syn_ack() {
|
||||
return packets_syn_ack;
|
||||
}
|
||||
|
||||
public void setPackets_syn_ack(long packetsSynAck) {
|
||||
packets_syn_ack = packetsSynAck;
|
||||
}
|
||||
|
||||
public long getPackets_syn_rst() {
|
||||
return packets_syn_rst;
|
||||
}
|
||||
|
||||
public void setPackets_syn_rst(long packetsSynRst) {
|
||||
packets_syn_rst = packetsSynRst;
|
||||
}
|
||||
|
||||
public long getApp_group_id() {
|
||||
return app_group_id;
|
||||
}
|
||||
|
||||
public void setApp_group_id(long app_group_id) {
|
||||
this.app_group_id = app_group_id;
|
||||
}
|
||||
|
||||
public long getMpls_label() {
|
||||
return mpls_label;
|
||||
}
|
||||
|
||||
public void setMpls_label(long mpls_label) {
|
||||
this.mpls_label = mpls_label;
|
||||
}
|
||||
|
||||
public static String getSplit() {
|
||||
return split;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
160
src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java
Normal file
160
src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java
Normal file
|
@ -0,0 +1,160 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
public class DataFlow extends PacketBase {
|
||||
private final int UNIT_DATA_LENGTH = 157;//贵阳
|
||||
public String id;
|
||||
|
||||
public Integer msg_type; //消息类型
|
||||
public Integer msg_version; //消息版本
|
||||
public Integer msg_seq; //序列号
|
||||
public Integer msg_len; //消息长度
|
||||
public Integer probe_if; //接口号
|
||||
public Long timestamp; //时间戳
|
||||
public Long mac_src; //源物理地址
|
||||
public Long mac_dst; //目的物理地址
|
||||
public Integer vlan_id; //vlan_id
|
||||
public Long l3_proto; //l3层协议
|
||||
public Long l4_proto; //l4层协议
|
||||
public Integer tos; //tos
|
||||
public Integer retran_count; //重传次数
|
||||
public Integer reset_count; //重置次数
|
||||
public Integer zerowin_count; //零窗口次数
|
||||
public Integer protocol; //协议名
|
||||
public Long seq;
|
||||
public Long ack;
|
||||
public Integer recog_status; //识别类型标识
|
||||
public Long bytes; //总字节
|
||||
public Long packets; //总包数
|
||||
public Integer start_tv_sec;//Web开始时间秒
|
||||
public Long start_tv_usec;//开始时间毫秒
|
||||
public Integer end_tv_sec;//结束时间秒
|
||||
public Long end_tv_usec;//结束时间微妙
|
||||
public Integer server_start_tv_sec;//服务器响应开始时间秒
|
||||
public Long server_start_tv_usec;//服务器响应开始时间毫秒
|
||||
public Integer server_end_tv_sec;//服务器响应结束时间秒
|
||||
public Long server_end_tv_usec;//服务器响应结束时间微妙
|
||||
|
||||
public Long server_response_time;//Web服务器响应时间
|
||||
public Long client_translate_time;//Web客户端传输耗时
|
||||
public Long server_translate_time;//Web服务器传输耗时
|
||||
|
||||
public Long bytes_in;
|
||||
public Long bytes_out;
|
||||
public Long packets_in;
|
||||
public Long packets_out;
|
||||
public Long ip_src; //源IP
|
||||
public Long ip_dst; //目的IP
|
||||
public Long port_src; //源端口
|
||||
public Long port_dst; //目的端口
|
||||
public Long probeIP; //探针IP
|
||||
|
||||
public Long intodb_time;
|
||||
public Long count = 1L;
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
if (offset + UNIT_DATA_LENGTH > data.length)
|
||||
throw new Exception("data length error!");
|
||||
DataFlow dflow = new DataFlow();
|
||||
// dflow.m_Header = header;
|
||||
|
||||
dflow.msg_type = header.getTableID();
|
||||
// offset += 4;
|
||||
// dflow.msg_version = ByteUtil.getInteger(data, offset, 1);
|
||||
// offset += 1;
|
||||
// dflow.msg_seq = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
// dflow.msg_len = ByteUtil.getInteger(data, offset, 4);
|
||||
// offset += 4;
|
||||
dflow.ip_src = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.ip_dst = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.port_src = ByteUtil.getLong(data, offset, 2);
|
||||
offset += 2;
|
||||
// dflow.port_dst = ByteUtil.getLong(data, offset, 2);
|
||||
offset += 2;
|
||||
dflow.probe_if = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.timestamp = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.mac_src = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
// dflow.mac_dst = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
// dflow.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.l3_proto = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.l4_proto = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.retran_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.reset_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.zerowin_count = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.protocol=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.seq = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.ack = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.recog_status = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.bytes_in = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.bytes_out = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.packets_in = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.packets_out = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.bytes = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
dflow.packets = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.start_tv_sec = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.start_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.end_tv_sec = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.end_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.server_start_tv_sec = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.server_start_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.server_end_tv_sec = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.server_end_tv_usec = ByteUtil.getLong(data, offset, 4);
|
||||
offset += 4;
|
||||
// dflow.server_translate_time = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
// dflow.client_translate_time = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
// dflow.server_response_time = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
return dflow;
|
||||
}
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
return UNIT_DATA_LENGTH;
|
||||
}
|
||||
@Override
|
||||
public String[] getData() {
|
||||
String data[] = new String[45];
|
||||
return data;
|
||||
}
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
return Integer.parseInt(probe_if.toString());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public abstract class PacketBase {
|
||||
|
||||
protected PacketHeader m_Header = null;
|
||||
|
||||
public PacketHeader getPacketHead(){
|
||||
return m_Header;
|
||||
}
|
||||
|
||||
public void setPacketHeader(PacketHeader header)
|
||||
{
|
||||
this.m_Header = header;
|
||||
}
|
||||
|
||||
//抽象方法,每个数据操作类实现不一样
|
||||
public abstract PacketBase Parse(PacketHeader header,ByteBuffer data) throws Exception;
|
||||
|
||||
public abstract int getUnitPacketLength();
|
||||
|
||||
public abstract String[] getData();
|
||||
|
||||
public abstract int getInterfaceNumber();
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
|
||||
|
||||
public class PacketHeader {
|
||||
|
||||
private int m_TableID = -1; //数据类型
|
||||
private int m_RecCount = 0; //数据总条数
|
||||
private int msg_len; //数据报文总长度,60010端口为压缩后长度
|
||||
private long timestamp; //60010端口发送数据时间
|
||||
private int probe_if; //60010端口抓包口
|
||||
private int umcompr_len;//60010端口数据报文压缩前长度
|
||||
private int now_type; //记录25类型数据数据当前数据类型 22 23 24
|
||||
|
||||
public int getTableID() {
|
||||
return m_TableID;
|
||||
}
|
||||
public int getMsg_len() {
|
||||
return msg_len;
|
||||
}
|
||||
public int getRecCount() {
|
||||
return m_RecCount;
|
||||
}
|
||||
public long getTimestamp() {
|
||||
return timestamp ;
|
||||
}
|
||||
public int getProbe_id(){
|
||||
return probe_if;
|
||||
}
|
||||
public int getUmcompr_len(){
|
||||
return umcompr_len;
|
||||
}
|
||||
|
||||
public PacketHeader(byte[] data) throws Exception {
|
||||
if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!");
|
||||
this.m_TableID = ByteUtil.getInteger(data, 0, 4);//22
|
||||
this.m_RecCount = ByteUtil.getInteger(data, 4, 4);//4000
|
||||
this.msg_len = ByteUtil.getInteger(data, 8, 4);
|
||||
}
|
||||
|
||||
public void parseNextHeader_60010(byte[] data) throws Exception {
|
||||
if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!");
|
||||
this.timestamp = ByteUtil.getLong(data, 0, 4);
|
||||
this.probe_if = ByteUtil.getInteger(data, 4, 4);
|
||||
this.umcompr_len = ByteUtil.getInteger(data, 8, 4);
|
||||
}
|
||||
|
||||
public void setNow_type(int now_type) {
|
||||
this.now_type = now_type;
|
||||
}
|
||||
|
||||
public int getNow_type(){
|
||||
return now_type;
|
||||
}
|
||||
}
|
156
src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java
Normal file
156
src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java
Normal file
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
*
|
||||
*/
|
||||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
/**
|
||||
* @author l
|
||||
* 网络性能流
|
||||
*/
|
||||
public class QoeFlow extends PacketBase {
|
||||
private String qoe_Flow_Id;
|
||||
public static int SIZE = 38*4;
|
||||
// 字段类型 字段 原始类型 字节数 说明
|
||||
public int srcIp; //uint32_t 4 源ip地址
|
||||
public int dstIp; //uint32_t 4 目的ip地址
|
||||
public int stvSec; //uint32_t 4 开始时间秒
|
||||
public int stvUsec; //uint32_t 4 开始时间微秒
|
||||
public int ltvSec; //uint32_t 4 最后更新时间秒
|
||||
public int ltvUsec; //uint32_t 4 最后更新时间微秒
|
||||
public int dst2ResponNum; //uint32_t 4 响应总量
|
||||
public int dst2Fast; //uint32_t 4
|
||||
public int dst2FastExpected; //uint32_t 4
|
||||
public int dst2ExpectedDegrated; //uint32_t 4
|
||||
public int dst2DegratedService; //uint32_t 4
|
||||
public int dst2ServiceAvailability; //uint32_t 4
|
||||
public int dst2ResponTimeout; //uint32_t 4 响应超时数
|
||||
public int dst2ResponSuccess; //uint32_t 4 响应成功数
|
||||
public int dst2ResponFail; //uint32_t 4 响应失败数
|
||||
public int dst2ResponPeek; //uint32_t 4 峰值响应时间
|
||||
public int dst2ResponAverage; //uint32_t 4 响应时间均值
|
||||
public int csWindow; //uint32_t 4
|
||||
public int scWindow; //uint32_t 4
|
||||
public int csReset; //uint32_t 4
|
||||
public int scReset; //uint32_t 4
|
||||
public int csRetran; //uint32_t 4
|
||||
public int scRetran; //uint32_t 4
|
||||
public int appId; //uint32_t 4 Aphid
|
||||
public int appGroupId; //uint32_t 4 app组id
|
||||
public int probeIf; //uint32_t 4 探针接口id
|
||||
public int appStyle; //uint32_t 4
|
||||
public int timeFlag; //uint32_t 4 发送时间戳
|
||||
public int connSetupTm; //uint32_t 4 链接建立时间
|
||||
public int dataTransferTm; //uint32_t 4 数据传输时间
|
||||
public int retransDelayTm; //uint32_t 4 数据重传时延
|
||||
public int networkInbound; //uint32_t 4 网络响应时间(c->s)
|
||||
public int networkOutbound; //uint32_t 4 网络响应时间(s->c)
|
||||
public int newSession; //uint32_t 4 新会话数
|
||||
public int userEvents; //uint32_t 4 用户事件
|
||||
public int serverEvents; //uint32_t 4 服务事件
|
||||
public int connSetupPeek; //uint32_t 4 连接建立时间峰值
|
||||
public int vlanId; //uint32_t 4
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public QoeFlow Parse(PacketHeader header,ByteBuffer data) throws Exception {
|
||||
QoeFlow qoeFlow = new QoeFlow();
|
||||
qoeFlow.srcIp=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dstIp=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.stvSec=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.stvUsec=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.ltvSec=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.ltvUsec=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponNum=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2Fast=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2FastExpected=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ExpectedDegrated=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2DegratedService=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ServiceAvailability=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponTimeout=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponSuccess=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponFail=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponPeek=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dst2ResponAverage=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.csWindow=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.scWindow=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.csReset=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.scReset=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.csRetran=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.scRetran=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.appId=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.appGroupId=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.probeIf=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.appStyle=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.timeFlag=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.connSetupTm=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.dataTransferTm=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.retransDelayTm=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.networkInbound=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.networkOutbound=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.newSession=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.userEvents=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.serverEvents=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.connSetupPeek=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
qoeFlow.vlanId=ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
return qoeFlow;
|
||||
}
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
@Override
|
||||
public String[] getData() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
114
src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java
Normal file
114
src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java
Normal file
|
@ -0,0 +1,114 @@
|
|||
package com.yuandian.dataflow.proto.decode;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
|
||||
|
||||
public class SstFlow extends PacketBase{
|
||||
|
||||
public static int SIZE = 108;
|
||||
|
||||
public long mac_src; //源MAC
|
||||
public long mac_dst; //目标MAC
|
||||
public long ip_src; //源IP
|
||||
public long ip_dst; //目标IP
|
||||
public int port_src; //源端口,如果没有,为-1
|
||||
public int port_dst; //目标端口,如果没有,为-1
|
||||
public int l3_proto; //第三层协议ID,如果没有,为-1
|
||||
public int l4_proto; //第四层协议ID,如果没有,为-1
|
||||
public int tos; //Tos,一个字节,如果没有,为-1
|
||||
public int vlan_id; //vlan ID,如果没有,为-1
|
||||
|
||||
public long bytes; //字节总数
|
||||
public long packets; //数据包总数
|
||||
|
||||
public long packets_syn; //TCP同步包数
|
||||
public long packets_syn_ack; //TCP同步确认包数
|
||||
public long packets_syn_rst; //TCP同步重置包数
|
||||
|
||||
public long timestamp; //时间戳,秒
|
||||
|
||||
public long appid;//应用ID
|
||||
public long app_group_id;
|
||||
public int mpls_label;
|
||||
|
||||
public long pkts_syn_rx; //tcp同步包,接收
|
||||
public long pkts_syn_ack_rx;//tcp同步确认包,接收
|
||||
public long pkts_syn_rst_rx; //tcp同步重置包,接收
|
||||
public long pkts_fin;//tcp终止包,接收
|
||||
public long pkts_rst;//tcp重置包,接收
|
||||
|
||||
public long bytes_rx;//字节收
|
||||
public long packets_rx;//数据包收
|
||||
|
||||
public long probe_time_sec;
|
||||
public Date probe_time;
|
||||
public Date createTime;
|
||||
|
||||
public int probe_if;
|
||||
|
||||
@Override
|
||||
public PacketBase Parse(PacketHeader header,ByteBuffer data)
|
||||
throws Exception {
|
||||
SstFlow sstFlow = new SstFlow();
|
||||
sstFlow.mac_src = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.mac_dst = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.ip_src = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.ip_dst = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.port_src = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.port_dst = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.l3_proto = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.l4_proto = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.tos = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.vlan_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
|
||||
sstFlow.bytes = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.packets = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.packets_syn = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.packets_syn_ack = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.packets_syn_rst = ByteUtil.getLong(data, offset, 8);
|
||||
offset += 8;
|
||||
sstFlow.appid = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.app_group_id = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.mpls_label = ByteUtil.getInteger(data, offset, 4);
|
||||
offset += 4;
|
||||
sstFlow.timestamp = header.getTimestamp();
|
||||
sstFlow.probe_if = header.getProbe_id();
|
||||
return sstFlow;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUnitPacketLength() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getData() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInterfaceNumber() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -100,10 +100,10 @@ public class Server {
|
|||
done = new RaftClosure();
|
||||
|
||||
System.setProperty("server.port", sprPort);
|
||||
SpringApplication.run(Server.class, args);
|
||||
|
||||
var app = SpringApplication.run(Server.class, args);
|
||||
app.start();
|
||||
|
||||
|
||||
// node.shutdown(done);
|
||||
node.shutdown(done);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import com.alipay.sofa.jraft.Closure;
|
|||
import com.alipay.sofa.jraft.entity.Task;
|
||||
|
||||
import com.yuandian.dataflow.Server;
|
||||
import com.yuandian.dataflow.entity.Response;
|
||||
import com.yuandian.dataflow.projo.Response;
|
||||
import com.yuandian.dataflow.statemachine.RaftClosure;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -38,13 +38,8 @@ public class TaskLog {
|
|||
|
||||
Task task = new Task();
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
log.error(node.toString());
|
||||
|
||||
|
||||
log.error(node.toString());
|
||||
|
||||
RaftClosure done = new RaftClosure();
|
||||
task.setData(ByteBuffer.wrap("hello".getBytes()));
|
||||
|
@ -55,8 +50,9 @@ public class TaskLog {
|
|||
|
||||
|
||||
Response response = new Response();
|
||||
response.code = HttpStatus.OK;
|
||||
response.message = HttpStatus.OK.toString();
|
||||
|
||||
response.Code = HttpStatus.OK;
|
||||
response.Message = HttpStatus.OK.toString();
|
||||
return new ResponseEntity<Response>(response, HttpStatus.OK);
|
||||
}
|
||||
|
||||
|
|
72
src/main/java/com/yuandian/dataflow/master/Header.java
Normal file
72
src/main/java/com/yuandian/dataflow/master/Header.java
Normal file
|
@ -0,0 +1,72 @@
|
|||
package com.yuandian.dataflow.master;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketException;
|
||||
import java.nio.Buffer;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.yuandian.dataflow.utils.PacketHeader;
|
||||
|
||||
import lombok.Cleanup;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Header
|
||||
*/
|
||||
@Slf4j
|
||||
|
||||
public class Header {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ArrayList<Thread> threads = new ArrayList<Thread>() ;
|
||||
|
||||
for(int i = 0 ; i < 100 ; i++) {
|
||||
Thread thread = new Thread(() -> {
|
||||
|
||||
try {
|
||||
var addr = new InetSocketAddress("192.168.1.248", 60001);
|
||||
@Cleanup
|
||||
var sock = new Socket();
|
||||
sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存
|
||||
sock.setSoTimeout(1000 * 30);
|
||||
// 设置超时
|
||||
sock.connect(addr, 10 * 1000);
|
||||
var in = new DataInputStream(sock.getInputStream());
|
||||
var out = new DataOutputStream(sock.getOutputStream());
|
||||
// 发送验证字符串
|
||||
out.write("public".getBytes());
|
||||
// var buf = ByteBuffer.wrap( in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN);
|
||||
log.error("{}", PacketHeader.PacketCode(in));
|
||||
var pheader = new PacketHeader(in);
|
||||
|
||||
// buf = ByteBuffer.wrap(in.readNBytes(12)).order(ByteOrder.LITTLE_ENDIAN);
|
||||
log.error("{}", pheader);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}) ;
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
threads.forEach((t)->{
|
||||
try {
|
||||
t.join();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.yuandian.dataflow.entity;
|
||||
package com.yuandian.dataflow.projo;
|
||||
|
||||
|
||||
|
|
@ -1,21 +1,14 @@
|
|||
package com.yuandian.dataflow.entity;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
package com.yuandian.dataflow.projo;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class Response {
|
||||
@JsonProperty("code")
|
||||
public HttpStatus code;
|
||||
public HttpStatus Code;
|
||||
@JsonProperty("message")
|
||||
public String message ;
|
||||
public String Message;
|
||||
@JsonProperty("data")
|
||||
public Object data ;
|
||||
|
||||
|
||||
public Object Data;
|
||||
}
|
55
src/main/java/com/yuandian/dataflow/utils/PacketHeader.java
Normal file
55
src/main/java/com/yuandian/dataflow/utils/PacketHeader.java
Normal file
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* description
|
||||
*
|
||||
* @author eson
|
||||
*2022年6月07日-11:09:21
|
||||
*/
|
||||
package com.yuandian.dataflow.utils;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
||||
/**
|
||||
* description
|
||||
*
|
||||
* @author eson
|
||||
*2022年6月07日-11:09:21
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
public class PacketHeader {
|
||||
|
||||
private int tableID = -1; //数据类型
|
||||
private int recCount = 0; //数据总条数
|
||||
private int msgLen; //数据报文总长度,60010端口为压缩后长度
|
||||
private long timestamp; //60010端口发送数据时间
|
||||
private int probeIf; //60010端口抓包口
|
||||
private int umcomprLen;//60010端口数据报文压缩前长度
|
||||
private int nowType; //记录25类型数据数据当前数据类型 22 23 24
|
||||
|
||||
public PacketHeader(DataInputStream data) throws Exception {
|
||||
this.tableID = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//22
|
||||
this.recCount = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//4000
|
||||
this.msgLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
|
||||
}
|
||||
|
||||
public void parseNextHeader_60010(DataInputStream data) throws Exception {
|
||||
this.timestamp = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getLong();
|
||||
this.probeIf =ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
|
||||
this.umcomprLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
|
||||
}
|
||||
|
||||
public static int PacketCode(DataInputStream data) throws IOException {
|
||||
var buf = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN);
|
||||
return buf.getInt();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package com.yuandian.dataflow.rpc;
|
||||
// option java_package = "com.yuandian.dataflow.rpc";
|
||||
|
||||
// option java_outer_classname = "com.yuandian.dataflow.rpc";
|
||||
option java_multiple_files = false;//以非外部类模式生成
|
||||
|
||||
service DataFlow {
|
||||
rpc Update (State) returns (Response);
|
||||
}
|
||||
|
||||
message State {
|
||||
map<int32, QueueState> QueueMap = 1;
|
||||
|
||||
}
|
||||
|
||||
message QueueState {
|
||||
int32 Size = 1;
|
||||
}
|
||||
|
||||
message Response {
|
||||
int32 Code = 1;
|
||||
string Message = 2;
|
||||
bytes Data = 3;
|
||||
}
|
|
@ -25,7 +25,7 @@ import org.springframework.expression.spel.ast.FunctionReference;
|
|||
|
||||
import com.mongodb.MongoClient;
|
||||
import com.mongodb.client.model.InsertManyOptions;
|
||||
import com.yuandian.dataflow.entity.Doc;
|
||||
import com.yuandian.dataflow.projo.Doc;
|
||||
|
||||
import io.netty.handler.codec.dns.DatagramDnsQuery;
|
||||
import lombok.Cleanup;
|
||||
|
|
Loading…
Reference in New Issue
Block a user