当前位置: 首页 > news >正文

RabbitMQ的部分模式

1发布订阅模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-publish-queue01", true, false, false, null);channel.queueDeclare("qy172-publish-queue02", true, false, false, null);//交互机和队列绑定channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");
//把数据给交换机,让他分发给队列channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

2订阅个订阅者

订阅者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-publish-queue01",true,consumer);}
}

订阅者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.74.75");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);Map map = JSON.parseObject(json, Map.class);System.out.println("消息内容Consumer02" + map);}};//订阅者2channel.basicConsume("qy172-publish-queue02",true,consumer);} catch (IOException | TimeoutException e) {// 处理连接、通道创建或消费消息时可能抛出的异常e.printStackTrace();}}
}

2路由模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机,channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-router-queue01", true, false, false, null);channel.queueDeclare("qy172-router-queue02", true, false, false, null);//交互机和队列绑定channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");//把数据给交换机,让他分发给队列channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
//            channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-router-queue01",true,consumer);}
}

接收者2

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-router-queue01",true,consumer);}
}

3主题模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQfactory.setHost("192.168.74.75");Connection connection = null;Channel channel = null;try {connection = factory.newConnection();// 创建一个通道channel = connection.createChannel();//创建交换机,channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);//创建队列,如果存在则不会创建channel.queueDeclare("qy172-topic-queue01", true, false, false, null);channel.queueDeclare("qy172-topic-queue02", true, false, false, null);//交互机和队列绑定//主题匹配给这个channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");//主题,也匹配给这个channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");// 创建消息内容HashMap<String, Object> map = new HashMap<>();map.put("name", "张三");map.put("age", "22");//把数据给交换机,让他分发给队列channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));System.out.println("发送成功");} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);} catch (TimeoutException e) {// 发生超时异常时抛出运行时异常throw new RuntimeException(e);} finally {if (channel != null) {try {// 关闭通道channel.close();} catch (IOException | TimeoutException e) {// 发生 IO 或超时异常时抛出运行时异常throw new RuntimeException(e);}}if (connection != null) {try {// 关闭连接connection.close();} catch (IOException e) {// 发生 IO 异常时抛出运行时异常throw new RuntimeException(e);}}}}
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {public static void main(String[] args) throws Exception {// 创建连接工厂对象ConnectionFactory factory = new ConnectionFactory();// 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"factory.setHost("192.168.74.75");Connection connection = factory.newConnection();// 创建一个 RabbitMQ 连接Channel channel = connection.createChannel();// 创建一个通道,用于与 RabbitMQ 之间的通信com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {// 创建一个消费者对象,并重写其方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费消息的处理方法String json = new String(body);// 将消息内容转换为字符串Map map = JSON.parseObject(json, Map.class);// 使用 JSON 解析成 Map 对象System.out.println("消息内容Consumer01"+map);// 输出消息内容}};channel.basicConsume("qy172-topic-queue01",true,consumer);}
}

接收者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.74.75");try {Connection connection = factory.newConnection();Channel channel = connection.createChannel();com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String json = new String(body);Map map = JSON.parseObject(json, Map.class);System.out.println("消息内容Consumer02" + map);}};//订阅者2channel.basicConsume("qy172-topic-queue02",true,consumer);} catch (IOException | TimeoutException e) {// 处理连接、通道创建或消费消息时可能抛出的异常e.printStackTrace();}}
}

相关文章:

RabbitMQ的部分模式

1发布订阅模式 发送者 package org.example; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import ja…...

提取单选框的值,并通过ajax传值到后台

<!DOCTYPE html> <html lang"zh" xmlns:th"http://www.thymeleaf.org" xmlns:shiro"http://www.pollix.at/thymeleaf/shiro"> <head><th:block th:include"include :: header(日库存更新提示)" /> </head&…...

Django创建多app应用

目录 1. 引言 2. 多app创建的两种方式 2.1 多个app结构 2.2 单个apps多个app 3. 最后 1. 引言 在平常业务开发中&#xff0c;我们遇到的功能可能会有很多&#xff0c;单个app的应用可能无法满足我们 这个时候&#xff0c;我们就需要多app应用&#xff0c;例如&#xff1a…...

如何反反爬虫

我们来讲最常见的反反爬虫方法 import requests r requests.get(网页网址) print(r.requests.headers) 一.使用简单的方法把请求头改为真的浏览器模式 import requests link网页地址 heraders{User-Agent:} rrequests.get(link,headersheaders) print(r.requsts.headers)我们…...

wireshark抓包之DNS协议

DNS协议 DNS协议的主要作用是将域名解析为对应的IP地址。当我们在浏览器中输入一个网址时&#xff0c;计算机需要通过DNS协议来查找该网址对应的IP地址&#xff0c;以便能够建立连接并访问目标资源。 DNS协议的工作流程大致如下&#xff1a; 用户的计算机或设备&#xff08;充…...

升级到 Java 21 是值得的

升级到 Java 21 是值得的 又到了一年中的这个时候——New Relic 的年度“State of the Java Ecosystem”调查结果出来了&#xff0c;我一如既往地深入研究了它。虽然我认为该报告做得很好并且提出了很好的问题&#xff0c;但我对有多少 Java 开发人员正在使用低版本感到沮丧。…...

C# 多线程

文章目录 C# 多线程进程与线程无参数的子线程带参数的子线程运行结果 销毁线程 Abort()运行结果 ThreadPool和Task运行结果 异步与同步运行结果 lock单线程运行结果 多线程运行结果 使用lock运行结果 C# 多线程 进程与线程 进程&#xff1a;进程就是一个应用程序&#xff0c;…...

快速安装sudachipy日语包

1、前往 https://rustup.rs 下载并安装 Rustup Linux系统可直接运行以下命令 Window系统需要去网站下载exe包 curl --proto https --tlsv1.2 -sSf https://sh.rustup.rs | sh2、安装 Rust 编译器 rustup install stable3、设置默认版本 rustup default stable4、重新安装 …...

蓝桥杯刷题day13——乘飞机【算法赛】

一、问题描述 等待登机的你看着眼前有老有小长长的队伍十分无聊&#xff0c;你突然想要知道&#xff0c;是否存在两个年龄相仿的乘客。每个乘客的年龄用一个 0 到 36500 的整数表示&#xff0c;两个乘客的年龄相差 365 以内就认为是相仿的。 具体来说&#xff0c;你有一个长度…...

大模型量化技术-BitsAndBytes

Transformers 量化技术 BitsAndBytes bitsandbytes是将模型量化为8位和4位的最简单选择。 8位量化将fp16中的异常值与int8中的非异常值相乘,将非异常值转换回fp16,然后将它们相加以返回fp16中的权重。这减少了异常值对模型性能产生的降级效果。4位量化进一步压缩了模型,并且…...

EasyExcel 复杂表头的导出(动态表头和静态表头)

问题&#xff1a;如图&#xff0c;1部分的表头是动态的根据日期变化&#xff0c;2部分是数据库对应的字段&#xff0c;静态不变的&#xff1b; 解决方案&#xff1a;如果不看1的部分&#xff0c;2部分内容可以根据实体类注解的方式导出&#xff0c;那么我们是不是可以先将动态表…...

centos7 fatal error: curl/curl.h: No such file or directory

若编译遇到此问题&#xff0c;可以查看环境是否libcurl库 yum list installed | grep libcurl 发现未安装libcurl库 执行libcurl库的安装命令&#xff1a; 1.对于Debian/Ubuntu系统&#xff1a; sudo apt-get install libcurl4-openssl-dev 2.对于RHEL/CentOS系统&#xf…...

【Linux】自定义协议+序列化+反序列化

自定义协议序列化反序列化 1.再谈 "协议"2.Cal TCP服务端2.Cal TCP客户端4.Json 喜欢的点赞&#xff0c;收藏&#xff0c;关注一下把&#xff01; 1.再谈 “协议” 协议是一种 “约定”。在前面我们说过父亲和儿子约定打电话的例子&#xff0c;不过这是感性的认识&a…...

常见故障排查和优化

一、MySQL单实例故障排查 故障现象 1 ERROR 2002 (HY000): Cant connect to local MySQL server through socket /data/mysql/mysql.sock (2) 问题分析&#xff1a;以上情况一般都是数据库未启动或者数据库端口被防火墙拦截导致。 解决方法&#xff1a;启动数据库或者防火墙…...

选择华为HCIE培训机构有哪些注意事项

选择软件培训机构注意四点事项1、口碑&#xff1a;学员和社会人士对该机构的评价怎样&#xff1f; 口碑对于一个机构是十分重要的&#xff0c;这也是考量一个机构好不好的重要标准&#xff0c;包括社会评价和学员的评价和感言。誉天作为华为首批授权培训中心&#xff0c;一直致…...

python怎么处理txt

导入文件处理模块 import os 检测路径是否存在&#xff0c;存在则返回True&#xff0c;不存在则返回False os.path.exists("demo.txt") 如果你要创建一个文件并要写入内容 #如果demo.txt文件存在则会覆盖&#xff0c;并且demo.txt文件里面的内容被清空&#xff0c;如…...

SAMRTFORMS 转换PDF 发送邮件

最终成果&#xff1a; *&---------------------------------------------------------------------**& Report ZLC_FIND_EXIT*&---------------------------------------------------------------------**&根据T-CODE / 程序名查询出口、BADI增强*&-------…...

探讨在大数据体系中API的通信机制与工作原理

** 引言 关联阅读博客文章&#xff1a;深入解析大数据体系中的ETL工作原理及常见组件 关联阅读博客文章&#xff1a;深入理解HDFS工作原理&#xff1a;大数据存储和容错性机制解析 ** 在当今数字化时代&#xff0c;数据已经成为企业发展和决策的核心。随着数据规模的不断增长…...

算法打卡day23

今日任务&#xff1a; 1&#xff09;39. 组合总和 2&#xff09;40.组合总和II 3&#xff09;131.分割回文串 39. 组合总和 题目链接&#xff1a;39. 组合总和 - 力扣&#xff08;LeetCode&#xff09; 给定一个无重复元素的数组 candidates 和一个目标数 target &#xff0c;…...

每天五分钟深度学习:神经网络和深度学习有什么样的关系?

本文重点 神经网络是一种模拟人脑神经元连接方式的计算模型&#xff0c;通过大量神经元之间的连接和权重调整&#xff0c;实现对输入数据的处理和分析。而深度学习则是神经网络的一种特殊形式&#xff0c;它通过构建深层次的神经网络结构&#xff0c;实现对复杂数据的深度学习…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...