当前位置: 首页 > news >正文

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息

1.同步发送:等待消息返回后再继续进行下面的操作  同步发送保证了消息的可靠性,适用于关键业务场景。

2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用

CountDownLatch计数

3.单向发送:只负责发送,不管消息是否发送成功  单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。

消费者消费消息分两种:

拉模式:消费者主动去Broker上拉取消息

推模式:消费者等待Broker把消息推送过来

事实上:尽管存在“推送消费者”(DefaultMQPushConsumer)和“拉取消费者”(DefaultMQPullConsumer)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

客户端与服务器安装版本一致即可

演示1    同步发送模式   客户端推送模式

注意观察   broker是把消息分两次推送的  就是发多少条消息  推送多少次

生产者     

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 同步发送* 使用场景:* 1.可靠性要求高,消息发送需要等待确认* 2.数据量较少的场景* 3.实时响应,消息发送需要立即得到结果* 小的订单系统* @author hrui* @date 2024/7/31 20:31*/
public class SyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,并同步等待发送结果 (同步发送)SendResult sendResult = producer.send(message);//打印消息发送结果System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

消费者

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 简单消费者* @author hrui* @date 2024/7/31 20:40*/
public class Consumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("Topic1", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    //遍历消息列表,处理每条消息
//                    list.forEach(messageExt -> {
//                        //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
//                        System.out.println(new String(messageExt.getBody()));
//                        //消息处理成功后输出确认信息
//                        System.out.println("消息消费成功");
//                    });for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}

演示2  异步发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 异步发送消息* 并发流量高的场景下,使用异步发送消息可以提高吞吐量。* @author hrui* @date 2024/7/31 21:53*/
public class AsyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group2"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");//计数器,用于跟踪异步消息发送的完成情况CountDownLatch countDownLatch = new CountDownLatch(100);try {// 启动生产者实例producer.start();//发送100条消息for (int i = 0; i < 100; i++) {final int index = i;//创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,异步发送。第二个参数是SendCallback回调函数producer.send(message, new SendCallback() {@Override//发送成功时,Broker回调此方法public void onSuccess(SendResult sendResult) {//将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送成功_" + sendResult);}@Override//发送失败时,Broker回调此方法public void onException(Throwable throwable) {// 将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送失败_" + throwable.getStackTrace());}});}//等待所有消息发送完成//countDownLatch.await();boolean await = countDownLatch.await(5, TimeUnit.SECONDS);if (!await) {System.out.println("消息发送超时");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

演示3  单向发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 单向发送* 试用场景* 日志收集* @author hrui* @date 2024/7/31 22:27*/
public class OnewayProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号  topic要和消费者相同Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,单向发送,不管发送成功与否producer.sendOneway(message);System.out.println(i+"_消息发送了");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

相关文章:

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息 1.同步发送:等待消息返回后再继续进行下面的操作 同步发送保证了消息的可靠性&#xff0c;适用于关键业务场景。 2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用 CountDownLatch计数 3.单向发送:只…...

23 MySQL基本函数、分组查询、多列排序(3)

上一篇「22 B端产品经理与MySQL基本查询、排序&#xff08;2&#xff09;」了解了基本的常识和基本查询以及单列排序。下面介绍常见的基本函数、分组查询以及多列排序&#xff1a; 基本函数 user表 (注&#xff1a;以下SQL语句示例全部基于下面「user表」) uidunamedepiduag…...

PHP与SEO,应用curl库获取百度下拉关键词案例!

编程语言从来都是工具&#xff0c;编程逻辑思维才是最重要的&#xff0c;在限定的规则内&#xff0c;实现自己的想法&#xff0c;正如人生一样&#xff01; 不管是python还是php只要掌握了基础语法规则&#xff0c;明确了实现过程&#xff0c;都能达到想要实现的结果&#xff0…...

MySQL:子查询

MySQL 子查询 MySQL中的子查询是一个强大的功能&#xff0c;子查询是指在一个查询语句中嵌套另一个查询语句的情况。嵌套查询中的内部查询语句可以使用外部查询语句的结果来进行过滤、联接或作为子查询的值&#xff0c;它允许我们在一个查询内部嵌套另一个查询。通过子查询可以…...

C++—— IO流

一、C语言的输入与输出 C语言中我们用到的最频繁的输入输出方式就是scanf()和printf()。 scanf()&#xff1a;从标准输入设备&#xff08;键盘&#xff09;中读取数据&#xff0c;并将值存放在变量中。 printf()&#xff1a;将指定的文字/字符串输出到标准输出设备&#xff08;…...

vue+node后台处理大文件切片上传--前端部分

本文主要介绍&#xff0c;在vue3vite项目下&#xff0c;如何进行有效的大文件上传&#xff0c;本文章主要讲大文件切片上传方式&#xff0c;并提供简单的demo代码供参考 首先&#xff0c;请确保已经创建好项目&#xff0c;这一步跳过。 1、为了选择合适的文件&#xff0c;我们…...

【通俗理解】艺术与数学交融

【通俗理解】艺术与数学交融 艺术与数学的奇妙交融 你可以把艺术比作一个“梦幻花园”&#xff0c;它充满了无限的可能性和美感。而数学则是一把“精密钥匙”&#xff0c;它能够解开花园中的秘密&#xff0c;揭示美的内在结构。 艺术与数学交融的核心作用 组件/步骤描述艺术表…...

