当前位置: 首页 > article >正文

【Rust基础】使用Rocket构建基于SSE的流式回复

背景

我们正在使用Rust开发基于RAG的知识库系统,其中对于模型的回复使用了常用的SSE,Web框架使用Rocket,Rocket提供了一个简单的方式支持SSE,但没有会话保持、会话恢复等功能,因此我们自己简单实现这两个功能。

在这里插入图片描述

使用Rocket推送消息流

以下,是Rocket给出的示例:

#[get("/text/stream")]
fn stream() -> TextStream![String] {TextStream! {let mut interval = time::interval(Duration::from_secs(1));for i in 0..10 {yield format!("n: {}", i);interval.tick().await;}}
}

我们需要改造这个示例,以满足将模型回复的消息推送给前端的需求。

首先,对于既然推流,需要知道将流推送给谁,也就是要推送到哪个会话中,所以我们在发起会话的时候,需要一个会话ID来标识一个唯一的会话。

我们使用sse.js这个库作为SSE的客户端,用于发起SSE连接,该库可以通过发起POST请求来建立连接,可以携带额外的数据和请求头。

使用以下结构来接收一个对话请求:

pub struct ChatMessageReq {/// 会话IDpub session_id: String,/// 消息内容pub content: String,
}

于是我们的接口需要修改为这样:

#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>) -> (ContentType, TextStream<impl Stream<Item = String>>) 
{//TODO
}

其中TextStream<impl Stream<Item = String>>等价于TextStream![String]

需要注意的是,如果返回值没有指定ContentType,那么Rocket默认响应的ContentType是文本类型,而非stream类型,会导致前端无法解析。

接下来我们实现会话管理功能。

我们定义了一个名为SsePool的struct,来存储并管理SSE连接:

struct SsePool {/// 消息流传输通道channel: DashMap<String, Sender<SseEvent>>,
}impl SsePool {/// 初始化连接池pub fn init() -> Self {Self {channel: DashMap::new(),}}/// 移除连接fn remove(&self, id: &String) {if let Some((_, sender)) = self.channel.remove(id) {drop(sender);}}/// 获取连接fn get_sender(&self, id: &String) -> Option<Sender<SseEvent>> {self.channel.get(id).map(|v| v.value().clone())}/// 新建channelpub fn new_channel(&self, id: String) -> (Sender<SseEvent>, Receiver<SseEvent>) {let (sender, receiver) = tokio::sync::mpsc::channel(10_0000);// 获取并移除旧senderlet old_sender = self.channel.remove(&id).map(|(_, s)| s);// 插入新senderself.channel.insert(id, sender.clone());// 处理旧senderif let Some(old_sender) = old_sender {tokio::spawn(async move {// 发送终止信号let _ = old_sender.send(SseEvent::Abort).await;});}(sender, receiver)}/// 发送消息pub async fn send_message(&self, id: &String, message: ChatMessage) {if let Some(sender) = self.get_sender(id) {if let Err(e) = sender.send(SseEvent::ChatMessage(message)).await {log::warn!("消息发送失败,session id: {},失败原因:{}", &id, e);};// drop(sender);}}
}

其中channel使用的是tokio中mpsc的channel。

值得注意的是,new_channel中,新建连接时,需要向channel发送一条终止事件,确保已有的receiver关闭,返回新的receiver,这一点是用于后续的会话恢复使用。new_channel会返回receiver和sender,用于消息接收和发送。

当收到模型回复是,调用SsePool::send_message发送消息到channel,再头通过receiver接收消息,转发到前端。

可以把它初始化到静态变量中,方便全局调用:

static SSE_POOL: LazyLock<SsePool> = LazyLock::new(|| SsePool::init());

于是,我们的接口可以完善为以下内容:

