This commit is contained in:
laodaming 2023-09-04 11:17:12 +08:00
parent 88ae58c9d8
commit ae80cf12f3
2 changed files with 133 additions and 5 deletions

View File

@ -5,7 +5,7 @@ type Websocket string
// websocket消息类型(主类别)
const (
WEBSOCKET_UNAUTH Websocket = "WEBSOCKET_UNAUTH" //鉴权失败 1级消息单向通信
WEBSOCKET_GEN_UNIQUE_ID_ERR Websocket = "WEBSOCKET_GEN_UNIQUE_ID_ERR" //获取ws连接标识错误 1级消息单向通信
WEBSOCKET_CONNECT_ERR Websocket = "WEBSOCKET_CONNECT_ERR" //ws连接错误 1级消息单向通信
WEBSOCKET_CONNECT_SUCCESS Websocket = "WEBSOCKET_CONNECT_SUCCESS" //ws连接成功 1级消息单向通信
WEBSOCKET_REQUEST_REUSE_LAST_CONNECT Websocket = "WEBSOCKET_REQUEST_REUSE_LAST_CONNECT" //请求恢复为上次连接的标识 1级消息单向通信
WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR Websocket = "WEBSOCKET_REQUEST_RESUME_LAST_CONNECT_ERR" //请求恢复为上次连接的标识错误 1级消息单向通信

View File

@ -6,6 +6,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
@ -66,6 +67,10 @@ var (
}
//websocket连接存储
mapConnPool = sync.Map{}
//用户标识的连接(白板用户不存)
mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id val是个普通map存储这个用户的所有连接标识
//用户标识的连接增删操作队列
mapUserConnPoolCtlChan = make(chan userConnPoolChanItem, 2000)
//每个websocket连接入口缓冲队列长度
websocketInChanLen = 1000
//每个websocket连接出口缓冲队列长度
@ -76,6 +81,16 @@ var (
renderChanLen = 500
)
// 用户标识的连接增删操作队列传输的值的结构
type userConnPoolChanItem struct {
userId int64 //必须两个用户id任意一个不为0
guestId int64 //必须两个用户id任意一个不为0
uniqueId string //主连接池唯一标识(添加/删除时候必须)
message []byte //消息(发送消息传的,格式是经过标准输出序列化后的数据)
messageType constants.Websocket //消息类型(发送消息传的)
option int64 //操作 2发消息 1增加 0删除
}
// 每个连接的连接基本属性
type wsConnectItem struct {
conn *websocket.Conn //websocket的连接(基本属性)
@ -133,6 +148,8 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request)
conn.Close()
return
}
//消费用户索引控制chan的数据
go consumeUserPoolData()
//循环读客户端信息
go ws.acceptBrowserMessage()
//消费出口数据并发送浏览器端
@ -176,6 +193,15 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
}
//保存连接
mapConnPool.Store(uniqueId, ws)
//非白板用户需要为这个用户建立map索引便于通过用户查询
if userInfo.IsUser() || userInfo.IsGuest() {
createUserConnPoolElement(userConnPoolChanItem{
userId: userInfo.UserId,
guestId: userInfo.GuestId,
uniqueId: uniqueId,
option: 1,
})
}
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
@ -183,6 +209,101 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
return ws, nil
}
// 添加用户索引池ws连接
func createUserConnPoolElement(data userConnPoolChanItem) {
data.option = 1
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("向用户索引池中连接任务放入增加操作超时失败")
return
}
}
// 从用户索引池删除ws连接
func deleteUserConnPoolElement(data userConnPoolChanItem) {
data.option = 0
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("向用户索引池中连接任务放入删除操作超时失败")
return
}
}
// 根据用户索引发现链接并发送消息到出口队列
func sendToOutChanByUserIndex(data userConnPoolChanItem) {
data.option = 0
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("通过用户索引找连接发送消息失败")
return
}
}
// 消费用户索引池中的任务(单例)
var consumeUserPoolDataOnce sync.Once
func consumeUserPoolData() {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeUserPoolData panic:", err)
}
}()
consumeUserPoolDataOnce.Do(func() {
for {
select {
case data := <-mapUserConnPoolCtlChan:
key := getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option {
case 2: //发送消息
logx.Info("通过用户id索引发送消息", data.uniqueId)
mapUserUniqueId, ok := mapUserConnPool[key]
if !ok {
continue
}
for _, uniqueId := range mapUserUniqueId {
//根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok {
continue
}
originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok {
continue
}
originConn.sendToOutChan(data.message)
}
case 1: //添加
logx.Info("添加用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[key] = make(map[string]struct{})
mapUserConnPool[key][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
default:
}
}
}
})
}
// 获取mapUserConnPool唯一id
func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
return fmt.Sprintf("%d_%d", userId, guestId)
}
// 获取唯一id
func (l *DataTransferLogic) getUniqueId(userInfo *auth.UserInfo, userAgent string, retryTimes int) (uniqueId string, err error) {
if retryTimes < 0 {
@ -236,7 +357,7 @@ func (l *DataTransferLogic) unAuthResponse(conn *websocket.Conn, isFirefoxBrowse
func (l *DataTransferLogic) sendGetUniqueIdErrResponse(conn *websocket.Conn) {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
rsp := websocket_data.DataTransferData{
T: constants.WEBSOCKET_GEN_UNIQUE_ID_ERR,
T: constants.WEBSOCKET_CONNECT_ERR,
D: "err to gen unique id ",
}
b, _ := json.Marshal(rsp)
@ -276,6 +397,13 @@ func (w *wsConnectItem) close() {
if !w.isClose {
w.isClose = true
close(w.closeChan)
//删除用户级索引
deleteUserConnPoolElement(userConnPoolChanItem{
userId: w.userId,
guestId: w.guestId,
uniqueId: w.uniqueId,
option: 0,
})
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}
@ -284,7 +412,7 @@ func (w *wsConnectItem) close() {
func (w *wsConnectItem) consumeOutChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("write loop panic:", err)
logx.Error("consumeOutChanData panic:", err)
}
}()
for {
@ -305,7 +433,7 @@ func (w *wsConnectItem) consumeOutChanData() {
func (w *wsConnectItem) consumeInChanData() {
defer func() {
if err := recover(); err != nil {
logx.Error("send loop panic:", err)
logx.Error("consumeInChanData:", err)
}
}()
for {
@ -323,7 +451,7 @@ func (w *wsConnectItem) consumeInChanData() {
func (w *wsConnectItem) acceptBrowserMessage() {
defer func() {
if err := recover(); err != nil {
logx.Error("read loop panic:", err)
logx.Error("acceptBrowserMessage panic:", err)
}
}()
for {