diff --git a/constants/rabbitmq.go b/constants/rabbitmq.go new file mode 100644 index 00000000..c7b8281e --- /dev/null +++ b/constants/rabbitmq.go @@ -0,0 +1,17 @@ +package constants + +type RABBIT_MQ string + +// 消息队列队列名 +const ( + //组装渲染数据队列 + RABBIT_MQ_ASSEMBLE_RENDER_DATA RABBIT_MQ = "RABBIT_MQ_ASSEMBLE_RENDER_DATA" + //渲染结果数据队列 + RABBIT_MQ_RENDER_RESULT_DATA RABBIT_MQ = "RABBIT_MQ_RENDER_RESULT_DATA" +) + +// 队列列表 +var MQ_QUEUE_ARR = []RABBIT_MQ{ + RABBIT_MQ_ASSEMBLE_RENDER_DATA, + RABBIT_MQ_RENDER_RESULT_DATA, +} diff --git a/go.mod b/go.mod index 7427c4bc..7a3e4859 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/hashicorp/raft v1.5.0 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e + github.com/streadway/amqp v1.1.0 github.com/stripe/stripe-go/v74 v74.26.0 github.com/zeromicro/go-zero v1.5.4 golang.org/x/image v0.0.0-20190802002840-cff245a6509b diff --git a/go.sum b/go.sum index 0bb4c655..b58c3903 100644 --- a/go.sum +++ b/go.sum @@ -295,6 +295,8 @@ github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDq github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/goctl_template/api/config.tpl b/goctl_template/api/config.tpl index dff7003f..0e814d42 100644 --- a/goctl_template/api/config.tpl +++ b/goctl_template/api/config.tpl @@ -6,6 +6,7 @@ type Config struct { rest.RestConf SourceMysql string Auth types.Auth + SourceRabbitMq string {{.auth}} {{.jwtTrans}} } diff --git a/goctl_template/api/context.tpl b/goctl_template/api/context.tpl index b9b97827..ba20b339 100644 --- a/goctl_template/api/context.tpl +++ b/goctl_template/api/context.tpl @@ -21,6 +21,7 @@ type ServiceContext struct { SharedState *fsm.StateCluster MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq *initalize.RabbitMqHandle } func NewServiceContext(c {{.config}}) *ServiceContext { @@ -32,6 +33,7 @@ func NewServiceContext(c {{.config}}) *ServiceContext { MysqlConn: conn, SharedState: StateServer, AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq:initalize.InitRabbitMq(c.SourceRabbitMq, nil), {{.middlewareAssignment}} } } \ No newline at end of file diff --git a/goctl_template/api/etc.tpl b/goctl_template/api/etc.tpl index 3279e7ea..d0ec35e0 100644 --- a/goctl_template/api/etc.tpl +++ b/goctl_template/api/etc.tpl @@ -2,7 +2,8 @@ Name: {{.serviceName}} Host: {{.host}} Port: {{.port}} SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest +SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672 Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go new file mode 100644 index 00000000..3713a7b8 --- /dev/null +++ b/initalize/rabbitmq.go @@ -0,0 +1,120 @@ +package initalize + +import ( + "crypto/tls" + "errors" + "fusenapi/constants" + "github.com/streadway/amqp" + "github.com/zeromicro/go-zero/core/logx" + "log" +) + +type RabbitMqHandle struct { +} + +// 连接属性 +type queueItem struct { + ch *amqp.Channel + queue amqp.Queue +} + +// 存储连接 +var mapMq = make(map[constants.RABBIT_MQ]queueItem) + +func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle { + if url == "" { + return nil + } + conn, err := amqp.DialTLS(url, config) + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + // 创建一个通道 + ch, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open a channel: %v", err) + } + //声明队列 + for _, queueName := range constants.MQ_QUEUE_ARR { + q, err := ch.QueueDeclare( + string(queueName), // 队列名 + true, // 是否持久化 + false, // 是否自动删除 + false, // 是否排他 + false, // 是否等待服务器响应 + nil, // 其他参数 + ) + if err != nil { + log.Fatalf("Failed to declare a queue: %v", err) + } + mapMq[queueName] = queueItem{ + ch: ch, + queue: q, + } + } + return &RabbitMqHandle{} +} + +// 发送消息 +func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error { + object, ok := mapMq[queueName] + if !ok { + return errors.New("unknown queue") + } + // 发送消息到队列 + return object.ch.Publish( + "", // exchange,如果为空,则使用默认交换机 + object.queue.Name, // routing key,将消息发送到指定队列 + false, // 是否等待服务器响应 + false, // 是否立即发送 + amqp.Publishing{ + ContentType: "text/plain", //普通文本 + Body: message, + }, // 消息内容 + ) +} + +// 消费消息 +func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error { + object, ok := mapMq[queueName] + if !ok { + return errors.New("unknown queue") + } + msgs, err := object.ch.Consume( + object.queue.Name, // 队列名 + object.queue.Name, // 消费者名,如果为空,则是随机生成一个 + false, // 自动应答 + false, // 是否排他 + false, // 是否阻塞 + false, // 是否等待服务器响应 + nil, // 其他参数 + ) + if err != nil { + log.Fatalf("Failed to register a consumer: %v", err) + } + //允许20的并发 + limit := make(chan struct{}, 20) + defer close(limit) + // 消费消息 + for msg := range msgs { + limit <- struct{}{} + go func(m amqp.Delivery) { + if err := recover(); err != nil { + logx.Error(err) + } + defer func() { + <-limit + }() + if err = handleFunc(m.Body); err != nil { + logx.Error("failed to deal with MQ message:", string(m.Body)) + return + } + if err = object.ch.Ack(m.DeliveryTag, false); err != nil { + logx.Error("failed to ack mq to delete") + } else { + log.Printf("Consume Mq message success: %s", m.Body) + } + }(msg) + } + return nil +} diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index 7d2b68d7..5f58765d 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 +SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672 \ No newline at end of file diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index 3ed0e8c4..321047cb 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -8,6 +8,7 @@ import ( type Config struct { rest.RestConf - SourceMysql string - Auth types.Auth + SourceMysql string + Auth types.Auth + SourceRabbitMq string } diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 44ba0abb..25bf50e7 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "fusenapi/constants" + "fusenapi/initalize" "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" "fusenapi/utils/id_generator" @@ -64,13 +65,14 @@ var ( // 每个连接的连接基本属性 type wsConnectItem struct { conn *websocket.Conn //websocket的连接 - closeChan chan struct{} //ws连接关闭chan - isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 - inChan chan []byte //接受消息缓冲通道 - outChan chan []byte //发送回客户端的消息 - mutex sync.Mutex //互斥锁 - renderProperty renderProperty //扩展云渲染属性 + rabbitMq *initalize.RabbitMqHandle + closeChan chan struct{} //ws连接关闭chan + isClose bool //是否已经关闭 + uniqueId uint64 //ws连接唯一标识 + inChan chan []byte //接受消息缓冲通道 + outChan chan []byte //发送回客户端的消息 + mutex sync.Mutex //互斥锁 + renderProperty renderProperty //扩展云渲染属性 } func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.ResponseWriter, r *http.Request) { @@ -100,6 +102,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp uniqueId := websocketIdGenerator.Get() ws := wsConnectItem{ conn: conn, + rabbitMq: l.svcCtx.RabbitMq, uniqueId: uniqueId, closeChan: make(chan struct{}, 1), inChan: make(chan []byte, 1000), @@ -272,7 +275,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { switch parseInfo.T { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: - go w.SendToCloudRender(d) + w.SendToCloudRender(d) default: } diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index d58def77..c91d09c0 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,7 +59,12 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } - // TODO 数据发送给云渲染服务器 + //发送给对应的流水线组装数据 + if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { + logx.Error("发送渲染任务数据到MQ失败:", string(data)) + continue + } + logx.Info("发送渲染数据到rabbitmq成功:", string(data)) } } } diff --git a/server/websocket/internal/svc/servicecontext.go b/server/websocket/internal/svc/servicecontext.go index e5c61d07..35b017de 100644 --- a/server/websocket/internal/svc/servicecontext.go +++ b/server/websocket/internal/svc/servicecontext.go @@ -21,6 +21,7 @@ type ServiceContext struct { MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq *initalize.RabbitMqHandle } func NewServiceContext(c config.Config) *ServiceContext { @@ -29,9 +30,10 @@ func NewServiceContext(c config.Config) *ServiceContext { return &ServiceContext{ Config: c, - MysqlConn: conn, SharedState: StateServer, + MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), } } func (svcCtx *ServiceContext) ParseJwtToken(r *http.Request) (jwt.MapClaims, error) {