1.sse是什么?
是一个通信协议。最主要的是, 服务器必须发送一个 Content-Type
为 text/event-stream
的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。”
=====================
是的,完全正确!您可以非常准确地将 SSE (Server-Sent Events) 理解为一个通信标准或通信协议规范。
它不是一个独立的协议(像 TCP 或 HTTP 那样),而是构建在 HTTP 协议之上的一套标准。
这个标准由 WHATWG(网页超文本应用技术工作组,负责制定 HTML、DOM 等现代 Web 标准的组织)在 HTML Living Standard 中进行了详细定义。
这个标准具体规定了以下几个核心方面:
-
服务器的角色和响应格式 (Server's Role & Response Format):
- MIME 类型: 服务器必须发送一个
Content-Type
为text/event-stream
的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。” - 连接保持: 服务器需要保持这个 HTTP 连接持续打开。
- 消息结构: 发送的每一条消息都必须遵循特定的文本格式。最基本的就是
data: <message>\n\n
。此外,标准还定义了id
(消息ID)、event
(事件类型)、retry
(重连时间) 等可选字段。
- MIME 类型: 服务器必须发送一个
-
客户端的行为和 API (Client's Behavior & API):
- EventSource 接口: 标准定义了浏览器必须提供一个
EventSource
JavaScript API。开发者通过new EventSource('/my-stream-url')
来创建连接。 - 事件监听: 客户端通过
.onmessage
、.onerror
以及使用.addEventListener('custom_event', ...)
来监听不同类型的消息。 - 自动重连: 这是 SSE 标准一个非常重要的特性。如果网络连接意外断开,遵循标准的客户端(例如浏览器)会自动尝试重新连接到服务器。服务器可以通过
retry:
字段来建议客户端重连的间隔时间。
- EventSource 接口: 标准定义了浏览器必须提供一个
把它和类似技术做个对比,您会更清晰:
- 对比 WebSocket: WebSocket 是一个完全独立的、双向的协议(协议头是
ws://
或wss://
)。而 SSE 只是对 HTTP 协议的一种特殊用法,是单向的(服务器 -> 客户端)。因此,SSE 更轻量、实现更简单。 - 对比长轮询 (Long Polling): 长轮询是一种“模拟”推送的技术,客户端发起请求,服务器“hold”住不回,直到有数据才返回,然后客户端立即再次发起新请求。这会产生大量的请求开销。而 SSE 只需要建立一次连接,效率高得多。
总结来说:
称 SSE 为一个通信标准非常准确。它为“服务器如何通过 HTTP 将数据流式地、单向地推送给客户端”这一特定场景,提供了一套完整、可靠且被各大浏览器广泛支持的规则和实现机制。
2.如何写一个sse的示例代码?
重点:
在于这个request库种的stream=True 这个字段打开。
=================
好的,我们来详细探讨一下如何使用 Python 处理服务器发送事件 (Server-Sent Events, SSE)。
SSE 是一种允许服务器向客户端单向推送数据的 Web 技术。它基于 HTTP 协议,非常适合实现服务端的流式数据传输,例如实时更新、消息通知、数据仪表盘等。
与 WebSocket 相比,SSE 更简单,因为它只是标准的 HTTP,不需要新的协议。但它是单向的(服务器 -> 客户端),而 WebSocket 是双向的。
处理 SSE 分为两部分:服务端实现和客户端接收。
1. 服务端实现 (Server)
在服务端,你需要创建一个 HTTP 端点,该端点的响应头 Content-Type
必须是 text/event-stream
。然后,服务器保持该连接打开,并以特定格式持续发送数据。
最核心的部分是使用生成器 (Generator) 来流式地 yield
数据。
这里以最常见的 Web 框架 Flask 为例,展示如何创建一个 SSE 端点。
import time
from flask import Flask, Responseapp = Flask(__name__)@app.route('/stream')
def stream():def event_stream():count = 0while True:# 等待一秒time.sleep(1)# 发送数据,必须以 "data: " 开头,并以 "\n\n" 结尾yield f"data: 这是一条消息,ID: {count}\n\n"count += 1# 返回一个流式响应return Response(event_stream(), mimetype='text/event-stream')if __name__ == '__main__':app.run(threaded=True, port=5000)
代码解释:
event_stream()
是一个生成器函数,它在一个无限循环中不断生成数据。yield f"data: ...\n\n"
是 SSE 的核心。data:
是固定的前缀。- 后面跟着你的消息内容。
\n\n
(两个换行符) 表示一条消息的结束。
Response(event_stream(), mimetype='text/event-stream')
将生成器包装成一个 Flask 响应,并设置正确的 MIME 类型,告诉浏览器这是一个事件流。
2. 客户端接收 (Client)
在客户端,你需要连接到服务器的 /stream
端点,并循环读取响应流。
方法一:使用 requests
库
requests
库可以通过设置 stream=True
来处理流式响应。
import requests# 连接到 SSE 端点
with requests.get('http://127.0.0.1:5000/stream', stream=True) as r:# 检查响应头if 'text/event-stream' not in r.headers.get('Content-Type', ''):print("错误:服务器未使用 'text/event-stream'")exit()# 逐行迭代响应内容for line in r.iter_lines():# 过滤掉空行或注释行if line and line.startswith(b'data:'):# 解码并打印数据部分data = line.split(b':', 1)[1].strip()print(f"收到数据: {data.decode('utf-8')}")
代码解释:
requests.get(..., stream=True)
是关键,它告诉requests
不要立即下载所有内容,而是保持连接并允许你迭代响应。r.iter_lines()
会逐行读取响应流。- 我们检查每一行是否以
data:
开头,然后提取并打印真实的数据。
方法二:使用 sseclient-py
库
这是一个专门为处理 SSE 设计的库,它能更好地处理重连、id
、event
等 SSE 协议的细节,使用起来更方便。
首先,你需要安装它:pip install sseclient-py
import sseclient# 创建一个 SSE 客户端
client = sseclient.SSEClient('http://127.0.0.1:5000/stream')# 迭代事件
for event in client.events():print(f"收到数据: {event.data}")
这个库极大地简化了客户端代码。它会自动处理消息边界、解析 data
字段,并将其作为 event
对象提供给你。
为了让你更方便地测试,我将把一个完整的、可运行的服务端和客户端示例放在下面的代码文件中。你可以直接运行它来观察效果。
import time
import threading
import requests
from flask import Flask, Response# ==============================================================================
# 服务端 (Server)
# ==============================================================================
def create_sse_server():"""创建一个 Flask 应用作为 SSE 服务器"""app = Flask(__name__)@app.route('/stream')def stream():def event_stream():"""一个生成器函数,用于流式传输事件"""count = 0print("服务器:客户端已连接,开始发送事件...")try:while True:# 每隔一秒发送一次数据time.sleep(1)# SSE 消息格式: "data: <your_message>\n\n"message = f"这是第 {count} 条消息"yield f"data: {message}\n\n"print(f"服务器:已发送 -> '{message}'")count += 1except GeneratorExit:# 当客户端断开连接时,生成器会收到一个 GeneratorExit 异常print("服务器:客户端已断开连接。")# 返回一个流式响应,MIME 类型为 'text/event-stream'return Response(event_stream(), mimetype='text/event-stream')print("服务器:Flask SSE 服务器正在启动...")# 在单独的线程中运行 Flask 服务器,以避免阻塞主线程# 在生产环境中,应使用 Gunicorn 或 uWSGI 等 WSGI 服务器server_thread = threading.Thread(target=lambda: app.run(threaded=True, port=5000, host='0.0.0.0'))server_thread.daemon = Trueserver_thread.start()print("服务器:服务器已在 http://0.0.0.0:5000 上运行")# ==============================================================================
# 客户端 (Client)
# ==============================================================================
def run_sse_client():"""运行一个 SSE 客户端来接收事件"""sse_url = 'http://127.0.0.1:5000/stream'print(f"\n客户端:正在连接到 SSE 服务器 at {sse_url}...")try:# 使用 requests 库并设置 stream=True 来接收流式响应with requests.get(sse_url, stream=True, timeout=10) as response:# 确保服务器返回了正确的 Content-Typeif 'text/event-stream' not in response.headers.get('Content-Type', ''):print(f"客户端错误:无效的 Content-Type: {response.headers.get('Content-Type')}")returnprint("客户端:连接成功,开始监听事件...")# 使用 iter_lines 迭代来自服务器的每一行数据for line in response.iter_lines():# SSE 消息以 "data:" 开头if line and line.startswith(b'data:'):# 从 "data:" 前缀后提取消息内容# line 是 bytes 类型,需要解码为 utf-8 字符串data = line.split(b':', 1)[1].strip()print(f"客户端:收到数据 -> {data.decode('utf-8')}")except requests.exceptions.RequestException as e:print(f"\n客户端:连接错误: {e}")print("客户端:请确保服务器正在运行。")except KeyboardInterrupt:print("\n客户端:用户中断,正在关闭。")# ==============================================================================
# 主程序入口
# ==============================================================================
if __name__ == '__main__':# 1. 启动 SSE 服务器create_sse_server()# 等待服务器启动print("主程序:等待2秒让服务器完全启动...")time.sleep(2)# 2. 运行 SSE 客户端run_sse_client()print("\n主程序:客户端已停止。服务器仍在后台线程运行。")# 主线程将在这里结束,但后台的服务器线程会因为 daemon=True 而随之退出。
这个示例将启动一个 Flask 服务器,然后客户端会连接到它并实时打印出服务器发送的流式消息。你可以直接运行此文件来体验整个流程。
3.如果不设置stream=True会怎么样?
程序会卡住,默认模式,是等待内容体的结果来展示,因为是流式,等不到内容体的全部,就会一直在等待。
=============
是的,您的理解非常精准。
如果您不设置 stream=True
,requests
库会采用其默认的“缓冲”行为,这确实会导致连接通路在您能处理数据之前就已经被处理完毕,对于 SSE 这样的流式协议来说是行不通的。
我们来深入剖析一下这两种模式的区别:
默认行为 (不设置 stream=True
) - 缓冲模式
当您执行一个普通的 requests.get('...')
时,会发生以下情况:
- 发送请求:
requests
向服务器发送 HTTP 请求。 - 等待并下载所有内容: 库会一直等待,直到服务器响应了所有的数据。它会将整个响应体(response body)下载下来,并存储在内存中。
- 加载到内存: 所有数据被完整加载到
response
对象的content
或text
属性里。 - 返回并关闭: 只有当所有内容都下载完毕后,
get()
函数才会返回response
对象给您的代码。此时,底层的 TCP 连接通常会被释放回连接池或直接关闭,因为requests
认为这次通信已经“完成”了。
这对于 SSE 为什么是致命的?
SSE 的服务器永远不会“完成”响应。它会一直保持连接打开,以便随时推送新的数据。
所以,如果客户端使用默认的 requests
模式去连接 SSE 端点,客户端会永远等待下去,因为它总是在等待那个永远不会到来的“响应结束”信号。您的 requests.get()
调用会一直被阻塞,程序就卡在那里了。
设置 stream=True
- 流式模式
当您执行 requests.get('...', stream=True)
时,行为完全不同:
- 发送请求:
requests
向服务器发送 HTTP 请求。 - 只下载响应头: 库会立即返回,只要它接收到了服务器的响应头(headers)。它不会去下载整个响应体。
- 连接保持打开: 此时,底层的 TCP 连接是保持活动的。
response
对象内部保留了这个连接的“通路”。 - 按需读取: 您现在可以通过
response.iter_lines()
或response.iter_content()
这样的方法,主动地、一块一块地从这个打开的连接中读取数据。每当新数据从服务器传来,您就能通过迭代读到它。
这为什么适用于 SSE?
这个模式完美契合了 SSE 的工作方式。客户端不会傻等响应结束,而是建立连接后马上开始监听,服务器可以随时通过这个“通路”推送数据,客户端也能实时地一行一行接收和处理。
连接何时关闭?
在 stream=True
模式下,连接会在以下几种情况关闭:
- 您读取完所有数据: 当您通过迭代(例如
for
循环)把服务器发送的所有内容都读取完毕(对于 SSE 来说,这意味着服务器主动关闭了连接)。 with
语句结束: 当包裹requests.get
的with
代码块执行完毕时,requests
会自动为您关闭连接。这是处理流式请求的最佳实践。- 服务器关闭连接: 服务器可以单方面决定关闭连接。
- 网络错误或超时: 发生网络问题。
总结对比
特性 | 默认模式 (stream=False ) |
流式模式 (stream=True ) |
---|---|---|
行为 | 下载所有内容到内存,然后才返回。 | 只下载响应头,立即返回。 |
response.content |
立即可用,包含全部数据。 | 不可用,访问会抛出异常。必须迭代读取。 |
连接状态 | 请求完成后立即关闭/释放。 | 保持打开,直到您读取完毕或手动关闭。 |
适用场景 | 获取大小可预期的、完整的资源(JSON、HTML页面、小文件)。 | 处理大型文件、实时数据流 (SSE)、无法预知大小的响应。 |
对 SSE 的影响 | 程序会卡住,永远等待响应结束。 | 正确的工作方式,可以实时接收服务器推送的事件。 |
参考资料:
https://zhuanlan.zhihu.com/p/634581294
AI:gemini 2.5 pro