rabbitMQ.py 14.2 KB
Newer Older
金凯强's avatar
金凯强 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
import json
import threading
import time
import uuid

import pika
from pika.exceptions import ConnectionClosed, ChannelClosed

from huansi_utils.rabbitMQ import ExchangeType
from huansi_utils.rabbitMQ.util._logger import logger


# class rabbitMQ(object):
#     '''
#     rabbitMQ操作类
#     '''
#
#     def __init__(self, host, port):
#         '''
#         连接MQ
#         :param host:
#         :param port:
#         :return:
#         '''
#         user_name = 'hs'
#         password = 'hs'
#         user_pwd = pika.PlainCredentials(user_name, password)
#         parameters = pika.ConnectionParameters(host=host, port=port, credentials=user_pwd)
#         self.connection = pika.BlockingConnection(parameters)
#
#     def send(self, queue, body):
#         # 创建通道
#         channel = self.connection.channel()
#         # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
#         channel.queue_declare(queue=queue, durable=True)
#         # routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列
#         channel.basic_publish(exchange='', routing_key=queue, body=body,
#                               properties=pika.BasicProperties(delivery_mode=2, ))
#         # channel.queue_delete(queue=queue_Name)
#         self.connection.close()
#
#     def receive(self, queue):
#         # 创建通道
#         channel = self.connection.channel()
#         # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
#         channel.queue_declare(queue=queue, durable=True)
#
#         def callback(ch, method, properties, body):
#             print(body, 1)
#             # 向生产者发送应答
#             ch.basic_ack(delivery_tag=method.delivery_tag)
#
#             return body
#
#         # 将从队列里取出的数据回调给callback方法
#         channel.basic_consume(callback, queue=queue, no_ack=False)
#         # 开始取值
#         channel.start_consuming()


# mq = rabbitMQ(current_app.config["MQ_HOST"], current_app.config["MQ_PORT"])


