package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/gob"
	"fmt"
	"log"
	"net"
	"os/exec"
	gen "slimming/proto/gen"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/songgao/packets/ethernet"
	"github.com/songgao/water"
	"github.com/songgao/water/waterutil"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	// batchWindow is how long packets are collected, counted from the first
	// packet of a batch, before the batch is flushed to the peers.
	batchWindow = 20 * time.Millisecond

	// ipv4MinHeaderLen is the size of an IPv4 header without options. The
	// source and destination addresses occupy bytes 12-19, so anything
	// shorter cannot be inspected with the waterutil IPv4 accessors.
	ipv4MinHeaderLen = 20

	// packetQueueLen bounds the queue between the TUN reader goroutine and
	// the batching loop.
	packetQueueLen = 1024
)

// ExchangeBuffer is one batch of raw packets bound for a single peer.
type ExchangeBuffer struct {
	BytesArray [][]byte
}

// NetCard couples the local TUN interface with the RPC plumbing: packets read
// from the interface are batched per peer and handed to an RPCClient, while
// gob-encoded batches arriving on FrameChan are decoded and written to the
// interface.
type NetCard struct {
	// FrameChan carries gob-encoded [][]byte batches consumed by runWrite.
	// Nothing in this file sends on it; presumably the RPC server does.
	FrameChan chan []byte
	ifce      *water.Interface
	server    *RPCServer
	// clientMap maps a peer's real address to its client. In this file it is
	// touched only by the batching goroutine started by runRead.
	// NOTE(review): take lock (via Lock) here as well if other files access it.
	clientMap map[string]*RPCClient
	lock      sync.Mutex
}

// Lock runs do while holding the NetCard mutex.
func (nc *NetCard) Lock(do func()) {
	nc.lock.Lock()
	defer nc.lock.Unlock()
	do()
}

// RPCClient streams packet batches to one peer over gRPC.
//
// connect and send are meant to be called from a single goroutine (the
// batching loop in runRead); the sender goroutine started by connect
// publishes its liveness through running and done.
type RPCClient struct {
	// FrameChan hands batches to the sender goroutine. It is unbuffered, so
	// a send completes only once the goroutine has accepted the batch.
	FrameChan chan *ExchangeBuffer
	// conn is the current connection, or nil when there is none.
	conn *grpc.ClientConn

	// running is 1 while a sender goroutine started by connect is alive.
	// It is accessed atomically and is what other goroutines must consult
	// instead of reading conn directly.
	running int32
	// done is closed when the sender goroutine of the current connection
	// exits, which unblocks anyone waiting to hand it a batch.
	done chan struct{}
}

// connect dials realAddr and starts the sender goroutine. On a dial error the
// client is left disconnected and the error is only logged.
func (cli *RPCClient) connect(realAddr string) {
	log.Println("rpcclient connect", realAddr)
	conn, err := grpc.Dial(realAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Printf("did not connect: %v", err)
		return
	}
	// Publish conn and done before raising the flag and before starting the
	// goroutine, so both the goroutine and later send calls see them.
	cli.conn = conn
	cli.done = make(chan struct{})
	atomic.StoreInt32(&cli.running, 1)
	go cli.run()
}

// send hands batch to the sender goroutine. It reports false, without
// blocking forever, when no sender is running or when the sender exits before
// accepting the batch.
func (cli *RPCClient) send(batch *ExchangeBuffer) bool {
	if atomic.LoadInt32(&cli.running) == 0 {
		return false
	}
	select {
	case cli.FrameChan <- batch:
		return true
	case <-cli.done:
		return false
	}
}

// run is the sender goroutine started by connect. It forwards batches from
// FrameChan until the stream fails, then closes the connection and marks the
// client as disconnected so that the next flush reconnects.
func (cli *RPCClient) run() {
	defer log.Println("rpcclient exit")

	// Snapshot the connection state installed by connect; a later reconnect
	// must not change what this goroutine tears down.
	conn, done := cli.conn, cli.done
	if conn == nil {
		log.Println("rpcclient run: not connected")
		return
	}

	defer func() {
		if r := recover(); r != nil {
			log.Println("recover", r)
		}
		// Clear conn before lowering the flag: once running reads 0 the
		// batching goroutine may call connect and install a new conn.
		cli.conn = nil
		atomic.StoreInt32(&cli.running, 0)
		if done != nil {
			close(done)
		}
		// Closing releases the connection; the original code dropped the
		// reference without closing it.
		if err := conn.Close(); err != nil {
			log.Printf("rpcclient close: %v", err)
		}
	}()

	if err := cli.sendLoop(conn); err != nil {
		log.Println(err)
	}
}

