slimming/rpc_client.go
2022-09-01 15:25:37 +08:00

143 lines
2.4 KiB
Go

package main
import (
"bytes"
"context"
"log"
gen "slimming/proto/gen"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type RPCClient struct {
Frames [][]byte
realAddr string
conn *grpc.ClientConn
lock sync.Mutex
trigger chan bool
}
func NewRPCClient(realAddr string) *RPCClient {
return &RPCClient{
realAddr: realAddr,
Frames: make([][]byte, 0, 1000),
conn: nil,
lock: sync.Mutex{},
trigger: make(chan bool),
}
}
func (cli *RPCClient) Lock(do func() bool) bool {
cli.lock.Lock()
defer cli.lock.Unlock()
return do()
}
func (cli *RPCClient) CheckConnect() {
if cli.Lock(func() bool {
return cli.conn != nil
}) {
return
}
log.Println("rpcclient connect", cli.realAddr)
// defer log.Println("rpcclient exit")
conn, err := grpc.Dial(cli.realAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Printf("did not connect: %v", err)
return
}
cli.Lock(func() bool {
cli.conn = conn
return true
})
go cli.run()
}
func (cli *RPCClient) Push(frame []byte) {
cli.Lock(func() bool {
cli.Frames = append(cli.Frames, frame)
if len(cli.Frames) >= 1000 {
cli.trigger <- true
}
return true
})
}
func (cli *RPCClient) run() {
defer log.Println("rpcclient exit")
defer func() {
if err := recover(); err != nil {
log.Println("recover", err)
cli.Lock(func() bool {
err = cli.conn.Close()
if err != nil {
log.Println(err)
}
cli.conn = nil
return false
})
}
}()
cliService := gen.NewFrameServiceClient(cli.conn)
stream, err := cliService.SendFrames(context.Background())
if err != nil {
log.Panic(err)
}
defer stream.CloseSend()
var ticker = time.NewTicker(time.Millisecond * 20)
buf := bytes.NewBuffer(nil)
for {
select {
case <-cli.trigger:
case <-ticker.C:
}
if !cli.Lock(func() bool {
if len(cli.Frames) == 0 {
return false
}
Compress(buf, cli.Frames)
if len(cli.Frames) >= 1000 {
var countbuf = 0
for _, frame := range cli.Frames {
countbuf += len(frame)
}
log.Printf("src size: %d compressed size: %d", countbuf, len(buf.Bytes()))
}
cli.Frames = cli.Frames[:0]
return true
}) {
continue
}
// 发到对面的网卡
err = stream.Send(&gen.RequestFrames{
Frames: buf.Bytes(),
})
if err != nil {
log.Panic(err)
}
buf.Reset()
}
}