当前位置: 首页 > news >正文

【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

1、一个问题:

 2、查看消费者消费主题__consumer_offsets

3、一个重要前提:消费时要提交offset

二、指定 Offset 消费


假如遇到kafka崩了,你重启kafka之后,想要继续消费,应该怎么办?

  1. 首先确定要消费的主题是哪几个
  2. 其次使用命令或者其他的组件查看 __consumer_offset 主题下的偏移量信息,找到我们关心的主题在崩溃之前消费到了哪里。
  3. 最后使用 java 代码,里面有一个非常重要的方法 seek,指定需要消费的主题,分区以及偏移量,就可以继续消费了。

下面是解决这个问题的具体步骤!!! 

一、怎么寻找我们关心的主题在崩溃之前消费到了哪里? 

        因为__consumer_offset 主题下记录了主题的偏移量信息,所以提交offset之后,消费__consumer_offset 主题便可查看所有主题的偏移量信息

1、一个问题:

__consumer_offset 主题下的数据是不能查看的,怎么解决?

解决方案:

在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false。然后分发一下

分发:(这里我使用了脚本)

xsync.sh /opt/installs/kafka3/config/consumer.properties

注意:修改之前要先关闭kafka和zookeeper,修改完毕后再开启!

说明:默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。如果不修改是无法查看offset的值的,因为这些都是加密数据。

 2、查看消费者消费主题__consumer_offsets

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --consumer.config /opt/installs/kafka3/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

查询结果如下:

图中:1是消费者组名;2是Topic主题名;3是分区;4是偏移量,说明该主题崩溃前消费到了这里

此时便查询到了偏移量信息!!!

3、一个重要前提:消费时要提交offset

能查询到偏移量的前提是消费时要自动提交 offset(默认开启)或者手动提交 offset

一般不用管,因为自动提交会默认开启!!!

二、指定 Offset 消费

 kafka提供了seek方法,可以让我们从分区的固定位置开始消费。

seek (TopicPartition Partition,offset offset):指定分区和偏移量

