package main import ( "bytes" "context" "log" gen "slimming/proto/gen" "sync" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type RPCClient struct { Frame chan []byte realAddr string conn *grpc.ClientConn lock sync.Mutex } func NewRPCClient(realAddr string) *RPCClient { return &RPCClient{ realAddr: realAddr, Frame: make(chan []byte, 1000), conn: nil, lock: sync.Mutex{}, } } func (cli *RPCClient) Lock(do func() bool) bool { cli.lock.Lock() defer cli.lock.Unlock() return do() } func (cli *RPCClient) CheckConnect() { if cli.Lock(func() bool { return cli.conn != nil }) { return } log.Println("rpcclient connect", cli.realAddr) // defer log.Println("rpcclient exit") conn, err := grpc.Dial(cli.realAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { log.Printf("did not connect: %v", err) return } cli.conn = conn go cli.run() } func (cli *RPCClient) run() { defer log.Println("rpcclient exit") defer func() { if err := recover(); err != nil { log.Println("recover", err) cli.Lock(func() bool { err = cli.conn.Close() if err != nil { log.Println(err) } cli.conn = nil return false }) } }() cliService := gen.NewFrameServiceClient(cli.conn) stream, err := cliService.SendFrames(context.Background()) if err != nil { log.Panic(err) } defer stream.CloseSend() var frames [][]byte var nextTime = time.Now().Add(time.Millisecond * 20) var now = time.Now() buf := bytes.NewBuffer(nil) for { frames = append(frames, <-cli.Frame) if len(frames) >= 1000 || nextTime.After(now) { nextTime = now // TODO: // Contact the server and print out its response. buf.Reset() Compress(buf, frames) frames = nil // 发到对面的网卡 err = stream.Send(&gen.Request{ Frames: buf.Bytes(), }) if err != nil { log.Panic(err) } } } }