""" 改写了sqlalchemy的logger机制,实现自定义功能 1、优化了sql语句的输出 2、debug_sql模式下,能返回完整的sql """ import logging import sys from flask import g def _add_default_handler(logger): handler = logging.StreamHandler(sys.stdout) # handler.setFormatter(logging.Formatter( # '%(asctime)s %(levelname)s %(name)s %(message)s')) handler.setFormatter(logging.Formatter( '%(message)s')) logger.addHandler(handler) logger.propagate = False class InstanceLogger(object): """A logger adapter (wrapper) for :class:`.Identified` subclasses. This allows multiple instances (e.g. Engine or Pool instances) to share a logger, but have its verbosity controlled on a per-instance basis. The basic functionality is to return a logging level which is based on an instance's echo setting. Default implementation is: 'debug' -> logging.DEBUG True -> logging.INFO False -> Effective level of underlying logger (logging.WARNING by default) None -> same as False """ # Map echo settings to logger levels _echo_map = { None: logging.NOTSET, False: logging.NOTSET, True: logging.INFO, 'debug': logging.DEBUG, } def __init__(self, echo, name): # logger.debug('实例化') self.echo = echo self.logger = logging.getLogger(name) # if echo flag is enabled and no handlers, # add a handler to the list if self._echo_map[echo] <= logging.INFO \ and not self.logger.handlers: _add_default_handler(self.logger) # # Boilerplate convenience methods # def debug(self, msg, *args, **kwargs): """Delegate a debug call to the underlying logger.""" self.log(logging.DEBUG, msg, *args, **kwargs) def info(self, msg, *args, **kwargs): """Delegate an info call to the underlying logger.""" # 改写sqlalchemy的记录日志 # 判断msg是否为%r 是表示为参数 args是值 # 如果是参数,则替换上一个log的sql里面的参数 if getattr(g, 'sql', None) is None: return if msg == '%r': last_sql = str(g.sql.pop()) if isinstance(args[0], dict): params = args[0] else: params = args[0].params # 多条更新语句会以tuple形式传入 if isinstance(params, tuple): for param in params: for key, value in param.items(): g.sql.append(last_sql.replace('%({})s'.format(key), "'{}'".format(str(value)))) else: for key, value in params.items(): last_sql = last_sql.replace('%({})s'.format(key), "'{}'".format(str(value))) g.sql.append(last_sql) else: g.sql.append(msg) # 只有确定打印日志,才会显示日志 from flask import current_app if current_app.config.get('GLOBAL_SQL_ECHO', False): self.log(logging.INFO, msg, *args, **kwargs) def warning(self, msg, *args, **kwargs): """Delegate a warning call to the underlying logger.""" self.log(logging.WARNING, msg, *args, **kwargs) warn = warning def error(self, msg, *args, **kwargs): """ Delegate an error call to the underlying logger. """ self.log(logging.ERROR, msg, *args, **kwargs) def exception(self, msg, *args, **kwargs): """Delegate an exception call to the underlying logger.""" kwargs["exc_info"] = 1 self.log(logging.ERROR, msg, *args, **kwargs) def critical(self, msg, *args, **kwargs): """Delegate a critical call to the underlying logger.""" self.log(logging.CRITICAL, msg, *args, **kwargs) def log(self, level, msg, *args, **kwargs): """Delegate a log call to the underlying logger. The level here is determined by the echo flag as well as that of the underlying logger, and logger._log() is called directly. """ # inline the logic from isEnabledFor(), # getEffectiveLevel(), to avoid overhead. if self.logger.manager.disable >= level: return selected_level = self._echo_map[self.echo] if selected_level == logging.NOTSET: selected_level = self.logger.getEffectiveLevel() if level >= selected_level: self.logger._log(level, msg, args, **kwargs) def isEnabledFor(self, level): """Is this logger enabled for level 'level'?""" if self.logger.manager.disable >= level: return False return level >= self.getEffectiveLevel() def getEffectiveLevel(self): """What's the effective level for this logger?""" level = self._echo_map[self.echo] if level == logging.NOTSET: level = self.logger.getEffectiveLevel() return level