From ae80cf12f3c86c29d60291cbbde594f949d9f0fc Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Mon, 4 Sep 2023 11:17:12 +0800 Subject: [PATCH] fix --- constants/websocket.go | 2 +- .../internal/logic/datatransferlogic.go | 136 +++++++++++++++++- 2 files changed, 133 insertions(+), 5 deletions(-) diff --git a/constants/websocket.go b/constants/websocket.go index da7d8618..5efc8be3 100644 --- a/constants/websocket.go +++ b/constants/websocket.go @@ -5,7 +5,7 @@ type Websocket string // websocket消息类型(主类别) const ( WEBSOCKET_UNAUTH Websocket = "WEBSOCKET_UNAUTH" //鉴权失败 (1级消息,单向通信) - WEBSOCKET_GEN_UNIQUE_ID_ERR Websocket = "WEBSOCKET_GEN_UNIQUE_ID_ERR" //获取ws连接标识错误 (1级消息,单向通信) + WEBSOCKET_CONNECT_ERR Websocket = "WEBSOCKET_CONNECT_ERR" //ws连接错误 (1级消息,单向通信) WEBSOCKET_CONNECT_SUCCESS Websocket = "WEBSOCKET_CONNECT_SUCCESS" //ws连接成功 (1级消息,单向通信) WEBSOCKET_REQUEST_REUSE_LAST_CONNECT Websocket = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT" //请求恢复为上次连接的标识 (1级消息,单向通信) WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR Websocket = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" //请求恢复为上次连接的标识错误 (1级消息,单向通信) diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index eb6434b8..43396c2e 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "fusenapi/constants" "fusenapi/utils/auth" "fusenapi/utils/basic" @@ -66,6 +67,10 @@ var ( } //websocket连接存储 mapConnPool = sync.Map{} + //用户标识的连接(白板用户不存) + mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识) + //用户标识的连接增删操作队列 + mapUserConnPoolCtlChan = make(chan userConnPoolChanItem, 2000) //每个websocket连接入口缓冲队列长度 websocketInChanLen = 1000 //每个websocket连接出口缓冲队列长度 @@ -76,6 +81,16 @@ var ( renderChanLen = 500 ) +// 用户标识的连接增删操作队列传输的值的结构 +type userConnPoolChanItem struct { + userId int64 //必须(两个用户id任意一个不为0) + guestId int64 //必须(两个用户id任意一个不为0) + uniqueId string //主连接池唯一标识(添加/删除时候必须) + message []byte //消息(发送消息传的,格式是经过标准输出序列化后的数据) + messageType constants.Websocket //消息类型(发送消息传的) + option int64 //操作 2发消息 1增加 0删除 +} + // 每个连接的连接基本属性 type wsConnectItem struct { conn *websocket.Conn //websocket的连接(基本属性) @@ -133,6 +148,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request) conn.Close() return } + //消费用户索引控制chan的数据 + go consumeUserPoolData() //循环读客户端信息 go ws.acceptBrowserMessage() //消费出口数据并发送浏览器端 @@ -176,6 +193,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use } //保存连接 mapConnPool.Store(uniqueId, ws) + //非白板用户,需要为这个用户建立map索引便于通过用户查询 + if userInfo.IsUser() || userInfo.IsGuest() { + createUserConnPoolElement(userConnPoolChanItem{ + userId: userInfo.UserId, + guestId: userInfo.GuestId, + uniqueId: uniqueId, + option: 1, + }) + } if isFirefoxBrowser { time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) } @@ -183,6 +209,101 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use return ws, nil } +// 添加用户索引池ws连接 +func createUserConnPoolElement(data userConnPoolChanItem) { + data.option = 1 + select { + case mapUserConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + logx.Error("向用户索引池中连接任务放入增加操作超时失败") + return + } +} + +// 从用户索引池删除ws连接 +func deleteUserConnPoolElement(data userConnPoolChanItem) { + data.option = 0 + select { + case mapUserConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + logx.Error("向用户索引池中连接任务放入删除操作超时失败") + return + } +} + +// 根据用户索引发现链接并发送消息到出口队列 +func sendToOutChanByUserIndex(data userConnPoolChanItem) { + data.option = 0 + select { + case mapUserConnPoolCtlChan <- data: + return + case <-time.After(time.Millisecond * 200): + logx.Error("通过用户索引找连接发送消息失败") + return + } +} + +// 消费用户索引池中的任务(单例) +var consumeUserPoolDataOnce sync.Once + +func consumeUserPoolData() { + defer func() { + if err := recover(); err != nil { + logx.Error("consumeUserPoolData panic:", err) + } + }() + consumeUserPoolDataOnce.Do(func() { + for { + select { + case data := <-mapUserConnPoolCtlChan: + key := getmapUserConnPoolUniqueId(data.userId, data.guestId) + switch data.option { + case 2: //发送消息 + logx.Info("通过用户id索引发送消息:", data.uniqueId) + mapUserUniqueId, ok := mapUserConnPool[key] + if !ok { + continue + } + for _, uniqueId := range mapUserUniqueId { + //根据uniqueId查询原始池中连接 + mapConnPoolVal, ok := mapConnPool.Load(uniqueId) + if !ok { + continue + } + originConn, ok := mapConnPoolVal.(wsConnectItem) + if !ok { + continue + } + originConn.sendToOutChan(data.message) + } + case 1: //添加 + logx.Info("添加用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + mapUserUniqueId[data.uniqueId] = struct{}{} + } else { + mapUserConnPool[key] = make(map[string]struct{}) + mapUserConnPool[key][data.uniqueId] = struct{}{} + } + case 0: //删除 + logx.Info("删除用户id索引标识:", data.uniqueId) + if mapUserUniqueId, ok := mapUserConnPool[key]; ok { + delete(mapUserUniqueId, data.uniqueId) + } + default: + + } + } + } + }) +} + +// 获取mapUserConnPool唯一id +func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) { + return fmt.Sprintf("%d_%d", userId, guestId) +} + // 获取唯一id func (l *DataTransferLogic) getUniqueId(userInfo *auth.UserInfo, userAgent string, retryTimes int) (uniqueId string, err error) { if retryTimes < 0 { @@ -236,7 +357,7 @@ func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowse func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) { time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) rsp := websocket_data.DataTransferData{ - T: constants.WEBSOCKET_GEN_UNIQUE_ID_ERR, + T: constants.WEBSOCKET_CONNECT_ERR, D: "err to gen unique id ", } b, _ := json.Marshal(rsp) @@ -276,6 +397,13 @@ func (w *wsConnectItem) close() { if !w.isClose { w.isClose = true close(w.closeChan) + //删除用户级索引 + deleteUserConnPoolElement(userConnPoolChanItem{ + userId: w.userId, + guestId: w.guestId, + uniqueId: w.uniqueId, + option: 0, + }) } logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed") } @@ -284,7 +412,7 @@ func (w *wsConnectItem) close() { func (w *wsConnectItem) consumeOutChanData() { defer func() { if err := recover(); err != nil { - logx.Error("write loop panic:", err) + logx.Error("consumeOutChanData panic:", err) } }() for { @@ -305,7 +433,7 @@ func (w *wsConnectItem) consumeOutChanData() { func (w *wsConnectItem) consumeInChanData() { defer func() { if err := recover(); err != nil { - logx.Error("send loop panic:", err) + logx.Error("consumeInChanData:", err) } }() for { @@ -323,7 +451,7 @@ func (w *wsConnectItem) consumeInChanData() { func (w *wsConnectItem) acceptBrowserMessage() { defer func() { if err := recover(); err != nil { - logx.Error("read loop panic:", err) + logx.Error("acceptBrowserMessage panic:", err) } }() for {