【rabbitmq】实现问答消息消费示例
目录
- 1. 说明
- 2. 截图
- 2.1 接口调用截图
- 2.2 项目结构截图
- 3. 代码示例
1. 说明
- 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。
- 2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象。
- 3.rabbitmq监听队列消息,消费消息后,向sseEmitter对象写入内容。
- 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
- 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图

2.2 项目结构截图

3. 代码示例
- 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><groupId>com.learning</groupId><artifactId>springboot</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><!--打jar包使用--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
- 2.application.yaml
spring:# rabbbitmq配置信息rabbitmq:host: 192.168.2.11port: 5672username: adminpassword: adminvirtual-host: /
- 3.rabbitmq配置类
package com.learning.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** rabbitmq配置类*/
@Configuration
public class RabbitMQConfig{/*** 存sseEmitter*/@Bean("emitterMap")public ConcurrentMap<String, SseEmitter> emitterMap(){ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();return emitters;}/*** 工作模式,交换机名*/public static final String EXCHANGE_NAME = "work_exchange";/*** 工作模式,队列名*/public static final String QUEUE_NAME = "work_queue"; @Bean("work_queue")public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean("work_exchange")public Exchange exchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();} @Bean public Binding binding(@Qualifier("work_queue") Queue queue,@Qualifier("work_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("work_routing_key").noargs();}
}
- 4.controller类
package com.learning.controller;import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description 获取答案**/
@RestController
@RequestMapping("/question")
public class QuestionController {@Autowiredprivate QuestionService questionService;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter ask(@RequestParam String question) {String questionId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter();emitterMap.put(questionId, emitter);questionService.ask(questionId, question);return emitter;}
}
- 5.消息实体
package com.learning.dto;import lombok.Data;import java.io.Serializable;/*** @Author wangyouhui* @Description 消息**/
@Data
public class MessageDTO implements Serializable {private String questionId;private String question;private String answer;private Boolean end;
}
- 6.service实现类
package com.learning.service.impl;import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description**/
@Service
public class QuestionServiceImpl implements QuestionService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")public void receiveMessage(Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String json = new String(message.getBody());ObjectMapper mapper = new ObjectMapper();MessageDTO messageDTO = mapper.readValue(json, MessageDTO.class);SseEmitter sseEmitter = emitterMap.get(messageDTO.getQuestionId());if(sseEmitter != null){sseEmitter.send(messageDTO);}if(messageDTO.getEnd() != null && messageDTO.getEnd()){sseEmitter.complete();emitterMap.remove(messageDTO.getQuestionId());}// 手动签收channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();// 拒绝签收,消息重新入队try {channel.basicReject(deliveryTag, true);} catch (IOException ioException) {ioException.printStackTrace();}}}@Overridepublic void ask(String questionId, String question) {MessageDTO message1 = new MessageDTO();message1.setQuestionId(questionId);message1.setQuestion(question);message1.setAnswer("您好,这个");message1.setEnd(false);this.sendMessage(message1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message2 = new MessageDTO();message2.setQuestionId(questionId);message2.setQuestion(question);message2.setAnswer("您好,这个是答案");message2.setEnd(false);this.sendMessage(message2);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message3 = new MessageDTO();message3.setQuestionId(questionId);message3.setQuestion(question);message3.setAnswer("您好,这个是答案,请问是否能解决你的问题");message3.setEnd(true);this.sendMessage(message3);}public void sendMessage(MessageDTO message){ObjectMapper mapper = new ObjectMapper();String json = null;try {json = mapper.writeValueAsString(message);rabbitTemplate.convertAndSend("work_exchange", "work_routing_key", json);} catch (Exception e) {e.printStackTrace();}}
}
- 7.service接口
package com.learning.service;/*** @Author wangyouhui* @Description **/
public interface QuestionService {void ask(String questionId, String question);
}
- 8.应用类
package com.learning;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author wangyouhui* @Description**/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
相关文章:
【rabbitmq】实现问答消息消费示例
目录 1. 说明2. 截图2.1 接口调用截图2.2 项目结构截图 3. 代码示例 1. 说明 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象…...
单片机_RTOS__架构概念
经典单片机程序 void main() {while(1){函数1();函数2();}} 有无RTOS区别 裸机 RTOS RTOS程序 喂饭() {while(1){喂一口饭();} } …...
ClickHouse在百度MEG数据中台的落地和优化
导读 百度MEG上一代大数据产品存在平台分散、质量不均和易用性差等问题,导致开发效率低下、学习成本高,业务需求响应迟缓。为了解决这些问题,百度MEG内部开发了图灵3.0生态系统,包括Turing Data Engine(TDE)计算引擎、Turing Dat…...
B/S架构(Browser/Server)与C/S架构(Client/Server)
基本概念 B/S架构(Browser/Server):即浏览器/服务器架构。在这种架构中,用户通过浏览器(如Chrome、Firefox、Safari等)访问服务器上的应用程序。服务器端负责处理业务逻辑、存储数据等核心功能,…...
idea中自定义注释模板语法
文章目录 idea 自定义模板语法1.自定义模板语法是什么?2.如何在idea中设置呢? idea 自定义模板语法 1.自定义模板语法是什么? 打开我的idea,创建一个测试类: 这里看到我的 test 测试类里面会有注释,这是怎…...
基于SSM的儿童教育网站【附源码】
基于SpringBoot的课程作业管理系统(源码L文说明文档) 目录 4 系统设计 4.1 系统概述 4.2 系统模块设计 4.3.3 数据库表设计 5 系统实现 5.1 管理员功能模块的实现 5.1.1 视频列表 5.1.2 文章信息管理 5.1.3 文章类…...
深挖自闭症病因与孩子表现的关联
自闭症,亦称为孤独症,乃是一种对儿童发展有着严重影响的神经发育障碍性疾病。深入探寻自闭症的病因与孩子表现之间的联系,对于更深刻地理解并助力自闭症儿童而言,可谓至关重要。 当前,自闭症的病因尚未完全明晰&#x…...
[网络协议篇] UDP协议
文章目录 1. 简介2. 特点3. UDP数据报结构4. 基于UDP的应用层协议5. UDP安全性问题6. 使用udp传输数据的系统就一定不可靠吗?7. 基于UDP的主机探活 python实现 1. 简介 User Datagram Protocol,用户数据报协议,基于IP协议提供面向无连接的网…...
关系型数据库(1)----MySQL(初阶)
目录 1.mysql 2.mysqld 3.mysql架构 1.连接层 2.核心服务层 3.存储引擎层 4.数据存储层 4.SQL分类 5.MySQL操作库 6.MySQL数据类型 1. 数值类型 2. 日期和时间类型 3. 字符串类型 4. 空间类型 5. JSON数据类型 7.MySQL表的约束 1. 主键约束(PRIMARY…...
计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 用到的技术: 1. python…...
深度学习技术演进:从 CNN、RNN 到 Transformer 的发展与原理解析
深度学习的技术演进经历了从卷积神经网络(CNN)到循环神经网络(RNN)再到 Transformer 的重要发展。这三个架构分别擅长处理图像、序列数据和多种任务的特征,标志着深度学习在不同领域取得的进步。 1. 卷积神经网络&…...
Lua中的goto语句
软考鸭微信小程序 过软考,来软考鸭! 提供软考免费软考讲解视频、题库、软考试题、软考模考、软考查分、软考咨询等服务 在Lua编程语言中,goto语句是一种跳转语句,用于将程序的执行流程无条件地转移到程序中的另一个位置。这个位置由一个标签(…...
【rust实战】rust博客系统2_使用wrap启动rust项目服务
如何创建一个使用warp框架的rust项目1.使用cargo 创建项目 cargo new blog 2.添加warp依赖 1.cd blog 2.编辑Cargo.toml文件 添加warp 和 tokio 作为依赖项 在[dependencies]中添加 [package] name "blog" version "0.1.0" …...
【实战案例】Django框架使用模板渲染视图页面及异常处理
本文基于之前内容列表如下: 【图文指引】5分钟搭建Django轻量级框架服务 【实战案例】Django框架基础之上编写第一个Django应用之基本请求和响应 【实战案例】Django框架连接并操作数据库MySQL相关API 视图概述 Django中的视图的概念是一类具有相同功能和模板的网…...
设置K8s管理节点异常容忍时间
说明 每个节点上的 kubelet 需要定时向 apiserver 上报当前节点状态,如果两者间网络异常导致心跳终端,kube-controller-manager 中的 NodeController 会将该节点标记为 Unknown 或 Unhealthy,持续一段时间异常状态后 kube-controller-manage…...
什么样的JSON编辑器才好用
简介 JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也便于机器解析和生成。随着互联网和应用程序的快速发展,JSON已经成为数据传输和存储的主要格式之一。在处理和编辑JSON数据…...
ArkUI自定义TabBar组件
在ArkUI中的Tabs,通过页签进行内容视图切换的容器组件,每个页签对应一个内容视图。其中内容是图TabContent作为Tabs的自组件,通过给TabContent设置tabBar属性来自定义导航栏样式。现在我们就根据UI设计的效果图来实现下图效果: 根…...
pair类型应用举例
在main.cpp里输入程序如下: #include <iostream> //使能cin(),cout(); #include <utility> //使能pair数据类型; #include <string> //使能string字符串; #include <stdlib.h> //使能exit(); //pair类型可以将两个相同的或不同类…...
数字 图像处理算法的形式
一 基本功能形式 按图像处理的输出形式,图像处理的基本功能可分为三种形式。 1)单幅图像 单幅图像 2)多幅图像 单幅图像 3)单(或多)幅图像 数字或符号等 二 几种具体算法形式 1.局部处理邻域对于任一…...
安徽对口高考Python试题选:输入一个正整数,然后输出该整数的3的幂数相加形式。
第一步:求出3的最高次幂是多少 guoint(input("请输入一个正整数:")) iguo a0 while i>0: if 3**i<guo: ai break ii-1print(a)#此语句为了看懂题目,题目中不需要打印出最高幂数 第二步…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
Typeerror: cannot read properties of undefined (reading ‘XXX‘)
最近需要在离线机器上运行软件,所以得把软件用docker打包起来,大部分功能都没问题,出了一个奇怪的事情。同样的代码,在本机上用vscode可以运行起来,但是打包之后在docker里出现了问题。使用的是dialog组件,…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