class RabbitMQ(object):
    """
    Helper around one blocking pika connection and one channel.

    It offers fire-and-forget publishing (``send`` / ``sendEx`` /
    ``send_json``), a simple RPC-style request/response call
    (``send_sync``) and a background consumer thread (``run``) that
    re-connects when the connection is closed.

    NOTE(review): ``basic_consume`` is called with the ``consumer_callback``
    and ``no_ack`` keyword names, which presumably means the code targets
    pika < 1.0 -- confirm before upgrading pika.
    """

    # Seconds to wait before the consumer thread retries a failed reconnect.
    RECONNECT_INTERVAL_SECONDS = 300

    def __init__(self, host, port=80, username=None, passward=None, queue=None, heartbeat=120):
        """
        Store the connection settings and connect immediately.

        :param host: RabbitMQ server host, must be truthy (see valid_config)
        :param port: RabbitMQ server port
        :param username: user name for PLAIN authentication
        :param passward: password for PLAIN authentication (the misspelled
            parameter name is part of the public signature and is kept)
        :param queue: optional object whose ``_rpc_class_list`` attribute is
            iterated by register_queue as 6-tuples of
            (exchange type, queue name, exchange name, routing key,
            callback, arguments)
        :param heartbeat: heartbeat value handed to pika.ConnectionParameters
        :raises Exception: if ``host`` is empty
        """
        self.rabbitmq_server_host = host
        self.rabbitmq_server_port = port
        self.rabbitmq_server_username = username
        self.rabbitmq_server_password = passward
        self.queue = queue
        self.heartbeat = heartbeat
        self._connection = None
        self._channel = None
        self._rpc_class_list = []
        # Pending RPC calls of send_sync, keyed by correlation id.
        self.data = {}
        # Set by run(); initialised here so consuming() can be called
        # before run() without raising AttributeError.
        self.app = None
        # initialize some operation
        self.init()

    def init(self):
        """Validate the configuration, then open the connection and channel."""
        self.valid_config()
        self.connect_rabbitmq_server()

    def valid_config(self):
        """
        Check that the mandatory settings are present.

        :raises Exception: if no host was configured
        """
        if not self.rabbitmq_server_host:
            raise Exception("The rabbitMQ application must configure host.")

    # connect RabbitMQ server
    def connect_rabbitmq_server(self):
        """
        Open a new blocking connection and a new channel on it.

        Any previously stored connection/channel references are simply
        replaced; the old objects are not closed here.
        """
        if self.rabbitmq_server_username and self.rabbitmq_server_password:
            # connect RabbitMQ server with authentication
            credentials = pika.PlainCredentials(
                self.rabbitmq_server_username,
                self.rabbitmq_server_password
            )
            self._connection = pika.BlockingConnection(
                pika.ConnectionParameters(
                    host=self.rabbitmq_server_host,
                    port=self.rabbitmq_server_port,
                    heartbeat=self.heartbeat,
                    credentials=credentials
                ))
        else:
            # connect RabbitMQ server with no authentication
            # NOTE(review): this path uses pika's default parameters, so the
            # configured host, port and heartbeat are ignored. Kept as-is
            # because existing callers may rely on it -- confirm whether the
            # configured values should be honoured here as well.
            self._connection = pika.BlockingConnection()
        logger.info("connect RabbitMQ server sucessful")
        # create channel object
        self._channel = self._connection.channel()

    def start_rabbitmq_server(self):
        """
        Re-open the MQ connection.

        :raises Exception: if the connection is not closed
        """
        if self._connection.is_closed:
            self.connect_rabbitmq_server()
        else:
            raise Exception('MQ服务已经处于开启状态')

    def close_rabbitmq_server(self):
        """
        Close the MQ connection.

        :raises Exception: if the connection is not open
        """
        if self._connection.is_open:
            self._connection.close()
        else:
            raise Exception('MQ服务已经处于关闭状态')

    def queue_declare(self, queue_name='', passive=False, durable=False,
                      exclusive=False, auto_delete=False, arguments=None):
        """
        Declare a queue on the channel.

        All parameters are forwarded unchanged to the channel's
        ``queue_declare``.

        :return: the queue name reported in the declare result
        """
        result = self._channel.queue_declare(
            queue=queue_name,
            passive=passive,
            durable=durable,
            exclusive=exclusive,
            auto_delete=auto_delete,
            arguments=arguments
        )
        return result.method.queue

    def temporary_queue_declare(self):
        """
        declare a temporary queue that named random string
        and will automatically deleted when we disconnect the consumer
        :return: the name of temporary queue like amq.gen-4NI42Nw3gJaXuWwMxW4_Vg
        """
        return self.queue_declare(exclusive=True,
                                  auto_delete=True)

    def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue):
        """
        Declare exchange and bind queue to exchange
        :param type: The type of exchange
        :param exchange_name: The name of exchange
        :param routing_key: The key of exchange bind to queue
        :param queue: queue name
        """
        self._channel.exchange_declare(exchange=exchange_name,
                                       exchange_type=type)
        self._channel.queue_bind(queue=queue,
                                 exchange=exchange_name,
                                 routing_key=routing_key)

    def basic_consuming(self, queue_name, callback):
        """
        Register ``callback`` as consumer of ``queue_name``.

        :param queue_name: queue to consume from
        :param callback: consumer callback handed to ``basic_consume``
        """
        self._channel.basic_consume(
            consumer_callback=callback,
            queue=queue_name
        )

    def consuming_handle(self):
        """
        Consume until the connection is lost, then reconnect and resume.

        After a ConnectionClosed the method keeps trying, while the stored
        connection is closed, to reconnect, re-register all queues and
        consume again. A failed attempt is logged and retried after
        RECONNECT_INTERVAL_SECONDS (some client networks are unstable).
        """
        import traceback
        try:
            self.consuming()
        except ConnectionClosed:
            logger.error('RabbitMQ connection closed while consuming, reconnecting')
            while self._connection.is_closed:
                try:
                    # reconnect
                    self.connect_rabbitmq_server()
                    # re-register the queues on the new channel
                    self.register_queue()
                    try:
                        # resume consuming
                        self.consuming()
                    except ConnectionClosed:
                        # Lost again: the loop condition triggers an
                        # immediate new reconnect attempt.
                        logger.error('RabbitMQ connection closed again, reconnecting')
                except Exception:
                    # Only Exception is caught so KeyboardInterrupt and
                    # SystemExit are no longer swallowed; the failure is
                    # logged instead of being silently dropped.
                    logger.error('RabbitMQ reconnect failed, retry in {} seconds: {}'.format(
                        self.RECONNECT_INTERVAL_SECONDS, traceback.format_exc()))
                    time.sleep(self.RECONNECT_INTERVAL_SECONDS)

    def consuming(self):
        """
        Start the blocking consume loop of the channel.

        Callbacks may need a Flask application context, so when an app was
        handed to run() the loop runs inside ``app.app_context()``.
        """
        # getattr keeps this safe for instances on which run() was never
        # called and __init__ did not set the attribute.
        app = getattr(self, 'app', None)
        if app:
            with app.app_context():
                self._channel.start_consuming()
        else:
            self._channel.start_consuming()

    def sendEx(self, body, exchange, key, expiration=None, corr_id=None):
        """
        Like send, but recovers the connection/channel and retries once.

        :param body: message body
        :param exchange: exchange to publish to
        :param key: routing key
        :param expiration: message TTL in seconds (see send)
        :param corr_id: optional correlation id
        :raises Exception: whatever the retried send raises
        """
        try:
            logger.info(f'MQ消息{exchange}----{key}-----{expiration}-----{corr_id}----{body}')
            self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)
            return
        except Exception:
            if self._connection.is_closed:
                # The connection is gone (a reclaimed channel drops the
                # connection as well): reconnect.
                self.connect_rabbitmq_server()
                logger.error('连接重启中。。。。。')
            elif self._channel.is_closed:
                # Only the channel was reclaimed: open a new one.
                self._channel = self._connection.channel()
                logger.error('频道重启中。。。。。')
            else:
                import traceback
                logger.error('MQ发送失败---失败原因:{}'.format(traceback.format_exc()))
                # NOTE(review): the previous connection is still open and is
                # left untouched on purpose -- a consumer thread started by
                # run() may still be using it. Confirm whether it should be
                # closed.
                self.connect_rabbitmq_server()
                logger.error('连接重启中。。。。。')
        # Single retry after recovery; a second failure propagates.
        self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)

    def send(self, body, exchange, key, expiration=None, corr_id=None):
        """
        Publish one message.

        :param body: message body
        :param exchange: exchange to publish to
        :param key: routing key
        :param expiration: message TTL in seconds; None or a value <= 0
            means no expiration is set
        :param corr_id: optional correlation id; falsy means none is set
        """
        ttl = None
        if expiration is not None:
            # The broker expects milliseconds.
            millis = int(expiration) * 1000
            if millis > 0:
                # pika requires the expiration property to be a string; the
                # conversion is now applied with and without corr_id.
                ttl = str(millis)

        self._channel.basic_publish(
            exchange=exchange,
            routing_key=key,
            body=body,
            properties=pika.BasicProperties(
                correlation_id=corr_id if corr_id else None,
                expiration=ttl
            )
        )

    def send_json(self, body, exchange, key, corr_id=None, expiration=None):
        """
        Serialize ``body`` with json.dumps and publish it via send.

        :param body: JSON-serializable object
        :param exchange: exchange to publish to
        :param key: routing key
        :param corr_id: optional correlation id
        :param expiration: optional message TTL in seconds (see send)
        """
        data = json.dumps(body)
        self.send(data, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id)

    # Send message to server synchronously(just like Remote Procedure Call)
    def send_sync(self, body, key=None, timeout=5):
        """
        Publish ``body`` to the default exchange and wait for the reply.

        A temporary reply queue is declared and consumed, the request is
        published with ``reply_to`` and a fresh correlation id, and the
        connection's events are processed until on_response records the
        reply or ``timeout`` seconds have passed.

        :param body: message body
        :param key: routing key (name of the queue the server consumes)
        :param timeout: seconds to wait for the reply
        :return: the reply as stored by accept, or None on timeout
        :raises Exception: if ``key`` is empty
        """
        if not key:
            raise Exception("The routing key is not present.")
        corr_id = str(uuid.uuid4())  # generate correlation id
        callback_queue = self.temporary_queue_declare()  # random reply queue name
        pending = {
            'isAccept': False,
            'result': None,
            'reply_queue_name': callback_queue
        }
        self.data[corr_id] = pending
        try:
            # Client consume reply_queue
            self._channel.basic_consume(self.on_response,
                                        no_ack=True,
                                        queue=callback_queue)
            # send message to queue that server is consuming
            self._channel.basic_publish(
                exchange='',
                routing_key=key,
                body=body,
                properties=pika.BasicProperties(
                    reply_to=callback_queue,
                    correlation_id=corr_id,
                )
            )

            deadline = time.time() + timeout
            while True:
                # Checked before the deadline so that a reply processed in
                # the last iteration is still returned.
                if pending['isAccept']:
                    logger.info("Got the RPC server response => {}".format(pending['result']))
                    return pending['result']
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                self._connection.process_data_events()
                time.sleep(min(0.3, remaining))

            # timeout handling
            logger.error("Get the response timeout.")
            # accept() never ran, so the reply queue still exists.
            self._discard_reply_queue(callback_queue)
            return None
        finally:
            # Always forget the call, otherwise self.data grows with every
            # request.
            self.data.pop(corr_id, None)

    def _discard_reply_queue(self, queue_name):
        """Best-effort deletion of a reply queue whose request timed out."""
        try:
            self._channel.queue_delete(queue=queue_name)
        except Exception:
            import traceback
            logger.error('Failed to delete reply queue {}: {}'.format(
                queue_name, traceback.format_exc()))

    def send_json_sync(self, body, key=None, timeout=5):
        """
        Serialize ``body`` with json.dumps and send it via send_sync.

        :param body: JSON-serializable object
        :param key: routing key
        :param timeout: seconds to wait for the reply
        :return: the reply, or None on timeout
        :raises Exception: if ``key`` is empty
        """
        if not key:
            raise Exception("The routing key is not present.")
        data = json.dumps(body)
        return self.send_sync(data, key=key, timeout=timeout)

    def delete_queue(self, queue):
        """Delete the queue named ``queue``."""
        self._channel.queue_delete(queue=queue)

    def accept(self, key, result):
        """
        Record the reply of a pending synchronous call.

        :param key: correlation_id
        :param result: message returned by the server
        """
        entry = self.data.get(key)
        if entry is None:
            # Unknown or already finished call: ignore it instead of raising
            # KeyError out of the consumer callback.
            logger.error("Ignore response with unknown correlation id => {}".format(key))
            return
        entry['isAccept'] = True  # mark the reply as received
        # NOTE(review): str() of a bytes body yields "b'...'" on Python 3;
        # kept unchanged because callers may depend on it -- confirm.
        entry['result'] = str(result)
        self._channel.queue_delete(entry['reply_queue_name'])  # drop the client's reply queue

    def on_response(self, ch, method, props, body):
        """
        Consumer callback of every RPC reply queue: stores ``body`` as the
        result of the call identified by ``props.correlation_id``.
        """
        logger.info("on response => {}".format(body))

        corr_id = props.correlation_id  # correlation id echoed back by the server
        self.accept(corr_id, body)

    def register_class(self, rpc_class):
        """
        Register a class whose instances declare queues/exchanges.

        :param rpc_class: class with a ``declare`` method; it is
            instantiated without arguments by register_queue
        :raises AttributeError: if the class has no ``declare`` attribute
        """
        if not hasattr(rpc_class, 'declare'):
            raise AttributeError("The registered class must contains the declare method")
        self._rpc_class_list.append(rpc_class)

    def _run(self):
        """Register all queues, then consume in a new thread."""
        # register queues and declare all of exchange and queue
        self.register_queue()

        logger.info(" * The RabbitMQ application is consuming")
        # run the consumer application
        t = threading.Thread(target=self.consuming_handle)
        t.start()

    def register_queue(self):
        """
        register queues and declare all of exchange and queue
        :return:
        """
        for rpc_class in self._rpc_class_list:
            rpc_class().declare()

        if self.queue is None:
            # No queue registry was passed to __init__: nothing else to do.
            return

        for (exchange_type, queue_name, exchange_name, routing_key,
             callback, arguments) in self.queue._rpc_class_list:
            if exchange_type == ExchangeType.DEFAULT:
                if not queue_name:
                    # If queue name is empty, then declare a temporary queue
                    queue_name = self.temporary_queue_declare()
                else:
                    self._channel.queue_declare(queue=queue_name, auto_delete=True)
                # Consume in both cases; previously a temporary queue was
                # declared here but never consumed.
                self.basic_consuming(queue_name, callback)
            elif exchange_type in (ExchangeType.FANOUT, ExchangeType.DIRECT, ExchangeType.TOPIC):
                if not queue_name:
                    # If queue name is empty, then declare a temporary queue
                    queue_name = self.temporary_queue_declare()
                else:
                    self._channel.queue_declare(queue=queue_name, auto_delete=True, arguments=arguments)
                self.exchange_bind_to_queue(exchange_type, exchange_name, routing_key, queue_name)
                # Consume the queue
                self.basic_consuming(queue_name, callback)

    def run(self, app=None):
        """
        Start consuming in a background thread.

        :param app: optional object providing ``app_context()`` (e.g. a
            Flask app) in which the consumer callbacks are executed
        """
        self.app = app
        self._run()