diff --git a/netcard.go b/netcard.go index 6e7c06b..346bf41 100644 --- a/netcard.go +++ b/netcard.go @@ -133,7 +133,7 @@ func NewNetTunnel() *NetTunnel { if v, ok := nt.clients.Load(ipv4key); ok { client := v.(*RPCClient) client.CheckConnect() - client.Frame <- frame + client.Push(frame) log.Println(len(frame)) } } diff --git a/rpc_client.go b/rpc_client.go index e081bd6..aa8c1d0 100644 --- a/rpc_client.go +++ b/rpc_client.go @@ -6,25 +6,30 @@ import ( "log" gen "slimming/proto/gen" "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type RPCClient struct { - Frame chan []byte + Frames [][]byte realAddr string conn *grpc.ClientConn lock sync.Mutex + + trigger chan bool } func NewRPCClient(realAddr string) *RPCClient { return &RPCClient{ realAddr: realAddr, - Frame: make(chan []byte, 1000), - conn: nil, - lock: sync.Mutex{}, + + Frames: make([][]byte, 0, 1000), + conn: nil, + lock: sync.Mutex{}, + trigger: make(chan bool), } } @@ -57,6 +62,15 @@ func (cli *RPCClient) CheckConnect() { go cli.run() } +func (cli *RPCClient) Push(frame []byte) { + cli.lock.Lock() + defer cli.lock.Unlock() + cli.Frames = append(cli.Frames, frame) + if len(cli.Frames) >= 1000 { + cli.trigger <- true + } +} + func (cli *RPCClient) run() { defer log.Println("rpcclient exit") defer func() { @@ -80,18 +94,20 @@ func (cli *RPCClient) run() { } defer stream.CloseSend() + var ticker = time.NewTicker(time.Millisecond * 20) var frames [][]byte - // var nextTime = time.Now().Add(time.Millisecond * 20) - // var now = time.Now() - buf := bytes.NewBuffer(nil) + for { - - frames = append(frames, <-cli.Frame) - + select { + case <-cli.trigger: + case <-ticker.C: + } buf.Reset() - Compress(buf, frames) - frames = nil + cli.Lock(func() bool { + Compress(buf, frames) + return true + }) // 发到对面的网卡 err = stream.Send(&gen.RequestFrames{ @@ -102,24 +118,5 @@ func (cli *RPCClient) run() { log.Panic(err) } - // if len(frames) >= 1000 || nextTime.After(now) { - // nextTime = now - - // // TODO: - // // Contact the server and print out its response. - // buf.Reset() - // Compress(buf, frames) - // frames = nil - - // // 发到对面的网卡 - // err = stream.Send(&gen.RequestFrames{ - // Frames: buf.Bytes(), - // }) - - // if err != nil { - // log.Panic(err) - // } - // } - } }