From d0bd037d7b6e13f93915328482d69a381c8388eb Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Wed, 16 Aug 2023 16:20:16 +0800 Subject: [PATCH] fix --- .../internal/logic/datatransferlogic.go | 9 ++-- .../internal/logic/ws_render_image_logic.go | 52 +++++++++++++++---- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 7288256b..8d6e2188 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -120,6 +120,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) go ws.sendLoop() //操作连接中渲染任务的增加/删除 go ws.operationRenderTask() + //消费渲染缓冲队列 + go ws.renderImage() //心跳 ws.heartbeat() } @@ -135,13 +137,14 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User logic: l, uniqueId: uniqueId, closeChan: make(chan struct{}, 1), - inChan: make(chan []byte, 1000), - outChan: make(chan []byte, 1000), + inChan: make(chan []byte, 100), + outChan: make(chan []byte, 100), userId: userInfo.UserId, guestId: userInfo.GuestId, renderProperty: renderProperty{ renderImageTask: make(map[string]string), renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100), + renderChan: make(chan []byte, 100), }, } //保存连接 @@ -313,7 +316,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { switch parseInfo.T { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: - w.renderImage(d) + w.sendToRenderChan(d) //刷新重连请求恢复上次连接的标识 case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: w.reuseLastConnect(d) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 8d658bc6..4743c5ee 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -19,6 +19,7 @@ import ( type renderProperty struct { renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道 + renderChan chan []byte //渲染的缓冲队列 } // 渲染任务新增移除的控制通道的数据 @@ -28,13 +29,38 @@ type renderImageControlChanItem struct { RenderId string // map的val } +// 发送到渲染缓冲池 +func (w *wsConnectItem) sendToRenderChan(data []byte) { + select { + case <-w.closeChan: //已经关闭 + return + case w.renderProperty.renderChan <- data: + return + case <-time.After(time.Second * 3): + return + } +} + // 渲染发送到组装数据组装数据 -func (w *wsConnectItem) renderImage(data []byte) { +func (w *wsConnectItem) renderImage() { defer func() { if err := recover(); err != nil { logx.Error("renderImage panic:", err) } }() + for { + select { + case <-w.closeChan: //已关闭 + return + case data := <-w.renderProperty.renderChan: + w.consumeRenderCache(data) + } + } + +} + +// 消费渲染缓冲数据 +func (w *wsConnectItem) consumeRenderCache(data []byte) { var renderImageData websocket_data.RenderImageReqMsg if err := json.Unmarshal(data, &renderImageData); err != nil { w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data))) @@ -111,11 +137,14 @@ func (w *wsConnectItem) renderImage(data []byte) { RenderId: renderImageData.RenderId, } //组装数据 - go w.assembleRenderData(taskId, renderImageData) + if err = w.assembleRenderData(taskId, renderImageData); err != nil { + logx.Error("组装数据失败:", err) + return + } } // 组装数据发送给unity -func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.RenderImageReqMsg) { +func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.RenderImageReqMsg) error { defer func() { if err := recover(); err != nil { logx.Error("assembleRenderData panic:", err) @@ -126,10 +155,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logx.Error("template info is not found") - return + return err } logx.Error("failed to get template info:", err) - return + return err } //获取刀版图 res, err := w.logic.svcCtx.Repositories.ImageHandle.LogoCombine(w.logic.ctx, &repositories.LogoCombineReq{ @@ -144,14 +173,14 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re }) if err != nil { logx.Error("合成刀版图失败:", err) - return + return err } combineImage := "" //刀版图 if res != nil && res.ResourceUrl != nil { combineImage = *res.ResourceUrl } else { logx.Error("合成刀版图失败,合成的刀版图是空指针:", err) - return + return err } logx.Info("合成刀版图成功:", *res.ResourceUrl) //获取渲染设置信息 @@ -159,10 +188,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { logx.Error("element info is not found,model_id = ?", *productTemplate.ModelId) - return + return err } logx.Error("failed to get element list,", err) - return + return err } //组装数据 refletion := -1 @@ -174,7 +203,7 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re if element.Mode != nil && *element.Mode != "" { if err = json.Unmarshal([]byte(*element.Mode), &mode); err != nil { logx.Error("faile to parse element mode json:", err) - return + return err } } tempData := make([]map[string]interface{}, 0, 3) @@ -247,9 +276,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re _, err = curl.ApiCall(url, "POST", header, bytes.NewReader(p), time.Second*10) if err != nil { logx.Error("failed to send data to unity") - return + return err } logx.Info("发送到unity成功################") + return nil } // 操作连接中渲染任务的增加/删除