This commit is contained in:
laodaming 2023-09-21 17:56:48 +08:00
parent 0885c645c3
commit 5db1f3f09d
4 changed files with 9 additions and 25 deletions

View File

@ -28,7 +28,7 @@ func NewCommonNotifyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Comm
}
// 定义公共回调未找到websocket连接时暂存数据缓冲队列
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 100)
var commonConnectionNotFoundDataCacheChan = make(chan commonConnectionNotFoundDataCacheChanItem, 2000)
type commonConnectionNotFoundDataCacheChanItem struct {
retryTimes int //重回队列次数
@ -40,8 +40,6 @@ func pushCommonNotifyCache(data commonConnectionNotFoundDataCacheChanItem) {
select {
case commonConnectionNotFoundDataCacheChan <- data:
return
case <-time.After(time.Millisecond * 50): //超50ms就丢弃
return
}
}

View File

@ -67,9 +67,9 @@ var (
//websocket连接存储
mapConnPool = sync.Map{}
//每个websocket连接入口缓冲队列长度默认值
websocketInChanLen = 50
websocketInChanLen = 500
//每个websocket连接出口缓冲队列长度默认值
websocketOutChanLen = 50
websocketOutChanLen = 500
//是否开启debug
openDebug = true
//允许跨域的origin
@ -192,8 +192,7 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
userId: userInfo.UserId,
guestId: userInfo.GuestId,
extendRenderProperty: extendRenderProperty{
renderChan: make(chan []byte, renderChanLen),
renderConsumeTickTime: 1, //默认1纳秒后面需要根据不同用户不同触发速度
renderChan: make(chan []byte, renderChanLen),
},
}
//保存连接

View File

@ -21,7 +21,7 @@ import (
var (
//每个websocket渲染任务缓冲队列长度默认值
renderChanLen = 20
renderChanLen = 200
)
// 渲染处理器
@ -30,8 +30,7 @@ type renderProcessor struct {
// 云渲染属性
type extendRenderProperty struct {
renderChan chan []byte //渲染消息入口的缓冲队列
renderConsumeTickTime time.Duration //消费渲染消息时钟间隔(纳秒),用于后期控制不同类型用户渲染速度限制
renderChan chan []byte //渲染消息入口的缓冲队列
}
// 处理分发到这里的数据
@ -52,18 +51,13 @@ func (w *wsConnectItem) consumeRenderImageData() {
logx.Error("func renderImage err:", err)
}
}()
var duration time.Duration = 1
if w.extendRenderProperty.renderConsumeTickTime > 0 {
duration = w.extendRenderProperty.renderConsumeTickTime
}
ticker := time.NewTicker(duration)
defer ticker.Stop()
var data []byte
for {
select {
case <-w.closeChan: //已关闭
return
case <-ticker.C: //消费数据
w.renderImage(<-w.extendRenderProperty.renderChan)
case data = <-w.extendRenderProperty.renderChan: //消费数据
w.renderImage(data)
}
}
}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"time"
)
var (
@ -30,8 +29,6 @@ func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
@ -51,8 +48,6 @@ func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
@ -68,8 +63,6 @@ func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}