当前位置: 首页 > news >正文

四、RocketMQ发送普通消息、批量消息和延迟消息

Producer发送普通消息的方式

1.同步发送消息

同步消息代表发送端发送消息到broker之后,等待消息发送结果后,再次发送消息
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息
@Test
public void syncSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 1.创建生产端,声明在哪个生产组DefaultMQProducer producer = new DefaultMQProducer("test_group");// 2.注册NameServer地址producer.setNamesrvAddr(NAME_SERVER_ADDR);// 3.构建Message实体,指定topic、tag、bodyMessage message = new Message("test", "hello world".getBytes());// 4.启动生产端producer.start();// 5.发送消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());
}

2.异步发送消息

异步消息代表发送端发送完消息后,会直接返回,但是可以注册一个回调函数,当broker将消息落盘后,回调这个回调函数
在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送消息,并且实现SendCallback接口

注:这里必须等待异步返回,否则消费者无法消费成功

@Test
public void asyncSend() throws  RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test", "tag-a","hello world".getBytes());producer.start();CountDownLatch countDownLatch = new CountDownLatch(1);// 发送消息,并且实现SendCallback接口producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.println("发送成功:" + sendResult.getSendStatus());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.println("发送失败:" + e);}});countDownLatch.await();
}

3、发送单向消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短

在这里插入图片描述

实现步骤

  1. 创建生产端,声明在哪个生产组
  2. 注册NameServer地址
  3. 构建Message实体,指定topic、tag、body
  4. 启动生产端
  5. 发送单向消息
@Test
public void sendOneWay() throws  RemotingException, InterruptedException, MQClientException {DefaultMQProducer producer = new DefaultMQProducer("test_group");producer.setNamesrvAddr(NAME_SERVER_ADDR);Message message = new Message("test","tag-a", "hello world".getBytes());producer.start();producer.sendOneway(message);
}

Producer发送批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

@Test
public void sendBatch() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 构造批量消息List<Message> list = new ArrayList<>();list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world0".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world1".getBytes(Charset.defaultCharset())));list.add(new Message(RocketMQConfig.TEST_TOPIC, "hello world2".getBytes(Charset.defaultCharset())));producer.start();// 发送批量消息producer.send(list);producer.shutdown();
}

**注:**需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。

Producer发送延迟消息

Producer想要发送延迟消息,只要设置Message的DelayTimeLevel属性大于0即可。

RocketMQ无法随意设置延迟消息的延迟时间,只能根据延迟级别进行

延迟级别和延迟时间的对应关系

延迟级别延迟时间延迟级别延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h
@Test
public void sendDelay() throws Exception {DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");producer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);producer.start();Message message = new Message(RocketMQConfig.TEST_TOPIC, "hello world".getBytes(Charset.defaultCharset()));// 设置延迟级别message.setDelayTimeLevel(3);// 发送批量消息SendResult sendResult = producer.send(message);System.out.println(sendResult.getSendStatus());producer.shutdown();
}

延迟消息的原理

延迟消息并不会直接发送到指定的topic,而是发送到一个延迟消息对应的topic中

当延迟消息的时间到达后,在将消息发送到指定的topic中

延迟消息投递的流程

  1. producer端设置消息delayLevel延迟级别,消息属性DELAY中存储了对应了延时级别

  2. broker端收到消息后,判断延时消息延迟级别,如果大于0,则备份消息原始topic,queueId,并将消息topic改为延时消息队列特定topic(SCHEDULE_TOPIC),queueId改为延时级别的delayLevel-1

  3. mq服务端ScheduleMessageService中,为每一个延迟级别单独设置一个定时器,定时(每隔1秒)拉取对应延迟级别的消费队列

  4. 根据消费偏移量offset从commitLog中解析出对应消息

  5. 从消息tagsCode中解析出消息应当被投递的时间,与当前时间做比较,判断是否应该进行投递

  6. 若到达了投递时间,则构建一个新的消息,并从消息属性中恢复出原始的topic,queueId,并清除消息延迟属性,从新进行消息投递

