This commit is contained in:
laodaming 2023-09-04 11:37:04 +08:00
parent 1a1903c826
commit 1b091892a4
2 changed files with 29 additions and 27 deletions

View File

@ -195,12 +195,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
mapConnPool.Store(uniqueId, ws)
//非白板用户需要为这个用户建立map索引便于通过用户查询
if userInfo.IsUser() || userInfo.IsGuest() {
createUserConnPoolElement(userConnPoolChanItem{
userId: userInfo.UserId,
guestId: userInfo.GuestId,
uniqueId: uniqueId,
option: 1,
})
createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId)
}
if isFirefoxBrowser {
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
@ -210,37 +205,54 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
}
// 添加用户索引池ws连接
func createUserConnPoolElement(data userConnPoolChanItem) {
data.option = 1
func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 1,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("向用户索引池中连接任务放入增加操作超时失败")
return
}
}
// 从用户索引池删除ws连接
func deleteUserConnPoolElement(data userConnPoolChanItem) {
data.option = 0
func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 0,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("向用户索引池中连接任务放入删除操作超时失败")
return
}
}
// 根据用户索引发现链接并发送消息到出口队列
func sendToOutChanByUserIndex(data userConnPoolChanItem) {
data.option = 0
func sendToOutChanByUserIndex(userId, guestId int64, uniqueId string, message []byte) {
data := userConnPoolChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: message,
option: 2,
}
select {
case mapUserConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
logx.Error("通过用户索引找连接发送消息失败")
return
}
}
@ -398,12 +410,7 @@ func (w *wsConnectItem) close() {
w.isClose = true
close(w.closeChan)
//删除用户级索引
deleteUserConnPoolElement(userConnPoolChanItem{
userId: w.userId,
guestId: w.guestId,
uniqueId: w.uniqueId,
option: 0,
})
deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId)
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}

View File

@ -64,12 +64,7 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) {
w.uniqueId = wid
mapConnPool.Store(wid, *w)
//添加用户id级别索引
createUserConnPoolElement(userConnPoolChanItem{
userId: w.userId,
guestId: w.guestId,
uniqueId: wid,
option: 1,
})
createUserConnPoolElement(w.userId, w.guestId, wid)
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)
w.sendToOutChan(rsp)
logx.Info("重新绑定websocket连接标识成功")