Commit a0345752 authored by jinkaiqiang

脚本

parent 40f4cd38
# -*- coding:utf-8 -*-
import datetime
import re
from NginxLog.DBDriver import DBDriver
# One access-log line: client IPv4 address, local timestamp (the "+0800" offset
# is matched literally and discarded), the quoted request and the 3-digit status.
# Compiled once because get_cell_data is called for every line of the log.
_LOG_LINE_RE = re.compile(
    r'^(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<time>.*) \+0800\] "(?P<url>.*)" (?P<status>\d{3}) .*')

# Value returned for a line that cannot be parsed.
_EMPTY_CELLS = ("", "", "", "")


def get_cell_data(row_data):
    """Parse one nginx access-log line into ``[ip, access_time, url, status]``.

    Args:
        row_data: A single raw log line.

    Returns:
        A list of four strings: the client ip, the access time formatted as
        ``YYYY-MM-DD HH:MM:SS``, the quoted request text and the HTTP status.
        When the line does not match the expected layout, or its timestamp is
        not a valid date, the line is printed and four empty strings are
        returned instead.
    """
    re_result = _LOG_LINE_RE.match(row_data)
    if re_result is None:
        print(row_data)
        return list(_EMPTY_CELLS)
    try:
        parsed_time = datetime.datetime.strptime(re_result.group("time"), '%d/%b/%Y:%H:%M:%S')
    except ValueError:
        # The layout matched but the timestamp is not a real date/time;
        # report it the same way as any other unparsable line.
        print(row_data)
        return list(_EMPTY_CELLS)
    return [
        re_result.group("ip"),
        str(parsed_time),
        re_result.group("url"),
        re_result.group("status"),
    ]
def _flush_batch(db_driver, insert_sql_header, sql_str_list):
    """Send the buffered value tuples as one multi-row INSERT and commit.

    Returns True when the batch was committed (or was empty). On failure the
    error and the offending statement are printed, the session is rolled back
    so later batches can still be executed, and False is returned.
    """
    if not sql_str_list:
        return True
    statement = insert_sql_header + ",\n".join(sql_str_list)
    try:
        db_driver.session.execute(statement)
        db_driver.session.commit()
        return True
    except Exception as e:
        print(e)
        print(statement)
        db_driver.session.rollback()
        return False


