LLM流式方案解决方案和客户端解决方案
背景
接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。
本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)
什么是SSE
SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。
SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。
基本上,SSE由以下要素组成:
- 服务器:负责向客户端发送事件流的HTTP服务器。
- 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
- 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。
SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。
Java框架说明
pom 文件引入的核心依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>aip.com</groupId><artifactId>aip-com</artifactId><version>0.0.1</version><name>aip-com</name><description>aip com project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
Java后端核心代码
本方法是标准的SSE协议标准
private final ExecutorService executorService = Executors.newFixedThreadPool(5);/*** 会话请求** @return String*/@PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)@Operation(summary = "会话请求")public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public SseEmitter stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
Flux 和 Flowable 对比
Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:
-
库的来源:
- Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
- Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJava 是 Java 平台的反应式扩展库,用于处理异步和基于事件的编程。
-
背压策略:
- Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
- Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
-
反应式规范:
- Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
- Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
-
生态系统:
- Reactor 生态系统主要用于基于 Reactor 的应用程序。
- RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。
总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。
Java后端Flowable方式
本方法是Flowable方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flowable<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flowable<String> typingFlow = Flowable.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.onComplete();} catch (Exception e) {}});}, BackpressureStrategy.BUFFER);return typingFlow;}
Java后端Flux方式
本方法是Flux方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flux<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flux<String> typingFlow = Flux.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {}});}, FluxSink.OverflowStrategy.BUFFER);return typingFlow;}
}
HTML 客户端接收示例程序
function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务
function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数
sse.html 内容
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SEE Example</title><script>// SSE 默认方法,只支持GET请求function EventSourceGetRequest() {if(typeof(EventSource)!=="undefined"){var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');eventSource.onmessage = function(event){document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);console.log(event)};}else{document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";}}// fetch POST 请求实现SSEfunction fetchPostRequest() {fetch('http://127.0.0.1:8090/v1/chat/completions', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({}),}).then(response => {// 检查响应是否成功if (!response.ok) {throw new Error('Network response was not ok');}// 返回 ReadableStream 对象return response.body;}).then(stream => {// 创建一个新的文本解码器const decoder = new TextDecoder();// 获取一个 reader 对象const reader = stream.getReader();let chunk = ''// 逐块读取数据function read() {reader.read().then(({ done, value }) => {if (done) {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);console.log('Stream has ended');return;}// 将数据块转换为字符串并显示const tmp = decoder.decode(value, { stream: true });if (tmp.startsWith('event:') && chunk!='') {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);chunk = tmp}else{chunk = chunk + tmp}// 继续读取下一块数据read();});}// 开始读取数据read();}).catch(error => {// 处理错误console.error('There was a problem with the fetch operation:', error);});}// EventSourceGetRequest();fetchPostRequest();</script>
</head>
<body><h1>SEE result</h1><div id="result"></div>
</body>
</html>
- 标准SSE示例
- 扩展SSE
相关文章:
LLM流式方案解决方案和客户端解决方案
背景 接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。 本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码) 什么是SSE SSE(Server-Sent Events,服务器发…...
ROS2 高效学习系列
ROS2 高效学习系列 1 说明2 正文 1 说明 2023 年,我们总结输出了 ROS高效入门系列 和 ROS高效进阶系列,系统学习了 ros1 基础知识和 ros 的机器人算法。由于 ros2 的普及,我们将系统学习 ros2 ,包括 ros2 基础知识和相关高级组件…...
SpringBoot + MyBatisPlus分页查询
文章目录 1.思路分析2.分页查询后端实现1.com/sun/furn/config/MybatisConfig.java 注入MyBatisPlus分页拦截器2.com/sun/furn/controller/FurnController.java 添加方法3.postman测试 3.分页查询前端实现1.src/views/HomeView.vue 引入分页导航条组件2.src/views/HomeView.vue…...
记使用sjson的一次小事故
1. 前言 之前在设计一个兼容函数的时候,使用了sjson动态设入参数,从而实现一些参数的兼容。大致的逻辑如下所示: // 有一堆不规则的json数据 {"a":"aaa","b":"bbb","any_key1":{"k…...
如何在iOS系统抓取log
前言:因为作者目前工作领域和苹果智能家居有关,然后发现一些bug其实是apple sdk原生code的问题,所以需要给apple提radar单,就需要抓ios端Log充当证据给apple看,其实ios抓log非常简单,大家感兴趣可以学习下哦…...
【嵌入式——QT】Charts常见的图表的绘制
【嵌入式——QT】Charts常见的图表的绘制 柱状图QBarSetQBarSeriesQBarCategoryAxis图示 饼图堆叠柱状图百分比柱状图散点图和光滑曲线图代码示例 柱状图 QBarSet 用于创建柱状图的数据集。 主要函数 setLabel():设置数据集标签 ;setLabelBrush()&am…...
pandas读写excel,csv
1.读excel 1.to_dict() 函数基本语法 DataFrame.to_dict (self, orientdict , into ) --- 官方文档 函数种只需要填写一个参数:orient 即可 ,但对于写入orient的不同,字典的构造方式也不同,官网一共给出了6种,…...
清华大学突破性研究:GVGEN技术,7秒内从文字到3D高保真生成
引言:3D模型生成的挑战与机遇 随着计算机图形学的发展,3D模型的生成在各个行业中变得越来越重要,包括视频游戏设计、电影制作以及AR/VR技术等。在3D建模的不同方面中,从文本描述生成3D模型成为一个特别有趣的研究领域,…...
软件测试要学习的基础知识——黑盒测试
概述 黑盒测试也叫功能测试,通过测试来检测每个功能是否都能正常使用。在测试中,把程序看作是一个不能打开的黑盒子,在完全不考虑程序内部结构和内部特性的情况下,对程序接口进行测试,只检查程序功能是否按照需求规格…...
如何用Airtest脚本连接无线Android设备?
之前我们已经详细介绍过如何用AirtestIDE无线连接Android设备,它的关键点在于,需要先 adb connect 一次,才能点击 connect 按钮无线连接上该设备: 但是有很多同学,在使用纯Airtest脚本的形式连接无线设备时,…...
c语言函数大全(C开头)
c语言函数大全(C开头) There is no nutrition in the blog content. After reading it, you will not only suffer from malnutrition, but also impotence. The blog content is all parallel goods. Those who are worried about being cheated should leave quickly. 函数名…...
初始Redis关联和非关联
基础篇Redis 3.初始Redis 3.1.2.关联和非关联 传统数据库的表与表之间往往存在关联,例如外键: 而非关系型数据库不存在关联关系,要维护关系要么靠代码中的业务逻辑,要么靠数据之间的耦合: {id: 1,name: "张三…...
Redis 更新开源许可证 - 不再支持云供应商提供商业化的 Redis
原文:Rowan Trollope - 2024.03.20 未来的 Redis 版本将继续在 RSALv2 和 SSPLv1 双许可证下提供源代码的免费和宽松使用;这些版本将整合先前仅在 Redis Stack 中可用的高级数据类型和处理引擎。 从今天开始,所有未来的 Redis 版本都将以开…...
生产者Producer往BufferQueue中写数据的过程
In normal operation, the producer calls dequeueBuffer() to get an empty buffer, fills it with data, then calls queueBuffer() to make it available to the consumer 代码如下: // XXX: Tests that fork a process to hold the BufferQueue must run bef…...
使用 Vite 和 Bun 构建前端
虽然 Vite 目前可以与 Bun 配合使用,但它尚未进行大量优化,也未调整以使用 Bun 的打包器、模块解析器或转译器。 Vite 可以与 Bun 完美兼容。从 Vite 的模板开始使用吧。 bun create vite my-app ✔ Select a framework: › React ✔ Select a variant:…...
如何设置IDEA远程连接服务器开发环境并结合cpolar实现ssh远程开发
文章目录 1. 检查Linux SSH服务2. 本地连接测试3. Linux 安装Cpolar4. 创建远程连接公网地址5. 公网远程连接测试6. 固定连接公网地址7. 固定地址连接测试 本文主要介绍如何在IDEA中设置远程连接服务器开发环境,并结合Cpolar内网穿透工具实现无公网远程连接…...
【项目管理后台】Vue3+Ts+Sass实战框架搭建二
Vue3TsSass搭建 git cz的配置mock 数据配置viteMockServe 建立mock/user.ts文件夹测试一下mock是否配置成功 axios二次封装解决env报错问题,ImportMeta”上不存在属性“env” 统一管理相关接口新建api/index.js 路由的配置建立router/index.ts将路由进行集中封装&am…...
制作一个RISC-V的操作系统六-bootstrap program(risv 引导程序)
文章目录 硬件基本概念qemu-virt地址映射系统引导CSRmachine模式下的csr对应的csr指令csrrwcsrrs mhartid引导程序做的事情判断当前hart是不是第一个hart初始化栈跳转到c语言的…...
haproxy和keepalived的区别与联系
HAProxy(High Availability Proxy) 是一个开源的、高效且可靠的解决方案,主要用于负载均衡。它工作在应用层(第七层),支持多种协议,如HTTP、HTTPS、FTP等。HAProxy通过健康检查机制持续监控后…...
云效 AppStack + 阿里云 MSE 实现应用服务全链路灰度
作者:周静、吴宇奇、泮圣伟 在应用开发测试验证通过后、进行生产发布前,为了降低新版本发布带来的风险,期望能够先部署到灰度环境,用小部分业务流量进行全链路灰度验证,验证通过后再全量发布生产。本文主要介绍如何通…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练
前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...