// sendLoop opens the SendFrames stream on conn and forwards every batch
// received on FrameChan as one gob-encoded request. It returns the first
// error, or nil if FrameChan is closed.
func (cli *RPCClient) sendLoop(conn *grpc.ClientConn) error {
	stream, err := gen.NewFrameServiceClient(conn).SendFrames(context.Background())
	if err != nil {
		return fmt.Errorf("open frame stream: %w", err)
	}

	for batch := range cli.FrameChan {
		if batch == nil || len(batch.BytesArray) == 0 {
			continue
		}
		log.Printf("send to target %s source bytes: %d",
			waterutil.IPv4Destination(batch.BytesArray[0]), len(batch.BytesArray[0]))

		// A fresh encoder per message keeps every message self-describing,
		// matching runWrite, which builds a new decoder for each one. A fresh
		// buffer per message avoids modifying bytes already handed to Send.
		// (zstd compression of the encoded batch was sketched here in an
		// earlier revision but is not enabled.)
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(batch.BytesArray); err != nil {
			return fmt.Errorf("encode batch: %w", err)
		}
		if err := stream.Send(&gen.Request{Frames: buf.Bytes()}); err != nil {
			return fmt.Errorf("send batch: %w", err)
		}
	}
	return nil
}

// Run starts the read and write loops, assigns the configured virtual address
// to the "stun" device, brings the device up and then runs the RPC server.
// It panics if either ip command fails.
func (nc *NetCard) Run() {
	go nc.runRead()
	go nc.runWrite()
	time.Sleep(time.Second)

	// NOTE(review): the device is created with Persist set, so the address
	// may already be present after a restart and "ip addr add" may then
	// fail; confirm whether an idempotent form is wanted.
	runIPCommand(fmt.Sprintf("ip addr add %s dev stun", config.Network.Self.Virt))
	runIPCommand("ip link set dev stun up")

	nc.server.run()
}

// runIPCommand logs and executes a space-separated command line, panicking
// with the command's combined output if it fails.
func runIPCommand(cmdstr string) {
	log.Println(cmdstr)
	args := strings.Fields(cmdstr)
	out, err := exec.Command(args[0], args[1:]...).CombinedOutput()
	if err != nil {
		log.Panicf("%s: %v: %s", cmdstr, err, bytes.TrimSpace(out))
	}
}

// NewNetCard creates the persistent, multi-queue TUN device "stun" and the
// NetCard wrapping it. It panics if the device cannot be created.
func NewNetCard() *NetCard {
	// Named tunConfig so that it does not shadow the package-level config.
	tunConfig := water.Config{
		DeviceType: water.TUN,
		PlatformSpecificParams: water.PlatformSpecificParams{
			Name:       "stun",
			Persist:    true,
			MultiQueue: true,
		},
	}
	ifce, err := water.New(tunConfig)
	if err != nil {
		log.Panic(err)
	}

	nc := &NetCard{
		FrameChan: make(chan []byte, 3000),
		ifce:      ifce,
		clientMap: map[string]*RPCClient{},
	}
	nc.server = newRPCServer(nc)
	return nc
}

// routedPacket is a packet read from the TUN device together with the real
// address of the peer its destination maps to.
type routedPacket struct {
	peer string
	data []byte
}

// runRead batches packets read from the TUN device and forwards each batch to
// the peer responsible for it. It never returns.
//
// Reading happens in a separate goroutine so that the batch window is
// enforced by a timer. Previously the window was only checked between
// blocking reads, so a batch was held back until another packet arrived.
func (nc *NetCard) runRead() {
	log.Println("start netcard read")

	packets := make(chan routedPacket, packetQueueLen)
	go nc.readPackets(packets)

	for {
		first := <-packets
		nc.flush(collectBatch(first, packets))
	}
}

