diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 43396c2e..f76061f5 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -195,12 +195,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use mapConnPool.Store(uniqueId, ws) //非白板用户,需要为这个用户建立map索引便于通过用户查询 if userInfo.IsUser() || userInfo.IsGuest() { - createUserConnPoolElement(userConnPoolChanItem{ - userId: userInfo.UserId, - guestId: userInfo.GuestId, - uniqueId: uniqueId, - option: 1, - }) + createUserConnPoolElement(userInfo.UserId, userInfo.GuestId, uniqueId) } if isFirefoxBrowser { time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究) @@ -210,37 +205,54 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use } // 添加用户索引池ws连接 -func createUserConnPoolElement(data userConnPoolChanItem) { - data.option = 1 +func createUserConnPoolElement(userId, guestId int64, uniqueId string) { + data := userConnPoolChanItem{ + userId: userId, + guestId: guestId, + uniqueId: uniqueId, + message: nil, + messageType: "", + option: 1, + } select { case mapUserConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): - logx.Error("向用户索引池中连接任务放入增加操作超时失败") return } } // 从用户索引池删除ws连接 -func deleteUserConnPoolElement(data userConnPoolChanItem) { - data.option = 0 +func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) { + data := userConnPoolChanItem{ + userId: userId, + guestId: guestId, + uniqueId: uniqueId, + message: nil, + messageType: "", + option: 0, + } select { case mapUserConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): - logx.Error("向用户索引池中连接任务放入删除操作超时失败") return } } // 根据用户索引发现链接并发送消息到出口队列 -func sendToOutChanByUserIndex(data userConnPoolChanItem) { - data.option = 0 +func sendToOutChanByUserIndex(userId, guestId int64, uniqueId string, message []byte) { + data := userConnPoolChanItem{ + userId: userId, + guestId: guestId, + uniqueId: uniqueId, + message: message, + option: 2, + } select { case mapUserConnPoolCtlChan <- data: return case <-time.After(time.Millisecond * 200): - logx.Error("通过用户索引找连接发送消息失败") return } } @@ -398,12 +410,7 @@ func (w *wsConnectItem) close() { w.isClose = true close(w.closeChan) //删除用户级索引 - deleteUserConnPoolElement(userConnPoolChanItem{ - userId: w.userId, - guestId: w.guestId, - uniqueId: w.uniqueId, - option: 0, - }) + deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId) } logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed") } diff --git a/server/websocket/internal/logic/ws_reuse_last_connect.go b/server/websocket/internal/logic/ws_reuse_last_connect.go index 065f43eb..1a8ea395 100644 --- a/server/websocket/internal/logic/ws_reuse_last_connect.go +++ b/server/websocket/internal/logic/ws_reuse_last_connect.go @@ -64,12 +64,7 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) { w.uniqueId = wid mapConnPool.Store(wid, *w) //添加用户id级别索引 - createUserConnPoolElement(userConnPoolChanItem{ - userId: w.userId, - guestId: w.guestId, - uniqueId: wid, - option: 1, - }) + createUserConnPoolElement(w.userId, w.guestId, wid) rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) w.sendToOutChan(rsp) logx.Info("重新绑定websocket连接标识成功")