import json
import threading
import time
import uuid

import pika
from pika.exceptions import ConnectionClosed, ChannelClosed

from huansi_utils.rabbitMQ import ExchangeType
from huansi_utils.rabbitMQ.util._logger import logger


# class rabbitMQ(object):
#     '''
#     rabbitMQ操作类
#     '''
#
#     def __init__(self, host, port):
#         '''
#         连接MQ
#         :param host:
#         :param port:
#         :return:
#         '''
#         user_name = 'hs'
#         password = 'hs'
#         user_pwd = pika.PlainCredentials(user_name, password)
#         parameters = pika.ConnectionParameters(host=host, port=port, credentials=user_pwd)
#         self.connection = pika.BlockingConnection(parameters)
#
#     def send(self, queue, body):
#         # 创建通道
#         channel = self.connection.channel()
#         # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
#         channel.queue_declare(queue=queue, durable=True)
#         # routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
#         channel.basic_publish(exchange='', routing_key=queue, body=body,
#                               properties=pika.BasicProperties(delivery_mode=2, ))
#         # channel.queue_delete(queue=queue_Name)
#         self.connection.close()
#
#     def receive(self, queue):
#         # 创建通道
#         channel = self.connection.channel()
#         # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
#         channel.queue_declare(queue=queue, durable=True)
#
#         def callback(ch, method, properties, body):
#             print(body, 1)
#             # 向生产者发送应答
#             ch.basic_ack(delivery_tag=method.delivery_tag)
#
#             return body
#
#         # 将从队列里取出的数据回调给callback方法
#         channel.basic_consume(callback, queue=queue, no_ack=False)
#         # 开始取值
#         channel.start_consuming()


# mq = rabbitMQ(current_app.config["MQ_HOST"], current_app.config["MQ_PORT"])


