当前位置: 首页 > news >正文

kafka入门(二): 位移提交

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

参考资料:

《深入理解kafka:核心设计与实践原理》

相关文章:

kafka入门(二): 位移提交

位移提交&#xff1a; Kafka的每条消息都有唯一的 offset&#xff0c; 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。 消费者每次在 poll() 拉取消息&#xff0c;它要返回的是还没有消费过的消息集&#xff0c; 因此&#xff0c;需要记录上一次消费时的消费位…...

PG时间计算

PG数据库&#xff0c;时间计算使用场景总结 日期之差 --**获取秒差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 2019-05-05 10:10:10)); --**获取分钟差** SELECT round(date_part(epoch, TIMESTAMP 2019-05-05 12:11:20 - TIMESTAMP 20…...

基于51单片机的交通灯_可调时间_夜间+紧急模式

51单片机交通灯 1 讲解视频&#xff1a;2 功能要求3 仿真图&#xff1a;4 原理图PCB5 实物图6 程序设计&#xff1a;7 设计报告8 资料清单&#xff08;提供资料清单所有文件&#xff09;&#xff1a;设计资料下载链接&#xff1a; 51单片机简易交通灯_可调时间_夜间紧急 仿真代…...

网络通信原理,进制转化总结

来源&#xff0c;做个笔记&#xff0c;讲的还蛮清楚通信原理-2.5 数据封装与传输05_哔哩哔哩_bilibili ip地址范围...

西南科技大学(数据结构A)期末自测练习三

一、填空题&#xff08;每空1分&#xff0c;共10分&#xff09; 1、为解决计算机主机与打印机之间速度不匹配的问题&#xff0c;通常设置一个打印数据缓冲区。主机将要输出的数据依次写入缓冲区&#xff0c;打印机则依次从缓冲区中取出数据&#xff0c;则该换缓冲区的逻辑结构…...

【halcon】裁剪

前言 目前我遇到的裁剪相关的函数都是以clip打头的函数。一共4个&#xff1a; clip_end_points_contours_xldclip_contours_xldclip_regionclip_region_rel 前面两个是对轮廓的裁剪。 后面是对区域的裁剪。 裁剪轮廓的两端 clip_end_points_contours_xld 用于实现裁剪XLD…...

vue+less+style-resources-loader 配置全局颜色变量

全局统一样式后&#xff0c;可配置vue.config.js实现全局颜色变量&#xff0c;方便在编写时使用统一风格的色彩 一、新建global.less 二、下载安装style-resources-loader npm i style-resources-loader --save-dev三、在vue.config.js中进行配置 module.exports {pluginOpt…...

第二次量子化

专栏目录: 高质量文章导航-持续更新中 前置复盘: 玻色子和费米子: 首先,我们希望把描述单粒子态的量子力学推广到全同多粒子体系。我们的做法是从单粒子态的希尔伯特空间(Hilbert Space)出发,构造全同多粒子态的态空间——福克空间(Fock Space),它实际上就是无穷个…...

