fusenapi/server/websocket/internal/logic/commonnotifylogic.go

131 lines
3.6 KiB
Go
Raw Normal View History

2023-08-24 07:45:23 +00:00
package logic
import (
"context"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
2023-08-25 06:49:24 +00:00
"sync"
"time"
2023-08-24 07:45:23 +00:00
"fusenapi/server/websocket/internal/svc"
"fusenapi/server/websocket/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type CommonNotifyLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommonNotifyLogic {
return &CommonNotifyLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
2023-08-25 06:49:24 +00:00
// 定义公共回调未找到websocket连接时暂存数据缓冲队列
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000)
type commonConnectionNotFoundDataCacheChanItem struct {
retryTimes int //重回队列次数
data types.CommonNotifyReq //数据
}
// 放入缓冲队列
func (l *CommonNotifyLogic) pushCommonCache(data commonConnectionNotFoundDataCacheChanItem) {
select {
case commonConnectionNotFoundDataCacheChan <- data:
return
2023-08-25 07:06:09 +00:00
case <-time.After(time.Millisecond * 50): //超50ms就丢弃
2023-08-25 06:49:24 +00:00
return
}
}
// 取出元素
func (l *CommonNotifyLogic) popCommonCache() (data commonConnectionNotFoundDataCacheChanItem) {
return <-commonConnectionNotFoundDataCacheChan
}
// 保证处理消息就一个循环在执行
var consumeCommonCacheData sync.Once
// 消费公共通知未处理的消息
func (l *CommonNotifyLogic) consumeCommonCacheData() {
//单例
consumeCommonCacheData.Do(func() {
tick := time.Tick(time.Millisecond * 200)
for {
select {
case <-tick: //200毫秒触发一次
info := l.popCommonCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
l.pushCommonCache(info)
continue
}
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
}
})
}
2023-08-24 07:45:23 +00:00
// 处理进入前逻辑w,r
// func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }
func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
//websocket连接id不能为空
2023-08-24 07:59:55 +00:00
if req.Wid == "" {
2023-08-24 07:45:23 +00:00
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty")
}
2023-08-25 06:52:42 +00:00
//触发消费公共未处理的消息(该方法是单例)
2023-08-25 06:59:36 +00:00
go l.consumeCommonCacheData()
2023-08-24 07:45:23 +00:00
//查询websocket连接
2023-08-24 07:59:55 +00:00
value, ok := mapConnPool.Load(req.Wid)
2023-08-24 07:45:23 +00:00
if !ok {
2023-08-25 06:49:24 +00:00
//没找到连接就放到公共缓冲队列
2023-08-25 07:06:09 +00:00
go l.pushCommonCache(commonConnectionNotFoundDataCacheChanItem{
2023-08-25 07:15:00 +00:00
retryTimes: 20, //重试20次
2023-08-25 06:49:24 +00:00
data: types.CommonNotifyReq{
Wid: req.Wid,
Data: req.Data,
},
})
return resp.SetStatusWithMessage(basic.CodeOK, "success")
2023-08-24 07:45:23 +00:00
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
// 处理逻辑后 w,r 如:重定向, resp 必须重新处理
// func (l *CommonNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) {
// // httpx.OkJsonCtx(r.Context(), w, resp)
// }