Merge branch 'develop' of https://gitee.com/fusenpack/fusenapi into develop

This commit is contained in:
eson 2023-09-04 16:23:16 +08:00
commit 397dda8ebf
6 changed files with 132 additions and 103 deletions

View File

@ -5,7 +5,6 @@ import (
"fusenapi/constants" "fusenapi/constants"
"fusenapi/utils/auth" "fusenapi/utils/auth"
"fusenapi/utils/basic" "fusenapi/utils/basic"
"sync"
"time" "time"
"fusenapi/server/websocket/internal/svc" "fusenapi/server/websocket/internal/svc"
@ -37,7 +36,7 @@ type commonConnectionNotFoundDataCacheChanItem struct {
} }
// 放入缓冲队列 // 放入缓冲队列
func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) { func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
select { select {
case commonConnectionNotFoundDataCacheChan <- data: case commonConnectionNotFoundDataCacheChan <- data:
return return
@ -47,20 +46,26 @@ func (l *CommonNotifyLogic) pushCommonNotifyCache(data commonConnectionNotFoundD
} }
// 取出元素 // 取出元素
func (l *CommonNotifyLogic) popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) { func popCommonNotifyCache() (data commonConnectionNotFoundDataCacheChanItem) {
return <-commonConnectionNotFoundDataCacheChan return <-commonConnectionNotFoundDataCacheChan
} }
// 保证处理消息就一个循环在执行
var consumeCommonCacheData sync.Once
// 消费公共通知未处理的消息(目前是轮巡方式,待优化) // 消费公共通知未处理的消息(目前是轮巡方式,待优化)
func (l *CommonNotifyLogic) consumeCommonCacheData() { func ConsumeCommonCacheData(ctx context.Context) {
//单例 defer func() {
consumeCommonCacheData.Do(func() { if err := recover(); err != nil {
logx.Error("consumeCommonCacheData panic :", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("consumeCommonCacheData ctx deadline")
}
}()
for { for {
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
info := l.popCommonNotifyCache() info := popCommonNotifyCache()
//查询websocket连接 //查询websocket连接
value, ok := mapConnPool.Load(info.data.Wid) value, ok := mapConnPool.Load(info.data.Wid)
//没有连接 //没有连接
@ -68,7 +73,7 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() {
info.retryTimes-- info.retryTimes--
//大于0则放回队列 //大于0则放回队列
if info.retryTimes > 0 { if info.retryTimes > 0 {
l.pushCommonNotifyCache(info) pushCommonNotifyCache(info)
continue continue
} }
//否则直接丢弃消息 //否则直接丢弃消息
@ -83,7 +88,6 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() {
//发送 //发送
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data)) ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, info.data.Data))
} }
})
} }
// 处理进入前逻辑w,r // 处理进入前逻辑w,r
@ -91,17 +95,20 @@ func (l *CommonNotifyLogic) consumeCommonCacheData() {
// } // }
func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) { func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *auth.UserInfo) (resp *basic.Response) {
//websocket连接id不能为空 searchConnectType := 1
if req.Wid == "" { if req.Wid == "" {
return resp.SetStatusWithMessage(basic.CodeRequestParamsErr, "websocket connect id is empty") if req.UserId == 0 && req.GuestId == 0 {
return resp.SetStatusWithMessage(basic.CodeOK, "用户信息或者连接标识必须保证至少有其中一个")
} }
//触发消费公共未处理的消息(该方法是单例) searchConnectType = 2
go l.consumeCommonCacheData() }
switch searchConnectType {
case 1: //直接通过唯一标识发消息
//查询websocket连接 //查询websocket连接
value, ok := mapConnPool.Load(req.Wid) value, ok := mapConnPool.Load(req.Wid)
if !ok { if !ok {
//没找到连接就放到公共缓冲队列 //没找到连接就放到公共缓冲队列
go l.pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{ pushCommonNotifyCache(commonConnectionNotFoundDataCacheChanItem{
retryTimes: 20, //重试20次 retryTimes: 20, //重试20次
data: types.CommonNotifyReq{ data: types.CommonNotifyReq{
Wid: req.Wid, Wid: req.Wid,
@ -117,6 +124,9 @@ func (l *CommonNotifyLogic) CommonNotify(req *types.CommonNotifyReq, userinfo *a
return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误") return resp.SetStatusWithMessage(basic.CodeServiceErr, "断言连接错误")
} }
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data)) ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
case 2: //通过用户信息找连接发送
sendToOutChanByUserIndex(req.UserId, req.GuestId, (&wsConnectItem{}).respondDataFormat(constants.WEBSOCKET_COMMON_NOTIFY, req.Data))
}
return resp.SetStatusWithMessage(basic.CodeOK, "success") return resp.SetStatusWithMessage(basic.CodeOK, "success")
} }

View File

@ -148,8 +148,6 @@ func (l *DataTransferLogic) DataTransfer(w http.ResponseWriter, r *http.Request)
conn.Close() conn.Close()
return return
} }
//消费用户索引控制chan的数据
go consumeUserPoolData()
//循环读客户端信息 //循环读客户端信息
go ws.acceptBrowserMessage() go ws.acceptBrowserMessage()
//消费出口数据并发送浏览器端 //消费出口数据并发送浏览器端
@ -257,35 +255,41 @@ func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
} }
} }
// 消费用户索引池中的任务(单例) // 消费用户索引创建/删除/发送消息中的任务数据
var consumeUserPoolDataOnce sync.Once func ConsumeUserPoolData(ctx context.Context) {
func consumeUserPoolData() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
logx.Error("consumeUserPoolData panic:", err) logx.Error("consumeUserPoolData panic:", err)
} }
}() }()
consumeUserPoolDataOnce.Do(func() { go func() {
select {
case <-ctx.Done():
panic("ConsumeUserPoolData ctx deadline")
}
}()
for { for {
select { select {
case data := <-mapUserConnPoolCtlChan: case data := <-mapUserConnPoolCtlChan:
key := getmapUserConnPoolUniqueId(data.userId, data.guestId) key := getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option { switch data.option {
case 2: //发送消息 case 2: //发送消息
logx.Info("通过用户id索引发送消息", data.uniqueId) logx.Info("通过用户id索引发送消息")
mapUserUniqueId, ok := mapUserConnPool[key] mapUserUniqueId, ok := mapUserConnPool[key]
if !ok { if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", key)
continue continue
} }
for _, uniqueId := range mapUserUniqueId { for uniqueId, _ := range mapUserUniqueId {
//根据uniqueId查询原始池中连接 //根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId) mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok { if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", key, " 原始uniqueId:", uniqueId)
continue continue
} }
originConn, ok := mapConnPoolVal.(wsConnectItem) originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok { if !ok {
logx.Error("通过用户id索引发送消息,断言原始连接失败用户索引key:", key, " 原始uniqueId:", uniqueId)
continue continue
} }
originConn.sendToOutChan(data.message) originConn.sendToOutChan(data.message)
@ -308,7 +312,6 @@ func consumeUserPoolData() {
} }
} }
} }
})
} }
// 获取mapUserConnPool唯一id // 获取mapUserConnPool唯一id

