package main import ( "bytes" "context" "log" gen "slimming/proto/gen" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type RPCClient struct { Frames [][]byte realAddr string conn *grpc.ClientConn lock sync.Mutex trigger chan bool } func NewRPCClient(realAddr string) *RPCClient { return &RPCClient{ realAddr: realAddr, Frames: make([][]byte, 0, 1000), conn: nil, lock: sync.Mutex{}, trigger: make(chan bool), } } func (cli *RPCClient) Lock(do func() bool) bool { cli.lock.Lock() defer cli.lock.Unlock() return do() } func (cli *RPCClient) CheckConnect() { if cli.Lock(func() bool { return cli.conn != nil }) { return } log.Println("rpcclient connect", cli.realAddr) // defer log.Println("rpcclient exit") conn, err := grpc.Dial(cli.realAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { log.Printf("did not connect: %v", err) return } cli.conn = conn go cli.run() } func (cli *RPCClient) Push(frame []byte) { cli.lock.Lock() defer cli.lock.Unlock() cli.Frames = append(cli.Frames, frame) if len(cli.Frames) >= 1000 { cli.trigger <- true } } func (cli *RPCClient) run() { defer log.Println("rpcclient exit") defer func() { if err := recover(); err != nil { log.Println("recover", err) cli.Lock(func() bool { err = cli.conn.Close() if err != nil { log.Println(err) } cli.conn = nil return false }) } }() cliService := gen.NewFrameServiceClient(cli.conn) stream, err := cliService.SendFrames(context.Background()) if err != nil { log.Panic(err) } defer stream.CloseSend() var ticker = time.NewTicker(time.Millisecond * 20) var frames [][]byte buf := bytes.NewBuffer(nil) for { select { case <-cli.trigger: case <-ticker.C: } buf.Reset() cli.Lock(func() bool { Compress(buf, frames) frames = frames[:0] return true }) // 发到对面的网卡 err = stream.Send(&gen.RequestFrames{ Frames: buf.Bytes(), }) if err != nil { log.Panic(err) } } }