From e632181dd12993797afe39daee41e0c46c258fb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=80=9D=E6=95=8F?= Date: Wed, 31 Aug 2022 17:52:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0TAP=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main_test.go | 7 ++ netcard_old.go | 291 +++++++++++++++++++++++++++++++++++++++++++++++++ rpc_client.go | 112 +++++++++++++++++++ rpc_old.go | 56 ++++++++++ utils.go | 56 ++++++++++ 5 files changed, 522 insertions(+) create mode 100644 main_test.go create mode 100644 netcard_old.go create mode 100644 rpc_client.go create mode 100644 rpc_old.go create mode 100644 utils.go diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..ef64976 --- /dev/null +++ b/main_test.go @@ -0,0 +1,7 @@ +package main + +import "testing" + +func TestMain(t *testing.T) { + main() +} diff --git a/netcard_old.go b/netcard_old.go new file mode 100644 index 0000000..69ddff6 --- /dev/null +++ b/netcard_old.go @@ -0,0 +1,291 @@ +package main + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/binary" + "encoding/gob" + "fmt" + "log" + "net" + "os/exec" + gen "slimming/proto/gen" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/klauspost/compress/zstd" + "github.com/songgao/packets/ethernet" + "github.com/songgao/water" + "github.com/songgao/water/waterutil" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type ExchangeBuffer_OLD struct { + BytesArray [][]byte +} + +type NetCard_OLD struct { + FrameChan chan []byte + ifce *water.Interface + server *RPCServer_OLD + + clientMap map[string]*RPCClient_OLD + + lock sync.Mutex +} + +func (nc *NetCard_OLD) Lock(do func()) { + nc.lock.Lock() + defer nc.lock.Unlock() + do() +} + +type RPCClient_OLD struct { + FrameChan chan *ExchangeBuffer_OLD + conn *grpc.ClientConn +} + +func (cli *RPCClient_OLD) connect(realAddr string) { + log.Println("rpcclient connect", realAddr) + // defer log.Println("rpcclient exit") + + conn, err := grpc.Dial(realAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Printf("did not connect: %v", err) + return + } + + cli.conn = conn + go cli.run() + time.Sleep(time.Millisecond) +} + +func (cli *RPCClient_OLD) run() { + defer log.Println("rpcclient exit") + defer func() { + if err := recover(); err != nil { + log.Println("recover") + cli.conn = nil + } + }() + + cliService := gen.NewFrameServiceClient(cli.conn) + stream, err := cliService.SendFrames(context.Background()) + if err != nil { + log.Panic(err) + } + + buf := bytes.NewBuffer(nil) + for { + // Contact the server and print out its response. + + buf.Reset() + + // encode gob + enc := gob.NewEncoder(buf) + cliBuffer := <-cli.FrameChan + + log.Printf("send to target %s source bytes: %d", waterutil.IPv4Destination(cliBuffer.BytesArray[0]), len(cliBuffer.BytesArray[0])) + err := enc.Encode(cliBuffer.BytesArray) + if err != nil { + log.Panic(err) + } + + // zstd compress + zenc, err := zstd.NewWriter(buf) + if err != nil { + log.Panic(err) + } + err = zenc.Flush() + if err != nil { + log.Println(err) + } + + // 发到对面的网卡 + err = stream.Send(&gen.Request{ + Frames: buf.Bytes(), + }) + + if err != nil { + log.Panic(err) + } + } +} + +func (nc *NetCard_OLD) Run() { + go nc.runRead() + go nc.runWrite() + // go nc.cli.run() + time.Sleep(time.Second) + + cmdstr := fmt.Sprintf("ip addr add %s dev stun", config.Network.Self.Virt) + log.Println(cmdstr) + cmd := strings.Split(cmdstr, " ") + err := exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + + cmdstr = "ip link set dev stun up" + cmd = strings.Split(cmdstr, " ") + err = exec.Command(cmd[0], cmd[1:]...).Run() + if err != nil { + log.Panic(err) + } + nc.server.run() +} + +func NewNetCard_OLD() *NetCard_OLD { + + config := water.Config{ + DeviceType: water.TUN, + PlatformSpecificParams: water.PlatformSpecificParams{ + Name: "stun", + }, + } + + ifce, err := water.New(config) + if err != nil { + log.Panic(err) + } + + nc := &NetCard_OLD{ + FrameChan: make(chan []byte, 3000), + ifce: ifce, + clientMap: map[string]*RPCClient_OLD{}, + } + nc.server = newRPCServer_OLD(nc) + return nc +} + +func (nc *NetCard_OLD) runRead() { + log.Println("start netcard read") + + var ifce *water.Interface = nc.ifce + + for { + + var bytesMap map[string]*ExchangeBuffer_OLD = make(map[string]*ExchangeBuffer_OLD) + + var ok bool + var isLoop int32 = 1 + + // 20 微秒后停止收集数据 + go func() { + var after = time.NewTimer(time.Millisecond * 20) + <-after.C + atomic.StoreInt32(&isLoop, 0) + }() + + for atomic.LoadInt32(&isLoop) > 0 { + var rframe ethernet.Frame + rframe.Resize(1500) + n, err := ifce.Read([]byte(rframe)) + if err != nil { + log.Fatal(err) + } + rframe = rframe[:n] + + if !waterutil.IsIPv4(rframe) || waterutil.IPv4Source(rframe).Equal(net.IPv4(0, 0, 0, 0)) { + continue + } + + var realAddr string = "" + config.Lock(func() { + if realAddr, ok = config.IPv4Nodes[binary.LittleEndian.Uint32([]byte(waterutil.IPv4Destination(rframe)))]; !ok { + return + } + }) + + if realAddr == "" { + log.Printf("%s is not exists", waterutil.IPv4Destination(rframe)) + continue + } + + // log.Printf("Payload: % x\n", rframe.Payload()) + + log.Printf("Ethertype: % x %v realAddr %s\n", rframe.Ethertype(), waterutil.IsIPv4(rframe), realAddr) + log.Printf("Src %s Dst: %s\n", waterutil.IPv4Source(rframe), waterutil.IPv4Destination(rframe)) + + var buffer *ExchangeBuffer_OLD + if buffer, ok = bytesMap[realAddr]; !ok { + buffer = &ExchangeBuffer_OLD{BytesArray: make([][]byte, 0, 1000)} + bytesMap[realAddr] = buffer + } + + h := md5.New() + h.Write([]byte(rframe)) + log.Printf("%x", h.Sum(nil)) + buffer.BytesArray = append(buffer.BytesArray, []byte(rframe)) + } + + for dst, buffer := range bytesMap { + + var cli *RPCClient_OLD + if cli, ok = nc.clientMap[dst]; !ok { + cli = &RPCClient_OLD{FrameChan: make(chan *ExchangeBuffer_OLD)} + nc.clientMap[dst] = cli + } + + if cli.conn == nil { + cli.connect(dst) + } else { + cli.FrameChan <- buffer // 网卡数据 发到对方 + } + + } + + // 写到grpc服务 + + // log.Printf("Dst: %s\n", rframe.Destination()[0:4]) + // log.Printf("Src: %s\n", rframe.Source()[0:4]) + // log.Printf("Ethertype: % x\n", rframe.Ethertype()) + // log.Printf("Payload: % x\n", rframe.Payload()) + + } +} + +func (nc *NetCard_OLD) runWrite() { + var ifce *water.Interface = nc.ifce + var err error + + for wframe := range nc.FrameChan { + + log.Printf("get wframes bytes len: %d", len(wframe)) + var buf = bytes.NewBuffer(wframe) + + var zdec *zstd.Decoder + zdec, err = zstd.NewReader(buf) + if err != nil { + log.Panic(err) + } + zdec.Close() + + dec := gob.NewDecoder(buf) + var bufs [][]byte + err = dec.Decode(&bufs) + if err != nil { + log.Panic(err) + } + + for _, buf := range bufs { + h := md5.New() + h.Write(buf) + + log.Printf("get wframes decode len: %d, write buf hash: %x", len(buf), h.Sum(nil)) + log.Printf("get decode frames decode source: %s dst: %s", waterutil.IPv4Source(buf), waterutil.IPv4Destination(buf)) + _, err := ifce.Write(buf) + if err != nil { + log.Panic(err) + } + } + + } + +} diff --git a/rpc_client.go b/rpc_client.go new file mode 100644 index 0000000..a58163b --- /dev/null +++ b/rpc_client.go @@ -0,0 +1,112 @@ +package main + +import ( + "bytes" + "context" + "log" + gen "slimming/proto/gen" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type RPCClient struct { + Frame chan []byte + + realAddr string + conn *grpc.ClientConn + lock sync.Mutex +} + +func NewRPCClient(realAddr string) *RPCClient { + return &RPCClient{ + realAddr: realAddr, + Frame: make(chan []byte, 1000), + conn: nil, + lock: sync.Mutex{}, + } +} + +func (cli *RPCClient) Lock(do func() bool) bool { + cli.lock.Lock() + defer cli.lock.Unlock() + return do() +} + +func (cli *RPCClient) CheckConnect() { + + if cli.Lock(func() bool { + return cli.conn != nil + }) { + return + } + + log.Println("rpcclient connect", cli.realAddr) + // defer log.Println("rpcclient exit") + + conn, err := grpc.Dial(cli.realAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Printf("did not connect: %v", err) + return + } + + cli.conn = conn + go cli.run() +} + +func (cli *RPCClient) run() { + defer log.Println("rpcclient exit") + defer func() { + if err := recover(); err != nil { + log.Println("recover", err) + cli.Lock(func() bool { + err = cli.conn.Close() + if err != nil { + log.Println(err) + } + cli.conn = nil + return false + }) + } + }() + + cliService := gen.NewFrameServiceClient(cli.conn) + stream, err := cliService.SendFrames(context.Background()) + if err != nil { + log.Panic(err) + } + defer stream.CloseSend() + + var frames [][]byte + var nextTime = time.Now().Add(time.Millisecond * 20) + var now = time.Now() + + buf := bytes.NewBuffer(nil) + for { + + frames = append(frames, <-cli.Frame) + + if len(frames) >= 1000 || nextTime.After(now) { + nextTime = now + + // TODO: + // Contact the server and print out its response. + buf.Reset() + Compress(buf, frames) + + // 发到对面的网卡 + err = stream.Send(&gen.Request{ + Frames: buf.Bytes(), + }) + + if err != nil { + log.Panic(err) + } + } + + } +} diff --git a/rpc_old.go b/rpc_old.go new file mode 100644 index 0000000..7dca012 --- /dev/null +++ b/rpc_old.go @@ -0,0 +1,56 @@ +package main + +import ( + "log" + "net" + gen "slimming/proto/gen" + + "google.golang.org/grpc" +) + +type RPCServer_OLD struct { + gen.UnimplementedFrameServiceServer + netCard *NetCard_OLD +} + +var () + +func newRPCServer_OLD(netCard *NetCard_OLD) *RPCServer_OLD { + return &RPCServer_OLD{netCard: netCard} +} + +func (rpc *RPCServer_OLD) run() { + + lis, err := net.Listen("tcp", config.Network.Self.Real) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + gen.RegisterFrameServiceServer(s, rpc) + log.Printf("server listening at %v", lis.Addr()) + + if err := s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +} + +// SayHello implements helloworld.GreeterServer +func (s *RPCServer_OLD) SendFrames(stream gen.FrameService_SendFramesServer) error { + log.Printf("Start: %v", stream) + + for { + request, err := stream.Recv() + if err != nil { + log.Panic(err) + } + log.Printf("request: %v", len(request.Frames)) + s.netCard.FrameChan <- request.GetFrames() // 接受数据 广播到网卡上 + } + + // err := stream.SendAndClose(&gen.Response{Code: 0}) + // if err != nil { + // log.Panic(err) + // } + + return nil +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..bd262c4 --- /dev/null +++ b/utils.go @@ -0,0 +1,56 @@ +package main + +import ( + "bytes" + "encoding/gob" + "io" + "log" + + "github.com/klauspost/compress/zstd" + "github.com/songgao/water/waterutil" +) + +func Compress(writer io.Writer, data [][]byte) { + + // encode gob + enc := gob.NewEncoder(writer) + + log.Printf("send to target %s source bytes: %d", waterutil.IPv4Destination(data[0]), len(data[0])) + err := enc.Encode(data) + if err != nil { + log.Panic(err) + } + + // zstd compress + zenc, err := zstd.NewWriter(writer) + if err != nil { + log.Panic(err) + } + err = zenc.Flush() + if err != nil { + log.Println(err) + } + +} + +func Decompress(data []byte) (frames [][]byte) { + var err error + + log.Printf("get data bytes len: %d", len(data)) + + var buf = bytes.NewBuffer(data) + var zdec *zstd.Decoder + zdec, err = zstd.NewReader(buf) + if err != nil { + log.Panic(err) + } + zdec.Close() + + dec := gob.NewDecoder(buf) + err = dec.Decode(&frames) + if err != nil { + log.Panic(err) + } + + return +}