# -*- coding:utf-8 -*-
import os
from socket import create_connection, timeout

from huansi_utils.db.db import new_id
from huansi_utils.exception.exception import HSException
from huansi_utils.server.service_uc import HSBaseUCService
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

from app.utils.db_tools import db_driver

# Maps project_info column names to the variable names exported in huansi.sh.
# Columns that are not listed here are exported under their upper-cased name.
_PROFILE_VAR_NAMES = {
    'project_no': 'HSCUSCODE',
    'db_ip': 'HSDB_HOST',
    'db_port': 'HSDB_PORT',
    'db_user': 'HSDB_USER',
    'db_password': 'HSDB_PASSWORD',
    'mes_db_name': 'HSDB_NAME',
}


class ConnectionService(HSBaseUCService):
    """Service for project / remote-server connection settings and checks."""

    def get_project_info(self):
        """Return the result of ``select * from project_info``.

        :return: whatever ``session.retrive_sql`` yields for that query
        """
        with db_driver as session:
            return session.retrive_sql('select * from project_info')

    def set_project_info(self, json_data):
        """Insert or update a project_info row and regenerate ``huansi.sh``.

        When ``json_data`` carries a truthy ``id`` the matching row is
        updated; otherwise a new id is generated, stored back into
        ``json_data['id']`` and a row is inserted.

        :param json_data: mapping with the bind values used by the SQL below,
            e.g. {"project_no": "j1kq", "db_ip": "47.97.206.38",
            "db_port": "9610", "db_user": "000", "db_password": "000",
            "mes_db_name": "HSGmtMes", "tiip_db_name": "HSTIIP"}
            (the statements also bind ``host_ip``)
        :return: ``{'message': "保存成功"}``
        :raises HSException: if the saved row cannot be read back
        """
        record_id = json_data.get('id')
        with db_driver as session:
            if record_id:
                update_sql = '''update project_info
                set project_no=:project_no,db_ip=:db_ip,db_port=:db_port,db_user=:db_user
                ,db_password=:db_password,mes_db_name=:mes_db_name,tiip_db_name=:tiip_db_name,host_ip=:host_ip
                where id=:id'''
                session.exec_sql(update_sql, json_data)
            else:
                record_id = new_id()
                json_data['id'] = record_id
                insert_sql = '''insert into project_info
                (id,project_no,db_ip,db_port,db_user
                ,db_password,mes_db_name,tiip_db_name,host_ip)
                values (:id,:project_no,:db_ip,:db_port,:db_user
                ,:db_password,:mes_db_name,:tiip_db_name,:host_ip)'''
                session.exec_sql(insert_sql, json_data)

            data = session.retrive_sql('select * from project_info where id=:id', {"id": record_id})
            if not data:
                raise HSException('保存失败')

            # NOTE(review): the original layout was lost, so it is unclear
            # whether the file was written inside or outside the db_driver
            # context; it is kept inside so a write failure surfaces before
            # the context exits - confirm against db_driver's semantics.
            self._write_profile(data)

        return {'message': "保存成功"}

    def _write_profile(self, data):
        """Write the saved project row to ``huansi.sh`` as ``export`` lines."""
        # Imported lazily, exactly as the original code did.
        from static_file import profile_dir

        project_info_path = os.path.join(profile_dir, 'huansi.sh')

        # Build the whole content first so that a failure while formatting
        # does not leave a truncated, half-written profile behind.
        lines = []
        for column, value in data.items():
            var_name = self.math_name(column)
            if var_name in ('ID', 'CREATE_TIME'):
                continue
            # NOTE(review): values are written unquoted; a value containing
            # shell metacharacters or spaces would need escaping - confirm
            # how huansi.sh is consumed before changing the format.
            lines.append("export {}={}\n".format(var_name, value))
            # The MES database name is written twice, for compatibility
            # with consumers of the older variable name.
            if var_name == 'HSDB_NAME':
                lines.append("export {}={}\n".format('MESDB_NAME', value))

        with open(project_info_path, "w", encoding="utf-8") as f:
            f.writelines(lines)

    def math_name(self, k):
        """Translate a project_info column name to its exported variable name.

        :param k: column name
        :return: the mapped name, or ``k.upper()`` when there is no mapping
        """
        if k in _PROFILE_VAR_NAMES:
            return _PROFILE_VAR_NAMES[k]
        return k.upper()

    def test_connection(self, project_no, db_name):
        """Try to connect to ``db_name`` using the stored project credentials.

        :param project_no: project number used to look up the credentials
        :param db_name: name of the database to connect to
        :return: ``{'message': '连接成功'}`` when the probe query returns the
            expected value, otherwise ``{'message': '连接失败'}``
        :raises HSException: if no project_info row matches ``project_no``
        """
        with db_driver as session:
            # Bound parameter instead of string interpolation: project_no is
            # caller-supplied and must not be spliced into the SQL text.
            project_info = session.retrive_sql(
                'select * from project_info where project_no=:project_no',
                {'project_no': project_no},
            )
            if not project_info:
                raise HSException(f'未找到【{project_no}】的信息')

        db_ip = project_info['db_ip']
        db_port = project_info['db_port']
        db_user = project_info['db_user']
        db_password = project_info['db_password']
        # NOTE(review): user/password are placed in the URL unescaped, so
        # characters such as '@' or '/' in them will break URL parsing.
        con_str = f'mssql+pymssql://{db_user}:{db_password}@{db_ip}:{db_port}/{db_name}'

        engine = create_engine(con_str)
        try:
            remote_session = sessionmaker(bind=engine)()
            try:
                row = remote_session.execute(text("select test='huansi'")).fetchone()
            finally:
                remote_session.close()
        except Exception as e:
            # Best-effort check: any failure is reported, not propagated.
            print("数据库连接失败,失败原因:{}".format(self._format_db_error(e)))
            return {'message': '连接失败'}
        finally:
            # Release the pool created for this one-off probe.
            engine.dispose()

        if row is not None and row.test == 'huansi':
            return {'message': '连接成功'}
        return {'message': '连接失败'}

    @staticmethod
    def _format_db_error(error):
        """Build a readable message from an exception raised by the probe.

        Uses the first element of ``error.orig.args`` when present (decoding
        any bytes parts) and falls back to ``str(error)`` otherwise.
        """
        orig = getattr(error, 'orig', None)
        orig_args = getattr(orig, 'args', ())
        detail = orig_args[0] if orig_args else ()
        if not isinstance(detail, (tuple, list)):
            detail = (detail,)

        parts = []
        for item in detail:
            if isinstance(item, bytes):
                try:
                    item = item.decode()
                except UnicodeDecodeError:
                    item = str(item)
            parts.append(str(item))
        return ''.join(parts) or str(error)

    def get_remote_server_info(self):
        """Return the result of ``select * from remote_server_info``.

        :return: whatever ``session.retrive_sql`` yields for that query
        """
        with db_driver as session:
            return session.retrive_sql('select * from remote_server_info')

    def set_remote_server_info(self, json_data):
        """Insert or update a remote_server_info row.

        When ``json_data`` carries a truthy ``id`` the matching row is
        updated; otherwise a new id is generated, stored back into
        ``json_data['id']`` and a row is inserted.

        :param json_data: mapping with the bind values used by the SQL below
        :return: ``{'message': "保存成功"}``
        """
        record_id = json_data.get('id')
        with db_driver as session:
            if record_id:
                update_sql = '''update remote_server_info
                set project_no=:project_no,server_ip=:server_ip,server_ssh_port=:server_ssh_port,server_user=:server_user
                ,server_password=:server_password
                where id=:id'''
                session.exec_sql(update_sql, json_data)
            else:
                record_id = new_id()
                json_data['id'] = record_id
                insert_sql = '''insert into remote_server_info
                (id,project_no,server_ip,server_ssh_port,server_user
                ,server_password)
                values (:id,:project_no,:server_ip,:server_ssh_port,:server_user
                ,:server_password)'''
                session.exec_sql(insert_sql, json_data)
        return {'message': "保存成功"}

    def test_port(self, port):
        """Check that ``port`` on the configured ``host_ip`` accepts TCP connections.

        :param port: port to probe
        :return: ``None`` when the connection succeeds
        :raises HSException: if project info or its ``host_ip`` is missing,
            or if the connection times out or fails
        """
        project_info = self.get_project_info()
        if not project_info:
            raise HSException('项目信息未查到,请先配置')

        host_ip = project_info.get('host_ip')
        if not host_ip:
            # Without a host nothing would actually be probed, so report the
            # missing configuration instead of a false success.
            raise HSException('项目信息未配置host_ip,请先配置')

        try:
            # Plain TCP connect (what telnetlib.Telnet did internally); the
            # socket is closed immediately so no descriptor is leaked.
            create_connection((host_ip, port), timeout=2).close()
        except timeout as e:
            raise HSException(f'{host_ip}:{port}连接超时') from e
        except Exception as e:
            raise HSException(f'{host_ip}:{port}连接失败') from e