【rabbitmq】实现问答消息消费示例
目录
- 1. 说明
- 2. 截图
- 2.1 接口调用截图
- 2.2 项目结构截图
- 3. 代码示例
1. 说明
- 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。
- 2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象。
- 3.rabbitmq监听队列消息,消费消息后,向sseEmitter对象写入内容。
- 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
- 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图

2.2 项目结构截图

3. 代码示例
- 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><groupId>com.learning</groupId><artifactId>springboot</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><!--打jar包使用--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
- 2.application.yaml
spring:# rabbbitmq配置信息rabbitmq:host: 192.168.2.11port: 5672username: adminpassword: adminvirtual-host: /
- 3.rabbitmq配置类
package com.learning.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** rabbitmq配置类*/
@Configuration
public class RabbitMQConfig{/*** 存sseEmitter*/@Bean("emitterMap")public ConcurrentMap<String, SseEmitter> emitterMap(){ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();return emitters;}/*** 工作模式,交换机名*/public static final String EXCHANGE_NAME = "work_exchange";/*** 工作模式,队列名*/public static final String QUEUE_NAME = "work_queue"; @Bean("work_queue")public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean("work_exchange")public Exchange exchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();} @Bean public Binding binding(@Qualifier("work_queue") Queue queue,@Qualifier("work_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("work_routing_key").noargs();}
}
- 4.controller类
package com.learning.controller;import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description 获取答案**/
@RestController
@RequestMapping("/question")
public class QuestionController {@Autowiredprivate QuestionService questionService;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter ask(@RequestParam String question) {String questionId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter();emitterMap.put(questionId, emitter);questionService.ask(questionId, question);return emitter;}
}
- 5.消息实体
package com.learning.dto;import lombok.Data;import java.io.Serializable;/*** @Author wangyouhui* @Description 消息**/
@Data
public class MessageDTO implements Serializable {private String questionId;private String question;private String answer;private Boolean end;
}
- 6.service实现类
package com.learning.service.impl;import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description**/
@Service
public class QuestionServiceImpl implements QuestionService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")public void receiveMessage(Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String json = new String(message.getBody());ObjectMapper mapper = new ObjectMapper();MessageDTO messageDTO = mapper.readValue(json, MessageDTO.class);SseEmitter sseEmitter = emitterMap.get(messageDTO.getQuestionId());if(sseEmitter != null){sseEmitter.send(messageDTO);}if(messageDTO.getEnd() != null && messageDTO.getEnd()){sseEmitter.complete();emitterMap.remove(messageDTO.getQuestionId());}// 手动签收channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();// 拒绝签收,消息重新入队try {channel.basicReject(deliveryTag, true);} catch (IOException ioException) {ioException.printStackTrace();}}}@Overridepublic void ask(String questionId, String question) {MessageDTO message1 = new MessageDTO();message1.setQuestionId(questionId);message1.setQuestion(question);message1.setAnswer("您好,这个");message1.setEnd(false);this.sendMessage(message1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message2 = new MessageDTO();message2.setQuestionId(questionId);message2.setQuestion(question);message2.setAnswer("您好,这个是答案");message2.setEnd(false);this.sendMessage(message2);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message3 = new MessageDTO();message3.setQuestionId(questionId);message3.setQuestion(question);message3.setAnswer("您好,这个是答案,请问是否能解决你的问题");message3.setEnd(true);this.sendMessage(message3);}public void sendMessage(MessageDTO message){ObjectMapper mapper = new ObjectMapper();String json = null;try {json = mapper.writeValueAsString(message);rabbitTemplate.convertAndSend("work_exchange", "work_routing_key", json);} catch (Exception e) {e.printStackTrace();}}
}
- 7.service接口
package com.learning.service;/*** @Author wangyouhui* @Description **/
public interface QuestionService {void ask(String questionId, String question);
}
- 8.应用类
package com.learning;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author wangyouhui* @Description**/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
相关文章:
【rabbitmq】实现问答消息消费示例
目录 1. 说明2. 截图2.1 接口调用截图2.2 项目结构截图 3. 代码示例 1. 说明 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象…...
单片机_RTOS__架构概念
经典单片机程序 void main() {while(1){函数1();函数2();}} 有无RTOS区别 裸机 RTOS RTOS程序 喂饭() {while(1){喂一口饭();} } …...
ClickHouse在百度MEG数据中台的落地和优化
导读 百度MEG上一代大数据产品存在平台分散、质量不均和易用性差等问题,导致开发效率低下、学习成本高,业务需求响应迟缓。为了解决这些问题,百度MEG内部开发了图灵3.0生态系统,包括Turing Data Engine(TDE)计算引擎、Turing Dat…...
B/S架构(Browser/Server)与C/S架构(Client/Server)
基本概念 B/S架构(Browser/Server):即浏览器/服务器架构。在这种架构中,用户通过浏览器(如Chrome、Firefox、Safari等)访问服务器上的应用程序。服务器端负责处理业务逻辑、存储数据等核心功能,…...
idea中自定义注释模板语法
文章目录 idea 自定义模板语法1.自定义模板语法是什么?2.如何在idea中设置呢? idea 自定义模板语法 1.自定义模板语法是什么? 打开我的idea,创建一个测试类: 这里看到我的 test 测试类里面会有注释,这是怎…...
基于SSM的儿童教育网站【附源码】
基于SpringBoot的课程作业管理系统(源码L文说明文档) 目录 4 系统设计 4.1 系统概述 4.2 系统模块设计 4.3.3 数据库表设计 5 系统实现 5.1 管理员功能模块的实现 5.1.1 视频列表 5.1.2 文章信息管理 5.1.3 文章类…...
深挖自闭症病因与孩子表现的关联
自闭症,亦称为孤独症,乃是一种对儿童发展有着严重影响的神经发育障碍性疾病。深入探寻自闭症的病因与孩子表现之间的联系,对于更深刻地理解并助力自闭症儿童而言,可谓至关重要。 当前,自闭症的病因尚未完全明晰&#x…...
[网络协议篇] UDP协议
文章目录 1. 简介2. 特点3. UDP数据报结构4. 基于UDP的应用层协议5. UDP安全性问题6. 使用udp传输数据的系统就一定不可靠吗?7. 基于UDP的主机探活 python实现 1. 简介 User Datagram Protocol,用户数据报协议,基于IP协议提供面向无连接的网…...
关系型数据库(1)----MySQL(初阶)
目录 1.mysql 2.mysqld 3.mysql架构 1.连接层 2.核心服务层 3.存储引擎层 4.数据存储层 4.SQL分类 5.MySQL操作库 6.MySQL数据类型 1. 数值类型 2. 日期和时间类型 3. 字符串类型 4. 空间类型 5. JSON数据类型 7.MySQL表的约束 1. 主键约束(PRIMARY…...
计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 用到的技术: 1. python…...
深度学习技术演进:从 CNN、RNN 到 Transformer 的发展与原理解析
深度学习的技术演进经历了从卷积神经网络(CNN)到循环神经网络(RNN)再到 Transformer 的重要发展。这三个架构分别擅长处理图像、序列数据和多种任务的特征,标志着深度学习在不同领域取得的进步。 1. 卷积神经网络&…...
Lua中的goto语句
软考鸭微信小程序 过软考,来软考鸭! 提供软考免费软考讲解视频、题库、软考试题、软考模考、软考查分、软考咨询等服务 在Lua编程语言中,goto语句是一种跳转语句,用于将程序的执行流程无条件地转移到程序中的另一个位置。这个位置由一个标签(…...
【rust实战】rust博客系统2_使用wrap启动rust项目服务
如何创建一个使用warp框架的rust项目1.使用cargo 创建项目 cargo new blog 2.添加warp依赖 1.cd blog 2.编辑Cargo.toml文件 添加warp 和 tokio 作为依赖项 在[dependencies]中添加 [package] name "blog" version "0.1.0" …...
【实战案例】Django框架使用模板渲染视图页面及异常处理
本文基于之前内容列表如下: 【图文指引】5分钟搭建Django轻量级框架服务 【实战案例】Django框架基础之上编写第一个Django应用之基本请求和响应 【实战案例】Django框架连接并操作数据库MySQL相关API 视图概述 Django中的视图的概念是一类具有相同功能和模板的网…...
设置K8s管理节点异常容忍时间
说明 每个节点上的 kubelet 需要定时向 apiserver 上报当前节点状态,如果两者间网络异常导致心跳终端,kube-controller-manager 中的 NodeController 会将该节点标记为 Unknown 或 Unhealthy,持续一段时间异常状态后 kube-controller-manage…...
什么样的JSON编辑器才好用
简介 JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也便于机器解析和生成。随着互联网和应用程序的快速发展,JSON已经成为数据传输和存储的主要格式之一。在处理和编辑JSON数据…...
ArkUI自定义TabBar组件
在ArkUI中的Tabs,通过页签进行内容视图切换的容器组件,每个页签对应一个内容视图。其中内容是图TabContent作为Tabs的自组件,通过给TabContent设置tabBar属性来自定义导航栏样式。现在我们就根据UI设计的效果图来实现下图效果: 根…...
pair类型应用举例
在main.cpp里输入程序如下: #include <iostream> //使能cin(),cout(); #include <utility> //使能pair数据类型; #include <string> //使能string字符串; #include <stdlib.h> //使能exit(); //pair类型可以将两个相同的或不同类…...
数字 图像处理算法的形式
一 基本功能形式 按图像处理的输出形式,图像处理的基本功能可分为三种形式。 1)单幅图像 单幅图像 2)多幅图像 单幅图像 3)单(或多)幅图像 数字或符号等 二 几种具体算法形式 1.局部处理邻域对于任一…...
安徽对口高考Python试题选:输入一个正整数,然后输出该整数的3的幂数相加形式。
第一步:求出3的最高次幂是多少 guoint(input("请输入一个正整数:")) iguo a0 while i>0: if 3**i<guo: ai break ii-1print(a)#此语句为了看懂题目,题目中不需要打印出最高幂数 第二步…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
