slimming/rpc_client.go
2022-08-31 17:52:16 +08:00

113 lines
1.9 KiB
Go

package main
import (
"bytes"
"context"
"log"
gen "slimming/proto/gen"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// RPCClient queues raw frames and forwards them to a remote gRPC endpoint
// through a background sender goroutine.
type RPCClient struct {
	// Frame is the queue of frames waiting to be sent; the sender goroutine
	// receives from this channel.
	Frame chan []byte

	// realAddr is the target address handed to grpc.Dial.
	realAddr string

	// conn is the current gRPC connection, or nil while disconnected.
	conn *grpc.ClientConn

	// lock is taken (via Lock) when conn is checked or torn down.
	lock sync.Mutex
}
// NewRPCClient returns a disconnected client for realAddr. The Frame queue is
// buffered to hold up to 1000 pending frames; conn and lock are left at their
// zero values (nil and an unlocked mutex).
func NewRPCClient(realAddr string) *RPCClient {
	cli := new(RPCClient)
	cli.realAddr = realAddr
	cli.Frame = make(chan []byte, 1000)
	return cli
}
// Lock runs do while holding the client's mutex and returns whatever do
// returns. The mutex is released even if do panics.
func (cli *RPCClient) Lock(do func() bool) bool {
	mu := &cli.lock
	mu.Lock()
	defer mu.Unlock()

	result := do()
	return result
}
// CheckConnect makes sure the client has a connection. If cli.conn is already
// set it returns immediately; otherwise it dials cli.realAddr with insecure
// transport credentials, stores the connection and starts the sender
// goroutine (run).
//
// The mutex is held across the whole check/dial/assign sequence so that
// concurrent callers cannot both observe a nil conn, dial twice, and start two
// sender goroutines. grpc.Dial does not wait for the connection to come up
// unless asked to block, so the lock is not held across network I/O.
//
// A dial error is logged and leaves the client disconnected, so a later call
// can retry.
func (cli *RPCClient) CheckConnect() {
	cli.lock.Lock()
	defer cli.lock.Unlock()

	if cli.conn != nil {
		return
	}

	log.Println("rpcclient connect", cli.realAddr)
	conn, err := grpc.Dial(cli.realAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Printf("did not connect: %v", err)
		return
	}

	cli.conn = conn
	go cli.run()
}
// run is the sender loop started by CheckConnect. It opens a SendFrames stream
// on cli.conn, collects frames received from cli.Frame into a batch, and sends
// each batch as a single gen.Request whose payload is produced by Compress.
//
// A batch is flushed when it reaches maxBatch frames, or when flushInterval
// elapses while frames are pending. The batch is emptied after every
// successful send so frames are transmitted once and memory stays bounded.
//
// Stream errors are raised with log.Panic; the deferred handler recovers,
// closes the connection and resets cli.conn to nil so that a later
// CheckConnect can dial again.
func (cli *RPCClient) run() {
	const (
		maxBatch      = 1000
		flushInterval = 20 * time.Millisecond
	)

	defer log.Println("rpcclient exit")
	defer func() {
		if err := recover(); err != nil {
			log.Println("recover", err)
			cli.Lock(func() bool {
				// Guard against a connection that was already torn down.
				if cli.conn != nil {
					if cerr := cli.conn.Close(); cerr != nil {
						log.Println(cerr)
					}
					cli.conn = nil
				}
				return false
			})
		}
	}()

	cliService := gen.NewFrameServiceClient(cli.conn)
	stream, err := cliService.SendFrames(context.Background())
	if err != nil {
		log.Panic(err)
	}
	defer stream.CloseSend()

	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	frames := make([][]byte, 0, maxBatch)
	buf := bytes.NewBuffer(nil)

	// flush compresses and sends the pending batch, then empties it.
	// It is a no-op when nothing is pending.
	flush := func() {
		if len(frames) == 0 {
			return
		}
		// NOTE(review): buf is reused across sends, as in the original code.
		// gRPC's SendMsg documentation cautions against modifying a message
		// after sending it — confirm this reuse is safe for this stream.
		buf.Reset()
		Compress(buf, frames)
		// Forward the batch to the network interface on the peer side.
		// TODO: read and log the server's response.
		if err := stream.Send(&gen.Request{
			Frames: buf.Bytes(),
		}); err != nil {
			log.Panic(err)
		}
		// Start a fresh batch; keep the backing array to avoid reallocating.
		frames = frames[:0]
	}

	for {
		select {
		case frame := <-cli.Frame:
			frames = append(frames, frame)
			if len(frames) >= maxBatch {
				flush()
			}
		case <-ticker.C:
			// Time-based flush, so a partial batch is not held indefinitely
			// when traffic is light.
			flush()
		}
	}
}