package main import ( "bytes" "context" "log" gen "slimming/proto/gen" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type RPCClient struct { Frames [][]byte realAddr string conn *grpc.ClientConn lock sync.Mutex trigger chan bool } func NewRPCClient(realAddr string) *RPCClient { return &RPCClient{ realAddr: realAddr, Frames: make([][]byte, 0, 1000), conn: nil, lock: sync.Mutex{}, trigger: make(chan bool), } } func (cli *RPCClient) Lock(do func() bool) bool { cli.lock.Lock() defer cli.lock.Unlock() return do() } func (cli *RPCClient) CheckConnect() { if cli.Lock(func() bool { return cli.conn != nil }) { return } log.Println("rpcclient connect", cli.realAddr) // defer log.Println("rpcclient exit") conn, err := grpc.Dial(cli.realAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { log.Printf("did not connect: %v", err) return } cli.Lock(func() bool { cli.conn = conn return true }) go cli.run() } func (cli *RPCClient) Push(frame []byte) { cli.Lock(func() bool { cli.Frames = append(cli.Frames, frame) if len(cli.Frames) >= 1000 { cli.trigger <- true } return true }) } func (cli *RPCClient) run() { defer log.Println("rpcclient exit") defer func() { if err := recover(); err != nil { log.Println("recover", err) cli.Lock(func() bool { err = cli.conn.Close() if err != nil { log.Println(err) } cli.conn = nil return false }) } }() cliService := gen.NewFrameServiceClient(cli.conn) stream, err := cliService.SendFrames(context.Background()) if err != nil { log.Panic(err) } defer stream.CloseSend() var ticker = time.NewTicker(time.Millisecond * 200) buf := bytes.NewBuffer(nil) for { select { case <-cli.trigger: case <-ticker.C: } if !cli.Lock(func() bool { if len(cli.Frames) == 0 { return false } Compress(buf, cli.Frames) if len(cli.Frames) >= 2 { var countbuf = 0 for _, frame := range cli.Frames { countbuf += len(frame) } log.Printf("src size: %d compressed size: %d", countbuf, len(buf.Bytes())) } cli.Frames = cli.Frames[:0] return true }) { continue } // 发到对面的网卡 err = stream.Send(&gen.RequestFrames{ Frames: buf.Bytes(), }) if err != nil { log.Panic(err) } buf.Reset() } }