class RabbitMQ(object):
    '''
    RabbitMQ操作类
    '''

    def __init__(self, host, port=80, username=None, passward=None, queue=None, heartbeat=120):
        self.rabbitmq_server_host = host
        self.rabbitmq_server_port = port
        self.rabbitmq_server_username = username
        self.rabbitmq_server_password = passward
        self.queue = queue
        self.heartbeat = heartbeat
        self._connection = None
        self._channel = None
        self._rpc_class_list = []
        self.data = {}
        # initialize some operation
        self.init()

    def init(self):
        self.valid_config()
        self.connect_rabbitmq_server()

    def valid_config(self):
        if not self.rabbitmq_server_host:
            raise Exception("The rabbitMQ application must configure host.")

    # connect RabbitMQ server
    def connect_rabbitmq_server(self):
        if not (self.rabbitmq_server_username and self.rabbitmq_server_password):
            # connect RabbitMQ server with no authentication
            self._connection = pika.BlockingConnection()
        elif (self.rabbitmq_server_username and self.rabbitmq_server_password):
            # connect RabbitMQ server with authentication
            credentials = pika.PlainCredentials(
                self.rabbitmq_server_username,
                self.rabbitmq_server_password
            )
            self._connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.rabbitmq_server_host,
                    port=self.rabbitmq_server_port,
                    heartbeat=self.heartbeat,
                    credentials=credentials
                ))
        else:
            raise Exception()
        logger.info("connect RabbitMQ server sucessful")
        # create channel object
        self._channel = self._connection.channel()

    def start_rabbitmq_server(self):
        '''
        重新开启MQ服务
        :return:
        '''
        if self._connection.is_closed:
            self.connect_rabbitmq_server()
        else:
            raise Exception('MQ服务已经处于开启状态')

    def close_rabbitmq_server(self):
        '''
        关闭MQ服务
        :return:
        '''
        if self._connection.is_open:
            self._connection.close()
        else:
            raise Exception('MQ服务已经处于关闭状态')

    def queue_declare(self, queue_name='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, arguments=None):
        result = self._channel.queue_declare(
            queue=queue_name,
            passive=passive,
            durable=durable,
            exclusive=exclusive,
            auto_delete=auto_delete,
            arguments=arguments
        )
        return result.method.queue

    def temporary_queue_declare(self):
        """
        declare a temporary queue that named random string
        and will automatically deleted when we disconnect the consumer
        :return: the name of temporary queue like amq.gen-4NI42Nw3gJaXuWwMxW4_Vg
        """
        return self.queue_declare(exclusive=True,
                                  auto_delete=True)

    def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue):
        """
        Declare exchange and bind queue to exchange
        :param type: The type of exchange
        :param exchange_name: The name of exchange
        :param routing_key: The key of exchange bind to queue
        :param queue: queue name
        """
        self._channel.exchange_declare(exchange=exchange_name,
                                       exchange_type=type)
        self._channel.queue_bind(queue=queue,
                                 exchange=exchange_name,
                                 routing_key=routing_key)

    def basic_consuming(self, queue_name, callback):
        self._channel.basic_consume(
            consumer_callback=callback,
            queue=queue_name
        )

    def consuming_handle(self):
        try:
            self.consuming()
        except ConnectionClosed:
            # 某些客户端网络会有波动,这里改为每五分钟重连一次,直到成功为止
            while self._connection.is_closed:
                try:
                    # 重连
                    self.connect_rabbitmq_server()
                    # 重新注册队列
                    self.register_queue()
                    try:
                        # 重新消费
                        self.consuming()
                    except ConnectionClosed:
                        pass
                except:
                    # 五分钟
                    time.sleep(300)

    def consuming(self):
        # 执行callback的时候,可能需要flask的上下文,这里压入一个上下文
        if self.app:
            with self.app.app_context():
                self._channel.start_consuming()
        else:
            self._channel.start_consuming()

    def sendEx(self, body, exchange, key, expiration=None, corr_id=None):
        '''
        加强版的send,失败了重连频道,奥利给!
        :param body:
        :param exchange:
        :param key:
        :param corr_id:
        :return:
        '''
        try:
            logger.info(f'MQ消息{exchange}----{key}-----{expiration}-----{corr_id}----{body}')
            self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)
        except Exception as e:
            # 很皮,发消息的时候老是频道被回收
            if self._connection.is_closed:
                # 重新连接,频道被回收,连接会断
                self.connect_rabbitmq_server()
                logger.error('连接重启中。。。。。')
                self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)
            elif self._channel.is_closed:
                self._channel = self._connection.channel()
                logger.error('频道重启中。。。。。')
                self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)
            else:
                import traceback
                logger.error('MQ发送失败---失败原因:{}'.format(traceback.format_exc()))
                # 重新连接,频道被回收,连接会断
                self.connect_rabbitmq_server()
                logger.error('连接重启中。。。。。')
                self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)

    def send(self, body, exchange, key, expiration=None, corr_id=None):
        if expiration is not None:
            expiration = int(expiration) * 1000  # 过期时间单位是毫秒,换算一下

            # 过期时间小于0就不设置
            if int(expiration) <= 0:
                expiration = None

        if not corr_id:
            self._channel.basic_publish(
                exchange=exchange,
                routing_key=key,
                body=body,
                properties=pika.BasicProperties(
                    expiration=str(expiration) if expiration else None  # 必须是字符串,不然给我报错,霸王条款
                )
            )
        else:
            self._channel.basic_publish(
                exchange=exchange,
                routing_key=key,
                body=body,
                properties=pika.BasicProperties(
                    correlation_id=corr_id
                    , expiration=expiration
                )
            )

    def send_json(self, body, exchange, key, corr_id=None):
        data = json.dumps(body)
        self.send(data, exchange=exchange, key=key, corr_id=corr_id)

        # Send message to server synchronously(just like Remote Procedure Call)

    def send_sync(self, body, key=None, timeout=5):
        if not key:
            raise Exception("The routing key is not present.")
        corr_id = str(uuid.uuid4())  # generate correlation id
        callback_queue = self.temporary_queue_declare()  # 得到随机回调队列名
        self.data[corr_id] = {
            'isAccept': False,
            'result': None,
            'reply_queue_name': callback_queue
        }
        # Client consume reply_queue
        self._channel.basic_consume(self.on_response,
                                    no_ack=True,
                                    queue=callback_queue)
        # send message to queue that server is consuming
        self._channel.basic_publish(
            exchange='',
            routing_key=key,
            body=body,
            properties=pika.BasicProperties(
                reply_to=callback_queue,
                correlation_id=corr_id,
            )
        )

        end = time.time() + timeout
        while time.time() < end:
            if self.data[corr_id]['isAccept']:  # 判断是否接收到服务端返回的消息
                logger.info("Got the RPC server response => {}".format(self.data[corr_id]['result']))
                return self.data[corr_id]['result']
            else:
                self._connection.process_data_events()
                time.sleep(0.3)
                continue
        # 超时处理
        logger.error("Get the response timeout.")
        return None

    def send_json_sync(self, body, key=None):
        if not key:
            raise Exception("The routing key is not present.")
        data = json.dumps(body)
        return self.send_sync(data, key=key)

    def delete_queue(self, queue):
        self._channel.queue_delete(queue=queue)

    def accept(self, key, result):
        """
        同步接受确认消息
        :param key: correlation_id
        :param result 服务端返回的消息
        """
        self.data[key]['isAccept'] = True  # 设置为已经接受到服务端返回的消息
        self.data[key]['result'] = str(result)
        self._channel.queue_delete(self.data[key]['reply_queue_name'])  # 删除客户端声明的回调队列

    def on_response(self, ch, method, props, body):
        """
        所有的RPC请求回调都会调用该方法,在该方法内修改对应corr_id已经接受消息的isAccept值和返回结果
        """
        logger.info("on response => {}".format(body))

        corr_id = props.correlation_id  # 从props得到服务端返回的客户度传入的corr_id值
        self.accept(corr_id, body)

    def register_class(self, rpc_class):
        if not hasattr(rpc_class, 'declare'):
            raise AttributeError("The registered class must contains the declare method")
        self._rpc_class_list.append(rpc_class)

    def _run(self):
        # register queues and declare all of exchange and queue
        self.register_queue()

        logger.info(" * The RabbitMQ application is consuming")
        t = threading.Thread(target=self.consuming_handle)
        t.start()

        # run the consumer application

    def register_queue(self):
        '''
        register queues and declare all of exchange and queue
        :return:
        '''
        for item in self._rpc_class_list:
            item().declare()
        for (type, queue_name, exchange_name, routing_key, callback, arguments) in self.queue._rpc_class_list:
            if type == ExchangeType.DEFAULT:
                if not queue_name:
                    # If queue name is empty, then declare a temporary queue
                    queue_name = self.temporary_queue_declare()
                else:
                    self._channel.queue_declare(queue=queue_name, auto_delete=True)
                    self.basic_consuming(queue_name, callback)

            if type == ExchangeType.FANOUT or type == ExchangeType.DIRECT or type == ExchangeType.TOPIC:
                if not queue_name:
                    # If queue name is empty, then declare a temporary queue
                    queue_name = self.temporary_queue_declare()
                else:
                    self._channel.queue_declare(queue=queue_name, auto_delete=True, arguments=arguments)
                self.exchange_bind_to_queue(type, exchange_name, routing_key, queue_name)
                # Consume the queue
                self.basic_consuming(queue_name, callback)

    def run(self, app=None):
        self.app = app
        self._run()