// readPackets reads packets from the TUN device, drops those that are not
// routable IPv4 and sends the rest to out. A read error terminates the
// process.
func (nc *NetCard) readPackets(out chan<- routedPacket) {
	for {
		// Each packet gets its own buffer because it is retained in a batch
		// after this iteration.
		var frame ethernet.Frame
		frame.Resize(1500)
		n, err := nc.ifce.Read([]byte(frame))
		if err != nil {
			log.Fatal(err)
		}
		frame = frame[:n]

		// The length check must come first: the waterutil accessors index
		// into the header and panic on short input.
		if n < ipv4MinHeaderLen || !waterutil.IsIPv4(frame) ||
			waterutil.IPv4Source(frame).Equal(net.IPv4(0, 0, 0, 0)) {
			continue
		}

		dst := waterutil.IPv4Destination(frame)
		realAddr := lookupPeer(dst)
		if realAddr == "" {
			log.Printf("%s is not exists", dst)
			continue
		}
		// The device is a TUN (layer 3) interface, so there is no Ethernet
		// header; the former Ethertype field in this log line was read out
		// of the IPv4 header and has been removed.
		log.Printf("Src %s Dst: %s realAddr %s\n", waterutil.IPv4Source(frame), dst, realAddr)

		// NOTE(review): this writes the packet that was just read back into
		// the same device. It is kept because the intent is not visible
		// here, but it looks like a debugging leftover; confirm and remove.
		if _, err := nc.ifce.Write([]byte(frame)); err != nil {
			log.Printf("tun write-back: %v", err)
		}

		out <- routedPacket{peer: realAddr, data: []byte(frame)}
	}
}

// lookupPeer returns the real address configured for the virtual destination
// dst, or "" when there is none.
func lookupPeer(dst net.IP) string {
	// NOTE(review): if waterutil returns the 16-byte net.IP form (as
	// net.IPv4 does), the first four bytes are the all-zero prefix and this
	// key is 0 for every destination. The computation is left unchanged
	// because config.IPv4Nodes may be keyed the same way; confirm both sides
	// before switching to dst.To4().
	key := binary.LittleEndian.Uint32([]byte(dst))

	var realAddr string
	config.Lock(func() {
		realAddr = config.IPv4Nodes[key]
	})
	return realAddr
}

// collectBatch groups first and every packet arriving on packets within
// batchWindow by peer address.
func collectBatch(first routedPacket, packets <-chan routedPacket) map[string]*ExchangeBuffer {
	batches := make(map[string]*ExchangeBuffer)
	add := func(pkt routedPacket) {
		batch, ok := batches[pkt.peer]
		if !ok {
			batch = &ExchangeBuffer{BytesArray: make([][]byte, 0, 1000)}
			batches[pkt.peer] = batch
		}
		batch.BytesArray = append(batch.BytesArray, pkt.data)
	}

	add(first)
	window := time.NewTimer(batchWindow)
	defer window.Stop()
	for {
		select {
		case pkt := <-packets:
			add(pkt)
		case <-window.C:
			return batches
		}
	}
}

// flush sends every batch to its peer, connecting first when the peer has no
// running sender. A batch that cannot be delivered is dropped and logged.
//
// Handing over a batch waits until the peer's stream is established or has
// failed, so a slow peer delays the batches of the others.
func (nc *NetCard) flush(batches map[string]*ExchangeBuffer) {
	for dst, batch := range batches {
		cli, ok := nc.clientMap[dst]
		if !ok {
			cli = &RPCClient{FrameChan: make(chan *ExchangeBuffer)}
			nc.clientMap[dst] = cli
		}
		if atomic.LoadInt32(&cli.running) == 0 {
			cli.connect(dst)
		}
		// The batch that triggered the connect is sent too; it used to be
		// discarded.
		if !cli.send(batch) {
			log.Printf("dropped %d packets for %s: peer not connected", len(batch.BytesArray), dst)
		}
	}
}

// runWrite decodes the gob-encoded batches arriving on FrameChan and writes
// each packet to the TUN device. It returns when FrameChan is closed.
//
// The payload comes from a remote peer, so undecodable batches, short packets
// and rejected writes are logged and skipped instead of crashing the process.
// (zstd decompression was sketched here in an earlier revision but is not
// enabled.)
func (nc *NetCard) runWrite() {
	for encoded := range nc.FrameChan {
		log.Printf("get wframes bytes len: %d", len(encoded))

		var packets [][]byte
		if err := gob.NewDecoder(bytes.NewReader(encoded)).Decode(&packets); err != nil {
			log.Printf("discarding undecodable batch: %v", err)
			continue
		}

		for _, pkt := range packets {
			log.Printf("get wframes decode len: %d", len(pkt))
			if len(pkt) < ipv4MinHeaderLen {
				log.Printf("discarding short packet: %d bytes", len(pkt))
				continue
			}
			log.Printf("get decode frames decode source: %s dst: %s",
				waterutil.IPv4Source(pkt), waterutil.IPv4Destination(pkt))
			if _, err := nc.ifce.Write(pkt); err != nil {
				log.Printf("tun write: %v", err)
			}
		}
	}
}