From 35eb087b5a106d136427232ab623e2256a1dc403 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Thu, 7 Sep 2023 15:54:24 +0800 Subject: [PATCH] fix --- .../internal/logic/datatransferlogic.go | 136 +---------------- .../internal/logic/ws_connect_statistics.go | 46 ++++++ .../internal/logic/ws_render_image.go | 7 + .../internal/logic/ws_user_connect_pool.go | 138 ++++++++++++++++++ server/websocket/websocket.go | 2 + 5 files changed, 197 insertions(+), 132 deletions(-) create mode 100644 server/websocket/internal/logic/ws_connect_statistics.go create mode 100644 server/websocket/internal/logic/ws_user_connect_pool.go diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index f4e057fb..6f68bb4b 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "encoding/json" "errors" - "fmt" "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" @@ -67,18 +66,10 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} - //用户标识的连接(白板用户不存) - mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识) - //用户标识的连接增删操作队列 - userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500) //每个websocket连接入口缓冲队列长度默认值 websocketInChanLen = 500 //每个websocket连接出口缓冲队列长度默认值 websocketOutChanLen = 500 - //每个websocket连接渲染任务调度队列长度默认值(添加任务/删除任务/修改任务属性)缓冲队列长度(该队列用于避免map并发读写冲突) - renderImageTaskCtlChanLen = 100 - //每个websocket渲染任务缓冲队列长度默认值 - renderChanLen = 500 //是否开启debug openDebug = true //允许跨域的origin @@ -216,132 +207,11 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) } ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId)) + //发送累加统计连接书 + increaseWebsocketConnectCount() return ws, nil } -// 添加用户索引池ws连接 -func createUserConnPoolElement(userId, guestId int64, uniqueId string) { - data := userConnPoolCtlChanItem{ - userId: userId, - guestId: guestId, - uniqueId: uniqueId, - message: nil, - messageType: "", - option: 1, - } - select { - case userConnPoolCtlChan <- data: - return - case <-time.After(time.Millisecond * 200): - return - } -} - -// 从用户索引池删除ws连接 -func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) { - data := userConnPoolCtlChanItem{ - userId: userId, - guestId: guestId, - uniqueId: uniqueId, - message: nil, - messageType: "", - option: 0, - } - select { - case userConnPoolCtlChan <- data: - return - case <-time.After(time.Millisecond * 200): - return - } -} - -// 根据用户索引发现链接并发送(广播)消息到出口队列 -func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { - data := userConnPoolCtlChanItem{ - userId: userId, - guestId: guestId, - uniqueId: "", - message: message, - option: 2, - } - select { - case userConnPoolCtlChan <- data: - return - case <-time.After(time.Millisecond * 200): - return - } -} - -// 消费用户索引创建/删除/发送消息中的任务数据 -func ConsumeUserConnPoolCtlChanData(ctx context.Context) { - defer func() { - if err := recover(); err != nil { - logx.Error("ConsumeUserConnPoolCtlChanData panic:", err) - } - }() - go func() { - select { - case <-ctx.Done(): - panic("ConsumeUserConnPoolCtlChanData ctx deadline") - } - }() - var ( - data userConnPoolCtlChanItem - userKey string - ) - for { - select { - case data = <-userConnPoolCtlChan: - userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId) - switch data.option { - case 2: //发送消息 - logx.Info("通过用户id索引发送消息") - mapUserUniqueId, ok := mapUserConnPool[userKey] - if !ok { - logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey) - continue - } - for uniqueId, _ := range mapUserUniqueId { - //根据uniqueId查询原始池中连接 - mapConnPoolVal, ok := mapConnPool.Load(uniqueId) - if !ok { - logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey, " 原始uniqueId:", uniqueId) - continue - } - originConn, ok := mapConnPoolVal.(wsConnectItem) - if !ok { - logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", userKey, " 原始uniqueId:", uniqueId) - continue - } - originConn.sendToOutChan(data.message) - } - case 1: //添加 - logx.Info("添加用户id索引标识:", data.uniqueId) - //存在这个用户的map池子 - if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { - mapUserUniqueId[data.uniqueId] = struct{}{} - } else { - mapUserConnPool[userKey] = make(map[string]struct{}) - mapUserConnPool[userKey][data.uniqueId] = struct{}{} - } - case 0: //删除 - logx.Info("删除用户id索引标识:", data.uniqueId) - if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { - delete(mapUserUniqueId, data.uniqueId) - } - } - } - } -} - -// 获取mapUserConnPool唯一id -func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) { - if userId > 0 { - guestId = 0 - } - return fmt.Sprintf("%d_%d", userId, guestId) -} - // 获取websocket发送到前端使用的数据传输类型(debug开启是文本,否则是二进制) func getWebsocketBaseTransferDataFormat() int { if openDebug { @@ -448,6 +318,8 @@ func (w *wsConnectItem) close() { close(w.closeChan) //删除用户级索引 deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId) + //减少连接数统计 + decreaseWebsocketConnectCount() } logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed") } diff --git a/server/websocket/internal/logic/ws_connect_statistics.go b/server/websocket/internal/logic/ws_connect_statistics.go new file mode 100644 index 00000000..a1ea9731 --- /dev/null +++ b/server/websocket/internal/logic/ws_connect_statistics.go @@ -0,0 +1,46 @@ +package logic + +import ( + "context" + "github.com/zeromicro/go-zero/core/logx" +) + +var ( + //当前ws连接数 + currentWebsocketConnectCount = 0 + //添加or减少连接的控制chan + websocketConnectCountCtlChan = make(chan int, 20) +) + +// 累增计数 +func increaseWebsocketConnectCount() { + websocketConnectCountCtlChan <- 1 +} + +// 减少计数 +func decreaseWebsocketConnectCount() { + websocketConnectCountCtlChan <- -1 +} + +// 消费数据 +func ConsumeWebsocketConnectCountCtlChanData(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + logx.Error("ConsumeWebsocketConnectCountCtlChanData panic:", err) + } + }() + go func() { + select { + case <-ctx.Done(): + panic("ConsumeWebsocketConnectCountCtlChanData ctx deadline") + } + }() + var num int + for { + select { + case num = <-websocketConnectCountCtlChan: + currentWebsocketConnectCount += num + logx.Info("当前websocket连接总数:", currentWebsocketConnectCount) + } + } +} diff --git a/server/websocket/internal/logic/ws_render_image.go b/server/websocket/internal/logic/ws_render_image.go index 4cb83109..6e06329e 100644 --- a/server/websocket/internal/logic/ws_render_image.go +++ b/server/websocket/internal/logic/ws_render_image.go @@ -19,6 +19,13 @@ import ( "time" ) +var ( + //每个websocket连接渲染任务调度队列长度默认值(添加任务/删除任务/修改任务属性)缓冲队列长度(该队列用于避免map并发读写冲突) + renderImageTaskCtlChanLen = 100 + //每个websocket渲染任务缓冲队列长度默认值 + renderChanLen = 500 +) + // 渲染处理器 type renderProcessor struct { } diff --git a/server/websocket/internal/logic/ws_user_connect_pool.go b/server/websocket/internal/logic/ws_user_connect_pool.go new file mode 100644 index 00000000..51a02031 --- /dev/null +++ b/server/websocket/internal/logic/ws_user_connect_pool.go @@ -0,0 +1,138 @@ +package logic + +import ( + "context" + "fmt" + "github.com/zeromicro/go-zero/core/logx" + "time" +) + +var ( + //用户标识的连接(白板用户不存) + mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识) + //用户标识的连接增删操作队列 + userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500) +) + +// 添加用户索引池ws连接 +func createUserConnPoolElement(userId, guestId int64, uniqueId string) { + data := userConnPoolCtlChanItem{ + userId: userId, + guestId: guestId, + uniqueId: uniqueId, + message: nil, + messageType: "", + option: 1, + } + select { + case userConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + return + } +} + +// 从用户索引池删除ws连接 +func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) { + data := userConnPoolCtlChanItem{ + userId: userId, + guestId: guestId, + uniqueId: uniqueId, + message: nil, + messageType: "", + option: 0, + } + select { + case userConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + return + } +} + +// 根据用户索引发现链接并发送(广播)消息到出口队列 +func sendToOutChanByUserIndex(userId, guestId int64, message []byte) { + data := userConnPoolCtlChanItem{ + userId: userId, + guestId: guestId, + uniqueId: "", + message: message, + option: 2, + } + select { + case userConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + return + } +} + +// 消费用户索引创建/删除/发送消息中的任务数据 +func ConsumeUserConnPoolCtlChanData(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + logx.Error("ConsumeUserConnPoolCtlChanData panic:", err) + } + }() + go func() { + select { + case <-ctx.Done(): + panic("ConsumeUserConnPoolCtlChanData ctx deadline") + } + }() + var ( + data userConnPoolCtlChanItem + userKey string + ) + for { + select { + case data = <-userConnPoolCtlChan: + userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId) + switch data.option { + case 2: //发送消息 + logx.Info("通过用户id索引发送消息") + mapUserUniqueId, ok := mapUserConnPool[userKey] + if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey) + continue + } + for uniqueId, _ := range mapUserUniqueId { + //根据uniqueId查询原始池中连接 + mapConnPoolVal, ok := mapConnPool.Load(uniqueId) + if !ok { + logx.Info("通过用户id索引发送消息,连接不存在,用户索引key:", userKey, " 原始uniqueId:", uniqueId) + continue + } + originConn, ok := mapConnPoolVal.(wsConnectItem) + if !ok { + logx.Error("通过用户id索引发送消息,断言原始连接失败,用户索引key:", userKey, " 原始uniqueId:", uniqueId) + continue + } + originConn.sendToOutChan(data.message) + } + case 1: //添加 + logx.Info("添加用户id索引标识:", data.uniqueId) + //存在这个用户的map池子 + if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { + mapUserUniqueId[data.uniqueId] = struct{}{} + } else { + mapUserConnPool[userKey] = make(map[string]struct{}) + mapUserConnPool[userKey][data.uniqueId] = struct{}{} + } + case 0: //删除 + logx.Info("删除用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok { + delete(mapUserUniqueId, data.uniqueId) + } + } + } + } +} + +// 获取mapUserConnPool唯一id +func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) { + if userId > 0 { + guestId = 0 + } + return fmt.Sprintf("%d_%d", userId, guestId) +} diff --git a/server/websocket/websocket.go b/server/websocket/websocket.go index 11cad250..d77e9242 100644 --- a/server/websocket/websocket.go +++ b/server/websocket/websocket.go @@ -37,6 +37,8 @@ func main() { go logic.ConsumeCommonCacheData(ctx1) //消费用户索引创建/删除/发送消息中的任务数据 go logic.ConsumeUserConnPoolCtlChanData(ctx1) + //消费连接统计信息 + go logic.ConsumeWebsocketConnectCountCtlChanData(ctx1) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start() }