Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into feature/auth
This commit is contained in:
@@ -3,6 +3,7 @@ package logic
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"fusenapi/constants"
|
||||
"fusenapi/initalize"
|
||||
"fusenapi/model/gmodel"
|
||||
@@ -99,7 +100,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
|
||||
)
|
||||
isAuth, userInfo = l.checkAuth(svcCtx, r)
|
||||
if !isAuth {
|
||||
time.Sleep(time.Second * 2) //兼容下火狐
|
||||
time.Sleep(time.Second * 1) //兼容下火狐
|
||||
rsp := websocket_data.DataTransferData{
|
||||
T: constants.WEBSOCKET_UNAUTH,
|
||||
D: nil,
|
||||
@@ -134,7 +135,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User
|
||||
publicMutex.Lock()
|
||||
defer publicMutex.Unlock()
|
||||
//生成连接唯一标识
|
||||
uniqueId := l.getUniqueId()
|
||||
uniqueId := l.getUniqueId(userInfo)
|
||||
ws := wsConnectItem{
|
||||
conn: conn,
|
||||
ctx: l.ctx,
|
||||
@@ -155,22 +156,27 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.User
|
||||
mapConnPool.Store(uniqueId, ws)
|
||||
go func() {
|
||||
//把连接成功消息发回去
|
||||
time.Sleep(time.Second * 2) //兼容下火狐
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)
|
||||
_ = conn.WriteMessage(websocket.TextMessage, b)
|
||||
time.Sleep(time.Second * 1) //兼容下火狐
|
||||
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
|
||||
}()
|
||||
return ws
|
||||
}
|
||||
|
||||
// 获取唯一id
|
||||
func (l *DataTransferLogic) getUniqueId() string {
|
||||
uniqueId := uuid.New().String() + time.Now().Format("20060102150405")
|
||||
func (l *DataTransferLogic) getUniqueId(userInfo auth.UserInfo) string {
|
||||
//后面拼接上用户id
|
||||
uniqueId := uuid.New().String() + getUserPart(userInfo.UserId, userInfo.GuestId)
|
||||
if _, ok := mapConnPool.Load(uniqueId); ok {
|
||||
uniqueId = l.getUniqueId()
|
||||
uniqueId = l.getUniqueId(userInfo)
|
||||
}
|
||||
return uniqueId
|
||||
}
|
||||
|
||||
// 获取用户拼接部分
|
||||
func getUserPart(userId, guestId int64) string {
|
||||
return fmt.Sprintf("_%d_%d", userId, guestId)
|
||||
}
|
||||
|
||||
// 鉴权
|
||||
func (l *DataTransferLogic) checkAuth(svcCtx *svc.ServiceContext, r *http.Request) (isAuth bool, userInfo *auth.UserInfo) {
|
||||
// 解析JWT token,并对空用户进行判断
|
||||
@@ -286,7 +292,9 @@ func (w *wsConnectItem) sendToOutChan(data []byte) {
|
||||
case <-w.closeChan:
|
||||
return
|
||||
case w.outChan <- data:
|
||||
logx.Info("notify send render result to out chan")
|
||||
return
|
||||
case <-time.After(time.Second * 3): //阻塞超过3秒丢弃
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -304,8 +312,8 @@ func (w *wsConnectItem) respondDataFormat(msgType string, data interface{}) []by
|
||||
func (w *wsConnectItem) dealwithReciveData(data []byte) {
|
||||
var parseInfo websocket_data.DataTransferData
|
||||
if err := json.Unmarshal(data, &parseInfo); err != nil {
|
||||
logx.Error("invalid format of websocket message")
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data))
|
||||
logx.Error("invalid format of websocket message:", err)
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket message:"+string(data)))
|
||||
return
|
||||
}
|
||||
d, _ := json.Marshal(parseInfo.D)
|
||||
@@ -315,8 +323,8 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
|
||||
case constants.WEBSOCKET_RENDER_IMAGE:
|
||||
w.renderImage(d)
|
||||
//刷新重连请求恢复上次连接的标识
|
||||
case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT:
|
||||
w.resumeLateConnect(d)
|
||||
case constants.WEBSOCKET_REQUEST_REUSE_LAST_CONNECT:
|
||||
w.reuseLastConnect(d)
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
@@ -28,39 +28,48 @@ type renderImageControlChanItem struct {
|
||||
func (w *wsConnectItem) renderImage(data []byte) {
|
||||
var renderImageData websocket_data.RenderImageReqMsg
|
||||
if err := json.Unmarshal(data, &renderImageData); err != nil {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data))
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:"+string(data)))
|
||||
logx.Error("invalid format of websocket render image message", err)
|
||||
return
|
||||
}
|
||||
logx.Info("收到请求云渲染图片数据:", renderImageData)
|
||||
if renderImageData.RenderId == "" {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:render_id is empty")
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:render_id is empty"))
|
||||
logx.Error("invalid format of websocket render image message:render_id is empty")
|
||||
return
|
||||
}
|
||||
if renderImageData.RenderData.ProductId <= 0 {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:product_id ")
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:product_id "))
|
||||
logx.Error("invalid format of websocket render image message:product_id")
|
||||
return
|
||||
}
|
||||
if renderImageData.RenderData.TemplateTagId <= 0 {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:template_tag_id ")
|
||||
logx.Error("invalid format of websocket render image message:template_tag_id")
|
||||
if renderImageData.RenderData.TemplateTag == "" {
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "invalid format of websocket render image message:template_tag "))
|
||||
logx.Error("invalid format of websocket render image message:template_tag")
|
||||
return
|
||||
}
|
||||
//获取上传最近的logo
|
||||
userMaterial, err := w.allModels.FsUserMaterial.FindLatestOne(w.ctx, w.userId, w.guestId)
|
||||
if err != nil {
|
||||
if !errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
w.outChan <- w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "failed to get user logo")
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "failed to get user logo"))
|
||||
logx.Error("failed to get user logo")
|
||||
return
|
||||
}
|
||||
//使用默认logo(写死一个默认)
|
||||
renderImageData.RenderData.Logo = "https://s3.us-west-1.amazonaws.com/storage.fusenpack.com/f5ccd11365099fa47a6316b1cd639f6dd6064dcd2d37c8d2fcd0a322160b33cc"
|
||||
//使用默认logo(id=0)
|
||||
userMaterialDefault, err := w.allModels.FsUserMaterial.FindOneById(w.ctx, 0)
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "default logo is not exists"))
|
||||
return
|
||||
}
|
||||
logx.Error("default logo is not exists")
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_ERR_DATA_FORMAT, "failed to get default logo"))
|
||||
return
|
||||
}
|
||||
renderImageData.RenderData.Logo = *userMaterialDefault.ResourceUrl
|
||||
} else {
|
||||
renderImageData.RenderData.Logo = *userMaterial.ResourceUrl
|
||||
renderImageData.RenderData.UserMaterialId = userMaterial.Id
|
||||
}
|
||||
//用户id赋值
|
||||
renderImageData.RenderData.UserId = w.userId
|
||||
@@ -99,7 +108,7 @@ func (w *wsConnectItem) renderImage(data []byte) {
|
||||
}
|
||||
d, _ := json.Marshal(tmpData)
|
||||
//发送给对应的流水线组装数据
|
||||
if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil {
|
||||
if err = w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, d); err != nil {
|
||||
logx.Error("发送渲染任务数据到MQ失败:", string(d), "err:", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package logic
|
||||
|
||||
import "fusenapi/constants"
|
||||
|
||||
// 刷新重连请求恢复上次连接的标识
|
||||
func (w *wsConnectItem) resumeLateConnect(data []byte) {
|
||||
clientId := string(data)
|
||||
//id长度不对
|
||||
if len(clientId) != 50 {
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid")
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
publicMutex.Lock()
|
||||
defer publicMutex.Unlock()
|
||||
//存在是不能给他申请重新绑定
|
||||
if _, ok := mapConnPool.Load(clientId); ok {
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
//重新绑定
|
||||
w.uniqueId = clientId
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS, clientId)
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
49
server/websocket/internal/logic/ws_reuse_last_connect.go
Normal file
49
server/websocket/internal/logic/ws_reuse_last_connect.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fusenapi/constants"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
)
|
||||
|
||||
// 刷新重连请求恢复上次连接的标识
|
||||
func (w *wsConnectItem) reuseLastConnect(data []byte) {
|
||||
logx.Info("收到请求恢复上次连接标识数据:", string(data))
|
||||
var clientId string
|
||||
if err := json.Unmarshal(data, &clientId); err != nil {
|
||||
logx.Error(" invalid format of client id :", clientId)
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "invalid format of client id"))
|
||||
return
|
||||
}
|
||||
lenClientId := len(clientId)
|
||||
//id长度不对
|
||||
if lenClientId > 100 {
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "length of client id is to long"))
|
||||
return
|
||||
}
|
||||
//合成client后缀,不是同个后缀的不能复用
|
||||
userPart := getUserPart(w.userId, w.guestId)
|
||||
lenUserPart := len(userPart)
|
||||
if lenClientId <= lenUserPart {
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "length of client id is to short"))
|
||||
return
|
||||
}
|
||||
//尾部不同不能复用
|
||||
if clientId[lenClientId-lenUserPart:] != userPart {
|
||||
w.sendToOutChan(w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "the client id is not belong you before"))
|
||||
return
|
||||
}
|
||||
publicMutex.Lock()
|
||||
defer publicMutex.Unlock()
|
||||
//存在是不能给他申请重新绑定
|
||||
if _, ok := mapConnPool.Load(clientId); ok {
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
//重新绑定
|
||||
w.uniqueId = clientId
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, clientId)
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user