Kafka3.0.0版本——消费者(手动提交offset)
目录
- 一、消费者(手动提交 offset)的概述
- 1.1、手动提交offset的两种方式
- 1.2、手动提交offset两种方式的区别
- 1.3、手动提交offset的图解
- 二、消费者(手动提交 offset)的代码示例
- 2.1、手动提交 offset(采用同步提交的方式)代码
- 2.1、手动提交 offset(采用异步提交的方式)代码
一、消费者(手动提交 offset)的概述
1.1、手动提交offset的两种方式
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
1.2、手动提交offset两种方式的区别
- 相同点:都会将本次提交的一批数据最高的偏移量提交。
- 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
1.3、手动提交offset的图解

二、消费者(手动提交 offset)的代码示例
2.1、手动提交 offset(采用同步提交的方式)代码
-
同步提交代码
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手动提交offset(同步提交) kafkaConsumer.commitSync(); -
同步提交完整代码
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}} }
2.1、手动提交 offset(采用异步提交的方式)代码
-
异步提交代码
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手动提交offset(异步提交) kafkaConsumer.commitAsync(); -
异步提交完整代码
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}} }
相关文章:
Kafka3.0.0版本——消费者(手动提交offset)
目录 一、消费者(手动提交 offset)的概述1.1、手动提交offset的两种方式1.2、手动提交offset两种方式的区别1.3、手动提交offset的图解 二、消费者(手动提交 offset)的代码示例2.1、手动提交 offset(采用同步提交的方式…...
【AIGC专题】Stable Diffusion 从入门到企业级实战0403
一、前言 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第03节, 利用Stable Diffusion ControlNet Canny模型精准控制图像生成。本部分内容,位于整个Stable Diffusion生态…...
linux提权
目录 一、linux提权靶场下载与安装 二、基础提权 1.sudo提权 2.suid提权 3.taskset执行bash 三、内核提权 相关网站 https://gtfobins.github.io/#sudohttps://blog.csdn.net/weixin_43873557/article/details/113784146 一、linux提权靶场下载与安装 #下载链接 http…...
Excel VSTO开发7 -可视化界面开发
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 7 可视化界面开发 前面的代码都是基于插件启动或者退出时,以及Excel Application的相关事件,在用户实际操作…...
英文科技论文写作与发表-投稿到发表(第6章)
1 投稿到发表 本章介绍典型会议和期刊从投稿到最终录用或退稿的全过程,期刊从投稿到最终录用或退稿的过程在各种不同学科领域差别不大。会议主要针对计算机科学及其相关领域(如电子、信息、其他工程类)的会议。最后总结几条怎样提高论文命中…...
2.4.3 【MySQL】设置系统变量
2.4.3.1 通过启动选项设置 大部分的系统变量都可以通过启动服务器时传送启动选项的方式来进行设置。如何填写启动选项就是下面两种方式: 通过命令行添加启动选项。 在启动服务器程序时用这个命令: mysqld --default-storage-engineMyISAM --max-conn…...
【Redis】2、Redis持久化和性能管理
Redis 高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证提供…...
MIT6.S081实验环境搭建
MIT6.S081 lab 环境搭建 本文参考了MIT的官方指南和知乎文章环境搭建 step1 首先需要一个ubuntu20.04的系统,我使用的是vscode的WSL2连接的ubuntu20.04,使用virtual box建一个ubuntu20.04的虚拟机应该也可以。 可以用 lsb_release -a 查看一下自己ub…...
spring spring-boot spring-cloud spring-cloud-alibaba之间版本对应关系
spring 版本与 jdk 的对应关系 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions 从 spring 6.0 开始使用 jdk 17 进行编译 对应的相关 servlet 容器(tomcat、undertow、jetty等)的 servlet 规范转移到 eclipse&…...
Docker技术入门 | Part01:Docker简介
文章目录 1 虚拟化技术2 Docker概述2.1 Docker能解决的问题2.2 Docker介绍2.3 为什么使用Docker2.4 Docker特点2.5 Docker应用场景 3 Docker与虚拟机对比3.1 Docker和虚拟机组成结构3.2 Docker和虚拟机的不同点 4 Docker基本概念4.1 Docker引擎4.2 Docker基本架构4.3 Docker容器…...
Apache实现weblogic集群配置
安装apache,安装相对稳定的版本。如果安装后测试能否正常启动,可以通过访问http://localhost/进行测试。安装Weblogic,参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…...
Java面试题总结2023
Java面试题总结2023 基础String中常用的方法 与 equals的区别值传递和引用传递数组和集合的区别成员变量和局部变量的区别final和finally和finalize的区别Cookie和Session的的区别接口分类接口和抽象类的区别说说你对抽象类的理解String/StringBuffer/StringBuilderjdk1.8的新特…...
采用ROUANT 方法对 nex-gddp-cmip6 数据进行精度校正
专题一 CMIP6中的模式比较计划 1.1 GCM介绍全球气候模型(Global Climate Model, GCM),也被称为全球环流模型或全球大气模型,是一种用于模拟地球的气候系统的数值模型。这种模型使用一系列的数学公式来描述气候系统的主要组成部分…...
超级电容-电池-超级电容混合储能系统能量管理simulink仿真建模模型
建立混合储能系统模型 在Simulink中,首先需要建立一个超级电容和蓄电池并联的混合储能系统模型。其中,超级电容和蓄电池的荷电状态(SOC)需要根据实际情况进行管理。荷电状态可以通过对电池和超级电容的电压、电流等进行测量&…...
最新仿闲鱼链接+独立后台管理 跳转APP
2024最新仿xy链接源码 后台一键生成链接,后台管理教程:解压源码,修改数据库config/Congig 不会可以看源码里有教程 下载程序:https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3...
DoIP协议——汽车以太网应用介绍
DoIP目录 前言一、DoIP术语和缩写二、网络拓扑三、DoIP数据基本结构四、应用场景4.1 直接点对点连接4.2 多台外部测试设备分别和多台汽车在局域网内通过交换机点对点连接4.3 一台外部测试设备跨越本地网络与多台车辆连接4.4 外部测试设备的多个应用层实体(在一台硬件或多台硬件…...
标准C++day1——名字空间和堆内存管理
一、C介绍 本贾尼.斯特劳斯特卢普,于1979年在贝尔实验室负责分析UNIX系统内核流量的分布情况时,特别希望有一种更加模块化的工具,于1979.10开始着手研发一款新的编程语言,在C语言的基础上增加了面向对象的机制,也就是C…...
草图大师SketchUp Pro 2023 for Mac
SketchUp Pro 2023 for Mac(草图大师)是一款专业的三维建模软件,由Trimble Inc.开发。它可以用于创建、修改和分享3D模型,包括建筑、家具、景观等。 SketchUp Pro 2023 for Mac提供了简单易学的用户界面和强大的工具集࿰…...
doris docker环境编译部署
1.准备doris docker环境 xiuchenggongxiuchengdeMacBook-Pro bin % docker pull apache/doris:build-env-ldb-toolchain-latestbuild-env-ldb-toolchain-latest: Pulling from apache/doris eeedae70be19: Pull complete a3ed95caeb02: Pull complete Digest: sha256:63d9a9…...
java封装国密SM4为 jar包,PHP调用
java封装国密SM4为 jar包,PHP调用 创建java工程引入SM4 jar包封装CMD可调用jar包PHP 传参调用刚用java弄了个class给php调用,本以为项目上用到java封装功能的事情就结束了,没想到又来了java的加密需求,这玩意上头,毕竟不是强项,没办法,只好再次封装。 但是这次的有点不…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
简易版抽奖活动的设计技术方案
1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
