Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式)
Consumer端先启动
Consumer端后启动. 正常情况下:应该是Consumer需要先启动
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费 ,(消费顺序消息的时候设置)
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});由于这里是Consumer先启动,所以他回去轮询MQ上是否有订阅队列的消息,由于每次producer插入一条,Consumer就拿一条所以测试结果如下(每次size都是1)
2、Consumer端后启动,也就是Producer先启动
由于这里是Consumer后启动,所以MQ上也就堆积了一堆数据,Consumer的
1、Producer端重试
也就是Producer往MQ上发消息没有发送成功,我们可以设置发送失败重试的次数,发送并触发回调函数
2、Consumer端重试
2.1、exception的情况,一般重复16次 10s、30s、1分钟、2分钟、3分钟等等
上面的代码中消费异常的情况返回
return ConsumeConcurrentlyStatus.RECONSUME_LATER;//重试
正常则返回:
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//成功
二、消息重试机制:消息重试分为2种
1、Producer端重试
2、Consumer端重试
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "push", "1", "我是一条普通消息".getBytes());
//发送消息
SendResult result = producer.send(msg);
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
System.out.println("成功了");
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("失败了"+e.getMessage());
}
});
package quickstart;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// System.out.println("msgs的长度" + msgs.size());
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
for (MessageExt msg : msgs) {
String msgbody = new String(msg.getBody(), "utf-8");
if (msgbody.equals("Hello RocketMQ 4")) {
System.out.println("======错误=======");
int a = 1 / 0;
}
}
} catch (Exception e) {
e.printStackTrace();
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}else{
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
假如超过了多少次之后我们可以让他不再重试记录 日志。
if(msgs.get(0).getReconsumeTimes()==3){
//记录日志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
2.2超时的情况,这种情况MQ会无限制的发送给消费端。
就是由于网络的情况,MQ发送数据之后,Consumer端并没有收到导致超时。也就是消费端没有给我返回return 任何状态,这样的就认为没有到达Consumer端。
这里模拟Producer只发送一条数据。consumer端暂停1分钟并且不发送接收状态给MQ
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");1、集群消费
2、广播消费
rocketMQ默认是集群消费,我们可以通过在Consumer来支持广播消费
三、消费模式
consumer.setConsumeMessageBatchMaxSize(10);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 表示业务处理时间
System.out.println("=========开始暂停===============");
Thread.sleep(60000);
for (MessageExt msg : msgs) {
System.out.println(" Receive New Messages: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
package model;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
/**
* Consumer,订阅消息
*/
public class Consumer2 {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876");
consumer.setConsumeMessageBatchMaxSize(10);
consumer.setMessageModel(MessageModel.BROADCASTING);// 广播消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {异步复制和同步双写主要是主和从的关系。消息需要实时消费的,就需要采用主从模式部署
异步复制:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就算从producer端发送成功了,然后通过异步复制的方法将数据复制到从节点
同步双写:比如这里有一主一从,我们发送一条消息到主节点之后,这样消息就并不算从producer端发送成功了,需要通过同步双写的方法将数据同步到从节点后, 才算数据发
送成功。
如果rocketMq才用双master部署,Producer往MQ上写入20条数据 其中Master1中拉取了12条 。Master2中拉取了8 条,这种情况下,Master1宕机,那么我们消费数据的时
候,只能消费到Master2中的8条,Master1中的12条默认持久化,不会丢失消息,需要Master1恢复之后这12条数据才能继续被消费,如果想保证消息实时消费,就才用双
Master双Slave的模式
同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。
异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。
commitlog:
commitlog就是来存储所有的元信息,包含消息体,类似于MySQL、Oracle的redolog,所以主要有CommitLog在,Consume Queue即使数据丢失,仍然可以恢复出来。
consumequeue:记录数据的位置,以便Consume快速通过consumequeue找到commitlog中的数据
当生产者向Kafka发送消息,且正常得到响应的时候,可以确保生产者不会产生重复的消息。但是,如果生产者发送消息后,遇到网络问题,无法获取响应,生产者就无法判断该
消息是否成功提交给了Kafka。根据生产者的机制,我们知道,当出现异常时,会进行消息重传,这就可能出现“At least one”语义。为了实现“Exactly once”语义,这里提供两个
可选方案:
如果业务数据产生消息可以找到合适的字段作为主键,或是有一个全局ID生成器,可以优先考虑选用第二种方案。
为了实现消费者的“Exactly once”语义,在这里提供一种方案,供读者参考:消费者将关闭自动提交offset的功能且不再手动提交offset,这样就不使用Offsets Topic这个内部
Topic记录其offset,而是由消费者自己保存offset。这里利用事务的原子性来实现“Exactly once”语义,我们将offset和消息处理结果放在一个事务中,事务执行成功则认为此消
息被消费,否则事务回滚需要重新消费。当出现消费者宕机重启或Rebalance操作时,消费者可以从关系型数据库中找到对应的offset,然后调用KafkaConsumer.seek()方法手
动设置消费位置,从此offset处开始继续消费。
ISR(In-SyncReplica)集合表示的是目前“可用”(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际
含义是ISR集合中的副本必须满足下面两个条件:
四、conf下的配置文件说明
五、刷盘方式
传递保证语义:
At most once:消息可能会丢,但绝不会重复传递。
At least once:消息绝不会丢,但可能会重复传递。
Exactly once: 每条消息只会被传递一次。
生产者的“Exactly once”语义方案
每个分区只有一个生产者写入消息,当出现异常或超时的情况时,生产者就要查询此分区的最后一个消息,用来决定后续操作是消息重传还是继续发送。
为每个消息添加一个全局唯一主键,生产者不做其他特殊处理,按照之前分析方式进行重传,由消费者对消息进行去重,实现“Exactly once”语义。
相关文章:
Rocket重试机制,消息模式,刷盘方式
一、Consumer 批量消费(推模式) Consumer端先启动 Consumer端后启动. 正常情况下:应该是Consumer需要先启动 consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10条 package quickstart; import java.util.List; import co…...
linux+onenet可视化(图形化步骤)
文章目录 一、ONENET项目搭建1.1 ONENET注册1.2 创建产品与设备1.3 添加数据流 二、可视化配置 OneNET是由中国移动打造的PaaS物联网开放平台。平台能够帮助开发者轻松实现设备接入与设备连接,快速完成产品开发部署,为智能硬件、智能家居产品提供完善的物…...
汇编的基础
原视频 基础篇:1.1编程环境的安装 打开DOSBox 0.74-3 Options.bat调整窗口大小 windowresolution1200x640 outputddrawmount c D:\masm c: debugDEBUG 用Debug的R命令查看、改变CPU寄存器的内容: 用Debug的D命令查看内存中的内容: 用Debu…...
并发编程学习(十四):tomcat线程池
1、Tomcat 功能组件结构 Tomcat 的核心功能有两个,分别是负责接收和反馈外部请求的连接器 Connector,和负责处理请求的容器 Container。 其中连接器和容器相辅相成,一起构成了基本的 web 服务 Service。每个 Tomcat 服务器可以管理多个 Servi…...
简洁灵活工单管理系统,支持工单模版字段、工单状态自定义
一、开源项目简介 本项目为FeelDesk工单管理系统的开源版(OS),是基于开发者版(DEV)分离的标准版;支持工单模版字段、工单状态等自定义,可为不同的模版设置不同的路由规则;对工单需求…...
标签派单系统架构设计
需求描述 项目背景 根据员工历史成单情况,计算员工对不同类型工单的转化能力。根据员工和工单标签匹配进行派单。 业务流程图 规则描述 每10分钟,分城进行一次派单,派单规则可能会动态删减,需要支持动态配置 工单标签说明 一…...
Jmeter和Postman那个工具更适合做接口测试?
软件测试行业做功能测试和接口测试的人相对比较多。在测试工作中,有高手,自然也会有小白,但有一点我们无法否认,就是每一个高手都是从小白开始的,所以今天我们就来谈谈一大部分人在做的接口测试,小白变高手…...
k8s污点与容忍
1.前言 污点是给node节点打上污点标签,使得pod不能往该node节点上调度,污点有三种模式,分别是NoSchedule、PreferNoSchedule、NoExecute,容忍是给pod打上和node节点一样的污点标签,使pod能调度到带有该污点标签的node…...
市面上有哪些软件可以结合agentgpt的?众包平台结合的好处!
使用AgentGPT,提升工作效率! 随着科技的迅速发展,人工智能已经成为我们生活中不可或缺的一部分。而AgentGPT则是人工智能领域的一款杰出产品,它能够帮助我们提升工作效率,减少重复性劳动,让我们的生活更加便…...
【js】对象属性的拦截和Proxy代理与Reflect映射的用法与区别
✍️ 作者简介: 前端新手学习中。 💂 作者主页: 作者主页查看更多前端教学 🎓 专栏分享:css重难点教学 Node.js教学 从头开始学习 ajax学习 文章目录 对象属性的拦截介绍SetGet 对象的拦截介绍使用对象属性拦截和对象拦截区别练习题 映射…...
Yolov8涨点神器:ODConv+ConvNeXt提升小目标检测能力
1.涨点神器结合,助力YOLO 1.1 ICLR 2022涨点神器——即插即用的动态卷积ODConv 论文:Omni-Dimensional Dynamic Convolution 论文地址:Omni-Dimensional Dynamic Convolution | OpenReview ODConv通过并行策略引入一种多维注意力机制以对卷积核空间的四个维度学习更灵活的…...
git代码回滚是使用reset还是revert
时光不能回退,Git却允许我们改变历史。 想要让Git回退历史,有以下步骤: 使用git log命令,查看分支提交历史,确认需要回退的版本 使用git reset --hard commit_id命令,进行版本回退 使用git push origin命…...
深入理解Java ThreadLocal及其内存泄漏防范
文章目录 一、ThreadLocal简介二、ThreadLocal的内存泄漏问题三、防止ThreadLocal导致的内存泄漏四、总结 一、ThreadLocal简介 在Java中,ThreadLocal是一种线程封闭的机制,其主要目的是为每个线程都创建一个单独的变量副本。这意味着,每个线…...
介绍10款ChatGPT替代产品
ChatGPT 引领着聊天 AI 的世界,许多人已经开始在日常生活中使用它。OpenAI 的 GPT-3 语言模型是聊天机器人的基础,它使得用户能够通过回答问题与 AI 进行交互。 GPT-4 的引入为机器人提供了更强大的功能。然而,它也有一个明显的缺点ÿ…...
数字逻辑 期末
概述 教材:《电子技术基础(数字部分)》 第六版 7400系列是TTL型芯片,商用型 数制 十进制->二进制 除2取余法&乘2取整法(注意精度,但计科简单不考) 十六进制->二进制 一位变四位 八…...
MT4交易外汇平台有哪些优势?为何是外汇投资首选?
外汇市场上存在着各种各样的外汇交易商,但是很多的外汇交易商所选择的交易平台都是MT4交易外汇平台。作为全世界范围内使用最为广泛的交易平台,MT4交易外汇平台具有哪些优势,能够让外汇交易商和外汇投资者都选择使用。本文就来具体的聊聊&…...
问卷调查工具实力榜单发布
问卷调查是从目标受众那里收集有价值的反馈和见解的有效方式。正确的调查问卷工具可以使问卷的创建、分发和分析变得更加容易和高效。在本文中,我们将问卷调查工具排行榜实力榜,为大家选择问卷平台的时候提供有价值的参考意见。 1、Zoho Survey Zoho S…...
javascript中property和attribute有什么区别?
在JavaScript中,“property”(属性)和"attribute"(属性)这两个术语用于描述对象的特性,但它们在含义和用法上有一些区别。 1、属性(Properties): 属性是属于J…...
快速上手kettle
一、前言 最近由于工作需要,需要用到kettle工具进行数据迁移转换。特意找资料学习了一下,kettle基本操作算是学会了。 所学的也结合实际工作进行了验证。为了防止以后用到忘记了,便写了几篇文章记录一下。 二 、ETL简介 ETL ( Extract-Tran…...
Leetcode 399. 除法求值
Leetcode 399. 除法求值题目 给你一个变量对数组 equations 和一个实数值数组 values 作为已知条件,其中 equations[i] [Ai, Bi] 和 values[i] 共同表示等式 Ai / Bi values[i] 。每个Ai 或 Bi 是一个表示单个变量的字符串。另有一些以数组 queries 表示的问题&am…...
RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
VTK如何让部分单位不可见
最近遇到一个需求,需要让一个vtkDataSet中的部分单元不可见,查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行,是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示,主要是最后一个参数,透明度…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
