import json import threading import time import uuid import pika from pika.exceptions import ConnectionClosed, ChannelClosed from huansi_utils.rabbitMQ import ExchangeType from huansi_utils.rabbitMQ.util._logger import logger # class rabbitMQ(object): # ''' # rabbitMQ操作类 # ''' # # def __init__(self, host, port): # ''' # 连接MQ # :param host: # :param port: # :return: # ''' # user_name = 'hs' # password = 'hs' # user_pwd = pika.PlainCredentials(user_name, password) # parameters = pika.ConnectionParameters(host=host, port=port, credentials=user_pwd) # self.connection = pika.BlockingConnection(parameters) # # def send(self, queue, body): # # 创建通道 # channel = self.connection.channel() # # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行 # channel.queue_declare(queue=queue, durable=True) # # routing_key在使用匿名交换机的时候才需要指定,表示发送到哪个队列 # channel.basic_publish(exchange='', routing_key=queue, body=body, # properties=pika.BasicProperties(delivery_mode=2, )) # # channel.queue_delete(queue=queue_Name) # self.connection.close() # # def receive(self, queue): # # 创建通道 # channel = self.connection.channel() # # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行 # channel.queue_declare(queue=queue, durable=True) # # def callback(ch, method, properties, body): # print(body, 1) # # 向生产者发送应答 # ch.basic_ack(delivery_tag=method.delivery_tag) # # return body # # # 将从队列里取出的数据回调给callback方法 # channel.basic_consume(callback, queue=queue, no_ack=False) # # 开始取值 # channel.start_consuming() # mq = rabbitMQ(current_app.config["MQ_HOST"], current_app.config["MQ_PORT"]) class RabbitMQ(object): ''' RabbitMQ操作类 ''' def __init__(self, host, port=80, username=None, passward=None, queue=None, heartbeat=120): self.rabbitmq_server_host = host self.rabbitmq_server_port = port self.rabbitmq_server_username = username self.rabbitmq_server_password = passward self.queue = queue self.heartbeat = heartbeat self._connection = None self._channel = None self._rpc_class_list = [] self.data = {} # initialize some operation self.init() def init(self): self.valid_config() self.connect_rabbitmq_server() def valid_config(self): if not self.rabbitmq_server_host: raise Exception("The rabbitMQ application must configure host.") # connect RabbitMQ server def connect_rabbitmq_server(self): if not (self.rabbitmq_server_username and self.rabbitmq_server_password): # connect RabbitMQ server with no authentication self._connection = pika.BlockingConnection() elif (self.rabbitmq_server_username and self.rabbitmq_server_password): # connect RabbitMQ server with authentication credentials = pika.PlainCredentials( self.rabbitmq_server_username, self.rabbitmq_server_password ) self._connection = pika.BlockingConnection( pika.ConnectionParameters( host=self.rabbitmq_server_host, port=self.rabbitmq_server_port, heartbeat=self.heartbeat, credentials=credentials )) else: raise Exception() logger.info("connect RabbitMQ server sucessful") # create channel object self._channel = self._connection.channel() def start_rabbitmq_server(self): ''' 重新开启MQ服务 :return: ''' if self._connection.is_closed: self.connect_rabbitmq_server() else: raise Exception('MQ服务已经处于开启状态') def close_rabbitmq_server(self): ''' 关闭MQ服务 :return: ''' if self._connection.is_open: self._connection.close() else: raise Exception('MQ服务已经处于关闭状态') def queue_declare(self, queue_name='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None): result = self._channel.queue_declare( queue=queue_name, passive=passive, durable=durable, exclusive=exclusive, auto_delete=auto_delete, arguments=arguments ) return result.method.queue def temporary_queue_declare(self): """ declare a temporary queue that named random string and will automatically deleted when we disconnect the consumer :return: the name of temporary queue like amq.gen-4NI42Nw3gJaXuWwMxW4_Vg """ return self.queue_declare(exclusive=True, auto_delete=True) def exchange_bind_to_queue(self, type, exchange_name, routing_key, queue): """ Declare exchange and bind queue to exchange :param type: The type of exchange :param exchange_name: The name of exchange :param routing_key: The key of exchange bind to queue :param queue: queue name """ self._channel.exchange_declare(exchange=exchange_name, exchange_type=type) self._channel.queue_bind(queue=queue, exchange=exchange_name, routing_key=routing_key) def basic_consuming(self, queue_name, callback): self._channel.basic_consume( consumer_callback=callback, queue=queue_name ) def consuming_handle(self): try: self.consuming() except ConnectionClosed: # 某些客户端网络会有波动,这里改为每五分钟重连一次,直到成功为止 while self._connection.is_closed: try: # 重连 self.connect_rabbitmq_server() # 重新注册队列 self.register_queue() try: # 重新消费 self.consuming() except ConnectionClosed: pass except: # 五分钟 time.sleep(300) def consuming(self): # 执行callback的时候,可能需要flask的上下文,这里压入一个上下文 if self.app: with self.app.app_context(): self._channel.start_consuming() else: self._channel.start_consuming() def sendEx(self, body, exchange, key, expiration=None, corr_id=None): ''' 加强版的send,失败了重连频道,奥利给! :param body: :param exchange: :param key: :param corr_id: :return: ''' try: logger.info(f'MQ消息{exchange}----{key}-----{expiration}-----{corr_id}----{body}') self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id) except Exception as e: # 很皮,发消息的时候老是频道被回收 if self._connection.is_closed: # 重新连接,频道被回收,连接会断 self.connect_rabbitmq_server() logger.error('连接重启中。。。。。') self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id) elif self._channel.is_closed: self._channel = self._connection.channel() logger.error('频道重启中。。。。。') self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id) else: import traceback logger.error('MQ发送失败---失败原因:{}'.format(traceback.format_exc())) # 重新连接,频道被回收,连接会断 self.connect_rabbitmq_server() logger.error('连接重启中。。。。。') self.send(body, exchange=exchange, key=key, expiration=expiration, corr_id=corr_id) def send(self, body, exchange, key, expiration=None, corr_id=None): if expiration is not None: expiration = int(expiration) * 1000 # 过期时间单位是毫秒,换算一下 # 过期时间小于0就不设置 if int(expiration) <= 0: expiration = None if not corr_id: self._channel.basic_publish( exchange=exchange, routing_key=key, body=body, properties=pika.BasicProperties( expiration=str(expiration) if expiration else None # 必须是字符串,不然给我报错,霸王条款 ) ) else: self._channel.basic_publish( exchange=exchange, routing_key=key, body=body, properties=pika.BasicProperties( correlation_id=corr_id , expiration=expiration ) ) def send_json(self, body, exchange, key, corr_id=None): data = json.dumps(body) self.send(data, exchange=exchange, key=key, corr_id=corr_id) # Send message to server synchronously(just like Remote Procedure Call) def send_sync(self, body, key=None, timeout=5): if not key: raise Exception("The routing key is not present.") corr_id = str(uuid.uuid4()) # generate correlation id callback_queue = self.temporary_queue_declare() # 得到随机回调队列名 self.data[corr_id] = { 'isAccept': False, 'result': None, 'reply_queue_name': callback_queue } # Client consume reply_queue self._channel.basic_consume(self.on_response, no_ack=True, queue=callback_queue) # send message to queue that server is consuming self._channel.basic_publish( exchange='', routing_key=key, body=body, properties=pika.BasicProperties( reply_to=callback_queue, correlation_id=corr_id, ) ) end = time.time() + timeout while time.time() < end: if self.data[corr_id]['isAccept']: # 判断是否接收到服务端返回的消息 logger.info("Got the RPC server response => {}".format(self.data[corr_id]['result'])) return self.data[corr_id]['result'] else: self._connection.process_data_events() time.sleep(0.3) continue # 超时处理 logger.error("Get the response timeout.") return None def send_json_sync(self, body, key=None): if not key: raise Exception("The routing key is not present.") data = json.dumps(body) return self.send_sync(data, key=key) def delete_queue(self, queue): self._channel.queue_delete(queue=queue) def accept(self, key, result): """ 同步接受确认消息 :param key: correlation_id :param result 服务端返回的消息 """ self.data[key]['isAccept'] = True # 设置为已经接受到服务端返回的消息 self.data[key]['result'] = str(result) self._channel.queue_delete(self.data[key]['reply_queue_name']) # 删除客户端声明的回调队列 def on_response(self, ch, method, props, body): """ 所有的RPC请求回调都会调用该方法,在该方法内修改对应corr_id已经接受消息的isAccept值和返回结果 """ logger.info("on response => {}".format(body)) corr_id = props.correlation_id # 从props得到服务端返回的客户度传入的corr_id值 self.accept(corr_id, body) def register_class(self, rpc_class): if not hasattr(rpc_class, 'declare'): raise AttributeError("The registered class must contains the declare method") self._rpc_class_list.append(rpc_class) def _run(self): # register queues and declare all of exchange and queue self.register_queue() logger.info(" * The RabbitMQ application is consuming") t = threading.Thread(target=self.consuming_handle) t.start() # run the consumer application def register_queue(self): ''' register queues and declare all of exchange and queue :return: ''' for item in self._rpc_class_list: item().declare() for (type, queue_name, exchange_name, routing_key, callback, arguments) in self.queue._rpc_class_list: if type == ExchangeType.DEFAULT: if not queue_name: # If queue name is empty, then declare a temporary queue queue_name = self.temporary_queue_declare() else: self._channel.queue_declare(queue=queue_name, auto_delete=True) self.basic_consuming(queue_name, callback) if type == ExchangeType.FANOUT or type == ExchangeType.DIRECT or type == ExchangeType.TOPIC: if not queue_name: # If queue name is empty, then declare a temporary queue queue_name = self.temporary_queue_declare() else: self._channel.queue_declare(queue=queue_name, auto_delete=True, arguments=arguments) self.exchange_bind_to_queue(type, exchange_name, routing_key, queue_name) # Consume the queue self.basic_consuming(queue_name, callback) def run(self, app=None): self.app = app self._run()