package initalize

import (
	"context"
	"crypto/tls"
	"errors"
	"fusenapi/constants"
	"fusenapi/utils/mq_consumer_factory"
	"github.com/streadway/amqp"
	"github.com/zeromicro/go-zero/core/logx"
	"log"
	"sync"
)

type RabbitMqHandle struct {
}

// 连接属性
type queueItem struct {
	ch    *amqp.Channel
	queue amqp.Queue
}

// 存储连接
var mapMq = make(map[constants.RABBIT_MQ]queueItem)

func InitRabbitMq(url string, config *tls.Config) *RabbitMqHandle {
	if url == "" {
		return nil
	}
	conn, err := amqp.DialTLS(url, config)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	// 创建一个通道
	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	//声明队列
	for _, queueName := range constants.MqQueueArr {
		q, err := ch.QueueDeclare(
			string(queueName), // 队列名
			true,              // 是否持久化
			false,             // 是否自动删除
			false,             // 是否排他
			false,             // 是否等待服务器响应
			nil,               // 其他参数
		)
		if err != nil {
			log.Fatalf("Failed to declare a queue: %v", err)
		}
		mapMq[queueName] = queueItem{
			ch:    ch,
			queue: q,
		}
	}
	return &RabbitMqHandle{}
}

// 发送消息
func (h *RabbitMqHandle) SendMsg(queueName constants.RABBIT_MQ, message []byte) error {
	object, ok := mapMq[queueName]
	if !ok {
		return errors.New("unknown queue")
	}
	// 发送消息到队列
	return object.ch.Publish(
		"",                // exchange,如果为空,则使用默认交换机
		object.queue.Name, // routing key,将消息发送到指定队列
		false,             // 是否等待服务器响应
		false,             // 是否立即发送
		amqp.Publishing{
			ContentType: "text/plain", //普通文本
			Body:        message,
		}, // 消息内容
	)
}

// 消费消息
func (h *RabbitMqHandle) Consume(ctx context.Context, queueName constants.RABBIT_MQ, handle mq_consumer_factory.MqHandle) {
	object, ok := mapMq[queueName]
	if !ok {
		panic("unknown queue")
	}
	go func() {
		select {
		case <-ctx.Done():
			panic("err ctx deadline")
		}
	}()
	msgs, err := object.ch.Consume(
		object.queue.Name, // 队列名
		object.queue.Name, // 消费者名,如果为空,则是随机生成一个
		false,             // 自动应答
		false,             // 是否排他
		false,             // 是否阻塞
		false,             // 是否等待服务器响应
		nil,               // 其他参数
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}
	//允许20的并发
	limit := make(chan struct{}, 20)
	wait := sync.WaitGroup{}
	defer close(limit)
	// 消费消息
	for msg := range msgs {
		limit <- struct{}{}
		wait.Add(1)
		go func(m amqp.Delivery) {
			if err := recover(); err != nil {
				logx.Error(err)
			}
			defer func() {
				<-limit
				wait.Done()
			}()
			if err = handle.Run(ctx, m.Body); err != nil {
				logx.Error("failed to deal with MQ message:", string(m.Body))
				return
			}
			if err = object.ch.Ack(m.DeliveryTag, false); err != nil {
				logx.Error("failed to ack mq to delete")
			} else {
				log.Printf("Consume Mq message success: %s", m.Body)
			}
		}(msg)
	}
	wait.Wait()
}