相关文章:

四、RocketMQ发送普通消息、批量消息和延迟消息

Producer发送普通消息的方式 1.同步发送消息 同步消息代表发送端发送消息到broker之后&#xff0c;等待消息发送结果后&#xff0c;再次发送消息 实现步骤 创建生产端&#xff0c;声明在哪个生产组注册NameServer地址构建Message实体&#xff0c;指定topic、tag、body启动…...

idea自定义 postfix completion提高编码效率

postfix completion的使用 详情见&#xff1a; https://www.cnblogs.com/expiator/p/17380495.html 自定义 postfix completion List、 String 初始化list&#xff1a; key: list表达式&#xff1a; List<$EXPR$> $END$List new ArrayList<>();字符串判空&…...

解锁学习电路设计的正确姿势!

...

【Linux】 ps命令使用

作为一个后端的程序员&#xff0c;我们经常用到ps -ef | grep XXX 到底什么事ps呢。 下面我们一起学习一下吧、 ps &#xff08;英文全拼&#xff1a;process status&#xff09;命令用于显示当前进程的状态&#xff0c;类似于 windows 的任务管理器。 ps命令 -Linux手册页 …...

打造高效的分布式爬虫系统:利用Scrapy框架实现

在大数据时代的今天&#xff0c;爬虫系统成为了获取和分析海量数据的重要工具。本文将介绍如何使用Scrapy框架来构建一个高效的分布式爬虫系统&#xff0c;以加速数据采集过程和提高系统的可扩展性。 Scrapy框架简介 Scrapy是一个基于Python的强大的开源网络爬虫框架&#xff…...

SpringCloud组件Ribbon的IRule的问题排查

最近很久没有写文章啦&#xff0c;刚好遇到了一个问题&#xff0c;其实问题也挺简单&#xff0c;但是还是得对源码有一定了解才能够发现。 最近在实现一个根据请求流量的标签&#xff0c;将请求转发到对应的节点&#xff0c;其实和俗称的灰度请求有点相似&#xff0c; 实现思…...

比较完整一些chatGPT项目代码(权威)

https://gitee.com/zccbbg/chatgpt-springboot-service yml中的配置文件无法读取&#xff0c;前端访问比较困难。...

Python - 生成二维码、条形码

二维码 import qrcode# 要生成的文本或链接 data "要生成的文本或链接"# 创建QR码对象 qr qrcode.QRCode(version1, # 版本号&#xff0c;通常设置为1error_correctionqrcode.constants.ERROR_CORRECT_L, # 错误修正级别box_size10, # 每个小方块的像素大小bor…...

8+纯生信,多组机器学习+分型探讨黑色素瘤发文思路。

今天给同学们分享一篇泛癌多组机器学习分型的生信文章“Comprehensive characterisation of immunogenic cell death in melanoma revealing the association with prognosis and tumor immune microenvironment”&#xff0c;这篇文章于2022年9月23日发表在Front Immunol 期刊…...

GPU高性能面试-写一个ReduceKernel

要求写一个reduceKernel 要求给出Kerne的完整调用: 1. 进行一维reduce 可以写一个最基础的&#xff0c;仅仅实现基础功能就行 使用share mem进行功能优化 使用shuffles指令完成block reduce操作 2.实现二维reduce...

深入探索STARK的安全性和可靠性——STARKs全面安全分析

1. 引言 non-interactive STARKs&#xff0c;起源于Interactive Oracle Proofs (IOPs)&#xff0c;然后通过random oracle模式转换为非交互式。StarkWare团队 ethSTARK Documentation – Version 1.2&#xff08;2023年7月&#xff09;论文做了更新&#xff0c;给出了完整具体…...

WPF 控件分辨率自适应问题

