slimming/rpc_client.go
2022-09-01 16:19:32 +08:00

143 lines
2.5 KiB
Go

package main
import (
"context"
"log"
gen "slimming/proto/gen"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// RPCClient pushes frames to a remote gRPC endpoint. Frames handed to Push
// travel over the Frames channel to the run goroutine, which sends them on a
// stream opened over conn.
type RPCClient struct {
// Frames carries frames from Push to the run goroutine.
Frames chan []byte
// realAddr is the target address passed to grpc.Dial.
realAddr string
// conn is the current connection; nil means no connection is established.
conn *grpc.ClientConn
// lock guards conn; it is taken through the Lock method.
lock sync.Mutex
// trigger is allocated by NewRPCClient. In the code visible here it is only
// referenced from commented-out batching logic.
trigger chan bool
}
// NewRPCClient returns a client for realAddr. No connection is dialed here;
// the conn field stays nil until CheckConnect establishes one.
func NewRPCClient(realAddr string) *RPCClient {
	client := new(RPCClient)

	client.realAddr = realAddr
	// Both channels are unbuffered.
	client.Frames = make(chan []byte)
	client.trigger = make(chan bool)

	// conn (nil) and lock (unlocked mutex) keep their zero values.
	return client
}
// Lock runs do while holding the client's mutex and returns whatever do
// returns. The mutex is released through defer, so it is unlocked even if do
// panics.
func (cli *RPCClient) Lock(do func() bool) bool {
	mu := &cli.lock
	mu.Lock()
	defer mu.Unlock()

	outcome := do()
	return outcome
}
// CheckConnect makes sure the client has a connection. If conn is already
// set it does nothing; otherwise it dials realAddr, stores the connection and
// starts the run goroutine that sends frames over it.
//
// The nil check, the dial and the assignment all happen inside one critical
// section. Doing the check and the assignment under separate lock
// acquisitions would let two concurrent callers both dial, overwrite each
// other's connection (leaking one) and start two run goroutines.
//
// A dial error is logged and the client stays disconnected, so a later call
// can try again.
func (cli *RPCClient) CheckConnect() {
	connected := cli.Lock(func() bool {
		if cli.conn != nil {
			return false
		}

		log.Println("rpcclient connect", cli.realAddr)
		// Without grpc.WithBlock, Dial does not wait for the connection to come
		// up, so the lock is only held briefly.
		conn, err := grpc.Dial(cli.realAddr,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		)
		if err != nil {
			log.Printf("did not connect: %v", err)
			return false
		}

		cli.conn = conn
		return true
	})

	// Start exactly one sender per connection this call established.
	if connected {
		go cli.run()
	}
}
// Push hands one frame to the run goroutine through the Frames channel.
// With the unbuffered channel created by NewRPCClient, the call blocks until
// a receiver takes the frame.
func (cli *RPCClient) Push(frame []byte) {
	var queue chan<- []byte = cli.Frames
	queue <- frame
}
// run is the sender goroutine started by CheckConnect. It opens a SendFrames
// stream over the client's connection, then compresses and sends every frame
// received from cli.Frames, one at a time.
//
// Failures are raised with log.Panic and handled by the deferred recover
// below, which closes the connection and clears cli.conn so that a later
// CheckConnect dials again. If cli.Frames is closed, the loop simply ends and
// the connection is left open.
func (cli *RPCClient) run() {
	defer log.Println("rpcclient exit")

	// Snapshot the connection this goroutine works on. The recovery path must
	// act on this connection rather than on whatever cli.conn holds at
	// failure time: another goroutine may have cleared the field (calling
	// Close on it would dereference nil and crash inside the deferred
	// handler) or replaced it with a newer connection that is not ours to
	// close.
	cli.lock.Lock()
	conn := cli.conn
	cli.lock.Unlock()

	defer func() {
		reason := recover()
		if reason == nil {
			return
		}
		log.Println("recover", reason)
		cli.Lock(func() bool {
			if conn != nil {
				if err := conn.Close(); err != nil {
					log.Println(err)
				}
			}
			// Reset the shared field only if it still refers to our connection.
			if cli.conn == conn {
				cli.conn = nil
			}
			return false
		})
	}()

	cliService := gen.NewFrameServiceClient(conn)
	stream, err := cliService.SendFrames(context.Background())
	if err != nil {
		log.Panic(err)
	}
	// Registered after the recover handler, so it runs before the connection
	// is closed.
	defer stream.CloseSend()

	for buf := range cli.Frames {
		// Send to the network card on the other side.
		err = stream.Send(&gen.RequestFrames{
			Frames: Compress(buf),
		})
		if err != nil {
			log.Panic(err)
		}
	}
}