SpringBoot手动实现流式输出方案整理以及SSE规范输出详解
背景:
最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并完成与python服务对接的方案。
方案:
- 使用Servlet原生API实现流式输出
- 使用ResponseBodyEmitter实现异步流式输出
- 使用SseEmitter实现服务器发送事件(SSE)
- 使用WebFlux实现响应式流式输出
- 使用Spring MVC的StreamingResponseBody
- websockt
说一下我的业务场景,我原本的前后端适配已经按照SSE规范完成了功能,因此新写接口时也采用SSE规范,避免同一个系统中前端出现多种方式的调用,而且我的python微服务采用SSE规范,当时第一反应采用Feign去调用接口返回即可,但是使用后发现Openfeign支持这种调用不友好,因此接口对接这里采用的是WebClient。因此本文着重说一下SSE规范调用
一、SSE是什么
SSE (Server-Sent Events) 是一种基于HTTP的服务器向客户端推送数据的Web技术规范,它允许服务器单向地向客户端发送事件流。以下是SSE规范的全面解析:
1.基本概念
SSE是HTML5标准的一部分,主要特点包括:
-
单向通信:仅服务器→客户端方向
-
基于HTTP:使用普通HTTP连接
-
文本协议:事件以纯文本格式传输
-
自动重连:内置连接恢复机制
-
简单易用:比WebSocket更轻量级
2. 协议格式
SSE事件流是一个UTF-8编码的文本流,包含以下字段(每个字段以\n
结尾):
event: message\n
id: 123\n
retry: 5000\n
data: {\n
data: "name": "John",\n
data: "age": 30\n
data: }\n\n
data
: 有效载荷内容(可多行,每行需加"data: "前缀)
event
: 自定义事件类型(默认"message")
id
: 事件ID(用于断线重连时定位)
retry
: 重连时间(毫秒)
服务器响为:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
3.客户端API
浏览器端JavaScript使用EventSource
接口:
const eventSource = new EventSource('/sse-endpoint');// 监听默认事件
eventSource.onmessage = (e) => {console.log('Message:', e.data);
};// 监听自定义事件
eventSource.addEventListener('customEvent', (e) => {console.log('Custom event:', e.data);
});// 错误处理
eventSource.onerror = (e) => {console.error('SSE error:', e);
};
4.与相关技术的对比
特性 | SSE | WebSocket | Long Polling |
---|---|---|---|
方向 | 单向(服务器→客户端) | 双向 | 单向(轮询) |
协议 | HTTP | WS/WSS | HTTP |
连接管理 | 自动重连 | 需手动处理 | 每次请求新建连接 |
数据格式 | 文本 | 二进制/文本 | 文本 |
复杂度 | 低 | 中 | 低 |
5. 适用场景
SSE特别适合:
-
实时通知(新闻、股价、天气)
-
日志流监控
-
进度报告(文件处理、任务执行)
-
社交媒体动态更新
-
需要简单实时功能但不需要双向通信的场景
虽然WebSocket更强大,但SSE仍有很多优势:
-
更简单的实现
-
自动利用HTTP/2的多路复用
-
不需要额外的协议升级
-
被所有现代浏览器支持(IE除外)
二、WebClient
1.概念
WebClient 是 Spring Framework 5中引入的一个基于响应式编程模型的 HTTP客户端 ,主要用于执行HTTP请求。相比传统的 RestTemplate ,WebClient采用了 Reactor库 ,支持非阻塞式(异步)调用,能够充分利用多核CPU资源,特别适合高并发场景。
-
WebClient 是 线程安全 的,适合作为单例Bean复用。
-
底层使用 连接池(默认基于Reactor Netty),减少重复创建连接的开销。
2.与OpenFeign比较
推荐方案:优先使用WebClient + Service分层架构
原因:WebClient原生支持响应式流处理,更适合SSE场景,而OpenFeign更适合普通REST调用
备选方案:使用OpenFeign(需要特殊配置)
注意:需要Spring Cloud 2020.0.3+版本和响应式Feign支持
特性 | WebClient方案 | OpenFeign方案 |
---|---|---|
响应式支持 | ✅ 原生支持 | ⚠️ 需要特殊配置 |
代码复杂度 | 简单 | 较复杂 |
维护性 | 高 | 中 |
性能 | 高(非阻塞IO) | 中等 |
连接池管理 | 自动 | 需要手动配置 |
适合场景 | 高并发流式处理 | 简单接口调用 |
3.与传统RestTemplate对比
特性 | WebClient(推荐) | RestTemplate(过时) |
---|---|---|
协议支持 | HTTP/1.1, HTTP/2, WebSocket | 仅HTTP/1.1 |
编程模型 | 响应式(Reactive) | 同步阻塞 |
性能 | 非阻塞IO,高并发 | 线程池阻塞模型 |
配置方式 | Builder模式,灵活扩展 | 直接实例化 |
优势
-
响应式集成:完美兼容Spring WebFlux。
-
函数式API:链式调用更清晰。
-
更好的性能:基于Project Reactor和Netty。
三、代码实现
1.基础实现
@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
2.业务进阶
2.1 依赖配置:
在pom.xml中添加必要依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2 WebClient配置:
-
使用
WebClient
创建HTTP客户端,支持响应式流处理 -
配置第三方SSE接口地址和必要的请求头(如认证信息)
WebClient配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;@Configuration
public class WebClientConfig {@Beanpublic WebClient webClient() {return WebClient.builder().baseUrl("https://api.example.com").build();}
}
2.3Service层实现
@Service
public class WebClientSseService {@Autowiredprivate WebClient webClient;public Flux<String> streamEvents() {System.out.println("前置校验。。。。");Flux<String> resFlux = null;try{resFlux = webClient.get().uri("/stream").accept(MediaType.TEXT_EVENT_STREAM).retrieve().bodyToFlux(String.class).map(data -> {// 处理原始SSE数据#if (data.startsWith("data:")) {#return data.substring(5).trim();#}return data;});}catch (Exception exception){resFlux = Flux.just("{'status': 'Error', 'message': '"+exception.getMessage()+"'}");}return resFlux;}
}
2.4 Controller
// application-web模块
@RestController
public class DataStreamController {@PostMapping(value = "/stream",consumes = MediaType.MULTIPART_FORM_DATA_VALUE,produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestBody StreamRequest request) {return dataProcessor.streamEvents(request);}@PostMapping(value = "/stream",produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> provideStream(@RequestParam(name = "file", required = false) MultipartFile file, @RequestParam Map<String, Object> jsonObject) {return dataProcessor.streamEvents(file, jsonObject);}
}
解释一下这两个参数:
consumes = MULTIPART_FORM_DATA_VALUE,
produces = TEXT_EVENT_STREAM_VALUE
consumes、produces 两个参数的作用与区别
参数 | 作用 | 示例值 |
---|---|---|
consumes = MULTIPART_FORM_DATA_VALUE | 声明接口接收的请求内容类型(客户端→服务端) | multipart/form-data |
produces = TEXT_EVENT_STREAM_VALUE | 声明接口返回的响应内容类型(服务端→客户端) | text/event-stream |
为什么需要同时声明?
-
输入输出分离原则:
-
输入(consumes):处理文件上传需要
multipart/form-data
-
输出(produces):SSE流式响应需要
text/event-stream
-
-
HTTP协议规范:
POST /upload HTTP/1.1
Content-Type: multipart/form-data ← 对应consumes
Accept: text/event-stream ← 对应produces
内容类型对照速查表
场景 | 客户端设置 | 服务端声明 |
---|---|---|
文件上传+JSON响应 | Content-Type: multipart/form-data | consumes = MULTIPART_FORM_DATA_VALUE |
文件上传+SSE流响应 | Accept: text/event-stream | produces = TEXT_EVENT_STREAM_VALUE |
JSON上传+SSE流响应 | Content-Type: application/json | consumes = APPLICATION_JSON_VALUE |
根据需要自由选择。
2.5 这里对webclient做个扩展
如果上传的是文件可以用这个方式写body的内容
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(BodyInserters.fromMultipartData(formData))
如果不同的json类型的body请求体可以这么写
.body(BodyInserters.fromValue(res))
注意这块的细节,我就是在这里写绕了很多
四、其他方案实现
1. 使用Servlet原生API实现流式输出
@RestController
public class StreamingController {@GetMapping("/stream1")public void stream1(HttpServletResponse response) throws IOException {response.setContentType("text/plain;charset=UTF-8");try (PrintWriter writer = response.getWriter()) {for (int i = 0; i < 100; i++) {writer.write("Data line " + i + "\n");writer.flush(); // 手动刷新缓冲区Thread.sleep(100); // 模拟延迟}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}
2. 使用ResponseBodyEmitter实现异步流式输出
@RestController
public class StreamingController {@GetMapping("/stream2")public ResponseBodyEmitter stream2() {ResponseBodyEmitter emitter = new ResponseBodyEmitter();CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {emitter.send("Data line " + i + "\n");Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
3. 使用SseEmitter实现服务器发送事件(SSE)
@RestController
public class SseController {@GetMapping("/sse-stream")public SseEmitter streamSse() {SseEmitter emitter = new SseEmitter(30_000L); // 30秒超时CompletableFuture.runAsync(() -> {try {for (int i = 0; i < 100; i++) {SseEmitter.SseEventBuilder event = SseEmitter.event().data("SSE Event " + i).id(String.valueOf(i)).name("sse-event");emitter.send(event);Thread.sleep(100);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}
}
4. 使用WebFlux实现响应式流式输出
@RestController
@RequestMapping("/reactive")
public class ReactiveStreamingController {@GetMapping("/stream")public Flux<String> streamData() {return Flux.interval(Duration.ofMillis(100)).map(sequence -> "Reactive data " + sequence + "\n").take(100); // 限制输出数量}@GetMapping(value = "/stream-file", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamLargeFile() {return Flux.using(() -> Files.lines(Paths.get("large-file.txt")),Flux::fromStream,Stream::close);}
}
5. 使用Spring MVC的StreamingResponseBody
@RestController
public class StreamingResponseBodyController {@GetMapping("/stream3")public StreamingResponseBody stream3() {return outputStream -> {Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream));for (int i = 0; i < 100; i++) {writer.write("Streaming line " + i + "\n");writer.flush();Thread.sleep(100);}};}
}
五、WebClient配置扩展
1. 添加默认请求头
@Bean
public WebClient thirdPartyWebClient() {return WebClient.builder().baseUrl("https://api.example.com").defaultHeader("Authorization", "Bearer token123").defaultHeader("Accept", "application/json").build();
}
2. 配置超时
import java.time.Duration;@Bean
public WebClient thirdPartyWebClient() {return WebClient.builder().baseUrl("https://api.example.com").clientConnector(new ReactorClientHttpConnector(HttpClient.create().responseTimeout(Duration.ofSeconds(5))).build();
}
3. 添加拦截器
@Bean
public WebClient thirdPartyWebClient() {return WebClient.builder().baseUrl("https://api.example.com").filter((request, next) -> {System.out.println("Sending request to: " + request.url());return next.exchange(request);}).build();
}
常见问题
1: 能否创建多个WebClient Bean
可以,但需要指定不同的Bean名称:
@Bean("paymentClient")
public WebClient paymentClient() {return WebClient.builder().baseUrl("https://payment.api").build();
}@Bean("weatherClient")
public WebClient weatherClient() {return WebClient.builder().baseUrl("https://weather.api").build();
}
2: 如何测试WebClient
使用MockWebServer(OkHttp)模拟API:
@SpringBootTest
class ApiTest {@Autowiredprivate WebClient webClient;@Testvoid testGetUser() {MockWebServer server = new MockWebServer();server.enqueue(new MockResponse().setBody("{\"name\":\"John\"}").addHeader("Content-Type", "application/json"));server.start();webClient = webClient.mutate().baseUrl(server.url("/").toString()).build();Mono<User> user = webClient.get().uri("/users/1").retrieve().bodyToMono(User.class);StepVerifier.create(user).expectNextMatches(u -> u.getName().equals("John")).verifyComplete();server.shutdown();}
}
相关文章:

SpringBoot手动实现流式输出方案整理以及SSE规范输出详解
背景: 最近做流式输出时,一直使用python实现的,应需求方的要求,需要通过java应用做一次封装并在java侧完成系统鉴权、模型鉴权等功能后才能真正去调用智能体应用,基于此调研java实现流式输出的几种方式,并…...

深入解析I²C总线接口:从基础到应用
IC总线概述与基本概念 一句话概述:本章节将介绍IC总线的历史、定义及其在嵌入式系统中的作用,帮助读者建立对IC的基本理解。 IC(Inter-Integrated Circuit)总线是一种广泛应用于嵌入式系统中的串行通信协议,最初由飞利…...

Sklearn 机器学习 缺失值处理 检测数据每列的缺失值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在代码与灵感交织的数字世界里和大家相遇~💖 ✨ 在这个技术浪潮奔涌的时代,我们既是探索者,也是分享者。我始终相信,每一行代码都是通往创新的钥匙,而分享则能让这把钥匙照亮更多人的…...
Unity基于GraphView的可视化关卡编辑器开发指南
一、GraphView技术基础与应用场景 1. GraphView核心组件 组件功能描述关卡编辑应用GraphView画布容器关卡拓扑结构编辑区Node基础节点房间/敌人/道具等关卡元素Edge节点连接线路径/依赖关系Port连接端口入口/出口标记Blackboard属性面板元素参数配置Minimap缩略图导航大型关卡…...

STL解析——list的使用
目录 1.简介 2.构造函数 3.迭代器 3.1封装 3.2迭代器分类 4.排序性能 4.1链式与数组 4.2缓存读取 1.简介 STL容器中提供的list容器也是一种顺序容器,底层实现方式是带头双向链表,这种实现方式能比单链表更高效的访问数据。 下面围绕部分重要接口…...
华为大规模——重塑生产力
华为大模型通过以下几个方面重塑生产力: 提供强大算力支持 华为致力于构建领先的昇腾人工智能算力平台,推出高性能昇腾AI集群,支持月级长期稳定训练,可靠性业界领先。同时打造开放的昇腾计算平台,兼容主流算子、框…...
【Go面试陷阱】对未初始化的chan进行读写为何会卡死?
Go面试陷阱:对未初始化的chan进行读写为何会卡死?深入解析nil channel的诡异行为 在Go的世界里,var ch chan int 看似人畜无害,实则暗藏杀机。它不会报错,不会panic,却能让你的程序悄无声息地"卡死&qu…...
SpringBoot自动化部署实战技术文章大纲
技术背景与目标 介绍SpringBoot在现代开发中的重要性自动化部署的价值:提升效率、减少人为错误、实现CI/CD适用场景:中小型Web应用、微服务架构 自动化部署核心方案 基于Docker的容器化部署 SpringBoot应用打包为Docker镜像使用Docker Compose编排多容…...
软件项目管理(3) 软件项目任务分解
一、相关概念 1.任务分解的方法和步骤 (1)方法 模板参照方法:参照有标准或半标准的任分解结构图类比方法:任务分解结构图经常被重复使用,具有相似性自顶向下方法:一般->特殊,演绎推理从大…...

MQTTX连接阿里云的物联网配置
本文的目标是通过MQTTX的客户端,连接到阿里云的物联网的平台,发送温度信息,在阿里云的平台中显示出来。阿里云免费注册,免费有一个MQTT的服务器。有数量限制,但是对于测试来讲,已经足够。 1、注册阿里云的物…...

20250606-C#知识:匿名函数、Lambda表达式与闭包
C#知识:匿名方法、Lambda表达式与闭包 闭包乍一听感觉很复杂,其实一点也不简单 1、匿名方法 没有方法名的方法一般用于委托和事件 Func<int, int, int> myAction delegate(int a, int b) { return a b; }; Console.WriteLine( myAction(1, 2)…...
数字证书_CA_详解
目录 一、数字证书简介 二、 CA(证书颁发机构) (一) 证书链(信任链) 1. 根证书 2. 中间证书 3. 网站证书 (二) 抓包软件的证书链与信任机制 1. 抓包通信流程 2. 证书链伪造与信任验证流程 (三) 关于移动设备的CA 一、数…...

衡量嵌入向量的相似性的方法
衡量嵌入向量的相似性的方法 一、常见相似性计算方法对比 方法核心原理公式优点缺点适用场景余弦相似度计算向量夹角的余弦值,衡量方向相似性,与向量长度无关。$\text{cos}\theta = \frac{\mathbf{a} \cdot \mathbf{b}}{\mathbf{a}\mathbf{b}欧氏距离计算向量空间中的直线距离…...
Python爬虫实战:Yelp餐厅数据采集完整教程
前言 在数据分析和商业智能领域,餐厅和商户信息的采集是一个常见需求。Yelp作为全球知名的本地商户评论平台,包含了大量有价值的商户信息。本文将详细介绍如何使用Python开发一个高效的Yelp数据爬虫,实现商户信息的批量采集。 技术栈介绍 …...
微服务常用日志追踪方案:Sleuth + Zipkin + ELK
在微服务架构中,一个用户请求往往需要经过多个服务的协同处理。为了有效追踪请求的完整调用链路,需要一套完整的日志追踪方案。Sleuth Zipkin ELK 组合提供了完整的解决方案 Sleuth:生成和传播追踪IDZipkin:收集、存储和可视化…...

API是什么意思?如何实现开放API?
目录 一、API 是什么 (一)API 的定义 (二)API 的作用 二、API 的类型 (一)Web API 1. RESTful API 2. SOAP API (二)操作系统 API (三)数据库 API …...
12.6Swing控件4 JSplitPane JTabbedPane
JSplitPane JSplitPane 是 Java Swing 中用于创建分隔面板的组件,支持两个可调整大小组件的容器。它允许用户通过拖动分隔条来调整两个组件的相对大小,适合用于需要动态调整视图比例的场景。 常用方法: setLeftComponent(Component comp)&a…...

Python训练第四十六天
DAY 46 通道注意力(SE注意力) 知识点回顾: 不同CNN层的特征图:不同通道的特征图什么是注意力:注意力家族,类似于动物园,都是不同的模块,好不好试了才知道。通道注意力:模型的定义和插入的位置通…...
C++编程——关于比较器的使用
注: 简单记录一下C里比较器的构建,常用于自定义 sort() 函数和优先队列的改写优先级。 简单构建比较器: sort() 函数: vector<int> arr;//(a, b) -> true : a < b //升序排列 bool compare(int a, int b) {retur…...

第2天:认识LSTM
🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 目标 具体实现 (一)环境 语言环境:Python 3.10 编 译 器: PyCharm 框 架: pytorch (二)具体步骤…...

自动化提示生成框架(AutoPrompt)
自动化提示生成框架(AutoPrompt) 一、核心创新点 自动化提示生成框架(AutoPrompt) 创新本质:提出基于梯度引导搜索的自动化提示生成方法,替代人工设计模板的传统模式。技术路径: 将提示视为可训练的离散token序列,通过优化提示向量(prompt embedding)搜索语义空间。利…...
两轮自平衡机器人建模、LQR控制与仿真分析
以下是一个针对两轮自平衡机器人(平衡车) 的完整建模、控制设计与仿真分析报告,包含详细的理论推导、控制算法实现及Python仿真代码。 两轮自平衡机器人建模、LQR控制与仿真分析 1. 引言 两轮自平衡机器人是一种典型的欠驱动、非线性、不稳定系统,其动力学特性与倒立摆高度…...
在NLP文本处理中,将字符映射到阿拉伯数字(构建词汇表vocab)的核心目的和意义
一、词汇表的核心作用 数值化表示 将离散的文本字符转换为连续的数值索引,使计算机能够处理非结构化的语言数据57。例如: "中国" → 2"a" → 5 统一输入格式 不同长度的文本通过填充/截断转换为固定长度的数字序列…...

中国首套1公里高分辨率大气湿度指数数据集(2003~2020)
时间分辨率:月空间分辨率:100m - 1km共享方式:开放获取数据大小:34.79 GB数据时间范围:2003-01-01 — 2020-12-31元数据更新时间:2023-07-26 数据集摘要 中国首套1公里高分辨率大气湿度指数数据集…...

计算机视觉顶刊《International Journal of Computer Vision》2025年5月前沿热点可视化分析
追踪计算机视觉领域的前沿热点是把握技术发展方向、推动创新落地的关键,分析这些热点,不仅能洞察技术趋势,更能为科研选题和工程实践提供重要参考。本文对计算机视觉顶刊《International Journal of Computer Vision》2025年5月前沿热点进行了…...

python学习打卡day45
DAY 45 Tensorboard使用介绍 知识点回顾: tensorboard的发展历史和原理tensorboard的常见操作tensorboard在cifar上的实战:MLP和CNN模型 效果展示如下,很适合拿去组会汇报撑页数: 作业:对resnet18在cifar10上采用微调策…...
JAVA元编程
一、引言:元编程的本质与 Java 实现 元编程(Metaprogramming)是一种 “操纵程序的程序” 的编程范式,其核心思想是通过代码动态操作代码本身。在 Java 中,元编程主要通过 ** 反射(Reflection)、…...

Verilog编程技巧01——如何编写三段式状态机
前言 Verilog编程技巧系列文章将聚焦于介绍Verilog的各种编程范式或者说技巧,编程技巧和编程规范有部分重合,但并非完全一样。规范更注重编码的格式,像变量命名、缩进、注释风格等,而编程技巧则更偏重更直观易读、更便于维护、综合…...

智启未来:当知识库遇见莫奈的调色盘——API工作流重构企业服务美学
目录 引言 一、初识蓝耘元生代MaaS平台 1.1 平台架构 1.2 平台的优势 1.3 应用场景 二、手把手教你如何在蓝耘进行注册 (1)输入手机号,将验证码正确填入即可快速完成注册 (2)进入下面的页面表示已经成功注册&…...
java教程笔记(十一)-泛型
Java 泛型(Generics)是 Java 5 引入的重要特性之一,它允许在定义类、接口和方法时使用类型参数。泛型的核心思想是将类型由具体的数据类型推迟到使用时再确定,从而提升代码的复用性和类型安全性。 1.泛型的基本概念 1. 什么是泛…...