fix
This commit is contained in:
@@ -2,22 +2,28 @@ package initalize
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fusenapi/constants"
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"log"
|
||||
)
|
||||
|
||||
// handle
|
||||
type QueueItem struct {
|
||||
Ch *amqp.Channel
|
||||
Queue amqp.Queue
|
||||
type RabbitMqHandle struct {
|
||||
}
|
||||
|
||||
var mapMq = make(map[string]*QueueItem)
|
||||
// 连接属性
|
||||
type queueItem struct {
|
||||
ch *amqp.Channel
|
||||
queue amqp.Queue
|
||||
}
|
||||
|
||||
func InitRabbitMq(url string, config *tls.Config) {
|
||||
// 存储连接
|
||||
var mapMq = make(map[constants.RABBIT_MQ]queueItem)
|
||||
|
||||
func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
|
||||
if url == "" {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
conn, err := amqp.DialTLS(url, config)
|
||||
if err != nil {
|
||||
@@ -39,13 +45,76 @@ func InitRabbitMq(url string, config *tls.Config) {
|
||||
nil, // 其他参数
|
||||
)
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
ch.Close()
|
||||
log.Fatalf("Failed to declare a queue: %v", err)
|
||||
}
|
||||
mapMq[string(queueName)] = &QueueItem{
|
||||
Ch: ch,
|
||||
Queue: q,
|
||||
mapMq[queueName] = queueItem{
|
||||
ch: ch,
|
||||
queue: q,
|
||||
}
|
||||
}
|
||||
return &RabbitMqHandle{}
|
||||
}
|
||||
|
||||
// 发送消息
|
||||
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
|
||||
object, ok := mapMq[queueName]
|
||||
if !ok {
|
||||
return errors.New("unknown queue")
|
||||
}
|
||||
// 发送消息到队列
|
||||
return object.ch.Publish(
|
||||
"", // exchange,如果为空,则使用默认交换机
|
||||
object.queue.Name, // routing key,将消息发送到指定队列
|
||||
false, // 是否等待服务器响应
|
||||
false, // 是否立即发送
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain", //普通文本
|
||||
Body: message,
|
||||
}, // 消息内容
|
||||
)
|
||||
}
|
||||
|
||||
// 消费消息
|
||||
func (h *RabbitMqHandle) Consume(queueName constants.RABBIT_MQ, handleFunc func(data []byte) error) error {
|
||||
object, ok := mapMq[queueName]
|
||||
if !ok {
|
||||
return errors.New("unknown queue")
|
||||
}
|
||||
msgs, err := object.ch.Consume(
|
||||
object.queue.Name, // 队列名
|
||||
"", // 消费者名,如果为空,则是随机生成一个
|
||||
false, // 自动应答
|
||||
false, // 是否排他
|
||||
false, // 是否阻塞
|
||||
false, // 是否等待服务器响应
|
||||
nil, // 其他参数
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to register a consumer: %v", err)
|
||||
}
|
||||
//允许20的并发
|
||||
limit := make(chan struct{}, 20)
|
||||
defer close(limit)
|
||||
// 消费消息
|
||||
for msg := range msgs {
|
||||
limit <- struct{}{}
|
||||
go func(m amqp.Delivery) {
|
||||
if err := recover(); err != nil {
|
||||
logx.Error(err)
|
||||
}
|
||||
defer func() {
|
||||
<-limit
|
||||
}()
|
||||
if err = handleFunc(m.Body); err != nil {
|
||||
logx.Error("failed to deal with MQ message:", string(m.Body))
|
||||
return
|
||||
}
|
||||
if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
|
||||
logx.Error("failed to ack mq to delete")
|
||||
} else {
|
||||
log.Printf("Consume Mq message success: %s", m.Body)
|
||||
}
|
||||
}(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user