slimming/rpc_client.go

143 lines
2.5 KiB
Go
Raw Normal View History

2022-08-31 09:52:16 +00:00
package main
import (
"context"
"log"
gen "slimming/proto/gen"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type RPCClient struct {
2022-09-01 08:09:20 +00:00
Frames chan []byte
2022-08-31 09:52:16 +00:00
realAddr string
conn *grpc.ClientConn
lock sync.Mutex
2022-09-01 06:59:30 +00:00
trigger chan bool
2022-08-31 09:52:16 +00:00
}
func NewRPCClient(realAddr string) *RPCClient {
return &RPCClient{
realAddr: realAddr,
2022-09-01 06:59:30 +00:00
2022-09-01 08:09:20 +00:00
Frames: make(chan []byte),
2022-09-01 06:59:30 +00:00
conn: nil,
lock: sync.Mutex{},
trigger: make(chan bool),
2022-08-31 09:52:16 +00:00
}
}
func (cli *RPCClient) Lock(do func() bool) bool {
cli.lock.Lock()
defer cli.lock.Unlock()
return do()
}
func (cli *RPCClient) CheckConnect() {
if cli.Lock(func() bool {
return cli.conn != nil
}) {
return
}
log.Println("rpcclient connect", cli.realAddr)
// defer log.Println("rpcclient exit")
conn, err := grpc.Dial(cli.realAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Printf("did not connect: %v", err)
return
}
2022-09-01 07:09:27 +00:00
cli.Lock(func() bool {
cli.conn = conn
return true
})
2022-08-31 09:52:16 +00:00
go cli.run()
}
2022-09-01 06:59:30 +00:00
func (cli *RPCClient) Push(frame []byte) {
2022-09-01 08:09:20 +00:00
cli.Frames <- frame
// cli.Lock(func() bool {
// cli.Frames = append(cli.Frames, frame)
// if len(cli.Frames) >= 1000 {
// cli.trigger <- true
// }
// return true
// })
2022-09-01 06:59:30 +00:00
}
2022-08-31 09:52:16 +00:00
func (cli *RPCClient) run() {
defer log.Println("rpcclient exit")
defer func() {
if err := recover(); err != nil {
log.Println("recover", err)
cli.Lock(func() bool {
err = cli.conn.Close()
if err != nil {
log.Println(err)
}
cli.conn = nil
return false
})
}
}()
cliService := gen.NewFrameServiceClient(cli.conn)
stream, err := cliService.SendFrames(context.Background())
if err != nil {
log.Panic(err)
}
defer stream.CloseSend()
2022-09-01 08:09:20 +00:00
// var ticker = time.NewTicker(time.Millisecond * 1)
2022-09-01 07:02:31 +00:00
2022-09-01 08:09:20 +00:00
// buf := bytes.NewBuffer(nil)
for buf := range cli.Frames {
2022-09-01 07:09:27 +00:00
2022-09-01 08:09:20 +00:00
// select {
// case <-cli.trigger:
// case <-ticker.C:
// }
2022-09-01 07:05:44 +00:00
2022-09-01 08:09:20 +00:00
// if !cli.Lock(func() bool {
// if len(cli.Frames) == 0 {
// return false
// }
2022-09-01 07:25:37 +00:00
2022-09-01 08:09:20 +00:00
// Compress(buf, cli.Frames)
2022-09-01 07:25:37 +00:00
2022-09-01 08:09:20 +00:00
// if len(cli.Frames) >= 2 {
// var countbuf = 0
// for _, frame := range cli.Frames {
// countbuf += len(frame)
// }
// log.Printf("src size: %d compressed size: %d", countbuf, len(buf.Bytes()))
// }
2022-09-01 07:25:37 +00:00
2022-09-01 08:09:20 +00:00
// cli.Frames = cli.Frames[:0]
// return true
// }) {
// continue
// }
2022-08-31 09:52:16 +00:00
2022-09-01 06:42:12 +00:00
// 发到对面的网卡
err = stream.Send(&gen.RequestFrames{
2022-09-01 08:09:20 +00:00
Frames: buf,
2022-09-01 06:42:12 +00:00
})
if err != nil {
log.Panic(err)
2022-08-31 09:52:16 +00:00
}
2022-09-01 08:09:20 +00:00
// buf.Reset()
2022-08-31 09:52:16 +00:00
}
}