import json

try:
    from collections.abc import Sequence
except ImportError:  # very old interpreters without collections.abc
    from collections import Sequence
from datetime import datetime
from inspect import isfunction
from uuid import UUID

from flask import g
from flask_sqlalchemy import DefaultMeta
from marshmallow import Schema, fields, post_load
from marshmallow.marshalling import Unmarshaller
from marshmallow.utils import _Missing

from huansi_utils.enum.enum import HSSchemaDataModel
from huansi_utils.exception.exception import HSException


class schema_data(object):
    '''
    Attribute-style data holder whose fields are defined by a schema class.

    Every attribute assignment (except a small set of internal names) is
    validated by deserializing the value through the matching schema field.
    '''

    def __init__(self, schema, type=None, is_strat_schema_opts=True, valid_data=None):
        '''
        Initialise the holder.

        :param schema: schema class that declares the fields
        :param type: insert or update mode (HSSchemaDataModel.Insert / Update);
            selects which include/exclude hooks of the schema are applied.
            None keeps every declared field.
        :param is_strat_schema_opts: whether the schema's include/exclude hooks
            are applied (HSSchema.dict_to_object passes False)
        :param valid_data: already validated data; unknown attribute reads fall
            back to this mapping
        '''
        self.is_strat_schema_opts = is_strat_schema_opts
        self._set_fields(schema, type)
        self._valid_data = valid_data
        self.__tablename__ = getattr(schema, '__tablename__')
        self.__keyfield__ = getattr(schema, '__keyfield__', None) or 'id'
        self._childs = []
        self._schema = schema

    # Compatibility layer for the two table layouts (id/bill_id vs iIden/iHdrId).
    @property
    def key(self):
        '''Return ``iIden`` when the key field is 'iIden' and it is set, else ``id``.'''
        if self.__keyfield__ == 'iIden' and self.iIden is not None:
            return self.iIden
        return self.id

    @key.setter
    def key(self, value):
        if self.__keyfield__ == 'iIden' and self.iIden is not None:
            self.iIden = value
            return
        self.id = value

    @property
    def billId(self):
        '''Return ``iHdrId`` when it is set, otherwise ``bill_id``.'''
        if self.iHdrId is not None:
            return self.iHdrId
        return self.bill_id

    @billId.setter
    def billId(self, value):
        if self.iHdrId is not None:
            self.iHdrId = value
            return
        self.bill_id = value

    def _get_fields(self, schema, type):
        '''
        Select the schema fields that apply to the given mode.

        :param schema: schema class
        :param type: None, HSSchemaDataModel.Insert or HSSchemaDataModel.Update
        :return: mapping of field name to field object
        :raises HSException: when the schema declares no fields
        '''
        declared = getattr(schema, '_declared_fields', None)
        if not declared:
            raise HSException('schema的fields属性不能为空')
        if type is None:
            return declared

        include = None
        exclude = None
        # The schema hooks are only consulted when the option is enabled.
        if self.is_strat_schema_opts:
            if type == HSSchemaDataModel.Insert:
                instance = schema()
                include = instance.get_fields_data(instance.add_fields())
                exclude = instance.get_fields_data(instance.add_exclude())
            elif type == HSSchemaDataModel.Update:
                instance = schema()
                include = instance.get_fields_data(instance.update_fields())
                exclude = instance.get_fields_data(instance.update_exclude())

        if include:
            selected = {name: field for name, field in declared.items() if name in include}
        else:
            # Work on a copy: popping from the schema's own _declared_fields
            # would permanently remove the field from the schema class.
            selected = declared.copy()

        if exclude:
            for name in exclude:
                selected.pop(name, None)
        return selected

    def _set_fields(self, schema, type):
        '''Store the applicable field definitions and apply declared field defaults.'''
        self._columns_define = self._get_fields(schema, type)
        for name, field in self._columns_define.items():
            if not isinstance(field.default, _Missing):
                setattr(self, name, field.default)

    def __getattr__(self, item):
        '''
        Fallback for attributes not found the normal way.

        Returns the value from ``_valid_data`` when that mapping is non-empty,
        otherwise None. No AttributeError is ever raised.
        '''
        # Read through __dict__ so a missing _valid_data cannot recurse into
        # __getattr__ again.
        valid_data = self.__dict__.get('_valid_data')
        if valid_data:
            return valid_data.get(item)
        return None

    def __setattr__(self, key, value):
        '''Validate the value against the schema field, then store it.'''
        if key == '_valid_data':
            self.__dict__[key] = value
            return
        if key in ('key', 'billId'):
            # Route through the property setters.
            super(schema_data, self).__setattr__(key, value)
            return
        self._schema_valdiate(key, value)
        self.__dict__[key] = value

    def _schema_valdiate(self, key, value):
        '''
        Validate a single attribute by deserializing it with its schema field.

        :raises HSException: when no field is defined for ``key``
        '''
        if key in self._exclude_attribute():
            return
        try:
            Unmarshaller().deserialize({key: value}, {key: self._columns_define[key]})
        except KeyError as error:
            # The message template was previously passed unformatted.
            raise HSException('找不到属性[{}]'.format(key), error)

    def _exclude_attribute(self):
        '''
        Names of the attributes that bypass schema validation.

        :return: list of attribute names
        '''
        return ['_columns_define', 'is_strat_schema_opts', 'id', '_childs', '__tablename__', '_schema',
                '__keyfield__', 'bill_id']

    def set_child(self, schema_data_list):
        '''
        Attach a child collection (typically detail rows).

        :param schema_data_list: schema_data_list to attach
        :return:
        '''
        self._childs.append(schema_data_list)

    def retrive_fields(self, *args):
        '''
        Query the given columns for this record and copy them onto the object.
        The key must already have a value.
        example: retrive_fields('name','size_name')

        :param args: column names
        :return: self, or None when no row was found
        :raises HSException: when the key is empty
        '''
        if not self.key:
            raise HSException('主键不存在')
        columns = ','.join(args)
        # NOTE(review): the statement is assembled by string formatting; column
        # names and the key must come from trusted values. Consider bound
        # parameters for the key if the session API supports them.
        sql = 'select {} from dbo.{}(NOLOCK) where {} = {}'.format(columns, self.__tablename__,
                                                                   self.__keyfield__, self.key)
        data = g.hs_session.db_session.execute(sql).first()
        if not data:
            return
        for item in args:
            setattr(self, item, getattr(data, item))
        return self

    def load(self, load_data, only_set_null_field=True, field_map=None, exclude_field=None):
        '''
        Fill this object's fields from another data source.

        :param load_data: source data: a query result row (detected as a
            Sequence exposing ``_real_fields``), a dict, or a schema_data
        :param only_set_null_field bool: only overwrite fields whose current
            value is falsy
        :param field_map dict: field name mapping {'id':'iIden'}; key is the
            field name in load_data, value is the field name on this object.
            A string is parsed as JSON.
        :param exclude_field: source fields to skip,
            'id,name,size_name' or ['id','name','size_name']
        :return: self
        :raises HSException: when the type of load_data is not supported
        '''
        if isinstance(load_data, Sequence):
            source_fields = load_data._real_fields
        elif isinstance(load_data, dict):
            source_fields = list(load_data.keys())
        elif isinstance(load_data, schema_data):
            source_fields = list(load_data._columns_define.keys())
        else:
            raise HSException('暂时不支持当前类型转换')

        # dict(<str>) can never succeed; parse the string as JSON instead.
        if field_map and isinstance(field_map, str):
            field_map = json.loads(field_map)
        if exclude_field and isinstance(exclude_field, str):
            exclude_field = exclude_field.split(',')

        for field in source_fields:
            if exclude_field and field in exclude_field:
                continue
            mapped = field_map.get(field) if field_map else None
            for schema_field in self._columns_define.keys():
                if schema_field != field and schema_field != mapped:
                    continue
                current = getattr(self, schema_field, None)
                if isinstance(load_data, dict):
                    incoming = load_data.get(field)
                else:
                    incoming = getattr(load_data, field, None)
                # Write to the schema field (the mapped name when a mapping
                # applies), not to the source field name.
                if not only_set_null_field or not current:
                    setattr(self, schema_field, incoming)
        return self

    def validate_none(self):
        '''
        Re-assign None to every field that is currently None so that the field
        validation runs for it.

        :return:
        '''
        for key in self._columns_define.keys():
            value = getattr(self, key, None)
            if value is None:
                setattr(self, key, None)


