Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式)
Consumer端先启动
Consumer端后启动. 正常情况下:应该是Consumer需要先启动
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费 ,(消费顺序消息的时候设置)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1)
2、Consumer端后启动,也就是Producer先启动
由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的
1、Producer端重试
也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数,发送并触发回调函数
2、Consumer端重试
2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等
上面的代码中消费异常的情况返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
正常则返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
二、消息重试机制:消息重试分为2种
1、Producer端重试
2、Consumer端重试
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "push", "1", "我是一条普通消息".getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("失败了"+e.getMessage());
}
});
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt msg : msgs) {
String msgbody = new String(msg.getBody(), "utf-8");
if (msgbody.equals("Hello RocketMQ 4")) {
System.out.println("======错误=======");
int a = 1 / 0;
}
}
} catch (Exception e) {
e.printStackTrace();
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}else{
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
假如超过了多少次之后我们可以让他不再重试记录 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超时的情况,这种情况MQ会无限制的发送给消费端。
就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return 任何状态,这样的就认为没有到达Consumer端。
这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消费
2、广播消费
rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费
三、消费模式
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 表示业务处理时间
System.out.println("=========开始暂停===============");
Thread.sleep(60000);
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Consumer,订阅消息
*/
public class Consumer2 {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署
异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点
同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发
送成功。
如果rocketMq才用双master部署,Producer往MQ上写入20条数据 其中Master1中拉取了12条 。Master2中拉取了8 条,这种情况下,Master1宕机,那么我们消费数据的时
候,只能消费到Master2中的8条,Master1中的12条默认持久化,不会丢失消息,需要Master1恢复之后这12条数据才能继续被消费,如果想保证消息实时消费,就才用双
Master双Slave的模式
同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。
异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。
commitlog:
commitlog就是来存储所有的元信息,包含消息体,类似于MySQL、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。
consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据
当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。但是,如果生产者发送消息后,遇到网络问题,无法获取响应,生产者就无法判断该
消息是否成功提交给了Kafka。根据生产者的机制,我们知道,当出现异常时,会进行消息重传,这就可能出现“At least one”语义。为了实现“Exactly once”语义,这里提供两个
可选方案:
如果业务数据产生消息可以找到合适的字段作为主键,或是有一个全局ID生成器,可以优先考虑选用第二种方案。
为了实现消费者的“Exactly once”语义,在这里提供一种方案,供读者参考:消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部
Topic记录其offset,而是由消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消
息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手
动设置消费位置,从此offset处开始继续消费。
ISR(In-SyncReplica)集合表示的是目前“可用”(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际
含义是ISR集合中的副本必须满足下面两个条件:
四、conf下的配置文件说明
五、刷盘方式
传递保证语义:
At most once:消息可能会丢,但绝不会重复传递。
At least once:消息绝不会丢,但可能会重复传递。
Exactly once: 每条消息只会被传递一次。
生产者的“Exactly once”语义方案
每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。
为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现“Exactly once”语义。
相关文章:
Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式) Consumer端先启动 Consumer端后启动. 正常情况下:应该是Consumer需要先启动 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 package quickstart; import java.util.List; import co…...

linux+onenet可视化(图形化步骤)
文章目录 一、ONENET项目搭建1.1 ONENET注册1.2 创建产品与设备1.3 添加数据流 二、可视化配置 OneNET是由中国移动打造的PaaS物联网开放平台。平台能够帮助开发者轻松实现设备接入与设备连接,快速完成产品开发部署,为智能硬件、智能家居产品提供完善的物…...
汇编的基础
原视频 基础篇:1.1编程环境的安装 打开DOSBox 0.74-3 Options.bat调整窗口大小 windowresolution1200x640 outputddrawmount c D:\masm c: debugDEBUG 用Debug的R命令查看、改变CPU寄存器的内容: 用Debug的D命令查看内存中的内容: 用Debu…...

并发编程学习(十四):tomcat线程池
1、Tomcat 功能组件结构 Tomcat 的核心功能有两个,分别是负责接收和反馈外部请求的连接器 Connector,和负责处理请求的容器 Container。 其中连接器和容器相辅相成,一起构成了基本的 web 服务 Service。每个 Tomcat 服务器可以管理多个 Servi…...

简洁灵活工单管理系统,支持工单模版字段、工单状态自定义
一、开源项目简介 本项目为FeelDesk工单管理系统的开源版(OS),是基于开发者版(DEV)分离的标准版;支持工单模版字段、工单状态等自定义,可为不同的模版设置不同的路由规则;对工单需求…...

标签派单系统架构设计
需求描述 项目背景 根据员工历史成单情况,计算员工对不同类型工单的转化能力。根据员工和工单标签匹配进行派单。 业务流程图 规则描述 每10分钟,分城进行一次派单,派单规则可能会动态删减,需要支持动态配置 工单标签说明 一…...

Jmeter和Postman那个工具更适合做接口测试?
软件测试行业做功能测试和接口测试的人相对比较多。在测试工作中,有高手,自然也会有小白,但有一点我们无法否认,就是每一个高手都是从小白开始的,所以今天我们就来谈谈一大部分人在做的接口测试,小白变高手…...

k8s污点与容忍
1.前言 污点是给node节点打上污点标签,使得pod不能往该node节点上调度,污点有三种模式,分别是NoSchedule、PreferNoSchedule、NoExecute,容忍是给pod打上和node节点一样的污点标签,使pod能调度到带有该污点标签的node…...
市面上有哪些软件可以结合agentgpt的?众包平台结合的好处!
使用AgentGPT,提升工作效率! 随着科技的迅速发展,人工智能已经成为我们生活中不可或缺的一部分。而AgentGPT则是人工智能领域的一款杰出产品,它能够帮助我们提升工作效率,减少重复性劳动,让我们的生活更加便…...

【js】对象属性的拦截和Proxy代理与Reflect映射的用法与区别
✍️ 作者简介: 前端新手学习中。 💂 作者主页: 作者主页查看更多前端教学 🎓 专栏分享:css重难点教学 Node.js教学 从头开始学习 ajax学习 文章目录 对象属性的拦截介绍SetGet 对象的拦截介绍使用对象属性拦截和对象拦截区别练习题 映射…...

Yolov8涨点神器:ODConv+ConvNeXt提升小目标检测能力
1.涨点神器结合,助力YOLO 1.1 ICLR 2022涨点神器——即插即用的动态卷积ODConv 论文:Omni-Dimensional Dynamic Convolution 论文地址:Omni-Dimensional Dynamic Convolution | OpenReview ODConv通过并行策略引入一种多维注意力机制以对卷积核空间的四个维度学习更灵活的…...

git代码回滚是使用reset还是revert
时光不能回退,Git却允许我们改变历史。 想要让Git回退历史,有以下步骤: 使用git log命令,查看分支提交历史,确认需要回退的版本 使用git reset --hard commit_id命令,进行版本回退 使用git push origin命…...
深入理解Java ThreadLocal及其内存泄漏防范
文章目录 一、ThreadLocal简介二、ThreadLocal的内存泄漏问题三、防止ThreadLocal导致的内存泄漏四、总结 一、ThreadLocal简介 在Java中,ThreadLocal是一种线程封闭的机制,其主要目的是为每个线程都创建一个单独的变量副本。这意味着,每个线…...

介绍10款ChatGPT替代产品
ChatGPT 引领着聊天 AI 的世界,许多人已经开始在日常生活中使用它。OpenAI 的 GPT-3 语言模型是聊天机器人的基础,它使得用户能够通过回答问题与 AI 进行交互。 GPT-4 的引入为机器人提供了更强大的功能。然而,它也有一个明显的缺点ÿ…...

数字逻辑 期末
概述 教材:《电子技术基础(数字部分)》 第六版 7400系列是TTL型芯片,商用型 数制 十进制->二进制 除2取余法&乘2取整法(注意精度,但计科简单不考) 十六进制->二进制 一位变四位 八…...

MT4交易外汇平台有哪些优势?为何是外汇投资首选?
外汇市场上存在着各种各样的外汇交易商,但是很多的外汇交易商所选择的交易平台都是MT4交易外汇平台。作为全世界范围内使用最为广泛的交易平台,MT4交易外汇平台具有哪些优势,能够让外汇交易商和外汇投资者都选择使用。本文就来具体的聊聊&…...
问卷调查工具实力榜单发布
问卷调查是从目标受众那里收集有价值的反馈和见解的有效方式。正确的调查问卷工具可以使问卷的创建、分发和分析变得更加容易和高效。在本文中,我们将问卷调查工具排行榜实力榜,为大家选择问卷平台的时候提供有价值的参考意见。 1、Zoho Survey Zoho S…...
javascript中property和attribute有什么区别?
在JavaScript中,“property”(属性)和"attribute"(属性)这两个术语用于描述对象的特性,但它们在含义和用法上有一些区别。 1、属性(Properties): 属性是属于J…...

快速上手kettle
一、前言 最近由于工作需要,需要用到kettle工具进行数据迁移转换。特意找资料学习了一下,kettle基本操作算是学会了。 所学的也结合实际工作进行了验证。为了防止以后用到忘记了,便写了几篇文章记录一下。 二 、ETL简介 ETL ( Extract-Tran…...
Leetcode 399. 除法求值
Leetcode 399. 除法求值题目 给你一个变量对数组 equations 和一个实数值数组 values 作为已知条件,其中 equations[i] [Ai, Bi] 和 values[i] 共同表示等式 Ai / Bi values[i] 。每个Ai 或 Bi 是一个表示单个变量的字符串。另有一些以数组 queries 表示的问题&am…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...

10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...

STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...
提升移动端网页调试效率:WebDebugX 与常见工具组合实践
在日常移动端开发中,网页调试始终是一个高频但又极具挑战的环节。尤其在面对 iOS 与 Android 的混合技术栈、各种设备差异化行为时,开发者迫切需要一套高效、可靠且跨平台的调试方案。过去,我们或多或少使用过 Chrome DevTools、Remote Debug…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...