From ae80cf12f3c86c29d60291cbbde594f949d9f0fc Mon Sep 17 00:00:00 2001
From: laodaming <11058467+laudamine@user.noreply.gitee.com>
Date: Mon, 4 Sep 2023 11:17:12 +0800
Subject: [PATCH 1/2] fix

---
 constants/websocket.go                        |   2 +-
 .../internal/logic/datatransferlogic.go       | 136 +++++++++++++++++-
 2 files changed, 133 insertions(+), 5 deletions(-)

diff --git a/constants/websocket.go b/constants/websocket.go
index da7d8618..5efc8be3 100644
--- a/constants/websocket.go
+++ b/constants/websocket.go
@@ -5,7 +5,7 @@ type Websocket string
 // websocket消息类型(主类别)
 const (
 	WEBSOCKET_UNAUTH                          Websocket = "WEBSOCKET_UNAUTH"                          //鉴权失败 (1级消息,单向通信)
-	WEBSOCKET_GEN_UNIQUE_ID_ERR               Websocket = "WEBSOCKET_GEN_UNIQUE_ID_ERR"               //获取ws连接标识错误 (1级消息,单向通信)
+	WEBSOCKET_CONNECT_ERR                     Websocket = "WEBSOCKET_CONNECT_ERR"                     //ws连接错误 (1级消息,单向通信)
 	WEBSOCKET_CONNECT_SUCCESS                 Websocket = "WEBSOCKET_CONNECT_SUCCESS"                 //ws连接成功 (1级消息,单向通信)
 	WEBSOCKET_REQUEST_REUSE_LAST_CONNECT      Websocket = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT"      //请求恢复为上次连接的标识 (1级消息,单向通信)
 	WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR Websocket = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" //请求恢复为上次连接的标识错误 (1级消息,单向通信)
diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go
index eb6434b8..43396c2e 100644
--- a/server/websocket/internal/logic/datatransferlogic.go
+++ b/server/websocket/internal/logic/datatransferlogic.go
@@ -6,6 +6,7 @@ import (
 	"encoding/hex"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"fusenapi/constants"
 	"fusenapi/utils/auth"
 	"fusenapi/utils/basic"
@@ -66,6 +67,10 @@ var (
 	}
 	//websocket连接存储
 	mapConnPool = sync.Map{}
+	//用户标识的连接(白板用户不存)
+	mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id (val是个普通map,存储这个用户的所有连接标识)
+	//用户标识的连接增删操作队列
+	mapUserConnPoolCtlChan = make(chan userConnPoolChanItem, 2000)
 	//每个websocket连接入口缓冲队列长度
 	websocketInChanLen = 1000
 	//每个websocket连接出口缓冲队列长度
@@ -76,6 +81,16 @@ var (
 	renderChanLen = 500
 )
 
+// 用户标识的连接增删操作队列传输的值的结构
+type userConnPoolChanItem struct {
+	userId      int64               //必须(两个用户id任意一个不为0)
+	guestId     int64               //必须(两个用户id任意一个不为0)
+	uniqueId    string              //主连接池唯一标识(添加/删除时候必须)
+	message     []byte              //消息(发送消息传的,格式是经过标准输出序列化后的数据)
+	messageType constants.Websocket //消息类型(发送消息传的)
+	option      int64               //操作 2发消息 1增加 0删除
+}
+
 // 每个连接的连接基本属性
 type wsConnectItem struct {
 	conn                 *websocket.Conn      //websocket的连接(基本属性)
@@ -133,6 +148,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request)
 		conn.Close()
 		return
 	}
+	//消费用户索引控制chan的数据
+	go consumeUserPoolData()
 	//循环读客户端信息
 	go ws.acceptBrowserMessage()
 	//消费出口数据并发送浏览器端
@@ -176,6 +193,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
 	}
 	//保存连接
 	mapConnPool.Store(uniqueId, ws)
+	//非白板用户,需要为这个用户建立map索引便于通过用户查询
+	if userInfo.IsUser() || userInfo.IsGuest() {
+		createUserConnPoolElement(userConnPoolChanItem{
+			userId:   userInfo.UserId,
+			guestId:  userInfo.GuestId,
+			uniqueId: uniqueId,
+			option:   1,
+		})
+	}
 	if isFirefoxBrowser {
 		time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
 	}
@@ -183,6 +209,101 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
 	return ws, nil
 }
 
