Merge branch 'develop' of gitee.com:fusenpack/fusenapi into develop

This commit is contained in:
Hiven 2023-08-11 14:27:55 +08:00
commit 1a35669b95
5 changed files with 85 additions and 17 deletions

View File

@ -8,6 +8,12 @@ const (
WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH" WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH"
//ws连接成功 //ws连接成功
WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS"
//请求恢复为上次连接的标识
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT"
//请求恢复为上次连接的标识错误
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR"
//请求恢复为上次连接的标识成功
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS"
//渲染前数据组装 //渲染前数据组装
WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE" WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE"
//图片渲染 //图片渲染

View File

@ -25,7 +25,7 @@ import (
type pythonApiRsp struct { type pythonApiRsp struct {
Id string `json:"id"` //物料模板的id Id string `json:"id"` //物料模板的id
LogoUrl string `json:"logo_url"` //logo地址 LogoUrl string `json:"logo_url"` //logo地址
result string `json:"result"` //图片base64 Result string `json:"result"` //图片base64
} }
// 消费渲染需要组装的数据 // 消费渲染需要组装的数据
@ -33,6 +33,11 @@ type MqConsumerRenderAssemble struct {
} }
func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error {
defer func() {
if err := recover(); err != nil {
logx.Error("MqConsumerRenderAssemble panic:", err)
}
}()
logx.Info("收到需要组装的消息:", string(data)) logx.Info("收到需要组装的消息:", string(data))
var parseInfo websocket_data.AssembleRenderData var parseInfo websocket_data.AssembleRenderData
if err := json.Unmarshal(data, &parseInfo); err != nil { if err := json.Unmarshal(data, &parseInfo); err != nil {
@ -244,6 +249,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo
logx.Error("failed to parse python api rsp:", err) logx.Error("failed to parse python api rsp:", err)
return "", err return "", err
} }
//fmt.Println("××××××××××××××××××××", pythonApiInfo)
//上传刀版图 //上传刀版图
var upload = file.Upload{ var upload = file.Upload{
Ctx: ctx, Ctx: ctx,
@ -252,7 +258,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo
} }
uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{ uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{
FileHash: combineHash, FileHash: combineHash,
FileData: pythonApiInfo.result, FileData: pythonApiInfo.Result,
UploadBucket: 1, UploadBucket: 1,
ApiType: 2, ApiType: 2,
UserId: parseInfo.RenderData.UserId, UserId: parseInfo.RenderData.UserId,

View File

@ -9,6 +9,7 @@ import (
"fusenapi/utils/auth" "fusenapi/utils/auth"
"fusenapi/utils/id_generator" "fusenapi/utils/id_generator"
"fusenapi/utils/websocket_data" "fusenapi/utils/websocket_data"
"github.com/google/uuid"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"net/http" "net/http"
"sync" "sync"
@ -60,6 +61,8 @@ var (
} }
//websocket连接存储 //websocket连接存储
mapConnPool = sync.Map{} mapConnPool = sync.Map{}
//公共互斥锁
publicMutex sync.Mutex
) )
// 每个连接的连接基本属性 // 每个连接的连接基本属性
@ -70,7 +73,7 @@ type wsConnectItem struct {
allModels *gmodel.AllModelsGen allModels *gmodel.AllModelsGen
closeChan chan struct{} //ws连接关闭chan closeChan chan struct{} //ws连接关闭chan
isClose bool //是否已经关闭 isClose bool //是否已经关闭
uniqueId uint64 //ws连接唯一标识 uniqueId string //ws连接唯一标识
inChan chan []byte //接受消息缓冲通道 inChan chan []byte //接受消息缓冲通道
outChan chan []byte //发送回客户端的消息 outChan chan []byte //发送回客户端的消息
mutex sync.Mutex //互斥锁 mutex sync.Mutex //互斥锁
@ -94,7 +97,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
) )
isAuth, userInfo = l.checkAuth(svcCtx, r) isAuth, userInfo = l.checkAuth(svcCtx, r)
if !isAuth { if !isAuth {
time.Sleep(time.Second * 4) //兼容下火狐 time.Sleep(time.Second * 2) //兼容下火狐
rsp := websocket_data.DataTransferData{ rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_UNAUTH, T: constants.WEBSOCKET_UNAUTH,
D: nil, D: nil,
@ -109,8 +112,27 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
//测试的目前写死 39 //测试的目前写死 39
var userInfo auth.UserInfo var userInfo auth.UserInfo
userInfo.UserId = 39 userInfo.UserId = 39
//设置连接
ws := l.setConnPool(conn, userInfo)
defer ws.close()
//循环读客户端信息
go ws.readLoop()
//循环把数据发送给客户端
go ws.writeLoop()
//推消息到云渲染
go ws.sendLoop()
//操作连接中渲染任务的增加/删除
go ws.operationRenderTask()
//心跳
ws.heartbeat()
}
// 设置连接
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem {
publicMutex.Lock()
defer publicMutex.Unlock()
//生成连接唯一标识 //生成连接唯一标识
uniqueId := websocketIdGenerator.Get() uniqueId := l.getUniqueId()
ws := wsConnectItem{ ws := wsConnectItem{
conn: conn, conn: conn,
ctx: l.ctx, ctx: l.ctx,
@ -129,23 +151,22 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
} }
//保存连接 //保存连接
mapConnPool.Store(uniqueId, ws) mapConnPool.Store(uniqueId, ws)
defer ws.close()
go func() { go func() {
//把连接成功消息发回去 //把连接成功消息发回去
time.Sleep(time.Second * 4) //兼容下火狐 time.Sleep(time.Second * 2) //兼容下火狐
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)
_ = conn.WriteMessage(websocket.TextMessage, b) _ = conn.WriteMessage(websocket.TextMessage, b)
}() }()
//循环读客户端信息 return ws
go ws.readLoop() }
//循环把数据发送给客户端
go ws.writeLoop() // 获取唯一id
//推消息到云渲染 func (l *DataTransferLogic) getUniqueId() string {
go ws.sendLoop() uniqueId := uuid.New().String() + time.Now().Format("20060102150405")
//操作连接中渲染任务的增加/删除 if _, ok := mapConnPool.Load(uniqueId); ok {
go ws.operationRenderTask() uniqueId = l.getUniqueId()
//心跳 }
ws.heartbeat() return uniqueId
} }
// 鉴权 // 鉴权
@ -291,6 +312,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
//图片渲染 //图片渲染
case constants.WEBSOCKET_RENDER_IMAGE: case constants.WEBSOCKET_RENDER_IMAGE:
w.renderImage(d) w.renderImage(d)
//刷新重连请求恢复上次连接的标识
case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT:
w.resumeLateConnect(d)
default: default:
} }