class schema_data_list(object):
    '''
    List container for schema_data objects with convenience helpers
    such as where and sum.
    '''

    def __init__(self, schema):
        '''
        Initialise the container.

        :param schema: schema
        '''
        self.__data_list__ = []
        self.__tablename__ = schema.__tablename__
        self._schema = schema

    @property
    def data(self):
        '''
        Return the underlying list.

        :return:
        '''
        return self.__data_list__

    def append(self, schema_data):
        '''
        Add a schema_data.

        :param schema_data:
        :return:
        '''
        self.__data_list__.append(schema_data)

    def remove(self, schema_data):
        '''
        Remove a schema_data.

        :param schema_data:
        :return:
        '''
        self.__data_list__.remove(schema_data)

    def _load_new(self, schema_cls, load_item_data, load_options):
        '''Create an empty schema_data, fill it from load_item_data and append it.'''
        new_item = schema_cls().new_object()
        new_item.load(load_data=load_item_data, **load_options)
        self.append(new_item)

    def load(self, load_data, update_status=3, only_set_null_field=True, field_map=None, key_field_map=None,
             exclude_field=None):
        '''
        Merge a collection of source records into this list.

        :param load_data: iterable of source records
        :param update_status int: 1-add, 2-update, 3-add and update.
            Add only creates records whose key is not in the list yet;
            update only touches records whose key is already in the list.
        :param only_set_null_field bool: only overwrite falsy fields
        :param field_map dict: field name mapping {'id':'iIden'}; key is the
            field name in load_data, value is the field name on the schema_data
        :param key_field_map: mapping used to match records {'id':'id'};
            only the first pair is used
        :param exclude_field: source fields to skip,
            'id,name,size_name' or ['id','name','size_name']
        :return:
        '''
        if not key_field_map:
            key_field_map = {'key': 'key'}
        load_key, own_key = next(iter(key_field_map.items()))

        # _schema may hold a schema class or a schema instance
        # (HSSchema.new_object passes an instance); always instantiate the class.
        schema_cls = self._schema if isinstance(self._schema, type) else type(self._schema)

        load_options = dict(only_set_null_field=only_set_null_field,
                            field_map=field_map,
                            exclude_field=exclude_field)

        # Key values of the records already in the list, with the owning record
        # at the same position in keyed_items.
        key_list = []
        keyed_items = []
        for schema_item_data in self.data:
            if not schema_item_data:
                break
            value = getattr(schema_item_data, own_key)
            if value is not None:
                key_list.append(value)
                keyed_items.append(schema_item_data)

        for load_item_data in load_data:
            if not key_list:
                # NOTE(review): with no existing keys every record is added,
                # regardless of update_status.
                self._load_new(schema_cls, load_item_data, load_options)
                continue

            if isinstance(load_item_data, dict):
                index = load_item_data.get(load_key)
            else:
                index = getattr(load_item_data, load_key)

            if index not in key_list:
                if update_status in (1, 3):
                    self._load_new(schema_cls, load_item_data, load_options)
            elif update_status in (2, 3):
                # Update the record whose key matches, not whichever record the
                # key-collection loop happened to end on.
                keyed_items[key_list.index(index)].load(load_data=load_item_data, **load_options)

    def where(self, formula=None):
        '''
        Filter the schema_data items.
        example: where(formula=lambda id,quantity:id and quantity>100)

        The positional parameter names of ``formula`` are used as attribute
        names; each item's attribute values are passed in that order.

        :param formula: predicate function or lambda
        :return: new schema_data_list holding the matching items
        :raises HSException: when formula is not a function
        '''
        if not isfunction(formula):
            raise HSException('format参数应该为lamdda表达式或者函数')
        code = formula.__code__
        # co_varnames also lists local variables; only the leading
        # co_argcount entries are parameters.
        field_list = code.co_varnames[:code.co_argcount]

        matched = [
            item for item in self.data
            if formula(*[getattr(item, field, None) for field in field_list])
        ]
        new_schema_data_list = schema_data_list(self._schema)
        setattr(new_schema_data_list, '__data_list__', matched)
        return new_schema_data_list

    def clone(self):
        '''
        Copy every item so later modifications do not affect the originals.

        NOTE(review): each copy is a dynamically created class whose class
        attributes are a shallow copy of the item's __dict__; it is not a
        schema_data instance and nested values are shared. Confirm whether a
        true deep copy is wanted.

        :return: new schema_data_list holding the copies
        '''
        new_data_list = [
            type('schema_data', schema_data.__bases__, dict(item.__dict__))
            for item in self.data
        ]
        new_schema_data_list = schema_data_list(self._schema)
        setattr(new_schema_data_list, '__data_list__', new_data_list)
        return new_schema_data_list

    def sum(self, filed):
        '''
        Sum one field over all items.
        example:sum(filed='quantity')

        Values that are None are skipped (schema_data returns None for
        attributes that have no value).

        :param filed: field name
        :return: the sum, 0 for an empty list
        '''
        sum_value = 0
        for schema_item_data in self.data:
            value = getattr(schema_item_data, filed, 0)
            if value is None:
                continue
            sum_value += value
        return sum_value

    def join_text(self, filed, symbol=','):
        '''
        Join the string form of one field over all items.
        example: join_text(filed='id') or join_text(filed='id',symbol='&')

        :param filed: field name
        :param symbol: separator
        :return: joined string
        '''
        return symbol.join(str(getattr(item, filed, '')) for item in self.data)