(三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言Q1&#xff1a;卷积网络和传统网络的区别Q2:卷积神经网络的架构Q3:卷积神经网络中的参数共享&#xff0c;也是比传统网络的优势所在4、 具体的实现代码网络搭建…...

【代码】多种调度模式下的光储电站经济性最优 储能容量配置分析matlab/yalmip

程序名称&#xff1a;多种调度模式下的光储电站经济性最优储能容量配置分析 实现平台&#xff1a;matlab-yalmip-cplex/gurobi 代码简介&#xff1a;代码主要做的是一个光储电站经济最优储能容量配置的问题&#xff0c;对光储电站中储能的容量进行优化&#xff0c;以实现经济…...

深度学习今年来经典模型优缺点总结,包括卷积、循环卷积、Transformer、LSTM、GANs等

文章目录 1、卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;CNN&#xff09;1.1 优点1.2 缺点1.3 应用场景1.4 网络图 2、循环神经网络&#xff08;Recurrent Neural Networks&#xff0c;RNNs&#xff09;2.1 优点2.2 缺点2.3 应用场景2.4 网络图 3、长短…...

ChatGPT成为“帮凶”:生成虚假数据集支持未知科学假设

ChatGPT 自发布以来&#xff0c;就成为了大家的好帮手&#xff0c;学生党和打工人更是每天都离不开。 然而这次好帮手 ChatGPT 却帮过头了&#xff0c;莫名奇妙的成为了“帮凶”&#xff0c;一位研究人员利用 ChatGPT 创建了虚假的数据集&#xff0c;用来支持未知的科学假设。…...

c#利用Forms.Timer定时检测Tcp连接状态

目的&#xff1a;本地创建客户端连接服务器端&#xff0c;如果连接正常显示连接正常如果连接异常显示连接异常。 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.T…...

空间注意力:改变我们理解图像的方式

空间注意力&#xff1a;改变我们理解图像的方式 欢迎来到深度学习和计算机视觉的新时代&#xff0c;在这里&#xff0c;空间注意力机制正改变着我们理解和处理图像的方式。本文将深入探讨空间注意力的概念&#xff0c;它如何工作&#xff0c;以及为什么它在现代图像处理技术中…...

【模型报错记录】‘PromptForGeneration‘ object has no attribute ‘can_generate‘

通过这个连接中的方法解决&#xff1a; “PromptForGeneration”对象没有属性“can_generate” 期刊 #277 thunlp/OpenPrompt GitHub的 问题描述&#xff1a;在使用model.generate() 的时候报错&#xff1a;PromptForGeneration object has no attribute can_generate 解决方法…...

mysql学习记录

关系型数据库&#xff1a;不是把所有的数据全部存储在一起&#xff0c;而是分类存储在一起。 常见的数据库 关系型&#xff1a;oracle大型收费,mysql小型免费。 sql语言&#xff08;操作数据库&#xff09; structured query language 结构化查询语言 1.DDL 数据定义语言 创建数…...

Hdoop学习笔记(HDP)-Part.11 安装Kerberos

目录 Part.01 关于HDP Part.02 核心组件原理 Part.03 资源规划 Part.04 基础环境配置 Part.05 Yum源配置 Part.06 安装OracleJDK Part.07 安装MySQL Part.08 部署Ambari集群 Part.09 安装OpenLDAP Part.10 创建集群 Part.11 安装Kerberos Part.12 安装HDFS Part.13 安装Ranger …...

浅谈UML的概念和模型之UML九种图

1、用例图&#xff08;use case diagrams&#xff09; 【概念】描述用户需求&#xff0c;从用户的角度描述系统的功能 【描述方式】椭圆表示某个用例&#xff1b;人形符号表示角色 【目的】帮组开发团队以一种可视化的方式理解系统的功能需求 【用例图】 2、静态图 类图&…...

杨志丰:OceanBase助力企业应对数据库转型深水区挑战

11 月 16 日&#xff0c;OceanBase 在北京顺利举办 2023 年度发布会&#xff0c;正式宣布&#xff1a;将持续践行“一体化”产品战略&#xff0c;为关键业务负载打造一体化数据库。OceanBase 产品总经理杨志丰发表了《助力企业应对数据库转型深水区挑战》主题演讲。 以下为演讲…...

版本控制系统Git学习笔记-Git分支操作

文章目录 概述一、Git分支简介1.1 基本概念1.2 创建分支1.3 分支切换1.4 删除分支 二、新建和合并分支2.1 工作流程示意图2.2 新建分支2.3 合并分支2.4 分支示例2.4.1 当前除了主分支&#xff0c;再次创建了两个分支2.4.2 先合并test1分支2.4.3 合并testbranch分支 2.5 解决合并…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

聊聊 Pulsar:Producer 源码解析

一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台&#xff0c;以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中&#xff0c;Producer&#xff08;生产者&#xff09; 是连接客户端应用与消息队列的第一步。生产者…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

R 语言科研绘图第 55 期 --- 网络图-聚类

在发表科研论文的过程中&#xff0c;科研绘图是必不可少的&#xff0c;一张好看的图形会是文章很大的加分项。 为了便于使用&#xff0c;本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中&#xff0c;获取方式&#xff1a; R 语言科研绘图模板 --- sciRplothttps://mp.…...

为什么要创建 Vue 实例

核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...

【阅读笔记】MemOS: 大语言模型内存增强生成操作系统

核心速览 研究背景 ​​研究问题​​&#xff1a;这篇文章要解决的问题是当前大型语言模型&#xff08;LLMs&#xff09;在处理内存方面的局限性。LLMs虽然在语言感知和生成方面表现出色&#xff0c;但缺乏统一的、结构化的内存架构。现有的方法如检索增强生成&#xff08;RA…...