View File

@ -61,8 +61,11 @@ func (r *reuseConnProcessor) allocationMessage(w *wsConnectItem, data []byte) {
} }
//重新绑定 //重新绑定
logx.Info("开始重新绑定websocket连接标识") logx.Info("开始重新绑定websocket连接标识")
oldUniqueId := w.uniqueId
w.uniqueId = wid w.uniqueId = wid
mapConnPool.Store(wid, *w) mapConnPool.Store(wid, *w)
//删除用户id级别之前的索引
deleteUserConnPoolElement(w.userId, w.guestId, oldUniqueId)
//添加用户id级别索引 //添加用户id级别索引
createUserConnPoolElement(w.userId, w.guestId, wid) createUserConnPoolElement(w.userId, w.guestId, wid)
rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid) rsp := w.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, wid)

View File

@ -13,7 +13,9 @@ type RenderNotifyReq struct {
} }
type CommonNotifyReq struct { type CommonNotifyReq struct {
Wid string `json:"wid"` //websocket连接标识 Wid string `json:"wid,optional"` //websocket连接标识
UserId int64 `json:"user_id,optional"` //用户id
GuestId int64 `json:"guest_id,optional"` //游客id
Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 Data map[string]interface{} `json:"data"` //后端与前端约定好的数据
} }

View File

@ -1,8 +1,10 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"fusenapi/server/websocket/internal/logic"
"net/http" "net/http"
"fusenapi/utils/auth" "fusenapi/utils/auth"
@ -28,6 +30,13 @@ func main() {
ctx := svc.NewServiceContext(c) ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx) handler.RegisterHandlers(server, ctx)
ctx1 := context.Background()
ctx1, cancel := context.WithCancel(ctx1)
defer cancel()
//消费公共通知队列的数据
go logic.ConsumeCommonCacheData(ctx1)
//消费用户索引创建/删除/发送消息中的任务数据
go logic.ConsumeUserPoolData(ctx1)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start() server.Start()
} }

View File

@ -29,6 +29,8 @@ type RenderNotifyReq {
} }
//通用回调接口 //通用回调接口
type CommonNotifyReq { type CommonNotifyReq {
Wid string `json:"wid"` //websocket连接标识 Wid string `json:"wid,optional"` //websocket连接标识
UserId int64 `json:"user_id,optional"` //用户id
GuestId int64 `json:"guest_id,optional"` //游客id
Data map[string]interface{} `json:"data"` //后端与前端约定好的数据 Data map[string]interface{} `json:"data"` //后端与前端约定好的数据
} }