当前位置: 首页 > news >正文

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息

1.同步发送:等待消息返回后再继续进行下面的操作  同步发送保证了消息的可靠性,适用于关键业务场景。

2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用

CountDownLatch计数

3.单向发送:只负责发送,不管消息是否发送成功  单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。

消费者消费消息分两种:

拉模式:消费者主动去Broker上拉取消息

推模式:消费者等待Broker把消息推送过来

事实上:尽管存在“推送消费者”(DefaultMQPushConsumer)和“拉取消费者”(DefaultMQPullConsumer)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

客户端与服务器安装版本一致即可

演示1    同步发送模式   客户端推送模式

注意观察   broker是把消息分两次推送的  就是发多少条消息  推送多少次

生产者     

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 同步发送* 使用场景:* 1.可靠性要求高,消息发送需要等待确认* 2.数据量较少的场景* 3.实时响应,消息发送需要立即得到结果* 小的订单系统* @author hrui* @date 2024/7/31 20:31*/
public class SyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,并同步等待发送结果 (同步发送)SendResult sendResult = producer.send(message);//打印消息发送结果System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

消费者

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 简单消费者* @author hrui* @date 2024/7/31 20:40*/
public class Consumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("Topic1", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    //遍历消息列表,处理每条消息
//                    list.forEach(messageExt -> {
//                        //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
//                        System.out.println(new String(messageExt.getBody()));
//                        //消息处理成功后输出确认信息
//                        System.out.println("消息消费成功");
//                    });for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}

演示2  异步发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 异步发送消息* 并发流量高的场景下,使用异步发送消息可以提高吞吐量。* @author hrui* @date 2024/7/31 21:53*/
public class AsyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group2"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");//计数器,用于跟踪异步消息发送的完成情况CountDownLatch countDownLatch = new CountDownLatch(100);try {// 启动生产者实例producer.start();//发送100条消息for (int i = 0; i < 100; i++) {final int index = i;//创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,异步发送。第二个参数是SendCallback回调函数producer.send(message, new SendCallback() {@Override//发送成功时,Broker回调此方法public void onSuccess(SendResult sendResult) {//将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送成功_" + sendResult);}@Override//发送失败时,Broker回调此方法public void onException(Throwable throwable) {// 将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送失败_" + throwable.getStackTrace());}});}//等待所有消息发送完成//countDownLatch.await();boolean await = countDownLatch.await(5, TimeUnit.SECONDS);if (!await) {System.out.println("消息发送超时");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

演示3  单向发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 单向发送* 试用场景* 日志收集* @author hrui* @date 2024/7/31 22:27*/
public class OnewayProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号  topic要和消费者相同Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,单向发送,不管发送成功与否producer.sendOneway(message);System.out.println(i+"_消息发送了");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

相关文章:

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息 1.同步发送:等待消息返回后再继续进行下面的操作 同步发送保证了消息的可靠性&#xff0c;适用于关键业务场景。 2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用 CountDownLatch计数 3.单向发送:只…...

23 MySQL基本函数、分组查询、多列排序(3)

上一篇「22 B端产品经理与MySQL基本查询、排序&#xff08;2&#xff09;」了解了基本的常识和基本查询以及单列排序。下面介绍常见的基本函数、分组查询以及多列排序&#xff1a; 基本函数 user表 (注&#xff1a;以下SQL语句示例全部基于下面「user表」) uidunamedepiduag…...

PHP与SEO,应用curl库获取百度下拉关键词案例!

编程语言从来都是工具&#xff0c;编程逻辑思维才是最重要的&#xff0c;在限定的规则内&#xff0c;实现自己的想法&#xff0c;正如人生一样&#xff01; 不管是python还是php只要掌握了基础语法规则&#xff0c;明确了实现过程&#xff0c;都能达到想要实现的结果&#xff0…...

MySQL:子查询

MySQL 子查询 MySQL中的子查询是一个强大的功能&#xff0c;子查询是指在一个查询语句中嵌套另一个查询语句的情况。嵌套查询中的内部查询语句可以使用外部查询语句的结果来进行过滤、联接或作为子查询的值&#xff0c;它允许我们在一个查询内部嵌套另一个查询。通过子查询可以…...

C++—— IO流

一、C语言的输入与输出 C语言中我们用到的最频繁的输入输出方式就是scanf()和printf()。 scanf()&#xff1a;从标准输入设备&#xff08;键盘&#xff09;中读取数据&#xff0c;并将值存放在变量中。 printf()&#xff1a;将指定的文字/字符串输出到标准输出设备&#xff08;…...

vue+node后台处理大文件切片上传--前端部分

本文主要介绍&#xff0c;在vue3vite项目下&#xff0c;如何进行有效的大文件上传&#xff0c;本文章主要讲大文件切片上传方式&#xff0c;并提供简单的demo代码供参考 首先&#xff0c;请确保已经创建好项目&#xff0c;这一步跳过。 1、为了选择合适的文件&#xff0c;我们…...