if __name__ == '__main__':
    # Load up to max_lines lines of an nginx access log into the nginx_log
    # table, batch_size rows per INSERT statement.
    file_path = r"D:\Work\nginx-1.19.0\logs\access-2021-04-27T09.log"
    # NOTE(review): the connection string embeds credentials; consider reading
    # it from the environment instead.
    db_driver = DBDriver("mysql+pymysql://jkq:741963@localhost:3306/test?charset=utf8")
    max_lines = 1000000
    batch_size = 100
    # Header of the multi-row insert; the value tuples are appended per batch.
    insert_sql_header = """insert into nginx_log
(ip,url,access_time,status)
values
"""
    lines = 0
    sql_str_list = []
    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
        for data in f:
            if lines >= max_lines:
                break
            lines += 1
            cell_data = get_cell_data(data)
            if not any(cell_data):
                # get_cell_data returns four empty strings for unparsable lines.
                continue
            # NOTE(review): values are still spliced into the SQL text; escape
            # backslashes first and then single quotes so the literal survives
            # MySQL's default string parsing. Bound parameters would be safer.
            url = cell_data[2].replace("\\", "\\\\").replace("'", "''")
            sql_str_list.append(f"('{cell_data[0]}','{url}','{cell_data[1]}','{cell_data[3]}')")
            # Execute once every batch_size rows; a failed batch is dropped so
            # that the remaining lines can still be imported.
            if len(sql_str_list) >= batch_size:
                _flush_batch(db_driver, insert_sql_header, sql_str_list)
                sql_str_list = []
        else:
            # Reached only when the file was exhausted before max_lines.
            print("exit:", lines)
    # Insert whatever is left over from the last, incomplete batch.
    _flush_batch(db_driver, insert_sql_header, sql_str_list)
    print(lines)
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class DBDriver:
    """Small convenience wrapper that owns one SQLAlchemy session.

    The current session is exposed as ``self.session``; the query helpers
    replace it with a fresh one when it is missing or no longer active.
    """

    def __init__(self, connect_str: str):
        """Create the engine and session factory for ``connect_str`` and open a session."""
        engine = create_engine(connect_str)
        self.DBSession = sessionmaker(bind=engine)
        self.new_session()

    def new_session(self):
        """Create a new session, store it on ``self.session`` and return it."""
        self.session = self.DBSession()
        return self.session

    def _ensure_session(self):
        """Return a usable session, replacing a missing or inactive one."""
        session = getattr(self, "session", None)
        if session is None:
            return self.new_session()
        if session.is_active is False:
            # Close the stale session before dropping the reference so its
            # resources are released rather than left to garbage collection.
            session.close()
            return self.new_session()
        return session

    def query_all(self, sql, **kwargs):
        """Execute ``sql`` and return every row of the result.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        return self._ensure_session().execute(sql, **kwargs).fetchall()

    def query_one(self, sql, **kwargs):
        """Execute ``sql`` and return the first row of the result.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        return self._ensure_session().execute(sql, **kwargs).fetchone()

    def execute(self, sql, **kwargs):
        """Execute ``sql`` without returning a result; committing is left to the caller.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        self._ensure_session().execute(sql, **kwargs)
# -*- coding:utf-8 -*-
import os
if __name__ == '__main__':
    # Split a large log file into consecutive parts named file1.log,
    # file2.log, ... of at most chunks_per_file * chunk_size bytes each.
    file_path = r"D:\Work\nginx-1.19.0\logs\access-2021-04-27T09.log"
    # Report the file size in bytes and in GiB.
    file_size = os.path.getsize(file_path)
    print(file_size)
    print(file_size / 1024 / 1024 / 1024, "G")
    chunk_size = 1024 * 1024  # read 1 MiB at a time
    chunks_per_file = 100     # 100 chunks -> 100 MiB per output file
    # Upper bound on the number of parts (one more than needed when the size
    # is an exact multiple of 100 MiB); printed for information only.
    file_count = int(file_size / (100 * 1024 * 1024)) + 1
    print(file_count)
    file_number = 1
    # Binary mode copies the bytes verbatim: no decoding errors and no newline
    # translation, and the part sizes agree with os.path.getsize.
    with open(file_path, 'rb') as rf:
        chunk_data = rf.read(chunk_size)
        # read() returns an empty bytes object at end of file, which ends the
        # loop; a part file is only created when there is data to put in it.
        while chunk_data:
            temp_file_name = "file" + str(file_number) + ".log"
            # Each part is opened once and truncated, so running the script
            # again replaces earlier output instead of appending to it.
            with open(temp_file_name, 'wb') as wf:
                chunk_number = 0
                while chunk_data and chunk_number < chunks_per_file:
                    wf.write(chunk_data)
                    chunk_number += 1
                    chunk_data = rf.read(chunk_size)
            # Part is full (or input exhausted): move on to the next file.
            file_number += 1
# -*- coding:utf-8 -*-
\ No newline at end of file
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class DBDriver:
    """Small convenience wrapper that owns one SQLAlchemy session.

    The current session is exposed as ``self.session``; the query helpers
    replace it with a fresh one when it is missing or no longer active.
    """

    def __init__(self, connect_str: str):
        """Create the engine and session factory for ``connect_str`` and open a session."""
        engine = create_engine(connect_str)
        self.DBSession = sessionmaker(bind=engine)
        self.new_session()

    def new_session(self):
        """Create a new session, store it on ``self.session`` and return it."""
        self.session = self.DBSession()
        return self.session

    def _ensure_session(self):
        """Return a usable session, replacing a missing or inactive one."""
        session = getattr(self, "session", None)
        if session is None:
            return self.new_session()
        if session.is_active is False:
            # Close the stale session before dropping the reference so its
            # resources are released rather than left to garbage collection.
            session.close()
            return self.new_session()
        return session

    def query_all(self, sql, **kwargs):
        """Execute ``sql`` and return every row of the result.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        return self._ensure_session().execute(sql, **kwargs).fetchall()

    def query_one(self, sql, **kwargs):
        """Execute ``sql`` and return the first row of the result.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        return self._ensure_session().execute(sql, **kwargs).fetchone()

    def execute(self, sql, **kwargs):
        """Execute ``sql`` without returning a result; committing is left to the caller.

        Extra keyword arguments are forwarded to the session's ``execute``.
        """
        self._ensure_session().execute(sql, **kwargs)
# -*- coding:utf-8 -*-
import os
class Config:
    """Settings for the sync job.

    The two connection strings and the deviation time are read from
    environment variables of the same name, falling back to the defaults
    written here. TABLE_IM_ARRIVE is a fixed DDL statement.
    """

    # 1. MySQL connection string.
    # NOTE(review): the fallback value embeds a host and credentials.
    MYSQL_CONNECTION = os.environ.get(
        "MYSQL_CONNECTION",
        "mysql+pymysql://root:huansi.net@47.110.145.204:30619/jkq_novel?charset=utf8",
    )

    # 2. SQL Server connection string.
    # NOTE(review): the fallback value embeds a host and credentials.
    MSSQL_CONNECTION = os.environ.get(
        "MSSQL_CONNECTION",
        "mssql+pymssql://huansi:huansi@47.97.206.38:9610/HsGmtMes",
    )

    # 3. Deviation time; kept as a string exactly as read from the environment.
    DEVIATION_TIME = os.environ.get("DEVIATION_TIME", "120")

    # 4. DDL for the table that receives the synchronised rows.
    TABLE_IM_ARRIVE = """CREATE TABLE IF NOT EXISTS im_arrive
