import datetime
import decimal
import json
import types

from flask import jsonify
from flask_sqlalchemy import DefaultMeta
from sqlalchemy.engine.result import RowProxy

from huansi_utils.db.db import get_model_column_names
from huansi_utils.db.sqlalchemy import db
from huansi_utils.exception.exception import HSException


def format_data(data):
    """
    Format a single field value so that it can be serialised to JSON.

    :param data: field value
    :return: the formatted value; values of any type not listed below are
        returned unchanged
    """
    # bool must be tested before int: bool is a subclass of int and would
    # otherwise be turned into a string by the int branch.
    if isinstance(data, bool):
        return data
    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(data, datetime.datetime):
        # Cut microseconds down to milliseconds; SQL Server uses 3 digits,
        # e.g. 2015-05-12 11:13:58.543
        return data.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    if isinstance(data, datetime.date):
        return data.strftime("%Y-%m-%d")
    if isinstance(data, decimal.Decimal):
        return float(data)
    if isinstance(data, int):
        # Every int is returned as a string (an earlier, now removed, variant
        # of this check only did so for 19-digit values).
        return str(data)
    if isinstance(data, bytes):
        # NOTE(review): str() on bytes yields the repr form ("b'...'"), not
        # decoded text -- kept as-is; confirm this is what consumers expect.
        return str(data)
    return data


def format_json_data(data):
    """
    Format every value of a dict in place and turn int keys into str keys.

    :param data: dict to format; it is modified in place
    :return: the same dict object
    """
    # Iterate over a snapshot of the keys: int keys are removed and re-added
    # under their str form, which must not happen while iterating the dict
    # itself.
    for key in list(data):
        value = data[key]
        if isinstance(key, int):
            del data[key]
            data[str(key)] = format_data(value)
        else:
            data[key] = format_data(value)
    return data


def model_to_json(model, many=True, _jsonify=False):
    """
    Convert a model (or a collection of models) into JSON-friendly data.

    With ``many=False`` the value is dispatched on its type, in this order:
    dict, RowProxy, tuple, list, flask_sqlalchemy model, any object with a
    ``__dict__``, and finally a plain scalar handled by ``format_data``.

    :param model: entity object, or an iterable of them when ``many`` is true
    :param many: whether ``model`` holds several entities
    :param _jsonify: whether to wrap the result with ``flask.jsonify``; only
        honoured for the ``many``, tuple and flask_sqlalchemy model cases
    :return: JSON data; an empty ``model`` with ``many=True`` yields a plain
        ``[]`` regardless of ``_jsonify``
    """
    if many:
        if not model:
            return []
        item_list = [
            model_to_json(item, many=False, _jsonify=False) for item in model
        ]
        return jsonify(item_list) if _jsonify else item_list

    if isinstance(model, dict):
        # The caller's dict is formatted in place and returned.
        return format_json_data(model)

    if isinstance(model, RowProxy):
        return {key: format_data(value) for key, value in dict(model).items()}

    if isinstance(model, tuple):
        # The tuple must expose .keys() (a plain tuple would raise
        # AttributeError here) -- presumably a keyed query result row.
        fields = {}
        for column in model.keys():
            data = getattr(model, column)
            if isinstance(data, db.Model):
                # Flatten the nested model: its columns become top-level keys.
                for model_column in get_model_column_names(data):
                    fields[model_column] = format_data(
                        getattr(data, model_column))
            else:
                fields[column] = format_data(data)
        return jsonify(fields) if _jsonify else fields

    if isinstance(model, list):
        if not model:
            return []
        try:
            return format_json_data(dict(model))
        except (TypeError, ValueError):
            # Not a list of key/value pairs: hand the list back untouched.
            return model

    if type(model).__class__ is DefaultMeta:
        if not model:
            return {}
        fields = {}
        for column in get_model_column_names(model):
            # Compatibility with the id columns of the older table structure.
            if column in ('iIden', 'uGUID'):
                column = "id"
            if column == 'iHdrId':
                column = 'bill_id'
            fields[column] = format_data(getattr(model, column))
        return jsonify(fields) if _jsonify else fields

    if hasattr(model, "__dict__"):
        # Format a shallow copy: format_json_data works in place, and running
        # it on the live __dict__ would rewrite the object's own attributes.
        return format_json_data(dict(model.__dict__))

    return format_data(model)


def sql_data_to_json(data, column_list):
    """
    Pair every row with the column names and format the resulting dicts.

    :param data: iterable of rows, each an iterable of values
    :param column_list: column names, in the same order as the row values
    :return: list of formatted dicts, one per row
    """
    return [format_json_data(dict(zip(column_list, line))) for line in data]


def json_to_dict(json_data):
    """
    Parse a JSON document.

    :param json_data: JSON text
    :return: the parsed value
    """
    return json.loads(s=json_data)


def json_to_object(json_data):
    """
    Parse a JSON object and expose its members as attributes.

    :param json_data: JSON text such as
        '{"id": "007", "name": "007", "age": 28}'
    :return: a ``types.SimpleNamespace`` carrying one attribute per member
    """
    # A bare object() has no __dict__ and rejects attribute assignment, so a
    # SimpleNamespace is used as the attribute container.
    obj = types.SimpleNamespace()
    vars(obj).update(json.loads(s=json_data))
    return obj


def dict_to_json(dict):
    """
    Serialise a dict to JSON text.

    :param dict: value to serialise
    :return: JSON text
    """
    return json.dumps(dict)


def object_to_json(obj):
    """
    Serialise the attributes of an object to JSON text.

    :param obj: object whose ``__dict__`` is serialised
    :return: JSON text
    """
    return json.dumps(obj=obj.__dict__)


def dict_to_object(dict):
    """
    Build an object whose attributes are the items of a dict.

    :param dict: mapping of attribute name to value
    :return: a ``types.SimpleNamespace`` carrying one attribute per item
    """
    # A bare object() rejects setattr, so a SimpleNamespace is used instead.
    obj = types.SimpleNamespace()
    for key, value in dict.items():
        setattr(obj, key, value)
    return obj


def group_by(input_list: list, group_by_field: str, is_with_child: bool = False,
             is_remove_group_by_field: bool = True) -> list:
    """
    Group a list of dicts by one or more fields.

    :param input_list: list of dicts to group; when
        ``is_remove_group_by_field`` is true the grouping fields are deleted
        from these dicts in place
    :param group_by_field: comma separated field names, e.g. 'id,name,class'
    :param is_with_child: whether to attach the detail rows of each group
        under the ``child`` key
    :param is_remove_group_by_field: whether to remove the grouping fields
        from the rows
    :return: one dict per distinct combination of grouping values, in order
        of first appearance
    :raises HSException: when a row lacks one of the grouping fields
    """
    field_list = group_by_field.split(',')
    groups = {}
    for row in input_list:
        # Read every grouping field before touching the row, so that a
        # missing field is reported as HSException and the row is left
        # intact.
        header = {}
        try:
            for field in field_list:
                header[field] = row[field]
        except KeyError:
            raise HSException('{}找不到'.format(field))

        key = '|||'.join([str(header[field]) for field in field_list])

        if is_remove_group_by_field:
            for field in header:
                del row[field]

        group = groups.setdefault(key, header)
        if is_with_child:
            if group.get('child', None) is None:
                group['child'] = []
            group['child'].append(row)
    return list(groups.values())


def merge_dict_by_link(dicts: list, child_dicts: list, link: str,
                       attribute: str = 'item_data') -> list:
    """
    Attach matching child dicts to each parent dict.

    :param dicts: parent dicts; each one gets a new ``attribute`` key
    :param child_dicts: dicts to attach
    :param link: join condition 'parent_key=child_key', e.g. 'id=bill_id'
    :param attribute: key added to every parent dict, holding the list of
        matching children (empty when none match); defaults to item_data
    :return: ``dicts``, modified in place
    """
    link_parts = link.split('=')
    o_link, i_link = link_parts[0], link_parts[1]
    for parent in dicts:
        parent[attribute] = [
            child for child in child_dicts if child[i_link] == parent[o_link]
        ]
    return dicts


def merge_dict_by_link2(dicts: list, child_dicts: list, link: str,
                        attribute: str = 'item_data') -> list:
    """
    Attach matching child dicts to each parent dict, using two conditions.

    :param dicts: parent dicts; each one gets a new ``attribute`` key
    :param child_dicts: dicts to attach
    :param link: 'parent_key=child_key=shared_key'; a child matches when
        child[child_key] equals parent[parent_key] and both dicts hold the
        same value under shared_key
    :param attribute: key added to every parent dict, holding the list of
        matching children (empty when none match); defaults to item_data
    :return: ``dicts``, modified in place
    """
    link_parts = link.split('=')
    o_link, i_link, l_link = link_parts[0], link_parts[1], link_parts[2]
    for parent in dicts:
        parent[attribute] = [
            child for child in child_dicts
            if child[i_link] == parent[o_link]
            and child[l_link] == parent[l_link]
        ]
    return dicts


def dict_to_tree_by_link(dicts: list, link: str = 'id=parent_id',
                         attribute: str = 'childs', parent_id: int = 0) -> list:
    """
    Turn a flat list of dicts into a tree using a parent reference field.

    :param dicts: dicts to convert; matched dicts are modified in place
    :param link: 'own_key=parent_key' relation, e.g. 'id=parent_id'
    :param attribute: key under which the children of a node are stored
    :param parent_id: parent value selecting the nodes of the current level;
        compared with the parent field as strings
    :return: list of nodes at this level, or None when no dict refers to
        ``parent_id`` (so leaf nodes carry None under ``attribute``)
    """
    link_parts = link.split('=')
    self_key, parent_key = link_parts[0], link_parts[1]

    level_nodes = [
        node for node in dicts if str(node[parent_key]) == str(parent_id)
    ]
    if not level_nodes:
        return

    results = []
    for node in level_nodes:
        node[attribute] = dict_to_tree_by_link(dicts=dicts,
                                               link=link,
                                               attribute=attribute,
                                               parent_id=node[self_key])
        results.append(node)
    return results