Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop

This commit is contained in:
Hiven 2023-08-16 16:33:34 +08:00
commit 9bcb95e540
2 changed files with 52 additions and 16 deletions

View File

@ -120,6 +120,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request)
go ws.sendLoop() go ws.sendLoop()
//操作连接中渲染任务的增加/删除 //操作连接中渲染任务的增加/删除
go ws.operationRenderTask() go ws.operationRenderTask()
//消费渲染缓冲队列
go ws.renderImage()
//心跳 //心跳
ws.heartbeat() ws.heartbeat()
} }
@ -135,13 +137,14 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User
logic: l, logic: l,
uniqueId: uniqueId, uniqueId: uniqueId,
closeChan: make(chan struct{}, 1), closeChan: make(chan struct{}, 1),
inChan: make(chan []byte, 1000), inChan: make(chan []byte, 100),
outChan: make(chan []byte, 1000), outChan: make(chan []byte, 100),
userId: userInfo.UserId, userId: userInfo.UserId,
guestId: userInfo.GuestId, guestId: userInfo.GuestId,
renderProperty: renderProperty{ renderProperty: renderProperty{
renderImageTask: make(map[string]string), renderImageTask: make(map[string]string),
renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100), renderImageTaskCtlChan: make(chan renderImageControlChanItem, 100),
renderChan: make(chan []byte, 100),
}, },
} }
//保存连接 //保存连接
@ -313,7 +316,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
switch parseInfo.T { switch parseInfo.T {
//图片渲染 //图片渲染
case constants.WEBSOCKET_RENDER_IMAGE: case constants.WEBSOCKET_RENDER_IMAGE:
w.renderImage(d) w.sendToRenderChan(d)
//刷新重连请求恢复上次连接的标识 //刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT: case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
w.reuseLastConnect(d) w.reuseLastConnect(d)

View File

@ -19,6 +19,7 @@ import (
type renderProperty struct { type renderProperty struct {
renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId renderImageTask map[string]string //需要渲染的图片任务 key是taskId val 是renderId
renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道 renderImageTaskCtlChan chan renderImageControlChanItem //渲染任务新增移除的控制通道
renderChan chan []byte //渲染的缓冲队列
} }
// 渲染任务新增移除的控制通道的数据 // 渲染任务新增移除的控制通道的数据
@ -28,13 +29,38 @@ type renderImageControlChanItem struct {
RenderId string // map的val RenderId string // map的val
} }
// 发送到渲染缓冲池
func (w *wsConnectItem) sendToRenderChan(data []byte) {
select {
case <-w.closeChan: //已经关闭
return
case w.renderProperty.renderChan <- data:
return
case <-time.After(time.Second * 3):
return
}
}
// 渲染发送到组装数据组装数据 // 渲染发送到组装数据组装数据
func (w *wsConnectItem) renderImage(data []byte) { func (w *wsConnectItem) renderImage() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("renderImage panic:", err) logx.Error("renderImage panic:", err)
} }
}() }()
for {
select {
case <-w.closeChan: //已关闭
return
case data := <-w.renderProperty.renderChan:
w.consumeRenderCache(data)
}
}
}
// 消费渲染缓冲数据
func (w *wsConnectItem) consumeRenderCache(data []byte) {
var renderImageData websocket_data.RenderImageReqMsg var renderImageData websocket_data.RenderImageReqMsg
if err := json.Unmarshal(data, &renderImageData); err != nil { if err := json.Unmarshal(data, &renderImageData); err != nil {
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data))) w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)))
@ -84,8 +110,11 @@ func (w *wsConnectItem) renderImage(data []byte) {
renderImageData.RenderData.UserId = w.userId renderImageData.RenderData.UserId = w.userId
renderImageData.RenderData.GuestId = w.guestId renderImageData.RenderData.GuestId = w.guestId
//生成任务id //生成任务id(需要把user_id,guest_id设为0)
taskId := hash.JsonHashKey(renderImageData.RenderData) hashVal := renderImageData.RenderData
hashVal.UserId = 0
hashVal.GuestId = 0
taskId := hash.JsonHashKey(hashVal)
//查询有没有缓存的资源,有就返回###################### //查询有没有缓存的资源,有就返回######################
resource, err := w.logic.svcCtx.AllModels.FsResource.FindOneById(w.logic.ctx, taskId) resource, err := w.logic.svcCtx.AllModels.FsResource.FindOneById(w.logic.ctx, taskId)
if err != nil { if err != nil {
@ -111,11 +140,14 @@ func (w *wsConnectItem) renderImage(data []byte) {
RenderId: renderImageData.RenderId, RenderId: renderImageData.RenderId,
} }
//组装数据 //组装数据
go w.assembleRenderData(taskId, renderImageData) if err = w.assembleRenderData(taskId, renderImageData); err != nil {
logx.Error("组装数据失败:", err)
return
}
} }
// 组装数据发送给unity // 组装数据发送给unity
func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.RenderImageReqMsg) { func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.RenderImageReqMsg) error {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("assembleRenderData panic:", err) logx.Error("assembleRenderData panic:", err)
@ -126,10 +158,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re
if err != nil { if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
logx.Error("template info is not found") logx.Error("template info is not found")
return return err
} }
logx.Error("failed to get template info:", err) logx.Error("failed to get template info:", err)
return return err
} }
//获取刀版图 //获取刀版图
res, err := w.logic.svcCtx.Repositories.ImageHandle.LogoCombine(w.logic.ctx, &repositories.LogoCombineReq{ res, err := w.logic.svcCtx.Repositories.ImageHandle.LogoCombine(w.logic.ctx, &repositories.LogoCombineReq{
@ -144,14 +176,14 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re
}) })
if err != nil { if err != nil {
logx.Error("合成刀版图失败:", err) logx.Error("合成刀版图失败:", err)
return return err
} }
combineImage := "" //刀版图 combineImage := "" //刀版图
if res != nil && res.ResourceUrl != nil { if res != nil && res.ResourceUrl != nil {
combineImage = *res.ResourceUrl combineImage = *res.ResourceUrl
} else { } else {
logx.Error("合成刀版图失败,合成的刀版图是空指针:", err) logx.Error("合成刀版图失败,合成的刀版图是空指针:", err)
return return err
} }
logx.Info("合成刀版图成功:", *res.ResourceUrl) logx.Info("合成刀版图成功:", *res.ResourceUrl)
//获取渲染设置信息 //获取渲染设置信息
@ -159,10 +191,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re
if err != nil { if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
logx.Error("element info is not found,model_id = ?", *productTemplate.ModelId) logx.Error("element info is not found,model_id = ?", *productTemplate.ModelId)
return return err
} }
logx.Error("failed to get element list,", err) logx.Error("failed to get element list,", err)
return return err
} }
//组装数据 //组装数据
refletion := -1 refletion := -1
@ -174,7 +206,7 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re
if element.Mode != nil && *element.Mode != "" { if element.Mode != nil && *element.Mode != "" {
if err = json.Unmarshal([]byte(*element.Mode), &mode); err != nil { if err = json.Unmarshal([]byte(*element.Mode), &mode); err != nil {
logx.Error("faile to parse element mode json:", err) logx.Error("faile to parse element mode json:", err)
return return err
} }
} }
tempData := make([]map[string]interface{}, 0, 3) tempData := make([]map[string]interface{}, 0, 3)
@ -247,9 +279,10 @@ func (w *wsConnectItem) assembleRenderData(taskId string, info websocket_data.Re
_, err = curl.ApiCall(url, "POST", header, bytes.NewReader(p), time.Second*10) _, err = curl.ApiCall(url, "POST", header, bytes.NewReader(p), time.Second*10)
if err != nil { if err != nil {
logx.Error("failed to send data to unity") logx.Error("failed to send data to unity")
return return err
} }
logx.Info("发送到unity成功################") logx.Info("发送到unity成功################")
return nil
} }
// 操作连接中渲染任务的增加/删除 // 操作连接中渲染任务的增加/删除