+// 添加用户索引池ws连接
+func createUserConnPoolElement(data userConnPoolChanItem) {
+	data.option = 1
+	select {
+	case mapUserConnPoolCtlChan <- data:
+		return
+	case <-time.After(time.Millisecond * 200):
+		logx.Error("向用户索引池中连接任务放入增加操作超时失败")
+		return
+	}
+}
+
+// 从用户索引池删除ws连接
+func deleteUserConnPoolElement(data userConnPoolChanItem) {
+	data.option = 0
+	select {
+	case mapUserConnPoolCtlChan <- data:
+		return
+	case <-time.After(time.Millisecond * 200):
+		logx.Error("向用户索引池中连接任务放入删除操作超时失败")
+		return
+	}
+}
+
+// 根据用户索引发现链接并发送消息到出口队列
+func sendToOutChanByUserIndex(data userConnPoolChanItem) {
+	data.option = 0
+	select {
+	case mapUserConnPoolCtlChan <- data:
+		return
+	case <-time.After(time.Millisecond * 200):
+		logx.Error("通过用户索引找连接发送消息失败")
+		return
+	}
+}
+
+// 消费用户索引池中的任务(单例)
+var consumeUserPoolDataOnce sync.Once
+
+func consumeUserPoolData() {
+	defer func() {
+		if err := recover(); err != nil {
+			logx.Error("consumeUserPoolData panic:", err)
+		}
+	}()
+	consumeUserPoolDataOnce.Do(func() {
+		for {
+			select {
+			case data := <-mapUserConnPoolCtlChan:
+				key := getmapUserConnPoolUniqueId(data.userId, data.guestId)
+				switch data.option {
+				case 2: //发送消息
+					logx.Info("通过用户id索引发送消息:", data.uniqueId)
+					mapUserUniqueId, ok := mapUserConnPool[key]
+					if !ok {
+						continue
+					}
+					for _, uniqueId := range mapUserUniqueId {
+						//根据uniqueId查询原始池中连接
+						mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
+						if !ok {
+							continue
+						}
+						originConn, ok := mapConnPoolVal.(wsConnectItem)
+						if !ok {
+							continue
+						}
+						originConn.sendToOutChan(data.message)
+					}
+				case 1: //添加
+					logx.Info("添加用户id索引标识:", data.uniqueId)
+					if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
+						mapUserUniqueId[data.uniqueId] = struct{}{}
+					} else {
+						mapUserConnPool[key] = make(map[string]struct{})
+						mapUserConnPool[key][data.uniqueId] = struct{}{}
+					}
+				case 0: //删除
+					logx.Info("删除用户id索引标识:", data.uniqueId)
+					if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
+						delete(mapUserUniqueId, data.uniqueId)
+					}
+				default:
+
+				}
+			}
+		}
+	})
+}
+
+// 获取mapUserConnPool唯一id
+func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
+	return fmt.Sprintf("%d_%d", userId, guestId)
+}
+
 // 获取唯一id
 func (l *DataTransferLogic) getUniqueId(userInfo *auth.UserInfo, userAgent string, retryTimes int) (uniqueId string, err error) {
 	if retryTimes < 0 {
@@ -236,7 +357,7 @@ func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowse
 func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) {
 	time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
 	rsp := websocket_data.DataTransferData{
-		T: constants.WEBSOCKET_GEN_UNIQUE_ID_ERR,
+		T: constants.WEBSOCKET_CONNECT_ERR,
 		D: "err to gen unique id ",
 	}
 	b, _ := json.Marshal(rsp)
@@ -276,6 +397,13 @@ func (w *wsConnectItem) close() {
 	if !w.isClose {
 		w.isClose = true
 		close(w.closeChan)
+		//删除用户级索引
+		deleteUserConnPoolElement(userConnPoolChanItem{
+			userId:   w.userId,
+			guestId:  w.guestId,
+			uniqueId: w.uniqueId,
+			option:   0,
+		})
 	}
 	logx.Info("###websocket:", w.uniqueId, "  uid:", w.userId, "  gid:", w.guestId, " is closed")
 }
@@ -284,7 +412,7 @@ func (w *wsConnectItem) close() {
 func (w *wsConnectItem) consumeOutChanData() {
 	defer func() {
 		if err := recover(); err != nil {
-			logx.Error("write loop panic:", err)
+			logx.Error("consumeOutChanData panic:", err)
 		}
 	}()
 	for {
@@ -305,7 +433,7 @@ func (w *wsConnectItem) consumeOutChanData() {
 func (w *wsConnectItem) consumeInChanData() {
 	defer func() {
 		if err := recover(); err != nil {
-			logx.Error("send loop panic:", err)
+			logx.Error("consumeInChanData:", err)
 		}
 	}()
 	for {
@@ -323,7 +451,7 @@ func (w *wsConnectItem) consumeInChanData() {
 func (w *wsConnectItem) acceptBrowserMessage() {
 	defer func() {
 		if err := recover(); err != nil {
-			logx.Error("read loop panic:", err)
+			logx.Error("acceptBrowserMessage panic:", err)
 		}
 	}()
 	for {

From 1a1903c82674c90ebfb237e3cd0f2fca5f11b46f Mon Sep 17 00:00:00 2001
From: laodaming <11058467+laudamine@user.noreply.gitee.com>
Date: Mon, 4 Sep 2023 11:21:24 +0800
Subject: [PATCH 2/2] fix

---
 server/websocket/internal/logic/ws_reuse_last_connect.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go
index 1816993b..065f43eb 100644
--- a/server/websocket/internal/logic/ws_reuse_last_connect.go
+++ b/server/websocket/internal/logic/ws_reuse_last_connect.go
@@ -63,6 +63,13 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) {
 	logx.Info("开始重新绑定websocket连接标识")
 	w.uniqueId = wid
 	mapConnPool.Store(wid, *w)
+	//添加用户id级别索引
+	createUserConnPoolElement(userConnPoolChanItem{
+		userId:   w.userId,
+		guestId:  w.guestId,
+		uniqueId: wid,
+		option:   1,
+	})
 	rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
 	w.sendToOutChan(rsp)
 	logx.Info("重新绑定websocket连接标识成功")