fusenapi/server/websocket/internal/logic/commonnotifylogic.go
laodaming 4c62b941a0 fix
2023-09-04 15:06:07 +08:00

137 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logic
import (
"context"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
"time"
"fusenapi/server/websocket/internal/svc"
"fusenapi/server/websocket/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// CommonNotifyLogic holds the state used to handle one common-notify request:
// a logger, the request context and the shared service context.
type CommonNotifyLogic struct {
logx.Logger // embedded logger; NewCommonNotifyLogic builds it with logx.WithContext(ctx)
ctx context.Context // context passed to NewCommonNotifyLogic
svcCtx *svc.ServiceContext // service context passed to NewCommonNotifyLogic
}
// NewCommonNotifyLogic returns a CommonNotifyLogic whose logger is bound to
// ctx and which keeps references to ctx and svcCtx.
func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommonNotifyLogic {
	logic := new(CommonNotifyLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// commonConnectionNotFoundDataCacheChan is a buffered queue (capacity 2000)
// that temporarily holds common-notify messages whose target websocket
// connection was not found when delivery was attempted.
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000)
// commonConnectionNotFoundDataCacheChanItem is one entry of the pending
// common-notify queue.
type commonConnectionNotFoundDataCacheChanItem struct {
retryTimes int // number of times the item may still be put back on the queue
data types.CommonNotifyReq // the notification to deliver
}
// pushCommonNotifyCache enqueues data on the pending common-notify queue.
//
// If the queue has room the item is enqueued immediately. Otherwise the call
// waits up to 50ms for space and silently drops the item when none becomes
// available in time (best-effort delivery).
func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
	// Fast path: avoid allocating a timer when the queue is not full.
	select {
	case commonConnectionNotFoundDataCacheChan <- data:
		return
	default:
	}

	// Slow path: bounded wait. The timer is stopped explicitly so it does not
	// linger until expiry when the send wins the race.
	timer := time.NewTimer(time.Millisecond * 50)
	defer timer.Stop()
	select {
	case commonConnectionNotFoundDataCacheChan <- data:
	case <-timer.C:
		// Waited more than 50ms: drop the item.
	}
}
// popCommonNotifyCache removes and returns the next item from the pending
// common-notify queue, blocking until one is available.
func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
	data = <-commonConnectionNotFoundDataCacheChan
	return data
}
// 消费公共通知未处理的消息(目前是轮巡方式,待优化)
func ConsumeCommonCacheData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("consumeCommonCacheData panic :", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("consumeCommonCacheData ctx deadline")
}
}()
for {
time.Sleep(time.Millisecond * 200)
info := popCommonNotifyCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
pushCommonNotifyCache(info)
continue
}
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
}
// Hook for logic that runs before the main handler, with access to w and r (commented-out stub).
// func (l *CommonNotifyLogic) BeforeLogic(w http.ResponseWriter, r *http.Request) {
// }
// CommonNotify pushes req.Data to websocket client(s) as a
// WEBSOCKET_COMMON_NOTIFY message.
//
// When req.Wid is set, the message goes to the connection registered under
// that id; if no such connection exists, the message is queued for retry (up
// to 20 re-queues) and success is returned. When req.Wid is empty, the message
// is sent through sendToOutChanByUserIndex using req.UserId / req.GuestId; at
// least one of them must be non-zero.
//
// userinfo is not used by this method.
func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
	// No connection id: address the message by user information instead.
	if req.Wid == "" {
		if req.UserId == 0 && req.GuestId == 0 {
			// NOTE(review): this validation failure is reported with basic.CodeOK;
			// confirm whether a parameter-error code is intended.
			return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个")
		}
		sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
		return resp.SetStatusWithMessage(basic.CodeOK, "success")
	}

	// Connection id given: look the websocket connection up directly.
	conn, found := mapConnPool.Load(req.Wid)
	if !found {
		// Connection not found: park the message on the shared retry queue.
		pending := commonConnectionNotFoundDataCacheChanItem{
			retryTimes: 20, // retry 20 times
			data: types.CommonNotifyReq{
				Wid:  req.Wid,
				Data: req.Data,
			},
		}
		pushCommonNotifyCache(pending)
		return resp.SetStatusWithMessage(basic.CodeOK, "success")
	}

	// Assert the stored value is a websocket connection item.
	ws, isConn := conn.(wsConnectItem)
	if !isConn {
		logx.Error("渲染回调断言websocket连接失败")
		return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
	}

	ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
	return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
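// Hook for logic that runs after the main handler, with access to w and r (e.g. a redirect); resp must be handled again there (commented-out stub).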
// func (l *CommonNotifyLogic) AfterLogic(w http.ResponseWriter, r *http.Request, resp *basic.Response) {
// // httpx.OkJsonCtx(r.Context(), w, resp)
// }