Python流式下发技术解析:StreamingResponse与EventSourceResponse的对比及适用场景探讨

流式响应在大模型服务中可大大提高用户体验,在Python 中主要有两种方式实现流式响应,即fastapi 的 StreamingResponse 和 SEE 模块的 EventSourceResponse,既然两者都可以实现流式响应,那么我们在实际应用中应该如何选择呢?

接下来我们就来详细分析一下二者的异同及适用场景。

一、核心相同点

1. 流式传输能力
  • 均支持异步生成器 (async generator),实现数据分块(chunked)传输。
  • 适用于需要实时、持续下发数据的场景(如实时监控、大文件下载、消息推送)。
  • 2. 长连接特性
  • 保持 HTTP 连接长时间开放,持续发送数据,而非一次性返回完整响应。
  • 可在客户端和服务器之间建立持久化通信通道。
  • 3. 异步兼容性
  • 完美适配 FastAPI 异步框架,能够结合 await 和异步 I/O 操作。
  • 避免阻塞事件循环,支持高并发场景。
  • 二、核心不同点

    特性

    StreamingResponse

    EventSourceResponse (SSE)

    协议规范/标准

    通用 HTTP 流式传输

    严格遵循 Server-Sent Events (SSE) 协议

    数据格式要求

    无格式限制(二进制/文本均可)

    必须符合 SSE 消息格式规范(data:前缀 + 空行分隔)

    客户端交互方式

    需要手动处理分块数据(如 Fetch API 流式读取)

    浏览器原生支持(EventSourceAPI 自动解析消息)

    Content-Type

    需手动指定(如video/mp4

    固定为text/event-stream

    消息结构 无结构化要求

    支持event:id:retry:等元数据字段

    连接管理

    传输完成即关闭

    默认长连接,需手动终止或处理断开重连逻辑

    典型应用场景

    大文件下载、视频流、日志流

    实时通知、股票行情、在线聊天消息推送

    三、技术实现对比

    1. 服务端数据格式示例
    StreamingResponse:自定义格式

            适用场景:逐块发送二进制数据(如文件下载)或自定义文本流。

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio
    
    app = FastAPI()
    
    # 示例1:实时日志流(纯文本流,无结构要求)
    async def log_generator():
        for i in range(5):
            yield f"Log entry {i}\n"
            await asyncio.sleep(1)
    
    @app.get("/logs_stream")
    async def stream_logs():
        return StreamingResponse(
            log_generator(),
            media_type="text/plain"
        )
    
    # 示例2:大文件分块下载(二进制)
    async def file_chunker(file_path: str):
        with open(file_path, "rb") as f:
            while chunk := f.read(1024*1024):  # 1MB chunks
                yield chunk
    
    @app.get("/download-large-file")
    async def download_large_file():
        return StreamingResponse(
            file_chunker("bigfile.zip"),
            media_type="application/octet-stream",
            headers={"Content-Disposition": "attachment; filename=bigfile.zip"}
        )

     EventSourceResponse:SSE 标准格式

            适用场景:需要浏览器自动解析的实时事件推送。

    from sse_starlette.sse import EventSourceResponse
    import datetime
    
    async def sse_news_generator():
        news_items = ["News 1", "News 2", "Breaking News"]
        for news in news_items:
            # SSE 格式要求:data字段 + 空行分隔
            yield {
                "event": "update",
                "data": json.dumps({
                    "time": datetime.datetime.now().isoformat(),
                    "content": news
                }),
                "retry": 3000  # 客户端重试时间(毫秒)
            }
            await asyncio.sleep(2)
    
    @app.get("/news-stream")
    async def news_stream():
        return EventSourceResponse(sse_news_generator())
    2. 客户端处理方式
    StreamingResponse(需手动解析)
    // 浏览器端使用 Fetch API 处理流
    const response = await fetch('/logs_stream');
    const reader = response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      console.log(new TextDecoder().decode(value));
    }
    EventSourceResponse(自动解析)
    // 浏览器端使用 EventSource API
    ## 方案一
    const eventSource = new EventSource('/news-stream');
    eventSource.onmessage = (e) => {
      console.log("Received:", e.data); // 自动解析 data 字段
    };
    
    
    ##方案二
    const eventSource = new EventSource('/news-stream');
    eventSource.addEventListener('update', (e) => {
        const data = JSON.parse(e.data);
        console.log('Received news:', data.content);
    });

    四、使用场景差异

    StreamingResponse 适用场景
    1. 文件下载/上传

      1. 分块传输大型文件(如视频、数据集)。
      2. 支持断点续传(通过 Range 请求头)。
    2. 媒体流传输: 

      1. 实时音视频流(如监控摄像头数据)。
      2. 动态生成的二进制数据流(如实时录屏)。
    3. 日志流式输出

      1. 服务器实时日志推送(如 CI/CD 构建日志)。
      2. 命令行工具实时输出。
    EventSourceResponse 适用场景
    1. 实时事件推送

    2. 股票行情更新、体育比赛比分实时推送。
    3. 用户通知(如邮件到达提醒、系统报警)。
    4. 在线聊天/协作

    5. 聊天消息实时广播。
    6. 文档协同编辑(如多人同时编辑时的状态同步)。
    7. 长轮询替代方案

    8. 需要服务器主动推送数据的场景(替代 HTTP 轮询)。
    9. 客户端自动重连支持(浏览器内置机制)。

    五、高级特性对比

    高级功能

    StreamingResponse

    EventSourceResponse

    断点续传支持

    ✅ (需自定义Range头处理)

    ❌ (SSE 协议不支持)

    消息重连机制

    ❌ (需手动实现)

    ✅ (浏览器自动重连,支持retry:字段)

    事件类型过滤

    ✅ (通过event:字段定义多事件类型)

    心跳保活

    需手动发送空数据

    支持发送注释行(如: heartbeat

    六、高级配置与注意事项

    StreamingResponse 高级配置
  • 分块编码控制:默认启用 Transfer-Encoding: chunked,可关闭:
    StreamingResponse(..., chunked=False)
  • 自定义响应头:如实现断点续传(Content-Range):
    headers={"Content-Range": "bytes 0-1023/2048"}
  • 2. EventSourceResponse 高级功能
  • 事件类型过滤:客户端可监听特定事件:
  •          python:

    yield {"event": "alert", "data": "Critical update!"}

             javascript

    eventSource.addEventListener('alert', (e) => { ... });
  • 连接保活:发送注释行保持连接:
    yield ": keep-alive\n\n"
  • 3. 通用注意事项
  • 生成器退出逻辑:确保在客户端断开时清理资源:
    async def generator():
        try:
            while True:
                yield data
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("客户端断开连接")
            # 执行清理操作
  • 错误处理:捕获生成器内的异常,返回错误信息
  • async def safe_generator():
        try:
            async for chunk in risky_source():
                yield chunk
        except Exception as e:
            yield f"Error occurred: {str(e)}"

    七、性能优化建议

    StreamingResponse
    1. 分块大小调优

    2. 二进制传输时,根据带宽调整块大小(如 256KB ~ 1MB)。
    3. 文本流可适当减少块大小(如 4KB)。
    4. 启用压缩:对文本流启用gzip 传输:

      from fastapi.middleware.gzip import GZipMiddleware
      app.add_middleware(GZipMiddleware, minimum_size=1024)  # 对大于 1KB 的数据启用压缩
    EventSourceResponse

    1.减少序列化开销

            直接生成符合 SSE 格式的字符串,避免多次序列化: 

    # 优化前(低效)
    yield {"data": json.dumps(message)}
    # 优化后(高效)
    yield f"data: {json.dumps(message)}\n\n"

     2.心跳保活:防止代理服务器超时断开:

    async def generator():
        while True:
            yield ":ping\n\n"  # 发送心跳注释行
            await asyncio.sleep(15)

    八、总结与选型建议

  • StreamingResponse:万能流式传输工具,适用于非结构化数据流,需手动处理协议细节。
  • EventSourceResponse:专为 SSE 设计,简化实时事件推送,自动处理协议格式,与浏览器 EventSource API 深度集成。
  • 选择 StreamingResponse

  • 需要传输二进制数据或自定义文本流。
  • 协议无格式要求,需高度灵活性。
  • 客户端需手动处理流(如前端实现分片下载)。
  • 选择 EventSourceResponse

  • 需浏览器原生支持实时事件推送。
  • 需自动重连、消息结构化和多事件类型。
  • 符合 SSE 协议规范的服务端主动推送场景。
  • 根据具体需求的数据格式、协议兼容性和客户端实现复杂度,合理选择以优化开发效率和性能。

    作者:lanbing

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python流式下发技术解析:StreamingResponse与EventSourceResponse的对比及适用场景探讨

    发表回复