import xmltodict from flask.wrappers import Request, json from werkzeug.exceptions import BadRequest from flask.globals import current_app class HSRequest(Request): def get_json(self, force=False, silent=False, cache=True, node_list_str=""): """Parse and return the data as JSON. If the mimetype does not indicate JSON (:mimetype:`application/json`, see :meth:`is_json`), this returns ``None`` unless ``force`` is true. If parsing fails, :meth:`on_json_loading_failed` is called and its return value is used as the return value. :param force: Ignore the mimetype and always try to parse JSON. :param silent: Silence parsing errors and return ``None`` instead. :param cache: Store the parsed JSON to return for subsequent calls. """ content_type = self.environ.get('CONTENT_TYPE', 'application/json').lower() if 'application/json' in content_type: if cache and self._cached_json[silent] is not Ellipsis: return self._cached_json[silent] if not (force or self.is_json): return None data = self._get_data_for_json(cache=cache) try: rv = json.loads(data) except ValueError as e: if silent: rv = None if cache: normal_rv, _ = self._cached_json self._cached_json = (normal_rv, rv) else: rv = self.on_json_loading_failed(e) if cache: _, silent_rv = self._cached_json self._cached_json = (rv, silent_rv) else: if cache: self._cached_json = (rv, rv) elif 'application/xml' in content_type: _data = str(self.data, encoding='utf8') try: rv = xmltodict.parse(_data, attr_prefix="") if node_list_str: # 替换节点 rv = self.replace_dict_to_list_by_node(rv, node_list_str) if len(rv) == 1 and isinstance(list(rv.values())[0], list): rv = list(rv.values())[0] except Exception as e: if silent: rv = None else: rv = self.on_xml_loading_failed(e) else: rv = None return rv def replace_dict_to_list_by_node(self, data_dict, node_list_str: str): ''' 根据输入的节点替换该节点为列表 :param data_dict: :param node_list_str: :return: ''' node_list = node_list_str.split(',') for node in node_list: if isinstance(data_dict, dict): for key, value in data_dict.items(): # 找到了,并且还不是list if key == node and not isinstance(value, list): # 切换成数组 data_dict[key] = [value] self.replace_dict_to_list_by_node(value, node) elif isinstance(data_dict, list): for data_dict_item in data_dict: self.replace_dict_to_list_by_node(data_dict_item, node) else: pass return data_dict def on_xml_loading_failed(self, e): if current_app is not None and current_app.debug: raise BadRequest('Failed to decode XML object: {0}'.format(e)) raise BadRequest()