【Python】sse_starlette库:实现Starlette与FastAPI框架的Server-Sent Events(SSE)支持详解
sse_starlette 是一个 Python 库,为 Starlette 和 FastAPI 框架提供对 Server-Sent Events(SSE,服务器推送事件)的支持。它通过 EventSourceResponse 类实现 SSE 协议,允许服务器异步向客户端推送实时数据,适合构建实时 Web 应用,如通知系统、实时仪表盘或流式数据更新。sse_starlette 轻量且与 ASGI 框架无缝集成,常用于需要高效单向通信的场景。
以下是对 sse_starlette 库的详细介绍,包括其功能、用法和实际应用,基于最新信息(截至 2025)。
1. sse_starlette 库的作用
asyncio 和 anyio 的异步任务组(TaskGroups),确保高效并发。近期动态:
ping 间隔和消息工厂(ping_message_factory)。anyio 的集成,优化任务管理。GZipMiddleware,需确保任务在关闭时清理以避免警告。2. 安装与环境要求
starlette:ASGI 框架基础。anyio:异步任务管理(最低 3.6.2)。uvicorn(运行 ASGI 应用)、httpx(测试)。pip install sse-starlette
pip install sse-starlette uvicorn httpx
import sse_starlette
print(sse_starlette.__version__) # 示例输出: 2.3.4
系统要求:
uvicorn 和 starlette 版本兼容(starlette>=0.26.1)。pytest 和 pytest-asyncio:
pip install pytest pytest-asyncio
3. 核心功能与用法
sse_starlette 提供 EventSourceResponse 类,用于创建 SSE 响应,支持异步生成器、自定义事件和连接管理。以下是主要功能和示例。
3.1 基本 SSE 实现
使用异步生成器推送事件。
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum: int, maximum: int):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.9)
yield dict(data=i)
async def sse(request):
generator = numbers(1, 5)
return EventSourceResponse(generator)
routes = [
Route("/", endpoint=sse)
]
app = Starlette(debug=True, routes=routes)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
运行:
python sse_example.py
客户端测试(使用 curl):
curl -N http://localhost:8000
输出示例:
data: 1
data: 2
data: 3
data: 4
data: 5
说明:
EventSourceResponse 接受异步生成器,推送 data 字段。text/event-stream)。ping 保持连接。3.2 自定义事件
指定事件名称、ID 和多行数据。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse, ServerSentEvent
import asyncio
import time
app = FastAPI()
@app.get("/stream")
async def sse_stream(request: Request):
async def event_generator():
for i in range(1, 6):
if await request.is_disconnected():
break
msg = f"Update {i}\nTime: {time.strftime('%H:%M:%S')}"
yield ServerSentEvent(
data=msg,
event="update",
id=str(i)
)
await asyncio.sleep(1)
return EventSourceResponse(event_generator(), ping=5)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
客户端 JavaScript:
<script>
const source = new EventSource("http://localhost:8000/stream");
source.addEventListener("update", (event) => {
console.log(`ID: ${event.id}, Data: ${event.data}`);
});
</script>
输出示例(浏览器控制台):
ID: 1, Data: Update 1
Time: 12:34:56
ID: 2, Data: Update 2
Time: 12:34:57
...
说明:
ServerSentEvent 支持 data、event、id 和 comment 字段。ping=5 每 5 秒发送 ping 保持连接。request.is_disconnected() 确保客户端断开时停止生成。3.3 处理客户端断开
优雅处理客户端关闭连接。
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import logging
app = FastAPI()
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)
@app.get("/endless")
async def endless(req: Request):
async def event_publisher():
i = 0
try:
while True:
i += 1
yield dict(data=i)
await asyncio.sleep(0.2)
except asyncio.CancelledError as e:
_log.info(f"Disconnected from client {req.client}")
raise e
return EventSourceResponse(event_publisher())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
说明:
asyncio.CancelledError 处理客户端断开(如刷新或关闭页面)。anyio.TaskGroups 确保任务安全管理。3.4 定向事件推送
向特定客户端发送事件。
from fastapi import FastAPI, Request
from pydantic import BaseModel
from sse_starlette.sse import EventSourceResponse
from collections import defaultdict
import asyncio
from typing import Optional
app = FastAPI()
clients = defaultdict(list)
class EmitEventModel(BaseModel):
event_name: str
event_data: Optional[str] = "No Event Data"
event_id: Optional[int] = None
recipient_id: str
async def retrieve_events(recipient_id: str):
yield dict(data="Connection established")
while True:
if recipient_id in clients and clients[recipient_id]:
yield clients[recipient_id].pop(0)
await asyncio.sleep(1)
@app.get("/subscribe/{recipient_id}")
async def loop_back_stream(req: Request, recipient_id: str):
return EventSourceResponse(retrieve_events(recipient_id))
@app.post("/emit")
async def emit_event(event: EmitEventModel):
clients[event.recipient_id].append({
"event": event.event_name,
"data": event.event_data,
"id": event.event_id
})
return {"status": "event queued"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
测试:
- 订阅客户端:
curl http://localhost:8000/subscribe/user1 - 发送事件:
curl -X POST http://localhost:8000/emit -H "Content-Type: application/json" -d '{"event_name": "notify", "event_data": "Hello user1", "recipient_id": "user1"}'
输出示例(客户端):
data: Connection established
event: notify
data: Hello user1
说明:
defaultdict 存储客户端事件队列。/subscribe/{recipient_id} 订阅事件流。/emit 向指定 recipient_id 推送事件。3.5 测试 SSE 端点
使用 httpx 和 httpx_sse 测试 SSE。
import asyncio
import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
async def auth_events(request):
async def events():
yield {"event": "login", "data": '{"user_id": "4135"}'}
return EventSourceResponse(events())
app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])
async def test_sse():
async with httpx.AsyncClient(app=app) as client:
async with aconnect_sse(client, "GET", "http://localhost:8000/sse/auth/") as event_source:
events = [sse async for sse in event_source.aiter_sse()]
(sse,) = events
assert sse.event == "login"
assert sse.json() == {"user_id": "4135"}
if __name__ == "__main__":
asyncio.run(test_sse())
说明:
httpx_sse 的 aconnect_sse 测试 SSE 端点。httpx-sse:
pip install httpx-sse
4. 性能与特点
anyio.TaskGroups,支持高并发 SSE 连接。EventSourceResponse 封装 SSE 协议,简化开发。uvicorn 无缝集成。GZipMiddleware,可能导致压缩冲突。AsyncSession)需在生成器内创建。StreamingResponse:手动实现 SSE,代码复杂,缺乏协议支持。aiohttp 框架,不适用 Starlette/FastAPI。5. 实际应用场景
TAIL)。hx-sse 属性实现动态前端。示例(数据库流式更新):
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncpg
import asyncio
from loguru import logger
app = FastAPI()
async def get_db_pool():
return await asyncpg.create_pool(
user="postgres", password="postgres", database="todos", host="localhost"
)
@app.get("/stream")
async def message_stream(request: Request):
pool = await get_db_pool()
async def event_generator():
try:
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute("LISTEN todo_inserted")
async for notify in conn.notifies():
if await request.is_disconnected():
break
yield {"data": notify.payload, "event": "todo_added"}
except Exception as e:
logger.error(f"Stream error: {e}")
finally:
await pool.close()
return EventSourceResponse(event_generator(), ping=10)
if __name__ == "__main__":
logger.add("app.log", rotation="1 MB", level="INFO")
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
运行环境:
asyncpg:
pip install asyncpg
todos 数据库。说明:
todo_inserted 通道。asyncpg 接收通知并通过 SSE 推送。loguru 记录错误。6. 注意事项
GZipMiddleware,需禁用:
app = Starlette(middleware=[]) # 移除 GZipMiddleware
AsyncSession):
# ❌ 错误
async def bad_route():
async with AsyncSession() as session:
async def generator():
async for row in session.execute(select(User)):
yield dict(data=row)
return EventSourceResponse(generator())
# ✅ 正确
async def good_route():
async def generator():
async with AsyncSession() as session:
async for row in session.execute(select(User)):
yield dict(data=row)
return EventSourceResponse(generator())
from sse_starlette.sse import AppStatus
AppStatus.should_exit_event = None
request.is_disconnected() 检查客户端状态。asyncio.CancelledError 清理资源。RuntimeError(事件循环绑定),使用 pytest 夹具修复:
@pytest.fixture
def reset_sse_starlette_appstatus_event():
from sse_starlette.sse import AppStatus
AppStatus.should_exit_event = None
uvicorn 的 monkeypatch 优化信号处理:
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000, loop="uvloop")
proxy_buffering off;
proxy_cache off;
7. 资源与文档
sse-starlette 标签):https://stackoverflow.com/questions/tagged/sse-starlette作者:彬彬侠