WPF 控件分辨率自适应时&#xff0c;我首先想到的是使用ViewBox控件来做分辨率自适应。 ViewBox这个控件通常和其他控件结合起来使用&#xff0c;是WPF中非常有用的控件。定义一个内容容器。ViewBox组件的作用是拉伸或延展位于其中的组件&#xff0c;以填满可用空间&#xff0…...

CANoe创建仿真工程

CANoe创建仿真工程 写在前面仿真工程的创建创建工程添加CAN数据库添加系统变量创建面板创建网络节点为节点添加代码工程运行测试总结 写在前面 Canoe的安装不是特别方便&#xff0c;我是参加了松勤的培训课程&#xff0c;不仅需要安装软件还需要安装驱动&#xff0c;刚刚学习的…...

Scanner 输入回车跳不出循环的解决方法

题目要求&#xff1a; 输入一行内容包含字符串和数字&#xff0c;将字符串与数字分别提取。 解决方法&#xff1a; 可以使用两个Scanner对象&#xff0c;一个用来键入数据&#xff0c;另外一个用来对数据进行操作&#xff0c;以此来解决输入“回车”跳不出while循环的问题。 i…...

docker 启动 mysql 通过防火墙设置端口无法访问解决方案

1、问题描述&#xff1a;通过 docker compose 启动mysql服务&#xff0c;然而在防火墙添加了3306端口后却无法访问&#xff0c;但是关闭防火墙后又可以访问mysql数据库。 解决方案&#xff1a; 重启 docker 后解决&#xff1a;systemctl restart docker 如果没有解决问题则执…...

智能制造优化,RFID生产线管理系统解决方案

一、背景介绍 随着全球经济的发展&#xff0c;传统制造业面临着越来越高的成本和低利润的挑战&#xff0c;为了提升企业的整体利润率&#xff0c;优化管理流程成为必要的手段之一&#xff0c;在传统的制造企业中&#xff0c;生产线通常采用单件流生产模式&#xff0c;但这种模…...

【Mybatis】基于Mybatis插件+注解,实现敏感数据自动加解密

一、介绍 业务场景中经常会遇到诸如用户手机号&#xff0c;身份证号&#xff0c;银行卡号&#xff0c;邮箱&#xff0c;地址&#xff0c;密码等等信息&#xff0c;属于敏感信息&#xff0c;需要保存在数据库中。而很多公司会会要求对数据库中的此类数据进行加密存储。 敏感数据…...

【特纳斯电子】基于物联网的指纹密码锁系统设计-实物设计

资料下载链接&#xff1a;基于物联网的指纹密码锁系统设计-实物设计 - 电子校园网 编号&#xff1a; T3732205M-SW 设计简介&#xff1a; 本设计是基于单片机的指纹密码锁&#xff0c;主要实现以下功能&#xff1a; 1、可通过密码解锁 2、可通过云平台解锁 3、可通过指纹解…...

【牛客面试必刷TOP101】Day9.BM37 二叉搜索树的最近公共祖先和BM42 用两个栈实现队列

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;牛客面试必刷TOP101 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01;&…...

10.12 校招 实习 内推 面经

绿*泡*泡&#xff1a; neituijunsir 交流裙 &#xff0c;内推/实习/校招汇总表格 1、校招 | 2024届秋招&#xff0c;美团哪些校招岗位最缺人&#xff1f;&#xff08;内推&#xff09; 校招 | 2024届秋招&#xff0c;美团哪些校招岗位最缺人&#xff1f;&#xff08;内推&…...

解锁端侧智能:基于BigDL-LLM与Qwen-1.8B-Chat的CPU高效推理实践

1. 为什么要在CPU上部署大模型&#xff1f; 最近两年大模型技术发展迅猛&#xff0c;但大多数应用都依赖昂贵的GPU服务器。我在实际项目中发现&#xff0c;很多中小企业和个人开发者其实更需要能在普通电脑上运行的轻量化方案。这就是为什么基于CPU的大模型部署方案变得越来越…...