【通俗理解】艺术与数学交融

【通俗理解】艺术与数学交融 艺术与数学的奇妙交融 你可以把艺术比作一个“梦幻花园”&#xff0c;它充满了无限的可能性和美感。而数学则是一把“精密钥匙”&#xff0c;它能够解开花园中的秘密&#xff0c;揭示美的内在结构。 艺术与数学交融的核心作用 组件/步骤描述艺术表…...

深入探讨 Docker 容器文件系统

引言 随着云计算和微服务架构的兴起&#xff0c;Docker 容器技术迅速成为开发和运维人员的首选工具。Docker 容器不仅提供了一种轻量级的虚拟化方式&#xff0c;还简化了应用程序的部署和管理。在众多的技术细节中&#xff0c;Docker 容器文件系统是一个至关重要的组成部分。本…...

《LeetCode热题100》---<4.子串篇三道>

本篇博客讲解LeetCode热题100道子串篇中的三道题 第一道&#xff1a;和为 K 的子数组 第二道&#xff1a;滑动窗口最大值 第三道&#xff1a;最小覆盖子串 第一道&#xff1a;和为 K 的子数组&#xff08;中等&#xff09; 法一&#xff1a;暴力枚举 class Solution {public in…...

全国区块链职业技能大赛样题第9套前端源码

后端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746050 前端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746216 智能合约+数据库表设计:https://blog.csdn.net/Qhx20040819/article/details/140746646 登录 ​ 用户管理...

如何提高编程面试成功率:LeetCode Top 100 问题及解答解析(详细面试宝典)

以下是 LeetCode Top 100 面试必备题目及其解决方案示例。这些题目涵盖了数据结构、算法、动态规划、回溯等多种重要的面试话题。希望各位同学有所收货&#xff0c;早日脱离底层到达彼岸&#xff01; 1. Two Sum 题目: 给定一个整数数组 nums 和一个目标值 target&#xff0c…...

K-近邻和神经网络

K-近邻&#xff08;K-NN, K-Nearest Neighbors&#xff09; 原理 K-近邻&#xff08;K-NN&#xff09;是一种非参数分类和回归算法。K-NN 的主要思想是根据距离度量&#xff08;如欧氏距离&#xff09;找到训练数据集中与待预测样本最近的 K 个样本&#xff0c;并根据这 K 个…...

用EasyV全景图低成本重现真实场景,360°感受数字孪生

全景图&#xff0c;即借助绘画、相片、视频、三维模型等形式&#xff0c;通过广角的表现手段&#xff0c;尽可能多表现出周围的环境。避免了一般平面效果图视角单一&#xff0c;不能带来全方位视角的缺陷&#xff0c;能够全方位的展示360度球型范围内的所有景致&#xff0c;最大…...

【Golang 面试 - 进阶题】每日 3 题(九)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

孟德尔随机化、R语言,报错,如何解决?

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…...

一文剖析高可用向量数据库的本质

面对因电力故障、网络问题或人为操作失误等导致的服务中断&#xff0c;数据库系统高可用能够保证系统在这些情况下仍然不间断地提供服务。如果数据库系统不具备高可用性&#xff0c;那么系统就需要承担停机和数据丢失等重大风险&#xff0c;而这些风险极有可能造成用户流失&…...

JavaScript青少年简明教程:异常处理

JavaScript青少年简明教程&#xff1a;异常处理 在 JavaScript 中&#xff0c;异常指的是程序执行过程中出现的错误或异常情况。这些错误可能导致程序无法正常执行&#xff0c;甚至崩溃。ECMA-262规范了多种JavaScript错误类型&#xff0c;这些类型都继承自Error基类。主要的错…...

科普文:Lombok使用及工作原理详解

1. 概叙 Lombok是什么&#xff1f; Project Lombok 是一个 JAVA 库&#xff0c;它可以自动插入编辑器和构建工具&#xff0c;为您的 JAVA 锦上添花。再也不要写另一个 getter/setter 或 equals 等方法&#xff0c;只要有一个注注解&#xff0c;你的类就有一个功能齐全的生成器…...

飞致云开源社区月度动态报告(2024年7月)

自2023年6月起&#xff0c;中国领先的开源软件公司FIT2CLOUD飞致云以月度为单位发布《飞致云开源社区月度动态报告》&#xff0c;旨在向广大社区用户同步飞致云旗下系列开源软件的发展情况&#xff0c;以及当月主要的产品新版本发布、社区运营成果等相关信息。 飞致云开源大屏…...

mybatis-plus——实现动态字段排序,根据实体获取字段映射数据库的具体字段

