【rabbitmq】实现问答消息消费示例
目录
- 1. 说明
- 2. 截图
- 2.1 接口调用截图
- 2.2 项目结构截图
- 3. 代码示例
1. 说明
- 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。
- 2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象。
- 3.rabbitmq监听队列消息,消费消息后,向sseEmitter对象写入内容。
- 4.当业务逻辑结束,调用emitter.complete()方法,结束此次会话。
- 5.这里举一个问答的示例,采用的是work模式,逻辑比较简单,仅供参考。
2. 截图
2.1 接口调用截图
2.2 项目结构截图
3. 代码示例
- 1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.4</version></parent><groupId>com.learning</groupId><artifactId>springboot</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.5.4</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><!--打jar包使用--><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
- 2.application.yaml
spring:# rabbbitmq配置信息rabbitmq:host: 192.168.2.11port: 5672username: adminpassword: adminvirtual-host: /
- 3.rabbitmq配置类
package com.learning.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** rabbitmq配置类*/
@Configuration
public class RabbitMQConfig{/*** 存sseEmitter*/@Bean("emitterMap")public ConcurrentMap<String, SseEmitter> emitterMap(){ConcurrentMap<String, SseEmitter> emitters = new ConcurrentHashMap<>();return emitters;}/*** 工作模式,交换机名*/public static final String EXCHANGE_NAME = "work_exchange";/*** 工作模式,队列名*/public static final String QUEUE_NAME = "work_queue"; @Bean("work_queue")public Queue queue() { return new Queue(QUEUE_NAME, true); } @Bean("work_exchange")public Exchange exchange() {return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();} @Bean public Binding binding(@Qualifier("work_queue") Queue queue,@Qualifier("work_exchange") Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("work_routing_key").noargs();}
}
- 4.controller类
package com.learning.controller;import com.learning.service.QuestionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description 获取答案**/
@RestController
@RequestMapping("/question")
public class QuestionController {@Autowiredprivate QuestionService questionService;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@PostMapping(value="/ask", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter ask(@RequestParam String question) {String questionId = UUID.randomUUID().toString();SseEmitter emitter = new SseEmitter();emitterMap.put(questionId, emitter);questionService.ask(questionId, question);return emitter;}
}
- 5.消息实体
package com.learning.dto;import lombok.Data;import java.io.Serializable;/*** @Author wangyouhui* @Description 消息**/
@Data
public class MessageDTO implements Serializable {private String questionId;private String question;private String answer;private Boolean end;
}
- 6.service实现类
package com.learning.service.impl;import com.fasterxml.jackson.databind.ObjectMapper;
import com.learning.config.RabbitMQConfig;
import com.learning.dto.MessageDTO;
import com.learning.service.QuestionService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.concurrent.ConcurrentMap;/*** @Author wangyouhui* @Description**/
@Service
public class QuestionServiceImpl implements QuestionService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConcurrentMap<String, SseEmitter> emitterMap;@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, ackMode = "MANUAL")public void receiveMessage(Message message, Channel channel) {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {String json = new String(message.getBody());ObjectMapper mapper = new ObjectMapper();MessageDTO messageDTO = mapper.readValue(json, MessageDTO.class);SseEmitter sseEmitter = emitterMap.get(messageDTO.getQuestionId());if(sseEmitter != null){sseEmitter.send(messageDTO);}if(messageDTO.getEnd() != null && messageDTO.getEnd()){sseEmitter.complete();emitterMap.remove(messageDTO.getQuestionId());}// 手动签收channel.basicAck(deliveryTag, false);} catch (IOException e) {e.printStackTrace();// 拒绝签收,消息重新入队try {channel.basicReject(deliveryTag, true);} catch (IOException ioException) {ioException.printStackTrace();}}}@Overridepublic void ask(String questionId, String question) {MessageDTO message1 = new MessageDTO();message1.setQuestionId(questionId);message1.setQuestion(question);message1.setAnswer("您好,这个");message1.setEnd(false);this.sendMessage(message1);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message2 = new MessageDTO();message2.setQuestionId(questionId);message2.setQuestion(question);message2.setAnswer("您好,这个是答案");message2.setEnd(false);this.sendMessage(message2);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}MessageDTO message3 = new MessageDTO();message3.setQuestionId(questionId);message3.setQuestion(question);message3.setAnswer("您好,这个是答案,请问是否能解决你的问题");message3.setEnd(true);this.sendMessage(message3);}public void sendMessage(MessageDTO message){ObjectMapper mapper = new ObjectMapper();String json = null;try {json = mapper.writeValueAsString(message);rabbitTemplate.convertAndSend("work_exchange", "work_routing_key", json);} catch (Exception e) {e.printStackTrace();}}
}
- 7.service接口
package com.learning.service;/*** @Author wangyouhui* @Description **/
public interface QuestionService {void ask(String questionId, String question);
}
- 8.应用类
package com.learning;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @Author wangyouhui* @Description**/
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
相关文章:

【rabbitmq】实现问答消息消费示例
目录 1. 说明2. 截图2.1 接口调用截图2.2 项目结构截图 3. 代码示例 1. 说明 1.实现的是一个简单的sse接口,单向的长连接,后端可以向前端不断输出数据。2.通过调用sse接口,触发rabbitmq向队列塞消息,向前端返回一个sseEmitter对象…...

单片机_RTOS__架构概念
经典单片机程序 void main() {while(1){函数1();函数2();}} 有无RTOS区别 裸机 RTOS RTOS程序 喂饭() {while(1){喂一口饭();} } …...

ClickHouse在百度MEG数据中台的落地和优化
导读 百度MEG上一代大数据产品存在平台分散、质量不均和易用性差等问题,导致开发效率低下、学习成本高,业务需求响应迟缓。为了解决这些问题,百度MEG内部开发了图灵3.0生态系统,包括Turing Data Engine(TDE)计算引擎、Turing Dat…...
B/S架构(Browser/Server)与C/S架构(Client/Server)
基本概念 B/S架构(Browser/Server):即浏览器/服务器架构。在这种架构中,用户通过浏览器(如Chrome、Firefox、Safari等)访问服务器上的应用程序。服务器端负责处理业务逻辑、存储数据等核心功能,…...

idea中自定义注释模板语法
文章目录 idea 自定义模板语法1.自定义模板语法是什么?2.如何在idea中设置呢? idea 自定义模板语法 1.自定义模板语法是什么? 打开我的idea,创建一个测试类: 这里看到我的 test 测试类里面会有注释,这是怎…...

基于SSM的儿童教育网站【附源码】
基于SpringBoot的课程作业管理系统(源码L文说明文档) 目录 4 系统设计 4.1 系统概述 4.2 系统模块设计 4.3.3 数据库表设计 5 系统实现 5.1 管理员功能模块的实现 5.1.1 视频列表 5.1.2 文章信息管理 5.1.3 文章类…...

深挖自闭症病因与孩子表现的关联
自闭症,亦称为孤独症,乃是一种对儿童发展有着严重影响的神经发育障碍性疾病。深入探寻自闭症的病因与孩子表现之间的联系,对于更深刻地理解并助力自闭症儿童而言,可谓至关重要。 当前,自闭症的病因尚未完全明晰&#x…...

[网络协议篇] UDP协议
文章目录 1. 简介2. 特点3. UDP数据报结构4. 基于UDP的应用层协议5. UDP安全性问题6. 使用udp传输数据的系统就一定不可靠吗?7. 基于UDP的主机探活 python实现 1. 简介 User Datagram Protocol,用户数据报协议,基于IP协议提供面向无连接的网…...
关系型数据库(1)----MySQL(初阶)
目录 1.mysql 2.mysqld 3.mysql架构 1.连接层 2.核心服务层 3.存储引擎层 4.数据存储层 4.SQL分类 5.MySQL操作库 6.MySQL数据类型 1. 数值类型 2. 日期和时间类型 3. 字符串类型 4. 空间类型 5. JSON数据类型 7.MySQL表的约束 1. 主键约束(PRIMARY…...

计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58同城租房爬虫 房源推荐系统
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 用到的技术: 1. python…...

深度学习技术演进:从 CNN、RNN 到 Transformer 的发展与原理解析
深度学习的技术演进经历了从卷积神经网络(CNN)到循环神经网络(RNN)再到 Transformer 的重要发展。这三个架构分别擅长处理图像、序列数据和多种任务的特征,标志着深度学习在不同领域取得的进步。 1. 卷积神经网络&…...
Lua中的goto语句
软考鸭微信小程序 过软考,来软考鸭! 提供软考免费软考讲解视频、题库、软考试题、软考模考、软考查分、软考咨询等服务 在Lua编程语言中,goto语句是一种跳转语句,用于将程序的执行流程无条件地转移到程序中的另一个位置。这个位置由一个标签(…...
【rust实战】rust博客系统2_使用wrap启动rust项目服务
如何创建一个使用warp框架的rust项目1.使用cargo 创建项目 cargo new blog 2.添加warp依赖 1.cd blog 2.编辑Cargo.toml文件 添加warp 和 tokio 作为依赖项 在[dependencies]中添加 [package] name "blog" version "0.1.0" …...

【实战案例】Django框架使用模板渲染视图页面及异常处理
本文基于之前内容列表如下: 【图文指引】5分钟搭建Django轻量级框架服务 【实战案例】Django框架基础之上编写第一个Django应用之基本请求和响应 【实战案例】Django框架连接并操作数据库MySQL相关API 视图概述 Django中的视图的概念是一类具有相同功能和模板的网…...

设置K8s管理节点异常容忍时间
说明 每个节点上的 kubelet 需要定时向 apiserver 上报当前节点状态,如果两者间网络异常导致心跳终端,kube-controller-manager 中的 NodeController 会将该节点标记为 Unknown 或 Unhealthy,持续一段时间异常状态后 kube-controller-manage…...

什么样的JSON编辑器才好用
简介 JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,易于人阅读和编写,同时也便于机器解析和生成。随着互联网和应用程序的快速发展,JSON已经成为数据传输和存储的主要格式之一。在处理和编辑JSON数据…...

ArkUI自定义TabBar组件
在ArkUI中的Tabs,通过页签进行内容视图切换的容器组件,每个页签对应一个内容视图。其中内容是图TabContent作为Tabs的自组件,通过给TabContent设置tabBar属性来自定义导航栏样式。现在我们就根据UI设计的效果图来实现下图效果: 根…...

pair类型应用举例
在main.cpp里输入程序如下: #include <iostream> //使能cin(),cout(); #include <utility> //使能pair数据类型; #include <string> //使能string字符串; #include <stdlib.h> //使能exit(); //pair类型可以将两个相同的或不同类…...

数字 图像处理算法的形式
一 基本功能形式 按图像处理的输出形式,图像处理的基本功能可分为三种形式。 1)单幅图像 单幅图像 2)多幅图像 单幅图像 3)单(或多)幅图像 数字或符号等 二 几种具体算法形式 1.局部处理邻域对于任一…...

安徽对口高考Python试题选:输入一个正整数,然后输出该整数的3的幂数相加形式。
第一步:求出3的最高次幂是多少 guoint(input("请输入一个正整数:")) iguo a0 while i>0: if 3**i<guo: ai break ii-1print(a)#此语句为了看懂题目,题目中不需要打印出最高幂数 第二步…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
实现弹窗随键盘上移居中
实现弹窗随键盘上移的核心思路 在Android中,可以通过监听键盘的显示和隐藏事件,动态调整弹窗的位置。关键点在于获取键盘高度,并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...

【从零开始学习JVM | 第四篇】类加载器和双亲委派机制(高频面试题)
前言: 双亲委派机制对于面试这块来说非常重要,在实际开发中也是经常遇见需要打破双亲委派的需求,今天我们一起来探索一下什么是双亲委派机制,在此之前我们先介绍一下类的加载器。 目录 编辑 前言: 类加载器 1. …...

ZYNQ学习记录FPGA(二)Verilog语言
一、Verilog简介 1.1 HDL(Hardware Description language) 在解释HDL之前,先来了解一下数字系统设计的流程:逻辑设计 -> 电路实现 -> 系统验证。 逻辑设计又称前端,在这个过程中就需要用到HDL,正文…...

【工具教程】多个条形码识别用条码内容对图片重命名,批量PDF条形码识别后用条码内容批量改名,使用教程及注意事项
一、条形码识别改名使用教程 打开软件并选择处理模式:打开软件后,根据要处理的文件类型,选择 “图片识别模式” 或 “PDF 识别模式”。如果是处理包含条形码的 PDF 文件,就选择 “PDF 识别模式”;若是处理图片文件&…...
GB/T 43887-2024 核级柔性石墨板材检测
核级柔性石墨板材是指以可膨胀石墨为原料、未经改性和增强、用于核工业的核级柔性石墨板材。 GB/T 43887-2024核级柔性石墨板材检测检测指标: 测试项目 测试标准 外观 GB/T 43887 尺寸偏差 GB/T 43887 化学成分 GB/T 43887 密度偏差 GB/T 43887 拉伸强度…...