flask 项目需要访问阿里云上部署的 kubeflow api 服务, 这个 api 参数需要使用 query 传递, 因 query 构造过长报了 413,后考虑拆分 query 并发访问服务.
大致代码抽象如下
import asyncio
import json
import re
import functools
from typing import Dict, List
import aiohttp
from loguru import logger
def kubeflow_auth_with_async(func):
"""做 kubeflow 的 auth"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async with aiohttp.ClientSession() as session:
# login
payload = {
"username": conf.USERNAME,
"password": conf.PASSWORD,
}
await session.post(conf.AIX_AUTH_URL, data=payload)
# get req
# async with session.get(conf.PIPELINE_URL) as response:
# text = await response.text()
response = await session.get(conf.PIPELINE_URL)
text = await response.text()
pattern = r"/dex/auth/aix\?req="
index_beg = re.search(pattern, text).span()
index_end = text.find('"', index_beg[1])
req = text[index_beg[1] : index_end]
params = {"req": req}
# login kubeflow with aix
await session.get(conf.DEX_AUTH_URL, params=params)
return await func(session, *args, **kwargs)
return wrapper
async def async_get_runs(
session: aiohttp.ClientSession,
page_size=1,
page_token=None,
experiment_id=None,
filters=None,
sort_by="created_at desc",
):
if not filters:
filters = {}
query = {
"page_size": page_size,
"page_token": page_token,
"sort_by": sort_by,
"resource_reference_key.type": "EXPERIMENT",
"resource_reference_key.id": experiment_id,
"filter": json.dumps(filters),
}
response = await session.get(
url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
)
# async with session.get(
# url=f"{conf.PIPELINE_URL}/apis/v1beta1/runs", params=query
# ) as response:
# response.raise_for_status()
# return await response.json()
response.raise_for_status()
return await response.json()
@kubeflow_auth_with_async
async def gather_fetch_runs(
session: aiohttp.ClientSession,
runs_lst:List[str],
):
exp_id = "xxxxx"
tasks = []
for sub_runs in runs_lst:
filters: dict = {
"predicates": [
{
"key": "id",
"op": FILTER_OPERATIONS.IN.value,
"string_values": {"values": sub_runs},
},
]
}
tasks.append(
async_get_runs(
session,
page_size=len(sub_runs),
experiment_id=exp_id,
filters=filters,
)
)
return await asyncio.gather(*tasks)
res = []
for r in asyncio.run(gather_fetch_runs(["xxx","xxx","xx"])):
res.extend(r.get("runs", []))
出现的报错,不知道怎么贴图,手动概括一下异常
服务稳定运行一段时间后,会出现突然 500 刷新又可以坚挺一段时间
aiohttp.client_exceptions.ContentypTypeError
从报错信息上看像是 session 失效导致的,被 auth 重定向到了登录页. contentType 变成了 text/html
而请求的 url 重定向到了登录的 url,已经不是我传入的那个
因为没试过 async 函数的装饰器,不知道是不是这个问题,
还有就是 clientSession 的连接池是不是保存了 session 的状态. 装饰器每次都重新请求了.这个 session 按道理不应该还是一样的.
也试了,没有这个问题,不过访问次数多起来之后容易莫名 GG,大概率是被自己家部署的 kubeflow 反爬了?
1
amlee 2022-01-15 06:48:07 +08:00
你可能是想在装饰函数里面获取 token ,然后 session 发出的每次请求的请求头都要带上这个 token ?
如果我理解没错的话,装饰器里面的 wrapper 只执行一次的,你可以看看服务端的 token 过期时间。 |
2
tomtao00001 OP @amlee emm 服务器每次获取请求后, 内部实现都会用这个装饰器, 您指的 wrapper 执行一次, 我不是很理解, 表达的是 wrapper 函数外的作用域只执行一次么? 如果是这个意思 , 对于目前这个方式来讲 它应该是不影响的对么? 不知道我理解的对不对.
|
3
amlee 2022-01-18 19:40:41 +08:00
@tomtao00001 之前看你代码不仔细,我上面那个回答是错误的,不要理会上面那个回答了。
我重新读了一遍你的代码,你说的“每次获取请求,内部函数都会用这个装饰器”跟你的代码行为有误差。 你通过 gather_fetch_runs 构建一组了协程对象并发执行,但这一组协程对象通过 gather_fetch_runs 的 @kubeflow_auth_with_async 装饰器赋予了同一个 session 对象。当这一组协程并发运行足够长的时间,登录会超时。而你的登录状态是保存在同一个 session 中的,这一组协程共同使用这个 session ,所以登录超时以后这个 session 失效,你这一组并发的协程也会请求失败。 而你说的刷新又好了的情况,或许是重新运行了一遍 gather_fetch_runs ,这会导致重新构建一组协程,重新构建一个已登录的 session |
4
tomtao00001 OP @amlee 好的 非常感谢回复 最后还是换了一种方式 这个就直接放弃了
|