fusenapi/initalize/rabbitmq.go
laodaming df6e4a0899 fix
2023-08-29 15:30:54 +08:00

135 lines
3.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package initalize
import (
"context"
"crypto/tls"
"errors"
"fusenapi/constants"
"fusenapi/utils/mq_consumer_factory"
"github.com/streadway/amqp"
"github.com/zeromicro/go-zero/core/logx"
"log"
"strings"
"sync"
)
type RabbitMqHandle struct {
}
// 连接属性
type queueItem struct {
ch *amqp.Channel
queue amqp.Queue
}
// 存储连接
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
url = strings.Trim(url, " ")
if url == "" {
return nil
}
conn, err := amqp.DialTLS(url, config)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
// 创建一个通道
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
//声明队列
for _, queueName := range constants.MqQueueArr {
q, err := ch.QueueDeclare(
string(queueName), // 队列名
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
mapMq[queueName] = queueItem{
ch: ch,
queue: q,
}
}
return &RabbitMqHandle{}
}
// 发送消息
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
object, ok := mapMq[queueName]
if !ok {
return errors.New("unknown queue")
}
// 发送消息到队列
return object.ch.Publish(
"", // exchange如果为空则使用默认交换机
object.queue.Name, // routing key将消息发送到指定队列
false, // 是否等待服务器响应
false, // 是否立即发送
amqp.Publishing{
ContentType: "text/plain", //普通文本
Body: message,
}, // 消息内容
)
}
// 消费消息
func (h *RabbitMqHandle) Consume(ctx context.Context, queueName constants.RABBIT_MQ, handle mq_consumer_factory.MqHandle) {
object, ok := mapMq[queueName]
if !ok {
panic("unknown queue")
}
go func() {
select {
case <-ctx.Done():
panic("err ctx deadline")
}
}()
msgs, err := object.ch.Consume(
object.queue.Name, // 队列名
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
false, // 自动应答
false, // 是否排他
false, // 是否阻塞
false, // 是否等待服务器响应
nil, // 其他参数
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
//允许20的并发
limit := make(chan struct{}, 20)
wait := sync.WaitGroup{}
defer close(limit)
// 消费消息
for msg := range msgs {
limit <- struct{}{}
wait.Add(1)
go func(m amqp.Delivery) {
if err := recover(); err != nil {
logx.Error(err)
}
defer func() {
<-limit
wait.Done()
}()
if err = handle.Run(ctx, m.Body); err != nil {
logx.Error("failed to deal with MQ message:", string(m.Body))
return
}
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
logx.Error("failed to ack mq to delete")
} else {
log.Printf("Consume Mq message success: %s", m.Body)
}
}(msg)
}
wait.Wait()
}