diff --git a/server/render/consumer/assemble_render_data.go b/server/render/consumer/assemble_render_data.go index dce9488a..6e6cc6a3 100644 --- a/server/render/consumer/assemble_render_data.go +++ b/server/render/consumer/assemble_render_data.go @@ -1,6 +1,8 @@ package consumer import ( + "encoding/json" + "fusenapi/utils/websocket_data" "github.com/zeromicro/go-zero/core/logx" ) @@ -10,5 +12,11 @@ type MqConsumerRenderAssemble struct { func (m *MqConsumerRenderAssemble) Run(data []byte) error { logx.Info("收到需要组装的消息:", string(data)) + var parseInfo websocket_data.AssembleRenderData + if err := json.Unmarshal(data, &parseInfo); err != nil { + logx.Error("MqConsumerRenderAssemble数据格式错误:", err) + return nil //不返回错误就删除消息 + } + return nil } diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 66bb21b8..d999d8e3 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -65,12 +65,13 @@ var ( type wsConnectItem struct { conn *websocket.Conn //websocket的连接 rabbitMq *initalize.RabbitMqHandle - closeChan chan struct{} //ws连接关闭chan - isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 - inChan chan []byte //接受消息缓冲通道 - outChan chan []byte //发送回客户端的消息 - mutex sync.Mutex //互斥锁 + closeChan chan struct{} //ws连接关闭chan + isClose bool //是否已经关闭 + uniqueId uint64 //ws连接唯一标识 + inChan chan []byte //接受消息缓冲通道 + outChan chan []byte //发送回客户端的消息 + mutex sync.Mutex //互斥锁 + userId int64 renderProperty renderProperty //扩展云渲染属性 } @@ -83,10 +84,14 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp } defer conn.Close() //鉴权不成功10秒后断开 - /*isAuth, _ := l.checkAuth(svcCtx, r) + var ( + userInfo *auth.UserInfo + isAuth bool + ) + isAuth, userInfo = l.checkAuth(svcCtx, r) if !isAuth { time.Sleep(time.Second) //兼容下火狐 - rsp := types.DataTransferData{ + rsp := websocket_data.DataTransferData{ T: constants.WEBSOCKET_UNAUTH, D: nil, } @@ -96,7 +101,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp //发送关闭信息 _ = conn.WriteMessage(websocket.CloseMessage, nil) return - }*/ + } //生成连接唯一标识 uniqueId := websocketIdGenerator.Get() ws := wsConnectItem{ @@ -111,6 +116,11 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100), }, } + if userInfo.UserId > 0 { + ws.userId = userInfo.UserId + } else { + ws.userId = userInfo.GuestId + } //保存连接 mapConnPool.Store(uniqueId, ws) defer ws.close() diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 0b750cfb..d4974870 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -39,6 +39,7 @@ func (w *wsConnectItem) renderImage(data []byte) { } tmpData := websocket_data.AssembleRenderData{ TaskId: taskId, + UserId: w.userId, RenderData: renderImageData.RenderData, } d, _ := json.Marshal(tmpData) diff --git a/utils/websocket_data/render_data.go b/utils/websocket_data/render_data.go index 3adefdcf..09a203da 100644 --- a/utils/websocket_data/render_data.go +++ b/utils/websocket_data/render_data.go @@ -37,5 +37,6 @@ type ThirdPartyLoginRspMsg struct { // 发送到渲染组装的mq数据 type AssembleRenderData struct { TaskId string `json:"task_id"` + UserId int64 `json:"user_id"` RenderData RenderData `json:"render_data"` }