class HSSchema(Schema):
    # Status field of detail rows in whole-bill mode.
    iUpdateStatus = fields.Integer()
    id = fields.Integer(allow_none=True)
    bill_id = fields.Integer(allow_none=True)

    def _load_single(self, item_data):
        '''Load one dict in update mode when it carries a key (id or iIden), else in insert mode.'''
        if item_data.get('id') or item_data.get('iIden'):
            return self.load_update_data(json_data=item_data, many=False)
        return self.load_add_data(json_data=item_data, many=False)

    def new_object(self, data=None, type=None, many=False, **args):
        '''
        Build a schema_data (or a schema_data_list when many is True).

        :param data: source data; when given it is converted and loaded
        :param type: only used when data is empty;
            HSSchemaDataModel.Insert applies the schema's add include/exclude hooks,
            HSSchemaDataModel.Update applies the update include/exclude hooks
        :param many: True for multiple records, False for a single record
        :param args: extra attribute values to assign; only applied when many=False
        :return: schema_data, or schema_data_list when data is given and many is True
        '''
        if not data:
            data = schema_data(schema=self.__class__, type=type)
        elif many:
            data_list = schema_data_list(self)
            for item_data in data:
                data_list.append(self._load_single(self.object_to_dict(data=item_data)))
            return data_list
        else:
            data = self._load_single(self.object_to_dict(data=data))

        if args:
            for key, value in args.items():
                setattr(data, key, value)
        return data

    def new_field(self, type):
        '''Return a new marshmallow field for the given column type name (String for unknown names).'''
        if type in ('BigInteger', 'Integer'):
            return fields.Integer()
        if type == 'DateTime':
            return fields.DateTime(format='%Y-%m-%d %H:%M:%S')
        if type == 'Date':
            return fields.DateTime(format='%Y-%m-%d')
        if type == 'Bit':
            return fields.Boolean()
        if type == 'Decimal':
            return fields.Number(as_string=True)
        return fields.String()

    def dict_to_object(self, data):
        '''
        Wrap validated data in a schema_data.

        :param data: validated dict
        :return: schema_data, or None when data is empty
        '''
        if not data:
            return None
        # _schema_data_model only exists after _set_meta ran; fall back to None
        # (all fields) when load() is called directly.
        return schema_data(schema=self.__class__,
                           type=getattr(self, '_schema_data_model', None),
                           is_strat_schema_opts=False,
                           valid_data=data)

    @post_load
    def make_object(self, data):
        '''post_load hook: convert the loaded dict into a schema_data.'''
        return self.dict_to_object(data)

    def _load_with_mode(self, json_data, many, mode):
        '''Restrict the active fields for ``mode``, load json_data and raise on errors.'''
        self.json_data = json_data
        self._set_meta(type=mode)
        data, error = self.load(json_data, many=many)
        if error:
            raise RuntimeError(error)
        return data

    def load_add_data(self, json_data, many=False):
        '''
        Load data in insert mode.

        :raises RuntimeError: when json_data is empty or loading reports errors
        '''
        return self._load_with_mode(json_data, many, HSSchemaDataModel.Insert)

    def load_update_data(self, json_data, many=False):
        '''
        Load data in update mode.

        :raises RuntimeError: when json_data is empty or loading reports errors
        '''
        return self._load_with_mode(json_data, many, HSSchemaDataModel.Update)

    def get_fields_data(self, fields):
        '''
        Normalise a field specification.

        :param fields: comma separated string, tuple, or other value
        :return: None for an empty value, a list for a string or tuple,
            otherwise the value unchanged
        '''
        if not fields:
            return None
        if isinstance(fields, str):
            return fields.split(',')
        if isinstance(fields, tuple):
            return list(fields)
        return fields

    def add_exclude(self):
        '''
        Fields to exclude when inserting. Override in subclasses:
        def add_exclude(self):
            return 'a','b','c'

        :return:
        '''
        pass

    def add_fields(self):
        '''
        Fields to include when inserting. Override in subclasses:
        def add_fields(self):
            return 'a','b','c'

        :return:
        '''
        pass

    def update_fields(self):
        '''
        Fields to include when updating. Override in subclasses:
        def update_fields(self):
            return 'a','b','c'

        :return:
        '''
        pass

    def update_exclude(self):
        '''
        Fields to exclude when updating. Override in subclasses:
        def update_exclude(self):
            return 'a','b','c'

        :return:
        '''
        pass

    def _get_fields(self, type):
        '''
        Compute the names of the fields that take part in a load.

        Insert mode starts from all declared fields, update mode from the keys
        of the incoming data (the first element when it is a list); the
        include and exclude hooks are applied afterwards.

        :param type: HSSchemaDataModel.Insert or HSSchemaDataModel.Update
        :return: list of field names
        :raises RuntimeError: when json_data is empty
        '''
        _json_data = self.json_data
        if not _json_data:
            raise RuntimeError('json_data 不能为空')
        if isinstance(_json_data, list):
            _json_data = _json_data[0]

        names = []
        include = None
        exclude = None
        if type == HSSchemaDataModel.Insert:
            names = list(self.declared_fields.keys())
            include = self.get_fields_data(self.add_fields())
            exclude = self.get_fields_data(self.add_exclude())
        elif type == HSSchemaDataModel.Update:
            names = list(_json_data.keys())
            include = self.get_fields_data(self.update_fields())
            exclude = self.get_fields_data(self.update_exclude())

        if include:
            # Keep only the included fields (the hooks document an include
            # list; a symmetric difference would drop exactly those fields).
            names = [name for name in names if name in include]
        if exclude:
            # Unknown names in the exclude list are ignored.
            names = [name for name in names if name not in exclude]
        return names

    def _set_meta(self, type=HSSchemaDataModel.Insert):
        '''Remember the load mode and restrict ``self.fields`` to the applicable fields.'''
        self._schema_data_model = type
        _fields = self._get_fields(type=type)
        if not _fields:
            return
        # setattr(self.Meta, 'fields', tuple(_fields))
        self.fields = {
            name: field for name, field in self.declared_fields.items() if name in _fields
        }

    def object_to_dict(self, data):
        '''
        Convert supported inputs to a dict.

        :param data: dict (returned as is), JSON string, or a query result row
            (detected as a Sequence exposing ``_real_fields``)
        :return: dict, or None for any other type
        :raises HSException: when data is a model class
        '''
        if isinstance(data, dict):
            return data
        if isinstance(data, str):
            # dict(<str>) can never succeed; parse the string as JSON instead.
            return json.loads(data)
        if isinstance(data, DefaultMeta):
            raise HSException('不支持当前类型的转换,请用query_model方法查询数据')
        if isinstance(data, Sequence):
            dict_data = {}
            for key in data._real_fields:
                value = getattr(data, key)
                if isinstance(value, datetime):
                    # Keep 'YYYY-MM-DD HH:MM:SS', dropping fractional seconds.
                    value = str(value)[:19]
                elif isinstance(value, UUID):
                    value = str(value)
                dict_data[key] = value
            return dict_data
        return None


class HSMoudlelSchema(HSSchema):
    __tablename__ = ''