当前位置: 首页 > news >正文

RabbitMQ的部分模式

1发布订阅模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-publish-queue01", true, false, false, null);channel.queueDeclare("qy172-publish-queue02", true, false, false, null);//交互机和队列绑定channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");
//把数据给交换机,让他分发给队列channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

2订阅个订阅者

订阅者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-publish-queue01",true,consumer);}
}

订阅者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.74.75");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);Map map = JSON.parseObject(json, Map.class);System.out.println("消息内容Consumer02" + map);}};//订阅者2channel.basicConsume("qy172-publish-queue02",true,consumer);} catch (IOException | TimeoutException e) {// 处理连接、通道创建或消费消息时可能抛出的异常e.printStackTrace();}}
}

2路由模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机,channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-router-queue01", true, false, false, null);channel.queueDeclare("qy172-router-queue02", true, false, false, null);//交互机和队列绑定channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");//把数据给交换机,让他分发给队列channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
//            channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-router-queue01",true,consumer);}
}

接收者2

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-router-queue01",true,consumer);}
}

3主题模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机,channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-topic-queue01", true, false, false, null);channel.queueDeclare("qy172-topic-queue02", true, false, false, null);//交互机和队列绑定//主题匹配给这个channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");//主题,也匹配给这个channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");//把数据给交换机,让他分发给队列channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-topic-queue01",true,consumer);}
}

接收者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.74.75");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);Map map = JSON.parseObject(json, Map.class);System.out.println("消息内容Consumer02" + map);}};//订阅者2channel.basicConsume("qy172-topic-queue02",true,consumer);} catch (IOException | TimeoutException e) {// 处理连接、通道创建或消费消息时可能抛出的异常e.printStackTrace();}}
}

相关文章:

RabbitMQ的部分模式

1发布订阅模式 发送者 package org.example; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import ja…...

提取单选框的值,并通过ajax传值到后台

<!DOCTYPE html> <html lang"zh" xmlns:th"http://www.thymeleaf.org" xmlns:shiro"http://www.pollix.at/thymeleaf/shiro"> <head><th:block th:include"include :: header(日库存更新提示)" /> </head&…...

Django创建多app应用

目录 1. 引言 2. 多app创建的两种方式 2.1 多个app结构 2.2 单个apps多个app 3. 最后 1. 引言 在平常业务开发中&#xff0c;我们遇到的功能可能会有很多&#xff0c;单个app的应用可能无法满足我们 这个时候&#xff0c;我们就需要多app应用&#xff0c;例如&#xff1a…...

如何反反爬虫

我们来讲最常见的反反爬虫方法 import requests r requests.get(网页网址) print(r.requests.headers) 一.使用简单的方法把请求头改为真的浏览器模式 import requests link网页地址 heraders{User-Agent:} rrequests.get(link,headersheaders) print(r.requsts.headers)我们…...

wireshark抓包之DNS协议

DNS协议 DNS协议的主要作用是将域名解析为对应的IP地址。当我们在浏览器中输入一个网址时&#xff0c;计算机需要通过DNS协议来查找该网址对应的IP地址&#xff0c;以便能够建立连接并访问目标资源。 DNS协议的工作流程大致如下&#xff1a; 用户的计算机或设备&#xff08;充…...

升级到 Java 21 是值得的

升级到 Java 21 是值得的 又到了一年中的这个时候——New Relic 的年度“State of the Java Ecosystem”调查结果出来了&#xff0c;我一如既往地深入研究了它。虽然我认为该报告做得很好并且提出了很好的问题&#xff0c;但我对有多少 Java 开发人员正在使用低版本感到沮丧。…...

C# 多线程

文章目录 C# 多线程进程与线程无参数的子线程带参数的子线程运行结果 销毁线程 Abort()运行结果 ThreadPool和Task运行结果 异步与同步运行结果 lock单线程运行结果 多线程运行结果 使用lock运行结果 C# 多线程 进程与线程 进程&#xff1a;进程就是一个应用程序&#xff0c;…...

快速安装sudachipy日语包

1、前往 https://rustup.rs 下载并安装 Rustup Linux系统可直接运行以下命令 Window系统需要去网站下载exe包 curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh2、安装 Rust 编译器 rustup install stable3、设置默认版本 rustup default stable4、重新安装 …...

蓝桥杯刷题day13——乘飞机【算法赛】

一、问题描述 等待登机的你看着眼前有老有小长长的队伍十分无聊&#xff0c;你突然想要知道&#xff0c;是否存在两个年龄相仿的乘客。每个乘客的年龄用一个 0 到 36500 的整数表示&#xff0c;两个乘客的年龄相差 365 以内就认为是相仿的。 具体来说&#xff0c;你有一个长度…...