package com.bigdata._03offsetTest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;public class _03CustomConsumerSeek {public static void main(String[] args) {Properties properties = new Properties();//连接kafka setProperty和put都行properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组(组名任意起名) 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");// 是否自动提交 offset  通过这个字段设置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);// 消费者消费的是kafka集群的数据,消费哪个主题的数据呢?List<String> topics = new ArrayList<>();topics.add("bigdata");// list可以设置多个主题的名称kafkaConsumer.subscribe(topics);// 执行计划// 此时的消费计划是空的,因为没有时间生成Set<TopicPartition> assignment = kafkaConsumer.assignment();while(assignment.size() == 0){// 这个本身是拉取数据的代码,此处可以帮助快速构建分区方案出来kafkaConsumer.poll(Duration.ofSeconds(1));// 一直获取它的分区方案,什么时候有了,就什么时候跳出这个循环assignment = kafkaConsumer.assignment();}// 获取分区0的offset =100 以后的数据kafkaConsumer.seek(new TopicPartition("bigdata",0),100);// 因为消费者是不停的消费,所以是while truewhile(true){// 每隔一秒钟,从kafka 集群中拉取一次数据,有可能拉取多条数据ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));// 循环打印每一条数据for (ConsumerRecord record:records) {// 打印一条数据System.out.println(record);// 打印数据中的值System.out.println(record.value());}}}
}

执行这个java代码就可以从精确的指定位置继续消费了!!!

结果如下:

从上图可以看出,确实是从指定的主题、分区、偏移量开始消费的! 

相关文章:

【快速解决】kafka崩了,重启之后,想继续消费,怎么做?

目录 一、怎么寻找我们关心的主题在崩溃之前消费到了哪里&#xff1f; 1、一个问题&#xff1a; 2、查看消费者消费主题__consumer_offsets 3、一个重要前提&#xff1a;消费时要提交offset 二、指定 Offset 消费 假如遇到kafka崩了&#xff0c;你重启kafka之后&#xff0…...

C++ 的发展

目录 C 的发展总结&#xff1a;​编辑 1. C 的早期发展&#xff08;1979-1985&#xff09; 2. C 标准化过程&#xff08;1985-1998&#xff09; 3. C 标准演化&#xff08;2003-2011&#xff09; 4. C11&#xff08;2011年&#xff09; 5. C14&#xff08;2014年&#xf…...

RabbitMQ 高级特性——延迟队列

文章目录 前言延迟队列延迟队列的概念TTL 死信队列模拟延迟队列设置队列的 TTL设置消息的 TTL 延迟队列插件安装并且启动插件服务使用插件实现延迟功能 前言 前面我们学习了 TTL 和死信队列&#xff0c;当队列中的消息达到了过期时间之后&#xff0c;那么这个消息就会被死信交…...

‌EAC(Estimate at Completion)和ETC(Estimate to Complete)

‌EAC 预计完工成本ETC 预计尚需成本Estimate at CompletionEstimate to Complete完成预估完工时尚需成本估算 EAC ETC ACETC EAC – AC 预测项目总成本&#xff0c;包含了到目前为止实际发生的成本&#xff08;AC&#xff09;和预计将发生的成本。如果EAC大于BAC&#xf…...

【React】状态管理之Zustand

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 状态管理之Zustand引言1. Zustand 的核心特点1.1 简单直观的 API1.2 无需 Provi…...

Vue3打包自动生成版本JSON文件,添加系统版本检查,实现系统自动更新提示

实现该功能一共有三步。废话不多说&#xff0c;直接上代码&#xff01;&#xff01;&#xff01; 第一步&#xff1a;打包时自动生成版本信息的js文件&#xff0c;versionUpdate.js import fs from fs; import path from path; import { ElMessageBox } from element-plus; i…...

海量数据有限内存系列问题解决方案

1. 排序问题 有限数据充足内存&#xff1a;内存中有十万整数&#xff0c;对所有数据进行排序。 内部排序即可 单节点海量数据有限内存&#xff1a;某台机器有一个文件&#xff0c;文件中包含六十亿整数&#xff0c;一个整数一行&#xff0c;可用内存1G&#xff0c;对所有数据…...

FFmpeg 4.3 音视频-多路H265监控录放C++开发十四,总结编码过程,从摄像头获得数据后,转成AVFrame,然后再次转成AVPacket,

也就是将摄像头采集到的YUV 的数据换成 AVFrame&#xff0c;然后再次转成 AVPacket&#xff0c;那么这AVPakcet数据要怎么办呢&#xff1f;分为三种情况&#xff1a; 一种是将AVPacket存储成h264文件&#xff0c;由于h264编码器在将avframe变成avpacket的时候就是按照h264的格…...

内容占位符:Kinetic Loader HTML+CSS 使用CSS制作三角形原理

内容占位符 前言 随着我们对HTML和CSS3的学习逐渐深入&#xff0c;相信大家都已经掌握了网页制作的基础知识&#xff0c;包括如何使用HTML标记构建网页结构&#xff0c;以及如何运用CSS样式美化页面。为了进一步巩固和熟练这些技能&#xff0c;今天我们一起来完成一个有趣且实…...

麒麟nginx配置

一、配置负载均衡 配置麒麟的yum源 vim /etc/yum.repos.d/kylin_aarch64.repo Copy 删除原来内容&#xff0c;写入如下yum源 [ks10-adv-os] name Kylin Linux Advanced Server 10 - Os baseurl http://update.cs2c.com.cn:8080/NS/V10/V10SP2/os/adv/lic/base/aarch64/ …...

如何在 Ubuntu 上安装 Emby 媒体服务器

Emby 是一个开源的媒体服务器解决方案&#xff0c;它能让你整理、流媒体播放和分享你的个人媒体收藏&#xff0c;包括电影、音乐、电视节目和照片。Emby 帮你集中多媒体内容&#xff0c;让你无论在家还是在外都能轻松访问。它还支持转码&#xff0c;让你能够播放各种格式的内容…...

Mac上详细配置java开发环境和软件(更新中)

文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan.baidu.com/s/17…...

jmeter常用配置元件介绍总结之定时器

系列文章目录 安装jmeter jmeter常用配置元件介绍总结之定时器 5.定时器5.1.固定定时器5.2.统一随机定时器5.3.Precise Throughput Timer5.4.Constant Throughput Timer5.5.Synchronizing Timer5.6.泊松随机定时器5.7.高斯随机定时器 5.定时器 5.1.固定定时器 固定定时器Cons…...

Spring——提前编译

提前编译&#xff1a;AOT AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式&#xff0c;这两种编译方式的主要区别在于是否在“运行时”进行编译 &#xff08;1&#xff09;JIT&#xff0c; Just-in-time,动态(即时)编译&#xff0c;边运行边编译&#xff1…...

乐理的学习(音程)

二度&#xff0c;三度&#xff0c;六度&#xff0c;七度的大n度都是直接的音名到音名&#xff0c;如#A到#G的&#xff0c;这样为大n度 而这个基础上向内收&#xff0c;收半音为小n度&#xff0c;在小n度再收&#xff0c;为减n度 在大n度的基础上再向外扩半音&#xff0c;为增…...

【网络】数据链路层协议——以太网,ARP协议

> 作者&#xff1a;დ旧言~ > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;了解什么是以太网协议和ARP协议。 > 毒鸡汤&#xff1a;有些事情&#xff0c;总是不明白&#xff0c;所以我不会坚持。早安! > 专栏选自&#xf…...

Linux分区、挂载、配额、逻辑卷、RAID、系统综合状态查看

分区与挂载 fdisk fdisk 命令是一个用于磁盘分区管理的命令行工具&#xff0c;可以用来创建、删除、调整分区等操作。常用的 fdisk 命令选项包括&#xff1a; fdisk -l&#xff1a;列出系统中的所有磁盘分区信息。 fdisk /dev/sdX&#xff1a;打开指定磁盘进行分区操作。 n&…...

3D Gaussian Splatting 代码层理解之Part1

2023 年初,来自蔚蓝海岸大学和 马克斯普朗克学会的作者发表了一篇题为“用于实时现场渲染的 3D 高斯泼溅”的论文。该论文提出了实时神经渲染的重大进步,超越了NeRF等以前方法的实用性。高斯泼溅不仅减少了延迟,而且达到或超过了 NeRF 的渲染质量,在神经渲染领域掀起了一场…...

Qt小知识-Q_GLOBAL_STATIC

你还在为创建全局静态对象烦恼嘛&#xff0c;它来了&#xff01;它来了&#xff01; qt5提供了两个宏定义Q_GLOBAL_STATIC和Q_GLOBAL_STATIC_WITH_ARGS来实现。可以创建一个全局静态对象&#xff0c;对象在第一次使用时初始化自身&#xff0c;这意味着它不会增加应用程序或库的…...

【SpringBoot】使用过滤器进行XSS防御

在Spring Boot中&#xff0c;我们可以使用注解的方式来进行XSS防御。注解是一种轻量级的防御手段&#xff0c;它可以在方法或字段级别对输入进行校验&#xff0c;从而防止XSS攻击。 而想对全局的请求都进行XSS防御可以使用servlet中的过滤器或者spring mvc中的拦截器&#xff…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作&#xff1a; 1&#xff09;、切换集群 2&#xff09;、切换节点 3&#xff09;、切换到 apparmor 的目录 4&#xff09;、执行 apparmor 策略模块 5&#xff09;、修改 pod 文件 6&#xff09;、…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

Java数值运算常见陷阱与规避方法

整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...