This commit is contained in:
laodaming 2023-08-10 19:41:25 +08:00
parent d101d241f4
commit b085f3dbef

View File

@ -9,6 +9,7 @@ import (
"fusenapi/utils/auth"
"fusenapi/utils/id_generator"
"fusenapi/utils/websocket_data"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"net/http"
"sync"
@ -60,6 +61,8 @@ var (
}
//websocket连接存储
mapConnPool = sync.Map{}
//公共互斥锁
publicMutex sync.Mutex
)
// 每个连接的连接基本属性
@ -70,7 +73,7 @@ type wsConnectItem struct {
allModels *gmodel.AllModelsGen
closeChan chan struct{} //ws连接关闭chan
isClose bool //是否已经关闭
uniqueId uint64 //ws连接唯一标识
uniqueId string //ws连接唯一标识
inChan chan []byte //接受消息缓冲通道
outChan chan []byte //发送回客户端的消息
mutex sync.Mutex //互斥锁
@ -94,7 +97,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
)
isAuth, userInfo = l.checkAuth(svcCtx, r)
if !isAuth {
time.Sleep(time.Second * 4) //兼容下火狐
time.Sleep(time.Second * 2) //兼容下火狐
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_UNAUTH,
D: nil,
@ -109,8 +112,32 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
//测试的目前写死 39
var userInfo auth.UserInfo
userInfo.UserId = 39
ws := l.setConnPool(conn, userInfo)
defer ws.close()
go func() {
//把连接成功消息发回去
time.Sleep(time.Second * 2) //兼容下火狐
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)
_ = conn.WriteMessage(websocket.TextMessage, b)
}()
//循环读客户端信息
go ws.readLoop()
//循环把数据发送给客户端
go ws.writeLoop()
//推消息到云渲染
go ws.sendLoop()
//操作连接中渲染任务的增加/删除
go ws.operationRenderTask()
//心跳
ws.heartbeat()
}
// 获取唯一id
func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo auth.UserInfo) wsConnectItem {
publicMutex.Lock()
defer publicMutex.Unlock()
//生成连接唯一标识
uniqueId := websocketIdGenerator.Get()
uniqueId := l.getUniqueId()
ws := wsConnectItem{
conn: conn,
ctx: l.ctx,
@ -129,23 +156,14 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp
}
//保存连接
mapConnPool.Store(uniqueId, ws)
defer ws.close()
go func() {
//把连接成功消息发回去
time.Sleep(time.Second * 4) //兼容下火狐
b := ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)
_ = conn.WriteMessage(websocket.TextMessage, b)
}()
//循环读客户端信息
go ws.readLoop()
//循环把数据发送给客户端
go ws.writeLoop()
//推消息到云渲染
go ws.sendLoop()
//操作连接中渲染任务的增加/删除
go ws.operationRenderTask()
//心跳
ws.heartbeat()
return ws
}
func (l *DataTransferLogic) getUniqueId() string {
uniqueId := uuid.New().String() + time.Now().Format("20060102150405")
if _, ok := mapConnPool.Load(uniqueId); ok {
uniqueId = l.getUniqueId()
}
return uniqueId
}
// 鉴权