Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式)
Consumer端先启动
Consumer端后启动. 正常情况下:应该是Consumer需要先启动
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费 ,(消费顺序消息的时候设置)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1)
2、Consumer端后启动,也就是Producer先启动
由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的
1、Producer端重试
也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数,发送并触发回调函数
2、Consumer端重试
2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等
上面的代码中消费异常的情况返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
正常则返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
二、消息重试机制:消息重试分为2种
1、Producer端重试
2、Consumer端重试
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "push", "1", "我是一条普通消息".getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("失败了"+e.getMessage());
}
});
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt msg : msgs) {
String msgbody = new String(msg.getBody(), "utf-8");
if (msgbody.equals("Hello RocketMQ 4")) {
System.out.println("======错误=======");
int a = 1 / 0;
}
}
} catch (Exception e) {
e.printStackTrace();
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}else{
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
假如超过了多少次之后我们可以让他不再重试记录 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超时的情况,这种情况MQ会无限制的发送给消费端。
就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return 任何状态,这样的就认为没有到达Consumer端。
这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消费
2、广播消费
rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费
三、消费模式
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 表示业务处理时间
System.out.println("=========开始暂停===============");
Thread.sleep(60000);
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Consumer,订阅消息
*/
public class Consumer2 {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署
异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点
同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发
送成功。
如果rocketMq才用双master部署,Producer往MQ上写入20条数据 其中Master1中拉取了12条 。Master2中拉取了8 条,这种情况下,Master1宕机,那么我们消费数据的时
候,只能消费到Master2中的8条,Master1中的12条默认持久化,不会丢失消息,需要Master1恢复之后这12条数据才能继续被消费,如果想保证消息实时消费,就才用双
Master双Slave的模式
同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。
异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。
commitlog:
commitlog就是来存储所有的元信息,包含消息体,类似于MySQL、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。
consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据
当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。但是,如果生产者发送消息后,遇到网络问题,无法获取响应,生产者就无法判断该
消息是否成功提交给了Kafka。根据生产者的机制,我们知道,当出现异常时,会进行消息重传,这就可能出现“At least one”语义。为了实现“Exactly once”语义,这里提供两个
可选方案:
如果业务数据产生消息可以找到合适的字段作为主键,或是有一个全局ID生成器,可以优先考虑选用第二种方案。
为了实现消费者的“Exactly once”语义,在这里提供一种方案,供读者参考:消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部
Topic记录其offset,而是由消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消
息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手
动设置消费位置,从此offset处开始继续消费。
ISR(In-SyncReplica)集合表示的是目前“可用”(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际
含义是ISR集合中的副本必须满足下面两个条件:
四、conf下的配置文件说明
五、刷盘方式
传递保证语义:
At most once:消息可能会丢,但绝不会重复传递。
At least once:消息绝不会丢,但可能会重复传递。
Exactly once: 每条消息只会被传递一次。
生产者的“Exactly once”语义方案
每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。
为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现“Exactly once”语义。
相关文章:

Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式) Consumer端先启动 Consumer端后启动. 正常情况下:应该是Consumer需要先启动 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 package quickstart; import java.util.List; import co…...

linux+onenet可视化(图形化步骤)
文章目录 一、ONENET项目搭建1.1 ONENET注册1.2 创建产品与设备1.3 添加数据流 二、可视化配置 OneNET是由中国移动打造的PaaS物联网开放平台。平台能够帮助开发者轻松实现设备接入与设备连接,快速完成产品开发部署,为智能硬件、智能家居产品提供完善的物…...

汇编的基础
原视频 基础篇:1.1编程环境的安装 打开DOSBox 0.74-3 Options.bat调整窗口大小 windowresolution1200x640 outputddrawmount c D:\masm c: debugDEBUG 用Debug的R命令查看、改变CPU寄存器的内容: 用Debug的D命令查看内存中的内容: 用Debu…...

并发编程学习(十四):tomcat线程池
1、Tomcat 功能组件结构 Tomcat 的核心功能有两个,分别是负责接收和反馈外部请求的连接器 Connector,和负责处理请求的容器 Container。 其中连接器和容器相辅相成,一起构成了基本的 web 服务 Service。每个 Tomcat 服务器可以管理多个 Servi…...

简洁灵活工单管理系统,支持工单模版字段、工单状态自定义
一、开源项目简介 本项目为FeelDesk工单管理系统的开源版(OS),是基于开发者版(DEV)分离的标准版;支持工单模版字段、工单状态等自定义,可为不同的模版设置不同的路由规则;对工单需求…...

标签派单系统架构设计
需求描述 项目背景 根据员工历史成单情况,计算员工对不同类型工单的转化能力。根据员工和工单标签匹配进行派单。 业务流程图 规则描述 每10分钟,分城进行一次派单,派单规则可能会动态删减,需要支持动态配置 工单标签说明 一…...

Jmeter和Postman那个工具更适合做接口测试?
软件测试行业做功能测试和接口测试的人相对比较多。在测试工作中,有高手,自然也会有小白,但有一点我们无法否认,就是每一个高手都是从小白开始的,所以今天我们就来谈谈一大部分人在做的接口测试,小白变高手…...

k8s污点与容忍
1.前言 污点是给node节点打上污点标签,使得pod不能往该node节点上调度,污点有三种模式,分别是NoSchedule、PreferNoSchedule、NoExecute,容忍是给pod打上和node节点一样的污点标签,使pod能调度到带有该污点标签的node…...

市面上有哪些软件可以结合agentgpt的?众包平台结合的好处!
使用AgentGPT,提升工作效率! 随着科技的迅速发展,人工智能已经成为我们生活中不可或缺的一部分。而AgentGPT则是人工智能领域的一款杰出产品,它能够帮助我们提升工作效率,减少重复性劳动,让我们的生活更加便…...

【js】对象属性的拦截和Proxy代理与Reflect映射的用法与区别
✍️ 作者简介: 前端新手学习中。 💂 作者主页: 作者主页查看更多前端教学 🎓 专栏分享:css重难点教学 Node.js教学 从头开始学习 ajax学习 文章目录 对象属性的拦截介绍SetGet 对象的拦截介绍使用对象属性拦截和对象拦截区别练习题 映射…...

Yolov8涨点神器:ODConv+ConvNeXt提升小目标检测能力
1.涨点神器结合,助力YOLO 1.1 ICLR 2022涨点神器——即插即用的动态卷积ODConv 论文:Omni-Dimensional Dynamic Convolution 论文地址:Omni-Dimensional Dynamic Convolution | OpenReview ODConv通过并行策略引入一种多维注意力机制以对卷积核空间的四个维度学习更灵活的…...

git代码回滚是使用reset还是revert
时光不能回退,Git却允许我们改变历史。 想要让Git回退历史,有以下步骤: 使用git log命令,查看分支提交历史,确认需要回退的版本 使用git reset --hard commit_id命令,进行版本回退 使用git push origin命…...

深入理解Java ThreadLocal及其内存泄漏防范
文章目录 一、ThreadLocal简介二、ThreadLocal的内存泄漏问题三、防止ThreadLocal导致的内存泄漏四、总结 一、ThreadLocal简介 在Java中,ThreadLocal是一种线程封闭的机制,其主要目的是为每个线程都创建一个单独的变量副本。这意味着,每个线…...

介绍10款ChatGPT替代产品
ChatGPT 引领着聊天 AI 的世界,许多人已经开始在日常生活中使用它。OpenAI 的 GPT-3 语言模型是聊天机器人的基础,它使得用户能够通过回答问题与 AI 进行交互。 GPT-4 的引入为机器人提供了更强大的功能。然而,它也有一个明显的缺点ÿ…...

数字逻辑 期末
概述 教材:《电子技术基础(数字部分)》 第六版 7400系列是TTL型芯片,商用型 数制 十进制->二进制 除2取余法&乘2取整法(注意精度,但计科简单不考) 十六进制->二进制 一位变四位 八…...

MT4交易外汇平台有哪些优势?为何是外汇投资首选?
外汇市场上存在着各种各样的外汇交易商,但是很多的外汇交易商所选择的交易平台都是MT4交易外汇平台。作为全世界范围内使用最为广泛的交易平台,MT4交易外汇平台具有哪些优势,能够让外汇交易商和外汇投资者都选择使用。本文就来具体的聊聊&…...

问卷调查工具实力榜单发布
问卷调查是从目标受众那里收集有价值的反馈和见解的有效方式。正确的调查问卷工具可以使问卷的创建、分发和分析变得更加容易和高效。在本文中,我们将问卷调查工具排行榜实力榜,为大家选择问卷平台的时候提供有价值的参考意见。 1、Zoho Survey Zoho S…...

javascript中property和attribute有什么区别?
在JavaScript中,“property”(属性)和"attribute"(属性)这两个术语用于描述对象的特性,但它们在含义和用法上有一些区别。 1、属性(Properties): 属性是属于J…...

快速上手kettle
一、前言 最近由于工作需要,需要用到kettle工具进行数据迁移转换。特意找资料学习了一下,kettle基本操作算是学会了。 所学的也结合实际工作进行了验证。为了防止以后用到忘记了,便写了几篇文章记录一下。 二 、ETL简介 ETL ( Extract-Tran…...

Leetcode 399. 除法求值
Leetcode 399. 除法求值题目 给你一个变量对数组 equations 和一个实数值数组 values 作为已知条件,其中 equations[i] [Ai, Bi] 和 values[i] 共同表示等式 Ai / Bi values[i] 。每个Ai 或 Bi 是一个表示单个变量的字符串。另有一些以数组 queries 表示的问题&am…...

kotlin协程并发/并行与串行互相切换,CoroutineScope与await
kotlin协程并发/并行与串行互相切换,CoroutineScope与await import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay import kotlinx.coroutines.launch import java.time.LocalTimefun main(args: Arra…...

初识linux之简单了解TCP协议与UDP协议
目录 一、理解源IP地址和目的IP地址 二、端口号 1. 为什么要有端口号 2. 理解端口号 3. 源端口号和目的端口号 三、初步了解TCP协议和UDP协议 1. 初步认识TCP协议 2. 初步认识UDP协议 3. 可靠传输与不可靠传输 四、网络字节序 1. 网络字节序的概念 2. 如何形成网络…...

【String——简单使用】
文章目录 String1. 字符串定义和初始化2. 字符串基本操作2.1 访问单个字符2.2 修改字符串内容2.3 字符串查找和比较 3. 常用字符串函数3.1 length() 和 size()3.2 empty()3.3 substr()3.4 c_str() 4.字符与整形之间相互转换4.1 char 类型转 int 类型4.2 int 类型转 char 类型4.…...

Python下Taobao封装API接口的优势
Python是一门面向对象编程的语言,封装是面向对象编程中的一种重要概念,它把数据和方法包装在一起,实现了对数据的保护和控制。Python封装接口的优势如下: 1.安全性 封装可以保证数据的安全性,禁止外部对数据的直接访…...

LeetCode 49 字母异位词分组
LeetCode 49 字母异位词分组 来源:力扣(LeetCode) 链接:https://leetcode.cn/problems/group-anagrams/description/ 博主Github:https://github.com/GDUT-Rp/LeetCode 题目: 给你一个字符串数组&#x…...

( 链表) 142. 环形链表 II——【Leetcode每日一题】
❓142. 环形链表 II 难度:中等 给定一个链表的头节点 head ,返回链表开始入环的第一个节点。 如果链表无环,则返回 null。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定…...

论文解读 | 基于改进点对特征的点云6D姿态估计
原创 | 文 BFT机器人 01 摘要 点对特征(PPF)方法已被证明是一种有效的杂波和遮挡下的姿态估计方法。 文章的改进方法主要包括: (1)一种基于奇偶规则求解封闭几何的法向的方法; (2)通过将体素网格划分为等效角度单元的有效降采样方法; (3)基于拟合点的验证步骤。在真实杂波数据集…...

Shell脚本while循环语句应用
记录:433 场景:Shell脚本while循环语句应用。Shell脚本while循环语句应用。while do done、while : do done、while true do done。 版本:CentOS Linux release 7.9.2009。 1.while常用格式 1.1格式一:while do done while c…...

Kubernetes Dashboard + Ingress 及其 yaml 文件分析
概述 记录部署Dashboard Ingress的具体过程及其 yaml 文件分析 Dashboard Yaml # Copyright 2017 The Kubernetes Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the Li…...

【SpringCloud组件——Nacos】
前置准备: 分别提供订单系统(OrderService)和用户系统(UserService)。订单系统主要负责订单相关信息的处理,用户系统主要负责用户相关信息的处理。 一、服务注册与发现 1.1、在父工程当中引入Nacos依赖 …...