#[post("/chat", data = "<req>")]
async fn question(req: Json<ChatMessageReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 请求新消息,并返回receiverlet (_, _, mut receiver) = service::new_message(req).await.unwrap();let stream = TextStream! {// 持续接收receiver的消息,然后推送到前端while let Some(item) = receiver.recv().await {match item{//模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {// 推送消息yield SseEvent::Abort.to_message();break;}},// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;},_ => {}}}yield SseEvent::Abort.to_message();drop(receiver);};(ContentType::EventStream, stream)
}

至此,新会话的接口就完成了。

接下来是会话的恢复。

当前端切换会话或刷新页面时,我们希望能够继续收到未回复完成的消息,所以需要一个用于会话恢复的接口。同样的,接口需要会话ID来区分恢复哪一个会话。

#[post("/resume-stream", data = "<req>")]
async fn resume_stream(req: Json<ResumeStreamReq>,
) -> (ContentType, TextStream<impl Stream<Item = String>>){// 会话IDlet session_id = req.session_id.clone();let stream = TextStream! {// 尝试恢复会话,并返回receiver,如果能够返回receiver说明会话未完成,否则已经完成if let Some(mut receiver) = service::resume_stream(&req.session_id).await.unwrap(){// 持续接收未回复完成的消息while let Some(item) = receiver.recv().await {match item {// 模型回复的消息SseEvent::ChatMessage(message) => {yield SseEvent::ChatMessage(message.clone()).to_message();if SseEvent::is_done(&message) {yield SseEvent::Abort.to_message();break;}}// 关闭通道SseEvent::Abort => {yield SseEvent::Abort.to_message();break;}_ => {}}}drop(receiver);}yield SseEvent::Abort.to_message();};(ContentType::EventStream, stream)
}

service::resume_stream中,首先检查对应会话ID的channel是否存在,存在则新建channel返回receiver,不存在则表明已经回复完成。

pub async fn resume_stream(session_id: &String,
) -> AppResult<Option<Receiver<SseEvent>>> {if let None = chat::get_connection(session_id) {return Ok(None);}// 获取会话对应的channel,如果channel存在则标识消息仍在回复中let (_, receiver) = chat::new_connection(session_id);Ok(Some(receiver))
}

至此,便实现了会话恢复,刷新页面后仍能后接收strem消息。

总结

使用Rust写这些业务代码的速度,终归是没有Java快,一些常用的库,没有Java系列封装的简单易用,不过应用占用资源确实比Java小很多。

本次使用的一些库:

  • tokio:异步运行环境,以及mpsc的channel,
  • dashmap:支持并发的hashmap,但是使用不当容易造成死锁。

相关文章:

【Rust基础】使用Rocket构建基于SSE的流式回复

背景 我们正在使用Rust开发基于RAG的知识库系统&#xff0c;其中对于模型的回复使用了常用的SSE&#xff0c;Web框架使用Rocket&#xff0c;Rocket提供了一个简单的方式支持SSE&#xff0c;但没有会话保持、会话恢复等功能&#xff0c;因此我们自己简单实现这两个功能。 使用R…...

高级java每日一道面试题-2025年4月07日-微服务篇[Nacos篇]-如何监控Nacos的运行状态?

如果有遗漏,评论区告诉我进行补充 面试官: 如何监控Nacos的运行状态&#xff1f; 我回答: 监控Nacos运行状态的综合方案 在Java高级面试中&#xff0c;监控Nacos运行状态是一个重要的技术点&#xff0c;它直接关系到微服务架构的稳定性和性能。以下是一个综合的监控方案&am…...

【3GPP核心网】【5G】精讲5G系统的策略和计费控制框架

1. 欢迎大家订阅和关注,精讲3GPP通信协议(2G/3G/4G/5G/IMS)知识点,专栏会持续更新中.....敬请期待! 目录 1. 系统架构 1.1 非漫游架构 1.2 漫游架构 1.3 支持Rx接口 2. 服务化接口及参考点 2.1 PCF 与 AF 间接口 2.2 PCF与SMF间接口 2.3 PCF与AMF间接口 2.4 V-PC…...

大前端基础学习

一、cs架构和bs架构 c&#xff1a;客户端&#xff0c; b&#xff1a;浏览器&#xff08;无需安装&#xff0c;无需更新&#xff0c;可跨平台&#xff09;√ s&#xff1a;server服务端&#xff0c;帮我们保 存信息&#xff0c;传递信息 二、 altshift向下键向下复制一行 …...

Axios 的 POST 请求:QS 处理数据的奥秘与使用场景解析

在现代前端开发中&#xff0c;Axios 已经成为了进行 HTTP 请求的首选库之一&#xff0c;它的简洁易用和强大功能深受开发者喜爱。当使用 Axios 进行 POST 请求时&#xff0c;我们常常会遇到一个问题&#xff1a;是否需要使用 QS 库来处理请求数据&#xff1f;什么时候又可以不用…...

Linux 防火墙( iptables )

目录 一、 Linux 防火墙基础 1. 防火墙基础概念 &#xff08;1&#xff09;防火墙的概述与作用 &#xff08;2&#xff09;防火墙的结构与匹配流程 &#xff08;3&#xff09;防火墙的类别与各个防火墙的区别 2. iptables 的表、链结构 &#xff08;1&#xff09;规则表 …...

【redis进阶三】分布式系统之主从复制结构(1)

目录 一 为什么要有分布式系统&#xff1f; 二 分布式系统涉及到的非常关键的问题&#xff1a;单点问题 三 学习部署主从结构的redis (1)创建一个目录 (2)进入目录拷贝两份原有redis (3)使用vim修改几个选项 (4)启动两个从节点服务器 (5)建立复制&#xff0c;要想配…...

EM储能网关ZWS智慧储能云应用(9) — 远程OTA升级

ZWS智慧储能云平台支持远程OTA固件升级&#xff0c;可以针对具体的储能设备进行升级&#xff0c;升级储能网关、EMS主控软件、PCS、BMS等。 简介 储能系统通常高度集成化&#xff0c;一体化设计&#xff0c;将EMS、BMS&#xff08;电池管理系统&#xff09;、PCS&#xff08…...

RabbitMQ 详解:核心概念、集群模式与消息分布

RabbitMQ 是一个基于高级消息队列协议&#xff08;AMQP&#xff09;的开源消息中间件&#xff0c;广泛应用于分布式系统中&#xff0c;用于实现可靠的消息传递。其强大的功能和灵活的架构使其成为构建高可用、可扩展系统的理想选择。本文整理了 RabbitMQ 的核心概念、集群模式及…...

ubuntu24.04LTS安装向日葵解决方案

去向日葵官方下载ubuntu使用的deb包 向日葵 输入如下命令安装&#xff0c;将具体版本修改成自己下载的版本 andrew in ~/下载 λ sudo dpkg -i SunloginClient_15.2.0.63064_amd64.deb 正在选中未选择的软件包 sunloginclient。 (正在读取数据库 ... 系统当前共安装有 290947…...

LoRA个关键超参数:`LoRA_rank`(通常简称为 `rank` 或 `r`)和 `LoRA_alpha`(通常简称为 `alpha`)

LoRA (Low-Rank Adaptation) 中的两个关键超参数&#xff1a;LoRA_rank&#xff08;通常简称为 rank 或 r&#xff09;和 LoRA_alpha&#xff08;通常简称为 alpha&#xff09;。 LoRA 的核心思想是&#xff0c;在对大型预训练模型&#xff08;如 LLM 或 Stable Diffusion&…...

C++11智能指针深度解析:在Visual Studio中高效管理内存

文章目录 **C++11智能指针深度解析:在Visual Studio中高效管理内存****一、C++11智能指针的核心价值****二、三大智能指针详解与Visual Studio实战****1. `std::unique_ptr`:独占所有权****2. `std::shared_ptr`:共享所有权****3. `std::weak_ptr`:打破循环引用****三、高级…...

达梦官方管理工具SQLark:自动识别外键约束、check约束与虚拟列,助力高效生成测试数据

在数据库管理和应用开发过程中&#xff0c;高质量的测试数据对于系统调试和POC测试至关重要。达梦官方推出的新一代管理工具 SQLark百灵连接&#xff0c;其数据生成功能&#xff0c;可以为应用开发者、DBA 以及测试人员带来极大便利&#xff0c;能够轻松应对各类复杂的测试场景…...

Visio绘图工具全面科普:解锁专业图表绘制新境界[特殊字符]

Visio绘图工具全面科普&#xff1a;解锁专业图表绘制新境界&#x1f31f; 在信息爆炸的时代&#xff0c;清晰、直观地呈现复杂信息变得至关重要。无论是绘制流程图&#x1f4ca;、组织结构图&#x1f465;&#xff0c;还是规划网络拓扑&#x1f5a7;&#xff0c;一款强大的绘图…...

RPCRT4!OSF_CCONNECTION::OSF_CCONNECTION函数分析之创建一个RPCRT4!OSF_CCALL--RPC源代码分析

RPCRT4!OSF_CCONNECTION::OSF_CCONNECTION函数分析之创建一个RPCRT4!OSF_CCALL 第一部分&#xff1a; 1: kd> p RPCRT4!OSF_CCONNECTION::OSF_CCONNECTION0x167: 001b:77bf6957 393dec35c877 cmp dword ptr [RPCRT4!gfRPCVerifierEnabled (77c835ec)],edi 1: kd> …...

快速认识:数据库、数仓(数据仓库)、数据湖与数据运河

数据技术核心概念对比表 概念核心定义核心功能数据特征典型技术/工具核心应用场景数据库结构化数据的「电子档案柜」&#xff0c;按固定 schema 存储和管理数据&#xff0c;支持高效读写和事务处理。实时事务处理&#xff08;增删改查&#xff09;&#xff0c;确保数据一致性&…...

不关“猫”如何改变外网IP?3种免重启切换IP方案

每次更换外网IP都要重启路由器&#xff1f;太麻烦了&#xff01;那么&#xff0c;不关猫怎么改变外网IP&#xff1f;无论是为了网络调试、爬虫需求&#xff0c;还是解决IP限制问题&#xff0c;频繁重启设备既耗时又影响效率。其实&#xff0c;更换外网IP并不一定要依赖“重启大…...

C#进阶学习(五)单向链表和双向链表,循环链表(中)双向链表

目录 一、双向链表的声明 1. 节点类声明 2. 链表类框架声明 3、实现其中的每一个函数 增删操作&#xff08;核心方法组&#xff09; 删除操作&#xff08;核心方法组&#xff09; 查询操作&#xff08;辅助方法&#xff09; 维护方法&#xff08;内部逻辑&#xff09; …...

case客户续保预测中用到的特征工程、回归分析和决策树分析的总结

文章目录 [toc]1. 回归分析概述1.1 基本概念1.2 与分类的区别 2. 常见回归算法2.1 线性回归2.2 决策树回归2.3 逻辑回归&#xff08;Logistic Regression&#xff09;2.3 其他算法补充&#xff1a;通俗版&#xff1a;决策树 vs 随机森林&#x1f333; 决策树&#xff1a;像玩「…...

Android系统通知机制深度解析:Framework至SystemUI全链路剖析

1. 前言 在Android 13的ROM定制化开发中&#xff0c;系统通知机制作为用户交互的核心组件&#xff0c;其实现涉及Framework层到SystemUI的复杂协作。本文将深入剖析从Notification发送到呈现的全链路流程&#xff0c;重点解析关键类的作用机制及系统服务间的交互逻辑&#xff…...

重学Redis:Redis常用数据类型+存储结构(源码篇)

一、SDS 1&#xff0c;SDS源码解读 sds (Simple Dynamic String)&#xff0c;Simple的意思是简单&#xff0c;Dynamic即动态&#xff0c;意味着其具有动态增加空间的能力&#xff0c;扩容不需要使用者关心。String是字符串的意思。说白了就是用C语言自己封装了一个字符串类型&a…...

Elasticsearch的Java客户端库QueryBuilders查询方法大全

matchAllQuery 使用方法&#xff1a;创建一个查询&#xff0c;匹配所有文档。 示例&#xff1a;QueryBuilders.matchAllQuery() 注意事项&#xff1a;这种查询不加任何条件&#xff0c;会返回索引中的所有文档&#xff0c;可能会影响性能&#xff0c;特别是文档数量很多时。 ma…...

js原型和原型链

js原型&#xff1a; 1、原型诞生的目的是什么呢&#xff1f; js原型的产生是为了解决在js对象实例之间共享属性和方法&#xff0c;并把他们很好聚集在一起&#xff08;原型对象上&#xff09;。每个函数都会创建一个prototype属性&#xff0c;这个属性指向的就是原型对象。 …...

usb重定向qemu前端处理

1、qemu添加spicevmc前端时会创建vmc通道。 -chardev spicevmc,idusbredirchardev0,nameusbredir red::shared_ptr<RedCharDevice> spicevmc_device_connect(RedsState *reds, SpiceCharDeviceInstance *sin, uint8_t channel_type) {auto channel(red_vmc_channel_new(r…...

OpenHarmony - 小型系统内核(LiteOS-A)(五)

OpenHarmony - 小型系统内核&#xff08;LiteOS-A&#xff09;&#xff08;五&#xff09; 六、文件系统 虚拟文件系统 基本概念 VFS&#xff08;Virtual File System&#xff09;是文件系统的虚拟层&#xff0c;它不是一个实际的文件系统&#xff0c;而是一个异构文件系统之…...

PyTorch进阶学习笔记[长期更新]

第一章 PyTorch简介和安装 PyTorch是一个很强大的深度学习库&#xff0c;在学术中使用占比很大。 我这里是Mac系统的安装&#xff0c;相比起教程中的win/linux安装感觉还是简单不少&#xff08;之前就已经安好啦&#xff09;&#xff0c;有需要指导的小伙伴可以评论。 第二章…...

proteus8.17 环境配置

Proteus介绍 Proteus 8.17 是一款功能强大的电子设计自动化&#xff08;EDA&#xff09;软件&#xff0c;广泛应用于电子电路设计、仿真和分析。以下是其主要特点和新功能&#xff1a; ### 主要功能 - **电路仿真**&#xff1a;支持数字和模拟电路的仿真&#xff0c;包括静态…...

Microsoft SQL Server Management 一键删除数据库所有外键

DECLARE ESQL VARCHAR(1000); DECLARE FCursor CURSOR --定义游标 FOR (SELECT ALTER TABLE O.name DROP CONSTRAINT F.name; AS CommandSQL from SYS.FOREIGN_KEYS F JOIN SYS.ALL_OBJECTS O ON F.PARENT_OBJECT_ID O.OBJECT_ID WHERE O.TYPE U AND F.TYPE …...

【JAVAFX】自定义FXML 文件存放的位置以及使用

情况 1&#xff1a;FXML 文件与调用类在同一个包中&#xff08;推荐&#xff09; 假设类 MainApp 的包是 com.example&#xff0c;且 FXML 文件放在 resources/com/example 下&#xff1a; 项目根目录 ├── src │ └── sample │ └── Main.java ├── src/s…...

Oracle 如何停止正在运行的 Job

Oracle 如何停止正在运行的 Job 先了解是dbms_job 还是 dbms_scheduler&#xff0c;再确定操作命令。 一 使用 DBMS_JOB 包停止作业&#xff08;适用于旧版 Job&#xff09; 1.1 查看正在运行的 Job SELECT job, what, this_date, this_sec, failures, broken FROM user_j…...