当前位置: 首页 > news >正文

python sse的是什么?

1.sse是什么?

 

是一个通信协议。最主要的是, 服务器必须发送一个 Content-Type 为 text/event-stream 的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。”

=====================

是的,完全正确!您可以非常准确地将 SSE (Server-Sent Events) 理解为一个通信标准通信协议规范

它不是一个独立的协议(像 TCP 或 HTTP 那样),而是构建在 HTTP 协议之上的一套标准。

这个标准由 WHATWG(网页超文本应用技术工作组,负责制定 HTML、DOM 等现代 Web 标准的组织)在 HTML Living Standard 中进行了详细定义。

这个标准具体规定了以下几个核心方面:

  1. 服务器的角色和响应格式 (Server's Role & Response Format):

    • MIME 类型: 服务器必须发送一个 Content-Typetext/event-stream 的响应头,这告诉客户端:“接下来我发送的不是普通的 HTML 或 JSON,而是一个事件流。”
    • 连接保持: 服务器需要保持这个 HTTP 连接持续打开。
    • 消息结构: 发送的每一条消息都必须遵循特定的文本格式。最基本的就是 data: <message>\n\n。此外,标准还定义了 id (消息ID)、event (事件类型)、retry (重连时间) 等可选字段。
  2. 客户端的行为和 API (Client's Behavior & API):

    • EventSource 接口: 标准定义了浏览器必须提供一个 EventSource JavaScript API。开发者通过 new EventSource('/my-stream-url') 来创建连接。
    • 事件监听: 客户端通过 .onmessage.onerror 以及使用 .addEventListener('custom_event', ...) 来监听不同类型的消息。
    • 自动重连: 这是 SSE 标准一个非常重要的特性。如果网络连接意外断开,遵循标准的客户端(例如浏览器)会自动尝试重新连接到服务器。服务器可以通过 retry: 字段来建议客户端重连的间隔时间。

把它和类似技术做个对比,您会更清晰:

  • 对比 WebSocket: WebSocket 是一个完全独立的、双向的协议(协议头是 ws://wss://)。而 SSE 只是对 HTTP 协议的一种特殊用法,是单向的(服务器 -> 客户端)。因此,SSE 更轻量、实现更简单。
  • 对比长轮询 (Long Polling): 长轮询是一种“模拟”推送的技术,客户端发起请求,服务器“hold”住不回,直到有数据才返回,然后客户端立即再次发起新请求。这会产生大量的请求开销。而 SSE 只需要建立一次连接,效率高得多。

总结来说:

称 SSE 为一个通信标准非常准确。它为“服务器如何通过 HTTP 将数据流式地、单向地推送给客户端”这一特定场景,提供了一套完整、可靠且被各大浏览器广泛支持的规则和实现机制。

2.如何写一个sse的示例代码?

重点:

在于这个request库种的stream=True 这个字段打开。

 

=================

好的,我们来详细探讨一下如何使用 Python 处理服务器发送事件 (Server-Sent Events, SSE)。

SSE 是一种允许服务器向客户端单向推送数据的 Web 技术。它基于 HTTP 协议,非常适合实现服务端的流式数据传输,例如实时更新、消息通知、数据仪表盘等。

与 WebSocket 相比,SSE 更简单,因为它只是标准的 HTTP,不需要新的协议。但它是单向的(服务器 -> 客户端),而 WebSocket 是双向的。

处理 SSE 分为两部分:服务端实现客户端接收

1. 服务端实现 (Server)

在服务端,你需要创建一个 HTTP 端点,该端点的响应头 Content-Type 必须是 text/event-stream。然后,服务器保持该连接打开,并以特定格式持续发送数据。

最核心的部分是使用生成器 (Generator) 来流式地 yield 数据。

这里以最常见的 Web 框架 Flask 为例,展示如何创建一个 SSE 端点。

import time
from flask import Flask, Responseapp = Flask(__name__)@app.route('/stream')
def stream():def event_stream():count = 0while True:# 等待一秒time.sleep(1)# 发送数据,必须以 "data: " 开头,并以 "\n\n" 结尾yield f"data: 这是一条消息,ID: {count}\n\n"count += 1# 返回一个流式响应return Response(event_stream(), mimetype='text/event-stream')if __name__ == '__main__':app.run(threaded=True, port=5000)

代码解释:

  1. event_stream() 是一个生成器函数,它在一个无限循环中不断生成数据。
  2. yield f"data: ...\n\n" 是 SSE 的核心。
    • data: 是固定的前缀。
    • 后面跟着你的消息内容。
    • \n\n (两个换行符) 表示一条消息的结束。
  3. Response(event_stream(), mimetype='text/event-stream') 将生成器包装成一个 Flask 响应,并设置正确的 MIME 类型,告诉浏览器这是一个事件流。

2. 客户端接收 (Client)

在客户端,你需要连接到服务器的 /stream 端点,并循环读取响应流。

方法一:使用 requests

requests 库可以通过设置 stream=True 来处理流式响应。

import requests# 连接到 SSE 端点
with requests.get('http://127.0.0.1:5000/stream', stream=True) as r:# 检查响应头if 'text/event-stream' not in r.headers.get('Content-Type', ''):print("错误:服务器未使用 'text/event-stream'")exit()# 逐行迭代响应内容for line in r.iter_lines():# 过滤掉空行或注释行if line and line.startswith(b'data:'):# 解码并打印数据部分data = line.split(b':', 1)[1].strip()print(f"收到数据: {data.decode('utf-8')}")

代码解释:

  1. requests.get(..., stream=True) 是关键,它告诉 requests 不要立即下载所有内容,而是保持连接并允许你迭代响应。
  2. r.iter_lines() 会逐行读取响应流。
  3. 我们检查每一行是否以 data: 开头,然后提取并打印真实的数据。

方法二:使用 sseclient-py

这是一个专门为处理 SSE 设计的库,它能更好地处理重连、idevent 等 SSE 协议的细节,使用起来更方便。

首先,你需要安装它:pip install sseclient-py

import sseclient# 创建一个 SSE 客户端
client = sseclient.SSEClient('http://127.0.0.1:5000/stream')# 迭代事件
for event in client.events():print(f"收到数据: {event.data}")

这个库极大地简化了客户端代码。它会自动处理消息边界、解析 data 字段,并将其作为 event 对象提供给你。


为了让你更方便地测试,我将把一个完整的、可运行的服务端和客户端示例放在下面的代码文件中。你可以直接运行它来观察效果。

import time
import threading
import requests
from flask import Flask, Response# ==============================================================================
# 服务端 (Server)
# ==============================================================================
def create_sse_server():"""创建一个 Flask 应用作为 SSE 服务器"""app = Flask(__name__)@app.route('/stream')def stream():def event_stream():"""一个生成器函数,用于流式传输事件"""count = 0print("服务器:客户端已连接,开始发送事件...")try:while True:# 每隔一秒发送一次数据time.sleep(1)# SSE 消息格式: "data: <your_message>\n\n"message = f"这是第 {count} 条消息"yield f"data: {message}\n\n"print(f"服务器:已发送 -> '{message}'")count += 1except GeneratorExit:# 当客户端断开连接时,生成器会收到一个 GeneratorExit 异常print("服务器:客户端已断开连接。")# 返回一个流式响应,MIME 类型为 'text/event-stream'return Response(event_stream(), mimetype='text/event-stream')print("服务器:Flask SSE 服务器正在启动...")# 在单独的线程中运行 Flask 服务器,以避免阻塞主线程# 在生产环境中,应使用 Gunicorn 或 uWSGI 等 WSGI 服务器server_thread = threading.Thread(target=lambda: app.run(threaded=True, port=5000, host='0.0.0.0'))server_thread.daemon = Trueserver_thread.start()print("服务器:服务器已在 http://0.0.0.0:5000 上运行")# ==============================================================================
# 客户端 (Client)
# ==============================================================================
def run_sse_client():"""运行一个 SSE 客户端来接收事件"""sse_url = 'http://127.0.0.1:5000/stream'print(f"\n客户端:正在连接到 SSE 服务器 at {sse_url}...")try:# 使用 requests 库并设置 stream=True 来接收流式响应with requests.get(sse_url, stream=True, timeout=10) as response:# 确保服务器返回了正确的 Content-Typeif 'text/event-stream' not in response.headers.get('Content-Type', ''):print(f"客户端错误:无效的 Content-Type: {response.headers.get('Content-Type')}")returnprint("客户端:连接成功,开始监听事件...")# 使用 iter_lines 迭代来自服务器的每一行数据for line in response.iter_lines():# SSE 消息以 "data:" 开头if line and line.startswith(b'data:'):# 从 "data:" 前缀后提取消息内容# line 是 bytes 类型,需要解码为 utf-8 字符串data = line.split(b':', 1)[1].strip()print(f"客户端:收到数据 -> {data.decode('utf-8')}")except requests.exceptions.RequestException as e:print(f"\n客户端:连接错误: {e}")print("客户端:请确保服务器正在运行。")except KeyboardInterrupt:print("\n客户端:用户中断,正在关闭。")# ==============================================================================
# 主程序入口
# ==============================================================================
if __name__ == '__main__':# 1. 启动 SSE 服务器create_sse_server()# 等待服务器启动print("主程序:等待2秒让服务器完全启动...")time.sleep(2)# 2. 运行 SSE 客户端run_sse_client()print("\n主程序:客户端已停止。服务器仍在后台线程运行。")# 主线程将在这里结束,但后台的服务器线程会因为 daemon=True 而随之退出。

这个示例将启动一个 Flask 服务器,然后客户端会连接到它并实时打印出服务器发送的流式消息。你可以直接运行此文件来体验整个流程。

3.如果不设置stream=True会怎么样?

程序会卡住,默认模式,是等待内容体的结果来展示,因为是流式,等不到内容体的全部,就会一直在等待。

=============

是的,您的理解非常精准。

如果您不设置 stream=Truerequests 库会采用其默认的“缓冲”行为,这确实会导致连接通路在您能处理数据之前就已经被处理完毕,对于 SSE 这样的流式协议来说是行不通的。

我们来深入剖析一下这两种模式的区别:

默认行为 (不设置 stream=True) - 缓冲模式

当您执行一个普通的 requests.get('...') 时,会发生以下情况:

  1. 发送请求: requests 向服务器发送 HTTP 请求。
  2. 等待并下载所有内容: 库会一直等待,直到服务器响应了所有的数据。它会将整个响应体(response body)下载下来,并存储在内存中。
  3. 加载到内存: 所有数据被完整加载到 response 对象的 contenttext 属性里。
  4. 返回并关闭: 只有当所有内容都下载完毕后get() 函数才会返回 response 对象给您的代码。此时,底层的 TCP 连接通常会被释放回连接池或直接关闭,因为 requests 认为这次通信已经“完成”了。

这对于 SSE 为什么是致命的?

SSE 的服务器永远不会“完成”响应。它会一直保持连接打开,以便随时推送新的数据。

所以,如果客户端使用默认的 requests 模式去连接 SSE 端点,客户端会永远等待下去,因为它总是在等待那个永远不会到来的“响应结束”信号。您的 requests.get() 调用会一直被阻塞,程序就卡在那里了。

设置 stream=True - 流式模式

当您执行 requests.get('...', stream=True) 时,行为完全不同:

  1. 发送请求: requests 向服务器发送 HTTP 请求。
  2. 只下载响应头: 库会立即返回,只要它接收到了服务器的响应头(headers)。它不会去下载整个响应体。
  3. 连接保持打开: 此时,底层的 TCP 连接是保持活动的。response 对象内部保留了这个连接的“通路”。
  4. 按需读取: 您现在可以通过 response.iter_lines()response.iter_content() 这样的方法,主动地、一块一块地从这个打开的连接中读取数据。每当新数据从服务器传来,您就能通过迭代读到它。

这为什么适用于 SSE?

这个模式完美契合了 SSE 的工作方式。客户端不会傻等响应结束,而是建立连接后马上开始监听,服务器可以随时通过这个“通路”推送数据,客户端也能实时地一行一行接收和处理。

连接何时关闭?

stream=True 模式下,连接会在以下几种情况关闭:

  1. 您读取完所有数据: 当您通过迭代(例如 for 循环)把服务器发送的所有内容都读取完毕(对于 SSE 来说,这意味着服务器主动关闭了连接)。
  2. with 语句结束: 当包裹 requests.getwith 代码块执行完毕时,requests 会自动为您关闭连接。这是处理流式请求的最佳实践。
  3. 服务器关闭连接: 服务器可以单方面决定关闭连接。
  4. 网络错误或超时: 发生网络问题。

总结对比

特性 默认模式 (stream=False) 流式模式 (stream=True)
行为 下载所有内容到内存,然后才返回。 只下载响应头,立即返回。
response.content 立即可用,包含全部数据。 不可用,访问会抛出异常。必须迭代读取。
连接状态 请求完成后立即关闭/释放。 保持打开,直到您读取完毕或手动关闭。
适用场景 获取大小可预期的、完整的资源(JSON、HTML页面、小文件)。 处理大型文件、实时数据流 (SSE)、无法预知大小的响应。
对 SSE 的影响 程序会卡住,永远等待响应结束。 正确的工作方式,可以实时接收服务器推送的事件。

 

参考资料:

https://zhuanlan.zhihu.com/p/634581294

AI:gemini 2.5 pro

http://www.hskmm.com/?act=detail&tid=32593

相关文章:

  • idea代码阿里格式化
  • 万字长文详述单据引擎原理、流程、单据管理 - 智慧园区
  • windows 链接共享打印机出现错误0x00000709?打印机0x0000011b错误?0x0000bcd、0x00000709、0x00000011b
  • 解码Linux文件IO目录检索与文件属性
  • p66实验题
  • 【比赛记录】2025NOIP 冲刺模拟赛合集I
  • 20251016
  • C# - 串口助手
  • 使用SpringBoot+MyBatisPlus实现增删改查
  • P4168 [Violet] 蒲公英题解
  • Java了解
  • VGG使用块的网络
  • 使用SpringBoot + Thymeleaf + MyBatisPlus实现一个简单的书籍管理系统
  • Linux 基础
  • P2605 [ZJOI2010] 基站选址
  • NVIDIA Jetson AGX Xavier刷机教程
  • 洛谷p1462-通往奥格瑞码道路
  • 详细介绍:VR 太阳光参数与快速渲染
  • 位运算中没用的小技巧
  • 第六周
  • 超越基础:SightAI 智能路由与多模型选择实战 - sight
  • [Vulhub靶机]JARBAS靶机渗透
  • 10月16号
  • CF622D 题解
  • vue学习的总结
  • 最小二乘问题详解5:非线性最小二乘求解实例
  • AlexNet
  • 【28】C# WinForm入门到精通 ——多文档窗体MDI【属性、强大的方法、实例、源码】【多窗口重叠、水平平铺、垂直平铺、窗体传值】
  • 第五周预习
  • 2025 非标门/铸铝门/别墅大门厂家推荐榜:聚焦品质与服务的实力之选