当前位置: 首页 > news >正文

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息

1.同步发送:等待消息返回后再继续进行下面的操作  同步发送保证了消息的可靠性,适用于关键业务场景。

2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用

CountDownLatch计数

3.单向发送:只负责发送,不管消息是否发送成功  单向发送不保证消息的送达,仅适用于对可靠性要求不高的场景。

消费者消费消息分两种:

拉模式:消费者主动去Broker上拉取消息

推模式:消费者等待Broker把消息推送过来

事实上:尽管存在“推送消费者”(DefaultMQPushConsumer)和“拉取消费者”(DefaultMQPullConsumer)这两种消费者类型,但实际上它们都是以“拉取”模式工作的,只不过实现方式和使用场景有所不同。

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

客户端与服务器安装版本一致即可

演示1    同步发送模式   客户端推送模式

注意观察   broker是把消息分两次推送的  就是发多少条消息  推送多少次

生产者     

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 同步发送* 使用场景:* 1.可靠性要求高,消息发送需要等待确认* 2.数据量较少的场景* 3.实时响应,消息发送需要立即得到结果* 小的订单系统* @author hrui* @date 2024/7/31 20:31*/
public class SyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,并同步等待发送结果 (同步发送)SendResult sendResult = producer.send(message);//打印消息发送结果System.out.println("第" + i + "条消息发送成功:返回---->" + sendResult);}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

消费者

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** 简单消费者* @author hrui* @date 2024/7/31 20:40*/
public class Consumer {public static void main(String[] args) {//创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"//采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息consumer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//订阅主题"Topic1",过滤标签为"*",表示接收所有消息consumer.subscribe("Topic1", "*");//设置消息监听器,处理接收到的消息//可以传入两种类型的监听器://1. MessageListenerOrderly(顺序消费):保证消息按顺序处理//2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序consumer.setMessageListener(new MessageListenerConcurrently() {//consumeMessage方法用于处理接收到的消息列表@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//                    //遍历消息列表,处理每条消息
//                    list.forEach(messageExt -> {
//                        //输出消息体内容(需要根据具体的消息编码解码,这里假设为UTF-8)
//                        System.out.println(new String(messageExt.getBody()));
//                        //消息处理成功后输出确认信息
//                        System.out.println("消息消费成功");
//                    });for (int i=0;i<list.size();i++){System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的}//返回消费状态,CONSUME_SUCCESS表示消息消费成功return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者实例,开始接收消息consumer.start();} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();}}
}

演示2  异步发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** 异步发送消息* 并发流量高的场景下,使用异步发送消息可以提高吞吐量。* @author hrui* @date 2024/7/31 21:53*/
public class AsyncProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group2"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");//计数器,用于跟踪异步消息发送的完成情况CountDownLatch countDownLatch = new CountDownLatch(100);try {// 启动生产者实例producer.start();//发送100条消息for (int i = 0; i < 100; i++) {final int index = i;//创建消息实例,指定主题为"Topic2",标签为"Tag2",消息内容为"Hello World"加上编号Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,异步发送。第二个参数是SendCallback回调函数producer.send(message, new SendCallback() {@Override//发送成功时,Broker回调此方法public void onSuccess(SendResult sendResult) {//将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送成功_" + sendResult);}@Override//发送失败时,Broker回调此方法public void onException(Throwable throwable) {// 将CountDownLatch计数器减一,表示一个消息发送任务完成countDownLatch.countDown();System.out.println("消息发送失败_" + throwable.getStackTrace());}});}//等待所有消息发送完成//countDownLatch.await();boolean await = countDownLatch.await(5, TimeUnit.SECONDS);if (!await) {System.out.println("消息发送超时");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

演示3  单向发送

