ApiSecurityFilter.py 3.54 KB
Newer Older
金凯强's avatar
金凯强 committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
import hashlib
import hmac
import time
from collections import OrderedDict

from flask import request

# from flask_config import secretKey
from huansi_utils.exception.exception import HSException

secretKey = '018f162e804f945ee6b23aebfa863639'

def ApiSercurity(*args, **kwargs):
    '''
    API签名验证
    :param kwargs:
    :return:
    '''
    try:
        # if __debug__:
        #     return
        if not request:
            return
        if request.headers.environ.get('HTTP_FROM') == 'api_debug':
            return
        requestData = request.args
        timestamp = requestData.get('timestamp')
        if not timestamp or not timestamp.isdigit():
            raise Exception("非法请求")
        if (int(time.time()) - int(timestamp)) / 60 > 5:
            raise Exception("请求已过期")
        if not secretKey:
            raise Exception("accessToken不正确,或没有设置SecretKey")
        # 验证签名
        signData = getSignData(request, kwargs)
        sign = generateSinature(secretKey, signData)
        if sign != requestData.get('signature'):
            print(("非法请求:签名校验失败,接收到的签名为:{},本次请求传递的时间戳:{},随机数{},生成的签名:{},待签名数据:{}").format(requestData.get('signature'),
                                                                                          requestData.get('timestamp'),
                                                                                          requestData.get('nonce'),
                                                                                          sign, signData))
            raise Exception("非法请求")
    except Exception:
        raise HSException("签名验证失败")


def generateSinature(sinatureSecertKey: str, singnPlan: str) -> str:
    '''
    生成加密签名(哈希256算法)
    :param sinatureSecertKey: 密钥
    :param singnPlan: 密文
    :return: 签名
    '''
    return hmac.new(bytes(sinatureSecertKey, 'utf-8'), bytes(singnPlan, 'utf-8'), hashlib.sha256).hexdigest()


def makeSignPlan(queryStringDict: dict, body: str) -> str:
    '''
    生成待签名文本
    :param queryStringDict: 待签名参数字典
    :param body: post和put请求的body
    :return: 待签名文本
    '''
    timestamp = queryStringDict['timestamp']
    nonce = queryStringDict['nonce']
    signPlan = ''
    for key, value in queryStringDict.items():
        if key in ["timestamp", "nonce", "accessToken", "signature"]:
            continue
        signPlan += ('&{}={}').format(key, value)
    signPlan = signPlan[1:]
    if body:
        signPlan += body
    signPlan += timestamp
    signPlan += nonce
    return signPlan


def getSignData(request: request, routeParam: dict) -> str:
    '''
    获取待签名文本
    :param request: 当前请求
    :param routeParam:路由参数
    :return:待签名文本
    '''
    temp_Dict = request.args.to_dict()
    tempDict = {}
    for key, value in temp_Dict.items():
        if value:
            tempDict[key] = value
    if routeParam:
        for key, value in routeParam.items():
            tempDict[key] = value
    # 将url参数和路由参数按key排序放入dict中
    _queryStringDict = sorted(tempDict.items(), key=lambda x: x[0])
    queryStringDict = OrderedDict()
    for item in _queryStringDict:
        queryStringDict[item[0]] = item[1]
    # 将body字符串化
    body = ''
    if request.data:
        body = request.data.decode(encoding="utf-8", errors="ignore")
    # 生成代签名文本
    return makeSignPlan(queryStringDict, body)