From f316a340c89c3bf11777c9d4a08ab5aea627803f Mon Sep 17 00:00:00 2001 From: laodaming <11058467+laudamine@user.noreply.gitee.com> Date: Fri, 28 Jul 2023 15:10:06 +0800 Subject: [PATCH] fix --- constants/rabbitmq.go | 17 ++++++ go.mod | 1 + go.sum | 10 ++++ goctl_template/api/config.tpl | 1 + goctl_template/api/etc.tpl | 3 +- initalize/rabbitmq.go | 67 ++++++++++++++++++++++ server/websocket/etc/websocket.yaml | 3 +- server/websocket/internal/config/config.go | 6 +- 8 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 constants/rabbitmq.go create mode 100644 initalize/rabbitmq.go diff --git a/constants/rabbitmq.go b/constants/rabbitmq.go new file mode 100644 index 00000000..c7b8281e --- /dev/null +++ b/constants/rabbitmq.go @@ -0,0 +1,17 @@ +package constants + +type RABBIT_MQ string + +// 消息队列队列名 +const ( + //组装渲染数据队列 + RABBIT_MQ_ASSEMBLE_RENDER_DATA RABBIT_MQ = "RABBIT_MQ_ASSEMBLE_RENDER_DATA" + //渲染结果数据队列 + RABBIT_MQ_RENDER_RESULT_DATA RABBIT_MQ = "RABBIT_MQ_RENDER_RESULT_DATA" +) + +// 队列列表 +var MQ_QUEUE_ARR = []RABBIT_MQ{ + RABBIT_MQ_ASSEMBLE_RENDER_DATA, + RABBIT_MQ_RENDER_RESULT_DATA, +} diff --git a/go.mod b/go.mod index 6d45a2e4..39c5f943 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/andybalholm/brotli v1.0.5 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/schollz/progressbar v1.0.0 // indirect + github.com/streadway/amqp v1.1.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect diff --git a/go.sum b/go.sum index 959acd49..30103532 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,7 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -120,6 +121,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -183,8 +185,11 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -199,6 +204,9 @@ github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPn github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -231,6 +239,8 @@ github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e/go.mod h1:XV66xRDq github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= diff --git a/goctl_template/api/config.tpl b/goctl_template/api/config.tpl index dff7003f..0e814d42 100644 --- a/goctl_template/api/config.tpl +++ b/goctl_template/api/config.tpl @@ -6,6 +6,7 @@ type Config struct { rest.RestConf SourceMysql string Auth types.Auth + SourceRabbitMq string {{.auth}} {{.jwtTrans}} } diff --git a/goctl_template/api/etc.tpl b/goctl_template/api/etc.tpl index 3279e7ea..e656b973 100644 --- a/goctl_template/api/etc.tpl +++ b/goctl_template/api/etc.tpl @@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 +SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file diff --git a/initalize/rabbitmq.go b/initalize/rabbitmq.go new file mode 100644 index 00000000..96a4ad1d --- /dev/null +++ b/initalize/rabbitmq.go @@ -0,0 +1,67 @@ +package initalize + +import ( + "crypto/tls" + "fusenapi/constants" + "github.com/streadway/amqp" + "log" + "time" +) + +// handle +type queueItem struct { + Ch *amqp.Channel + Queue amqp.Queue +} + +var mapMq = make(map[string]*queueItem) + +func InitRabbitMq(url string, config *tls.Config) (map[string]*queueItem, error) { + conn, err := amqp.DialTLS(url, config) + if err != nil { + return nil, err + } + // 创建一个通道 + ch, err := conn.Channel() + if err != nil { + return nil, err + } + //声明队列 + for _, queueName := range constants.MQ_QUEUE_ARR { + q, err := ch.QueueDeclare( + string(queueName), // 队列名 + true, // 是否持久化 + false, // 是否自动删除 + false, // 是否排他 + false, // 是否等待服务器响应 + nil, // 其他参数 + ) + if err != nil { + conn.Close() + ch.Close() + log.Fatalf("Failed to declare a queue: %v", err) + } + mapMq[string(queueName)] = &queueItem{ + Ch: ch, + Queue: q, + } + } + go checkAlive(url, config, conn, ch) + return mapMq, nil +} + +func checkAlive(url string, config *tls.Config, conn *amqp.Connection, ch *amqp.Channel) { + var err error + for { + time.Sleep(time.Second * 1) + //断开重连 + if conn.IsClosed() { + mapMq, err = InitRabbitMq(url, config) + if err == nil { + return + } else { + continue + } + } + } +} diff --git a/server/websocket/etc/websocket.yaml b/server/websocket/etc/websocket.yaml index 7d2b68d7..71b4fead 100644 --- a/server/websocket/etc/websocket.yaml +++ b/server/websocket/etc/websocket.yaml @@ -5,4 +5,5 @@ SourceMysql: fusentest:XErSYmLELKMnf3Dh@tcp(110.41.19.98:3306)/fusentest Auth: AccessSecret: fusen2023 AccessExpire: 2592000 - RefreshAfter: 1592000 \ No newline at end of file + RefreshAfter: 1592000 +SourceRabbitMq: amqp://guest:guest@localhost:5672 \ No newline at end of file diff --git a/server/websocket/internal/config/config.go b/server/websocket/internal/config/config.go index 3ed0e8c4..ee0f22f7 100644 --- a/server/websocket/internal/config/config.go +++ b/server/websocket/internal/config/config.go @@ -2,12 +2,12 @@ package config import ( "fusenapi/server/websocket/internal/types" - "github.com/zeromicro/go-zero/rest" ) type Config struct { rest.RestConf - SourceMysql string - Auth types.Auth + SourceMysql string + Auth types.Auth + SourceRabbitMq string }