深部空间专属孪生,打造密闭硐室独有不可替代透明体系技术白皮书

深部空间专属孪生&#xff0c;打造密闭硐室独有不可替代透明体系技术白皮书副标题&#xff1a;井下专用暗光算法实现三维实时重建&#xff0c;搭配地下专属无感定位、多盲区跨镜穿透追踪、身体指纹特征识别&#xff0c;场景适配独一无二&#xff0c;行业无同类对标方案前言矿山…...

QKeyMapper深度解析:现代输入设备管理系统的架构揭秘与实战指南

QKeyMapper深度解析&#xff1a;现代输入设备管理系统的架构揭秘与实战指南 【免费下载链接】QKeyMapper [按键映射工具] QKeyMapper&#xff0c;Qt开发Win10&Win11可用&#xff0c;不修改注册表、不需重新启动系统&#xff0c;可立即生效和停止。支持游戏手柄映射到键鼠&a…...

智慧树自动刷课神器Autovisor:3分钟极速上手的完整指南

智慧树自动刷课神器Autovisor&#xff1a;3分钟极速上手的完整指南 【免费下载链接】Autovisor 2025智慧树刷课脚本 基于Python Playwright的自动化程序 [有免安装版] 项目地址: https://gitcode.com/gh_mirrors/au/Autovisor 还在为智慧树平台的繁琐操作而烦恼吗&#…...

DLSS Swapper终极指南:免费开源的游戏DLSS智能管理工具

DLSS Swapper终极指南&#xff1a;免费开源的游戏DLSS智能管理工具 【免费下载链接】dlss-swapper 项目地址: https://gitcode.com/GitHub_Trending/dl/dlss-swapper DLSS Swapper是一款革命性的免费开源工具&#xff0c;专为PC游戏玩家设计&#xff0c;能够智能管理、…...

如何3秒破解百度网盘提取码难题:开源工具baidupankey的技术解析与实战指南

如何3秒破解百度网盘提取码难题&#xff1a;开源工具baidupankey的技术解析与实战指南 【免费下载链接】baidupankey 项目地址: https://gitcode.com/gh_mirrors/ba/baidupankey 你是否曾在寻找百度网盘资源时&#xff0c;被一个小小的提取码卡住&#xff0c;不得不花费…...

轻量级服务器监控面板:从原理到部署实战

1. 项目概述&#xff1a;一个开源监控面板的诞生最近在折腾服务器和容器化应用&#xff0c;发现一个挺普遍的需求&#xff1a;当你手头有几台服务器&#xff0c;上面跑着几个Docker容器&#xff0c;或者一些自己写的服务&#xff0c;你总想知道它们现在“活”得怎么样。CPU是不…...

基于语义搜索的AI代码理解工具copaw-code深度解析

1. 项目概述&#xff1a;一个面向代码搜索与理解的AI工具 最近在GitHub上看到一个挺有意思的项目&#xff0c;叫 QSEEKING/copaw-code 。乍一看这个标题&#xff0c;可能会有点摸不着头脑&#xff0c;“copaw”是什么&#xff1f;但结合“code”和项目托管在QSEEKING这个组织…...

ComfyUI-Manager终极指南:3步掌握AI绘画插件管理技巧

ComfyUI-Manager终极指南&#xff1a;3步掌握AI绘画插件管理技巧 【免费下载链接】ComfyUI-Manager ComfyUI-Manager is an extension designed to enhance the usability of ComfyUI. It offers management functions to install, remove, disable, and enable various custom…...

基于RP2040的客制化宏键盘:从硬件设计到KMK固件开发全攻略

1. 项目概述与核心价值最近在折腾一个挺有意思的开源项目&#xff0c;叫clawdpad&#xff0c;作者是kudretyilmazz。乍一看这个名字&#xff0c;可能有点摸不着头脑&#xff0c;但如果你对机械键盘、客制化输入设备或者桌面自动化感兴趣&#xff0c;那这个项目绝对值得你花时间…...