0%

使用Fastapi实现 server-sent events (SSE)

使用Fastapi实现 server-sent events (SSE)

服务器推送事件(SSE)是一种在不重新加载页面的情况下向浏览器发送数据的方式。这使得您可以使用流式数据并构建可用于各种情境的实时应用程序。

在本教程中,我们将使用FastAPI创建一个简单的SSE服务器,该服务器将每秒发送一条消息。

安装相关包

1
2
3
pip install "fastapi[all]"
pip install sse-starlette
pip install asyncio

创建一个简单的Fastapi项目

main.py

1
2
3
4
5
6
7
8
9
10
import asyncio
import uvicorn
from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/")
async def root():
return {"message": "Hello World"}

使用uvicorn 运行

1
uvicorn main:app --reload

这将在8000端口上运行服务器。 –reload标志将自动重新加载服务器当你对代码进行更改时,这样你就不必每次更改时都重新启动服务器。

增加SSE逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
STREAM_DELAY = 1  # second
RETRY_TIMEOUT = 15000 # milisecond

@app.get('/stream')
async def message_stream(request: Request):
def new_messages():
# Add logic here to check for new messages
yield 'Hello World'
async def event_generator():
while True:
# If client closes connection, stop sending events
if await request.is_disconnected():
break

# Checks for new messages and return them to client if any
if new_messages():
yield {
"event": "new_message",
"id": "message_id",
"retry": RETRY_TIMEOUT,
"data": "message_content"
}

await asyncio.sleep(STREAM_DELAY)

return EventSourceResponse(event_generator())