This commit is contained in:
laodaming 2023-09-07 15:54:24 +08:00
parent 45d93b94b4
commit 35eb087b5a
5 changed files with 197 additions and 132 deletions

View File

@ -6,7 +6,6 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"fusenapi/constants"
"fusenapi/utils/auth"
"fusenapi/utils/basic"
@ -67,18 +66,10 @@ var (
}
//websocket连接存储
mapConnPool = sync.Map{}
//用户标识的连接(白板用户不存)
mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id val是个普通map存储这个用户的所有连接标识
//用户标识的连接增删操作队列
userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500)
//每个websocket连接入口缓冲队列长度默认值
websocketInChanLen = 500
//每个websocket连接出口缓冲队列长度默认值
websocketOutChanLen = 500
//每个websocket连接渲染任务调度队列长度默认值添加任务/删除任务/修改任务属性缓冲队列长度该队列用于避免map并发读写冲突
renderImageTaskCtlChanLen = 100
//每个websocket渲染任务缓冲队列长度默认值
renderChanLen = 500
//是否开启debug
openDebug = true
//允许跨域的origin
@ -216,132 +207,11 @@ func (l *DataTransferLogic) setConnPool(conn *websocket.Conn, userInfo *auth.Use
time.Sleep(time.Second * 1) //兼容下火狐(直接发回去收不到第一条消息:有待研究)
}
ws.sendToOutChan(ws.respondDataFormat(constants.WEBSOCKET_CONNECT_SUCCESS, uniqueId))
//发送累加统计连接书
increaseWebsocketConnectCount()
return ws, nil
}
// 添加用户索引池ws连接
func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 1,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 从用户索引池删除ws连接
func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 0,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 根据用户索引发现链接并发送(广播)消息到出口队列
func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: "",
message: message,
option: 2,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 消费用户索引创建/删除/发送消息中的任务数据
func ConsumeUserConnPoolCtlChanData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("ConsumeUserConnPoolCtlChanData panic:", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("ConsumeUserConnPoolCtlChanData ctx deadline")
}
}()
var (
data userConnPoolCtlChanItem
userKey string
)
for {
select {
case data = <-userConnPoolCtlChan:
userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option {
case 2: //发送消息
logx.Info("通过用户id索引发送消息")
mapUserUniqueId, ok := mapUserConnPool[userKey]
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", userKey)
continue
}
for uniqueId, _ := range mapUserUniqueId {
//根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", userKey, " 原始uniqueId:", uniqueId)
continue
}
originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok {
logx.Error("通过用户id索引发送消息,断言原始连接失败用户索引key:", userKey, " 原始uniqueId:", uniqueId)
continue
}
originConn.sendToOutChan(data.message)
}
case 1: //添加
logx.Info("添加用户id索引标识", data.uniqueId)
//存在这个用户的map池子
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[userKey] = make(map[string]struct{})
mapUserConnPool[userKey][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
}
}
}
}
// 获取mapUserConnPool唯一id
func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
if userId > 0 {
guestId = 0
}
return fmt.Sprintf("%d_%d", userId, guestId)
}
// 获取websocket发送到前端使用的数据传输类型debug开启是文本否则是二进制
func getWebsocketBaseTransferDataFormat() int {
if openDebug {
@ -448,6 +318,8 @@ func (w *wsConnectItem) close() {
close(w.closeChan)
//删除用户级索引
deleteUserConnPoolElement(w.userId, w.guestId, w.uniqueId)
//减少连接数统计
decreaseWebsocketConnectCount()
}
logx.Info("###websocket:", w.uniqueId, " uid:", w.userId, " gid:", w.guestId, " is closed")
}

View File

@ -0,0 +1,46 @@
package logic
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
)
var (
//当前ws连接数
currentWebsocketConnectCount = 0
//添加or减少连接的控制chan
websocketConnectCountCtlChan = make(chan int, 20)
)
// 累增计数
func increaseWebsocketConnectCount() {
websocketConnectCountCtlChan <- 1
}
// 减少计数
func decreaseWebsocketConnectCount() {
websocketConnectCountCtlChan <- -1
}
// 消费数据
func ConsumeWebsocketConnectCountCtlChanData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("ConsumeWebsocketConnectCountCtlChanData panic:", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("ConsumeWebsocketConnectCountCtlChanData ctx deadline")
}
}()
var num int
for {
select {
case num = <-websocketConnectCountCtlChan:
currentWebsocketConnectCount += num
logx.Info("当前websocket连接总数:", currentWebsocketConnectCount)
}
}
}