(
id BIGINT AUTO_INCREMENT PRIMARY KEY,
bill_no VARCHAR(20) NOT NULL,
bill_date DATE NOT NULL,
bill_status INT DEFAULT 0 NOT NULL,
create_time DATETIME NOT NULL,
material_category INT DEFAULT 16 NOT NULL,
customer_name VARCHAR(50) DEFAULT '' NOT NULL,
provider_name VARCHAR(50) DEFAULT '' NOT NULL
)"""
# -*- coding:utf-8 -*-
import datetime
import logging
import random
import time
import schedule
from SqlServerToMysql.EnvGetter import Config
from SqlServerToMysql.MySqlDriver import MySqlDriver
from SqlServerToMysql.SqlServerDriver import SqlServerDriver
# Module logger: INFO and above, written through a stream handler using the
# "[time] - [level] - [message]" layout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - [%(message)s]')
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(ch)
def sysnc_db_task():
    """Run one sync pass, logging a timestamped line before and after it.

    The sync runs synchronously in the calling thread; the closing log line is
    written only when ``sysnc_db`` returns without raising.
    """
    started_at = datetime.datetime.now()
    logger.info(f"sysnc_db_task starting:{started_at}")
    sysnc_db()
    finished_at = datetime.datetime.now()
    logger.info(f"sysnc_db_task ending:{finished_at}")
def sysnc_db():
    """Copy the newest ``im_arrive`` rows from SQL Server into MySQL.

    Steps:
      * sleep for a random number of seconds between 0 and
        ``Config.DEVIATION_TIME`` (120 when that value is not a non-negative
        integer);
      * read the 10 rows with the highest id from ``dbo.im_arrive``;
      * create the MySQL ``im_arrive`` table if it does not exist and insert
        the rows, stamping ``create_time`` with MySQL's ``NOW()``;
      * delete MySQL rows whose ``create_time`` is more than 15 days old;
      * commit, and always close both sessions.

    Values are passed as bound parameters, so text containing quotes is stored
    verbatim instead of breaking the statement.
    NOTE(review): a source value of None is now sent as SQL NULL rather than
    the text "None"; confirm the source columns are never NULL.
    """
    # Randomised start delay.
    try:
        devitation_time = int(Config.DEVIATION_TIME)
    except (TypeError, ValueError):
        devitation_time = 120
    if devitation_time < 0:
        # random.randint(0, n) raises for n < 0; treat it like any other
        # invalid setting.
        devitation_time = 120
    # Somewhere between 0 and the configured time.
    devitation_second = random.randint(0, devitation_time)
    logger.info(f"本次执行延迟时间:{devitation_second}")
    time.sleep(devitation_second)

    insert_sql = '''INSERT INTO im_arrive
(`bill_no`,`bill_date`,`bill_status`,`create_time`,`material_category`,`customer_name`,`provider_name`)
VALUES
(:bill_no,:bill_date,:bill_status,NOW(),:material_category,:customer_name,:provider_name)'''
    delete_sql = '''DELETE FROM im_arrive
WHERE create_time < :cutoff'''

    mysql_driver = MySqlDriver(Config.MYSQL_CONNECTION)
    mssql_driver = SqlServerDriver(Config.MSSQL_CONNECTION)
    try:
        # 1. DDL of the target table, taken from the configuration.
        table_im_arrive_str = Config.TABLE_IM_ARRIVE
        # 2. Fetch the source rows from SQL Server.
        sqlserver_data_list = mssql_driver.query_all("""SELECT TOP 10 id,bill_no,bill_date,bill_status,create_time
