Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into feature/auth
This commit is contained in:
@@ -13,6 +13,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"context"
|
||||
@@ -62,6 +63,8 @@ var (
|
||||
}
|
||||
//websocket连接存储
|
||||
mapConnPool = sync.Map{}
|
||||
//公共互斥锁
|
||||
publicMutex sync.Mutex
|
||||
)
|
||||
|
||||
// 每个连接的连接基本属性
|
||||
@@ -72,7 +75,7 @@ type wsConnectItem struct {
|
||||
allModels *gmodel.AllModelsGen
|
||||
closeChan chan struct{} //ws连接关闭chan
|
||||
isClose bool //是否已经关闭
|
||||
uniqueId uint64 //ws连接唯一标识
|
||||
uniqueId string //ws连接唯一标识
|
||||
inChan chan []byte //接受消息缓冲通道
|
||||
outChan chan []byte //发送回客户端的消息
|
||||
mutex sync.Mutex //互斥锁
|
||||
@@ -96,7 +99,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
|
||||
)
|
||||
isAuth, userInfo = l.checkAuth(svcCtx, r)
|
||||
if !isAuth {
|
||||
time.Sleep(time.Second * 4) //兼容下火狐
|
||||
time.Sleep(time.Second * 2) //兼容下火狐
|
||||
rsp := websocket_data.DataTransferData{
|
||||
T: constants.WEBSOCKET_UNAUTH,
|
||||
D: nil,
|
||||
@@ -111,8 +114,27 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
|
||||
//测试的目前写死 39
|
||||
var userInfo auth.UserInfo
|
||||
userInfo.UserId = 39
|
||||
//设置连接
|
||||
ws := l.setConnPool(conn, userInfo)
|
||||
defer ws.close()
|
||||
//循环读客户端信息
|
||||
go ws.readLoop()
|
||||
//循环把数据发送给客户端
|
||||
go ws.writeLoop()
|
||||
//推消息到云渲染
|
||||
go ws.sendLoop()
|
||||
//操作连接中渲染任务的增加/删除
|
||||
go ws.operationRenderTask()
|
||||
//心跳
|
||||
ws.heartbeat()
|
||||
}
|
||||
|
||||
// 设置连接
|
||||
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem {
|
||||
publicMutex.Lock()
|
||||
defer publicMutex.Unlock()
|
||||
//生成连接唯一标识
|
||||
uniqueId := websocketIdGenerator.Get()
|
||||
uniqueId := l.getUniqueId()
|
||||
ws := wsConnectItem{
|
||||
conn: conn,
|
||||
ctx: l.ctx,
|
||||
@@ -131,23 +153,22 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
|
||||
}
|
||||
//保存连接
|
||||
mapConnPool.Store(uniqueId, ws)
|
||||
defer ws.close()
|
||||
go func() {
|
||||
//把连接成功消息发回去
|
||||
time.Sleep(time.Second * 4) //兼容下火狐
|
||||
time.Sleep(time.Second * 2) //兼容下火狐
|
||||
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)
|
||||
_ = conn.WriteMessage(websocket.TextMessage, b)
|
||||
}()
|
||||
//循环读客户端信息
|
||||
go ws.readLoop()
|
||||
//循环把数据发送给客户端
|
||||
go ws.writeLoop()
|
||||
//推消息到云渲染
|
||||
go ws.sendLoop()
|
||||
//操作连接中渲染任务的增加/删除
|
||||
go ws.operationRenderTask()
|
||||
//心跳
|
||||
ws.heartbeat()
|
||||
return ws
|
||||
}
|
||||
|
||||
// 获取唯一id
|
||||
func (l *DataTransferLogic) getUniqueId() string {
|
||||
uniqueId := uuid.New().String() + time.Now().Format("20060102150405")
|
||||
if _, ok := mapConnPool.Load(uniqueId); ok {
|
||||
uniqueId = l.getUniqueId()
|
||||
}
|
||||
return uniqueId
|
||||
}
|
||||
|
||||
// 鉴权
|
||||
@@ -293,6 +314,9 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) {
|
||||
//图片渲染
|
||||
case constants.WEBSOCKET_RENDER_IMAGE:
|
||||
w.renderImage(d)
|
||||
//刷新重连请求恢复上次连接的标识
|
||||
case constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT:
|
||||
w.resumeLateConnect(d)
|
||||
default:
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,11 @@ type MqConsumerRenderResult struct {
|
||||
}
|
||||
|
||||
func (m *MqConsumerRenderResult) Run(ctx context.Context, data []byte) error {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error("MqConsumerRenderResult panic:", err)
|
||||
}
|
||||
}()
|
||||
logx.Info("接收到MqConsumerRenderResult数据:", string(data))
|
||||
var parseInfo websocket_data.RenderImageNotify
|
||||
if err := json.Unmarshal(data, &parseInfo); err != nil {
|
||||
|
||||
27
server/websocket/internal/logic/ws_resume_last_connect.go
Normal file
27
server/websocket/internal/logic/ws_resume_last_connect.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package logic
|
||||
|
||||
import "fusenapi/constants"
|
||||
|
||||
// 刷新重连请求恢复上次连接的标识
|
||||
func (w *wsConnectItem) resumeLateConnect(data []byte) {
|
||||
clientId := string(data)
|
||||
//id长度不对
|
||||
if len(clientId) != 50 {
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "request id is invalid")
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
publicMutex.Lock()
|
||||
defer publicMutex.Unlock()
|
||||
//存在是不能给他申请重新绑定
|
||||
if _, ok := mapConnPool.Load(clientId); ok {
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR, "id has bound by other connect ")
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
//重新绑定
|
||||
w.uniqueId = clientId
|
||||
rsp := w.respondDataFormat(constants.WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_SUCCESS, clientId)
|
||||
w.sendToOutChan(rsp)
|
||||
return
|
||||
}
|
||||
Reference in New Issue
Block a user