【Python】sse_starlette库:实现Starlette与FastAPI框架的Server-Sent Events(SSE)支持详解

sse_starlette 是一个 Python 库,为 Starlette 和 FastAPI 框架提供对 Server-Sent Events(SSE,服务器推送事件)的支持。它通过 EventSourceResponse 类实现 SSE 协议,允许服务器异步向客户端推送实时数据,适合构建实时 Web 应用,如通知系统、实时仪表盘或流式数据更新。sse_starlette 轻量且与 ASGI 框架无缝集成,常用于需要高效单向通信的场景。

以下是对 sse_starlette 库的详细介绍,包括其功能、用法和实际应用,基于最新信息(截至 2025)。


1. sse_starlette 库的作用

  • SSE 支持:为 Starlette/FastAPI 提供标准化的 SSE 实现,基于 HTML5 EventSource API。
  • 实时数据推送:支持服务器到客户端的单向数据流,适合实时更新。
  • 高性能:利用 asyncioanyio 的异步任务组(TaskGroups),确保高效并发。
  • 易于集成:与 Starlette/FastAPI 无缝兼容,支持自定义事件和头部。
  • 健壮性:处理客户端断开、服务器关闭和非线程安全对象,增强可靠性。
  • 近期动态

  • 最新版本:2.3.4(截至 2025-05-04,PyPI)。
  • 新增功能
  • 支持自定义 ping 间隔和消息工厂(ping_message_factory)。
  • 修复多 SSE 端点测试中的事件循环绑定问题(Issue #59)。
  • 增强与 anyio 的集成,优化任务管理。
  • 社区活跃:GitHub 仓库(https://github.com/sysid/sse-starlette)有 592 星,30 位贡献者,维护健康。
  • 注意事项:不支持 GZipMiddleware,需确保任务在关闭时清理以避免警告。

  • 2. 安装与环境要求

  • Python 版本:支持 Python 3.8+(推荐 3.9+)。
  • 核心依赖
  • starlette:ASGI 框架基础。
  • anyio:异步任务管理(最低 3.6.2)。
  • 可选:uvicorn(运行 ASGI 应用)、httpx(测试)。
  • 安装命令
    pip install sse-starlette
    
  • 完整安装(包括运行和测试):
    pip install sse-starlette uvicorn httpx
    
  • 验证安装
    import sse_starlette
    print(sse_starlette.__version__)  # 示例输出: 2.3.4
    
  • 系统要求

  • 确保 uvicornstarlette 版本兼容(starlette>=0.26.1)。
  • 测试环境可能需 pytestpytest-asyncio
    pip install pytest pytest-asyncio
    

  • 3. 核心功能与用法

    sse_starlette 提供 EventSourceResponse 类,用于创建 SSE 响应,支持异步生成器、自定义事件和连接管理。以下是主要功能和示例。

    3.1 基本 SSE 实现

    使用异步生成器推送事件。

    import asyncio
    import uvicorn
    from starlette.applications import Starlette
    from starlette.routing import Route
    from sse_starlette.sse import EventSourceResponse
    
    async def numbers(minimum: int, maximum: int):
        for i in range(minimum, maximum + 1):
            await asyncio.sleep(0.9)
            yield dict(data=i)
    
    async def sse(request):
        generator = numbers(1, 5)
        return EventSourceResponse(generator)
    
    routes = [
        Route("/", endpoint=sse)
    ]
    
    app = Starlette(debug=True, routes=routes)
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
    

    运行

    python sse_example.py
    

    客户端测试(使用 curl):

    curl -N http://localhost:8000
    

    输出示例

    data: 1
    
    data: 2
    
    data: 3
    
    data: 4
    
    data: 5
    

    说明

  • EventSourceResponse 接受异步生成器,推送 data 字段。
  • 每 0.9 秒发送一个数字,符合 SSE 协议(text/event-stream)。
  • 默认每 15 秒发送 ping 保持连接。
  • 3.2 自定义事件

    指定事件名称、ID 和多行数据。

    from fastapi import FastAPI, Request
    from sse_starlette.sse import EventSourceResponse, ServerSentEvent
    import asyncio
    import time
    
    app = FastAPI()
    
    @app.get("/stream")
    async def sse_stream(request: Request):
        async def event_generator():
            for i in range(1, 6):
                if await request.is_disconnected():
                    break
                msg = f"Update {i}\nTime: {time.strftime('%H:%M:%S')}"
                yield ServerSentEvent(
                    data=msg,
                    event="update",
                    id=str(i)
                )
                await asyncio.sleep(1)
    
        return EventSourceResponse(event_generator(), ping=5)
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
    

    客户端 JavaScript

    <script>
      const source = new EventSource("http://localhost:8000/stream");
      source.addEventListener("update", (event) => {
        console.log(`ID: ${event.id}, Data: ${event.data}`);
      });
    </script>
    

    输出示例(浏览器控制台):

    ID: 1, Data: Update 1
    Time: 12:34:56
    ID: 2, Data: Update 2
    Time: 12:34:57
    ...
    

    说明

  • ServerSentEvent 支持 dataeventidcomment 字段。
  • ping=5 每 5 秒发送 ping 保持连接。
  • 检查 request.is_disconnected() 确保客户端断开时停止生成。
  • 3.3 处理客户端断开

    优雅处理客户端关闭连接。

    from fastapi import FastAPI, Request
    from sse_starlette.sse import EventSourceResponse
    import asyncio
    import logging
    
    app = FastAPI()
    logging.basicConfig(level=logging.INFO)
    _log = logging.getLogger(__name__)
    
    @app.get("/endless")
    async def endless(req: Request):
        async def event_publisher():
            i = 0
            try:
                while True:
                    i += 1
                    yield dict(data=i)
                    await asyncio.sleep(0.2)
            except asyncio.CancelledError as e:
                _log.info(f"Disconnected from client {req.client}")
                raise e
    
        return EventSourceResponse(event_publisher())
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
    

    说明

  • 捕获 asyncio.CancelledError 处理客户端断开(如刷新或关闭页面)。
  • 使用日志记录断开事件。
  • anyio.TaskGroups 确保任务安全管理。
  • 3.4 定向事件推送

    向特定客户端发送事件。

    from fastapi import FastAPI, Request
    from pydantic import BaseModel
    from sse_starlette.sse import EventSourceResponse
    from collections import defaultdict
    import asyncio
    from typing import Optional
    
    app = FastAPI()
    clients = defaultdict(list)
    
    class EmitEventModel(BaseModel):
        event_name: str
        event_data: Optional[str] = "No Event Data"
        event_id: Optional[int] = None
        recipient_id: str
    
    async def retrieve_events(recipient_id: str):
        yield dict(data="Connection established")
        while True:
            if recipient_id in clients and clients[recipient_id]:
                yield clients[recipient_id].pop(0)
            await asyncio.sleep(1)
    
    @app.get("/subscribe/{recipient_id}")
    async def loop_back_stream(req: Request, recipient_id: str):
        return EventSourceResponse(retrieve_events(recipient_id))
    
    @app.post("/emit")
    async def emit_event(event: EmitEventModel):
        clients[event.recipient_id].append({
            "event": event.event_name,
            "data": event.event_data,
            "id": event.event_id
        })
        return {"status": "event queued"}
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
    

    测试

    1. 订阅客户端:
      curl http://localhost:8000/subscribe/user1
      
    2. 发送事件:
      curl -X POST http://localhost:8000/emit -H "Content-Type: application/json" -d '{"event_name": "notify", "event_data": "Hello user1", "recipient_id": "user1"}'
      

    输出示例(客户端):

    data: Connection established
    
    event: notify
    data: Hello user1
    

    说明

  • 使用 defaultdict 存储客户端事件队列。
  • /subscribe/{recipient_id} 订阅事件流。
  • /emit 向指定 recipient_id 推送事件。
  • 基于 Stack Overflow 示例改进。
  • 3.5 测试 SSE 端点

    使用 httpxhttpx_sse 测试 SSE。

    import asyncio
    import httpx
    from httpx_sse import aconnect_sse
    from sse_starlette.sse import EventSourceResponse
    from starlette.applications import Starlette
    from starlette.routing import Route
    
    async def auth_events(request):
        async def events():
            yield {"event": "login", "data": '{"user_id": "4135"}'}
        return EventSourceResponse(events())
    
    app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])
    
    async def test_sse():
        async with httpx.AsyncClient(app=app) as client:
            async with aconnect_sse(client, "GET", "http://localhost:8000/sse/auth/") as event_source:
                events = [sse async for sse in event_source.aiter_sse()]
                (sse,) = events
                assert sse.event == "login"
                assert sse.json() == {"user_id": "4135"}
    
    if __name__ == "__main__":
        asyncio.run(test_sse())
    

    说明

  • 使用 httpx_sseaconnect_sse 测试 SSE 端点。
  • 验证事件名称和数据。
  • 需安装 httpx-sse
    pip install httpx-sse
    

  • 4. 性能与特点

  • 高效性:基于 anyio.TaskGroups,支持高并发 SSE 连接。
  • 易用性EventSourceResponse 封装 SSE 协议,简化开发。
  • 兼容性:与 FastAPI、Starlette 和 uvicorn 无缝集成。
  • 局限性
  • 不支持 GZipMiddleware,可能导致压缩冲突。
  • 非线程安全对象(如 SQLAlchemy 的 AsyncSession)需在生成器内创建。
  • 需手动清理任务以避免关闭时的警告。
  • 与替代方案对比
  • Starlette StreamingResponse:手动实现 SSE,代码复杂,缺乏协议支持。
  • WebSocket:双向通信,适合复杂交互,但开销较高。
  • aiohttp-sse:针对 aiohttp 框架,不适用 Starlette/FastAPI。

  • 5. 实际应用场景

  • 实时通知:推送消息或警报(如聊天、系统状态)。
  • 数据流:流式传输数据库更新(如 PostgreSQL 的 TAIL)。
  • 仪表盘:实时更新图表或指标。
  • 进度跟踪:异步任务的状态更新(如文件上传、后台处理)。
  • HTMX 集成:结合 HTMX 的 hx-sse 属性实现动态前端。
  • 示例(数据库流式更新)

    from fastapi import FastAPI, Request
    from sse_starlette.sse import EventSourceResponse
    import asyncpg
    import asyncio
    from loguru import logger
    
    app = FastAPI()
    
    async def get_db_pool():
        return await asyncpg.create_pool(
            user="postgres", password="postgres", database="todos", host="localhost"
        )
    
    @app.get("/stream")
    async def message_stream(request: Request):
        pool = await get_db_pool()
        
        async def event_generator():
            try:
                async with pool.acquire() as conn:
                    async with conn.transaction():
                        await conn.execute("LISTEN todo_inserted")
                        async for notify in conn.notifies():
                            if await request.is_disconnected():
                                break
                            yield {"data": notify.payload, "event": "todo_added"}
            except Exception as e:
                logger.error(f"Stream error: {e}")
            finally:
                await pool.close()
    
        return EventSourceResponse(event_generator(), ping=10)
    
    if __name__ == "__main__":
        logger.add("app.log", rotation="1 MB", level="INFO")
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
    

    运行环境

  • 安装 asyncpg
    pip install asyncpg
    
  • 确保 PostgreSQL 运行并创建 todos 数据库。
  • 说明

  • 监听 PostgreSQL 的 todo_inserted 通道。
  • 使用 asyncpg 接收通知并通过 SSE 推送。
  • loguru 记录错误。
  • 基于 bunny.net 示例。

  • 6. 注意事项

  • GZipMiddleware 冲突
  • SSE 不兼容 GZipMiddleware,需禁用:
    app = Starlette(middleware=[])  # 移除 GZipMiddleware
    
  • 否则可能导致流式传输失败。
  • 非线程安全对象
  • 避免在生成器外创建数据库会话(如 AsyncSession):
    # ❌ 错误
    async def bad_route():
        async with AsyncSession() as session:
            async def generator():
                async for row in session.execute(select(User)):
                    yield dict(data=row)
            return EventSourceResponse(generator())
    
    # ✅ 正确
    async def good_route():
        async def generator():
            async with AsyncSession() as session:
                async for row in session.execute(select(User)):
                    yield dict(data=row)
        return EventSourceResponse(generator())
    
  • 确保会话在生成器内管理。
  • 服务器关闭
  • 停止所有生成器任务以避免警告:
    from sse_starlette.sse import AppStatus
    AppStatus.should_exit_event = None
    
  • 尤其在测试多 SSE 端点时(Issue #59)。
  • 客户端断开
  • 使用 request.is_disconnected() 检查客户端状态。
  • 捕获 asyncio.CancelledError 清理资源。
  • 测试问题
  • 多 SSE 端点测试可能引发 RuntimeError(事件循环绑定),使用 pytest 夹具修复:
    @pytest.fixture
    def reset_sse_starlette_appstatus_event():
        from sse_starlette.sse import AppStatus
        AppStatus.should_exit_event = None
    
  • 参考 Issue #59。
  • 浏览器限制
  • 浏览器对同一域名 SSE 连接数有限(通常 6 个),需高效管理连接。
  • 确保连接正确关闭(返回 HTTP 204 或显式关闭)。
  • 生产部署
  • 使用 uvicornmonkeypatch 优化信号处理:
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000, loop="uvloop")
    
  • 配合 Nginx 禁用缓冲:
    proxy_buffering off;
    proxy_cache off;
    
  • 参考 Stack Overflow 解决方案。

  • 7. 资源与文档

  • 官方文档:https://github.com/sysid/sse-starlette
  • PyPI 页面:https://pypi.org/project/sse-starlette/
  • 相关教程
  • FastAPI SSE 指南:https://devdojo.com/bobbyiliev/how-to-use-server-sent-events-sse-with-fastapi
  • Bunny.net SSE 示例:https://bunny.net/blog/what-is-sse-server-sent-events-and-how-do-they-work/
  • 社区
  • Stack Overflow(sse-starlette 标签):https://stackoverflow.com/questions/tagged/sse-starlette
  • GitHub Issues:https://github.com/sysid/sse-starlette/issues
  • 背景资料
  • SSE 协议:https://sysid.github.io/server-sent-events/
  • Starlette 文档:https://www.starlette.io/
  • 作者:彬彬侠

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】sse_starlette库:实现Starlette与FastAPI框架的Server-Sent Events(SSE)支持详解

    发表回复