diff --git a/constants/websocket.go b/constants/websocket.go index be28eb32..ced0c947 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -8,6 +8,12 @@ const ( WEBSOCKET_UNAUTH = "WEBSOCKET_UNAUTH" //ws连接成功 WEBSOCKET_CONNECT_SUCCESS = "WEBSOCKET_CONNECT_SUCCESS" + //请求恢复为上次连接的标识 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT" + //请求恢复为上次连接的标识错误 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" + //请求恢复为上次连接的标识成功 + WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS" //渲染前数据组装 WEBSOCKET_RENDER_IMAGE_ASSEMBLE = "WEBSOCKET_RENDER_IMAGE_ASSEMBLE" //图片渲染 diff --git a/server/render/consumer/assemble_render_data.go b/server/render/consumer/assemble_render_data.go index 3c7f1ffa..5b041711 100644 --- a/server/render/consumer/assemble_render_data.go +++ b/server/render/consumer/assemble_render_data.go @@ -25,7 +25,7 @@ import ( type pythonApiRsp struct { Id string `json:"id"` //物料模板的id LogoUrl string `json:"logo_url"` //logo地址 - result string `json:"result"` //图片base64 + Result string `json:"result"` //图片base64 } // 消费渲染需要组装的数据 @@ -33,6 +33,11 @@ type MqConsumerRenderAssemble struct { } func (m *MqConsumerRenderAssemble) Run(ctx context.Context, data []byte) error { + defer func() { + if err := recover(); err != nil { + logx.Error("MqConsumerRenderAssemble panic:", err) + } + }() logx.Info("收到需要组装的消息:", string(data)) var parseInfo websocket_data.AssembleRenderData if err := json.Unmarshal(data, &parseInfo); err != nil { @@ -244,6 +249,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo logx.Error("failed to parse python api rsp:", err) return "", err } + //fmt.Println("××××××××××××××××××××:", pythonApiInfo) //上传刀版图 var upload = file.Upload{ Ctx: ctx, @@ -252,7 +258,7 @@ func getCombineImage(ctx context.Context, svcCtx *svc.ServiceContext, parseInfo } uploadRes, err := upload.UploadFileByBase64(&file.UploadBaseReq{ FileHash: combineHash, - FileData: pythonApiInfo.result, + FileData: pythonApiInfo.Result, UploadBucket: 1, ApiType: 2, UserId: parseInfo.RenderData.UserId, diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 44dc3e82..16f822aa 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -9,6 +9,7 @@ import ( "fusenapi/utils/auth" "fusenapi/utils/id_generator" "fusenapi/utils/websocket_data" + "github.com/google/uuid" "github.com/gorilla/websocket" "net/http" "sync" @@ -60,6 +61,8 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} + //公共互斥锁 + publicMutex sync.Mutex ) // 每个连接的连接基本属性 @@ -70,7 +73,7 @@ type wsConnectItem struct { allModels *gmodel.AllModelsGen closeChan chan struct{} //ws连接关闭chan isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 + uniqueId string //ws连接唯一标识 inChan chan []byte //接受消息缓冲通道 outChan chan []byte //发送回客户端的消息 mutex sync.Mutex //互斥锁 @@ -94,7 +97,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp ) isAuth, userInfo = l.checkAuth(svcCtx, r) if !isAuth { - time.Sleep(time.Second * 4) //兼容下火狐 + time.Sleep(time.Second * 2) //兼容下火狐 rsp := websocket_data.DataTransferData{ T: constants.WEBSOCKET_UNAUTH, D: nil, @@ -109,8 +112,27 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp //测试的目前写死 39 var userInfo auth.UserInfo userInfo.UserId = 39 + //设置连接 + ws := l.setConnPool(conn, userInfo) + defer ws.close() + //循环读客户端信息 + go ws.readLoop() + //循环把数据发送给客户端 + go ws.writeLoop() + //推消息到云渲染 + go ws.sendLoop() + //操作连接中渲染任务的增加/删除 + go ws.operationRenderTask() + //心跳 + ws.heartbeat() +} + +// 设置连接 +func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem { + publicMutex.Lock() + defer publicMutex.Unlock() //生成连接唯一标识 - uniqueId := websocketIdGenerator.Get() + uniqueId := l.getUniqueId() ws := wsConnectItem{ conn: conn, ctx: l.ctx, @@ -129,23 +151,22 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp } //保存连接 mapConnPool.Store(uniqueId, ws) - defer ws.close() go func() { //把连接成功消息发回去 - time.Sleep(time.Second * 4) //兼容下火狐 + time.Sleep(time.Second * 2) //兼容下火狐 b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId) _ = conn.WriteMessage(websocket.TextMessage, b) }() - //循环读客户端信息 - go ws.readLoop() - //循环把数据发送给客户端 - go ws.writeLoop() - //推消息到云渲染 - go ws.sendLoop() - //操作连接中渲染任务的增加/删除 - go ws.operationRenderTask() - //心跳 - ws.heartbeat() + return ws +} + +// 获取唯一id +func (l *DataTransferLogic) getUniqueId() string { + uniqueId := uuid.New().String() + time.Now().Format("20060102150405") + if _, ok := mapConnPool.Load(uniqueId); ok { + uniqueId = l.getUniqueId() + } + return uniqueId } // 鉴权 @@ -291,6 +312,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: w.renderImage(d) + //刷新重连请求恢复上次连接的标识 + case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT: + w.resumeLateConnect(d) default: } diff --git a/server/websocket/internal/logic/mq_consumer.go b/server/websocket/internal/logic/mq_consumer.go index 7488e229..410dee3e 100644 --- a/server/websocket/internal/logic/mq_consumer.go +++ b/server/websocket/internal/logic/mq_consumer.go @@ -13,6 +13,11 @@ type MqConsumerRenderResult struct { } func (m *MqConsumerRenderResult) Run(ctx context.Context, data []byte) error { + defer func() { + if err := recover(); err != nil { + logx.Error("MqConsumerRenderResult panic:", err) + } + }() logx.Info("接收到MqConsumerRenderResult数据:", string(data)) var parseInfo websocket_data.RenderImageNotify if err := json.Unmarshal(data, &parseInfo); err != nil { diff --git a/server/websocket/internal/logic/ws_resume_last_connect.go b/server/websocket/internal/logic/ws_resume_last_connect.go new file mode 100644 index 00000000..39f29414 --- /dev/null +++ b/server/websocket/internal/logic/ws_resume_last_connect.go @@ -0,0 +1,27 @@ +package logic + +import "fusenapi/constants" + +// 刷新重连请求恢复上次连接的标识 +func (w *wsConnectItem) resumeLateConnect(data []byte) { + clientId := string(data) + //id长度不对 + if len(clientId) != 50 { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid") + w.sendToOutChan(rsp) + return + } + publicMutex.Lock() + defer publicMutex.Unlock() + //存在是不能给他申请重新绑定 + if _, ok := mapConnPool.Load(clientId); ok { + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ") + w.sendToOutChan(rsp) + return + } + //重新绑定 + w.uniqueId = clientId + rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS, clientId) + w.sendToOutChan(rsp) + return +}