121 lines
2.9 KiB
Go
121 lines
2.9 KiB
Go
package initalize
|
||
|
||
import (
|
||
"crypto/tls"
|
||
"errors"
|
||
"fusenapi/constants"
|
||
"github.com/streadway/amqp"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
"log"
|
||
)
|
||
|
||
type RabbitMqHandle struct {
|
||
}
|
||
|
||
// 连接属性
|
||
type queueItem struct {
|
||
ch *amqp.Channel
|
||
queue amqp.Queue
|
||
}
|
||
|
||
// 存储连接
|
||
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
|
||
|
||
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
|
||
if url == "" {
|
||
return nil
|
||
}
|
||
conn, err := amqp.DialTLS(url, config)
|
||
if err != nil {
|
||
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
||
}
|
||
// 创建一个通道
|
||
ch, err := conn.Channel()
|
||
if err != nil {
|
||
log.Fatalf("Failed to open a channel: %v", err)
|
||
}
|
||
//声明队列
|
||
for _, queueName := range constants.MQ_QUEUE_ARR {
|
||
q, err := ch.QueueDeclare(
|
||
string(queueName), // 队列名
|
||
true, // 是否持久化
|
||
false, // 是否自动删除
|
||
false, // 是否排他
|
||
false, // 是否等待服务器响应
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
log.Fatalf("Failed to declare a queue: %v", err)
|
||
}
|
||
mapMq[queueName] = queueItem{
|
||
ch: ch,
|
||
queue: q,
|
||
}
|
||
}
|
||
return &RabbitMqHandle{}
|
||
}
|
||
|
||
// 发送消息
|
||
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
|
||
object, ok := mapMq[queueName]
|
||
if !ok {
|
||
return errors.New("unknown queue")
|
||
}
|
||
// 发送消息到队列
|
||
return object.ch.Publish(
|
||
"", // exchange,如果为空,则使用默认交换机
|
||
object.queue.Name, // routing key,将消息发送到指定队列
|
||
false, // 是否等待服务器响应
|
||
false, // 是否立即发送
|
||
amqp.Publishing{
|
||
ContentType: "text/plain", //普通文本
|
||
Body: message,
|
||
}, // 消息内容
|
||
)
|
||
}
|
||
|
||
// 消费消息
|
||
func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error {
|
||
object, ok := mapMq[queueName]
|
||
if !ok {
|
||
return errors.New("unknown queue")
|
||
}
|
||
msgs, err := object.ch.Consume(
|
||
object.queue.Name, // 队列名
|
||
object.queue.Name, // 消费者名,如果为空,则是随机生成一个
|
||
false, // 自动应答
|
||
false, // 是否排他
|
||
false, // 是否阻塞
|
||
false, // 是否等待服务器响应
|
||
nil, // 其他参数
|
||
)
|
||
if err != nil {
|
||
log.Fatalf("Failed to register a consumer: %v", err)
|
||
}
|
||
//允许20的并发
|
||
limit := make(chan struct{}, 20)
|
||
defer close(limit)
|
||
// 消费消息
|
||
for msg := range msgs {
|
||
limit <- struct{}{}
|
||
go func(m amqp.Delivery) {
|
||
if err := recover(); err != nil {
|
||
logx.Error(err)
|
||
}
|
||
defer func() {
|
||
<-limit
|
||
}()
|
||
if err = handleFunc(m.Body); err != nil {
|
||
logx.Error("failed to deal with MQ message:", string(m.Body))
|
||
return
|
||
}
|
||
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
|
||
logx.Error("failed to ack mq to delete")
|
||
} else {
|
||
log.Printf("Consume Mq message success: %s", m.Body)
|
||
}
|
||
}(msg)
|
||
}
|
||
return nil
|
||
}
|