从零开始搭建AI网站(6):如何使用响应式编程
响应式编程(Reactive Programming)是一种编程范式,旨在处理异步数据流和事件流。它通过使用观察者模式和函数式编程的概念,将数据流和事件流抽象为可观察的序列,然后通过操作这些序列来实现各种功能。
在响应式编程中,数据流和事件流被视为连续的时间序列,可以通过操作符来转换、过滤和组合它们。这种编程范式的主要优势是它可以简化异步编程,并提供一种声明式的方式来处理数据流和事件流。它还可以提高代码的可读性和可维护性,因为它将复杂的异步逻辑封装在操作符中,使得代码更易于理解和修改。
响应式编程可以应用于各种领域,包括前端开发、后端开发、移动开发等。在前端开发中,响应式编程可以用于处理用户界面的事件流和数据流,使得界面能够动态地响应用户的操作。在后端开发中,响应式编程可以用于处理大量的异步请求和数据流,提高系统的吞吐量和响应速度。
常见的响应式编程框架包括RxJava、RxJS、ReactiveX等。这些框架提供了一系列的操作符和工具,用于处理数据流和事件流,并提供了一种简洁而强大的方式来处理异步编程。
当前响应式编程的典型例子莫过于最近炙手可热的ChatGPT的流式输出了。因为ChatGPT请求响应时间较长,如果采用传统的一直等待全部数据就绪,用户恐怕早就跑光了,而响应式方式则不需要等待所有数据就绪,而只需要有部分数据就绪即可输出,从而极大地提升了用户体验。下面以此为例,来说明实现这种效果的原理(开发语言Java)。
先来看看上文中提到的的三个响应式编程框架:RxJava、RxJS和ReactiveX。它们是三个相关的概念,同时也是不同平台上的实现。
- RxJava:RxJava是ReactiveX在Java平台上的实现,它提供了一套丰富的API和操作符,用于处理异步和事件驱动的编程。RxJava是基于观察者模式和迭代器模式的,可以用于处理数据流、事件流和异步任务等。
- RxJS:RxJS是ReactiveX在JavaScript平台上的实现,它提供了类似于RxJava的API和操作符,用于处理异步和事件驱动的编程。RxJS可以在浏览器端和Node.js环境中使用,可以处理DOM事件、AJAX请求、定时器等。
- ReactiveX:ReactiveX是一个跨平台的响应式编程库,它提供了一套统一的API和操作符,用于处理异步和事件驱动的编程。ReactiveX的目标是提供一种通用的编程模型,使得开发者可以在不同的平台和语言中共享代码和思想。
在Springboot中,另有WebFlux模块可供使用,同时它也可以跟上面的模块一起使用。说起Flux,这里也会涉及到另一个概念:Flowable。其实Flowable和Flux都是响应式流的实现,它们有以下关系:
- Flowable是RxJava的一部分,而Flux是Reactor的一部分。RxJava是一个用于Java的响应式编程库,而Reactor是一个用于Java的响应式编程框架。
- Flowable是RxJava中的一个类,它实现了Reactive-Streams规范,提供了对背压(backpressure)的支持。Flowable可以处理异步和并发的数据流,并且可以控制数据流的速率,以避免生产者和消费者之间的不匹配。
- Flux是Reactor中的一个类,它也实现了Reactive-Streams规范,提供了类似的功能。Flux可以处理异步和并发的数据流,并且可以控制数据流的速率。
- Flowable和Flux都提供了一系列的操作符,可以对数据流进行转换、过滤、映射等操作。这些操作符可以帮助开发者处理和操作数据流,使代码更加简洁和可读。
跟tRxJava和Reactor密切相关的开发库之一是WebClien。WebClient是一个用于发送HTTP请求的非阻塞的响应式客户端,它是Reactor项目的一部分。
WebClient提供了一种简洁、灵活和可组合的方式来发送HTTP请求,并处理响应。它可以与RxJava和Reactor的异步和响应式编程模型无缝集成,使得在响应式应用程序中处理HTTP请求变得更加方便和高效。
WebClient可以与RxJava的Flowable一起使用,通过toFlowable()方法将响应转换为Flowable流,从而实现对响应的处理和操作。
WebClient webClient = WebClient.create();
Flowable<String> response = webClient.get().uri("https://example.com").retrieve().bodyToFlux(String.class).toFlowable();
同样,WebClient也可以与Reactor的Flux一起使用,通过bodyToFlux()方法将响应转换为Flux流,从而实现对响应的处理和操作。
WebClient webClient = WebClient.create();
Flux<String> response = webClient.get().uri("https://example.com").retrieve().bodyToFlux(String.class);
下面我们将关注点放在Reactor框架中,在Reactor中,不得不提另一个跟Flux相对的概念:Mono。Flux和Mono是Reactor框架中的两个关键类,它们都是用于处理响应式流的。
- Flux是一个表示0到N个元素的响应式流。它可以发出多个元素,并以异步的方式产生这些元素。Flux可以用于处理多个值的数据流,例如从数据库查询结果、文件读取等。
- Mono是一个表示0或1个元素的响应式流。它要么发出一个元素,要么不发出任何元素。Mono可以用于处理单个值的数据流,例如从缓存中获取数据、获取单个实体等。
- Flux和Mono之间有以下关系:
- Flux可以被转换成Mono。
Flux<Integer> flux = Flux.just(1, 2, 3);
Mono<Integer> mono = flux.single();
-
- Mono可以被转换成Flux。
Mono<Integer> mono = Mono.just(1);
Flux<Integer> flux = mono.flux();
Flux和Mono可以通过一系列的操作符进行转换、过滤、映射等操作,使得对响应式流的处理变得更加灵活和方便。它们是Reactor框架中的核心类,用于构建响应式应用程序。
webClient可以实现复杂的处理逻辑,比如异常处理:
webClient.get().uri(url).retrieve().onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomException("客户端错误"))).onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new CustomException("服务器错误"))).bodyToMono(String.class).onErrorResume(throwable -> {if (throwable instanceof WebClientResponseException) {WebClientResponseException ex = (WebClientResponseException) throwable;// 处理响应异常} else {// 处理其他异常}});
在使用 Spring Boot 的 WebClient 时,bodyToMono 和 bodyToFlux 方法都可以用于将响应体转换为 Mono 或 Flux 对象。
bodyToMono 方法用于将响应体转换为 Mono 对象,适用于响应体只有一个元素的情况,例如返回一个 JSON 对象或者一个字符串。
bodyToFlux 方法用于将响应体转换为 Flux 对象,适用于响应体有多个元素的情况,例如返回一个 JSON 数组或者一个流式数据。
因此,当我们需要处理的响应体只有一个元素时,应该使用 bodyToMono 方法;当我们需要处理的响应体有多个元素时,应该使用 bodyToFlux 方法。
在 Reactor 中,Flux 流结束的实现原理是通过发送一个 onComplete 信号来通知订阅者流已经结束。当 Flux 流中的所有元素都被消费完毕时,会自动发送一个 onComplete 信号。
例如,当我们使用 Flux.range(1, 10) 创建一个包含 1 到 10 的整数序列的 Flux 流时,当订阅者订阅该流并消费完所有元素后,会自动发送一个 onComplete 信号来通知订阅者流已经结束。
在使用 Spring Boot 的 WebClient 时,当我们使用 bodyToFlux 方法将响应体转换为 Flux 对象时,如果响应体是一个流式数据,那么当流式数据传输完毕后,会自动发送一个 onComplete 信号来通知订阅者流已经结束。
webClient.get().uri(url).retrieve().bodyToFlux(String.class).doFinally(signalType -> {if (signalType == SignalType.ON_COMPLETE) {System.out.println("流已结束");}}).subscribe();
有了这些基础知识的准备,我们再来看看ChatGPT的响应结果样例。OpenAI的聊天接口是:
http://api.openai.com/v1/chat/completitions。
该接口接受这样的一个请求数据结构:ChatCompletionRequest。其中有个属性stream 可以设定是否采用流输出。默认false。
这个例子是非stream输出,输出格式为:ChatCompletionResponse
$ curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"false","messages": [{"role": "user", "content": "Hello!"}]}'
{"id":"chatcmpl-7tywVQ4vSPzs8yuZy5FqvL0CX07W0","object":"chat.completion","created":1693576659,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"message":{"role":"assistant","content":"Hello
这个例子是stream输出,输出结构体为:字符串格式的ChatCompletionResponse:
curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"true","messages": [{"role": "user", "content": "Hello!"}]}'
比较stream和非stream的输出区别,有一下几点:
1.非stream 输出只有一条记录;stream 有若干条,取决于响应内容大小;
2. 非stream 输出包含消耗的tokens数量,stream 没有;
3. 非stream 输出结果是json格式的ChatCompletionResponse结构,stream 输出j格式类似:data:str(ChatCompletionResponse),同时以data:[NONE]结尾;
结合上面的知识,我们就能实现上述功能:
public Publisher<String> generateChatCompletion(ChatCompletionRequest chatCompletionRequest) {WebClient.ResponseSpec responseSpec = webClient.post().uri(this.apiUrl + "/chat/completions").header("Authorization", "Bearer " + this.apiKey)// .accept(MediaType.TEXT_EVENT_STREAM) .bodyValue(chatCompletionRequest).retrieve();if (chatCompletionRequest.getStream())return responseSpec.bodyToFlux(ChatCompletionResponse.class).onErrorResume(error -> {// 异常处理逻辑 logger.error("bodyToFlux error: {}", error);return Flux.empty();}).filter(response -> {ChatMessage message = response.getChoices().get(0).getMessage();if (message != null) {String content = message.getContent();return StringUtils.isNotEmpty(StringUtils.trim(content));}return false;}).mapNotNull(response -> {try {return objectMapper.writeValueAsString(response);} catch (JsonProcessingException e) {logger.error(e);return null;}}).concatWithValues("[DONE]");else return responseSpec.bodyToMono(ChatCompletionResponse.class).onErrorResume(error -> {// 异常处理逻辑 logger.error("bodyToMono error: {}", error);return Mono.empty();}).mapNotNull(response -> {try {return objectMapper.writeValueAsString(response);} catch (JsonProcessingException e) {logger.error(e);return null;}});}
Publisher是一个通用的概念,它代表一个发布者,可以发布数据或事件。在Spring WebFlux中,Flux和Mono都是Publisher的实现类。
试用地址:https://chatgpt-discount.zeabur.app
相关文章:

从零开始搭建AI网站(6):如何使用响应式编程
响应式编程(Reactive Programming)是一种编程范式,旨在处理异步数据流和事件流。它通过使用观察者模式和函数式编程的概念,将数据流和事件流抽象为可观察的序列,然后通过操作这些序列来实现各种功能。 在响应式编程中…...

MPI之虚拟进程拓扑
什么是虚拟进程拓扑 在很多并行应用进程中,进程的线性排列不能充分的反映进程间在逻辑上的通信模型,通常由问题几何和所用的算法决定,进程经常被排列成二维或者三维网络形式的拓扑模型而通常用一个图来描述逻辑进程排列,此种逻辑…...

Three.js相机参数及Z-Fighting问题的解决方案
本主题讨论透视相机以及如何为远距离环境设置合适的视锥体。 推荐:用 NSDT编辑器 快速搭建可编程3D场景 透视相机是一种投影模式,旨在模仿人类在现实世界中看待事物的方式。 这是渲染 3D 场景最常用的投影模式。 - three.js 如果你看一下 Three.js 文档…...
微信小程序食疗微信小程序的设计与实现
摘要 现在人们的生活水平高了,大家都想在多活个几十年,要想实现这个想法,有很多事情都必须考虑到,第一个就是适当运动,第二个就是心情好,第三个就是要注意饮食。民以食为天,科学合理的饮食结构是…...
mac环境使用pkgbuild命令打pkg包的几个小细节
mac环境使用pkgbuild命令打pkg包的几个小细节 最近,研发提出要使用jenkins来自动生成mac环境下的pkg包,研究了一下,可以使用pkgbuild来打包。但是有几个小细节需要注意一下: 1 如果有pre-install和post-install脚本,…...

在 Spring Boot 中集成 MinIO 对象存储
MinIO 是一个开源的对象存储服务器,专注于高性能、分布式和兼容S3 API的存储解决方案。本文将介绍如何在 Spring Boot 应用程序中集成 MinIO,以便您可以轻松地将对象存储集成到您的应用中。 安装minio 拉取 minio Docker镜像 docker pull minio/minio创…...

seq2seq与引入注意力机制的seq2seq
1、什么是 seq2seq? 就是字面意思,“句子 到 句子”。比如翻译。 2、seq2seq 有一些特点 seq2seq 的整体架构是 “编码器-解码器”。 其中,编码器是 RNN,并将 最后一个hidden state(隐藏状态)【即&…...

【zookeeper】zookeeper介绍
分布式协调技术 在学习ZooKeeper之前需要先了解一种技术——分布式协调技术。那么什么是分布式协调技术?其实分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制,让他们有序的去访问某种临界资源,防止造成"脏数据"的…...

2023高教社杯数学建模思路 - 案例:ID3-决策树分类算法
文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法,就是频繁模…...

springboot docker
在Spring Boot中使用Docker可以帮助你将应用程序与其依赖的容器化,并简化部署和管理过程。 当你在Spring Boot中使用Docker时,你的代码不需要特殊的更改。你可以按照通常的方式编写Spring Boot应用程序。 java示例代码,展示了如何编写一个基…...

docker-compose 部署nacos 整合 postgresql 为DB
标题docker-compose 部署nacos 整合 postgresql 为DB 前提: 已经安装好postgresql数据库 先创建好一个数据库 nacos,执行以下sql: /** Copyright 1999-2018 Alibaba Group Holding Ltd.** Licensed under the Apache License, Version 2.0 (the "…...

详解 ElasticSearch Kibana 配置部署
默认安装部署所在机器允许外网 SSH工具 Putty 链接:https://pan.baidu.com/s/1b6gumtsjL_L64rEsOdhd4A 提取码:lxs9 Winscp 链接:https://pan.baidu.com/s/1tD8_2knvv0EJ5OYvXP6VTg 提取码:lxs9 WinSCP安装直接下一步到完成…...

SourceTree 使用技巧
参考资料 SourceTree使用教程(一)—克隆、提交、推送SourceTree的软合并、混合合并、强合并区别SourceTree 合并分支上的多个提交,一次性合并分支的多次提交至另一分支,主分支前进时的合并冲突解决 目录 一. 基础设置1.1 用户信息…...

VIRTIO-BLK代码分析(0)概述
也无风雨也无晴。- 苏轼(宋) 接下来介绍VIRTIO相关内容。首先从VIRTIO-BLK开始分析,VIRTIO-BLK各部分交互图如下所示: 这里包含以下几个部分: Guest UserSpace:虚拟机用户空间,如虚拟机中运行f…...
【2023年11月第四版教材】第10章《进度管理》(第一部分)
第10章《进度管理》(第一部分) 1 章节说明2 管理基础3 管理过程3.1 管理的过程★★★3.2 管理ITTO汇总★★★ 1 章节说明 【本章分值预测】大部分内容不变,细节有一些变化,预计选择题考3-4分,案例和论文 都有可能考&a…...

【多线程案例】生产者消费者模型(堵塞队列)
文章目录 1. 什么是堵塞队列?2. 堵塞队列的方法3. 生产者消费者模型4. 自己实现堵塞队列 1. 什么是堵塞队列? 堵塞队列也是队列,故遵循先进先出的原则。但堵塞队列是一种线程安全的数据结构,可以避免线程安全问题,当队…...

数据结构与算法基础-学习-30-插入排序之直接插入排序、二分插入排序、希尔排序
一、排序概念 将一组杂乱无章的数据按一定规律顺次排列起来。 将无序序列排成一个有序序列(由小到大或由大到小)的运算。 二、排序方法分类 1、按数据存储介质 名称描述内部排序数据量不大、数据在内存,无需内外交换存交换存储。外部排序…...

Qt+C++桌面计算器源码
程序示例精选 QtC桌面计算器源码 如需安装运行环境或远程调试,见文章底部个人QQ名片,由专业技术人员远程协助! 前言 这篇博客针对<<QtC桌面计算器源码>>编写代码,代码整洁,规则,易读。 学习与…...

kubesphere安装Maven+JDK17 流水线打包
kubesphere 3.4.0版本,默认支持的jav版本是8和11,不支持17 。需要我们自己定义JenKins Agent 。方法如下: 一、构建镜像 1、我们需要从Jenkins Agent的github仓库拉取master最新源码,最新源码里已经支持jdk17了。 git clone ht…...

百度搜索清理大量低质量网站
我是卢松松,点点上面的头像,欢迎关注我哦! 据部分站长爆料:百度大规模删低质量网站的百度资源站长平台权限,很多网站都被删除了百度站长资源平台后台权限,以前在百度后台添加的网站大量被删除!…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...

深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...

Xela矩阵三轴触觉传感器的工作原理解析与应用场景
Xela矩阵三轴触觉传感器通过先进技术模拟人类触觉感知,帮助设备实现精确的力测量与位移监测。其核心功能基于磁性三维力测量与空间位移测量,能够捕捉多维触觉信息。该传感器的设计不仅提升了触觉感知的精度,还为机器人、医疗设备和制造业的智…...