package main

import (
	"context"
	"log"
	gen "slimming/proto/gen"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// RPCClient forwards raw frames to a remote FrameService over a single gRPC
// client stream.
//
// Frames are handed over through the unbuffered Frames channel and are sent
// by a background goroutine (run) that is started by CheckConnect.
type RPCClient struct {
	// Frames carries frames from Push to the sender goroutine. It is
	// unbuffered, so a send blocks until the sender goroutine receives it.
	Frames chan []byte

	// realAddr is the dial target passed to grpc.Dial.
	realAddr string

	// conn is the active connection, or nil when no sender goroutine is
	// running. Guarded by lock.
	conn *grpc.ClientConn

	// lock guards conn.
	lock sync.Mutex

	// trigger is allocated by NewRPCClient but is not used by any active code
	// visible here; it belonged to a batching/compression send path that is
	// currently disabled.
	trigger chan bool
}

// NewRPCClient returns a client for realAddr. No connection is made until
// CheckConnect is called.
func NewRPCClient(realAddr string) *RPCClient {
	return &RPCClient{
		realAddr: realAddr,
		Frames:   make(chan []byte),
		trigger:  make(chan bool),
	}
}

// Lock runs do while holding the client's mutex and returns do's result.
//
// The mutex is not reentrant: do must not call Lock or CheckConnect.
func (cli *RPCClient) Lock(do func() bool) bool {
	cli.lock.Lock()
	defer cli.lock.Unlock()
	return do()
}

// CheckConnect dials realAddr and starts the sender goroutine if the client
// has no connection yet; otherwise it does nothing. Dial errors are logged
// and leave the client disconnected so that a later call can retry.
//
// The mutex is held across the whole check-dial-assign sequence so that
// concurrent callers cannot both dial, which would leak a connection and
// start two sender goroutines. grpc.Dial is called without WithBlock, so it
// is not expected to wait for the connection to be established while the
// mutex is held.
func (cli *RPCClient) CheckConnect() {
	cli.lock.Lock()
	defer cli.lock.Unlock()

	if cli.conn != nil {
		return
	}

	log.Println("rpcclient connect", cli.realAddr)
	conn, err := grpc.Dial(cli.realAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Printf("did not connect: %v", err)
		return
	}

	cli.conn = conn
	go cli.run()
}

// Push queues frame for sending. Because Frames is unbuffered, Push blocks
// until the sender goroutine receives the frame; if no sender goroutine is
// running it blocks until one is started.
func (cli *RPCClient) Push(frame []byte) {
	cli.Frames <- frame
}

// run is the sender goroutine: it opens a SendFrames stream on the current
// connection and forwards every frame received from Frames until the channel
// is closed, the stream fails, or a panic occurs.
//
// On every exit path the connection is closed and cli.conn is reset to nil,
// so that CheckConnect can establish a fresh connection afterwards.
func (cli *RPCClient) run() {
	defer log.Println("rpcclient exit")

	// Snapshot the connection under the lock; CheckConnect assigns it while
	// holding the same mutex.
	var conn *grpc.ClientConn
	cli.Lock(func() bool {
		conn = cli.conn
		return conn != nil
	})
	if conn == nil {
		return
	}

	defer func() {
		// Safety net for unexpected panics; expected errors below are
		// handled by logging and returning.
		if r := recover(); r != nil {
			log.Println("recover", r)
		}
		cli.Lock(func() bool {
			if err := conn.Close(); err != nil {
				log.Println(err)
			}
			// Only clear the field if it still refers to the connection
			// this goroutine owns.
			if cli.conn == conn {
				cli.conn = nil
			}
			return false
		})
	}()

	stream, err := gen.NewFrameServiceClient(conn).SendFrames(context.Background())
	if err != nil {
		log.Printf("rpcclient open stream: %v", err)
		return
	}
	// Best effort: the connection is closed right afterwards in any case.
	defer stream.CloseSend()

	for frame := range cli.Frames {
		// Forward the frame to the peer (the NIC on the remote side).
		if err := stream.Send(&gen.RequestFrames{Frames: frame}); err != nil {
			// The frame that failed to send is dropped.
			log.Printf("rpcclient send: %v", err)
			return
		}
	}
}