Kafka3.0.0版本——消费者(手动提交offset)
目录
- 一、消费者(手动提交 offset)的概述
- 1.1、手动提交offset的两种方式
- 1.2、手动提交offset两种方式的区别
- 1.3、手动提交offset的图解
- 二、消费者(手动提交 offset)的代码示例
- 2.1、手动提交 offset(采用同步提交的方式)代码
- 2.1、手动提交 offset(采用异步提交的方式)代码
一、消费者(手动提交 offset)的概述
1.1、手动提交offset的两种方式
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。
1.2、手动提交offset两种方式的区别
- 相同点:都会将本次提交的一批数据最高的偏移量提交。
- 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
1.3、手动提交offset的图解

二、消费者(手动提交 offset)的代码示例
2.1、手动提交 offset(采用同步提交的方式)代码
-
同步提交代码
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手动提交offset(同步提交) kafkaConsumer.commitSync(); -
同步提交完整代码
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}} }
2.1、手动提交 offset(采用异步提交的方式)代码
-
异步提交代码
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。// 是否自动提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 手动提交offset(异步提交) kafkaConsumer.commitAsync(); -
异步提交完整代码
package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration; import java.util.ArrayList; import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者 "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}} }
相关文章:
Kafka3.0.0版本——消费者(手动提交offset)
目录 一、消费者(手动提交 offset)的概述1.1、手动提交offset的两种方式1.2、手动提交offset两种方式的区别1.3、手动提交offset的图解 二、消费者(手动提交 offset)的代码示例2.1、手动提交 offset(采用同步提交的方式…...
【AIGC专题】Stable Diffusion 从入门到企业级实战0403
一、前言 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第03节, 利用Stable Diffusion ControlNet Canny模型精准控制图像生成。本部分内容,位于整个Stable Diffusion生态…...
linux提权
目录 一、linux提权靶场下载与安装 二、基础提权 1.sudo提权 2.suid提权 3.taskset执行bash 三、内核提权 相关网站 https://gtfobins.github.io/#sudohttps://blog.csdn.net/weixin_43873557/article/details/113784146 一、linux提权靶场下载与安装 #下载链接 http…...
Excel VSTO开发7 -可视化界面开发
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 7 可视化界面开发 前面的代码都是基于插件启动或者退出时,以及Excel Application的相关事件,在用户实际操作…...
英文科技论文写作与发表-投稿到发表(第6章)
1 投稿到发表 本章介绍典型会议和期刊从投稿到最终录用或退稿的全过程,期刊从投稿到最终录用或退稿的过程在各种不同学科领域差别不大。会议主要针对计算机科学及其相关领域(如电子、信息、其他工程类)的会议。最后总结几条怎样提高论文命中…...
2.4.3 【MySQL】设置系统变量
2.4.3.1 通过启动选项设置 大部分的系统变量都可以通过启动服务器时传送启动选项的方式来进行设置。如何填写启动选项就是下面两种方式: 通过命令行添加启动选项。 在启动服务器程序时用这个命令: mysqld --default-storage-engineMyISAM --max-conn…...
【Redis】2、Redis持久化和性能管理
Redis 高可用 在web服务器中,高可用是指服务器可以正常访问的时间,衡量的标准是在多长时间内可以提供正常服务(99.9%、99.99%、99.999%等等)。 但是在Redis语境中,高可用的含义似乎要宽泛一些,除了保证提供…...
MIT6.S081实验环境搭建
MIT6.S081 lab 环境搭建 本文参考了MIT的官方指南和知乎文章环境搭建 step1 首先需要一个ubuntu20.04的系统,我使用的是vscode的WSL2连接的ubuntu20.04,使用virtual box建一个ubuntu20.04的虚拟机应该也可以。 可以用 lsb_release -a 查看一下自己ub…...
spring spring-boot spring-cloud spring-cloud-alibaba之间版本对应关系
spring 版本与 jdk 的对应关系 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions 从 spring 6.0 开始使用 jdk 17 进行编译 对应的相关 servlet 容器(tomcat、undertow、jetty等)的 servlet 规范转移到 eclipse&…...
Docker技术入门 | Part01:Docker简介
文章目录 1 虚拟化技术2 Docker概述2.1 Docker能解决的问题2.2 Docker介绍2.3 为什么使用Docker2.4 Docker特点2.5 Docker应用场景 3 Docker与虚拟机对比3.1 Docker和虚拟机组成结构3.2 Docker和虚拟机的不同点 4 Docker基本概念4.1 Docker引擎4.2 Docker基本架构4.3 Docker容器…...
Apache实现weblogic集群配置
安装apache,安装相对稳定的版本。如果安装后测试能否正常启动,可以通过访问http://localhost/进行测试。安装Weblogic,参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…...
Java面试题总结2023
Java面试题总结2023 基础String中常用的方法 与 equals的区别值传递和引用传递数组和集合的区别成员变量和局部变量的区别final和finally和finalize的区别Cookie和Session的的区别接口分类接口和抽象类的区别说说你对抽象类的理解String/StringBuffer/StringBuilderjdk1.8的新特…...
采用ROUANT 方法对 nex-gddp-cmip6 数据进行精度校正
专题一 CMIP6中的模式比较计划 1.1 GCM介绍全球气候模型(Global Climate Model, GCM),也被称为全球环流模型或全球大气模型,是一种用于模拟地球的气候系统的数值模型。这种模型使用一系列的数学公式来描述气候系统的主要组成部分…...
超级电容-电池-超级电容混合储能系统能量管理simulink仿真建模模型
建立混合储能系统模型 在Simulink中,首先需要建立一个超级电容和蓄电池并联的混合储能系统模型。其中,超级电容和蓄电池的荷电状态(SOC)需要根据实际情况进行管理。荷电状态可以通过对电池和超级电容的电压、电流等进行测量&…...
最新仿闲鱼链接+独立后台管理 跳转APP
2024最新仿xy链接源码 后台一键生成链接,后台管理教程:解压源码,修改数据库config/Congig 不会可以看源码里有教程 下载程序:https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3...
DoIP协议——汽车以太网应用介绍
DoIP目录 前言一、DoIP术语和缩写二、网络拓扑三、DoIP数据基本结构四、应用场景4.1 直接点对点连接4.2 多台外部测试设备分别和多台汽车在局域网内通过交换机点对点连接4.3 一台外部测试设备跨越本地网络与多台车辆连接4.4 外部测试设备的多个应用层实体(在一台硬件或多台硬件…...
标准C++day1——名字空间和堆内存管理
一、C介绍 本贾尼.斯特劳斯特卢普,于1979年在贝尔实验室负责分析UNIX系统内核流量的分布情况时,特别希望有一种更加模块化的工具,于1979.10开始着手研发一款新的编程语言,在C语言的基础上增加了面向对象的机制,也就是C…...
草图大师SketchUp Pro 2023 for Mac
SketchUp Pro 2023 for Mac(草图大师)是一款专业的三维建模软件,由Trimble Inc.开发。它可以用于创建、修改和分享3D模型,包括建筑、家具、景观等。 SketchUp Pro 2023 for Mac提供了简单易学的用户界面和强大的工具集࿰…...
doris docker环境编译部署
1.准备doris docker环境 xiuchenggongxiuchengdeMacBook-Pro bin % docker pull apache/doris:build-env-ldb-toolchain-latestbuild-env-ldb-toolchain-latest: Pulling from apache/doris eeedae70be19: Pull complete a3ed95caeb02: Pull complete Digest: sha256:63d9a9…...
java封装国密SM4为 jar包,PHP调用
java封装国密SM4为 jar包,PHP调用 创建java工程引入SM4 jar包封装CMD可调用jar包PHP 传参调用刚用java弄了个class给php调用,本以为项目上用到java封装功能的事情就结束了,没想到又来了java的加密需求,这玩意上头,毕竟不是强项,没办法,只好再次封装。 但是这次的有点不…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
基于Flask实现的医疗保险欺诈识别监测模型
基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...
UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验
系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
LabVIEW双光子成像系统技术
双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制,展现出显著的技术优势: 深层组织穿透能力:适用于活体组织深度成像 高分辨率观测性能:满足微观结构的精细研究需求 低光毒性特点:减少对样本的损伤…...
