"""SQLite helpers: table definitions and a thin connection wrapper."""
import os
import sqlite3
from collections import OrderedDict

from static_file import temp_file_dir

# DDL for the application's tables. Every statement uses
# ``create table if not exists`` so it is safe to run repeatedly.
# Nothing in this module executes them -- presumably a caller feeds each
# statement to ``DBDriver.execDB``; verify against callers.
# NOTE(review): remote_server_info is the only table whose ``id`` is not a
# primary key -- confirm whether that is intentional.
create_table_sql_list = [
    (
        "create table if not exists project_info (id bigint primary key "
        ", project_no varchar(50) unique "
        ", db_ip varchar(50) "
        ", db_port varchar(50) "
        ", db_user varchar(50) "
        ", db_password varchar(50) "
        ", mes_db_name varchar(100) "
        ", host_ip varchar(50) "
        ", tiip_db_name varchar(100) default '' "
        ", right_db_name varchar(100) default '' "
        ", create_time date default (datetime('now','localtime')) "
        ")"
    ),
    (
        "create table if not exists app_upgrade_log (id bigint primary key "
        ", status varchar(50) "
        ", default_version bit "
        ", upgrade_time date default (datetime('now','localtime')) "
        ", upgrade_no varchar(50) "
        ")"
    ),
    (
        "create table if not exists app_upgrade_log_dtl "
        "(id integer primary key autoincrement "
        ", log_id bigint "
        ", app_code varchar(50) "
        ", app_name varchar(100) "
        ", app_image_id varchar(200) "
        ", app_image_name varchar(200) "
        ")"
    ),
    (
        "create table if not exists remote_server_info (id bigint "
        ", project_no varchar(50) unique "
        ", work_shop_no varchar(50) "
        ", server_ip varchar(50) "
        ", server_ssh_port varchar(50) "
        ", server_user varchar(50) "
        ", server_password varchar(50) "
        ")"
    ),
    (
        "create table if not exists remote_server_db_info "
        "(id bigint primary key "
        ", project_no varchar(50) "
        ", work_shop_no varchar(50) "
        ", db_ip varchar(50) "
        ", db_port varchar(50) "
        ", db_user varchar(50) "
        ", db_password varchar(50) "
        ", mes_db_name varchar(100) "
        ")"
    ),
]


class DBDriver:
    """Thin wrapper around a single ``sqlite3`` connection.

    Usable as a context manager: entering opens a connection, leaving
    commits when the block finished normally and rolls back when it
    raised.
    """

    def __init__(self, dbfile=None):
        """Remember which database file to open.

        Args:
            dbfile: Path of the SQLite file. Any falsy value selects
                ``data.db`` inside ``temp_file_dir``.
        """
        if not dbfile:
            dbfile = os.path.join(temp_file_dir, 'data.db')
        self.dbfile = dbfile

    def cerateDB(self):
        """Open a connection to ``self.dbfile`` and store it in ``self.conn``.

        The misspelled name is part of the public interface and is kept
        for existing callers. Each call replaces ``self.conn`` with a new
        connection; the previous one, if any, is not closed here.
        """
        # check_same_thread=False allows the connection to be used from
        # threads other than the one that created it.
        self.conn = sqlite3.connect(self.dbfile, check_same_thread=False)

    def execDB(self, execsql):
        """Execute one SQL statement on the open connection.

        No commit is issued here; ``__exit__`` commits when the enclosing
        ``with`` block ends without an exception.

        Args:
            execsql: SQL text to execute.
        """
        self.conn.execute(execsql)

    def exec_sql(self, sql, *args, **kwargs):
        """Run ``sql`` on a fresh cursor and return that cursor.

        Args:
            sql: SQL text to execute.
            *args, **kwargs: Forwarded unchanged to ``cursor.execute``
                (typically the bound parameters).

        Returns:
            The cursor the statement ran on. The caller must close it.

        Raises:
            Whatever ``cursor.execute`` raises; the cursor is closed
            before the exception propagates.
        """
        cur = self.conn.cursor()
        try:
            cur.execute(sql, *args, **kwargs)
        except Exception:
            # Do not leave a dangling cursor behind a failed statement.
            cur.close()
            raise
        return cur

    def retrive_sql(self, sql, *args, **kwargs):
        """Run a query and return its first row as a ``dict``.

        Args:
            sql: SQL text of a row-returning statement.
            *args, **kwargs: Forwarded to ``exec_sql``.

        Returns:
            ``{column_name: value}`` for the first row, with values
            passed through ``format_result``, or ``None`` when the query
            produced no rows.
        """
        cur = self.exec_sql(sql, *args, **kwargs)
        try:
            fields = [item[0] for item in cur.description]
            row = cur.fetchone()
        finally:
            # Close on every path, including the "no row" early return.
            cur.close()
        if row is None:
            return None
        return dict(zip(fields, self.format_result(row)))

    def query_sql(self, sql, *args, **kwargs):
        """Run a query and return all rows as a list of ``OrderedDict``.

        Args:
            sql: SQL text of a row-returning statement.
            *args, **kwargs: Forwarded to ``exec_sql``.

        Returns:
            One ``OrderedDict`` per row, keyed by column name in select
            order, with values passed through ``format_result``. Empty
            list when the query produced no rows.
        """
        cur = self.exec_sql(sql, *args, **kwargs)
        try:
            fields = [item[0] for item in cur.description]
            rows = cur.fetchall()
        finally:
            cur.close()
        return [
            OrderedDict(zip(fields, row))
            for row in self.format_result(rows)
        ]

    def __enter__(self):
        """Open the connection and return this driver."""
        self.cerateDB()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Commit on success, roll back on error.

        Returns:
            ``False`` always, so an exception raised inside the ``with``
            block propagates with its original traceback.
        """
        # Test exc_type rather than the truthiness of exc_val: an
        # exception instance may legitimately evaluate as falsy.
        if exc_type is not None:
            self.conn.rollback()
        else:
            self.conn.commit()
        # NOTE(review): the connection is left open on exit (a close call
        # was previously disabled here) -- confirm whether callers rely
        # on using ``self.conn`` after the ``with`` block.
        return False

    def format_result(self, res):
        """Normalise fetched rows, turning integer values into strings.

        Args:
            res: A list of rows, a single row tuple, or any other value.

        Returns:
            * list input  -> list with each element formatted recursively;
            * tuple input -> list of its values where every ``int``
              (which includes ``bool``) is replaced by ``str(value)``;
            * anything else is returned unchanged.
        """
        if isinstance(res, list):
            return [self.format_result(row) for row in res]
        if isinstance(res, tuple):
            return [
                str(value) if isinstance(value, int) else value
                for value in res
            ]
        return res


# Shared module-level driver bound to the default database file.
db_driver = DBDriver()