想写一个实时转发 Stream 流内容的接口,不怎么懂 stream 流相关的知识
之前的接口是
async def index(path: str, request: Request):
data = await request.json()
resp = c.send_request(path, data)
headers = dict(resp.headers)
async def generate():
for chunk in resp.iter_lines():
if chunk:
yield chunk + b'\n\n'
return StreamingResponse(generate(), headers=headers)
问题是,这个会等接受完全部的 Stream 流信息,才会转发,想实现的效果是实时转发 stream 流信息,具体一点就是,接收到一行或者指定大小的信息块,就转发出去。
想法是将 resp = c.send_request(path, data)
这块改造成异步的,然后实时转发。
目前的代码是
async def index(path: str, request: Request):
data = await request.json()
async with httpx.AsyncClient() as client:
async with client.stream('POST', path, json=data) as response:
resp_headers = dict(response.headers)
async def generate():
async for line in response.aiter_lines():
if line:
yield line.encode('utf-8') + b'\n\n'
return StreamingResponse(generate(), headers=resp_headers)
但是会报:
httpx.StreamClosed: Attempted to read or stream content, but the stream has been closed.
不知道怎么改,求赐教
1
yuanxing008 2023-03-08 17:45:14 +08:00
这个为什么还要走代码。直接用 Nginx 转出去不就好了么 你是要有业务逻辑处理?
|
2
noparking188 2023-03-08 17:50:30 +08:00
有过类似需求,需要对下载过程中的 stream 数据进行处理,搜到篇文章 [How to stream Microsoft SQL Server to S3 using BCP on linux]( https://dstan.medium.com/streaming-microsoft-sql-server-to-s3-using-bcp-35241967d2e0),原理就是借助 named pipe 来对不支持 pipe 的下载工具截取 stream 进行处理。
基于这个原理我写了个迁移 SQL Server 到 S3 的小工具: https://github.com/zhiweio/StreamXfer 楼主可以参考下,读于你描述的需求,需要一个进程实时写 stream 到 named pipe ,另一个进程实时读 stream 写到其它地方,类似: 1. mkfifo /tmp/fifo 2. wget 'https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2021-financial-year-provisional/Download-data/annual-enterprise-survey-2021-financial-year-provisional-csv.csv' -o /tmp/fifo & 3. cat /tmp/fifo | aws s3 cp - s3://bucket/xxx 希望能够有帮助,如果可以的话欢迎给我一个 star ,谢谢 :) |
3
liprais 2023-03-08 17:51:52 +08:00
找个用 splice api 的完事
|
4
009694 2023-03-08 18:55:38 +08:00 via iPhone
不要用 async with 。这样 return 出去的时候连接就已经关了 。用 background 去处理 httpx 未关闭的连接
|
5
ruanimal 2023-03-09 09:47:50 +08:00
`data = await request.json()` 这里其实就取了全部数据了, 不存在实时转发吧
|