This commit is contained in:
laodaming 2023-09-04 14:26:55 +08:00
parent c062848a95
commit 030d9f8ab3
3 changed files with 118 additions and 99 deletions

View File

@ -5,7 +5,6 @@ import (
"fusenapi/constants" "fusenapi/constants"
"fusenapi/utils/auth" "fusenapi/utils/auth"
"fusenapi/utils/basic" "fusenapi/utils/basic"
"sync"
"time" "time"
"fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/svc"
@ -37,7 +36,7 @@ type commonConnectionNotFoundDataCacheChanItem struct {
} }
// 放入缓冲队列 // 放入缓冲队列
func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
select { select {
case commonConnectionNotFoundDataCacheChan <- data: case commonConnectionNotFoundDataCacheChan <- data:
return return
@ -47,43 +46,48 @@ func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundD
} }
// 取出元素 // 取出元素
func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
return <-commonConnectionNotFoundDataCacheChan return <-commonConnectionNotFoundDataCacheChan
} }
// 保证处理消息就一个循环在执行
var consumeCommonCacheData sync.Once
// 消费公共通知未处理的消息(目前是轮巡方式,待优化) // 消费公共通知未处理的消息(目前是轮巡方式,待优化)
func (l *CommonNotifyLogic) consumeCommonCacheData() { func ConsumeCommonCacheData(ctx context.Context) {
//单例 defer func() {
consumeCommonCacheData.Do(func() { if err := recover(); err != nil {
for { logx.Error("consumeCommonCacheData panic :", err)
time.Sleep(time.Millisecond * 200)
info := l.popCommonNotifyCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
l.pushCommonNotifyCache(info)
continue
}
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
} }
}) }()
go func() {
select {
case <-ctx.Done():
panic("consumeCommonCacheData ctx deadline")
}
}()
for {
time.Sleep(time.Millisecond * 200)
info := popCommonNotifyCache()
//查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid)
//没有连接
if !ok {
info.retryTimes--
//大于0则放回队列
if info.retryTimes > 0 {
pushCommonNotifyCache(info)
continue
}
//否则直接丢弃消息
continue
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
continue
}
//发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
}
} }
// 处理进入前逻辑w,r // 处理进入前逻辑w,r
@ -91,32 +95,38 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() {
// } // }
func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
//websocket连接id不能为空 searchConnectType := "uniqueId"
if req.Wid == "" { if req.Wid == "" {
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") if !userinfo.IsUser() && !userinfo.IsGuest() {
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty")
}
searchConnectType = "userInfo"
} }
//触发消费公共未处理的消息(该方法是单例) switch searchConnectType {
go l.consumeCommonCacheData() case "uniqueId": //直接通过唯一标识发消息
//查询websocket连接 //查询websocket连接
value, ok := mapConnPool.Load(req.Wid) value, ok := mapConnPool.Load(req.Wid)
if !ok { if !ok {
//没找到连接就放到公共缓冲队列 //没找到连接就放到公共缓冲队列
go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{
retryTimes: 20, //重试20次 retryTimes: 20, //重试20次
data: types.CommonNotifyReq{ data: types.CommonNotifyReq{
Wid: req.Wid, Wid: req.Wid,
Data: req.Data, Data: req.Data,
}, },
}) })
return resp.SetStatusWithMessage(basic.CodeOK, "success") return resp.SetStatusWithMessage(basic.CodeOK, "success")
}
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
case "userInfo": //通过用户信息找连接发送
sendToOutChanByUserIndex(userinfo.UserId, userinfo.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
} }
//断言连接
ws, ok := value.(wsConnectItem)
if !ok {
logx.Error("渲染回调断言websocket连接失败")
return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
return resp.SetStatusWithMessage(basic.CodeOK, "success") return resp.SetStatusWithMessage(basic.CodeOK, "success")
} }

View File

@ -148,8 +148,6 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request)
conn.Close() conn.Close()
return return
} }
//消费用户索引控制chan的数据
go consumeUserPoolData()
//循环读客户端信息 //循环读客户端信息
go ws.acceptBrowserMessage() go ws.acceptBrowserMessage()
//消费出口数据并发送浏览器端 //消费出口数据并发送浏览器端
@ -257,58 +255,60 @@ func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
} }
} }
// 消费用户索引池中的任务(单例) // 消费用户索引创建/删除/发送消息中的任务数据
var consumeUserPoolDataOnce sync.Once func ConsumeUserPoolData(ctx context.Context) {
func consumeUserPoolData() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("consumeUserPoolData panic:", err) logx.Error("consumeUserPoolData panic:", err)
} }
}() }()
consumeUserPoolDataOnce.Do(func() { go func() {
for { select {
select { case <-ctx.Done():
case data := <-mapUserConnPoolCtlChan: panic("ConsumeUserPoolData ctx deadline")
key := getmapUserConnPoolUniqueId(data.userId, data.guestId) }
switch data.option { }()
case 2: //发送消息 for {
logx.Info("通过用户id索引发送消息", data.uniqueId) select {
mapUserUniqueId, ok := mapUserConnPool[key] case data := <-mapUserConnPoolCtlChan:
key := getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option {
case 2: //发送消息
logx.Info("通过用户id索引发送消息", data.uniqueId)
mapUserUniqueId, ok := mapUserConnPool[key]
if !ok {
continue
}
for _, uniqueId := range mapUserUniqueId {
//根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok { if !ok {
continue continue
} }
for _, uniqueId := range mapUserUniqueId { originConn, ok := mapConnPoolVal.(wsConnectItem)
//根据uniqueId查询原始池中连接 if !ok {
mapConnPoolVal, ok := mapConnPool.Load(uniqueId) continue
if !ok {
continue
}
originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok {
continue
}
originConn.sendToOutChan(data.message)
} }
case 1: //添加 originConn.sendToOutChan(data.message)
logx.Info("添加用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[key] = make(map[string]struct{})
mapUserConnPool[key][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
default:
} }
case 1: //添加
logx.Info("添加用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[key] = make(map[string]struct{})
mapUserConnPool[key][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[key]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
default:
} }
} }
}) }
} }
// 获取mapUserConnPool唯一id // 获取mapUserConnPool唯一id

View File

@ -1,8 +1,10 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"fusenapi/server/websocket/internal/logic"
"net/http" "net/http"
"fusenapi/utils/auth" "fusenapi/utils/auth"
@ -28,6 +30,13 @@ func main() {
ctx := svc.NewServiceContext(c) ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx) handler.RegisterHandlers(server, ctx)
ctx1 := context.Background()
ctx1, cancel := context.WithCancel(ctx1)
defer cancel()
//消费公共通知队列的数据
go logic.ConsumeCommonCacheData(ctx1)
//消费用户索引创建/删除/发送消息中的任务数据
go logic.ConsumeUserPoolData(ctx1)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start() server.Start()
} }