From f316a340c89c3bf11777c9d4a08ab5aea627803f Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:10:06 +0800 Subject: [PATCH 01/12] fix --- constants/rabbitmq.go | 17 ++++++ go.mod | 1 + go.sum | 10 ++++ goctl_template/api/config.tpl | 1 + goctl_template/api/etc.tpl | 3 +- initalize/rabbitmq.go | 67 ++++++++++++++++++++++ server/websocket/etc/websocket.yaml | 3 +- server/websocket/internal/config/config.go | 6 +- 8 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 constants/rabbitmq.go create mode 100644 initalize/rabbitmq.go diff --git a/constants/rabbitmq.go b/constants/rabbitmq.go new file mode 100644 index 00000000..c7b8281e --- /dev/null +++ b/constants/rabbitmq.go @@ -0,0 +1,17 @@ +package constants + +type RABBIT_MQ string + +// 消息队列队列名 +const ( + //组装渲染数据队列 + RABBIT_MQ_ASSEMBLE_RENDER_DATA RABBIT_MQ = "RABBIT_MQ_ASSEMBLE_RENDER_DATA" + //渲染结果数据队列 + RABBIT_MQ_RENDER_RESULT_DATA RABBIT_MQ = "RABBIT_MQ_RENDER_RESULT_DATA" +) + +// 队列列表 +var MQ_QUEUE_ARR = []RABBIT_MQ{ + RABBIT_MQ_ASSEMBLE_RENDER_DATA, + RABBIT_MQ_RENDER_RESULT_DATA, +} diff --git a/go.mod b/go.mod index 6d45a2e4..39c5f943 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/schollz/progressbar v1.0.0 // indirect + github.com/streadway/amqp v1.1.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect diff --git a/go.sum b/go.sum index 959acd49..30103532 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -120,6 +121,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -183,8 +185,11 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -199,6 +204,9 @@ github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPn github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -231,6 +239,8 @@ github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDq github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= diff --git a/goctl_template/api/config.tpl b/goctl_template/api/config.tpl index dff7003f..0e814d42 100644 --- a/goctl_template/api/config.tpl +++ b/goctl_template/api/config.tpl @@ -6,6 +6,7 @@ type Config struct { rest.RestConf SourceMysql string Auth types.Auth + SourceRabbitMq string {{.auth}} {{.jwtTrans}} } diff --git a/goctl_template/api/etc.tpl b/goctl_template/api/etc.tpl index 3279e7ea..e656b973 100644 --- a/goctl_template/api/etc.tpl +++ b/goctl_template/api/etc.tpl @@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 +SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go new file mode 100644 index 00000000..96a4ad1d --- /dev/null +++ b/initalize/rabbitmq.go @@ -0,0 +1,67 @@ +package initalize + +import ( + "crypto/tls" + "fusenapi/constants" + "github.com/streadway/amqp" + "log" + "time" +) + +// handle +type queueItem struct { + Ch *amqp.Channel + Queue amqp.Queue +} + +var mapMq = make(map[string]*queueItem) + +func InitRabbitMq(url string, config *tls.Config) (map[string]*queueItem, error) { + conn, err := amqp.DialTLS(url, config) + if err != nil { + return nil, err + } + // 创建一个通道 + ch, err := conn.Channel() + if err != nil { + return nil, err + } + //声明队列 + for _, queueName := range constants.MQ_QUEUE_ARR { + q, err := ch.QueueDeclare( + string(queueName), // 队列名 + true, // 是否持久化 + false, // 是否自动删除 + false, // 是否排他 + false, // 是否等待服务器响应 + nil, // 其他参数 + ) + if err != nil { + conn.Close() + ch.Close() + log.Fatalf("Failed to declare a queue: %v", err) + } + mapMq[string(queueName)] = &queueItem{ + Ch: ch, + Queue: q, + } + } + go checkAlive(url, config, conn, ch) + return mapMq, nil +} + +func checkAlive(url string, config *tls.Config, conn *amqp.Connection, ch *amqp.Channel) { + var err error + for { + time.Sleep(time.Second * 1) + //断开重连 + if conn.IsClosed() { + mapMq, err = InitRabbitMq(url, config) + if err == nil { + return + } else { + continue + } + } + } +} diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index 7d2b68d7..71b4fead 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 +SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index 3ed0e8c4..ee0f22f7 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -2,12 +2,12 @@ package config import ( "fusenapi/server/websocket/internal/types" - "github.com/zeromicro/go-zero/rest" ) type Config struct { rest.RestConf - SourceMysql string - Auth types.Auth + SourceMysql string + Auth types.Auth + SourceRabbitMq string } From 9c3d9c06b3162bfbe066e80c6a4cb1188b6c35b2 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:19:48 +0800 Subject: [PATCH 02/12] fix --- goctl_template/api/context.tpl | 2 ++ initalize/rabbitmq.go | 33 ++++--------------- .../websocket/internal/svc/servicecontext.go | 2 ++ 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/goctl_template/api/context.tpl b/goctl_template/api/context.tpl index 3195fe63..70f70ff2 100644 --- a/goctl_template/api/context.tpl +++ b/goctl_template/api/context.tpl @@ -18,6 +18,7 @@ type ServiceContext struct { {{.middleware}} MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq map[string]*initalize.QueueItem } func NewServiceContext(c {{.config}}) *ServiceContext { @@ -26,6 +27,7 @@ func NewServiceContext(c {{.config}}) *ServiceContext { Config: c, MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), {{.middlewareAssignment}} } } diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go index 96a4ad1d..484da01f 100644 --- a/initalize/rabbitmq.go +++ b/initalize/rabbitmq.go @@ -5,27 +5,25 @@ import ( "fusenapi/constants" "github.com/streadway/amqp" "log" - "time" ) // handle -type queueItem struct { +type QueueItem struct { Ch *amqp.Channel Queue amqp.Queue } -var mapMq = make(map[string]*queueItem) - -func InitRabbitMq(url string, config *tls.Config) (map[string]*queueItem, error) { +func InitRabbitMq(url string, config *tls.Config) map[string]*QueueItem { conn, err := amqp.DialTLS(url, config) if err != nil { - return nil, err + log.Fatalf("Failed to connect to RabbitMQ: %v", err) } // 创建一个通道 ch, err := conn.Channel() if err != nil { - return nil, err + log.Fatalf("Failed to open a channel: %v", err) } + mapMq := make(map[string]*QueueItem) //声明队列 for _, queueName := range constants.MQ_QUEUE_ARR { q, err := ch.QueueDeclare( @@ -41,27 +39,10 @@ func InitRabbitMq(url string, config *tls.Config) (map[string]*queueItem, error) ch.Close() log.Fatalf("Failed to declare a queue: %v", err) } - mapMq[string(queueName)] = &queueItem{ + mapMq[string(queueName)] = &QueueItem{ Ch: ch, Queue: q, } } - go checkAlive(url, config, conn, ch) - return mapMq, nil -} - -func checkAlive(url string, config *tls.Config, conn *amqp.Connection, ch *amqp.Channel) { - var err error - for { - time.Sleep(time.Second * 1) - //断开重连 - if conn.IsClosed() { - mapMq, err = InitRabbitMq(url, config) - if err == nil { - return - } else { - continue - } - } - } + return mapMq } diff --git a/server/websocket/internal/svc/servicecontext.go b/server/websocket/internal/svc/servicecontext.go index 9a22471e..cecf6811 100644 --- a/server/websocket/internal/svc/servicecontext.go +++ b/server/websocket/internal/svc/servicecontext.go @@ -18,6 +18,7 @@ type ServiceContext struct { MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq map[string]*initalize.QueueItem } func NewServiceContext(c config.Config) *ServiceContext { @@ -26,6 +27,7 @@ func NewServiceContext(c config.Config) *ServiceContext { Config: c, MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), } } From a3e0f4b7d74c639e1b0438f88b1401e2fd797b3f Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:39:15 +0800 Subject: [PATCH 03/12] fix --- goctl_template/api/context.tpl | 4 +--- initalize/rabbitmq.go | 9 ++++++--- server/websocket/internal/logic/ws_render_image_logic.go | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/goctl_template/api/context.tpl b/goctl_template/api/context.tpl index 70f70ff2..ca4b66ff 100644 --- a/goctl_template/api/context.tpl +++ b/goctl_template/api/context.tpl @@ -18,16 +18,14 @@ type ServiceContext struct { {{.middleware}} MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen - RabbitMq map[string]*initalize.QueueItem } func NewServiceContext(c {{.config}}) *ServiceContext { - + initalize.InitRabbitMq(c.SourceRabbitMq, nil) return &ServiceContext{ Config: c, MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), - RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), {{.middlewareAssignment}} } } diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go index 484da01f..f3a70475 100644 --- a/initalize/rabbitmq.go +++ b/initalize/rabbitmq.go @@ -13,7 +13,12 @@ type QueueItem struct { Queue amqp.Queue } -func InitRabbitMq(url string, config *tls.Config) map[string]*QueueItem { +var mapMq = make(map[string]*QueueItem) + +func InitRabbitMq(url string, config *tls.Config) { + if url == "" { + return + } conn, err := amqp.DialTLS(url, config) if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %v", err) @@ -23,7 +28,6 @@ func InitRabbitMq(url string, config *tls.Config) map[string]*QueueItem { if err != nil { log.Fatalf("Failed to open a channel: %v", err) } - mapMq := make(map[string]*QueueItem) //声明队列 for _, queueName := range constants.MQ_QUEUE_ARR { q, err := ch.QueueDeclare( @@ -44,5 +48,4 @@ func InitRabbitMq(url string, config *tls.Config) map[string]*QueueItem { Queue: q, } } - return mapMq } diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index d58def77..7a5242cd 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,6 +59,7 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } + w.rabbitMq // TODO 数据发送给云渲染服务器 } } From b3f28c3cb816b563cf7382a25ceb229a66ad326c Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:39:57 +0800 Subject: [PATCH 04/12] fix --- .../websocket/internal/svc/servicecontext.go | 63 ------------------- 1 file changed, 63 deletions(-) delete mode 100644 server/websocket/internal/svc/servicecontext.go diff --git a/server/websocket/internal/svc/servicecontext.go b/server/websocket/internal/svc/servicecontext.go deleted file mode 100644 index cecf6811..00000000 --- a/server/websocket/internal/svc/servicecontext.go +++ /dev/null @@ -1,63 +0,0 @@ -package svc - -import ( - "errors" - "fmt" - "fusenapi/server/websocket/internal/config" - "net/http" - - "fusenapi/initalize" - "fusenapi/model/gmodel" - - "github.com/golang-jwt/jwt" - "gorm.io/gorm" -) - -type ServiceContext struct { - Config config.Config - - MysqlConn *gorm.DB - AllModels *gmodel.AllModelsGen - RabbitMq map[string]*initalize.QueueItem -} - -func NewServiceContext(c config.Config) *ServiceContext { - - return &ServiceContext{ - Config: c, - MysqlConn: initalize.InitMysql(c.SourceMysql), - AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), - RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), - } -} - -func (svcCtx *ServiceContext) ParseJwtToken(r *http.Request) (jwt.MapClaims, error) { - AuthKey := r.Header.Get("Authorization") - if AuthKey == "" { - return nil, nil - } - AuthKey = AuthKey[7:] - - if len(AuthKey) <= 50 { - return nil, errors.New(fmt.Sprint("Error parsing token, len:", len(AuthKey))) - } - - token, err := jwt.Parse(AuthKey, func(token *jwt.Token) (interface{}, error) { - // 检查签名方法是否为 HS256 - if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { - return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) - } - // 返回用于验证签名的密钥 - return []byte(svcCtx.Config.Auth.AccessSecret), nil - }) - if err != nil { - return nil, errors.New(fmt.Sprint("Error parsing token:", err)) - } - - // 验证成功返回 - if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid { - return claims, nil - } - - return nil, errors.New(fmt.Sprint("Invalid token", err)) -} From 8be93b9d5c3c6cf7c2f784bc43615774ad5ad55b Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:40:45 +0800 Subject: [PATCH 05/12] fix --- server/websocket/internal/logic/ws_render_image_logic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 7a5242cd..019838cb 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,7 +59,7 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } - w.rabbitMq + //w.rabbitMq // TODO 数据发送给云渲染服务器 } } From a33f39df5cbb40e0aa4e771c77499b5221a1d5da Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:44:29 +0800 Subject: [PATCH 06/12] fix --- server/websocket/internal/logic/ws_render_image_logic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 019838cb..9a5f7d11 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,7 +59,7 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } - //w.rabbitMq + // TODO 数据发送给云渲染服务器 } } From 5871da7b547ae9270a2e1f90c7c5ccaed1b8dec8 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:44:42 +0800 Subject: [PATCH 07/12] fix --- .../websocket/internal/svc/servicecontext.go | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 server/websocket/internal/svc/servicecontext.go diff --git a/server/websocket/internal/svc/servicecontext.go b/server/websocket/internal/svc/servicecontext.go new file mode 100644 index 00000000..83c5d7a3 --- /dev/null +++ b/server/websocket/internal/svc/servicecontext.go @@ -0,0 +1,61 @@ +package svc + +import ( + "errors" + "fmt" + "fusenapi/server/websocket/internal/config" + "net/http" + + "fusenapi/initalize" + "fusenapi/model/gmodel" + + "github.com/golang-jwt/jwt" + "gorm.io/gorm" +) + +type ServiceContext struct { + Config config.Config + + MysqlConn *gorm.DB + AllModels *gmodel.AllModelsGen +} + +func NewServiceContext(c config.Config) *ServiceContext { + initalize.InitRabbitMq(c.SourceRabbitMq, nil) + return &ServiceContext{ + Config: c, + MysqlConn: initalize.InitMysql(c.SourceMysql), + AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + } +} + +func (svcCtx *ServiceContext) ParseJwtToken(r *http.Request) (jwt.MapClaims, error) { + AuthKey := r.Header.Get("Authorization") + if AuthKey == "" { + return nil, nil + } + AuthKey = AuthKey[7:] + + if len(AuthKey) <= 50 { + return nil, errors.New(fmt.Sprint("Error parsing token, len:", len(AuthKey))) + } + + token, err := jwt.Parse(AuthKey, func(token *jwt.Token) (interface{}, error) { + // 检查签名方法是否为 HS256 + if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { + return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) + } + // 返回用于验证签名的密钥 + return []byte(svcCtx.Config.Auth.AccessSecret), nil + }) + if err != nil { + return nil, errors.New(fmt.Sprint("Error parsing token:", err)) + } + + // 验证成功返回 + if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid { + return claims, nil + } + + return nil, errors.New(fmt.Sprint("Invalid token", err)) +} From fef9f8d3908c142e7df72572beff51f5b1748a27 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 17:15:37 +0800 Subject: [PATCH 08/12] fix --- goctl_template/api/context.tpl | 2 + initalize/rabbitmq.go | 93 ++++++++++++++++--- .../internal/logic/datatransferlogic.go | 19 ++-- .../internal/logic/ws_render_image_logic.go | 6 +- .../websocket/internal/svc/servicecontext.go | 2 + 5 files changed, 101 insertions(+), 21 deletions(-) diff --git a/goctl_template/api/context.tpl b/goctl_template/api/context.tpl index ca4b66ff..e5884b34 100644 --- a/goctl_template/api/context.tpl +++ b/goctl_template/api/context.tpl @@ -18,6 +18,7 @@ type ServiceContext struct { {{.middleware}} MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq *initalize.RabbitMqHandle } func NewServiceContext(c {{.config}}) *ServiceContext { @@ -26,6 +27,7 @@ func NewServiceContext(c {{.config}}) *ServiceContext { Config: c, MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq:initalize.InitRabbitMq(c.SourceRabbitMq, nil), {{.middlewareAssignment}} } } diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go index f3a70475..5bfffa9e 100644 --- a/initalize/rabbitmq.go +++ b/initalize/rabbitmq.go @@ -2,22 +2,28 @@ package initalize import ( "crypto/tls" + "errors" "fusenapi/constants" "github.com/streadway/amqp" + "github.com/zeromicro/go-zero/core/logx" "log" ) -// handle -type QueueItem struct { - Ch *amqp.Channel - Queue amqp.Queue +type RabbitMqHandle struct { } -var mapMq = make(map[string]*QueueItem) +// 连接属性 +type queueItem struct { + ch *amqp.Channel + queue amqp.Queue +} -func InitRabbitMq(url string, config *tls.Config) { +// 存储连接 +var mapMq = make(map[constants.RABBIT_MQ]queueItem) + +func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle { if url == "" { - return + return nil } conn, err := amqp.DialTLS(url, config) if err != nil { @@ -39,13 +45,76 @@ func InitRabbitMq(url string, config *tls.Config) { nil, // 其他参数 ) if err != nil { - conn.Close() - ch.Close() log.Fatalf("Failed to declare a queue: %v", err) } - mapMq[string(queueName)] = &QueueItem{ - Ch: ch, - Queue: q, + mapMq[queueName] = queueItem{ + ch: ch, + queue: q, } } + return &RabbitMqHandle{} +} + +// 发送消息 +func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error { + object, ok := mapMq[queueName] + if !ok { + return errors.New("unknown queue") + } + // 发送消息到队列 + return object.ch.Publish( + "", // exchange,如果为空,则使用默认交换机 + object.queue.Name, // routing key,将消息发送到指定队列 + false, // 是否等待服务器响应 + false, // 是否立即发送 + amqp.Publishing{ + ContentType: "text/plain", //普通文本 + Body: message, + }, // 消息内容 + ) +} + +// 消费消息 +func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error { + object, ok := mapMq[queueName] + if !ok { + return errors.New("unknown queue") + } + msgs, err := object.ch.Consume( + object.queue.Name, // 队列名 + "", // 消费者名,如果为空,则是随机生成一个 + false, // 自动应答 + false, // 是否排他 + false, // 是否阻塞 + false, // 是否等待服务器响应 + nil, // 其他参数 + ) + if err != nil { + log.Fatalf("Failed to register a consumer: %v", err) + } + //允许20的并发 + limit := make(chan struct{}, 20) + defer close(limit) + // 消费消息 + for msg := range msgs { + limit <- struct{}{} + go func(m amqp.Delivery) { + if err := recover(); err != nil { + logx.Error(err) + } + defer func() { + <-limit + }() + if err = handleFunc(m.Body); err != nil { + logx.Error("failed to deal with MQ message:", string(m.Body)) + return + } + if err = object.ch.Ack(m.DeliveryTag, false); err != nil { + logx.Error("failed to ack mq to delete") + } else { + log.Printf("Consume Mq message success: %s", m.Body) + } + }(msg) + } + return nil } diff --git a/server/websocket/internal/logic/datatransferlogic.go b/server/websocket/internal/logic/datatransferlogic.go index 44ba0abb..25bf50e7 100644 --- a/server/websocket/internal/logic/datatransferlogic.go +++ b/server/websocket/internal/logic/datatransferlogic.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "fusenapi/constants" + "fusenapi/initalize" "fusenapi/server/websocket/internal/types" "fusenapi/utils/auth" "fusenapi/utils/id_generator" @@ -64,13 +65,14 @@ var ( // 每个连接的连接基本属性 type wsConnectItem struct { conn *websocket.Conn //websocket的连接 - closeChan chan struct{} //ws连接关闭chan - isClose bool //是否已经关闭 - uniqueId uint64 //ws连接唯一标识 - inChan chan []byte //接受消息缓冲通道 - outChan chan []byte //发送回客户端的消息 - mutex sync.Mutex //互斥锁 - renderProperty renderProperty //扩展云渲染属性 + rabbitMq *initalize.RabbitMqHandle + closeChan chan struct{} //ws连接关闭chan + isClose bool //是否已经关闭 + uniqueId uint64 //ws连接唯一标识 + inChan chan []byte //接受消息缓冲通道 + outChan chan []byte //发送回客户端的消息 + mutex sync.Mutex //互斥锁 + renderProperty renderProperty //扩展云渲染属性 } func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.ResponseWriter, r *http.Request) { @@ -100,6 +102,7 @@ func (l *DataTransferLogic) DataTransfer(svcCtx *svc.ServiceContext, w http.Resp uniqueId := websocketIdGenerator.Get() ws := wsConnectItem{ conn: conn, + rabbitMq: l.svcCtx.RabbitMq, uniqueId: uniqueId, closeChan: make(chan struct{}, 1), inChan: make(chan []byte, 1000), @@ -272,7 +275,7 @@ func (w *wsConnectItem) dealwithReciveData(data []byte) { switch parseInfo.T { //图片渲染 case constants.WEBSOCKET_RENDER_IMAGE: - go w.SendToCloudRender(d) + w.SendToCloudRender(d) default: } diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 9a5f7d11..901a01d8 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,8 +59,12 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } - // TODO 数据发送给云渲染服务器 + if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { + logx.Error(err) + continue + } + logx.Info("发送渲染数据到rabbitmq成功") } } } diff --git a/server/websocket/internal/svc/servicecontext.go b/server/websocket/internal/svc/servicecontext.go index 83c5d7a3..3ca743ec 100644 --- a/server/websocket/internal/svc/servicecontext.go +++ b/server/websocket/internal/svc/servicecontext.go @@ -18,6 +18,7 @@ type ServiceContext struct { MysqlConn *gorm.DB AllModels *gmodel.AllModelsGen + RabbitMq *initalize.RabbitMqHandle } func NewServiceContext(c config.Config) *ServiceContext { @@ -26,6 +27,7 @@ func NewServiceContext(c config.Config) *ServiceContext { Config: c, MysqlConn: initalize.InitMysql(c.SourceMysql), AllModels: gmodel.NewAllModels(initalize.InitMysql(c.SourceMysql)), + RabbitMq: initalize.InitRabbitMq(c.SourceRabbitMq, nil), } } From 3a9c1fb8d9ec57d7384c6e9d308e96bffa78da00 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 17:36:49 +0800 Subject: [PATCH 09/12] fix --- goctl_template/api/etc.tpl | 2 +- server/websocket/etc/websocket.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/goctl_template/api/etc.tpl b/goctl_template/api/etc.tpl index e656b973..a39c0af4 100644 --- a/goctl_template/api/etc.tpl +++ b/goctl_template/api/etc.tpl @@ -6,4 +6,4 @@ Auth: AccessSecret: fusen2023 AccessExpire: 2592000 RefreshAfter: 1592000 -SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file +SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672 \ No newline at end of file diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index 71b4fead..5f58765d 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -6,4 +6,4 @@ Auth: AccessSecret: fusen2023 AccessExpire: 2592000 RefreshAfter: 1592000 -SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file +SourceRabbitMq: amqp://rabbit001:rabbit001129@110.41.19.98:5672 \ No newline at end of file From 1fd7721e996ff87cb3bf7d8b2d333b2cf6f9b33d Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 17:38:21 +0800 Subject: [PATCH 10/12] fix --- server/websocket/internal/logic/ws_render_image_logic.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index 901a01d8..a14fc1d0 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -61,10 +61,10 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { } // TODO 数据发送给云渲染服务器 if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { - logx.Error(err) + logx.Error("发送渲染任务数据到MQ失败:", string(data)) continue } - logx.Info("发送渲染数据到rabbitmq成功") + logx.Info("发送渲染数据到rabbitmq成功:", string(data)) } } } From 297c65776b4c68540b3533246d41103eb9d88245 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 17:39:28 +0800 Subject: [PATCH 11/12] fix --- server/websocket/internal/logic/ws_render_image_logic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/websocket/internal/logic/ws_render_image_logic.go b/server/websocket/internal/logic/ws_render_image_logic.go index a14fc1d0..c91d09c0 100644 --- a/server/websocket/internal/logic/ws_render_image_logic.go +++ b/server/websocket/internal/logic/ws_render_image_logic.go @@ -59,7 +59,7 @@ func (w *wsConnectItem) SendToCloudRender(data []byte) { Option: 1, //0删除 1添加 Key: key, } - // TODO 数据发送给云渲染服务器 + //发送给对应的流水线组装数据 if err := w.rabbitMq.SendMsg(constants.RABBIT_MQ_ASSEMBLE_RENDER_DATA, data); err != nil { logx.Error("发送渲染任务数据到MQ失败:", string(data)) continue From b8c77f1db1ab2e5628bae369cff00bc9e7cd3377 Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 17:43:19 +0800 Subject: [PATCH 12/12] fix --- initalize/rabbitmq.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go index 5bfffa9e..3713a7b8 100644 --- a/initalize/rabbitmq.go +++ b/initalize/rabbitmq.go @@ -82,7 +82,7 @@ func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func( } msgs, err := object.ch.Consume( object.queue.Name, // 队列名 - "", // 消费者名,如果为空,则是随机生成一个 + object.queue.Name, // 消费者名,如果为空,则是随机生成一个 false, // 自动应答 false, // 是否排他 false, // 是否阻塞