,material_category,customer_name,provider_name
FROM dbo.im_arrive(NOLOCK)
WHERE id>0
ORDER BY id DESC""")
        # 3. Make sure the target table exists, then insert into MySQL.
        mysql_driver.execute(table_im_arrive_str)
        # 3.1 One parameterised INSERT per source row.
        for sqlserver_data in sqlserver_data_list:
            mysql_driver.execute(insert_sql, params={
                "bill_no": sqlserver_data.bill_no,
                "bill_date": sqlserver_data.bill_date,
                "bill_status": sqlserver_data.bill_status,
                "material_category": sqlserver_data.material_category,
                "customer_name": sqlserver_data.customer_name,
                "provider_name": sqlserver_data.provider_name,
            })
        # 3.2 Remove rows older than 15 days.
        cutoff = datetime.date.today() - datetime.timedelta(days=15)
        mysql_driver.execute(delete_sql, params={"cutoff": cutoff})
        mysql_driver.session.commit()
    finally:
        # 4. Close both sessions whether or not the sync succeeded.
        mysql_driver.session.close()
        mssql_driver.session.close()
def run():
    """Schedule ``sysnc_db_task`` every 10 to 15 minutes and poll forever.

    Pending jobs are checked once per second; the function never returns.
    """
    sync_interval = schedule.every(10).to(15).minutes
    sync_interval.do(sysnc_db_task)
    while True:
        schedule.run_pending()
        time.sleep(1)
if __name__ == "__main__":
    # Script entry point: log the start-up message, then enter the scheduler loop.
    logger.info("任务开始...")
    run()
# -*- coding:utf-8 -*-
from SqlServerToMysql.DBDriver import DBDriver
class MySqlDriver(DBDriver):
    """DBDriver subclass used for the MySQL connection; adds no behaviour of its own."""
\ No newline at end of file
# -*- coding:utf-8 -*-
from SqlServerToMysql.DBDriver import DBDriver
class SqlServerDriver(DBDriver):
    """DBDriver subclass used for the SQL Server connection; adds no behaviour of its own."""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment