diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 44dc3e82..cbd1a189 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -9,6 +9,7 @@ import ( "fusenapi/utils/auth" "fusenapi/utils/id_generator" "fusenapi/utils/websocket_data" + "github.com/google/uuid" "github.com/gorilla/websocket" "net/http" "sync" @@ -60,6 +61,8 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} + //公共互斥锁 + publicMutex sync.Mutex ) // 每个连接的连接基本属性 @@ -70,7 +73,7 @@ type wsConnectItem struct { allModels *gmodel.AllModelsGen closeChan chan struct{} //ws连接关闭chan isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 + uniqueId string //ws连接唯一标识 inChan chan []byte //接受消息缓冲通道 outChan chan []byte //发送回客户端的消息 mutex sync.Mutex //互斥锁 @@ -94,7 +97,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp ) isAuth, userInfo = l.checkAuth(svcCtx, r) if !isAuth { - time.Sleep(time.Second * 4) //兼容下火狐 + time.Sleep(time.Second * 2) //兼容下火狐 rsp := websocket_data.DataTransferData{ T: constants.WEBSOCKET_UNAUTH, D: nil, @@ -109,8 +112,32 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp //测试的目前写死 39 var userInfo auth.UserInfo userInfo.UserId = 39 + ws := l.setConnPool(conn, userInfo) + defer ws.close() + go func() { + //把连接成功消息发回去 + time.Sleep(time.Second * 2) //兼容下火狐 + b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) + _ = conn.WriteMessage(websocket.TextMessage, b) + }() + //循环读客户端信息 + go ws.readLoop() + //循环把数据发送给客户端 + go ws.writeLoop() + //推消息到云渲染 + go ws.sendLoop() + //操作连接中渲染任务的增加/删除 + go ws.operationRenderTask() + //心跳 + ws.heartbeat() +} + +// 获取唯一id +func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem { + publicMutex.Lock() + defer publicMutex.Unlock() //生成连接唯一标识 - uniqueId := websocketIdGenerator.Get() + uniqueId := l.getUniqueId() ws := wsConnectItem{ conn: conn, ctx: l.ctx, @@ -129,23 +156,14 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp } //保存连接 mapConnPool.Store(uniqueId, ws) - defer ws.close() - go func() { - //把连接成功消息发回去 - time.Sleep(time.Second * 4) //兼容下火狐 - b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) - _ = conn.WriteMessage(websocket.TextMessage, b) - }() - //循环读客户端信息 - go ws.readLoop() - //循环把数据发送给客户端 - go ws.writeLoop() - //推消息到云渲染 - go ws.sendLoop() - //操作连接中渲染任务的增加/删除 - go ws.operationRenderTask() - //心跳 - ws.heartbeat() + return ws +} +func (l *DataTransferLogic) getUniqueId() string { + uniqueId := uuid.New().String() + time.Now().Format("20060102150405") + if _, ok := mapConnPool.Load(uniqueId); ok { + uniqueId = l.getUniqueId() + } + return uniqueId } // 鉴权