kafka入门(二): 位移提交
位移提交:
Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。
消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,
因此,需要记录上一次消费时的消费位移,并且持久化。
消费者在消费完消息之后,需要执行消费位移的提交。
自动位移提交:
Kafka默认的消费位移的提交方式是 自动提交。
自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。
默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。
自动位移提交,有可能会重复消费和消息丢失。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。
手动位移提交:
手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。
手动位移提交,可以分为 同步提交、异步提交。
commitSync() 同步提交
同步提交,会阻塞消费者线程直到位移提交完成。
示例代码:
public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
commitAsync() 异步提交 :
异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。
异步提交,将 consumer.commitSync(); 换成 commitAsync。
如果还需要回调,就用 OffsetCommitCallback对象作为参数。
示例如下:
public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
参考资料:
《深入理解kafka:核心设计与实践原理》
相关文章:
kafka入门(二): 位移提交
位移提交: Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。 消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集, 因此,需要记录上一次消费时的消费位…...
PG时间计算
PG数据库,时间计算使用场景总结 日期之差 --**获取秒差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 2019-05-05 10:10:10)); --**获取分钟差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 20…...
基于51单片机的交通灯_可调时间_夜间+紧急模式
51单片机交通灯 1 讲解视频:2 功能要求3 仿真图:4 原理图PCB5 实物图6 程序设计:7 设计报告8 资料清单(提供资料清单所有文件):设计资料下载链接: 51单片机简易交通灯_可调时间_夜间紧急 仿真代…...
网络通信原理,进制转化总结
来源,做个笔记,讲的还蛮清楚通信原理-2.5 数据封装与传输05_哔哩哔哩_bilibili ip地址范围...
西南科技大学(数据结构A)期末自测练习三
一、填空题(每空1分,共10分) 1、为解决计算机主机与打印机之间速度不匹配的问题,通常设置一个打印数据缓冲区。主机将要输出的数据依次写入缓冲区,打印机则依次从缓冲区中取出数据,则该换缓冲区的逻辑结构…...
【halcon】裁剪
前言 目前我遇到的裁剪相关的函数都是以clip打头的函数。一共4个: clip_end_points_contours_xldclip_contours_xldclip_regionclip_region_rel 前面两个是对轮廓的裁剪。 后面是对区域的裁剪。 裁剪轮廓的两端 clip_end_points_contours_xld 用于实现裁剪XLD…...
vue+less+style-resources-loader 配置全局颜色变量
全局统一样式后,可配置vue.config.js实现全局颜色变量,方便在编写时使用统一风格的色彩 一、新建global.less 二、下载安装style-resources-loader npm i style-resources-loader --save-dev三、在vue.config.js中进行配置 module.exports {pluginOpt…...
第二次量子化
专栏目录: 高质量文章导航-持续更新中 前置复盘: 玻色子和费米子: 首先,我们希望把描述单粒子态的量子力学推广到全同多粒子体系。我们的做法是从单粒子态的希尔伯特空间(Hilbert Space)出发,构造全同多粒子态的态空间——福克空间(Fock Space),它实际上就是无穷个…...
(三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言Q1:卷积网络和传统网络的区别Q2:卷积神经网络的架构Q3:卷积神经网络中的参数共享,也是比传统网络的优势所在4、 具体的实现代码网络搭建…...
【代码】多种调度模式下的光储电站经济性最优 储能容量配置分析matlab/yalmip
程序名称:多种调度模式下的光储电站经济性最优储能容量配置分析 实现平台:matlab-yalmip-cplex/gurobi 代码简介:代码主要做的是一个光储电站经济最优储能容量配置的问题,对光储电站中储能的容量进行优化,以实现经济…...
深度学习今年来经典模型优缺点总结,包括卷积、循环卷积、Transformer、LSTM、GANs等
文章目录 1、卷积神经网络(Convolutional Neural Networks,CNN)1.1 优点1.2 缺点1.3 应用场景1.4 网络图 2、循环神经网络(Recurrent Neural Networks,RNNs)2.1 优点2.2 缺点2.3 应用场景2.4 网络图 3、长短…...
ChatGPT成为“帮凶”:生成虚假数据集支持未知科学假设
ChatGPT 自发布以来,就成为了大家的好帮手,学生党和打工人更是每天都离不开。 然而这次好帮手 ChatGPT 却帮过头了,莫名奇妙的成为了“帮凶”,一位研究人员利用 ChatGPT 创建了虚假的数据集,用来支持未知的科学假设。…...
c#利用Forms.Timer定时检测Tcp连接状态
目的:本地创建客户端连接服务器端,如果连接正常显示连接正常如果连接异常显示连接异常。 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.T…...
空间注意力:改变我们理解图像的方式
空间注意力:改变我们理解图像的方式 欢迎来到深度学习和计算机视觉的新时代,在这里,空间注意力机制正改变着我们理解和处理图像的方式。本文将深入探讨空间注意力的概念,它如何工作,以及为什么它在现代图像处理技术中…...
【模型报错记录】‘PromptForGeneration‘ object has no attribute ‘can_generate‘
通过这个连接中的方法解决: “PromptForGeneration”对象没有属性“can_generate” 期刊 #277 thunlp/OpenPrompt GitHub的 问题描述:在使用model.generate() 的时候报错:PromptForGeneration object has no attribute can_generate 解决方法…...
mysql学习记录
关系型数据库:不是把所有的数据全部存储在一起,而是分类存储在一起。 常见的数据库 关系型:oracle大型收费,mysql小型免费。 sql语言(操作数据库) structured query language 结构化查询语言 1.DDL 数据定义语言 创建数…...
Hdoop学习笔记(HDP)-Part.11 安装Kerberos
目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …...
浅谈UML的概念和模型之UML九种图
1、用例图(use case diagrams) 【概念】描述用户需求,从用户的角度描述系统的功能 【描述方式】椭圆表示某个用例;人形符号表示角色 【目的】帮组开发团队以一种可视化的方式理解系统的功能需求 【用例图】 2、静态图 类图&…...
杨志丰:OceanBase助力企业应对数据库转型深水区挑战
11 月 16 日,OceanBase 在北京顺利举办 2023 年度发布会,正式宣布:将持续践行“一体化”产品战略,为关键业务负载打造一体化数据库。OceanBase 产品总经理杨志丰发表了《助力企业应对数据库转型深水区挑战》主题演讲。 以下为演讲…...
版本控制系统Git学习笔记-Git分支操作
文章目录 概述一、Git分支简介1.1 基本概念1.2 创建分支1.3 分支切换1.4 删除分支 二、新建和合并分支2.1 工作流程示意图2.2 新建分支2.3 合并分支2.4 分支示例2.4.1 当前除了主分支,再次创建了两个分支2.4.2 先合并test1分支2.4.3 合并testbranch分支 2.5 解决合并…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法
深入浅出:JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中,随机数的生成看似简单,却隐藏着许多玄机。无论是生成密码、加密密钥,还是创建安全令牌,随机数的质量直接关系到系统的安全性。Jav…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
大数据学习(132)-HIve数据分析
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言Ǵ…...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...
