diff --git a/server/websocket/internal/logic/commonnotifylogic.go b/server/websocket/internal/logic/commonnotifylogic.go index 2e5e6e65..72abe5b6 100644 --- a/server/websocket/internal/logic/commonnotifylogic.go +++ b/server/websocket/internal/logic/commonnotifylogic.go @@ -37,7 +37,7 @@ type commonConnectionNotFoundDataCacheChanItem struct { } // 放入缓冲队列 -func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) { +func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { select { case commonConnectionNotFoundDataCacheChan <- data: return @@ -47,45 +47,41 @@ func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCac } // 取出元素 -func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) { +func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { return <-commonConnectionNotFoundDataCacheChan } // 保证处理消息就一个循环在执行 var consumeCommonCacheData sync.Once -// 消费公共通知未处理的消息 +// 消费公共通知未处理的消息(目前是轮巡方式,待优化) func (l *CommonNotifyLogic) consumeCommonCacheData() { //单例 consumeCommonCacheData.Do(func() { - tick := time.Tick(time.Millisecond * 200) for { - select { - case <-tick: //200毫秒触发一次 - info := l.popCommonCache() - //查询websocket连接 - value, ok := mapConnPool.Load(info.data.Wid) - //没有连接 - if !ok { - info.retryTimes-- - //大于0,则放回队列 - if info.retryTimes > 0 { - l.pushCommonCache(info) - continue - } - //否则直接丢弃消息 + time.Sleep(time.Millisecond * 200) + info := l.popCommonNotifyCache() + //查询websocket连接 + value, ok := mapConnPool.Load(info.data.Wid) + //没有连接 + if !ok { + info.retryTimes-- + //大于0,则放回队列 + if info.retryTimes > 0 { + l.pushCommonNotifyCache(info) continue } - //断言连接 - ws, ok := value.(wsConnectItem) - if !ok { - logx.Error("渲染回调断言websocket连接失败") - continue - } - //发送 - ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) + //否则直接丢弃消息 + continue } - + //断言连接 + ws, ok := value.(wsConnectItem) + if !ok { + logx.Error("渲染回调断言websocket连接失败") + continue + } + //发送 + ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) } }) } @@ -105,7 +101,7 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a value, ok := mapConnPool.Load(req.Wid) if !ok { //没找到连接就放到公共缓冲队列 - go l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{ + go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ retryTimes: 20, //重试20次 data: types.CommonNotifyReq{ Wid: req.Wid,