Commit c62ff4b2 authored by 金凯强's avatar 金凯强 🎨

2.0初版

parent 20e5f79a
FROM 47.110.145.204:8084/huansi/python3:slim-util-0.0.3 FROM 47.110.145.204:8084/huansi/python3:slim-util-0.0.1
COPY . . COPY . .
......
...@@ -5,6 +5,16 @@ import requests ...@@ -5,6 +5,16 @@ import requests
if __name__ == '__main__': if __name__ == '__main__':
data = {'user_name': 'jkq', 'password': 'jkq_test'} data = {'user_name': 'jkq', 'password': 'jkq_test'}
json_data = ['{"app_code":"Portainer","app_name":"Docker可视化管理工具","web_port":23001}',
'{"app_code":"Tool","id":"8","app_name":"环思工具集","api_port":23280,"web_version":"1.0"}',
'{"app_code":"RabbitMQ","app_name":"RabbitMQ服务","web_port":23281,"api_port":23282,"api_port2":8140}',
'{"app_code":"tmc_web","id":"48","app_name":"大屏监控中心APP","web_port":23200,"web_version":"1.0"}',
'{"app_code":"nginx","id":"999","app_name":"Nginx","api_port":59168,"api_port2":59169,"api_version":"1.0"}']
res = requests.post('http://localhost:5000/application/jkq_test/', json=json_data)
print(res.json())
# # 设置登录信息 # # 设置登录信息
# requests.post('http://localhost:5000/authorization/',json=data) # requests.post('http://localhost:5000/authorization/',json=data)
# #
......
# -*- coding:utf-8 -*-
\ No newline at end of file
# -*- coding:utf-8 -*-
from flask import request
from app.application.application_service import ApplicationService
from huansi_utils.webapi import ApiController
from app import api
from app.authorization.authorization_service import AuthorizationService
@api('application')
class ApplicationAPI(ApiController):
@api('<string:project_no>')
def get_application_list(self, project_no):
'''
获取app列表
:return:
'''
ApplicationService().get_application_list(project_no)
@api('<string:project_no>')
def post_edit_application_setting(self, project_no):
'''
修改app列表
:param project_no:
:return:
'''
ApplicationService().edit_application_list(project_no, request.json)
@api('test_port/<string:project_no>')
def get_port_list(self, project_no):
'''
获取端口号列表
:param project_no:
:return:
'''
ApplicationService().get_port_list(project_no)
@api('test_port/<string:project_no>')
def post_port_list(self, project_no):
'''
批量测试端口
:param project_no:
:return:
'''
ApplicationService().test_port(project_no, request.json)
# -*- coding:utf-8 -*-
import json
import os
import re
import time
from git import GitCommandError
from app.conncetion.conncetion_service import ConnectionService
from app.utils.common_tools import CommonTools
from app.utils.db_tools import db_driver
from flask_app import git_path, repo
from huansi_utils.common.string import between_string
from huansi_utils.exception.exception import HSException
class ApplicationService():
def __init__(self):
self.init_validate()
def get_application_list(self, project_no):
'''
获取application列表
:return:
'''
self.checkout(project_no)
compose_content = self.get_compose_content()
# 获取app列表
server_app_list = re.findall(r"# ({\"app_code\":.*})", compose_content)
# 获取本地App列表
with db_driver as session:
query_sql = '''select A.*
from app_upgrade_log_dtl A
join app_upgrade_log B on B.id=A.log_id
where B.default_version=1'''
local_app_list = session.query_sql(query_sql)
result_list = []
for server_app in server_app_list:
app_str = server_app
server_app = json.loads(server_app)
server_app['app_str'] = app_str
server_app_code = server_app['app_code']
server_app['b_download'] = False
for local_app in local_app_list:
local_app_code = local_app['app_code']
if server_app_code == local_app_code:
server_app['b_download'] = True
break
result_list.append(server_app)
return result_list
def get_compose_content(self):
self.compose_path = os.path.join(git_path, 'docker-compose.yml')
# 读取文件
with open(self.compose_path, 'r', encoding='utf8') as f:
compose_content = f.read().replace('\r', '')
return compose_content
def checkout(self, project_no):
try:
repo.git.checkout(project_no)
repo.git.pull()
except GitCommandError as e:
raise HSException(f'切换分支报错,报错原因:{e},联系技术人员协调解决')
def init_validate(self):
'''
初始化验证
:return:
'''
if not os.path.exists(git_path):
raise HSException('未能找到Git仓库,联系技术人员协调解决')
def edit_application_list(self, project_no, json_data):
'''
修改应用列表并上传git
:param project_no:
:param json:
:return:
'''
# CommonTools.validate_runner_is_install()
self.checkout(project_no)
compose_content = self.get_compose_content()
# 全选是*
if json_data == '*':
app_list_str = '*'
else:
app_list = []
for app_str in json_data:
app = re.findall(f'# {app_str} *\n *(.*):', compose_content)
if not app or len(app) > 1:
raise HSException('获取app出错,联系技术人员协调解决')
app_list.append(app[0])
app_list_str = ' '.join(app_list)
compose_content = re.sub('# app_list: "(.*)"', f'# app_list: "{app_list_str}"', compose_content)
with open(self.compose_path, 'w', encoding='utf8') as f:
f.write(compose_content)
# ---------------------
download_log_path = os.path.join(git_path, 'download_log.log')
if os.path.exists(download_log_path):
with open(download_log_path, 'r') as f:
log_content = f.read() + '\n'
else:
with open(download_log_path, 'a+') as f:
log_content = f.read()
log_content = f"{log_content}{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))}===>[{app_list_str}]"
with open(download_log_path, 'w', encoding='utf8') as f:
f.write(log_content)
repo.git.add([self.compose_path, download_log_path])
repo.git.commit(f"-m {time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))}===>[{app_list_str}]")
repo.git.push()
def get_port_list(self, project_no):
'''
获取端口号列表
:param project_no:
:return:
'''
self.checkout(project_no)
compose_content = self.get_compose_content()
# 获取app列表
app_str_list = re.findall(r"# ({\"app_code\":.*})", compose_content)
app_list = []
for app_str in app_str_list:
_dict = {}
app_dict = json.loads(app_str, encoding='utf8')
app_code = app_dict.get('app_code')
_dict['app_code'] = app_code
api_port = app_dict.get('api_port')
if api_port:
_dict['port'] = api_port
app_list.append(_dict)
api_port2 = app_dict.get('api_port2')
if api_port2:
_dict['port'] = api_port2
app_list.append(_dict)
web_port = app_dict.get('web_port')
if web_port:
_dict['port'] = web_port
app_list.append(_dict)
web_port2 = app_dict.get('web_port2')
if web_port2:
_dict['port'] = web_port2
app_list.append(_dict)
return app_list
def test_port(self, project_no, json_data):
project_info = ConnectionService().get_project_info(project_no)
host_ip = project_info['host_ip']
info_list = []
for port_info in json_data:
info_dict = {}
info_dict['port'] = port_info['port']
info_dict['app_code'] = port_info['app_code']
try:
self.test_connect_port(host_ip, port_info['port'])
info_dict['success'] = True
except Exception:
info_dict['success'] = False
info_list.append(info_dict)
return info_list
import timeout_decorator
# 超过两秒,基本上没救了,告辞~~~~
@timeout_decorator.timeout(2)
def test_connect_port(self, ip, port):
'''
检查端口是否开放
:param ip:
:param port:
:return:
'''
import socket
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not port:
port = 80
try:
sk.connect((ip, int(port)))
sk.shutdown(2)
finally:
sk.close()
...@@ -9,12 +9,22 @@ from app.conncetion.conncetion_service import ConnectionService ...@@ -9,12 +9,22 @@ from app.conncetion.conncetion_service import ConnectionService
@api('connection') @api('connection')
class ConnectionAPI(ApiController): class ConnectionAPI(ApiController):
@api() @api()
def get_project_info(self): def get_project_no_list(self):
''' '''
获取项目信息 获取客户代码列表
:return: :return:
''' '''
return ConnectionService().get_project_info() args = request.args
project_no = args.get('project_no', '')
return ConnectionService().get_project_no_list(project_no)
@api('<string:project_no>')
def get_project_info(self, project_no):
'''
获取项目信息 通过项目编号获取详细信息
:return:
'''
return ConnectionService().get_project_info(project_no, sync_file=True)
@api() @api()
def post_set_project_info(self): def post_set_project_info(self):
...@@ -24,23 +34,21 @@ class ConnectionAPI(ApiController): ...@@ -24,23 +34,21 @@ class ConnectionAPI(ApiController):
''' '''
return ConnectionService().set_project_info(request.json) return ConnectionService().set_project_info(request.json)
@api('<string:project_no>/<string:db_name>') @api('test_db_connection')
def get_test_conncetion(self, project_no, db_name): def post_test_connection(self):
''' '''
测试项目数据库连接 测试项目数据库连接
:param project_no:
:param db_name:
:return: :return:
''' '''
return ConnectionService().test_connection(project_no, db_name) return ConnectionService().test_connection(request.json)
@api('remote_server') @api('remote_server/<string:project_no>/')
def get_remote_server(self): def get_remote_server(self, project_no):
''' '''
获取远程服务器连接 获取远程服务器连接
:return: :return:
''' '''
return ConnectionService().get_remote_server_info() return ConnectionService().get_remote_server_info(project_no)
@api('remote_server') @api('remote_server')
def post_set_remote_server_info(self): def post_set_remote_server_info(self):
...@@ -50,27 +58,27 @@ class ConnectionAPI(ApiController): ...@@ -50,27 +58,27 @@ class ConnectionAPI(ApiController):
''' '''
return ConnectionService().set_remote_server_info(request.json) return ConnectionService().set_remote_server_info(request.json)
@api('remote_db') @api('work_shop/<string:project_no>')
def post_set_remote_db_info(self): def get_work_shop_list(self, project_no):
''' '''
设置远程服务器数据库配置 获取车间列表
:param project_no:
:return: :return:
''' '''
return ConnectionService().set_remote_db_info(request.get_json()) return ConnectionService().get_work_shop_list(project_no)
@api('remote_db') @api('remote_db')
def get_remote_db_info(self): def post_set_remote_db_info(self):
''' '''
获取远程服务器数据库配置 设置远程服务器数据库配置
:return: :return:
''' '''
return ConnectionService().get_remote_db_info() return ConnectionService().set_remote_db_info(request.get_json())
@api('test_port/<int:port>') @api('remote_db/<string:project_no>/<string:work_shop_no>')
def get_test_port(self, port): def get_remote_db_info(sel, project_no, work_shop_no):
''' '''
测试端口是否开放 获取远程服务器数据库配置
:param port:
:return: :return:
''' '''
return ConnectionService().test_port(port) return ConnectionService().get_remote_db_info(project_no, work_shop_no, sync_file=True)
This diff is collapsed.
...@@ -43,7 +43,6 @@ class InfoAPI(ApiController): ...@@ -43,7 +43,6 @@ class InfoAPI(ApiController):
args = request.args args = request.args
return InfoService().get_upgrade_log_dtl(log_id, args) return InfoService().get_upgrade_log_dtl(log_id, args)
@api('logs') @api('logs')
def get_install_logs_info(self): def get_install_logs_info(self):
''' '''
......
...@@ -16,42 +16,18 @@ class InstallAPI(ApiController): ...@@ -16,42 +16,18 @@ class InstallAPI(ApiController):
time.sleep(int(s)) time.sleep(int(s))
return 1 return 1
@api('runner') @api('runner/<string:project_no>')
def get_list(self): def get_list(self, project_no):
''' '''
安装runner 安装runner
:return: :return:
''' '''
return InstallService().install_runner() return InstallService().install_runner(project_no)
@api('runner/step1') @api('remote_server/docker/<string:project_no>')
def get_install_runner_step1(self): def get_install_remote_server_docker(self, project_no):
'''
安装runner1
:return:
'''
return InstallService().install_huansi_runner_step1()
@api('runner/step2')
def get_install_runner_step2(self):
'''
安装runner2
:return:
'''
return InstallService().install_huansi_runner_step2()
@api('runner/step3')
def get_install_runner_step3(self):
'''
安装runner3
:return:
'''
return InstallService().install_huansi_runner_step3()
@api('remote_server/docker/')
def get_install_remote_server_docker(self):
''' '''
远端服务器安装docker 远端服务器安装docker
:return: :return:
''' '''
return InstallService().install_remote_service_docker() return InstallService().install_remote_service_docker(project_no)
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
import os import os
import threading
import time import time
from threading import Thread from threading import Thread
from app.utils.thread_tools import ThreadTools
from huansi_utils.app.apploader import logger from huansi_utils.app.apploader import logger
from huansi_utils.exception.exception import HSException from huansi_utils.exception.exception import HSException
from huansi_utils.server.service_uc import HSBaseUCService from huansi_utils.server.service_uc import HSBaseUCService
...@@ -14,30 +16,35 @@ from static_file import system_file_dir ...@@ -14,30 +16,35 @@ from static_file import system_file_dir
class InstallService(HSBaseUCService): class InstallService(HSBaseUCService):
def install_runner(self): def install_runner(self, project_no):
''' '''
安装runner 安装runner
:return: :return:
''' '''
t = Thread(target=self._install_runner) # 每次只能有一个推送进程
for thread_item in threading.enumerate():
if not isinstance(thread_item, threading.Thread):
continue
if thread_item.name == 'install_runner':
logger.info('发现之前一个线程正在处理,杀死它,重新启动')
# 停止线程
ThreadTools().stop_thread(thread_item)
t = Thread(target=self._install_runner, args=(project_no,), name='install_runner')
t.start() t.start()
return {"message": "开始安装runner"} return {"message": "开始安装runner"}
def _install_runner(self): def _install_runner(self, project_no):
logger.info('开启安装runner')
from flask_app import global_app from flask_app import global_app
with global_app.app_context(): with global_app.app_context():
from flask import g from flask import g
g.user = {} g.user = {}
g.language = 'cn' g.language = 'cn'
conncetion_service = ConnectionService() self.install_huansi_runner(project_no)
project_info = conncetion_service.get_project_info()
if not project_info:
logger.info('runner安装失败')
raise HSException('项目信息未查到,请先配置')
project_code = project_info['project_no']
self.install_huansi_runner(project_code)
def install_huansi_runner(self, HSCUSCODE): def install_huansi_runner(self, HSCUSCODE):
''' '''
...@@ -58,55 +65,13 @@ class InstallService(HSBaseUCService): ...@@ -58,55 +65,13 @@ class InstallService(HSBaseUCService):
self.start_runner() self.start_runner()
logger.info('runner安装成功') logger.info('runner安装成功')
def install_huansi_runner_step1(self):
'''
安装runner第一步
:param HSCUSCODE:
:return:
'''
# 测试下载时间
conncetion_service = ConnectionService()
project_info = conncetion_service.get_project_info()
if not project_info:
raise HSException('项目信息未查到,请先配置')
project_code = project_info['project_no']
self.delete_setting_file()
s1 = os.system("docker run --rm -i -v /etc/gitlab-runner:/etc/gitlab-runner gitlab/gitlab-runner register \
-n -u http://47.110.145.204:8085/ -r WwpzH4qk19KjvAjEwoTz --executor docker --docker-image docker \
--description huansi.{hscode} \
--tag-list huansi.{hscode}".format(hscode=project_code))
if s1 != 0:
raise HSException('gitlab-runner安装失败')
return {"message": "runner注册成功", "step": "1", "status": "running", "later_message": "正在修改runner配置..."}
def install_huansi_runner_step2(self):
'''
安装runner第二步
:param HSCUSCODE:
:return:
'''
self._edit_runner_setting()
self._remove_old_runner_container()
return {"message": "runner配置修改成功", "step": "2", "status": "running", "later_message": "正在重启runner..."}
def install_huansi_runner_step3(self):
'''
安装runner第三步
:param HSCUSCODE:
:return:
'''
self.start_runner()
return {"message": "runner重启成功", "step": "3", "status": "success"}
def _edit_runner_setting(self): def _edit_runner_setting(self):
logger.info('修改配置文件') logger.info('修改配置文件')
os.system("chmod 777 /etc/gitlab-runner/config.toml") os.system("chmod 777 /etc/gitlab-runner/config.toml")
with open('/etc/gitlab-runner/config.toml', 'r') as f: with open('/etc/gitlab-runner/config.toml', 'r') as f:
s = f.read() s = f.read()
a = s.replace('volumes = ["/cache"]', a = s.replace('volumes = ["/cache"]',
'volumes = ["/var/run/docker.sock:/var/run/docker.sock","/etc/profile.d/huansi.sh:/etc/profile.d/huansi.sh" ,"/cache","/huansi/gitlab-runner/builds:/builds/hs"]') 'volumes = ["/var/run/docker.sock:/var/run/docker.sock","/etc/profile.d/huansi.sh:/etc/profile.d/huansi.sh" ,"/cache","/huansi/gitlab-runner/builds:/builds/hs","/data/nginx/conf/:/data/nginx/conf/"]')
with open('/etc/gitlab-runner/config.toml', 'w') as f: with open('/etc/gitlab-runner/config.toml', 'w') as f:
f.write(a) f.write(a)
logger.info('修改配置文件成功') logger.info('修改配置文件成功')
...@@ -138,24 +103,26 @@ class InstallService(HSBaseUCService): ...@@ -138,24 +103,26 @@ class InstallService(HSBaseUCService):
os.system('rm -f /etc/gitlab-runner/config.toml') os.system('rm -f /etc/gitlab-runner/config.toml')
logger.info('删除配置文件成功') logger.info('删除配置文件成功')
def install_remote_service_docker(self): def install_remote_service_docker(self, project_no):
''' '''
安装远端服务器的docker 安装远端服务器的docker
:return: :return:
''' '''
t = Thread(target=self._install_remote_service_docker) t = Thread(target=self._install_remote_service_docker, args=(project_no,))
t.start() t.start()
return {"message": "开始安装docker"} return {"message": "开始安装docker"}
def _install_remote_service_docker(self): def _install_remote_service_docker(self, project_no):
logger.info('开启安装docker')
logger.info('期间需要一段时间,请耐心等待')
from flask_app import global_app from flask_app import global_app
with global_app.app_context(): with global_app.app_context():
from flask import g from flask import g
g.user = {} g.user = {}
g.language = 'cn' g.language = 'cn'
remote_server_info = ConnectionService().get_remote_server_info() remote_server_info = ConnectionService().get_remote_server_info(project_no)
if not remote_server_info: if not remote_server_info:
logger.info('docker安装失败') logger.info('docker安装失败')
raise HSException('远端服务器信息未查到,请先配置') raise HSException('远端服务器信息未查到,请先配置')
...@@ -170,20 +137,23 @@ class InstallService(HSBaseUCService): ...@@ -170,20 +137,23 @@ class InstallService(HSBaseUCService):
self.validate_remote_server_install_docker(ssh) self.validate_remote_server_install_docker(ssh)
# 先上传必要的文件 # 先上传必要的文件
logger.info('上传文件docker-19.03.4.tgz') logger.info('上传文件docker-19.03.4.tgz')
ssh.upload(os.path.join(system_file_dir, 'docker-19.03.4.tgz'), '/docker-19.03.4.tgz') ssh.upload('/data/docker-19.03.4.tgz', '/docker-19.03.4.tgz')
logger.info('上传文件docker-compose-Linux-x86_64') logger.info('上传文件docker-compose-Linux-x86_64')
ssh.upload(os.path.join(system_file_dir, 'docker-compose-Linux-x86_64'), ssh.upload('/data/docker-compose-Linux-x86_64', '/docker-compose-Linux-x86_64')
'/docker-compose-Linux-x86_64')
logger.info('上传文件install_docker.py') logger.info('上传文件install_docker.py')
ssh.upload(os.path.join(system_file_dir, 'install_docker.py'), '/install_docker.py') ssh.upload('/data/install_docker.py', '/install_docker.py')
# 执行命令 # 执行命令
logger.info('执行安装命令') logger.info('执行安装命令')
out, err = ssh.exec_command('cd / && python install_docker.py') out, err = ssh.exec_command('cd / && python install_docker.py')
logger.info(f'执行安装命令信息:{out},错误信息:{err}') logger.info(f'执行安装命令信息:{out},其他信息:{err}')
logger.info('执行重启命令') # logger.info('执行重启命令')
out2, err2 = ssh.exec_command('reboot') # out2, err2 = ssh.exec_command('reboot')
logger.info(f'执行重启命令信息:{out2},错误信息:{err2}') # logger.info(f'执行重启命令信息:{out2},错误信息:{err2}')
logger.info(f'执行成功,静等服务器重启。。。') # logger.info(f'执行成功,静等服务器重启。。。')
logger.info('******************************************************')
logger.info('*********需要在服务器上手工执行reboot重启服务器************')
logger.info('*********如果有疑问,请联系开发人员***********************')
logger.info('******************************************************')
logger.info('docker安装成功') logger.info('docker安装成功')
def validate_remote_server_install_docker(self, ssh): def validate_remote_server_install_docker(self, ssh):
......
...@@ -14,82 +14,41 @@ class UpgradeAPI(ApiController): ...@@ -14,82 +14,41 @@ class UpgradeAPI(ApiController):
3、导出安装包 3、导出安装包
4、导入安装包升级(不紧急) 4、导入安装包升级(不紧急)
''' '''
@api('apply')
def get_apply_db_setting(self):
'''
应用数据库配置
:return:
'''
return UpgradeService().apply_db_setting()
@api('rollback/<int:log_id>') @api('apply/<string:project_no>')
def get_rollback_upgrade(self, log_id): def get_apply_db_setting(self, project_no):
''' '''
回滚升级 应用数据库配置
:return:
'''
return UpgradeService().rollback_upgrade(log_id)
@api('remote')
def get_remote_upgrade(self):
'''
远程升级
:return:
'''
log_id = request.args.get('log_id')
return UpgradeService().remote_upgrade(log_id)
@api('remote/step1')
def get_remote_upgrade1(self):
'''
远程升级---1
:return:
'''
return UpgradeService().remote_upgrade_step1()
@api('remote/step2')
def get_remote_upgrade2(self):
'''
远程升级---2
:return:
'''
log_id = request.args.get('log_id')
return UpgradeService().remote_upgrade_step2(log_id)
@api('remote/step3/<int:app_count>/')
def get_remote_upgrade3(self, app_count):
'''
远程升级---3
:return: :return:
''' '''
log_id = request.args.get('log_id') return UpgradeService().apply_db_setting(project_no)
return UpgradeService().remote_upgrade_step3(app_count, log_id)
@api('remote/step4') @api('remote/server_db/<string:project_no>/<string:work_shop_no>')
def get_remote_upgrade4(self): def get_upload_remote_server_db(self, project_no, work_shop_no):
''' '''
远程升级---4 上传远程服务器数据库配置文件
:param project_no:
:return: :return:
''' '''
log_id = request.args.get('log_id') return UpgradeService().upload_remote_server_db(project_no, work_shop_no)
return UpgradeService().remote_upgrade_step4(log_id)
@api('remote/step5/<int:tar_file_index>') @api('rollback/<string:project_no>/<int:log_id>')
def get_remote_upgrade5(self, tar_file_index): def get_rollback_upgrade(self, project_no, log_id):
''' '''
远程升级---5 回滚升级
:return: :return:
''' '''
log_id = request.args.get('log_id') return UpgradeService().rollback_upgrade(project_no, log_id)
return UpgradeService().remote_upgrade_step5(tar_file_index, log_id)
@api('remote/step6') @api('remote/<string:project_no>/<string:work_shop_no>')
def get_remote_upgrade6(self): def post_remote_upgrade(self, project_no, work_shop_no):
''' '''
远程升级---6 远程升级
:return: :return:
''' '''
return UpgradeService().remote_upgrade_step6() json_data = request.json
code = json_data.get('code_list')
return UpgradeService().remote_upgrade(project_no, work_shop_no, code)
@api('package_images/<int:log_id>') @api('package_images/<int:log_id>')
def get_package_images(self, log_id): def get_package_images(self, log_id):
...@@ -99,3 +58,12 @@ class UpgradeAPI(ApiController): ...@@ -99,3 +58,12 @@ class UpgradeAPI(ApiController):
:return: :return:
''' '''
return UpgradeService().package_images(log_id) return UpgradeService().package_images(log_id)
@api('compare_image_info/<string:project_no>')
def get_compare_image_info(self, project_no):
'''
获取服务器和本地要推送的镜像对比
:param project_no:
:return:
'''
return UpgradeService().get_compare_image_info(project_no)
This diff is collapsed.
# -*- coding:utf-8 -*-
import os
import timeout_decorator
from huansi_utils.app.apploader import logger
from huansi_utils.exception.exception import HSException
class CommonTools:
@classmethod
def validate_dict_value_all_not_none(cls, dict_data: dict, mapping=None):
'''
验证字典的值全不为空
:return:
'''
if not isinstance(dict_data, dict):
raise HSException('dict_data必须为字典类型')
for key, value in dict_data.items():
if value is None:
raise HSException(f"{key}不能为空")
# 超过四秒,基本上没救了,告辞~~~~
@timeout_decorator.timeout(4)
@classmethod
def test_db_connection(cls, ip: int, port: int, user_name: str, password: str, db_name: str):
'''
测试数据库连接
:param ip:
:param port:
:param user_name:
:param password:
:param db_name:
:param sql:
:return:
'''
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
con_str = f'mssql+pymssql://{user_name}:{password}@{ip}:{port}/{db_name}'
engine = create_engine(con_str)
DBSession = sessionmaker(bind=engine)
try:
session = DBSession()
result = session.execute("select test='huansi'")
data = result.fetchone()
if data.test == 'huansi':
return True
else:
return False
except Exception as e:
orig = getattr(e, 'orig')
args = getattr(orig, 'args', ((),))[0]
error_message = ''
for e_item in args:
if isinstance(e_item, bytes):
try:
e_item = e_item.decode()
except:
e_item = str(e_item)
error_message += str(e_item)
if not error_message:
error_message = str(e)
logger.error("数据库连接失败,失败原因:{}".format(error_message))
return False
@classmethod
def validate_runner_is_install(self):
'''
验证runner是否成功安装
:return:
'''
runner_file_path = '/etc/gitlab-runner/config.toml'
if not os.path.exists(runner_file_path):
raise HSException('没有安装runner')
...@@ -6,7 +6,7 @@ from static_file import temp_file_dir ...@@ -6,7 +6,7 @@ from static_file import temp_file_dir
create_table_sql_list = ['''create table if not exists project_info create_table_sql_list = ['''create table if not exists project_info
(id bigint primary key (id bigint primary key
, project_no varchar(50) , project_no varchar(50) unique
, db_ip varchar(50) , db_ip varchar(50)
, db_port varchar(50) , db_port varchar(50)
, db_user varchar(50) , db_user varchar(50)
...@@ -16,27 +16,41 @@ create_table_sql_list = ['''create table if not exists project_info ...@@ -16,27 +16,41 @@ create_table_sql_list = ['''create table if not exists project_info
, tiip_db_name varchar(100) default '' , tiip_db_name varchar(100) default ''
, right_db_name varchar(100) default '' , right_db_name varchar(100) default ''
, create_time date default (datetime('now','localtime')) , create_time date default (datetime('now','localtime'))
)''', '''create table if not exists app_upgrade_log )'''
, '''create table if not exists app_upgrade_log
(id bigint primary key (id bigint primary key
, status varchar(50) , status varchar(50)
, default_version bit , default_version bit
, upgrade_time date default (datetime('now','localtime')) , upgrade_time date default (datetime('now','localtime'))
, upgrade_no varchar(50) , upgrade_no varchar(50)
)''', '''create table if not exists app_upgrade_log_dtl )'''
, '''create table if not exists app_upgrade_log_dtl
(id integer primary key autoincrement (id integer primary key autoincrement
, log_id bigint , log_id bigint
, app_code varchar(50) , app_code varchar(50)
, app_name varchar(100) , app_name varchar(100)
, app_image_id varchar(200) , app_image_id varchar(200)
, app_image_name varchar(200) , app_image_name varchar(200)
)''', '''create table if not exists remote_server_info )'''
, '''create table if not exists remote_server_info
(id bigint (id bigint
, project_no varchar(50) , project_no varchar(50) unique
, server_ip varchar(50) , server_ip varchar(50)
, server_ssh_port varchar(50) , server_ssh_port varchar(50)
, server_user varchar(50) , server_user varchar(50)
, server_password varchar(50) , server_password varchar(50)
)'''] )'''
, '''create table if not exists remote_server_db_info
(id bigint primary key
, project_no varchar(50)
, work_shop_no varchar(50)
, db_ip varchar(50)
, db_port varchar(50)
, db_user varchar(50)
, db_password varchar(50)
, mes_db_name varchar(100)
)'''
]
# createtabsql1 = "create table if not exists scriptdata(id integer primary key autoincrement, name varchar(128), info varchar(128))" # createtabsql1 = "create table if not exists scriptdata(id integer primary key autoincrement, name varchar(128), info varchar(128))"
...@@ -49,8 +63,6 @@ class DBDriver: ...@@ -49,8 +63,6 @@ class DBDriver:
def cerateDB(self): def cerateDB(self):
self.conn = sqlite3.connect(self.dbfile, check_same_thread=False) self.conn = sqlite3.connect(self.dbfile, check_same_thread=False)
# self.conn.isolation_level = None # self.conn.isolation_level = None
for create_table_sql in create_table_sql_list:
self.conn.execute(create_table_sql) ####create new table
return return
def execDB(self, execsql): def execDB(self, execsql):
...@@ -100,7 +112,7 @@ class DBDriver: ...@@ -100,7 +112,7 @@ class DBDriver:
raise exc_val raise exc_val
else: else:
self.conn.commit() self.conn.commit()
self.conn.close() # self.conn.close()
def format_result(self, res): def format_result(self, res):
item_list = [] item_list = []
......
# -*- coding:utf-8 -*-
import ctypes
import inspect
class ThreadTools:
def _async_raise(self, tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(self, thread):
self._async_raise(thread.ident, SystemExit)
import os
import git
from flask import Flask from flask import Flask
from huansi_utils.app.apploader import AppLoaderBase from huansi_utils.app.apploader import AppLoaderBase
from huansi_utils.db.sqlalchemy import db from huansi_utils.db.sqlalchemy import db
from huansi_utils.flask_docs import ApiDoc from huansi_utils.flask_docs import ApiDoc
from flask_config import Config from flask_config import Config
from static_file import temp_file_dir
deploy_repo_path = 'ssh://git@47.110.145.204:2222/hs/deploy.git'
git_path = os.path.join(temp_file_dir, 'deploy')
# 初始化git仓库
if not os.path.exists(os.path.join(git_path, '.git')):
repo = git.Repo.clone_from(url=deploy_repo_path, to_path=git_path)
else:
repo = git.Repo(git_path)
repo.git.pull()
class FlashAppLoader(AppLoaderBase): class FlashAppLoader(AppLoaderBase):
...@@ -12,6 +28,14 @@ class FlashAppLoader(AppLoaderBase): ...@@ -12,6 +28,14 @@ class FlashAppLoader(AppLoaderBase):
app.config.from_object(Config) app.config.from_object(Config)
# 因为之前的基类不规范,到处引用,这样强行将import_name修改成当前文件 # 因为之前的基类不规范,到处引用,这样强行将import_name修改成当前文件
app.import_name = 'flask_app' app.import_name = 'flask_app'
from app.utils.db_tools import db_driver, create_table_sql_list
with db_driver as session:
for create_table_sql in create_table_sql_list:
session.exec_sql(create_table_sql)
# 兼容之前的db配置
from app.conncetion.conncetion_service import ConnectionService
ConnectionService().sync_remote_db_info()
def register_blueprint(self, app): def register_blueprint(self, app):
# 新版本api doc # 新版本api doc
......
import hashlib
import hmac
import time
from collections import OrderedDict
from flask import request
# from flask_config import secretKey
from huansi_utils.exception.exception import HSException
secretKey = '018f162e804f945ee6b23aebfa863639'
def ApiSercurity(*args, **kwargs):
'''
API签名验证
:param kwargs:
:return:
'''
try:
# if __debug__:
# return
if not request:
return
if request.headers.environ.get('HTTP_FROM') == 'api_debug':
return
requestData = request.args
timestamp = requestData.get('timestamp')
if not timestamp or not timestamp.isdigit():
raise Exception("非法请求")
if (int(time.time()) - int(timestamp)) / 60 > 5:
raise Exception("请求已过期")
if not secretKey:
raise Exception("accessToken不正确,或没有设置SecretKey")
# 验证签名
signData = getSignData(request, kwargs)
sign = generateSinature(secretKey, signData)
if sign != requestData.get('signature'):
print(("非法请求:签名校验失败,接收到的签名为:{},本次请求传递的时间戳:{},随机数{},生成的签名:{},待签名数据:{}").format(requestData.get('signature'),
requestData.get('timestamp'),
requestData.get('nonce'),
sign, signData))
raise Exception("非法请求")
except Exception:
raise HSException("签名验证失败")
def generateSinature(sinatureSecertKey: str, singnPlan: str) -> str:
'''
生成加密签名(哈希256算法)
:param sinatureSecertKey: 密钥
:param singnPlan: 密文
:return: 签名
'''
return hmac.new(bytes(sinatureSecertKey, 'utf-8'), bytes(singnPlan, 'utf-8'), hashlib.sha256).hexdigest()
def makeSignPlan(queryStringDict: dict, body: str) -> str:
'''
生成待签名文本
:param queryStringDict: 待签名参数字典
:param body: post和put请求的body
:return: 待签名文本
'''
timestamp = queryStringDict['timestamp']
nonce = queryStringDict['nonce']
signPlan = ''
for key, value in queryStringDict.items():
if key in ["timestamp", "nonce", "accessToken", "signature"]:
continue
signPlan += ('&{}={}').format(key, value)
signPlan = signPlan[1:]
if body:
signPlan += body
signPlan += timestamp
signPlan += nonce
return signPlan
def getSignData(request: request, routeParam: dict) -> str:
'''
获取待签名文本
:param request: 当前请求
:param routeParam:路由参数
:return:待签名文本
'''
temp_Dict = request.args.to_dict()
tempDict = {}
for key, value in temp_Dict.items():
if value:
tempDict[key] = value
if routeParam:
for key, value in routeParam.items():
tempDict[key] = value
# 将url参数和路由参数按key排序放入dict中
_queryStringDict = sorted(tempDict.items(), key=lambda x: x[0])
queryStringDict = OrderedDict()
for item in _queryStringDict:
queryStringDict[item[0]] = item[1]
# 将body字符串化
body = ''
if request.data:
body = request.data.decode(encoding="utf-8", errors="ignore")
# 生成代签名文本
return makeSignPlan(queryStringDict, body)
import json
from functools import wraps
from flask import request, current_app, g
from huansi_utils.exception.exception import HSException
from huansi_utils.webapi import ApiController
def Authorization(func):
@wraps(func)
def decorated_function(*args, **kwargs):
if getattr(func, '__ignore__', False):
g.user = {}
return func(*args, **kwargs)
# 环境变量控制authorize的启动
start_authorize = current_app.config.get('START_AUTHORIZE')
if not start_authorize or str(start_authorize).lower() == 'false':
g.user = {}
return func(*args, **kwargs)
if 'Authorization' in request.headers:
token = request.headers.get('Authorization')
else:
raise HSException('用户没有登录!')
'''
token_verify为hsright中验证token的rpc服务名,可以认为是我们进行API访问的url
message是传递给服务函数的参数,字典格式
app是开启rpc服务的app
response取决于服务方如何返回数据
'''
response = None
if response['code'] == 200:
result = json.loads(response['message'])
g.user_id = result.get('user_id')
g.user = result.get('user')
else:
raise HSException(response['message'])
return func(*args, **kwargs)
# 以下代码演示如何添加一个ApiController的类装饰器,以达到所有方法都应用的目的
if isinstance(func, type) and issubclass(func, ApiController):
cls = func
cls.append_method_decorator(Authorization)
return cls
return decorated_function
def UnAuthorization(func):
'''
不用验证登录的装饰器
:param func:
:return:
'''
setattr(func, '__ignore__', True)
return func
from functools import wraps
from flask import jsonify
from huansi_utils.ActionFilter.ApiSecurityFilter import ApiSercurity
from huansi_utils.webapi import ApiController
def ApiSecurityFilter(func):
'''
签名验证装饰器
:param func:
:return:
'''
@wraps(func)
def inner(*args, **kwargs):
if getattr(func, '__ignore__', False):
return func(*args, **kwargs)
# 线上代码启动用的gunicorn来启动,__debug__还是false
# if __debug__:
# print('debug 不作签名校验')
# return func(*args, **kwargs)
print('签名校验')
result = ApiSercurity(*args, **kwargs)
if result:
if func.__name__ in ['get', 'post', 'delete', 'put']:
return result, 500
else:
return jsonify(result), 500
return func(*args, **kwargs)
# 以下代码演示如何添加一个ApiController的类装饰器,以达到所有方法都应用的目的
if isinstance(func, type) and issubclass(func, ApiController):
cls = func
cls.append_method_decorator(ApiSecurityFilter)
return cls
return inner
def ingorApiSecurityFilter(func):
'''
设置忽略签名的属性用于忽略签名
:param func:
:return:
'''
setattr(func, '__ignore__', True)
return func
def Filter(before_func, after_func):
'''
函数前后过程装饰器
:param before_func:
:param after_func:
:return:
'''
def outer(main_func):
def wrapper(*args, **kwargs):
before_result = before_func(*args, **kwargs)
if before_result:
return before_result
main_result = main_func(*args, **kwargs)
after_result = after_func(*args, **kwargs)
if main_result:
return main_result
if after_result:
return after_result
return wrapper
return outer
import math
from datetime import datetime
from functools import wraps
from flask import request, current_app, g
from huansi_utils.rpc_tools.redis_rpc import redis_log_api_count, redis_log_api_timeout
def api_count(func):
"""
api访问统计计数
:param func:
:return:
"""
@wraps(func)
def decorated_function(*args, **kwargs):
app_code = fullfil_condition()
if app_code is None:
return func(*args, **kwargs)
# 获取路径
api = request.url_rule.rule
now = datetime.strftime(datetime.now(), '%Y%m%d')
key = 'log:api_count:{date}:{app_code}'.format(date=now, app_code=app_code)
tenant_code = get_tenant_code()
redis_log_api_count(key=key, api=api, tenant_code=tenant_code)
return func(*args, **kwargs)
return decorated_function
def api_timeout(func):
"""
api执行时间统计,如果有执行报错则不会写入
:param func:
:return:
"""
@wraps(func)
def decorated_function(*args, **kwargs):
app_code = fullfil_condition()
if app_code is None:
return func(*args, **kwargs)
api = request.url_rule.rule
start_time = datetime.now()
r = func(*args, **kwargs)
end_time = datetime.now()
run_total_millisecond = math.floor((end_time - start_time).total_seconds() * 1000)
now = datetime.strftime(end_time, '%Y%m%d')
key = 'log:api_timeout:{date}:{app_code}'.format(date=now, app_code=app_code)
tenant_code = get_tenant_code()
redis_log_api_timeout(key=key, api=api, run_total_millisecond=run_total_millisecond, tenant_code=tenant_code)
return r
return decorated_function
def fullfil_condition():
# 判断是否配置app_code, 未配置直接跳过,否则记录无意义
app_code = current_app.config.get('APP_CODE')
# 如果配置的rpc代理连接不上,直接跳过
hsrpc_status = current_app.config.get('HSRPC_STATUS', False)
if app_code is not None and hsrpc_status:
return app_code
def get_tenant_code():
return g.user_info.get('tenant', 'huansi')
\ No newline at end of file
from functools import wraps
from flask import request
from huansi_utils.enum.enum import HSSchemaConvertModel
def schema(*view_module):
'''
scheam load json装饰器
:param func:
:return:
'''
def warpper(func):
@wraps(func)
def inner(*args, **kwargs):
print('schema加载json')
schema_data = json_to_schema(view_module).data
setattr(request, 'schema_data', schema_data)
return func(*args, **kwargs)
return inner
return warpper
def schema_result(module):
'''
返回结果序列化
:param module:
:return:
'''
def warpper(func):
@wraps(func)
def inner(*args, **kwargs):
o_res = func(*args, **kwargs)
if isinstance(o_res, list):
n_res = module().load(o_res, many=True)
else:
n_res = module().load(o_res, many=False)
return n_res
return inner
return warpper
class json_to_schema(object):
def __init__(self, view_module: tuple):
# 注册schema到dict中,如果只有一个,直接返回实例,如果多个,返回key为tablename的字典
# self._register_schema(view_module)
self.data = self._schema_load(view_module)
def _schema_load(self, view_module: tuple):
many = False
json_data = request.get_json() if request.get_json() else request.form
if isinstance(json_data, list):
many = True
# save_bill json模式和单表 child模式区别
if len(view_module) > 1 and json_data.get('header'):
return self._json_to_schema(json_data=json_data, view_module=view_module, type=HSSchemaConvertModel.HdrDtl)
elif not many and len(view_module) > 1:
return self._json_to_schema(json_data=json_data, view_module=view_module, type=HSSchemaConvertModel.Child)
elif many and len(view_module) > 1:
result = []
for data in json_data:
child_data = self._json_to_schema(json_data=data, view_module=view_module,
type=HSSchemaConvertModel.Child)
result.append(child_data)
return result
else:
return view_module[0]().new_object(data=json_data, many=many)
def _json_to_schema(self, json_data, view_module, type):
if type == HSSchemaConvertModel.Child:
header_schema_data = view_module[0]().new_object(data=json_data)
for module in view_module:
if module == view_module[0]:
continue
# 递归自己拿到child
child = json_data['child']
child_schema_data = module().new_object(data=child, many=True)
header_schema_data._childs = child_schema_data
return header_schema_data
elif type == HSSchemaConvertModel.HdrDtl:
header_schema_data = view_module[0]().new_object(data=json_data['header']['data'])
for module in view_module:
if module == view_module[0]:
continue
for child in json_data['childs']:
if module.__tablename__ == child['table_name']:
child_schema_data = module().new_object(data=child['data'], many=True)
header_schema_data.set_child(child_schema_data)
return header_schema_data
import logging
import sys
from flask import Flask
from flask_bootstrap import Bootstrap
from flask_logconfig import LogConfig
from huansi_utils.webapi.HSRequest import HSRequest
PYTHON_VERSION = sys.version_info[0]
logger = logging.getLogger(name='myapp')
# 全局数据校验开关
global_validate_switch = True
# 全局token验证开关
# global_verify_token_switch = True
global_verify_token_switch = False
class HSFlask(Flask):
# 注入自定义的HSRequest类
request_class = HSRequest
class AppLoaderBase():
# 暂时不需要单例
# _instance_lock = threading.Lock()
#
# def __new__(cls, *args, **kwargs):
# if not hasattr(AppLoaderBase, "_instance"):
# with AppLoaderBase._instance_lock:
# if not hasattr(AppLoaderBase, "_instance"):
# AppLoaderBase._instance = object.__new__(cls)
# return AppLoaderBase._instance
def __call__(self, *args, **kwargs):
return self.__app
@property
def app(self):
return self.__app
def __init__(self):
self.__app = self.__create_new_app()
def __create_new_app(self):
_app = HSFlask(__name__)
# 禁用信号发送,减少额外内存
_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
_app.url_map.strict_slashes = True
self.init(_app)
LogConfig(_app)
Bootstrap(_app)
self.init_db(_app)
self.register_blueprint(_app)
self.register_webapi(_app)
# self.test_connection(_app)
# 日志处理
# from huansi_utils.common.cache_log import logThread
# from huansi_utils.rpc_tools.redis_rpc import handle_rpc_message
# logThread.start(handle_rpc_message)
return _app
def init_db(self, app):
"""
初始化数据库设置,比如注册orm框架
:param app: 当前创建的Flask app
:return:
"""
pass
def register_blueprint(self, app):
"""
注册blueprint
:param app:当前创建的Flask app
:return:
"""
pass
def register_webapi(self, app):
"""
注册webapi
:param app:当前创建的Flask app
:return:
"""
pass
def init(self, app):
"""
要在初始化时做的其他事情,由子类控制
:param app:当前创建的Flask app
:return:
"""
pass
def test_connection(self, _app):
'''
测试环境变量中的连接是否能正常连接
:param _app:
:return:
'''
# 部署的时候,发现这么一个问题,huansi_nginx程序部署了,所以连接是通的,但是nginx的反向代理没有部署
# 老是导致rpc连接超时,暂时只能检测172.17.0.1:8117
# app_rpc = _app.config.get('RPC_PROXY_URL', None)
app_rpc = '172.17.0.1:8117'
# 不存在直接return
if app_rpc is None:
return
# 加上默认端口80
app_rpc = app_rpc if ':' in app_rpc else app_rpc + ':80'
ip, port = app_rpc.split(':')
# 检测app_rpc为正确的地址
import platform
sysstr = platform.system()
# 检测超时的包用到signal,只能在Unix系统上跑,这里加判断防止在Windows机子上开发时报错
if (sysstr == "Windows"):
return
try:
test_connect_port(ip, port)
_app.config['HSRPC_STATUS'] = True
logger.info(f'RPC_PROXY_URL地址{ip}:{port}连上了')
except Exception:
_app.config['HSRPC_STATUS'] = False
logger.info(f'RPC_PROXY_URL地址{ip}:{port}连不上')
finally:
logger.info(f"_app.config['HSRPC_STATUS']:{_app.config['HSRPC_STATUS']}")
import timeout_decorator
# 超过两秒,基本上没救了,告辞~~~~
@timeout_decorator.timeout(2)
def test_connect_port(ip, port):
'''
检查端口是否开放
:param ip:
:param port:
:return:
'''
import socket
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not port:
port = 80
try:
sk.connect((ip, int(port)))
sk.shutdown(2)
finally:
sk.close()
This diff is collapsed.
# global_bill_types={54:IM_arrive_service,90:IM_store_service}
global_bill_types = {}
test_cases = []
def register_bill_type(bill_type, cls):
global_bill_types[bill_type] = cls
def register_test_services(cls):
test_cases.append(cls)
def HSRegisterBillType(bill_type=None):
'''
注册单据类型的装饰器
:param bill_type:
:return:
'''
def decorator(func):
cls = func.__new__(func) if func else None
bill_type_id = bill_type if bill_type else cls.bill_type
global_bill_types[bill_type_id] = func
return func
return decorator
from celery import Celery,platforms
from celery_config import CELERY_BROKER_URL
def make_celery():
from flask_app import global_app
celery = Celery(broker=CELERY_BROKER_URL)
celery.config_from_object('celery_config')
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with global_app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery()
platforms.C_FORCE_ROOT = True
import time
import hashlib
import requests
import celery_config
class DefaultAuth:
def __init__(self, to):
self.sync_data_api = celery_config.API[to]['sync_data_api']
try:
self.sync_back_api = celery_config.API[to]['sync_back_api']
except KeyError:
pass
self.key = celery_config.API[to]['auth_key']
self.key_name = celery_config.API[to]['auth_key_name']
def auth(self):
return {'Content-Type': 'application/json'}
def send_data(self, data):
'''
发送数据
:param data:数据包
:return:
'''
headers = {}
headers.update(self.auth())
requests.post(
url=self.sync_back_api if 'state' in data else self.sync_data_api,
headers=headers,
json=data
)
class ErpAuth(DefaultAuth):
def auth(self):
"""
如需要接口认证,可自定义认证规则
:return:
"""
# ha = hashlib.md5(cls.key.encode('utf-8'))
# time_span = time.time()
# ha.update(bytes("%s|%f" % (cls.key, time_span), encoding='utf-8'))
# encryption = ha.hexdigest()
# result = "%s|%f" % (encryption, time_span)
# return {cls.key_name: result, 'Content-Type': 'application/json'}
return {'Content-Type': 'application/json'}
from marshmallow import fields
from huansi_utils.common.schema import HSSchema
import json
# 外部接口调用日志[0]
class PB_InterfaceInvokeLog_schema(HSSchema):
__tablename__ = 'pbInterfaceInvokeLog'
__keyfield__ = 'iIden'
iIden = fields.Integer(allow_none=True)
sFrom = fields.String(allow_none=True, load_from='from')
sTo = fields.String(allow_none=True, load_from='to')
sType = fields.String(allow_none=True, load_from='name')
sKey = fields.String(allow_none=True, load_from='key')
sData = fields.Function(allow_none=True, load_from='data', deserialize=lambda x:json.dumps(x))
tCreateTime = fields.String(load_from='upload_time', function=lambda x:x[:-3])
iExecResult = fields.Integer(allow_none=True, default=0)
sResult = fields.String(allow_none=True, load_from='data')
tEndTime = fields.String(allow_none=True, load_from='time', function=lambda x:x[:-3])
import importlib
from datetime import datetime
from flask import g
from huansi_utils.server.service_uc import HSSingleUCService
from huansi_utils.celery import celery
from huansi_utils.celery.auth import DefaultAuth
from huansi_utils.db.db import new_session
from huansi_utils.common.json import format_json_data
from .pbInterfaceInvokeLog_schema import PB_InterfaceInvokeLog_schema
from celery.worker.request import Request
from flask_app import global_app
import celery_config
class DataPackage:
def __init__(self, data_name, key, data, session, destination):
self.name = data_name
self.key = key
self.upload_time = datetime.now()
self.data = data
self.from_ = 'mes'
self.to = destination
self.session = session
@property
def data_package(self):
'''
打包数据,并格式化数据
:return:
'''
d = self.__dict__.copy()
d.pop('session')
d.update({'from': d.pop('from_')})
return format_json_data(d)
def save(self):
'''
获取到数据后,写入中间表
'''
schema_data = PB_InterfaceInvokeLog_schema().load(self.data_package).data
# 保存到数据库
try:
self.session.begin_trans()
sql = '''
INSERT INTO dbo.pbInterfaceInvokeLog
(sFrom, sTo, sType, sKey, sData, tCreateTime)
VALUES('{sFrom}', '{sTo}', '{sType}', '{sKey}', '{sData}', '{tCreateTime}')'''.format(**schema_data)
self.session.exec_sql(sql)
self.session.commit_trans()
except Exception:
self.session.rollback_trans()
class HsRequest(Request):
'接收到任务先创建一条日志'
def __init__(self, *args, **kwargs):
with global_app.app_context():
self.session = new_session()
self.header = kwargs['headers']
self.args = eval(self.header['kwargsrepr'])
if self.args.get('write_log', True):
self.write_log()
super().__init__(*args, **kwargs)
def write_log(self):
sTaskKey = self.header['id'] # 获取任务key
sTask = self.header.get('task') # 获取任务函数
for key, value in celery_config.CELERYBEAT_SCHEDULE.items():
if value['task'] == sTask and str(value['kwargs']) == self.header['kwargsrepr']:
sTaskName = key # 获取任务名称
break
sArg = self.header['kwargsrepr'].replace("'", '"') # 传递的所有参数
try:
self.session.begin_trans()
sql = '''
INSERT INTO dbo.pbCeleryTaskHistory
(sTaskName, sTask, sArg, sTaskKey)
VALUES('{sTaskName}', '{sTask}', '{sArg}', '{sTaskKey}')'''.\
format(sTaskName=sTaskName,
sTask=sTask,
sArg=sArg,
sTaskKey=sTaskKey)
self.session.exec_sql(sql)
self.session.commit_trans()
except Exception:
self.session.rollback_trans()
class HsTaskService(celery.Task, HSSingleUCService):
abstract=True
Request = HsRequest
def __init__(self):
with global_app.app_context():
self.session = new_session()
g.hs_session = self.session
super().__init__()
def get_auth_class(self, to):
'''
向目标方发送数据时获取认证类定义的认证规则,如未获取到则使用默认类
:return:返回认证类对象
'''
auth_config = celery_config.AUTH_CLASS.get(to, None)
if auth_config is None:
return DefaultAuth(to)
module_name, auth_class_name = auth_config.rsplit('.', 1)
module = importlib.import_module(module_name)
auth_class = getattr(module, auth_class_name, None)
if auth_class is None:
return DefaultAuth(to)
return auth_class(to)
def do_task(self, **kwargs):
key = self.request.id # 获取消息key,这个key是收到任务后celery生成的
self.data_type = kwargs.get('type') # 获取传输的数据类型
to = kwargs.get('to') # 获取数据发送的目标方
sql = '''
declare @sData NVARCHAR(MAX)=0x
declare @sType INT=0
EXEC dbo.sppbInterface_Export @sTypeName='{}',@sData=@sData OUTPUT,@sType=@sType OUTPUT
select @sData, @sType
'''.format(self.data_type)
upload_data = self.sql_data_to_dict(self.query_sql(sql)) # 将sql查询出来的数据转成约定好的数据格式
if isinstance(to, str):
self.send_data_single(to, key, upload_data)
elif isinstance(to, list): # 向多个系统发送数据,进行循环发送
for system in to:
self.send_data_single(system, key, upload_data)
def sql_data_to_dict(self, sql_data):
'''
将sql查询出来的数据转成约定好的数据格式
:param sql_data:
:return:
'''
result = self.dump_data(sql_data)
add, del_ = [], []
for row in result:
if row['is_del'] == '1':
row.pop('is_del')
del_.append(row)
else:
row.pop('is_del')
add.append(row)
return {'add': add, 'del': del_}
def handle_failure(self, exc, to):
try:
self.session.begin_trans()
sql = '''
UPDATE dbo.pbInterfaceInvokeLog
SET iExecResult={},sResult='{}',tEndTime='{tEndTime}'
WHERE sKey='{sKey}' AND sTo='{sTo}'
'''.format(1, str(exc).replace("'", '"'), sKey=str(self.request.id), sTo=str(to), tEndTime=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
self.session.exec_sql(sql)
self.session.commit_trans()
except Exception:
self.session.rollback_trans()
def send_data_single(self, to, key, upload_data):
'''
打包数据,向其他系统接口转发
:param to: 目标方
:param key: 消息key
:param upload_data: 将要上传的数据
:return:
'''
data_package = DataPackage(self.data_type, key, upload_data, self.session, to)
data_package.save()
auth_class = self.get_auth_class(to)
# 如果在请求过程中出现错误,将错误信息写入日志
try:
auth_class.send_data(data_package.data_package)
except Exception as e:
self.handle_failure(e, to)
def on_failure(self, exc, task_id, args, kwargs, einfo):
'''
任务失败时将错误堆栈信息写到数据库
:param exc:
:param task_id:
:param args:
:param kwargs:
:param einfo:
:return:
'''
if kwargs.get('write_log', True):
try:
self.session.begin_trans()
sql = '''
UPDATE dbo.pbCeleryTaskHistory
SET sErrorMessage='{}', tEndTime='{tEndTime}'
WHERE sTaskKey='{}'
'''.format(str(einfo).replace("'", '"'), task_id,tEndTime=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
self.session.exec_sql(sql)
self.session.commit_trans()
except Exception:
self.session.rollback_trans()
return super(HsTaskService, self).on_failure(exc, task_id, args, kwargs, einfo)
def on_success(self, retval, task_id, args, kwargs):
if kwargs.get('write_log', True):
try:
self.session.begin_trans()
sql = '''
UPDATE dbo.pbCeleryTaskHistory
SET tEndTime='{tEndTime}'
WHERE sTaskKey='{sTaskKey}'
'''.format(tEndTime=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), sTaskKey=task_id)
self.session.exec_sql(sql)
self.session.commit_trans()
except Exception:
self.session.rollback_trans()
return super(HsTaskService, self).on_success(retval, task_id, args, kwargs)
import time
import logging
import re
class IdWorker(object):
'''
ID生成器
worker_id:工作id
data_center_id:数据中心id
:return long类型的ID
exmple:
from IdWorker import *
id = IdWorker(1,1)
id2 = IdWorker(1,2)
for i in range(100000):
print(id.get_id(),i,id2.get_id())
'''
def __init__(self, worker_id, data_center_id):
self.worker_id = worker_id
self.data_center_id = data_center_id
self.user_agent_parser = re.compile("[a-zA-Z][a-zA-Z\-0-9]*")
self.logger = logging.getLogger("idworker")
# stats
self.ids_generated = 0
self.twepoch = 1288834974657
self.sequence = 0
self.worker_id_bits = 5
self.data_center_id_bits = 5
self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
self.max_data_center_id = -1 ^ (-1 << self.data_center_id_bits)
self.sequence_bits = 12
self.worker_id_shift = self.sequence_bits
self.data_center_id_shift = self.sequence_bits + self.worker_id_bits
self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.data_center_id_bits
self.sequence_mask = -1 ^ (-1 << self.sequence_bits)
self.last_timestamp = -1
# Sanity check for worker_id
if self.worker_id > self.max_worker_id or self.worker_id < 0:
raise Exception("worker_id", "worker id can't be greater than %i or less than 0" % self.max_worker_id)
if self.data_center_id > self.max_data_center_id or self.data_center_id < 0:
raise Exception("data_center_id",
"data center id can't be greater than %i or less than 0" % self.max_data_center_id)
self.logger.info(
"worker starting. timestamp left shift %d, data center id bits %d, worker id bits %d, sequence bits %d, worker id %d" % (
self.timestamp_left_shift, self.data_center_id_bits, self.worker_id_bits, self.sequence_bits,
self.worker_id))
def _time_gen(self):
return int(time.time() * 1000)
def _till_next_millis(self, last_timestamp):
timestamp = self._time_gen()
while timestamp <= last_timestamp:
timestamp = self._time_gen()
return timestamp
def _next_id(self):
timestamp = self._time_gen()
if self.last_timestamp > timestamp:
self.logger.warning("clock is moving backwards. Rejecting request until %i" % self.last_timestamp)
raise Exception("Clock moved backwards. Refusing to generate id for %i milliseocnds" % self.last_timestamp)
if self.last_timestamp == timestamp:
self.sequence = (self.sequence + 1) & self.sequence_mask
if self.sequence == 0:
timestamp = self._till_next_millis(self.last_timestamp)
else:
self.sequence = 0
self.last_timestamp = timestamp
new_id = ((timestamp - self.twepoch) << self.timestamp_left_shift) | (
self.data_center_id << self.data_center_id_shift) | (
self.worker_id << self.worker_id_shift) | self.sequence
self.ids_generated += 1
return new_id
def _valid_user_agent(self, user_agent):
return self.user_agent_parser.search(user_agent) is not None
def get_worker_id(self):
return self.worker_id
def get_timestamp(self):
return self._time_gen()
def get_id(self):
# if not self._valid_user_agent(useragent):
# pass
new_id = self._next_id()
# self.logger.debug("id: %i user_agent: %s worker_id: %i data_center_id: %i" % (
# new_id, useragent, self.worker_id, self.data_center_id))
return new_id
def get_datacenter_id(self):
return self.data_center_id
import datetime
import math
from queue import Queue
from threading import Thread
class LogCache():
def __init__(self, cache_count=1000):
if cache_count < 0:
cache_count = 0
self.cache_queue = Queue(cache_count)
def put(self, **data):
'''
往队列put数据(队列溢出将会自动清空队列数据,再put)
:param data:
:return:
'''
if self.cache_queue.full():
self.cache_queue.queue.clear()
self.cache_queue.put(data)
class Logger():
def log_start(self, **message):
'''
开始记录日志
:param message:
:return:
'''
message['start_time'] = datetime.datetime.now()
return message
def log_end(self, **message):
'''
结束记录日志
:param message:
:return:
'''
start_time = message.pop('start_time')
end_time = datetime.datetime.now()
diff_seconds = math.floor((end_time - start_time).total_seconds() * 1000)
message['diff_seconds'] = diff_seconds
return message
class LogThread():
def __init__(self, cache_count=1000):
self.log_cache = LogCache(cache_count)
self.logger = Logger()
def handle_message(self, func, queue):
'''
消息处理
:param func:
:param queue:
:return:
'''
while True:
try:
message = queue.get()
if message:
func(**message)
else:
import time
time.sleep(0.1)
except Exception as e:
print('LogThread函数处理失败:', e)
def put(self, **message):
'''
put数据(记入日志)
:param message:
:return:
'''
self.log_cache.put(**message)
def start(self, func):
'''
开始执行
:param func:
:return:
'''
t = Thread(target=self.handle_message, args=(func, self.log_cache.cache_queue))
t.start()
def log_start(self, **message):
return self.logger.log_start(**message)
def log_end(self, **message):
return self.logger.log_end(**message)
logThread = LogThread()
import datetime
import decimal
import json
from flask import jsonify
from flask_sqlalchemy import DefaultMeta
from sqlalchemy.engine.result import RowProxy
from huansi_utils.db.db import get_model_column_names
from huansi_utils.db.sqlalchemy import db
from huansi_utils.exception.exception import HSException
def format_data(data):
'''
格式化数据
:param data: 字段值
:return: 格式化后的数据
'''
if isinstance(data, bool):
return data
elif isinstance(data, datetime.datetime):
data = data.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # SQLserver数据库中毫秒是3位,日期格式;2015-05-12 11:13:58.543
elif isinstance(data, datetime.date):
data = data.strftime("%Y-%m-%d")
elif isinstance(data, decimal.Decimal):
data = float(data)
# elif isinstance(data, int) and len(str(data)) == 19:
elif isinstance(data, int):
data = str(data)
elif isinstance(data, bytes):
data = str(data)
return data
def format_json_data(data):
for key in data:
if isinstance(key, int):
value = data.pop(key)
data[str(key)] = format_data(value)
else:
data[key] = format_data(data[key])
return data
def model_to_json(model, many=True, _jsonify=False):
'''
将model转成json
:param model: 实体对象
:param many: 是否多行实体
:param _jsonify: 是否json序列化
:return: Json数据包
'''
if many:
if not model:
return []
item_list = []
for item in model:
data = model_to_json(item, many=False, _jsonify=False)
item_list.append(data)
return jsonify(item_list) if _jsonify else item_list
elif isinstance(model, dict):
return format_json_data(model)
elif isinstance(model, RowProxy):
RowProxy_data = dict(model)
for key, value in RowProxy_data.items():
RowProxy_data[key] = format_data(RowProxy_data[key])
return RowProxy_data
elif isinstance(model, tuple):
fields = {}
for column in model.keys():
data = getattr(model, column)
if isinstance(data, db.Model):
obj = data
for column in get_model_column_names(obj):
data = getattr(obj, column)
fields[column] = format_data(data)
else:
fields[column] = format_data(data)
return jsonify(fields) if _jsonify else fields
elif isinstance(model, list):
if not model:
return []
try:
return format_json_data(dict(model))
except:
return model
elif type(model).__class__ is DefaultMeta:
if not model:
return {}
fields = {}
for column in get_model_column_names(model):
# 兼容之前的表结构id
if column == 'iIden' or column == 'uGUID':
column = "id"
if column == 'iHdrId':
column = 'bill_id'
data = getattr(model, column)
fields[column] = format_data(data)
return jsonify(fields) if _jsonify else fields
elif hasattr(model, "__dict__"):
# temp_data = model.__dict__
# for key in list(temp_data.keys()):
# if key[:1] == '_':
# del temp_data[key]
return format_json_data(model.__dict__)
else:
return format_data(model)
def sql_data_to_json(data, column_list):
result = []
for line in data:
new_line = format_json_data(dict(zip(column_list, line)))
result.append(new_line)
return result
def json_to_dict(json_data):
return json.loads(s=json_data)
def json_to_object(json_data):
# json_data = '{"id": "007", "name": "007", "age": 28}'
dict = json.loads(s=json_data)
obj = object()
obj.__dict__ = dict
return obj
def dict_to_json(dict):
return json.dumps(dict)
def object_to_json(obj):
dict = obj.__dict__ # 将对象转成dict字典
return json.dumps(obj=dict)
def dict_to_object(dict):
obj = object()
print('dict:', dict)
for key, value in dict.items():
setattr(obj, key, value)
print('new attr:', key, value)
return obj
def group_by(input_list: list, group_by_field: str, is_with_child: bool = False,
is_remove_group_by_field: bool = True) -> list:
'''
按特定的字段分组
:param input_list: 需要分组的list
:param group_by_field: 'id,name,class'
:param is_with_child: 是否带上明细数据 属性名child
:param is_remove_group_by_field: 是否排除child中排序的字段
:return:
'''
field_list = group_by_field.split(',')
_output_dict = {}
for input in input_list:
key = '|||'.join([str(input[item]) for item in field_list])
_input = {}
for field in field_list:
try:
_input[field] = input[field]
if is_remove_group_by_field:
del input[field]
except KeyError:
raise HSException('{}找不到'.format(field))
item = _output_dict.get(key, None)
if not item:
_output_dict[key] = _input
item2 = _output_dict.get(key, None)
if is_with_child:
if item2.get('child', None) is None:
item2['child'] = []
item2['child'].append(input)
return [value for value in _output_dict.values()]
def merge_dict_by_link(dicts: list, child_dicts: list, link: str, attribute: str = 'item_data') -> list:
'''
合并字典,通过link关系 连接两个字典,属性设置为attribute
:param dicts:被插入的字典
:param child_dicts:插入的字典
:param link:关系key 'id=bill_id'
:param attribute: 属性,匹配成功后再dicts中新增的属性。 默认为item_data
:return:
'''
o_link = link.split('=')[0]
i_link = link.split('=')[1]
for dict in dicts:
dict[attribute] = []
for insert_dict in child_dicts:
if insert_dict[i_link] == dict[o_link]:
dict[attribute].append(insert_dict)
return dicts
def merge_dict_by_link2(dicts: list, child_dicts: list, link: str, attribute: str = 'item_data') -> list:
'''
合并字典,通过link关系 连接两个字典,属性设置为attribute
:param dicts:被插入的字典
:param child_dicts:插入的字典
:param link:关系key 'id=bill_id'
:param attribute: 属性,匹配成功后再dicts中新增的属性。 默认为item_data
:return:
'''
o_link = link.split('=')[0]
i_link = link.split('=')[1]
l_link = link.split('=')[2]
for dict in dicts:
dict[attribute] = []
for insert_dict in child_dicts:
if insert_dict[i_link] == dict[o_link] and insert_dict[l_link] == dict[l_link]:
dict[attribute].append(insert_dict)
return dicts
def dict_to_tree_by_link(dicts: list, link: str = 'id=parent_id', attribute: str = 'childs',
parent_id: int = 0) -> list:
'''
通过字典中的映射关系自动转换为树形结构
:param dicts: 需要转换的字典
:param link: 关系字段
:param attribute: 关联之后的属性
:param parent_id: 父级id
:return:
'''
self = link.split('=')[0]
parent = link.split('=')[1]
temp_dicts = []
for dict in dicts:
if str(dict[parent]) == str(parent_id):
temp_dicts.append(dict)
if not temp_dicts:
return
results = []
for temp_dict in temp_dicts:
temp_dict[attribute] = dict_to_tree_by_link(dicts=dicts, link=link, attribute=attribute,
parent_id=temp_dict[self])
results.append(temp_dict)
return results
"""
改写了sqlalchemy的logger机制,实现自定义功能
1、优化了sql语句的输出
2、debug_sql模式下,能返回完整的sql
"""
import logging
import sys
from flask import g
def _add_default_handler(logger):
handler = logging.StreamHandler(sys.stdout)
# handler.setFormatter(logging.Formatter(
# '%(asctime)s %(levelname)s %(name)s %(message)s'))
handler.setFormatter(logging.Formatter(
'%(message)s'))
logger.addHandler(handler)
logger.propagate = False
class InstanceLogger(object):
"""A logger adapter (wrapper) for :class:`.Identified` subclasses.
This allows multiple instances (e.g. Engine or Pool instances)
to share a logger, but have its verbosity controlled on a
per-instance basis.
The basic functionality is to return a logging level
which is based on an instance's echo setting.
Default implementation is:
'debug' -> logging.DEBUG
True -> logging.INFO
False -> Effective level of underlying logger
(logging.WARNING by default)
None -> same as False
"""
# Map echo settings to logger levels
_echo_map = {
None: logging.NOTSET,
False: logging.NOTSET,
True: logging.INFO,
'debug': logging.DEBUG,
}
def __init__(self, echo, name):
# logger.debug('实例化')
self.echo = echo
self.logger = logging.getLogger(name)
# if echo flag is enabled and no handlers,
# add a handler to the list
if self._echo_map[echo] <= logging.INFO \
and not self.logger.handlers:
_add_default_handler(self.logger)
#
# Boilerplate convenience methods
#
def debug(self, msg, *args, **kwargs):
"""Delegate a debug call to the underlying logger."""
self.log(logging.DEBUG, msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
"""Delegate an info call to the underlying logger."""
# 改写sqlalchemy的记录日志
# 判断msg是否为%r 是表示为参数 args是值
# 如果是参数,则替换上一个log的sql里面的参数
if getattr(g, 'sql', None) is None:
return
if msg == '%r':
last_sql = str(g.sql.pop())
if isinstance(args[0], dict):
params = args[0]
else:
params = args[0].params
# 多条更新语句会以tuple形式传入
if isinstance(params, tuple):
for param in params:
for key, value in param.items():
g.sql.append(last_sql.replace('%({})s'.format(key), "'{}'".format(str(value))))
else:
for key, value in params.items():
last_sql = last_sql.replace('%({})s'.format(key), "'{}'".format(str(value)))
g.sql.append(last_sql)
else:
g.sql.append(msg)
# 只有确定打印日志,才会显示日志
from flask import current_app
if current_app.config.get('GLOBAL_SQL_ECHO', False):
self.log(logging.INFO, msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
"""Delegate a warning call to the underlying logger."""
self.log(logging.WARNING, msg, *args, **kwargs)
warn = warning
def error(self, msg, *args, **kwargs):
"""
Delegate an error call to the underlying logger.
"""
self.log(logging.ERROR, msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
"""Delegate an exception call to the underlying logger."""
kwargs["exc_info"] = 1
self.log(logging.ERROR, msg, *args, **kwargs)
def critical(self, msg, *args, **kwargs):
"""Delegate a critical call to the underlying logger."""
self.log(logging.CRITICAL, msg, *args, **kwargs)
def log(self, level, msg, *args, **kwargs):
"""Delegate a log call to the underlying logger.
The level here is determined by the echo
flag as well as that of the underlying logger, and
logger._log() is called directly.
"""
# inline the logic from isEnabledFor(),
# getEffectiveLevel(), to avoid overhead.
if self.logger.manager.disable >= level:
return
selected_level = self._echo_map[self.echo]
if selected_level == logging.NOTSET:
selected_level = self.logger.getEffectiveLevel()
if level >= selected_level:
self.logger._log(level, msg, args, **kwargs)
def isEnabledFor(self, level):
"""Is this logger enabled for level 'level'?"""
if self.logger.manager.disable >= level:
return False
return level >= self.getEffectiveLevel()
def getEffectiveLevel(self):
"""What's the effective level for this logger?"""
level = self._echo_map[self.echo]
if level == logging.NOTSET:
level = self.logger.getEffectiveLevel()
return level
import os
def dynamic_import(import_prefix, root_dir, file_suffix=".py"):
"""
动态导入模块
:param import_prefix:导入模块的前缀
:param root_dir:扫描的根目录
:param file_suffix:文件后缀
:return:
"""
def suffix_is_match(file):
if isinstance(file_suffix, str):
return file.lower().endswith(file_suffix.lower())
return any(filter(lambda x: file.lower().endswith(x.lower()), file_suffix))
for root, dirs, files in os.walk(root_dir):
# logger.debug('路径{}'.format(root_dir))
for file in files:
# logger.debug('{},{},{}'.format(file.startswith("__"), 111, suffix_is_match(file)))
if file.startswith("__") or not suffix_is_match(file):
continue
module_name = "{}.{}".format(import_prefix, os.path.splitext(file)[0])
# logger.debug('导入前{}'.format(module_name))
__import__(module_name)
# logger.debug('导入后{}'.format(module_name))
# 递归调用
for _dir in dirs:
# .开头的文件夹不处理和utils的文件夹不作处理
if _dir == "__pycache__" or _dir.startswith(".") or _dir == 'huansi_utils':
continue
dynamic_import(import_prefix + '.' + _dir, os.path.join(root, _dir), file_suffix)
# 由于自己递归了,所以不再往下递归
break
class HSObject(object):
def __new__(cls, *args, **kwargs):
return super(HSObject, cls).__new__(cls, *args, **kwargs)
import redis
from datetime import datetime
class RedisCache(object):
""" Redis操作函数 """
def connect(self, host, port, db, password):
pool = redis.ConnectionPool(host=host,
port=port,
db=db,
password=password,
decode_responses=True)
self.r = redis.Redis(connection_pool=pool)
self.pipe = self.r.pipeline()
def set(self, key, value, expired=None):
if expired and isinstance(expired, datetime):
dif = expired - datetime.now()
seconds = int(dif.total_seconds())
if seconds < 0:
seconds = 0
expired = seconds
elif isinstance(expired, int):
pass
else:
expired = None
self.r.set(key, value, ex=expired)
def mset(self, *args, **kwargs):
'''
批量设置值
r.mset(name1='zhangsan', name2='lisi')
r.mset({"name1":'zhangsan', "name2":'lisi'})
:param key:
:param value:
:return:
'''
self.r.mset(*args, **kwargs)
def get(self, key):
'''
获取值
:param key:
:return:
'''
return self.r.get(key)
def mget(self, keys, *args):
'''
#批量获取
r.mget("name1","name2")
#或
li=["name1","name2"]
r.mget(li)
:param key:
:param value:
:return:
'''
return self.r.mget(keys, *args)
def getset(self, key, value):
'''
设置新值,打印旧值
:param key:
:param value:
:return:
'''
return self.r.getset(key, value)
def ttl(self, key):
'''
返回key到期之前的秒数
:param key:
:return:
'''
return self.r.ttl(key)
def expire(self, key, time):
'''
给key设置一个到期的秒数
:param key:
:param time:
:return:
'''
return self.r.expire(key, time)
def keys(self, pattern='*'):
'''
返回与``pattern``匹配的键列表
:param pattern:
:return:
'''
return self.r.keys(pattern)
def flushdb(self):
'''
清空数据库
:return:
'''
return self.r.flushdb()
def get_key_value(self, pattern='*'):
'''
返回与``pattern``匹配的键值对
:param pattern:
:return:
'''
_dict = {}
keys = self.keys(pattern)
for item in keys:
self.pipe.get(item)
for key, value in zip(keys, self.pipe.execute()):
_dict[key] = value
return _dict
def sadd(self, name, value):
return self.r.sadd(name, value)
def smembers(self, name):
return self.r.smembers(name)
def delete(self, name):
self.r.delete(name)
def delete_all(self):
for key in self.keys():
self.delete(key)
hs_redis = RedisCache()
This diff is collapsed.
import hashlib
import hmac
import random
import time
from urllib.parse import quote
# from flask_config import secretKey
secretKey = '018f162e804f945ee6b23aebfa863639'
class HSSignatureUrl:
'''
HSSignatureUrl(url,route_param,query_param,body_param).url
routerParms={
route_url:':name/:age',
data:{
name:'001',age:'18'
}
}
'''
def __init__(self, url, route_param, query_param, body_param):
'''
签名验证
:param url: url
:param route_param: 路由参数
:param query_param: 问号参数
:param body_param: body参数
'''
url = url if url[-1:] == '/' else url + '/'
self.url = ''
self.param_dict = {}
# 获取路由参数url
self._get_header_url(url, route_param)
# 获取签名后url
self._get_signature_url(query_param, body_param)
def _get_header_url(self, url, route_param):
'''
获取不带参数的url
:param url: url
:param route_param: 路由参数
:return:
'''
if not route_param:
self.url = url
return
param_data = route_param['data']
route_url = str(route_param['route_url'])
# _route_url = ''
for key in param_data.keys():
route_url = route_url.replace(':' + key, str(quote(param_data[key])))
self.param_dict[key] = param_data[key]
self.url = url + route_url if self.url[-1:] == '/' else url + route_url + '/'
def _get_signature_url(self, query_param=None, body_param=None):
'''
获取签名后的url
:param query_param:
:param body_param:
:return:
'''
_temp_param_url = ''
if query_param:
for key, value in query_param.items():
_temp_param_url += '&{}={}'.format(key, quote(str(value)))
self.param_dict[key] = value
time_stamp = self.get_time_stamp()
nonce = self.get_random(5)
signature_txt = self.make_signature(self.param_dict, body_param, time_stamp, nonce)
signature = self.generateSinature(secretKey, signature_txt)
self.url = '{}?timestamp={}&nonce={}&signature={}{}'.format(self.url, time_stamp, nonce, signature,
_temp_param_url)
@staticmethod
def get_time_stamp() -> str:
'''
生成时间戳
:return:
'''
return str(int(time.time()))
@staticmethod
def get_random(len) -> str:
'''
生成随机数
:param len: 随机数位数
:return:
'''
return str(int(random.uniform(0, 1) * 10 ** int(len)))
@staticmethod
def make_signature(param_dict, body_param, time_stamp, nonce):
'''
待签名文本生成
:param param_dict: 参数字段
:param body_param: body参数
:param time_stamp: 时间戳
:param nonce: 随机数
:return:
'''
_param_txt = ''
_dict_key = sorted(param_dict.keys())
for key in _dict_key:
_param_txt += '{}={}&'.format(key, param_dict[key])
if len(_param_txt) > 1:
_param_txt = _param_txt[:-1]
if body_param:
_param_txt += str(body_param)
_param_txt += str(time_stamp) + str(nonce)
return _param_txt
@staticmethod
def generateSinature(secretKey, singnPlan) -> str:
'''
生成加密签名(哈希256算法)
:param secretKey: 密钥
:param singnPlan: 密文
:return: 签名
'''
return hmac.new(bytes(secretKey, 'utf-8'), bytes(singnPlan, 'utf-8'), hashlib.sha256).hexdigest()
import base64
import gzip
import time
# 拆分字符串
# 按指定字符分解字符串
# </summary>
# str:原字符串
# p1:排除范围的前置字符
# p2:排除范围的后置字符
# splitor:分隔符
# bIncludeSplitor:是否包含分隔符
# sNotIn:排除范围的前后置字符数组
# "AA(a,b),BB(c,d)".m_Split(",","(",")")==>AA(a,b)|BB(c,d)
# <returns>如不在{}之内的逗号分隔</returns>
# @staticmethod
def split_string(str, splitor=',', p1=None, p2=None, bIncludeSplitor=False, bIncludeEmpty=False, sNotIn=[]):
return str.strip(',').split(splitor)
def str_to_int(str, default=None):
if isinstance(str, int):
return str
elif not str:
return default
try:
if isinstance(str, int) is False:
return str
value = int(str)
except Exception as e:
value = default
return value
def get_bill_date():
'''
获取当前时间的%Y-%m-%d格式
:return:
'''
return time.strftime('%Y-%m-%d', time.localtime(time.time()))
def before_string(str: str, start: str = ''):
'''
从源字符串获取指定字符串之前的字符串
:param str: 源字符串
:param start: 指定字符串(开始字符串)
:param i_start: 从i_start 下标开始查找开始字符串的位置
:return:
'''
res = str
if str and start and len(start) > 0:
str_find = str.find(start)
if str_find != -1:
res = str[:str_find]
return res
def after_string(str, start='', i_start=0):
'''
从源字符串获取指定字符串之后的字符串
:param str: 源字符串
:param start: 指定字符串(开始字符串)
:param i_start: 从i_start 下标开始查找开始字符串的位置
:return:
'''
res = str
if str and start and len(start) > 0:
str_find = str.find(start, i_start)
if str_find != -1:
res = str[str_find + len(start):]
return res
def between_string(str, start='', end='', i_start=0):
"""
获取源字符串的中间字符串
:param str: 源字符串
:param start: 开始字符串
:param end: 结束字符串
:param i_start: 从i_start 下标开始查找开始字符串的位置
:param i_end: 从开始字符串位置+i_end 开始查找结束字符串的位置
:return: 目标字符串
"""
if not str:
str = ""
result = str
if (not start or start == "") and (not end or end == ""):
pass
elif not start or start == "":
result = before_string(str, end)
elif not end or end == "":
result = after_string(str, start, i_start)
else:
iStart = str.find(start, i_start) + 1
# i_start = iStart + len(str)
iEnd = str.find(end, i_start)
if iStart < 0 or iEnd < 0:
result = ''
else:
result = str[iStart:iEnd]
return result
def compress_string(s):
'''
压缩字符串
:param s:
:return:
'''
compress_data = gzip.compress(s.encode('utf-16-le'))
base64_str = base64.b64encode(compress_data).decode()
return base64_str
def decompress_string(s):
'''
解压字符串
:param s:
:return:
'''
compress_data = base64.b64decode(s)
new_str = gzip.decompress(compress_data).decode('utf-16-le')
return new_str
import uuid
from flask import g
from huansi_utils.db.sqlalchemy import db
from huansi_utils.common.IdWorker import IdWorker
from huansi_utils.common.logger import InstanceLogger
from huansi_utils.db.dbsession import HSDBSession
from huansi_utils.db.multi_db import MultiDBEngineManager
'''id生成器规则'''
idWorker = IdWorker(9, 9)
'''自定义logger'''
hs_logger = InstanceLogger(True, 'sqlalchemy.engine.base.Engine')
'''session'''
hs_session = db.create_scoped_session(options={'autocommit': False, 'autoflush': False})
# 多数据库管理引擎
multi_db_manager = MultiDBEngineManager(db)
def change_database(db_key: str):
multi_db_manager.change_database(db_key, hs_session)
def new_session(begin=False, total_rollback_count=1, db_key: str = ""):
change_database(db_key)
session = HSDBSession(db_session=hs_session, total_rollback_count=total_rollback_count)
if begin:
session.begin_trans()
# 注入自定义logger
hs_logger.echo = True
session.db_session.bind.logger = hs_logger
return session
def get_session():
return db.create_scoped_session(options={
'autocommit': False,
'autoflush': False
})
def new_id():
return idWorker.get_id()
def new_id_by_sql():
sql = 'SELECT id=dbo.fnpbNextId()'
return hs_session.execute(sql).first().id
def new_guid():
return uuid.uuid1()
def new_bill_no(formula_name=None, formula_id=None):
return new_bill_no_by_sql(formula_name, formula_id)
def db_session():
return g.hs_session
def new_bill_no_by_sql(formula_name=None, formula_id=None):
if not formula_id:
formula_id = 0
if not formula_name:
formula_name = ''
sql = '''DECLARE @sBillNo NVARCHAR(20)=0x
EXEC dbo.sppbNewBillNo @iFormulaId={0},@sFormulaName='{1}',@sNewbillNo=@sBillNo OUTPUT
SELECT sBillNo=@sBillNo
'''.format(formula_id, formula_name)
# print('formulaid=', formula_id, 'formula_name=', formula_name, 'sql=', sql)
try:
data = db_session().retrive_sql(sql)
data = data.sBillNo if data else ''
return str(data)
except Exception as e:
print('生成单据号出错,sql=', sql)
print(e)
raise
def get_model_column_names(model):
'''
返回model的所有列名
:param model: 实体对象
:return: 列名的元组,如('id','bill_no')
'''
if hasattr(model, '_fields'):
return model._fields
else:
return tuple([v.name for v in model.__table__.columns])
def get_model_columns(model):
'''
返回model的列信息(列名+列类型)
:param model: 实体对象
:return: 如[{'id': <class 'str'>}, {'bill_no': <class 'str'>}]
或[{'id': BigInteger()}, {'bill_date': Date()}, {'bill_status': Integer()}, {'creator_name': NVARCHAR(length=20)}]
'''
if hasattr(model, '_fields'):
return [{v: type(v)} for v in model._fields]
else:
return [{v.name: v.type} for v in model.__table__.columns]
def get_model_all_Columns(model):
'''
返回model的所有列信息,仅按类查询时才有效
:param model: 实体对象
:return: 所有列对象列表[Column(),Column()]
'''
return model.__table__.columns if hasattr(model, '__table__') else None
def get_model_class_columns(cls, columns):
# return IM_arrive.provider_name
return tuple([getattr(cls, column) for column in columns])
def model_load_json_data(model, json_data, only_set_null_column=True, ignore_columns=None):
'''
从json对象中装载数据到实体中
:param model: 实体对象
:param json_data: json对象
:param only_set_null_column: 是否仅设置空值列
:param ignore_columns: 忽略字段列表,如'id,create_time'
:return: model
'''
if not json_data:
return model
columns = get_model_column_names(model)
if ignore_columns:
ignore_columns = ignore_columns.strip(',').split(',')
columns = set(columns).difference(ignore_columns)
columns = set(json_data.keys()).intersection(columns)
for column in columns:
if not hasattr(model, column):
continue
old_value = getattr(model, column)
new_value = json_data.get(column)
if not only_set_null_column or old_value is None:
setattr(model, column, new_value)
return model
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment