LLM流式方案解决方案和客户端解决方案
背景
接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。
本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)
什么是SSE
SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。
SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。
基本上,SSE由以下要素组成:
- 服务器:负责向客户端发送事件流的HTTP服务器。
- 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
- 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。
SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。
Java框架说明
pom 文件引入的核心依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>aip.com</groupId><artifactId>aip-com</artifactId><version>0.0.1</version><name>aip-com</name><description>aip com project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><dependency><groupId>io.reactivex.rxjava2</groupId><artifactId>rxjava</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
Java后端核心代码
本方法是标准的SSE协议标准
private final ExecutorService executorService = Executors.newFixedThreadPool(5);/*** 会话请求** @return String*/@PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)@Operation(summary = "会话请求")public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public SseEmitter stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter = new SseEmitter();executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {// 向客户端发送事件emitter.send(SseEmitter.event().name("message").data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;
Flux 和 Flowable 对比
Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:
-
库的来源:
- Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
- Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJava 是 Java 平台的反应式扩展库,用于处理异步和基于事件的编程。
-
背压策略:
- Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
- Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
-
反应式规范:
- Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
- Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
-
生态系统:
- Reactor 生态系统主要用于基于 Reactor 的应用程序。
- RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。
总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。
Java后端Flowable方式
本方法是Flowable方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flowable<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flowable<String> typingFlow = Flowable.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.onComplete();} catch (Exception e) {}});}, BackpressureStrategy.BUFFER);return typingFlow;}
Java后端Flux方式
本方法是Flux方式,非标准流式规则
/*** 会话请求** @return String*/@GetMapping(value = "/stream")@Operation(summary = "会话请求")public Flux<String> stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);Flux<String> typingFlow = Flux.create(emitter -> {executorService.execute(() -> {try {for (int i = 0; i < 10; i++) {emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {}});}, FluxSink.OverflowStrategy.BUFFER);return typingFlow;}
}
HTML 客户端接收示例程序
function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务
function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数
sse.html 内容
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>SEE Example</title><script>// SSE 默认方法,只支持GET请求function EventSourceGetRequest() {if(typeof(EventSource)!=="undefined"){var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');eventSource.onmessage = function(event){document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);console.log(event)};}else{document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";}}// fetch POST 请求实现SSEfunction fetchPostRequest() {fetch('http://127.0.0.1:8090/v1/chat/completions', {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({}),}).then(response => {// 检查响应是否成功if (!response.ok) {throw new Error('Network response was not ok');}// 返回 ReadableStream 对象return response.body;}).then(stream => {// 创建一个新的文本解码器const decoder = new TextDecoder();// 获取一个 reader 对象const reader = stream.getReader();let chunk = ''// 逐块读取数据function read() {reader.read().then(({ done, value }) => {if (done) {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);console.log('Stream has ended');return;}// 将数据块转换为字符串并显示const tmp = decoder.decode(value, { stream: true });if (tmp.startsWith('event:') && chunk!='') {document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);chunk = tmp}else{chunk = chunk + tmp}// 继续读取下一块数据read();});}// 开始读取数据read();}).catch(error => {// 处理错误console.error('There was a problem with the fetch operation:', error);});}// EventSourceGetRequest();fetchPostRequest();</script>
</head>
<body><h1>SEE result</h1><div id="result"></div>
</body>
</html>
- 标准SSE示例
- 扩展SSE
相关文章:
LLM流式方案解决方案和客户端解决方案
背景 接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。 本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码) 什么是SSE SSE(Server-Sent Events,服务器发…...
ROS2 高效学习系列
ROS2 高效学习系列 1 说明2 正文 1 说明 2023 年,我们总结输出了 ROS高效入门系列 和 ROS高效进阶系列,系统学习了 ros1 基础知识和 ros 的机器人算法。由于 ros2 的普及,我们将系统学习 ros2 ,包括 ros2 基础知识和相关高级组件…...
SpringBoot + MyBatisPlus分页查询
文章目录 1.思路分析2.分页查询后端实现1.com/sun/furn/config/MybatisConfig.java 注入MyBatisPlus分页拦截器2.com/sun/furn/controller/FurnController.java 添加方法3.postman测试 3.分页查询前端实现1.src/views/HomeView.vue 引入分页导航条组件2.src/views/HomeView.vue…...
记使用sjson的一次小事故
1. 前言 之前在设计一个兼容函数的时候,使用了sjson动态设入参数,从而实现一些参数的兼容。大致的逻辑如下所示: // 有一堆不规则的json数据 {"a":"aaa","b":"bbb","any_key1":{"k…...
如何在iOS系统抓取log
前言:因为作者目前工作领域和苹果智能家居有关,然后发现一些bug其实是apple sdk原生code的问题,所以需要给apple提radar单,就需要抓ios端Log充当证据给apple看,其实ios抓log非常简单,大家感兴趣可以学习下哦…...
【嵌入式——QT】Charts常见的图表的绘制
【嵌入式——QT】Charts常见的图表的绘制 柱状图QBarSetQBarSeriesQBarCategoryAxis图示 饼图堆叠柱状图百分比柱状图散点图和光滑曲线图代码示例 柱状图 QBarSet 用于创建柱状图的数据集。 主要函数 setLabel():设置数据集标签 ;setLabelBrush()&am…...
pandas读写excel,csv
1.读excel 1.to_dict() 函数基本语法 DataFrame.to_dict (self, orientdict , into ) --- 官方文档 函数种只需要填写一个参数:orient 即可 ,但对于写入orient的不同,字典的构造方式也不同,官网一共给出了6种,…...
清华大学突破性研究:GVGEN技术,7秒内从文字到3D高保真生成
引言:3D模型生成的挑战与机遇 随着计算机图形学的发展,3D模型的生成在各个行业中变得越来越重要,包括视频游戏设计、电影制作以及AR/VR技术等。在3D建模的不同方面中,从文本描述生成3D模型成为一个特别有趣的研究领域,…...
软件测试要学习的基础知识——黑盒测试
概述 黑盒测试也叫功能测试,通过测试来检测每个功能是否都能正常使用。在测试中,把程序看作是一个不能打开的黑盒子,在完全不考虑程序内部结构和内部特性的情况下,对程序接口进行测试,只检查程序功能是否按照需求规格…...
如何用Airtest脚本连接无线Android设备?
之前我们已经详细介绍过如何用AirtestIDE无线连接Android设备,它的关键点在于,需要先 adb connect 一次,才能点击 connect 按钮无线连接上该设备: 但是有很多同学,在使用纯Airtest脚本的形式连接无线设备时,…...
c语言函数大全(C开头)
c语言函数大全(C开头) There is no nutrition in the blog content. After reading it, you will not only suffer from malnutrition, but also impotence. The blog content is all parallel goods. Those who are worried about being cheated should leave quickly. 函数名…...
初始Redis关联和非关联
基础篇Redis 3.初始Redis 3.1.2.关联和非关联 传统数据库的表与表之间往往存在关联,例如外键: 而非关系型数据库不存在关联关系,要维护关系要么靠代码中的业务逻辑,要么靠数据之间的耦合: {id: 1,name: "张三…...
Redis 更新开源许可证 - 不再支持云供应商提供商业化的 Redis
原文:Rowan Trollope - 2024.03.20 未来的 Redis 版本将继续在 RSALv2 和 SSPLv1 双许可证下提供源代码的免费和宽松使用;这些版本将整合先前仅在 Redis Stack 中可用的高级数据类型和处理引擎。 从今天开始,所有未来的 Redis 版本都将以开…...
生产者Producer往BufferQueue中写数据的过程
In normal operation, the producer calls dequeueBuffer() to get an empty buffer, fills it with data, then calls queueBuffer() to make it available to the consumer 代码如下: // XXX: Tests that fork a process to hold the BufferQueue must run bef…...
使用 Vite 和 Bun 构建前端
虽然 Vite 目前可以与 Bun 配合使用,但它尚未进行大量优化,也未调整以使用 Bun 的打包器、模块解析器或转译器。 Vite 可以与 Bun 完美兼容。从 Vite 的模板开始使用吧。 bun create vite my-app ✔ Select a framework: › React ✔ Select a variant:…...
如何设置IDEA远程连接服务器开发环境并结合cpolar实现ssh远程开发
文章目录 1. 检查Linux SSH服务2. 本地连接测试3. Linux 安装Cpolar4. 创建远程连接公网地址5. 公网远程连接测试6. 固定连接公网地址7. 固定地址连接测试 本文主要介绍如何在IDEA中设置远程连接服务器开发环境,并结合Cpolar内网穿透工具实现无公网远程连接…...
【项目管理后台】Vue3+Ts+Sass实战框架搭建二
Vue3TsSass搭建 git cz的配置mock 数据配置viteMockServe 建立mock/user.ts文件夹测试一下mock是否配置成功 axios二次封装解决env报错问题,ImportMeta”上不存在属性“env” 统一管理相关接口新建api/index.js 路由的配置建立router/index.ts将路由进行集中封装&am…...
制作一个RISC-V的操作系统六-bootstrap program(risv 引导程序)
文章目录 硬件基本概念qemu-virt地址映射系统引导CSRmachine模式下的csr对应的csr指令csrrwcsrrs mhartid引导程序做的事情判断当前hart是不是第一个hart初始化栈跳转到c语言的…...
haproxy和keepalived的区别与联系
HAProxy(High Availability Proxy) 是一个开源的、高效且可靠的解决方案,主要用于负载均衡。它工作在应用层(第七层),支持多种协议,如HTTP、HTTPS、FTP等。HAProxy通过健康检查机制持续监控后…...
云效 AppStack + 阿里云 MSE 实现应用服务全链路灰度
作者:周静、吴宇奇、泮圣伟 在应用开发测试验证通过后、进行生产发布前,为了降低新版本发布带来的风险,期望能够先部署到灰度环境,用小部分业务流量进行全链路灰度验证,验证通过后再全量发布生产。本文主要介绍如何通…...
深度解析:ctfileGet如何实现城通网盘直链解析的3大技术突破
深度解析:ctfileGet如何实现城通网盘直链解析的3大技术突破 【免费下载链接】ctfileGet 获取城通网盘一次性直连地址 项目地址: https://gitcode.com/gh_mirrors/ct/ctfileGet ctfileGet是一款专为城通网盘设计的开源直链解析工具,通过创新的技术…...
QMCDecode:终极QQ音乐格式解密指南,一键解放你的加密音乐库
QMCDecode:终极QQ音乐格式解密指南,一键解放你的加密音乐库 【免费下载链接】QMCDecode QQ音乐QMC格式转换为普通格式(qmcflac转flac,qmc0,qmc3转mp3, mflac,mflac0等转flac),仅支持macOS,可自动识别到QQ音乐下载目录&…...
探索NHSE:动物森友会存档编辑器的7个隐藏技巧
探索NHSE:动物森友会存档编辑器的7个隐藏技巧 【免费下载链接】NHSE Animal Crossing: New Horizons save editor 项目地址: https://gitcode.com/gh_mirrors/nh/NHSE 你是否曾梦想在动物森友会中拥有无限资源?是否渴望打造完美岛屿却受限于游戏机…...
如何为sync-settings开发自定义存储位置插件:完整开发者指南
如何为sync-settings开发自定义存储位置插件:完整开发者指南 【免费下载链接】sync-settings Synchronize all your settings and packages across atom instances 项目地址: https://gitcode.com/gh_mirrors/sy/sync-settings 你是否想要为Atom的sync-setti…...
机器学习赋能冷等离子体种子处理:Extra Trees模型精准预测发芽率提升
1. 项目概述与核心价值 在精准农业的探索前沿,我们常常面临一个看似简单却极其关键的挑战:如何在不损伤种子的前提下,有效提升其发芽率和幼苗活力?传统方法依赖大量重复的田间试验,周期长、成本高,且结果受…...
Java NIO.2 异步基石:AsynchronousChannel 接口契约与并发安全深度剖析
前言:异步 I/O 的“宪法级”契约 在 Java NIO.2(AIO)的宏大架构中,AsynchronousChannel 是所有异步通道的根接口。它不定义任何具体的读写方法,也不关心网络拓扑或文件偏移——它只做一件事:确立异步 I/O 操…...
ZS315Q Type-C转DP1.4带PD100w方案,边投屏边充电,告别接口焦虑
作为轻薄本、游戏本用户,外接DP显示器时你是不是也遇到过这样的痛点:想投屏到大屏工作娱乐,Type-C接口被视频线占了,充电口就得另占一个,本来接口就没几个,鼠标U盘全都排不上队;更烦人的是就算不…...
12周学习笔记
...
OpenCL图像格式兼容性与性能优化指南
1. OpenCL图像格式兼容性解析在OpenCL编程中,图像处理是一个核心功能模块。图像格式的正确配置直接影响着内核程序的执行效率和结果的准确性。CL_UNSIGNED_INT8和CL_RGB作为OpenCL中常见的图像格式参数,它们的兼容性问题值得深入探讨。1.1 图像格式描述符…...
实战避坑:在Unity里用A*做2D网格寻路,我踩过的性能坑和优化方案都在这了
Unity中A*算法性能优化的实战指南当你在Unity项目中实现了一个基础A寻路系统后,随着游戏单位数量增加或地图规模扩大,性能问题往往会突然出现。帧率下降、卡顿现象频发,这些问题在移动端或需要大量单位同时寻路的RTS、塔防类游戏中尤为明显。…...