深入探讨 Docker 容器文件系统

引言 随着云计算和微服务架构的兴起&#xff0c;Docker 容器技术迅速成为开发和运维人员的首选工具。Docker 容器不仅提供了一种轻量级的虚拟化方式&#xff0c;还简化了应用程序的部署和管理。在众多的技术细节中&#xff0c;Docker 容器文件系统是一个至关重要的组成部分。本…...

《LeetCode热题100》---<4.子串篇三道>

本篇博客讲解LeetCode热题100道子串篇中的三道题 第一道&#xff1a;和为 K 的子数组 第二道&#xff1a;滑动窗口最大值 第三道&#xff1a;最小覆盖子串 第一道&#xff1a;和为 K 的子数组&#xff08;中等&#xff09; 法一&#xff1a;暴力枚举 class Solution {public in…...

全国区块链职业技能大赛样题第9套前端源码

后端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746050 前端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746216 智能合约+数据库表设计:https://blog.csdn.net/Qhx20040819/article/details/140746646 登录 ​ 用户管理...

如何提高编程面试成功率:LeetCode Top 100 问题及解答解析(详细面试宝典)

以下是 LeetCode Top 100 面试必备题目及其解决方案示例。这些题目涵盖了数据结构、算法、动态规划、回溯等多种重要的面试话题。希望各位同学有所收货&#xff0c;早日脱离底层到达彼岸&#xff01; 1. Two Sum 题目: 给定一个整数数组 nums 和一个目标值 target&#xff0c…...

K-近邻和神经网络

K-近邻&#xff08;K-NN, K-Nearest Neighbors&#xff09; 原理 K-近邻&#xff08;K-NN&#xff09;是一种非参数分类和回归算法。K-NN 的主要思想是根据距离度量&#xff08;如欧氏距离&#xff09;找到训练数据集中与待预测样本最近的 K 个样本&#xff0c;并根据这 K 个…...

用EasyV全景图低成本重现真实场景,360°感受数字孪生

全景图&#xff0c;即借助绘画、相片、视频、三维模型等形式&#xff0c;通过广角的表现手段&#xff0c;尽可能多表现出周围的环境。避免了一般平面效果图视角单一&#xff0c;不能带来全方位视角的缺陷&#xff0c;能够全方位的展示360度球型范围内的所有景致&#xff0c;最大…...

【Golang 面试 - 进阶题】每日 3 题(九)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

孟德尔随机化、R语言,报错,如何解决?

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…...

一文剖析高可用向量数据库的本质

面对因电力故障、网络问题或人为操作失误等导致的服务中断&#xff0c;数据库系统高可用能够保证系统在这些情况下仍然不间断地提供服务。如果数据库系统不具备高可用性&#xff0c;那么系统就需要承担停机和数据丢失等重大风险&#xff0c;而这些风险极有可能造成用户流失&…...

JavaScript青少年简明教程:异常处理

JavaScript青少年简明教程&#xff1a;异常处理 在 JavaScript 中&#xff0c;异常指的是程序执行过程中出现的错误或异常情况。这些错误可能导致程序无法正常执行&#xff0c;甚至崩溃。ECMA-262规范了多种JavaScript错误类型&#xff0c;这些类型都继承自Error基类。主要的错…...

科普文:Lombok使用及工作原理详解

1. 概叙 Lombok是什么&#xff1f; Project Lombok 是一个 JAVA 库&#xff0c;它可以自动插入编辑器和构建工具&#xff0c;为您的 JAVA 锦上添花。再也不要写另一个 getter/setter 或 equals 等方法&#xff0c;只要有一个注注解&#xff0c;你的类就有一个功能齐全的生成器…...

飞致云开源社区月度动态报告(2024年7月)

自2023年6月起&#xff0c;中国领先的开源软件公司FIT2CLOUD飞致云以月度为单位发布《飞致云开源社区月度动态报告》&#xff0c;旨在向广大社区用户同步飞致云旗下系列开源软件的发展情况&#xff0c;以及当月主要的产品新版本发布、社区运营成果等相关信息。 飞致云开源大屏…...

mybatis-plus——实现动态字段排序,根据实体获取字段映射数据库的具体字段

前言 前端需要根据表头的点击控件可以排序&#xff0c;虽然前端能根据当前页的数据进行对应字段的排序&#xff0c;但也仅局限于实现当前页的排序&#xff0c;无法满足全部数据的排序&#xff0c;所以需要走接口的查询进行排序&#xff0c;获取最全的排序数据 实现方案 前端…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式

简介 在我的 QT/C 开发工作中&#xff0c;合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式&#xff1a;工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...

使用SSE解决获取状态不一致问题

使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件&#xff0c;这个上传文件是整体功能的一部分&#xff0c;文件在上传的过程中…...

针对药品仓库的效期管理问题,如何利用WMS系统“破局”

案例&#xff1a; 某医药分销企业&#xff0c;主要经营各类药品的批发与零售。由于药品的特殊性&#xff0c;效期管理至关重要&#xff0c;但该企业一直面临效期问题的困扰。在未使用WMS系统之前&#xff0c;其药品入库、存储、出库等环节的效期管理主要依赖人工记录与检查。库…...