From 58c0580cad22fdfc2f9e3270f8e2d848ff788d4f Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 27 Jul 2023 16:27:42 +0800 Subject: [PATCH] fix --- constants/websocket.go | 1 + .../internal/logic/datatransferlogic.go | 22 ++++++++++-------- utils/id_generator/wesocket.go | 23 +++++++++++++++++++ 3 files changed, 36 insertions(+), 10 deletions(-) create mode 100644 utils/id_generator/wesocket.go diff --git a/constants/websocket.go b/constants/websocket.go index fcc6e372..7b4c9ca3 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -12,6 +12,7 @@ const ( WEBSOCKET_RENDER_IMAGE = "WEBSOCKET_RENDER_IMAGE" //数据格式错误 WEBSOCKET_ERR_DATA_FORMAT = "WEBSOCKET_ERR_DATA_FORMAT" + // ) // 云渲染完成通知api需要的签名字符串 diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 582624d1..1b25d9f8 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -7,7 +7,7 @@ import ( "fusenapi/constants" "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" - "github.com/google/uuid" + "fusenapi/utils/id_generator" "github.com/gorilla/websocket" "net/http" "sync" @@ -34,6 +34,8 @@ func NewDataTransferLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Data } var ( + //全局websocketid生成器 + websocketIdGenerator = id_generator.NewWebsocketId(1) //临时缓存对象池 buffPool = sync.Pool{ New: func() interface{} { @@ -61,7 +63,7 @@ type wsConnectItem struct { conn *websocket.Conn //websocket的连接 closeChan chan struct{} //ws连接关闭chan isClose bool //是否已经关闭 - flag string //ws连接唯一标识 + uniqueId uint64 //ws连接唯一标识 inChan chan []byte //接受消息缓冲通道 outChan chan []byte //发送回客户端的消息 mutex sync.Mutex //互斥锁 @@ -91,10 +93,10 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp return }*/ //生成连接唯一标识 - flag := uuid.New().String() + "time=" + time.Now().Format("15-04-05") + uniqueId := websocketIdGenerator.Get() ws := wsConnectItem{ conn: conn, - flag: flag, + uniqueId: uniqueId, closeChan: make(chan struct{}, 1), inChan: make(chan []byte, 100), outChan: make(chan []byte, 100), @@ -104,12 +106,12 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp }, } //保存连接 - mapConnPool.Store(flag, ws) + mapConnPool.Store(uniqueId, ws) defer ws.close() //把连接成功消息发回去 time.Sleep(time.Second) //兼容下火狐 rsp.T = constants.WEBSOCKET_CONNECT_SUCCESS - rsp.D = flag + rsp.D = uniqueId b, _ := json.Marshal(rsp) _ = conn.WriteMessage(websocket.TextMessage, b) //循环读客户端信息 @@ -155,7 +157,7 @@ func (w *wsConnectItem) heartbeat() { case <-tick: //发送心跳信息 if err := w.conn.WriteMessage(websocket.PongMessage, nil); err != nil { - logx.Error("发送心跳信息异常,关闭连接:", w.flag, err) + logx.Error("发送心跳信息异常,关闭连接:", w.uniqueId, err) w.close() return } @@ -167,16 +169,16 @@ func (w *wsConnectItem) heartbeat() { func (w *wsConnectItem) close() { w.mutex.Lock() defer w.mutex.Unlock() - logx.Info("websocket:", w.flag, " is closing...") + logx.Info("websocket:", w.uniqueId, " is closing...") //发送关闭信息 _ = w.conn.WriteMessage(websocket.CloseMessage, nil) w.conn.Close() - mapConnPool.Delete(w.flag) + mapConnPool.Delete(w.uniqueId) if !w.isClose { w.isClose = true close(w.closeChan) } - logx.Info("websocket:", w.flag, " is closed") + logx.Info("websocket:", w.uniqueId, " is closed") } // 读取输出返回给客户端 diff --git a/utils/id_generator/wesocket.go b/utils/id_generator/wesocket.go new file mode 100644 index 00000000..62f21adf --- /dev/null +++ b/utils/id_generator/wesocket.go @@ -0,0 +1,23 @@ +package id_generator + +import "sync" + +type WebsocketId struct { + nodeId uint64 + count uint64 + mu sync.Mutex +} + +func (wid *WebsocketId) Get() uint64 { + wid.mu.Lock() + defer wid.mu.Unlock() + wid.count++ + return (wid.count << 8) | wid.nodeId +} + +func NewWebsocketId(NodeId uint8) *WebsocketId { + return &WebsocketId{ + nodeId: uint64(NodeId), + count: 0, + } +}