143 lines
2.5 KiB
Go
143 lines
2.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
gen "slimming/proto/gen"
|
|
"sync"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
type RPCClient struct {
|
|
Frames chan []byte
|
|
|
|
realAddr string
|
|
conn *grpc.ClientConn
|
|
lock sync.Mutex
|
|
|
|
trigger chan bool
|
|
}
|
|
|
|
func NewRPCClient(realAddr string) *RPCClient {
|
|
return &RPCClient{
|
|
realAddr: realAddr,
|
|
|
|
Frames: make(chan []byte),
|
|
conn: nil,
|
|
lock: sync.Mutex{},
|
|
trigger: make(chan bool),
|
|
}
|
|
}
|
|
|
|
func (cli *RPCClient) Lock(do func() bool) bool {
|
|
cli.lock.Lock()
|
|
defer cli.lock.Unlock()
|
|
return do()
|
|
}
|
|
|
|
func (cli *RPCClient) CheckConnect() {
|
|
|
|
if cli.Lock(func() bool {
|
|
return cli.conn != nil
|
|
}) {
|
|
return
|
|
}
|
|
|
|
log.Println("rpcclient connect", cli.realAddr)
|
|
// defer log.Println("rpcclient exit")
|
|
|
|
conn, err := grpc.Dial(cli.realAddr,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
)
|
|
if err != nil {
|
|
log.Printf("did not connect: %v", err)
|
|
return
|
|
}
|
|
|
|
cli.Lock(func() bool {
|
|
cli.conn = conn
|
|
return true
|
|
})
|
|
|
|
go cli.run()
|
|
}
|
|
|
|
func (cli *RPCClient) Push(frame []byte) {
|
|
|
|
cli.Frames <- frame
|
|
// cli.Lock(func() bool {
|
|
// cli.Frames = append(cli.Frames, frame)
|
|
// if len(cli.Frames) >= 1000 {
|
|
// cli.trigger <- true
|
|
// }
|
|
// return true
|
|
// })
|
|
}
|
|
|
|
func (cli *RPCClient) run() {
|
|
defer log.Println("rpcclient exit")
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Println("recover", err)
|
|
cli.Lock(func() bool {
|
|
err = cli.conn.Close()
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
cli.conn = nil
|
|
return false
|
|
})
|
|
}
|
|
}()
|
|
|
|
cliService := gen.NewFrameServiceClient(cli.conn)
|
|
stream, err := cliService.SendFrames(context.Background())
|
|
if err != nil {
|
|
log.Panic(err)
|
|
}
|
|
defer stream.CloseSend()
|
|
|
|
// var ticker = time.NewTicker(time.Millisecond * 1)
|
|
|
|
// buffer := bytes.NewBuffer(nil)
|
|
for buf := range cli.Frames {
|
|
|
|
// select {
|
|
// case <-cli.trigger:
|
|
// case <-ticker.C:
|
|
// }
|
|
|
|
// if !cli.Lock(func() bool {
|
|
// if len(cli.Frames) == 0 {
|
|
// return false
|
|
// }
|
|
|
|
// Compress(buf, cli.Frames)
|
|
|
|
// if len(cli.Frames) >= 2 {
|
|
// var countbuf = 0
|
|
// for _, frame := range cli.Frames {
|
|
// countbuf += len(frame)
|
|
// }
|
|
// log.Printf("src size: %d compressed size: %d", countbuf, len(buf.Bytes()))
|
|
// }
|
|
|
|
// cli.Frames = cli.Frames[:0]
|
|
// return true
|
|
// }) {
|
|
// continue
|
|
// }
|
|
|
|
// 发到对面的网卡
|
|
err = stream.Send(&gen.RequestFrames{
|
|
Frames: buf,
|
|
})
|
|
if err != nil {
|
|
log.Panic(err)
|
|
}
|
|
// buf.Reset()
|
|
}
|
|
}
|