View File

@ -13,6 +13,11 @@ type MqConsumerRenderResult struct {
} }
func (m *MqConsumerRenderResult) Run(ctx context.Context, data []byte) error { func (m *MqConsumerRenderResult) Run(ctx context.Context, data []byte) error {
defer func() {
if err := recover(); err != nil {
logx.Error("MqConsumerRenderResult panic:", err)
}
}()
logx.Info("接收到MqConsumerRenderResult数据:", string(data)) logx.Info("接收到MqConsumerRenderResult数据:", string(data))
var parseInfo websocket_data.RenderImageNotify var parseInfo websocket_data.RenderImageNotify
if err := json.Unmarshal(data, &parseInfo); err != nil { if err := json.Unmarshal(data, &parseInfo); err != nil {

View File

@ -0,0 +1,27 @@
package logic
import "fusenapi/constants"
// 刷新重连请求恢复上次连接的标识
func (w *wsConnectItem) resumeLateConnect(data []byte) {
clientId := string(data)
//id长度不对
if len(clientId) != 50 {
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid")
w.sendToOutChan(rsp)
return
}
publicMutex.Lock()
defer publicMutex.Unlock()
//存在是不能给他申请重新绑定
if _, ok := mapConnPool.Load(clientId); ok {
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
w.sendToOutChan(rsp)
return
}
//重新绑定
w.uniqueId = clientId
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS, clientId)
w.sendToOutChan(rsp)
return
}