前言 前端需要根据表头的点击控件可以排序&#xff0c;虽然前端能根据当前页的数据进行对应字段的排序&#xff0c;但也仅局限于实现当前页的排序&#xff0c;无法满足全部数据的排序&#xff0c;所以需要走接口的查询进行排序&#xff0c;获取最全的排序数据 实现方案 前端…...

终极压枪指南:5步掌握PUBG罗技鼠标宏精准射击

终极压枪指南&#xff1a;5步掌握PUBG罗技鼠标宏精准射击 【免费下载链接】logitech-pubg PUBG no recoil script for Logitech gaming mouse / 绝地求生 罗技 鼠标宏 项目地址: https://gitcode.com/gh_mirrors/lo/logitech-pubg logitech-pubg是一款专为《绝地求生》玩…...

在AutoDL上从零部署YOLO训练环境:新手避坑指南

1. 为什么选择AutoDL部署YOLO训练环境 第一次接触目标检测任务时&#xff0c;我和大多数新手一样被各种环境配置问题折磨得够呛。本地显卡跑不动YOLOv5&#xff0c;租用云服务器又担心操作复杂&#xff0c;直到发现了AutoDL这个宝藏平台。它最大的优势就是把复杂的GPU实例管理简…...

Java 设计模式・策略模式篇:从思想到代码实现

一、行为型模式 在面向对象的世界里&#xff0c;如何优雅地组织对象间的交互、分配职责&#xff0c;是每一位开发者都会反复思考的问题。直接硬编码交互逻辑固然简单&#xff0c;但当业务复杂度上升、对象协作关系变得错综复杂时&#xff0c;这种方式就会让代码变得僵化、难以…...

【ArkTS】编程规范

ArkTS 是 HarmonyOS 应用的默认开发语言,在 TypeScript(简称 TS)生态基础上做了扩展,保持 TS 的基本风格。通过规范定义,从而强化了开发期的静态检查和分析,提升了程序执行的稳定性和性能。 一、术语与定义 术语 缩略语 中文解释 ArkTS 无 ArkTS编程语言 TypeScript TS …...

避坑指南:Ollama部署DeepSeek-R1时,如何安全地开放API端口给内网其他服务调用?

深度解析&#xff1a;Ollama部署DeepSeek-R1时内网API安全开放实战 当你在一台Linux服务器上成功部署了Ollama和DeepSeek-R1模型后&#xff0c;下一步自然是想让内网中的其他服务也能调用这个强大的AI能力。但直接开放端口就像把家门钥匙插在锁上——方便但危险。本文将带你深入…...

LangGraph实战:从零构建并部署一个多功能智能体

1. LangGraph框架概述&#xff1a;新一代智能体开发范式 在人工智能应用开发领域&#xff0c;智能体&#xff08;Agent&#xff09;技术正经历着从简单问答到复杂任务执行的进化。LangGraph作为LangChain生态中的新一代开发框架&#xff0c;彻底改变了传统链式结构的局限性。我…...

从拼图游戏到自动驾驶:点云配准技术的跨领域进化史

从拼图游戏到自动驾驶&#xff1a;点云配准技术的跨领域进化史 1. 三维世界的数字拼图师 1987年&#xff0c;当Paul Besl和Neil McKay在实验室里尝试将两组扫描数据对齐时&#xff0c;他们可能不会想到&#xff0c;这项被称为迭代最近点&#xff08;ICP&#xff09;的技术会成为…...

WiFi热图绘制工具:用Python为你的无线网络做一次“CT扫描“ [特殊字符][特殊字符]

WiFi热图绘制工具&#xff1a;用Python为你的无线网络做一次"CT扫描" &#x1f3e5;&#x1f4f6; 【免费下载链接】wifi-heat-mapper whm also known as wifi-heat-mapper is a Python library for benchmarking Wi-Fi networks and gather useful metrics that can…...

经典概率题:飞机座位分配问题(LeetCode 1227)超详细解析

一、题目背景与描述这是一道非常经典的概率与逻辑推理面试题&#xff0c;也是 LeetCode 第 1227 题「飞机座位分配概率」。题目描述有 n 位乘客即将登机&#xff0c;飞机正好有 n 个座位。第一位乘客的票丢了&#xff0c;他随机选一个座位坐下。剩下的乘客&#xff1a;如果自己…...

避坑指南:如何在torch 2.4.0 + CUDA 12.1环境下成功安装llamafactory及其依赖

深度避坑&#xff1a;PyTorch 2.4.0与CUDA 12.1环境下的Llamafactory全栈部署实战 当开发者尝试在PyTorch 2.4.0和CUDA 12.1环境下部署Llamafactory时&#xff0c;往往会陷入依赖地狱——从Torch版本误装到vllm模块缺失&#xff0c;每个环节都可能成为耗时数小时的深坑。本文将…...