From 78e2c579b9fe85ed6c2fb306e7a7e7b952771134 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Sat, 9 Mar 2024 21:56:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=87=AA=E5=AE=9A=E4=B9=89to?= =?UTF-8?q?ken=E6=8E=88=E6=9D=83=E6=96=B9=E5=BC=8F=20(#158)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- httpfpt/core/auth.yaml | 16 ++++++--- httpfpt/enums/request/auth.py | 1 + httpfpt/utils/auth_plugins.py | 36 +++++++++++++-------- httpfpt/utils/request/request_data_parse.py | 3 ++ 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/httpfpt/core/auth.yaml b/httpfpt/core/auth.yaml index 0889c7c..fae961e 100644 --- a/httpfpt/core/auth.yaml +++ b/httpfpt/core/auth.yaml @@ -2,9 +2,9 @@ is_auth: false # 认证类型,请填写为认证方式的键值,如:bearer_token auth_type: bearer_token -############### -## token 认证 -############### +## +## bearer token 认证(自动登录获取) +## bearer_token: url: https://api.pity.fun/auth/login username: tester @@ -16,9 +16,15 @@ bearer_token: token_key: $.data.token # token有效期,单位: 秒 timeout: 100000 -################ +## +## bearer token 认证(自定义) +## +bearer_token_custom: + token: xxx + timeout: 100000 +## ## cookie 认证 -################ +## header_cookie: url: xxx username: xxx diff --git a/httpfpt/enums/request/auth.py b/httpfpt/enums/request/auth.py index 730dab5..c2e97e3 100644 --- a/httpfpt/enums/request/auth.py +++ b/httpfpt/enums/request/auth.py @@ -5,4 +5,5 @@ class AuthType(StrEnum): TOKEN = 'bearer_token' + TOKEN_CUSTOM = 'bearer_token_custom' COOKIE = 'header_cookie' diff --git a/httpfpt/utils/auth_plugins.py b/httpfpt/utils/auth_plugins.py index 08279c2..de9ba70 100644 --- a/httpfpt/utils/auth_plugins.py +++ b/httpfpt/utils/auth_plugins.py @@ -22,11 +22,6 @@ def __init__(self) -> None: self.is_auth = self.auth_data['is_auth'] self.auth_type = self.auth_data['auth_type'] self.auth_type_verify() - # 授权接口请求参数 - self.url = self.auth_data[f'{self.auth_type}']['url'] - self.username = self.auth_data[f'{self.auth_type}']['username'] - self.password = self.auth_data[f'{self.auth_type}']['password'] - self.headers = self.auth_data[f'{self.auth_type}']['headers'] self.timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400 @lru_cache @@ -43,13 +38,17 @@ def auth_type_verify(self) -> None: def request_auth(self) -> requests.Response: try: + url = self.auth_data[f'{self.auth_type}']['url'] + username = self.auth_data[f'{self.auth_type}']['username'] + password = self.auth_data[f'{self.auth_type}']['password'] + headers = self.auth_data[f'{self.auth_type}']['headers'] request_data = { - 'url': self.url, - 'data': {'username': self.username, 'password': self.password}, - 'headers': self.headers, + 'url': url, + 'data': {'username': username, 'password': password}, + 'headers': headers, 'proxies': {'http': None, 'https': None}, } - if 'application/json' in str(self.headers): + if 'application/json' in str(headers): request_data.update({'json': request_data.pop('data')}) response = requests.post(**request_data) response.raise_for_status() @@ -59,8 +58,7 @@ def request_auth(self) -> requests.Response: @property def bearer_token(self) -> str: - self.headers.update({'Connection': 'close'}) - cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{self.url}', logging=False) + cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:bearer_token', logging=False) if cache_bearer_token: token = cache_bearer_token else: @@ -69,12 +67,22 @@ def bearer_token(self) -> str: token = jp_token[0] if not token: raise AuthError('Token 获取失败,请检查登录接口响应或 token 提取表达式') - redis_client.set(f'{redis_client.token_prefix}:{self.url}', token, ex=self.timeout) + redis_client.set(f'{redis_client.token_prefix}:bearer_token', token, ex=self.timeout) + return token + + @property + def bearer_token_custom(self) -> str: + cache_bearer_token_custom = redis_client.get(f'{redis_client.token_prefix}:bearer_token_custom', logging=False) + if cache_bearer_token_custom: + token = cache_bearer_token_custom + else: + token = self.auth_data[f'{self.auth_type}']['token'] + redis_client.set(f'{redis_client.token_prefix}:bearer_token_custom', token, ex=self.timeout) return token @property def header_cookie(self) -> dict: - cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:{self.url}', logging=False) + cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:header_cookie', logging=False) if cache_cookie: cookies = json.loads(cache_cookie) else: @@ -84,7 +92,7 @@ def header_cookie(self) -> dict: if not cookies: raise AuthError('Cookie 获取失败,请检查登录接口响应') redis_client.set( - f'{redis_client.cookie_prefix}:{self.url}', json.dumps(cookies, ensure_ascii=False), ex=self.timeout + f'{redis_client.cookie_prefix}:header_cookie', json.dumps(cookies, ensure_ascii=False), ex=self.timeout ) return cookies diff --git a/httpfpt/utils/request/request_data_parse.py b/httpfpt/utils/request/request_data_parse.py index 0673018..8d8c33b 100644 --- a/httpfpt/utils/request/request_data_parse.py +++ b/httpfpt/utils/request/request_data_parse.py @@ -375,6 +375,9 @@ def headers(self) -> dict | None: if auth.auth_type == AuthType.TOKEN: bearer_token = {'Authorization': f'Bearer {auth.bearer_token}'} headers = headers.update(bearer_token) if headers else bearer_token + elif auth.auth_type == AuthType.TOKEN_CUSTOM: + bearer_token = {'Authorization': f'Bearer {auth.bearer_token_custom}'} + headers = headers.update(bearer_token) if headers else bearer_token return headers @property