大模型量化技术-BitsAndBytes

Transformers 量化技术 BitsAndBytes bitsandbytes是将模型量化为8位和4位的最简单选择。 8位量化将fp16中的异常值与int8中的非异常值相乘,将非异常值转换回fp16,然后将它们相加以返回fp16中的权重。这减少了异常值对模型性能产生的降级效果。4位量化进一步压缩了模型,并且…...

EasyExcel 复杂表头的导出(动态表头和静态表头)

问题&#xff1a;如图&#xff0c;1部分的表头是动态的根据日期变化&#xff0c;2部分是数据库对应的字段&#xff0c;静态不变的&#xff1b; 解决方案&#xff1a;如果不看1的部分&#xff0c;2部分内容可以根据实体类注解的方式导出&#xff0c;那么我们是不是可以先将动态表…...

centos7 fatal error: curl/curl.h: No such file or directory

若编译遇到此问题&#xff0c;可以查看环境是否libcurl库 yum list installed | grep libcurl 发现未安装libcurl库 执行libcurl库的安装命令&#xff1a; 1.对于Debian/Ubuntu系统&#xff1a; sudo apt-get install libcurl4-openssl-dev 2.对于RHEL/CentOS系统&#xf…...

【Linux】自定义协议+序列化+反序列化

自定义协议序列化反序列化 1.再谈 "协议"2.Cal TCP服务端2.Cal TCP客户端4.Json 喜欢的点赞&#xff0c;收藏&#xff0c;关注一下把&#xff01; 1.再谈 “协议” 协议是一种 “约定”。在前面我们说过父亲和儿子约定打电话的例子&#xff0c;不过这是感性的认识&a…...

常见故障排查和优化

一、MySQL单实例故障排查 故障现象 1 ERROR 2002 (HY000): Cant connect to local MySQL server through socket /data/mysql/mysql.sock (2) 问题分析&#xff1a;以上情况一般都是数据库未启动或者数据库端口被防火墙拦截导致。 解决方法&#xff1a;启动数据库或者防火墙…...

选择华为HCIE培训机构有哪些注意事项

选择软件培训机构注意四点事项1、口碑&#xff1a;学员和社会人士对该机构的评价怎样&#xff1f; 口碑对于一个机构是十分重要的&#xff0c;这也是考量一个机构好不好的重要标准&#xff0c;包括社会评价和学员的评价和感言。誉天作为华为首批授权培训中心&#xff0c;一直致…...

python怎么处理txt

导入文件处理模块 import os 检测路径是否存在&#xff0c;存在则返回True&#xff0c;不存在则返回False os.path.exists("demo.txt") 如果你要创建一个文件并要写入内容 #如果demo.txt文件存在则会覆盖&#xff0c;并且demo.txt文件里面的内容被清空&#xff0c;如…...

SAMRTFORMS 转换PDF 发送邮件

最终成果&#xff1a; *&---------------------------------------------------------------------**& Report ZLC_FIND_EXIT*&---------------------------------------------------------------------**&根据T-CODE / 程序名查询出口、BADI增强*&-------…...

探讨在大数据体系中API的通信机制与工作原理

** 引言 关联阅读博客文章&#xff1a;深入解析大数据体系中的ETL工作原理及常见组件 关联阅读博客文章&#xff1a;深入理解HDFS工作原理&#xff1a;大数据存储和容错性机制解析 ** 在当今数字化时代&#xff0c;数据已经成为企业发展和决策的核心。随着数据规模的不断增长…...

算法打卡day23

今日任务&#xff1a; 1&#xff09;39. 组合总和 2&#xff09;40.组合总和II 3&#xff09;131.分割回文串 39. 组合总和 题目链接&#xff1a;39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 给定一个无重复元素的数组 candidates 和一个目标数 target &#xff0c;…...

每天五分钟深度学习:神经网络和深度学习有什么样的关系?

本文重点 神经网络是一种模拟人脑神经元连接方式的计算模型&#xff0c;通过大量神经元之间的连接和权重调整&#xff0c;实现对输入数据的处理和分析。而深度学习则是神经网络的一种特殊形式&#xff0c;它通过构建深层次的神经网络结构&#xff0c;实现对复杂数据的深度学习…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

连锁超市冷库节能解决方案:如何实现超市降本增效

在连锁超市冷库运营中&#xff0c;高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术&#xff0c;实现年省电费15%-60%&#xff0c;且不改动原有装备、安装快捷、…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

Mysql8 忘记密码重置,以及问题解决

1.使用免密登录 找到配置MySQL文件&#xff0c;我的文件路径是/etc/mysql/my.cnf&#xff0c;有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...