Skip to content

Commit

Permalink
添加自定义token授权方式 (#158)
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-clan authored Mar 9, 2024
1 parent 17770a1 commit 78e2c57
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 19 deletions.
16 changes: 11 additions & 5 deletions httpfpt/core/auth.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
is_auth: false
# 认证类型,请填写为认证方式的键值,如:bearer_token
auth_type: bearer_token
###############
## token 认证
###############
##
## bearer token 认证(自动登录获取)
##
bearer_token:
url: https://api.pity.fun/auth/login
username: tester
Expand All @@ -16,9 +16,15 @@ bearer_token:
token_key: $.data.token
# token有效期,单位: 秒
timeout: 100000
################
##
## bearer token 认证(自定义)
##
bearer_token_custom:
token: xxx
timeout: 100000
##
## cookie 认证
################
##
header_cookie:
url: xxx
username: xxx
Expand Down
1 change: 1 addition & 0 deletions httpfpt/enums/request/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@

class AuthType(StrEnum):
TOKEN = 'bearer_token'
TOKEN_CUSTOM = 'bearer_token_custom'
COOKIE = 'header_cookie'
36 changes: 22 additions & 14 deletions httpfpt/utils/auth_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ def __init__(self) -> None:
self.is_auth = self.auth_data['is_auth']
self.auth_type = self.auth_data['auth_type']
self.auth_type_verify()
# 授权接口请求参数
self.url = self.auth_data[f'{self.auth_type}']['url']
self.username = self.auth_data[f'{self.auth_type}']['username']
self.password = self.auth_data[f'{self.auth_type}']['password']
self.headers = self.auth_data[f'{self.auth_type}']['headers']
self.timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400

@lru_cache
Expand All @@ -43,13 +38,17 @@ def auth_type_verify(self) -> None:

def request_auth(self) -> requests.Response:
try:
url = self.auth_data[f'{self.auth_type}']['url']
username = self.auth_data[f'{self.auth_type}']['username']
password = self.auth_data[f'{self.auth_type}']['password']
headers = self.auth_data[f'{self.auth_type}']['headers']
request_data = {
'url': self.url,
'data': {'username': self.username, 'password': self.password},
'headers': self.headers,
'url': url,
'data': {'username': username, 'password': password},
'headers': headers,
'proxies': {'http': None, 'https': None},
}
if 'application/json' in str(self.headers):
if 'application/json' in str(headers):
request_data.update({'json': request_data.pop('data')})
response = requests.post(**request_data)
response.raise_for_status()
Expand All @@ -59,8 +58,7 @@ def request_auth(self) -> requests.Response:

@property
def bearer_token(self) -> str:
self.headers.update({'Connection': 'close'})
cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{self.url}', logging=False)
cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:bearer_token', logging=False)
if cache_bearer_token:
token = cache_bearer_token
else:
Expand All @@ -69,12 +67,22 @@ def bearer_token(self) -> str:
token = jp_token[0]
if not token:
raise AuthError('Token 获取失败,请检查登录接口响应或 token 提取表达式')
redis_client.set(f'{redis_client.token_prefix}:{self.url}', token, ex=self.timeout)
redis_client.set(f'{redis_client.token_prefix}:bearer_token', token, ex=self.timeout)
return token

@property
def bearer_token_custom(self) -> str:
cache_bearer_token_custom = redis_client.get(f'{redis_client.token_prefix}:bearer_token_custom', logging=False)
if cache_bearer_token_custom:
token = cache_bearer_token_custom
else:
token = self.auth_data[f'{self.auth_type}']['token']
redis_client.set(f'{redis_client.token_prefix}:bearer_token_custom', token, ex=self.timeout)
return token

@property
def header_cookie(self) -> dict:
cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:{self.url}', logging=False)
cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:header_cookie', logging=False)
if cache_cookie:
cookies = json.loads(cache_cookie)
else:
Expand All @@ -84,7 +92,7 @@ def header_cookie(self) -> dict:
if not cookies:
raise AuthError('Cookie 获取失败,请检查登录接口响应')
redis_client.set(
f'{redis_client.cookie_prefix}:{self.url}', json.dumps(cookies, ensure_ascii=False), ex=self.timeout
f'{redis_client.cookie_prefix}:header_cookie', json.dumps(cookies, ensure_ascii=False), ex=self.timeout
)
return cookies

Expand Down
3 changes: 3 additions & 0 deletions httpfpt/utils/request/request_data_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ def headers(self) -> dict | None:
if auth.auth_type == AuthType.TOKEN:
bearer_token = {'Authorization': f'Bearer {auth.bearer_token}'}
headers = headers.update(bearer_token) if headers else bearer_token
elif auth.auth_type == AuthType.TOKEN_CUSTOM:
bearer_token = {'Authorization': f'Bearer {auth.bearer_token_custom}'}
headers = headers.update(bearer_token) if headers else bearer_token
return headers

@property
Expand Down

0 comments on commit 78e2c57

Please sign in to comment.