package com.example.rocketmqdemo.simple;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.nio.charset.StandardCharsets;/*** 单向发送* 试用场景* 日志收集* @author hrui* @date 2024/7/31 22:27*/
public class OnewayProducer {public static void main(String[] args) {//创建一个DefaultMQProducer实例,指定生产者组名为"group1"DefaultMQProducer producer = new DefaultMQProducer("group1");//生产者组和消费者组是不同概念  不需要相同//设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息producer.setNamesrvAddr("xxx.xxx.xxx:9876");try {//启动生产者实例producer.start();//发送10条消息for (int i = 0; i < 2; i++) {//创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号  topic要和消费者相同Message message = new Message("Topic1", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));//发送消息,单向发送,不管发送成功与否producer.sendOneway(message);System.out.println(i+"_消息发送了");}} catch (Exception e) {//捕获并打印异常信息e.printStackTrace();} finally {//关闭生产者实例,释放资源producer.shutdown();}}
}

相关文章:

RocketMQ消息发送基本示例(推送消费者)

消息生产者通过三种方式发送消息 1.同步发送:等待消息返回后再继续进行下面的操作 同步发送保证了消息的可靠性&#xff0c;适用于关键业务场景。 2.异步发送:不等待消息返回直接进入后续流程.broker将结果返回后调用callback函数,并使用 CountDownLatch计数 3.单向发送:只…...

23 MySQL基本函数、分组查询、多列排序(3)

上一篇「22 B端产品经理与MySQL基本查询、排序&#xff08;2&#xff09;」了解了基本的常识和基本查询以及单列排序。下面介绍常见的基本函数、分组查询以及多列排序&#xff1a; 基本函数 user表 (注&#xff1a;以下SQL语句示例全部基于下面「user表」) uidunamedepiduag…...

PHP与SEO,应用curl库获取百度下拉关键词案例!

编程语言从来都是工具&#xff0c;编程逻辑思维才是最重要的&#xff0c;在限定的规则内&#xff0c;实现自己的想法&#xff0c;正如人生一样&#xff01; 不管是python还是php只要掌握了基础语法规则&#xff0c;明确了实现过程&#xff0c;都能达到想要实现的结果&#xff0…...

MySQL:子查询

MySQL 子查询 MySQL中的子查询是一个强大的功能&#xff0c;子查询是指在一个查询语句中嵌套另一个查询语句的情况。嵌套查询中的内部查询语句可以使用外部查询语句的结果来进行过滤、联接或作为子查询的值&#xff0c;它允许我们在一个查询内部嵌套另一个查询。通过子查询可以…...

C++—— IO流

一、C语言的输入与输出 C语言中我们用到的最频繁的输入输出方式就是scanf()和printf()。 scanf()&#xff1a;从标准输入设备&#xff08;键盘&#xff09;中读取数据&#xff0c;并将值存放在变量中。 printf()&#xff1a;将指定的文字/字符串输出到标准输出设备&#xff08;…...

vue+node后台处理大文件切片上传--前端部分

本文主要介绍&#xff0c;在vue3vite项目下&#xff0c;如何进行有效的大文件上传&#xff0c;本文章主要讲大文件切片上传方式&#xff0c;并提供简单的demo代码供参考 首先&#xff0c;请确保已经创建好项目&#xff0c;这一步跳过。 1、为了选择合适的文件&#xff0c;我们…...

【通俗理解】艺术与数学交融

【通俗理解】艺术与数学交融 艺术与数学的奇妙交融 你可以把艺术比作一个“梦幻花园”&#xff0c;它充满了无限的可能性和美感。而数学则是一把“精密钥匙”&#xff0c;它能够解开花园中的秘密&#xff0c;揭示美的内在结构。 艺术与数学交融的核心作用 组件/步骤描述艺术表…...

深入探讨 Docker 容器文件系统

引言 随着云计算和微服务架构的兴起&#xff0c;Docker 容器技术迅速成为开发和运维人员的首选工具。Docker 容器不仅提供了一种轻量级的虚拟化方式&#xff0c;还简化了应用程序的部署和管理。在众多的技术细节中&#xff0c;Docker 容器文件系统是一个至关重要的组成部分。本…...

《LeetCode热题100》---<4.子串篇三道>

本篇博客讲解LeetCode热题100道子串篇中的三道题 第一道&#xff1a;和为 K 的子数组 第二道&#xff1a;滑动窗口最大值 第三道&#xff1a;最小覆盖子串 第一道&#xff1a;和为 K 的子数组&#xff08;中等&#xff09; 法一&#xff1a;暴力枚举 class Solution {public in…...

全国区块链职业技能大赛样题第9套前端源码

后端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746050 前端源码地址:https://blog.csdn.net/Qhx20040819/article/details/140746216 智能合约+数据库表设计:https://blog.csdn.net/Qhx20040819/article/details/140746646 登录 ​ 用户管理...

如何提高编程面试成功率:LeetCode Top 100 问题及解答解析(详细面试宝典)

以下是 LeetCode Top 100 面试必备题目及其解决方案示例。这些题目涵盖了数据结构、算法、动态规划、回溯等多种重要的面试话题。希望各位同学有所收货&#xff0c;早日脱离底层到达彼岸&#xff01; 1. Two Sum 题目: 给定一个整数数组 nums 和一个目标值 target&#xff0c…...

K-近邻和神经网络

K-近邻&#xff08;K-NN, K-Nearest Neighbors&#xff09; 原理 K-近邻&#xff08;K-NN&#xff09;是一种非参数分类和回归算法。K-NN 的主要思想是根据距离度量&#xff08;如欧氏距离&#xff09;找到训练数据集中与待预测样本最近的 K 个样本&#xff0c;并根据这 K 个…...

用EasyV全景图低成本重现真实场景,360°感受数字孪生

全景图&#xff0c;即借助绘画、相片、视频、三维模型等形式&#xff0c;通过广角的表现手段&#xff0c;尽可能多表现出周围的环境。避免了一般平面效果图视角单一&#xff0c;不能带来全方位视角的缺陷&#xff0c;能够全方位的展示360度球型范围内的所有景致&#xff0c;最大…...

【Golang 面试 - 进阶题】每日 3 题(九)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…...

孟德尔随机化、R语言,报错,如何解决?

&#x1f3c6;本文收录于《CSDN问答解惑-专业版》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收…...

一文剖析高可用向量数据库的本质

面对因电力故障、网络问题或人为操作失误等导致的服务中断&#xff0c;数据库系统高可用能够保证系统在这些情况下仍然不间断地提供服务。如果数据库系统不具备高可用性&#xff0c;那么系统就需要承担停机和数据丢失等重大风险&#xff0c;而这些风险极有可能造成用户流失&…...

JavaScript青少年简明教程:异常处理

JavaScript青少年简明教程&#xff1a;异常处理 在 JavaScript 中&#xff0c;异常指的是程序执行过程中出现的错误或异常情况。这些错误可能导致程序无法正常执行&#xff0c;甚至崩溃。ECMA-262规范了多种JavaScript错误类型&#xff0c;这些类型都继承自Error基类。主要的错…...

科普文:Lombok使用及工作原理详解

1. 概叙 Lombok是什么&#xff1f; Project Lombok 是一个 JAVA 库&#xff0c;它可以自动插入编辑器和构建工具&#xff0c;为您的 JAVA 锦上添花。再也不要写另一个 getter/setter 或 equals 等方法&#xff0c;只要有一个注注解&#xff0c;你的类就有一个功能齐全的生成器…...

飞致云开源社区月度动态报告(2024年7月)

自2023年6月起&#xff0c;中国领先的开源软件公司FIT2CLOUD飞致云以月度为单位发布《飞致云开源社区月度动态报告》&#xff0c;旨在向广大社区用户同步飞致云旗下系列开源软件的发展情况&#xff0c;以及当月主要的产品新版本发布、社区运营成果等相关信息。 飞致云开源大屏…...

mybatis-plus——实现动态字段排序,根据实体获取字段映射数据库的具体字段

前言 前端需要根据表头的点击控件可以排序&#xff0c;虽然前端能根据当前页的数据进行对应字段的排序&#xff0c;但也仅局限于实现当前页的排序&#xff0c;无法满足全部数据的排序&#xff0c;所以需要走接口的查询进行排序&#xff0c;获取最全的排序数据 实现方案 前端…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

省略号和可变参数模板

本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

通过MicroSip配置自己的freeswitch服务器进行调试记录

之前用docker安装的freeswitch的&#xff0c;启动是正常的&#xff0c; 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...

边缘计算网关提升水产养殖尾水处理的远程运维效率

一、项目背景 随着水产养殖行业的快速发展&#xff0c;养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下&#xff0c;而且难以实现精准监控和管理。为了提升尾水处理的效果和效率&#xff0c;同时降低人力成本&#xff0c;某大型水产养殖企业决定…...

Android屏幕刷新率与FPS(Frames Per Second) 120hz

Android屏幕刷新率与FPS(Frames Per Second) 120hz 屏幕刷新率是屏幕每秒钟刷新显示内容的次数&#xff0c;单位是赫兹&#xff08;Hz&#xff09;。 60Hz 屏幕&#xff1a;每秒刷新 60 次&#xff0c;每次刷新间隔约 16.67ms 90Hz 屏幕&#xff1a;每秒刷新 90 次&#xff0c;…...