View File

@ -19,6 +19,13 @@ import (
"time"
)
var (
//每个websocket连接渲染任务调度队列长度默认值添加任务/删除任务/修改任务属性缓冲队列长度该队列用于避免map并发读写冲突
renderImageTaskCtlChanLen = 100
//每个websocket渲染任务缓冲队列长度默认值
renderChanLen = 500
)
// 渲染处理器
type renderProcessor struct {
}

View File

@ -0,0 +1,138 @@
package logic
import (
"context"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"time"
)
var (
//用户标识的连接(白板用户不存)
mapUserConnPool = make(map[string]map[string]struct{}) //key是user_id +"_"+guest_id val是个普通map存储这个用户的所有连接标识
//用户标识的连接增删操作队列
userConnPoolCtlChan = make(chan userConnPoolCtlChanItem, 500)
)
// 添加用户索引池ws连接
func createUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 1,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 从用户索引池删除ws连接
func deleteUserConnPoolElement(userId, guestId int64, uniqueId string) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: uniqueId,
message: nil,
messageType: "",
option: 0,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 根据用户索引发现链接并发送(广播)消息到出口队列
func sendToOutChanByUserIndex(userId, guestId int64, message []byte) {
data := userConnPoolCtlChanItem{
userId: userId,
guestId: guestId,
uniqueId: "",
message: message,
option: 2,
}
select {
case userConnPoolCtlChan <- data:
return
case <-time.After(time.Millisecond * 200):
return
}
}
// 消费用户索引创建/删除/发送消息中的任务数据
func ConsumeUserConnPoolCtlChanData(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
logx.Error("ConsumeUserConnPoolCtlChanData panic:", err)
}
}()
go func() {
select {
case <-ctx.Done():
panic("ConsumeUserConnPoolCtlChanData ctx deadline")
}
}()
var (
data userConnPoolCtlChanItem
userKey string
)
for {
select {
case data = <-userConnPoolCtlChan:
userKey = getmapUserConnPoolUniqueId(data.userId, data.guestId)
switch data.option {
case 2: //发送消息
logx.Info("通过用户id索引发送消息")
mapUserUniqueId, ok := mapUserConnPool[userKey]
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", userKey)
continue
}
for uniqueId, _ := range mapUserUniqueId {
//根据uniqueId查询原始池中连接
mapConnPoolVal, ok := mapConnPool.Load(uniqueId)
if !ok {
logx.Info("通过用户id索引发送消息,连接不存在用户索引key:", userKey, " 原始uniqueId:", uniqueId)
continue
}
originConn, ok := mapConnPoolVal.(wsConnectItem)
if !ok {
logx.Error("通过用户id索引发送消息,断言原始连接失败用户索引key:", userKey, " 原始uniqueId:", uniqueId)
continue
}
originConn.sendToOutChan(data.message)
}
case 1: //添加
logx.Info("添加用户id索引标识", data.uniqueId)
//存在这个用户的map池子
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
mapUserUniqueId[data.uniqueId] = struct{}{}
} else {
mapUserConnPool[userKey] = make(map[string]struct{})
mapUserConnPool[userKey][data.uniqueId] = struct{}{}
}
case 0: //删除
logx.Info("删除用户id索引标识", data.uniqueId)
if mapUserUniqueId, ok := mapUserConnPool[userKey]; ok {
delete(mapUserUniqueId, data.uniqueId)
}
}
}
}
}
// 获取mapUserConnPool唯一id
func getmapUserConnPoolUniqueId(userId, guestId int64) (uniqueId string) {
if userId > 0 {
guestId = 0
}
return fmt.Sprintf("%d_%d", userId, guestId)
}

View File

@ -37,6 +37,8 @@ func main() {
go logic.ConsumeCommonCacheData(ctx1)
//消费用户索引创建/删除/发送消息中的任务数据
go logic.ConsumeUserConnPoolCtlChanData(ctx1)
//消费连接统计信息
go logic.ConsumeWebsocketConnectCountCtlChanData(ctx1)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}