Python流式下发技术解析:StreamingResponse与EventSourceResponse的对比及适用场景探讨
流式响应在大模型服务中可大大提高用户体验,在Python 中主要有两种方式实现流式响应,即fastapi 的 StreamingResponse 和 SEE 模块的 EventSourceResponse,既然两者都可以实现流式响应,那么我们在实际应用中应该如何选择呢?
接下来我们就来详细分析一下二者的异同及适用场景。
一、核心相同点
1. 流式传输能力
async generator
),实现数据分块(chunked)传输。2. 长连接特性
3. 异步兼容性
await
和异步 I/O 操作。二、核心不同点
特性 |
|
|
---|---|---|
协议规范/标准 |
通用 HTTP 流式传输 |
严格遵循 Server-Sent Events (SSE) 协议 |
数据格式要求 |
无格式限制(二进制/文本均可) |
必须符合 SSE 消息格式规范( |
客户端交互方式 |
需要手动处理分块数据(如 Fetch API 流式读取) |
浏览器原生支持( |
Content-Type |
需手动指定(如 |
固定为 |
消息结构 | 无结构化要求 |
支持 |
连接管理 |
传输完成即关闭 |
默认长连接,需手动终止或处理断开重连逻辑 |
典型应用场景 |
大文件下载、视频流、日志流 |
实时通知、股票行情、在线聊天消息推送 |
三、技术实现对比
1. 服务端数据格式示例
StreamingResponse
:自定义格式
适用场景:逐块发送二进制数据(如文件下载)或自定义文本流。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
# 示例1:实时日志流(纯文本流,无结构要求)
async def log_generator():
for i in range(5):
yield f"Log entry {i}\n"
await asyncio.sleep(1)
@app.get("/logs_stream")
async def stream_logs():
return StreamingResponse(
log_generator(),
media_type="text/plain"
)
# 示例2:大文件分块下载(二进制)
async def file_chunker(file_path: str):
with open(file_path, "rb") as f:
while chunk := f.read(1024*1024): # 1MB chunks
yield chunk
@app.get("/download-large-file")
async def download_large_file():
return StreamingResponse(
file_chunker("bigfile.zip"),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=bigfile.zip"}
)
EventSourceResponse
:SSE 标准格式
适用场景:需要浏览器自动解析的实时事件推送。
from sse_starlette.sse import EventSourceResponse
import datetime
async def sse_news_generator():
news_items = ["News 1", "News 2", "Breaking News"]
for news in news_items:
# SSE 格式要求:data字段 + 空行分隔
yield {
"event": "update",
"data": json.dumps({
"time": datetime.datetime.now().isoformat(),
"content": news
}),
"retry": 3000 # 客户端重试时间(毫秒)
}
await asyncio.sleep(2)
@app.get("/news-stream")
async def news_stream():
return EventSourceResponse(sse_news_generator())
2. 客户端处理方式
StreamingResponse
(需手动解析)
// 浏览器端使用 Fetch API 处理流
const response = await fetch('/logs_stream');
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(new TextDecoder().decode(value));
}
EventSourceResponse
(自动解析)
// 浏览器端使用 EventSource API
## 方案一
const eventSource = new EventSource('/news-stream');
eventSource.onmessage = (e) => {
console.log("Received:", e.data); // 自动解析 data 字段
};
##方案二
const eventSource = new EventSource('/news-stream');
eventSource.addEventListener('update', (e) => {
const data = JSON.parse(e.data);
console.log('Received news:', data.content);
});
四、使用场景差异
StreamingResponse
适用场景
-
文件下载/上传:
- 分块传输大型文件(如视频、数据集)。
- 支持断点续传(通过
Range
请求头)。
-
媒体流传输:
- 实时音视频流(如监控摄像头数据)。
- 动态生成的二进制数据流(如实时录屏)。
-
日志流式输出:
- 服务器实时日志推送(如 CI/CD 构建日志)。
- 命令行工具实时输出。
EventSourceResponse
适用场景
-
实时事件推送:
- 股票行情更新、体育比赛比分实时推送。
- 用户通知(如邮件到达提醒、系统报警)。
-
在线聊天/协作:
- 聊天消息实时广播。
- 文档协同编辑(如多人同时编辑时的状态同步)。
-
长轮询替代方案:
- 需要服务器主动推送数据的场景(替代 HTTP 轮询)。
- 客户端自动重连支持(浏览器内置机制)。
五、高级特性对比
高级功能 |
|
|
---|---|---|
断点续传支持 |
✅ (需自定义 |
❌ (SSE 协议不支持) |
消息重连机制 |
❌ (需手动实现) |
✅ (浏览器自动重连,支持 |
事件类型过滤 |
❌ |
✅ (通过 |
心跳保活 |
需手动发送空数据 |
支持发送注释行(如 |
六、高级配置与注意事项
StreamingResponse 高级配置
Transfer-Encoding: chunked
,可关闭:
StreamingResponse(..., chunked=False)
Content-Range
):
headers={"Content-Range": "bytes 0-1023/2048"}
2. EventSourceResponse 高级功能
python:
yield {"event": "alert", "data": "Critical update!"}
javascript
eventSource.addEventListener('alert', (e) => { ... });
yield ": keep-alive\n\n"
3. 通用注意事项
async def generator():
try:
while True:
yield data
await asyncio.sleep(1)
except asyncio.CancelledError:
print("客户端断开连接")
# 执行清理操作
async def safe_generator():
try:
async for chunk in risky_source():
yield chunk
except Exception as e:
yield f"Error occurred: {str(e)}"
七、性能优化建议
StreamingResponse
-
分块大小调优:
- 二进制传输时,根据带宽调整块大小(如 256KB ~ 1MB)。
- 文本流可适当减少块大小(如 4KB)。
-
启用压缩:对文本流启用gzip 传输:
from fastapi.middleware.gzip import GZipMiddleware app.add_middleware(GZipMiddleware, minimum_size=1024) # 对大于 1KB 的数据启用压缩
EventSourceResponse
1.减少序列化开销:
直接生成符合 SSE 格式的字符串,避免多次序列化:
# 优化前(低效)
yield {"data": json.dumps(message)}
# 优化后(高效)
yield f"data: {json.dumps(message)}\n\n"
2.心跳保活:防止代理服务器超时断开:
async def generator():
while True:
yield ":ping\n\n" # 发送心跳注释行
await asyncio.sleep(15)
八、总结与选型建议
选择 StreamingResponse
:
选择 EventSourceResponse
:
根据具体需求的数据格式、协议兼容性和客户端实现复杂度,合理选择以优化开发效率和性能。
作者:lanbing