kafka入门(二): 位移提交
位移提交:
Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。
消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,
因此,需要记录上一次消费时的消费位移,并且持久化。
消费者在消费完消息之后,需要执行消费位移的提交。
自动位移提交:
Kafka默认的消费位移的提交方式是 自动提交。
自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。
默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。
自动位移提交,有可能会重复消费和消息丢失。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。
手动位移提交:
手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。
手动位移提交,可以分为 同步提交、异步提交。
commitSync() 同步提交
同步提交,会阻塞消费者线程直到位移提交完成。
示例代码:
public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
commitAsync() 异步提交 :
异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。
异步提交,将 consumer.commitSync(); 换成 commitAsync。
如果还需要回调,就用 OffsetCommitCallback对象作为参数。
示例如下:
public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}
参考资料:
《深入理解kafka:核心设计与实践原理》
相关文章:
kafka入门(二): 位移提交
位移提交: Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。 消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集, 因此,需要记录上一次消费时的消费位…...
PG时间计算
PG数据库,时间计算使用场景总结 日期之差 --**获取秒差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 2019-05-05 10:10:10)); --**获取分钟差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 20…...
基于51单片机的交通灯_可调时间_夜间+紧急模式
51单片机交通灯 1 讲解视频:2 功能要求3 仿真图:4 原理图PCB5 实物图6 程序设计:7 设计报告8 资料清单(提供资料清单所有文件):设计资料下载链接: 51单片机简易交通灯_可调时间_夜间紧急 仿真代…...
网络通信原理,进制转化总结
来源,做个笔记,讲的还蛮清楚通信原理-2.5 数据封装与传输05_哔哩哔哩_bilibili ip地址范围...
西南科技大学(数据结构A)期末自测练习三
一、填空题(每空1分,共10分) 1、为解决计算机主机与打印机之间速度不匹配的问题,通常设置一个打印数据缓冲区。主机将要输出的数据依次写入缓冲区,打印机则依次从缓冲区中取出数据,则该换缓冲区的逻辑结构…...
【halcon】裁剪
前言 目前我遇到的裁剪相关的函数都是以clip打头的函数。一共4个: clip_end_points_contours_xldclip_contours_xldclip_regionclip_region_rel 前面两个是对轮廓的裁剪。 后面是对区域的裁剪。 裁剪轮廓的两端 clip_end_points_contours_xld 用于实现裁剪XLD…...
vue+less+style-resources-loader 配置全局颜色变量
全局统一样式后,可配置vue.config.js实现全局颜色变量,方便在编写时使用统一风格的色彩 一、新建global.less 二、下载安装style-resources-loader npm i style-resources-loader --save-dev三、在vue.config.js中进行配置 module.exports {pluginOpt…...
第二次量子化
专栏目录: 高质量文章导航-持续更新中 前置复盘: 玻色子和费米子: 首先,我们希望把描述单粒子态的量子力学推广到全同多粒子体系。我们的做法是从单粒子态的希尔伯特空间(Hilbert Space)出发,构造全同多粒子态的态空间——福克空间(Fock Space),它实际上就是无穷个…...
(三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言Q1:卷积网络和传统网络的区别Q2:卷积神经网络的架构Q3:卷积神经网络中的参数共享,也是比传统网络的优势所在4、 具体的实现代码网络搭建…...
【代码】多种调度模式下的光储电站经济性最优 储能容量配置分析matlab/yalmip
程序名称:多种调度模式下的光储电站经济性最优储能容量配置分析 实现平台:matlab-yalmip-cplex/gurobi 代码简介:代码主要做的是一个光储电站经济最优储能容量配置的问题,对光储电站中储能的容量进行优化,以实现经济…...
深度学习今年来经典模型优缺点总结,包括卷积、循环卷积、Transformer、LSTM、GANs等
文章目录 1、卷积神经网络(Convolutional Neural Networks,CNN)1.1 优点1.2 缺点1.3 应用场景1.4 网络图 2、循环神经网络(Recurrent Neural Networks,RNNs)2.1 优点2.2 缺点2.3 应用场景2.4 网络图 3、长短…...
ChatGPT成为“帮凶”:生成虚假数据集支持未知科学假设
ChatGPT 自发布以来,就成为了大家的好帮手,学生党和打工人更是每天都离不开。 然而这次好帮手 ChatGPT 却帮过头了,莫名奇妙的成为了“帮凶”,一位研究人员利用 ChatGPT 创建了虚假的数据集,用来支持未知的科学假设。…...
c#利用Forms.Timer定时检测Tcp连接状态
目的:本地创建客户端连接服务器端,如果连接正常显示连接正常如果连接异常显示连接异常。 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.T…...
空间注意力:改变我们理解图像的方式
空间注意力:改变我们理解图像的方式 欢迎来到深度学习和计算机视觉的新时代,在这里,空间注意力机制正改变着我们理解和处理图像的方式。本文将深入探讨空间注意力的概念,它如何工作,以及为什么它在现代图像处理技术中…...
【模型报错记录】‘PromptForGeneration‘ object has no attribute ‘can_generate‘
通过这个连接中的方法解决: “PromptForGeneration”对象没有属性“can_generate” 期刊 #277 thunlp/OpenPrompt GitHub的 问题描述:在使用model.generate() 的时候报错:PromptForGeneration object has no attribute can_generate 解决方法…...
mysql学习记录
关系型数据库:不是把所有的数据全部存储在一起,而是分类存储在一起。 常见的数据库 关系型:oracle大型收费,mysql小型免费。 sql语言(操作数据库) structured query language 结构化查询语言 1.DDL 数据定义语言 创建数…...
Hdoop学习笔记(HDP)-Part.11 安装Kerberos
目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …...
浅谈UML的概念和模型之UML九种图
1、用例图(use case diagrams) 【概念】描述用户需求,从用户的角度描述系统的功能 【描述方式】椭圆表示某个用例;人形符号表示角色 【目的】帮组开发团队以一种可视化的方式理解系统的功能需求 【用例图】 2、静态图 类图&…...
杨志丰:OceanBase助力企业应对数据库转型深水区挑战
11 月 16 日,OceanBase 在北京顺利举办 2023 年度发布会,正式宣布:将持续践行“一体化”产品战略,为关键业务负载打造一体化数据库。OceanBase 产品总经理杨志丰发表了《助力企业应对数据库转型深水区挑战》主题演讲。 以下为演讲…...
版本控制系统Git学习笔记-Git分支操作
文章目录 概述一、Git分支简介1.1 基本概念1.2 创建分支1.3 分支切换1.4 删除分支 二、新建和合并分支2.1 工作流程示意图2.2 新建分支2.3 合并分支2.4 分支示例2.4.1 当前除了主分支,再次创建了两个分支2.4.2 先合并test1分支2.4.3 合并testbranch分支 2.5 解决合并…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
