From 6e7a35b76d61a50f4dc0dd33d98695c2c450cebf Mon Sep 17 00:00:00 2001
From: eson <eson.hsm@nonolive.com>
Date: Tue, 7 Jun 2022 14:03:03 +0800
Subject: [PATCH] =?UTF-8?q?TODO:=20=E4=BA=86=E8=A7=A3=E6=8E=A2=E9=92=88?=
 =?UTF-8?q?=E7=9A=84=E6=8E=A5=E6=94=B6=E6=95=B0=E6=8D=AE=E8=BF=87=E7=A8=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../.proto/decode/ApmBaseDataFlow.java        | 287 +++++++++++++
 .../dataflow/.proto/decode/AppFlow.java       | 101 +++++
 .../.proto/decode/BacktrackingFlow.java       |  98 +++++
 .../.proto/decode/BasicTrafficFlow.java       |  82 ++++
 .../.proto/decode/BusinessBodyData.java       |  29 ++
 .../dataflow/.proto/decode/BussFlowDb.java    | 244 +++++++++++
 .../.proto/decode/BussFlowExternal.java       | 132 ++++++
 .../dataflow/.proto/decode/BussFlowMidd.java  | 267 ++++++++++++
 .../dataflow/.proto/decode/BussFlowOrl.java   | 111 +++++
 .../dataflow/.proto/decode/BussFlowWeb.java   | 362 +++++++++++++++++
 .../dataflow/.proto/decode/DataBaseModel.java | 379 ++++++++++++++++++
 .../dataflow/.proto/decode/DataFlow.java      | 160 ++++++++
 .../dataflow/.proto/decode/PacketBase.java    |  27 ++
 .../dataflow/.proto/decode/PacketHeader.java  |  55 +++
 .../dataflow/.proto/decode/QoeFlow.java       | 156 +++++++
 .../dataflow/.proto/decode/SstFlow.java       | 114 ++++++
 .../java/com/yuandian/dataflow/Server.java    |   6 +-
 .../yuandian/dataflow/controller/TaskLog.java |  14 +-
 .../com/yuandian/dataflow/master/Header.java  |  72 ++++
 .../dataflow/{entity => projo}/Doc.java       |   2 +-
 .../dataflow/{entity => projo}/Response.java  |  19 +-
 .../yuandian/dataflow/utils/PacketHeader.java |  55 +++
 src/main/proto/scheduler.proto                |  26 --
 .../java/com/yuandian/dataflow/AppTest.java   |   2 +-
 24 files changed, 2747 insertions(+), 53 deletions(-)
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java
 create mode 100644 src/main/java/com/yuandian/dataflow/master/Header.java
 rename src/main/java/com/yuandian/dataflow/{entity => projo}/Doc.java (95%)
 rename src/main/java/com/yuandian/dataflow/{entity => projo}/Response.java (52%)
 create mode 100644 src/main/java/com/yuandian/dataflow/utils/PacketHeader.java
 delete mode 100644 src/main/proto/scheduler.proto

diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java
new file mode 100644
index 0000000..def7133
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/ApmBaseDataFlow.java
@@ -0,0 +1,287 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * @author User
+ *
+ */
+public class ApmBaseDataFlow extends PacketBase {
+	private static Logger logger = LoggerFactory.getLogger(ApmBaseDataFlow.class);
+	public static int SIZE = 156;//88;//
+	/**
+	 * 抓包口
+	 */
+	public long probeIf;
+
+	
+	//四元组
+	/**
+	 * 请求端口
+	 */
+	public int requestPort;
+	/**
+	 * 响应端口
+	 */
+	public int responsePort;
+	/**
+	 * 请求IP
+	 */
+	public long requestIp;
+	/**
+	 * 响应Ip
+	 */
+	public long responseIp;
+
+	/**
+	 * 源mac
+	 */
+	public long srcMac;
+
+	/**
+	 * 目的mac
+	 */
+	public long dstMac;
+
+	/**
+	 * 链路编号
+	 */
+	public long vlanId;
+	
+	public long tvSec;
+	public long tvUsec;
+	
+	/**
+	 * 开始时间
+	 */
+	public long startTm;
+
+	/**
+	 * 总字节数
+	 */
+	public long totalBytes;
+
+	/**
+	 * 总包数
+	 */
+	public long totalPackets;
+	/**
+	 * 总丢包数
+	 */
+	public long totalDropPackets;
+	/**
+	 * 重传延时
+	 */
+	public long retranTimeDelay;
+	/**
+	 * 客户端rtt
+	 */
+	public long clientRtt;
+	
+	/**
+	 * 服务端Rtt
+	 */
+	public long serverRtt;
+	
+	
+	/**
+	 * 用户响应时间
+	 */
+	public long userResponseTime;
+	/**
+	 * 服务响应时间
+	 */
+	public long serverResponseTime;
+	/**
+	 * tcp回话连接失败数
+	 */
+	public long conFail;
+
+	/**
+	 * 会话重置数
+	 */
+//	public long reset;
+	
+	/**
+	 * 服务端总字节数
+	 */
+	public long bytesIn;
+	/**
+	 * 客户端总字节数
+	 */
+	public long bytesOut;
+	/**
+	 * 探针推送时间
+	 */
+	public long timeFlag;
+	
+	/**
+	 * 结束时间
+	 */
+	public long endTm;
+	
+	/**
+	 * 结束时间微秒
+	 */
+	public long endTmUsec;
+	
+	/**
+	 * 总响应数
+	 */
+	public long	responNum;
+	/**
+	 * 客户端零窗口数
+	 */
+	public long	csWindow;
+	/**
+	 * 服务端零窗口数
+	 */
+	public long	scWindow;
+	/**
+	 * 客户端重置数
+	 */
+	public long	csReset;
+	/**
+	 * 服务端重置数
+	 */
+	public long	scReset;
+	/**
+	 * 客户端重传数
+	 */
+	public long	csRetran;
+	/**
+	 * 服务端重传数
+	 */
+	public long	scRetran;
+	/**
+	 * 会话建立时间
+	 */
+	public long	connSetupTm;
+	/**
+	 * 新建会话数
+	 */
+	public long	newSession;
+	
+	public long csAlert;
+	
+	public long scAlert;
+
+	public String protocal;
+	
+	
+	
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception {
+		ApmBaseDataFlow oneData = new ApmBaseDataFlow();		
+		oneData.setPacketHeader(header);
+		
+		oneData.probeIf = data.getLong(4);
+		
+		
+		oneData.requestPort = data.getInt( 2);
+		
+		oneData.responsePort = data.getInt( 2);
+		
+		oneData.requestIp = data.getLong(4);
+		
+		oneData.responseIp = data.getLong(4);
+		
+		
+		oneData.srcMac = data.getLong(6);
+		
+		oneData.dstMac = data.getLong(6);
+		
+		oneData.vlanId = data.getLong(4);
+		
+
+		oneData.tvSec = data.getLong(4);
+		
+		oneData.tvUsec = data.getLong(4);
+		
+		
+//		oneData.startTm = data.getLong(8);
+
+		
+		oneData.totalBytes = data.getLong(4);
+		
+		oneData.totalPackets = data.getLong(4);
+		
+		oneData.totalDropPackets = data.getLong(4);
+		
+		oneData.retranTimeDelay = data.getLong(4);
+		
+		oneData.clientRtt = data.getLong(4);
+		
+		//服务端Rtt
+		oneData.serverRtt = data.getLong(4);
+		
+		oneData.userResponseTime = data.getLong(4);
+		
+		oneData.serverResponseTime = data.getLong(4);
+		
+		oneData.conFail = data.getLong(4);
+		
+		
+//		oneData.reset = data.getLong(4);
+
+		oneData.bytesIn = data.getLong(4);
+		
+		oneData.bytesOut = data.getLong(4);
+		
+		oneData.timeFlag = data.getLong(4);
+		
+		
+		oneData.endTm = data.getLong(4);
+	
+		oneData.endTmUsec = data.getLong(4);
+	
+		
+		oneData.responNum = data.getLong(4);
+		
+		oneData.csWindow  = data.getLong(4);
+		
+		oneData.scWindow  = data.getLong(4);
+		
+		oneData.csReset   = data.getLong(4);
+		
+		oneData.scReset   = data.getLong(4);
+		
+		oneData.csRetran  = data.getLong(4);
+		
+		oneData.scRetran  = data.getLong(4);
+		
+		oneData.connSetupTm = data.getLong(4);
+		
+		oneData.newSession = data.getLong(4);
+		
+		oneData.csAlert = data.getLong(4);
+		
+		oneData.scAlert = data.getLong(4);
+
+		
+		oneData.protocal = new String(data.array());
+		
+		return oneData;
+	}
+
+	@Override
+	public String[] getData() {
+		return null; 
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		return 0;
+	}
+
+	@Override
+	public int getUnitPacketLength() {
+		return 0;
+	}
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java
new file mode 100644
index 0000000..ee96ef9
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/AppFlow.java
@@ -0,0 +1,101 @@
+package com.yuandian.dataflow.proto.decode;
+import java.nio.ByteBuffer;
+ 
+
+public class AppFlow extends PacketBase {
+	public static int SIZE = 104;
+	public long srcIp;
+	public int srcPort;
+	public long dstIp;
+	public int dstPort;
+	public long startTvSec;
+	public long startTvUsec;
+	public long lastTvSec;
+	public long lastTvUsec;
+	public long endTvSec;
+	public long endTvUsec;
+	public int inputPackets;
+	public int outputPackets;
+	public int inputBytes;
+	public int outputBytes;
+	public String protocaol;
+	public int appId;
+	public int appGroupId;
+	public int probeIf;
+	public int appStyle;
+	public long timeFlag;
+	public int vlanId;
+	public int mplsLable;
+	public int tos;
+	
+
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		AppFlow nlf = new AppFlow();
+		nlf.srcIp = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.srcPort = ByteUtil.getInteger(data, offset, 2);
+		offset += 2;
+		nlf.dstIp = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.dstPort = ByteUtil.getInteger(data, offset, 2);
+		offset += 2;
+		nlf.startTvSec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.startTvUsec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.lastTvSec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.lastTvUsec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.endTvSec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.endTvUsec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.inputPackets = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.outputPackets = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.inputBytes = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.outputBytes = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.protocaol = ByteUtil.getString(data, offset, 20);
+		offset += 20;
+		nlf.appId = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.appGroupId = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.probeIf = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.appStyle = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.timeFlag = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		nlf.vlanId = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.mplsLable = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		nlf.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+		return nlf;
+	}
+
+	@Override
+	public int getUnitPacketLength() {
+		return 0;
+	}
+
+	@Override
+	public String[] getData() {
+		return null;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		return 0;
+	}
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java
new file mode 100644
index 0000000..fd7757b
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BacktrackingFlow.java
@@ -0,0 +1,98 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+
+/**
+ * @author User
+ *
+ */
+public class BacktrackingFlow extends PacketBase{
+	public static int SIZE = 108;
+	//tuple	StatisTuple10	56	10元组信息
+	private long macSrc;          //源MAC  8 
+	private long macDst;          //目的MAC 8
+	private long ipSrc;              //源IP 8
+	private long ipDst;              //目的IP 8
+	private long portSrc;           //源端口,如果没有,为-1
+	private long portDst;           //目标端口,如果没有,为-1
+	private int l3Proto;           //第三层协议ID,如果没有,为-1
+	private int l4Proto;          //第四层协议ID,如果没有,为-1
+	private int tos;                 //Tos,如果没有,为-1
+	private int vlanId;            //vlan ID,如果没有,为-1
+	
+	private long bytes;	//	8	字节总数
+	private long packets;	//	8	数据包总数
+	private long tcpSp;	//	8	tcp同步包数
+	private long tcpScpn;//	8	tcp同步确认包数
+	private long tcpSrp;	//	8	tcp同步重置包数
+	private long appId;	//	4	appID
+	private long appGroupId;//	4	app组ID
+	private long mplsLabel;//	4	
+
+
+	
+	
+	
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception {
+		BacktrackingFlow backFlow = new BacktrackingFlow();
+		backFlow.macSrc = ByteUtil.getLong(data, offset, 8);          //源MAC  8
+		offset+=8;
+		backFlow.macDst = ByteUtil.getLong(data, offset, 8);          //目的MAC 8
+		offset+=8;
+		backFlow.ipSrc = ByteUtil.getLong(data, offset, 8);              //源IP 8
+		offset+=8;
+		backFlow.ipDst = ByteUtil.getLong(data, offset, 8);              //目的IP 8
+		offset+=8;
+		backFlow.portSrc = ByteUtil.getInteger(data, offset, 4);           //源端口,如果没有,为-1
+		offset+=4;
+		backFlow.portDst = ByteUtil.getInteger(data, offset, 4);           //目标端口,如果没有,为-1
+		offset+=4;
+		backFlow.l3Proto = ByteUtil.getInteger(data, offset, 4);           //第三层协议ID,如果没有,为-1
+		offset+=4;
+		backFlow.l4Proto = ByteUtil.getInteger(data, offset, 4);          //第四层协议ID,如果没有,为-1
+		offset+=4;
+		backFlow.tos = ByteUtil.getInteger(data, offset, 4);                 //Tos,如果没有,为-1
+		offset+=4;
+		backFlow.vlanId = ByteUtil.getInteger(data, offset, 4);            //vlan ID,如果没有,为-1
+		offset+=4;
+		
+		backFlow.bytes = ByteUtil.getLong(data, offset, 8);	//	8	字节总数
+		offset+=8;
+		backFlow.packets = ByteUtil.getLong(data, offset, 8);	//	8	数据包总数
+		offset+=8;
+		backFlow.tcpSp = ByteUtil.getLong(data, offset, 8);	//	8	tcp同步包数
+		offset+=8;
+		backFlow.tcpScpn = ByteUtil.getLong(data, offset, 8);//	8	tcp同步确认包数
+		offset+=8;
+		backFlow.tcpSrp = ByteUtil.getLong(data, offset, 8);	//	8	tcp同步重置包数
+		offset+=8;
+		backFlow.appId = ByteUtil.getInteger(data, offset, 4);	//	4	appID
+		offset+=4;
+		backFlow.appGroupId = ByteUtil.getInteger(data, offset, 4);//	4	app组ID
+		offset+=4;
+		backFlow.mplsLabel = ByteUtil.getInteger(data, offset, 4);//	4	
+		offset+=4;
+		
+		return backFlow;
+	}
+
+	@Override
+	public int getUnitPacketLength() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	@Override
+	public String[] getData() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java
new file mode 100644
index 0000000..4064dc1
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BasicTrafficFlow.java
@@ -0,0 +1,82 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+
+public class BasicTrafficFlow extends PacketBase {
+	public static int SIZE = 56;
+	public long capPort;
+	public int requestPort;
+	public int responsePort;
+	public long requestIp;
+	public long responseIp;
+	public long startTime;
+	public long totalBytes;
+	public long totalPackets;
+	public long spackets64;
+	public long spackets128;
+	public long spackets256;
+	public long spackets512;
+	public long spackets1024;
+	public long spackets;
+	public long sendTime;
+	
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception {
+		BasicTrafficFlow btf = new BasicTrafficFlow();
+		btf.setPacketHeader(header);
+		
+		btf.capPort = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		
+//		btf.requestPort = ByteUtil.getInteger(data, offset, 2);
+		offset += 2;		
+//		btf.responsePort = ByteUtil.getInteger(data, offset, 2);
+		offset += 2;		
+//		btf.requestIp = ByteUtil.getLong(data, offset, 4);
+		offset += 4;			
+//		btf.responseIp = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		
+		btf.startTime = ByteUtil.getLong(data, offset, 4);
+		offset += 4;		
+		btf.totalBytes = ByteUtil.getLong(data, offset, 4);
+		offset += 4;		
+		btf.totalPackets = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets64 = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets128 = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets256 = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets512 = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets1024 = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.spackets = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		btf.sendTime = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		
+		return btf;
+	}
+	@Override
+	public String[] getData() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+	@Override
+	public int getInterfaceNumber() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+	@Override
+	public int getUnitPacketLength() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+	
+	
+	
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java
new file mode 100644
index 0000000..d8e5f6f
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BusinessBodyData.java
@@ -0,0 +1,29 @@
+package com.yuandian.dataflow.proto.decode;
+
+
+public class BusinessBodyData {
+	
+	public String relvanceDataId;
+	public long request_ip;
+	public int request_port;
+	public long response_ip;
+	public int response_port;
+	
+	public long start_tv_sec;//开始时间秒
+	public long start_tv_usec;//开始时间毫秒
+	public long end_tv_sec;//结束时间秒
+	public long end_tv_usec;//结束时间微妙
+	
+	public String resource_code;
+	public String noParameterRecognition;
+	public String originalRecognition;
+	public String requestCookie;
+	
+	public String requestBodyContext;
+	public String responseBodyContext;
+	
+	public int filterId;
+	public String business_detail_mesg;
+	
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java
new file mode 100644
index 0000000..21731ec
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowDb.java
@@ -0,0 +1,244 @@
+package com.yuandian.dataflow.proto.decode;
+
+import com.ud.module.common.SystemConfigManager;
+ 
+
+
+public class BussFlowDb extends PacketBase {
+	private int UNIT_DATA_LENGTH = 3362;  
+
+	// redis资源归并,处理服务资源发现是需要设置识别串(正则表达式)
+	public String redisRegex;
+
+	public String id; //id
+	public int msg_len;                 //消息长度
+	public int msg_type;                //消息类型
+	public long src_mac;
+	public long dst_mac;
+	public int protocol;                //协议名
+	public String session_serial_number;//会话序列号			
+	public String  buss_type;//业务服务资源编码(C_01)
+	public long request_ip;//Web客户端IP
+	public int request_port;//Web客户端端口
+	public long  response_ip;//Web服务器IP
+	public int response_port;//Web服务器端口
+	public long start_tv_sec;//Web开始时间秒
+	public long start_tv_usec;//开始时间毫秒
+	public long end_tv_sec;//结束时间秒
+	public long end_tv_usec;//结束时间微妙
+	public String disc_resource_ident;    //web:url   mid:apiBody   db:sql
+	public String name;               //web:操作系统   midd:interfaceName  db:db_name
+	//web midd
+	public String session_id;//sessionid
+	public int request_msg_length;  //请求报文长度
+	public String request_msg_detail;   //请求报文详情
+	public int response_msg_length; //响应报文长度
+	public String response_msg_detail;  //响应报文详情
+	//web段
+	public String reter_url;           
+	public String x_requested_with;
+	public long req_method;         //请求方式
+	public String content_type;     //请求类型
+	public String accept;          //jieshou
+	public int req_cookie_leng;//请求cookie报文长度
+	public String req_cookie_detail;//请求cookie报文详情
+	public long t_intodb_time;
+	public int load_or_step;        //0: 页面   1:加载项   2:非web段数据 
+	public String business_detail_mesg;
+	public String bussiness_key_mesg;  //关键字  格式:key=val|key=val....
+	public int isUncomplete;    //组包是否完全    0:组包完整  1:不完整
+	public int deal_state=1;
+	public int server_res_code;
+	public long server_response_time;
+	public long client_translate_time;
+	public String browser;
+	public int server_start_tv_sec;
+	public long server_start_tv_usec;
+	public int server_end_tv_sec;
+	public long server_end_tv_usec;
+	public String probe_ip;
+	public long probe_if;
+	public long server_translate_time;
+	public long time_flag;	
+	public String base_code;
+	public String ori_sql;
+	public String reserved;
+	public long bytes_in;
+	public long bytes_out;
+	public int package_in;
+	public int package_out;
+	public String  dataId;
+	public int filterId;
+	@Override
+	public int getUnitPacketLength() {
+		// TODO Auto-generated method stub
+		return UNIT_DATA_LENGTH;
+	}
+
+	@SuppressWarnings("unchecked")
+	public BussFlowDb  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		BussFlowDb buss = new BussFlowDb();		
+//		buss.m_Header = header;
+		
+ 		buss.msg_type = header.getTableID();
+//		offset += 4;
+//		buss.msg_version = ByteUtil.getInteger(data, offset, 1);
+//		offset += 1;
+//		buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+//		buss.msg_len = header.getMsg_len();
+//		offset += 4;
+		
+//		UNIT_DATA_LENGTH=msg_len;
+		buss.src_mac = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		buss.dst_mac = ByteUtil.getLong(data, offset, 8);
+		if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
+			return null;
+		}
+		offset += 8;
+//		buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.retran_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.reset_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.protocol=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.bytes_out = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.bytes_in = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.package_out = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.package_in = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.session_serial_number = ByteUtil.getString(data, offset, 24);
+		offset += 24;
+//		buss.recog_status = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;		
+		buss.probe_if=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+//		buss.channel=ByteUtil.getString(data, offset, 24);
+		offset+=24;
+		buss.name = ByteUtil.getString(data, offset, 64);
+		offset+=64;
+		
+		buss.request_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.request_port=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+		buss.response_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.response_port=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+//		buss.deal_state=ByteUtil.getInteger(data, offset,8);
+		offset += 8;
+		buss.server_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.client_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.server_response_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.time_flag=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+		int sqlLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.request_msg_length = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.response_msg_length = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int reservedLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+		buss.ori_sql=ByteUtil.getString(data, offset,sqlLen);
+		offset += sqlLen;
+		buss.business_detail_mesg=ByteUtil.getString(data, offset, detailMsgLen);
+		offset += detailMsgLen;
+		buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length);
+		offset += buss.request_msg_length;		
+		buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length);
+		offset += buss.response_msg_length;
+		buss.reserved = ByteUtil.getString(data, offset, reservedLen);
+		offset += reservedLen;
+		
+		/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
+			
+			boolean exist=false;
+			if(SystemConfigManager.reduceFlag){
+				
+				redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
+				exist = reduceByKey(buss);
+			}
+			
+			//不存在就加入缓存
+			if(!exist)return buss;
+			//存在就返回空
+			return null;
+		}*/
+		
+		return buss;
+	}
+
+
+	public String getBussType() {
+		return buss_type;
+	}
+
+	@Override
+	public String[] getData() {
+		String data[] = new String[45];
+		return data;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		return 0;
+	}
+	
+	/**
+	 * 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作
+	 * @param buss
+	 * @return 存在返回true 不存在返回false
+	 */
+	/*private boolean reduceByKey(BussFlowDb buss) {
+		String coding = CodingGeneration.md5GenCoding2(buss.ori_sql);
+		String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
+		
+		//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
+		if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
+			redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
+			return false;
+		}
+		
+		return true;
+	}*/
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java
new file mode 100644
index 0000000..1fbd515
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowExternal.java
@@ -0,0 +1,132 @@
+package com.yuandian.dataflow.proto.decode;
+
+import java.util.Date;
+
+ 
+
+
+public class BussFlowExternal extends PacketBase {
+	private final int UNIT_DATA_LENGTH =871; 
+
+	@Override
+	public int getUnitPacketLength() {
+		return UNIT_DATA_LENGTH;
+	}
+
+	public BussFlowExternal  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		if (offset + UNIT_DATA_LENGTH > data.length)
+			throw new Exception("data length error!");
+		BussFlowExternal buss = new BussFlowExternal();		
+//		buss.m_Header = header;
+		
+		buss.probe_if=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.channel=ByteUtil.getString(data, offset,24);
+		offset += 24;
+		buss.interfaceid=ByteUtil.getString(data, offset,100);
+		offset += 100;
+		buss.systemName=ByteUtil.getString(data, offset,32);
+		offset += 32;
+		buss.net_type=ByteUtil.getString(data, offset,6);
+		offset += 6;
+		buss.net_segment = ByteUtil.getString(data, offset, 5);
+		offset += 5;
+		buss.session_id=ByteUtil.getString(data, offset,80);
+		offset += 80;
+		buss.phoneid=ByteUtil.getString(data, offset,12);
+		offset += 12;
+	
+		buss.request_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.request_port=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.response_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.response_port=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		//开始时间
+		buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		
+		buss.deal_state=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+		buss.server_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.server_response_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.begin_url=ByteUtil.getString(data, offset,100);
+		offset += 100;
+		buss.operating_sytem=ByteUtil.getString(data, offset,20);
+		offset += 20;
+		buss.server_res_code=ByteUtil.getInteger(data, offset,2);
+		offset += 2;
+		buss.browser=ByteUtil.getString(data, offset,30);
+		offset += 30;
+		buss.business_detail_mesg=ByteUtil.getString(data, offset,200);
+		offset += 200;
+		buss.business_involve_msg = ByteUtil.getString(data, offset,200);
+		offset += 200;
+		isUncomplete = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.time_flag=ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		return buss;
+	}
+	
+	public String id;
+	public Long  probe_if;//接口号
+	public String channel;//营业厅渠道:前台或者分析服务器给出,渠道标识
+	public String systemName;  //外部系统名称
+	public String interfaceid;// 业务接口编码
+	public String session_id;//sessionid
+	public String phoneid;//受理手机号码
+	public String net_type;
+	public String  net_segment;//网段标识(客户-web)
+	public Long request_ip;//Web客户端IP
+	public Long request_port;//Web客户端端口
+	public Long  response_ip;//Web服务器IP
+	public Long response_port;//Web服务器端口
+	
+	public  Long start_tv_sec;//Web开始时间秒
+	public Long start_tv_usec;//开始时间微秒
+	public Long end_tv_sec;//结束时间秒
+	public  Long end_tv_usec;//结束时间微妙
+	
+	public Integer deal_state;//Web操作成功/失败标识1成功0失败
+	public Long server_translate_time;//Web服务器传输耗时
+	public Long server_response_time;//Web服务器响应时间
+	public String begin_url;//url
+	public String operating_sytem;//操作系统
+	public Integer server_res_code; //Web系统返回码
+	public String browser;//浏览器
+	public String business_detail_mesg;//要获取的指标
+	public Date insert_time;//插入时间
+	public String business_involve_msg; //要关联的指标
+	public Integer isUncomplete;
+	public Long time_flag;
+
+
+	@Override
+	public String[] getData() {
+		String data[] = new String[37];
+		return data;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		
+		return Integer.parseInt(probe_if.toString());
+	}
+
+	
+
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java
new file mode 100644
index 0000000..afc31e3
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowMidd.java
@@ -0,0 +1,267 @@
+package com.yuandian.dataflow.proto.decode;
+
+import java.util.concurrent.TimeUnit;
+
+
+import com.ud.module.common.ApplicationContextUtil;
+import com.ud.module.common.SystemConfigManager;
+ 
+import com.ud.module.util.code.CodingGeneration;
+
+public class BussFlowMidd extends PacketBase {
+	private  int UNIT_DATA_LENGTH =3569; 
+	
+	public String id; //id
+	public long src_mac;
+	public long dst_mac;
+	public int msg_len;                 //消息长度
+	public int msg_type;                //消息类型
+	public int protocol;                //协议名
+	public String session_serial_number;//会话序列号			
+	public String  buss_type;//业务服务资源编码(C_01)
+	public String  net_segment;//网段标识(客户-web)
+	public long request_ip;//Web客户端IP
+	public int request_port;//Web客户端端口
+	public long  response_ip;//Web服务器IP
+	public int response_port;//Web服务器端口
+	public long start_tv_sec;//Web开始时间秒
+	public long start_tv_usec;//开始时间毫秒
+	public long end_tv_sec;//结束时间秒
+	public long end_tv_usec;//结束时间微妙
+	public String disc_resource_ident;    //web:url   mid:apiBody   db:sql
+	public String name;               //web:操作系统   midd:interfaceName  db:db_name 
+	//web midd
+	public String session_id;//sessionid
+	public int request_msg_length;  //请求报文长度
+	public String request_msg_detail;   //请求报文详情
+	public int response_msg_length; //响应报文长度
+	public String response_msg_detail;  //响应报文详情
+	//web段
+	public String reter_url;           
+	public String x_requested_with;
+	public long req_method;         //请求方式
+	public String content_type;     //请求类型
+	public String accept;           //jieshou
+	public int req_cookie_leng;//请求cookie报文长度
+	public String req_cookie_detail;//请求cookie报文详情
+	public long t_intodb_time;
+	public int load_or_step;        //0: 页面   1:加载项   2:非web段数据 
+	public String business_detail_mesg;
+	public String bussiness_key_mesg;  //关键字  格式:key=val|key=val....
+	public int isUncomplete;    //组包是否完全    0:组包完整  1:不完整
+	public int deal_state=1;
+	public int server_res_code;
+	public long server_response_time;
+	public long client_translate_time;
+	public String browser;
+	public int server_start_tv_sec;
+	public long server_start_tv_usec;
+	public int server_end_tv_sec;
+	public long server_end_tv_usec;
+	public String probe_ip;
+	public int probe_if;
+	public long server_translate_time;
+	public long time_flag;	
+	public String channel;
+	public String base_code;
+	public String ori_api;
+	public String remain_data;
+	public long bytes_in;
+	public long bytes_out;
+	public int package_in;
+	public int package_out;
+	public String  dataId;
+	public int filterId;
+	
+	@Override
+	public int getUnitPacketLength() {
+		// TODO Auto-generated method stub
+		return UNIT_DATA_LENGTH;
+	}
+
+	public BussFlowMidd  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		BussFlowMidd buss = new BussFlowMidd();		
+//		buss.m_Header = header;
+		
+ 		buss.msg_type = header.getTableID();
+//		offset += 4;
+//		buss.msg_version = ByteUtil.getInteger(data, offset, 1);
+//		offset += 1;
+//		buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+//		buss.msg_len = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+		
+//		UNIT_DATA_LENGTH=msg_len;
+		buss.src_mac = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		buss.dst_mac = ByteUtil.getLong(data, offset, 8);
+		if(SystemConfigManager.macFilterFlag && buss.src_mac!=0 && buss.dst_mac==0){
+			return null;
+		}
+		offset += 8;
+//		buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.retran_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.reset_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.protocol=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.bytes_out = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.bytes_in = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.package_out = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.package_in = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+		buss.session_serial_number = ByteUtil.getString(data, offset, 24);
+		offset += 24;
+//		buss.recog_status = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.req_method = ByteUtil.getLong(data, offset, 4);
+//		offset += 4;
+//		buss.content_type = ByteUtil.getString(data, offset, 40);
+//		offset += 40;
+//		buss.accept = ByteUtil.getString(data, offset, 40);
+//		offset += 40;
+		buss.probe_if=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+//		buss.buss_type =ByteUtil.getString(data, offset,10);
+//		offset += 10;
+		buss.channel=ByteUtil.getString(data, offset,24);
+		offset += 24;
+//		buss.session_id=ByteUtil.getString(data, offset,36);
+//		offset += 36;
+//		buss.net_segment=ByteUtil.getString(data, offset,6);
+//		offset += 6;
+		buss.request_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.request_port=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+		buss.response_ip=ByteUtil.getLong(data, offset,4);
+		offset += 4;
+		buss.response_port=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+		//开始时间
+		buss.start_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.end_tv_sec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		buss.end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+//		buss.deal_state=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_res_code = ByteUtil.getInteger(data, offset, 2);
+		offset +=2;
+		buss.server_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		
+		buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		
+		buss.server_response_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.client_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.time_flag=ByteUtil.getInteger(data, offset, 4);
+		offset +=4;
+		int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int keyMsgLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int apiLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.request_msg_length = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.response_msg_length = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int remainLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+		buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
+		offset += detailMsgLen;
+		buss.bussiness_key_mesg = ByteUtil.getString(data, offset, keyMsgLen);
+		offset += keyMsgLen;
+		buss.ori_api=ByteUtil.getString(data, offset, apiLen);
+		offset += apiLen;
+		buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.request_msg_length);
+		offset += buss.request_msg_length;
+		buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, buss.response_msg_length);
+		offset += buss.response_msg_length;
+//		buss.remain_data = ByteUtil.getString(data, offset, remainLen);
+		offset += remainLen;
+
+		
+		
+		/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
+			boolean exist=false;
+			if(SystemConfigManager.reduceFlag){
+				
+				redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
+				exist = reduceByKey(buss);
+			}
+			
+			//不存在就加入缓存
+			if(!exist)return buss;
+			//存在就返回空
+			return null;
+		}*/
+		
+		return buss;
+	}
+
+	public String getBussType() {
+		return buss_type;
+	}
+
+	@Override
+	public String[] getData() {
+		String data[] = new String[35];//贵阳
+		
+		return data;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		
+		return 0;
+	}
+	
+	/**
+	 * 如果已经存在这个session_id(即key),则说明是重复数据,不再进行后续操作
+	 * @param buss
+	 * @return
+	 */
+	/*private boolean reduceByKey(BussFlowMidd buss) {
+		String coding = CodingGeneration.md5GenCoding2(buss.ori_api);
+		String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
+		
+		//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
+		if(redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"")){
+			redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
+			return false;
+		}
+		
+		return true;
+	}*/
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java
new file mode 100644
index 0000000..b41a7db
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowOrl.java
@@ -0,0 +1,111 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+
+public class BussFlowOrl extends PacketBase {
+
+    public int msg_type;
+    public int msg_version;
+    public int msg_seq;
+    public int msg_len;
+    public long request_mac;
+    public long response_mac;
+    public long request_ip;
+    public int request_port;
+    public long response_ip;
+    public int response_port;
+    public int probeif;
+    public int protocol;
+    public long start_tv_sec;//Web开始时间秒
+    public long start_tv_usec;//开始时间毫秒
+    public long end_tv_sec;//结束时间秒
+    public long end_tv_usec;//结束时间微妙
+    public int req_len;
+    public int res_len;
+    public int busi_msg_len;
+    public int key_msg_len;
+    public int detail_msg_len;
+    public int remain_len;
+    public String business_code;
+    public String sessionid;
+    public String req_data;
+    public String res_data;
+    public String busi_msg;
+    public String busi_key_msg;
+    public String busi_detail_msg;
+    public String remain_data;
+
+    @Override
+    public PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception {
+        BussFlowOrl orl = new BussFlowOrl();
+        orl.msg_type = header.getNow_type();
+        orl.request_mac = ByteUtil.getLong(data, offset, 8);
+        offset += 8;
+        orl.response_mac = ByteUtil.getLong(data, offset, 8);
+        offset += 8;
+        orl.request_ip = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        orl.request_port = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        orl.response_ip = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        orl.response_port = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        orl.probeif = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        orl.protocol = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        orl.start_tv_sec = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        orl.start_tv_usec = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        orl.end_tv_sec = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        orl.end_tv_usec = ByteUtil.getLong(data, offset, 4);
+        offset += 4;
+        int req_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        int res_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        int busi_msg_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        int key_msg_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        int detail_msg_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        int remain_len = ByteUtil.getInteger(data, offset, 4);
+        offset += 4;
+        orl.business_code = ByteUtil.getString(data, offset, 32);
+        offset += 32;
+        orl.sessionid = ByteUtil.getString(data, offset, 80);
+        offset += 80;
+        orl.req_data = ByteUtil.getString(data, offset, req_len);
+        offset += req_len;
+        orl.res_data = ByteUtil.getString(data, offset, res_len);
+        offset += res_len;
+        orl.busi_msg = ByteUtil.getString(data, offset, busi_msg_len);
+        offset += busi_msg_len;
+        orl.busi_key_msg = ByteUtil.getString(data, offset, key_msg_len);
+        offset += key_msg_len;
+        orl.busi_detail_msg = ByteUtil.getString(data, offset, detail_msg_len);
+        offset += detail_msg_len;
+        orl.remain_data = ByteUtil.getString(data, offset, remain_len);
+        offset += remain_len;
+        return orl;
+    }
+
+    @Override
+    public int getUnitPacketLength() {
+        return 0;
+    }
+
+    @Override
+    public String[] getData() {
+        return new String[0];
+    }
+
+    @Override
+    public int getInterfaceNumber() {
+        return 0;
+    }
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java
new file mode 100644
index 0000000..c7b2e99
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/BussFlowWeb.java
@@ -0,0 +1,362 @@
+package com.yuandian.dataflow.proto.decode;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+import com.ud.module.common.Constants;
+import com.ud.module.common.SystemConfigManager;
+import com.yuandian.dataflow.proto.decode.PacketBase;
+import com.yuandian.dataflow.proto.decode.PacketHeader;
+ 
+import com.ud.module.util.flow.JXYDBPMDetailMesgUtils;
+import com.ud.module.util.flow.URLUtil;
+
+
+public class BussFlowWeb extends PacketBase {
+	private  Integer UNIT_DATA_LENGTH =6104;
+	
+	private static int i = 1;
+	@Override
+	public int getUnitPacketLength() {
+		return UNIT_DATA_LENGTH;
+	}
+
+	@SuppressWarnings("unchecked")
+	public BussFlowWeb  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		i++;
+		BussFlowWeb buss = new BussFlowWeb();
+//		buss.m_Header = header;
+		
+		buss.msg_type = header.getTableID();
+//		offset += 4;
+//		buss.msg_version = ByteUtil.getInteger(data, offset, 1);
+//		offset += 1;
+//		buss.msg_seq = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+//		buss.msg_len = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+		
+		buss.src_mac = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		buss.dst_mac = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+//		buss.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.retran_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.reset_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		buss.zerowin_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.protocol=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.bytes_out = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.bytes_in = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		buss.package_out = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.package_in = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.session_serial_number = ByteUtil.getString(data, offset, 24);   //5268109200
+		offset += 24;
+//		buss.recog_status = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.req_method = ByteUtil.getLong(data, offset, 4);      //1
+		offset += 4;
+		buss.content_type = ByteUtil.getString(data, offset, 40);
+		offset += 40;
+		buss.accept = ByteUtil.getString(data, offset, 40);  //image/png, image/svg+xml, image/*;q=0.8
+		offset += 40;
+		buss.probe_if=ByteUtil.getInteger(data, offset,4);
+		offset += 4;
+		buss.channel=ByteUtil.getString(data, offset,24);
+		offset += 24;
+		buss.session_id=ByteUtil.getString(data, offset,80);  //7Ip6bYet8n0wfxeJMJtAm7v2FW76ZTImJVyNjbYoRbUdUf71QaxQ!-202316728
+		offset += 80;
+		buss.request_ip=ByteUtil.getLong(data, offset,4);    //3232235850
+		offset += 4;
+		buss.request_port=ByteUtil.getInteger(data, offset,4); //3309
+		offset += 4;
+		buss.response_ip=ByteUtil.getLong(data, offset,4);   //1971882754
+		offset += 4;
+		buss.response_port=ByteUtil.getInteger(data, offset,4); //80
+		offset += 4;
+		//开始时间
+		buss.start_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697
+		offset +=4;
+		buss.start_tv_usec=ByteUtil.getLong(data, offset,4); //459461
+		offset +=4;
+		//结束时间
+		buss.end_tv_sec=ByteUtil.getLong(data, offset,4); //1492473697
+		offset +=4;
+		buss.end_tv_usec=ByteUtil.getLong(data, offset,4); //459490
+		offset +=4;
+		
+		buss.server_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		
+		buss.server_start_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_start_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		//结束时间
+		buss.server_end_tv_sec=ByteUtil.getInteger(data, offset,4);
+		offset +=4;
+		buss.server_end_tv_usec=ByteUtil.getLong(data, offset,4);
+		offset +=4;
+		
+		buss.server_response_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.client_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+//		buss.locate_server_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+//		buss.locate_server_response_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+//		buss.locate_client_translate_time=ByteUtil.getLong(data, offset,8);
+		offset += 8;
+		buss.x_requested_with = ByteUtil.getString(data, offset, 20);
+		offset += 20;
+		buss.operating_system=ByteUtil.getString(data, offset,20);                //Windows NT 6.1
+		offset += 20;
+		buss.server_res_code=ByteUtil.getInteger(data, offset,2);
+		offset += 2;
+		buss.browser=ByteUtil.getString(data, offset,30);
+		offset += 30;
+		buss.isUncomplete = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		buss.time_flag=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int detailMsgLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int keyMsgLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int reqMsgLeng= ByteUtil.getInteger(data, offset, 4);
+		buss.request_msg_length =reqMsgLeng;
+		offset += 4;
+		int resMsgLength=ByteUtil.getInteger(data, offset, 4);
+		buss.response_msg_length = resMsgLength;
+		offset += 4;
+		int cookieLength=ByteUtil.getInteger(data, offset, 4);
+		buss.req_cookie_leng=cookieLength;
+		offset +=4;
+		int url_len=ByteUtil.getInteger(data, offset, 4);
+		offset +=4;
+		int refer_len=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		int remainLen = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		
+//		if (SystemConfigManager.systemCode.equals("JXYDBPM")) {
+//			buss.business_detail_mesg=JXYDBPMDetailMesgUtils.decodeValue(ByteUtil.getString(data, offset,detailMsgLen));
+//		} else {
+//			buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
+//		}
+		buss.business_detail_mesg=ByteUtil.getString(data, offset,detailMsgLen);
+		offset += detailMsgLen;
+		buss.bussiness_key_mesg = ByteUtil.getString(data, offset,keyMsgLen);
+		offset += keyMsgLen;
+		buss.request_msg_detail = ByteUtil.getStringIgnoreZero(data, offset, reqMsgLeng);
+		offset += reqMsgLeng;
+		buss.response_msg_detail = ByteUtil.getStringIgnoreZero(data, offset,resMsgLength);
+		offset +=resMsgLength;
+		buss.req_cookie_detail = ByteUtil.getString(data, offset,cookieLength);
+		offset += cookieLength;
+		buss.ori_url=ByteUtil.getString(data, offset,url_len);   // /images/index/220130.gif
+		if(buss.ori_url!=null && !"".equals(buss.ori_url.trim())){
+			String result = buss.ori_url.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
+			if(isMessyCode(result)){//如果中文乱码
+				String withoutValue = URLUtil.getInsertURL(buss.ori_url, Constants.SPLIT_NULL).trim();//去参数值
+				String result2 = withoutValue.replaceAll("\\?", "").replaceAll("\\/", "").replaceAll("\\*", "").replaceAll("\\=", "").replaceAll("\\&", "").replaceAll("\\+", "");
+				if(isMessyCode(result2)){//判断是否还乱码
+					buss.ori_url="";
+				}else{
+					buss.ori_url=withoutValue;
+				}
+			};
+		}
+		offset += url_len;
+		buss.reter_url = ByteUtil.getString(data, offset, refer_len);         //  http://www.10086.cn/gd/index_200_200.html
+		offset += refer_len;
+		buss.remain_data = ByteUtil.getString(data, offset, remainLen);
+		offset += remainLen;
+		/*if(SystemConfigManager.projectName.contains("GDYDBPM")){
+			boolean exist=false;
+			if(SystemConfigManager.reduceFlag){
+				
+				redisTemplate = (RedisTemplate<String, String>) ApplicationContextUtil.getBean("redisTemplate");
+				exist = reduceByKey(buss);
+			}
+			
+			//不存在就加入缓存
+			if(!exist)return buss;
+			//存在就返回空
+			return null;
+		}
+		*/
+		
+		return buss;
+	}
+
+
+	/**
+	 * 如果已经存在这个key,则说明是重复数据,不再进行后续操作
+	 * @param buss
+	 * @return
+	 */
+	/*private boolean reduceByKey(BussFlowWeb buss) {
+		
+		
+		String coding = CodingGeneration.md5GenCoding2(buss.ori_url);
+		String key=buss.session_serial_number+"|"+buss.request_ip+"|"+buss.request_port+"|"+buss.response_ip+"|"+buss.response_port+"|"+coding+"|"+buss.business_detail_mesg;
+		
+		//分布式同步锁,如果存在该key或者锁已经被获取则返回false,设置
+		
+		boolean flag=redisTemplate.opsForValue().setIfAbsent(key, System.currentTimeMillis()+"");
+		
+		if(flag){
+			redisTemplate.expire(key,SystemConfigManager.keyExistTime,TimeUnit.SECONDS);
+			//原来不存在这条数据
+			return false;
+		}
+		
+		return true;
+	}*/
+	
+	/**
+     * 判断字符串是否是乱码
+     *
+     * @param strName 字符串
+     * @return 是否是乱码
+     */
+    
+    public static boolean isMessyCode(String strName) {
+        Pattern p = Pattern.compile("\\s*|\t*|\r*|\n*");
+        Matcher m = p.matcher(strName);
+        String after = m.replaceAll("").replace("-", "").replace("|", "");
+        String temp = after.replaceAll("\\p{P}", "");
+        char[] ch = temp.trim().toCharArray();
+        float chLength = 0 ;
+        float count = 0;
+        for (int i = 0; i < ch.length; i++) {
+            char c = ch[i];
+            if (!Character.isLetterOrDigit(c)) {
+                if (!isChinese(c)) {
+                    count = count + 1;
+                }
+                chLength++; 
+            }
+        }
+        float result = count / chLength ;
+        if (result > 0.4) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    private static boolean isChinese(char c) {
+        Character.UnicodeBlock ub = Character.UnicodeBlock.of(c);
+        if (ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS
+                || ub == Character.UnicodeBlock.CJK_COMPATIBILITY_IDEOGRAPHS
+                || ub == Character.UnicodeBlock.CJK_UNIFIED_IDEOGRAPHS_EXTENSION_A
+                || ub == Character.UnicodeBlock.GENERAL_PUNCTUATION
+                || ub == Character.UnicodeBlock.CJK_SYMBOLS_AND_PUNCTUATION
+                || ub == Character.UnicodeBlock.HALFWIDTH_AND_FULLWIDTH_FORMS) {
+            return true;
+        }
+        return false;
+    }
+	
+	public String id; //id
+	public int msg_len;                 //消息长度
+	public int msg_type;                //消息类型
+	public long src_mac;
+	public long dst_mac; 
+	public int protocol;                //协议名
+	public String session_serial_number;//会话序列号			
+	public String  buss_type;//业务服务资源编码(C_01)
+//	public String  net_segment;//网段标识(客户-web)
+	public long request_ip;//Web客户端IP
+	public int request_port;//Web客户端端口
+	public long  response_ip;//Web服务器IP
+	public int response_port;//Web服务器端口
+	public long start_tv_sec;//Web开始时间秒
+	public long start_tv_usec;//开始时间毫秒
+	public long end_tv_sec;//结束时间秒
+	public long end_tv_usec;//结束时间微妙
+	public String disc_resource_ident;    //web:url   mid:apiBody   db:sql
+	public String operating_system;       //web:操作系统   midd:interfaceName  db:db_name 
+	//web midd
+	public String session_id;//sessionid
+	public int request_msg_length;  //请求报文长度
+	public String request_msg_detail;   //请求报文详情
+	public int response_msg_length; //响应报文长度
+	public String response_msg_detail;  //响应报文详情
+	//web段
+	public String reter_url;           
+	public String x_requested_with;
+	public long req_method;         //请求方式
+	public String content_type;     //请求类型
+	public String accept;           //jieshou
+	public int req_cookie_leng;//请求cookie报文长度
+	public String req_cookie_detail;//请求cookie报文详情
+	public long t_intodb_time;
+	public int load_or_step;        //0: 页面   1:加载项   2:非web段数据 
+	public String business_detail_mesg;
+	public String bussiness_key_mesg;  //关键字  格式:key=val|key=val....
+	public int isUncomplete;    //组包是否完全    0:组包完整  1:不完整
+	public int deal_state=1;
+	public int server_res_code;
+	public long server_response_time;
+	public long client_translate_time;
+	public String browser;
+	public int server_start_tv_sec;
+	public long server_start_tv_usec;
+	public int server_end_tv_sec;
+	public long server_end_tv_usec;
+	public String probe_ip;
+	public int probe_if;
+	public long server_translate_time;
+	public long time_flag;	
+	public String channel;
+	public String base_code;
+	public String ori_url;
+	public String remain_data;
+	public long bytes_in;
+	public long bytes_out;
+	public int package_in;
+	public int package_out;
+
+	public String  dataId;
+	
+	public int filterId;
+
+	public String whiteCharacter;
+
+	// tokenId
+	public String tokenId;
+	// 判断是否是首端资源( 2 )
+	public Integer segmentId;
+	
+	@Override
+	public String[] getData() {
+		String data[] = new String[38];
+		return data;
+	}
+
+	public String getBussType() {
+		return buss_type;
+	}
+
+
+	@Override
+	public int getInterfaceNumber() {
+		return 0;
+	}
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java b/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java
new file mode 100644
index 0000000..6f062a7
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/DataBaseModel.java
@@ -0,0 +1,379 @@
+package com.yuandian.dataflow.proto.decode;
+
+
+
+import java.io.Serializable;
+import java.util.Date;
+
+ 
+
+/**
+ * 移植zjm流量信息接收类
+ * Leehr
+ *
+ */
+public class DataBaseModel extends PacketBase implements Serializable{
+
+	
+	private static final long serialVersionUID = 1L;
+
+	private static final String split=";";
+	
+	private long mac_src;  //源MAC
+	private long mac_dst;  //目标MAC
+	private long ip_src;   //源IP
+	private long ip_dst;   //目标IP
+	private long port_src; //源端口,如果没有,为-1
+	private long port_dst; //目标端口,如果没有,为-1
+	private long l3_proto; //第三层协议ID,如果没有,为-1
+	private long l4_proto; //第四层协议ID,如果没有,为-1
+	private long tos;      //Tos,一个字节,如果没有,为-1
+	private long vlan_id;  //vlan ID,如果没有,为-1 
+	
+	private long bytes;    //字节总数
+	private long packets;  //数据包总数
+	 
+	private long packets_syn;  //TCP同步包数
+	private long packets_syn_ack; //TCP同步确认包数
+	private long packets_syn_rst; //TCP同步重置包数
+
+	private long timestamp; //时间戳,秒
+
+	private long appid;//应用ID
+	private long app_group_id;
+	private long mpls_label;
+	
+	private long pkts_syn_rx; //tcp同步包,接收
+	private long pkts_syn_ack_rx;//tcp同步确认包,接收
+	private long pkts_syn_rst_rx; //tcp同步重置包,接收
+	private long pkts_fin;//tcp终止包,接收
+	private long pkts_rst;//tcp重置包,接收
+
+	private long bytes_rx;//字节收
+	private long packets_rx;//数据包收
+	
+	private long probe_time_sec;
+	private Date probe_time;
+	private Date createTime;
+	
+	public String getKey(){
+		StringBuffer strb=new StringBuffer();
+		strb.append(timestamp).append(split);
+		strb.append(mac_src).append(split);
+		strb.append(mac_dst).append(split);
+		strb.append(ip_src).append(split);
+		strb.append(ip_dst).append(split);
+		strb.append(port_src).append(split);
+		strb.append(port_dst).append(split);
+		strb.append(l3_proto).append(split);
+		strb.append(l4_proto).append(split);
+		strb.append(tos).append(split);
+		strb.append(vlan_id).append(split);
+		strb.append(appid);
+		return strb.toString();
+	}
+	
+	public DataBaseModel Parse(byte[] data, int offset)throws Exception {
+
+		if (offset + 108 > data.length)
+			throw new Exception("data length error!");
+	
+		DataBaseModel db = new DataBaseModel();
+		db.mac_src = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.mac_dst = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.ip_src = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.ip_dst = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+	 
+		db.port_src = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.port_dst = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.l3_proto = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.l4_proto = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.tos = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset+=4;
+		db.bytes = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.packets = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.packets_syn = ByteUtil.getLong(data, offset, 8);
+		
+		offset+=8;
+		db.packets_syn_ack = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		db.packets_syn_rst = ByteUtil.getLong(data, offset, 8);
+		offset+=8;
+		/**
+		 * 2015-12-10txy 新增3个解析
+		 */
+		db.appid=ByteUtil.getLong(data, offset, 4);
+		offset+=4;
+		db.app_group_id=ByteUtil.getLong(data, offset, 4);
+		offset+=4;
+		db.mpls_label=ByteUtil.getLong(data, offset, 4);
+		offset+=4;
+		return db;
+	}
+	
+	public String toString(){
+			StringBuffer strb = new StringBuffer();
+			strb.append(timestamp).append(",");
+			strb.append(mac_src).append(",");
+			strb.append(mac_dst).append(",");
+			strb.append(ip_src).append(",");
+			strb.append(ip_dst).append(",");
+			strb.append(port_src).append(",");
+			strb.append(port_dst).append(",");
+			strb.append(l3_proto).append(",");
+			strb.append(l4_proto).append(",");
+			strb.append(tos).append(",");
+			strb.append(vlan_id).append(",");
+			strb.append(appid).append(",");
+			strb.append(packets_syn).append(",");
+			strb.append(pkts_syn_rx).append(",");
+			strb.append(packets_syn_ack).append(",");
+			strb.append(pkts_syn_ack_rx).append(",");
+			strb.append(packets_syn_rst).append(",");
+			strb.append(pkts_syn_rst_rx).append(",");
+			strb.append(pkts_fin).append(",");
+			strb.append(pkts_rst).append(",");
+			strb.append(bytes).append(",");
+			strb.append(packets).append(",");
+			strb.append(bytes_rx).append(",");
+			strb.append(packets_rx).append(",");
+			strb.append(app_group_id).append(",");
+			strb.append(mpls_label).append("\n");
+			return strb.toString();
+	}
+	
+	
+	public long getProbeTimeSec() {
+		return probe_time_sec;
+	}
+
+	public void setProbe_time_sec(long probe_time_sec) {
+		this.probe_time_sec = probe_time_sec;
+	}
+
+	public Date getProbe_time() {
+		return probe_time;
+	}
+
+	public void setProbe_time(Date probe_time) {
+		this.probe_time = probe_time;
+	}
+
+	public Date getCreateTime() {
+		return createTime;
+	}
+
+	public void setCreateTime(Date createTime) {
+		this.createTime = createTime;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+	public long getMac_src() {
+		return mac_src;
+	}
+	public void setMac_src(long macSrc) {
+		mac_src = macSrc;
+	}
+	public long getMac_dst() {
+		return mac_dst;
+	}
+	public void setMac_dst(long macDst) {
+		mac_dst = macDst;
+	}
+	public long getIp_src() {
+		return ip_src;
+	}
+	public void setIp_src(long ipSrc) {
+		ip_src = ipSrc;
+	}
+	public long getIp_dst() {
+		return ip_dst;
+	}
+	public void setIp_dst(long ipDst) {
+		ip_dst = ipDst;
+	}
+	public long getPort_src() {
+		return port_src;
+	}
+	public void setPort_src(long portSrc) {
+		port_src = portSrc;
+	}
+	public long getPort_dst() {
+		return port_dst;
+	}
+	public void setPort_dst(long portDst) {
+		port_dst = portDst;
+	}
+	public long getL3_proto() {
+		return l3_proto;
+	}
+	public void setL3_proto(long l3Proto) {
+		l3_proto = l3Proto;
+	}
+	public long getL4_proto() {
+		return l4_proto;
+	}
+	public void setL4_proto(long l4Proto) {
+		l4_proto = l4Proto;
+	}
+	public long getTos() {
+		return tos;
+	}
+	public void setTos(long tos) {
+		this.tos = tos;
+	}
+	public long getVlan_id() {
+		return vlan_id;
+	}
+	public void setVlan_id(long vlanId) {
+		vlan_id = vlanId;
+	}
+	public long getAppid() {
+		return appid;
+	}
+	public void setAppid(long appid) {
+		this.appid = appid;
+	}
+
+	public long getPkts_syn_rx() {
+		return pkts_syn_rx;
+	}
+	public void setPkts_syn_rx(long pktsSynRx) {
+		pkts_syn_rx = pktsSynRx;
+	}
+
+	public long getPkts_syn_ack_rx() {
+		return pkts_syn_ack_rx;
+	}
+	public void setPkts_syn_ack_rx(long pktsSynAckRx) {
+		pkts_syn_ack_rx = pktsSynAckRx;
+	}
+
+	public long getPkts_syn_rst_rx() {
+		return pkts_syn_rst_rx;
+	}
+	public void setPkts_syn_rst_rx(long pktsSynRstRx) {
+		pkts_syn_rst_rx = pktsSynRstRx;
+	}
+	public long getPkts_fin() {
+		return pkts_fin;
+	}
+	public void setPkts_fin(long pktsFin) {
+		pkts_fin = pktsFin;
+	}
+	public long getPkts_rst() {
+		return pkts_rst;
+	}
+	public void setPkts_rst(long pktsRst) {
+		pkts_rst = pktsRst;
+	}
+	public long getBytes() {
+		return bytes;
+	}
+	public void setBytes(long bytes) {
+		this.bytes = bytes;
+	}
+	public long getPackets() {
+		return packets;
+	}
+	public void setPackets(long packets) {
+		this.packets = packets;
+	}
+	public long getBytes_rx() {
+		return bytes_rx;
+	}
+	public void setBytes_rx(long bytesRx) {
+		bytes_rx = bytesRx;
+	}
+	public long getPackets_rx() {
+		return packets_rx;
+	}
+	public void setPackets_rx(long packetsRx) {
+		packets_rx = packetsRx;
+	}
+
+	public long getPackets_syn() {
+		return packets_syn;
+	}
+
+	public void setPackets_syn(long packetsSyn) {
+		packets_syn = packetsSyn;
+	}
+
+	public long getPackets_syn_ack() {
+		return packets_syn_ack;
+	}
+
+	public void setPackets_syn_ack(long packetsSynAck) {
+		packets_syn_ack = packetsSynAck;
+	}
+
+	public long getPackets_syn_rst() {
+		return packets_syn_rst;
+	}
+
+	public void setPackets_syn_rst(long packetsSynRst) {
+		packets_syn_rst = packetsSynRst;
+	}
+
+	public long getApp_group_id() {
+		return app_group_id;
+	}
+
+	public void setApp_group_id(long app_group_id) {
+		this.app_group_id = app_group_id;
+	}
+
+	public long getMpls_label() {
+		return mpls_label;
+	}
+
+	public void setMpls_label(long mpls_label) {
+		this.mpls_label = mpls_label;
+	}
+
+	public static String getSplit() {
+		return split;
+	}
+
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		return null;
+	}
+
+	@Override
+	public int getUnitPacketLength() {
+		return 0;
+	}
+
+	@Override
+	public String[] getData() {
+		return null;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		return 0;
+	}
+	
+	
+	
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java
new file mode 100644
index 0000000..9dffdc2
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/DataFlow.java
@@ -0,0 +1,160 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+
+public class DataFlow  extends PacketBase {
+	private final int UNIT_DATA_LENGTH = 157;//贵阳 
+	public String id;
+	
+	public Integer msg_type;                //消息类型
+	public Integer msg_version;             //消息版本
+	public Integer msg_seq;                 //序列号
+	public Integer msg_len;                 //消息长度
+	public Integer probe_if;                //接口号
+	public Long timestamp;    //时间戳
+	public Long mac_src;      //源物理地址
+	public Long mac_dst;      //目的物理地址
+	public Integer vlan_id;                 //vlan_id
+	public Long l3_proto;     //l3层协议
+	public Long l4_proto;     //l4层协议
+	public Integer tos;                     //tos
+	public Integer retran_count;            //重传次数
+	public Integer reset_count;             //重置次数
+	public Integer zerowin_count;           //零窗口次数
+	public Integer protocol;                //协议名
+	public Long seq;
+	public Long ack;
+	public Integer recog_status;            //识别类型标识
+	public Long bytes;        //总字节
+	public Long packets;      //总包数
+	public  Integer start_tv_sec;//Web开始时间秒
+	public Long start_tv_usec;//开始时间毫秒
+	public Integer end_tv_sec;//结束时间秒
+	public  Long end_tv_usec;//结束时间微妙
+	public  Integer server_start_tv_sec;//服务器响应开始时间秒
+	public Long server_start_tv_usec;//服务器响应开始时间毫秒
+	public Integer server_end_tv_sec;//服务器响应结束时间秒
+	public  Long server_end_tv_usec;//服务器响应结束时间微妙
+	
+	public Long server_response_time;//Web服务器响应时间
+	public Long client_translate_time;//Web客户端传输耗时
+	public Long server_translate_time;//Web服务器传输耗时
+
+	public Long bytes_in;
+	public Long bytes_out;
+	public Long packets_in;
+	public Long packets_out;
+	public Long ip_src;       //源IP
+	public Long ip_dst;       //目的IP
+	public Long port_src;     //源端口
+	public Long port_dst;     //目的端口
+	public Long probeIP;              //探针IP
+	
+	public Long intodb_time;
+	public Long count = 1L;
+
+
+	
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception {
+		if (offset + UNIT_DATA_LENGTH > data.length)
+			throw new Exception("data length error!");
+		DataFlow dflow = new DataFlow();		
+//		dflow.m_Header = header;
+		
+		dflow.msg_type = header.getTableID();
+//		offset += 4;
+//		dflow.msg_version = ByteUtil.getInteger(data, offset, 1);
+//		offset += 1;
+//		dflow.msg_seq = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+//		dflow.msg_len = ByteUtil.getInteger(data, offset, 4);
+//		offset += 4;
+		dflow.ip_src = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.ip_dst = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.port_src = ByteUtil.getLong(data, offset, 2);
+		offset += 2;
+//		dflow.port_dst = ByteUtil.getLong(data, offset, 2);
+		offset += 2;
+		dflow.probe_if =  ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.timestamp = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.mac_src = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+//		dflow.mac_dst = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+//		dflow.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.l3_proto = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.l4_proto = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.retran_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.reset_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.zerowin_count = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.protocol=ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.seq = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.ack = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.recog_status = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		dflow.bytes_in = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.bytes_out = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.packets_in = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.packets_out = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.bytes = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+		dflow.packets = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.start_tv_sec = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.start_tv_usec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.end_tv_sec = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.end_tv_usec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.server_start_tv_sec = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.server_start_tv_usec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.server_end_tv_sec = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+//		dflow.server_end_tv_usec = ByteUtil.getLong(data, offset, 4);
+		offset += 4;
+//		dflow.server_translate_time = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+//		dflow.client_translate_time = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+//		dflow.server_response_time = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		return dflow;
+	}
+	@Override
+	public int getUnitPacketLength() {
+		return UNIT_DATA_LENGTH;
+	}
+	@Override
+	public String[] getData() {
+		String data[] = new String[45];
+		return data;
+	}
+	@Override
+	public int getInterfaceNumber() {
+		return Integer.parseInt(probe_if.toString());
+	}
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java
new file mode 100644
index 0000000..3eb2d91
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketBase.java
@@ -0,0 +1,27 @@
+package com.yuandian.dataflow.proto.decode;
+import java.nio.ByteBuffer;
+
+public abstract class PacketBase {
+
+	protected PacketHeader m_Header = null;
+	
+	public PacketHeader getPacketHead(){
+		return m_Header;
+	}
+	
+	public void setPacketHeader(PacketHeader header)
+	{
+		this.m_Header = header;
+	}
+	
+	//抽象方法,每个数据操作类实现不一样
+	public abstract PacketBase  Parse(PacketHeader header,ByteBuffer data) throws Exception;
+	
+	public abstract int getUnitPacketLength();
+	
+	public abstract String[] getData();
+	
+	public abstract int getInterfaceNumber();
+	
+	 
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java
new file mode 100644
index 0000000..56f2a4d
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/PacketHeader.java
@@ -0,0 +1,55 @@
+package com.yuandian.dataflow.proto.decode;
+
+ 
+
+public class PacketHeader {
+	
+	private int m_TableID = -1; //数据类型
+	private int m_RecCount = 0;	//数据总条数
+	private int msg_len;	//数据报文总长度,60010端口为压缩后长度
+	private long timestamp;	//60010端口发送数据时间
+	private int probe_if;	//60010端口抓包口
+	private int umcompr_len;//60010端口数据报文压缩前长度
+	private int now_type; //记录25类型数据数据当前数据类型 22 23 24
+	
+	public int getTableID() {
+		return m_TableID;
+	}
+	public int getMsg_len() {
+		return msg_len;
+	}
+	public int getRecCount() {
+		return m_RecCount;
+	}
+	public long getTimestamp() {
+		return timestamp ;
+	}
+	public int getProbe_id(){
+		return probe_if;
+	}
+	public int getUmcompr_len(){
+		return umcompr_len;
+	}
+	
+	public PacketHeader(byte[] data) throws Exception {
+		if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!");
+		this.m_TableID = ByteUtil.getInteger(data, 0, 4);//22
+		this.m_RecCount = ByteUtil.getInteger(data, 4, 4);//4000
+		this.msg_len = ByteUtil.getInteger(data, 8, 4);
+	}
+	
+	public void parseNextHeader_60010(byte[] data) throws Exception {
+		if (data.length != 12 && data.length != 20) throw new Exception("Packet header length error!");
+		this.timestamp = ByteUtil.getLong(data, 0, 4);
+		this.probe_if = ByteUtil.getInteger(data, 4, 4);
+		this.umcompr_len = ByteUtil.getInteger(data, 8, 4);
+	}
+
+	public void setNow_type(int now_type) {
+		this.now_type = now_type;
+	}
+
+	public int getNow_type(){
+		return now_type;
+	}
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java
new file mode 100644
index 0000000..d141de5
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/QoeFlow.java
@@ -0,0 +1,156 @@
+/**
+ * 
+ */
+package com.yuandian.dataflow.proto.decode;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * @author l
+ * 网络性能流
+ */
+public class QoeFlow extends PacketBase {
+	private String qoe_Flow_Id; 
+	public static int SIZE = 38*4;
+//		字段类型		字段							原始类型	    字节数 	说明 
+public int srcIp;	 					 //uint32_t 	4	源ip地址
+public int dstIp;	 					 //uint32_t 	4	目的ip地址
+public int stvSec; 	 				 //uint32_t 	4	开始时间秒
+public int stvUsec; 	 				 //uint32_t 	4	开始时间微秒
+public int ltvSec;	 				 //uint32_t 	4	最后更新时间秒
+public int ltvUsec;	 				 //uint32_t 	4	最后更新时间微秒
+public int dst2ResponNum;	 		 	 //uint32_t 	4	响应总量
+public int dst2Fast;	 				 //uint32_t 	4	
+public int dst2FastExpected;	 		 //uint32_t 	4	
+public int dst2ExpectedDegrated;	 	 //uint32_t 	4	
+public int dst2DegratedService;	 	 //uint32_t 	4	
+public int dst2ServiceAvailability;	 //uint32_t 	4	
+public int dst2ResponTimeout;	 	 	 //uint32_t 	4	响应超时数
+public int dst2ResponSuccess;	 	 	 //uint32_t 	4	响应成功数
+public int dst2ResponFail;	 		 //uint32_t 	4	响应失败数
+public int dst2ResponPeek;	 		 //uint32_t 	4	峰值响应时间
+public int dst2ResponAverage;	 	 	 //uint32_t 	4	响应时间均值
+public int csWindow;	 				 //uint32_t 	4	
+public int scWindow;	 				 //uint32_t 	4	
+public int csReset;	 				 //uint32_t 	4	
+public int scReset;	 				 //uint32_t 	4	
+public int csRetran;	 				 //uint32_t 	4	
+public int scRetran;	 				 //uint32_t 	4	
+public int appId;	 					 //uint32_t 	4	Aphid
+public int appGroupId;	 			 //uint32_t 	4	app组id
+public int probeIf;					 //uint32_t 	4	探针接口id
+public int appStyle;					 //uint32_t 	4	
+public int timeFlag;					 //uint32_t 	4	发送时间戳
+public int connSetupTm;				 //uint32_t 	4	链接建立时间
+public int dataTransferTm;			 //uint32_t 	4	数据传输时间
+public int retransDelayTm;			 //uint32_t 	4	数据重传时延
+public int networkInbound;			 	 //uint32_t 	4	网络响应时间(c->s)
+public int networkOutbound;			 //uint32_t 	4	网络响应时间(s->c)
+public int newSession;				 	 //uint32_t 	4	新会话数
+public int userEvents;				 	 //uint32_t 	4	用户事件
+public int serverEvents;				 //uint32_t 	4	服务事件
+public int connSetupPeek;			 	 //uint32_t 	4	连接建立时间峰值
+public int vlanId;					 	 //uint32_t 	4
+
+
+
+@Override
+public QoeFlow Parse(PacketHeader header,ByteBuffer data) throws Exception {
+	QoeFlow qoeFlow = new QoeFlow();
+	qoeFlow.srcIp=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dstIp=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.stvSec=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.stvUsec=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.ltvSec=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.ltvUsec=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponNum=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2Fast=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2FastExpected=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ExpectedDegrated=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2DegratedService=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ServiceAvailability=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponTimeout=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponSuccess=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponFail=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponPeek=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dst2ResponAverage=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.csWindow=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.scWindow=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.csReset=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.scReset=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.csRetran=ByteUtil.getInteger(data, offset, 4);	
+	offset += 4;
+	qoeFlow.scRetran=ByteUtil.getInteger(data, offset, 4);	
+	offset += 4;
+	qoeFlow.appId=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.appGroupId=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.probeIf=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.appStyle=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.timeFlag=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.connSetupTm=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.dataTransferTm=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.retransDelayTm=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.networkInbound=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.networkOutbound=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.newSession=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.userEvents=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.serverEvents=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.connSetupPeek=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	qoeFlow.vlanId=ByteUtil.getInteger(data, offset, 4);
+	offset += 4;
+	return qoeFlow;
+}
+@Override
+public int getUnitPacketLength() {
+	// TODO Auto-generated method stub
+	return 0;
+}
+@Override
+public String[] getData() {
+	// TODO Auto-generated method stub
+	return null;
+}
+@Override
+public int getInterfaceNumber() {
+	// TODO Auto-generated method stub
+	return 0;
+}
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java b/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java
new file mode 100644
index 0000000..b87bc14
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/.proto/decode/SstFlow.java
@@ -0,0 +1,114 @@
+package com.yuandian.dataflow.proto.decode;
+
+import java.util.Date;
+
+ 
+
+public class SstFlow extends PacketBase{
+
+	public static int SIZE = 108;
+	
+	public long mac_src;  //源MAC
+	public long mac_dst;  //目标MAC
+	public long ip_src;   //源IP
+	public long ip_dst;   //目标IP
+	public int port_src; //源端口,如果没有,为-1
+	public int port_dst; //目标端口,如果没有,为-1
+	public int l3_proto; //第三层协议ID,如果没有,为-1
+	public int l4_proto; //第四层协议ID,如果没有,为-1
+	public int tos;      //Tos,一个字节,如果没有,为-1
+	public int vlan_id;  //vlan ID,如果没有,为-1 
+	
+	public long bytes;    //字节总数
+	public long packets;  //数据包总数
+	 
+	public long packets_syn;  //TCP同步包数
+	public long packets_syn_ack; //TCP同步确认包数
+	public long packets_syn_rst; //TCP同步重置包数
+
+	public long timestamp; //时间戳,秒
+
+	public long appid;//应用ID
+	public long app_group_id;
+	public int mpls_label;
+	
+	public long pkts_syn_rx; //tcp同步包,接收
+	public long pkts_syn_ack_rx;//tcp同步确认包,接收
+	public long pkts_syn_rst_rx; //tcp同步重置包,接收
+	public long pkts_fin;//tcp终止包,接收
+	public long pkts_rst;//tcp重置包,接收
+
+	public long bytes_rx;//字节收
+	public long packets_rx;//数据包收
+	
+	public long probe_time_sec;
+	public Date probe_time;
+	public Date createTime;
+	
+	public int probe_if;
+	
+	@Override
+	public PacketBase  Parse(PacketHeader header,ByteBuffer data)
+			throws Exception {
+		SstFlow sstFlow = new SstFlow();
+		sstFlow.mac_src = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.mac_dst = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.ip_src = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.ip_dst = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.port_src = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.port_dst = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.l3_proto = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.l4_proto = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.tos = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.vlan_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+
+		sstFlow.bytes = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.packets = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.packets_syn = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.packets_syn_ack = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.packets_syn_rst = ByteUtil.getLong(data, offset, 8);
+		offset += 8;
+		sstFlow.appid = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.app_group_id = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.mpls_label = ByteUtil.getInteger(data, offset, 4);
+		offset += 4;
+		sstFlow.timestamp = header.getTimestamp();
+		sstFlow.probe_if = header.getProbe_id();
+		return sstFlow;
+	}
+
+	@Override
+	public int getUnitPacketLength() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+	@Override
+	public String[] getData() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+	@Override
+	public int getInterfaceNumber() {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+}
diff --git a/src/main/java/com/yuandian/dataflow/Server.java b/src/main/java/com/yuandian/dataflow/Server.java
index be88c3d..d88f01a 100644
--- a/src/main/java/com/yuandian/dataflow/Server.java
+++ b/src/main/java/com/yuandian/dataflow/Server.java
@@ -100,10 +100,10 @@ public class Server {
         done = new RaftClosure();
 
         System.setProperty("server.port", sprPort);
-        SpringApplication.run(Server.class, args);
- 
+        var app = SpringApplication.run(Server.class, args);
+        app.start();
  
         
-        // node.shutdown(done);
+        node.shutdown(done);
     }
 }
diff --git a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
index 3d472f3..e77eda4 100644
--- a/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
+++ b/src/main/java/com/yuandian/dataflow/controller/TaskLog.java
@@ -6,7 +6,7 @@ import com.alipay.sofa.jraft.Closure;
 import com.alipay.sofa.jraft.entity.Task;
  
 import com.yuandian.dataflow.Server;
-import com.yuandian.dataflow.entity.Response;
+import com.yuandian.dataflow.projo.Response;
 import com.yuandian.dataflow.statemachine.RaftClosure;
 
 import lombok.extern.slf4j.Slf4j;
@@ -38,13 +38,8 @@ public class TaskLog {
 
 		Task task = new Task();
 		
- 
-        
-        
-		 
-		log.error(node.toString());
-		
 
+		log.error(node.toString());
  
 		RaftClosure done = new RaftClosure();
 		task.setData(ByteBuffer.wrap("hello".getBytes()));
@@ -55,8 +50,9 @@ public class TaskLog {
  
 		
 		Response response  = new Response();
-		response.code = HttpStatus.OK;
-		response.message = HttpStatus.OK.toString();
+		
+		response.Code = HttpStatus.OK;
+		response.Message = HttpStatus.OK.toString();
 		return new ResponseEntity<Response>(response, HttpStatus.OK);
 	}
 
diff --git a/src/main/java/com/yuandian/dataflow/master/Header.java b/src/main/java/com/yuandian/dataflow/master/Header.java
new file mode 100644
index 0000000..ec0aa64
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/master/Header.java
@@ -0,0 +1,72 @@
+package com.yuandian.dataflow.master;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+
+import com.yuandian.dataflow.utils.PacketHeader;
+
+import lombok.Cleanup;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Header
+ */
+@Slf4j
+
+public class Header {
+
+    public static void main(String[] args) throws Exception {
+        ArrayList<Thread> threads  = new ArrayList<Thread>() ;
+      
+        for(int i = 0 ; i < 100 ; i++) {
+            Thread thread = new Thread(() -> {
+
+                try {
+                    var addr = new InetSocketAddress("192.168.1.248", 60001);
+                    @Cleanup
+                    var sock = new Socket();
+                    sock.setReceiveBufferSize(10 * 1024 * 1024);// socket接收缓存
+                    sock.setSoTimeout(1000 * 30);
+                    // 设置超时
+                    sock.connect(addr, 10 * 1000);
+                    var in = new DataInputStream(sock.getInputStream());
+                    var out = new DataOutputStream(sock.getOutputStream());
+                    // 发送验证字符串
+                    out.write("public".getBytes());
+                    // var buf = ByteBuffer.wrap( in.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN);
+                    log.error("{}", PacketHeader.PacketCode(in));
+                    var pheader = new PacketHeader(in);
+        
+                    // buf = ByteBuffer.wrap(in.readNBytes(12)).order(ByteOrder.LITTLE_ENDIAN);
+                    log.error("{}", pheader);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }) ;
+            threads.add(thread);
+            thread.start();
+        }
+
+        threads.forEach((t)->{
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        });
+     
+
+    }
+}
diff --git a/src/main/java/com/yuandian/dataflow/entity/Doc.java b/src/main/java/com/yuandian/dataflow/projo/Doc.java
similarity index 95%
rename from src/main/java/com/yuandian/dataflow/entity/Doc.java
rename to src/main/java/com/yuandian/dataflow/projo/Doc.java
index e6c7c01..888adfc 100644
--- a/src/main/java/com/yuandian/dataflow/entity/Doc.java
+++ b/src/main/java/com/yuandian/dataflow/projo/Doc.java
@@ -1,4 +1,4 @@
-package com.yuandian.dataflow.entity;
+package com.yuandian.dataflow.projo;
 
  
 
diff --git a/src/main/java/com/yuandian/dataflow/entity/Response.java b/src/main/java/com/yuandian/dataflow/projo/Response.java
similarity index 52%
rename from src/main/java/com/yuandian/dataflow/entity/Response.java
rename to src/main/java/com/yuandian/dataflow/projo/Response.java
index 6c1f6b5..8936a92 100644
--- a/src/main/java/com/yuandian/dataflow/entity/Response.java
+++ b/src/main/java/com/yuandian/dataflow/projo/Response.java
@@ -1,21 +1,14 @@
-package com.yuandian.dataflow.entity;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import lombok.Getter;
-import lombok.Setter;
+ package com.yuandian.dataflow.projo;
 
 import org.springframework.http.HttpStatus;
 
-@Getter
-@Setter
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 public class Response {
     @JsonProperty("code")
-    public HttpStatus code;
+    public HttpStatus Code;
     @JsonProperty("message")
-    public String message ;
+    public String Message;
     @JsonProperty("data")
-    public Object data ;
-
-    
+    public Object Data;
 }
diff --git a/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java b/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java
new file mode 100644
index 0000000..c658772
--- /dev/null
+++ b/src/main/java/com/yuandian/dataflow/utils/PacketHeader.java
@@ -0,0 +1,55 @@
+/**
+ * description
+ *
+ * @author eson
+ *2022年6月07日-11:09:21
+ */
+package com.yuandian.dataflow.utils;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * description
+ *
+ * @author eson
+ *2022年6月07日-11:09:21
+ */
+@Getter
+@Setter
+@ToString
+public class PacketHeader {
+ 
+        private int tableID = -1; //数据类型
+        private int recCount = 0;	//数据总条数
+        private int msgLen;	//数据报文总长度,60010端口为压缩后长度
+        private long timestamp;	//60010端口发送数据时间
+        private int probeIf;	//60010端口抓包口
+        private int umcomprLen;//60010端口数据报文压缩前长度
+        private int nowType; //记录25类型数据数据当前数据类型 22 23 24
+
+        public PacketHeader(DataInputStream data) throws Exception {
+            this.tableID = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//22
+            this.recCount = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();//4000
+            this.msgLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        }
+        
+        public void parseNextHeader_60010(DataInputStream data) throws Exception {
+            this.timestamp = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getLong();
+            this.probeIf =ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
+            this.umcomprLen = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
+        }
+
+        public static int PacketCode(DataInputStream data) throws IOException  {
+            var buf = ByteBuffer.wrap( data.readNBytes(4)).order(ByteOrder.LITTLE_ENDIAN);
+            return buf.getInt();
+        }
+     
+
+}
diff --git a/src/main/proto/scheduler.proto b/src/main/proto/scheduler.proto
deleted file mode 100644
index d96bc51..0000000
--- a/src/main/proto/scheduler.proto
+++ /dev/null
@@ -1,26 +0,0 @@
-syntax = "proto3";
-
-package com.yuandian.dataflow.rpc;
-// option java_package = "com.yuandian.dataflow.rpc"; 
-
-// option java_outer_classname = "com.yuandian.dataflow.rpc";
-option java_multiple_files = false;//以非外部类模式生成
-
-service DataFlow {
-    rpc Update (State) returns (Response);
-}
-
-message State {
-    map<int32, QueueState> QueueMap = 1;
-
-}
-
-message QueueState {
-    int32 Size = 1; 
-}
-
-message Response {
-    int32 Code = 1;
-    string Message = 2; 
-    bytes Data = 3;
-}
\ No newline at end of file
diff --git a/src/test/java/com/yuandian/dataflow/AppTest.java b/src/test/java/com/yuandian/dataflow/AppTest.java
index 6a9af2a..1198286 100644
--- a/src/test/java/com/yuandian/dataflow/AppTest.java
+++ b/src/test/java/com/yuandian/dataflow/AppTest.java
@@ -25,7 +25,7 @@ import org.springframework.expression.spel.ast.FunctionReference;
 
 import com.mongodb.MongoClient;
 import com.mongodb.client.model.InsertManyOptions;
-import com.yuandian.dataflow.entity.Doc;
+import com.yuandian.dataflow.projo.Doc;
 
 import io.netty.handler.codec.dns.DatagramDnsQuery;
 import lombok.Cleanup;