当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(手动提交offset)

目录

    • 一、消费者(手动提交 offset)的概述
      • 1.1、手动提交offset的两种方式
      • 1.2、手动提交offset两种方式的区别
      • 1.3、手动提交offset的图解
    • 二、消费者(手动提交 offset)的代码示例
      • 2.1、手动提交 offset(采用同步提交的方式)代码
      • 2.1、手动提交 offset(采用异步提交的方式)代码

一、消费者(手动提交 offset)的概述

1.1、手动提交offset的两种方式

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1.2、手动提交offset两种方式的区别

  • 相同点:都会将本次提交的一批数据最高的偏移量提交。
  • 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

1.3、手动提交offset的图解

在这里插入图片描述

二、消费者(手动提交 offset)的代码示例

2.1、手动提交 offset(采用同步提交的方式)代码

  • 同步提交代码
    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}}
    }
    

2.1、手动提交 offset(采用异步提交的方式)代码

  • 异步提交代码
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(异步提交)
    kafkaConsumer.commitAsync();
    
  • 异步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}}
    }
    

相关文章:

Kafka3.0.0版本——消费者(手动提交offset)

目录 一、消费者&#xff08;手动提交 offset&#xff09;的概述1.1、手动提交offset的两种方式1.2、手动提交offset两种方式的区别1.3、手动提交offset的图解 二、消费者&#xff08;手动提交 offset&#xff09;的代码示例2.1、手动提交 offset&#xff08;采用同步提交的方式…...

【AIGC专题】Stable Diffusion 从入门到企业级实战0403

一、前言 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第03节&#xff0c; 利用Stable Diffusion ControlNet Canny模型精准控制图像生成。本部分内容&#xff0c;位于整个Stable Diffusion生态…...

linux提权

目录 一、linux提权靶场下载与安装 二、基础提权 1.sudo提权 2.suid提权 3.taskset执行bash 三、内核提权 相关网站 https://gtfobins.github.io/#sudohttps://blog.csdn.net/weixin_43873557/article/details/113784146 一、linux提权靶场下载与安装 #下载链接 http…...

Excel VSTO开发7 -可视化界面开发

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 7 可视化界面开发 前面的代码都是基于插件启动或者退出时&#xff0c;以及Excel Application的相关事件&#xff0c;在用户实际操作…...

英文科技论文写作与发表-投稿到发表(第6章)

1 投稿到发表 本章介绍典型会议和期刊从投稿到最终录用或退稿的全过程&#xff0c;期刊从投稿到最终录用或退稿的过程在各种不同学科领域差别不大。会议主要针对计算机科学及其相关领域&#xff08;如电子、信息、其他工程类&#xff09;的会议。最后总结几条怎样提高论文命中…...

2.4.3 【MySQL】设置系统变量

2.4.3.1 通过启动选项设置 大部分的系统变量都可以通过启动服务器时传送启动选项的方式来进行设置。如何填写启动选项就是下面两种方式&#xff1a; 通过命令行添加启动选项。 在启动服务器程序时用这个命令&#xff1a; mysqld --default-storage-engineMyISAM --max-conn…...

【Redis】2、Redis持久化和性能管理

Redis 高可用 在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、99.999%等等&#xff09;。 但是在Redis语境中&#xff0c;高可用的含义似乎要宽泛一些&#xff0c;除了保证提供…...

MIT6.S081实验环境搭建

MIT6.S081 lab 环境搭建 本文参考了MIT的官方指南和知乎文章环境搭建 step1 首先需要一个ubuntu20.04的系统&#xff0c;我使用的是vscode的WSL2连接的ubuntu20.04&#xff0c;使用virtual box建一个ubuntu20.04的虚拟机应该也可以。 可以用 lsb_release -a 查看一下自己ub…...

spring spring-boot spring-cloud spring-cloud-alibaba之间版本对应关系

spring 版本与 jdk 的对应关系 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions 从 spring 6.0 开始使用 jdk 17 进行编译 对应的相关 servlet 容器&#xff08;tomcat、undertow、jetty等&#xff09;的 servlet 规范转移到 eclipse&…...

Docker技术入门 | Part01:Docker简介

文章目录 1 虚拟化技术2 Docker概述2.1 Docker能解决的问题2.2 Docker介绍2.3 为什么使用Docker2.4 Docker特点2.5 Docker应用场景 3 Docker与虚拟机对比3.1 Docker和虚拟机组成结构3.2 Docker和虚拟机的不同点 4 Docker基本概念4.1 Docker引擎4.2 Docker基本架构4.3 Docker容器…...

Apache实现weblogic集群配置

安装apache&#xff0c;安装相对稳定的版本。如果安装后测试能否正常启动&#xff0c;可以通过访问http://localhost/进行测试。安装Weblogic&#xff0c;参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…...

Java面试题总结2023

Java面试题总结2023 基础String中常用的方法 与 equals的区别值传递和引用传递数组和集合的区别成员变量和局部变量的区别final和finally和finalize的区别Cookie和Session的的区别接口分类接口和抽象类的区别说说你对抽象类的理解String/StringBuffer/StringBuilderjdk1.8的新特…...

采用ROUANT 方法对 nex-gddp-cmip6 数据进行精度校正

专题一 CMIP6中的模式比较计划 1.1 GCM介绍全球气候模型&#xff08;Global Climate Model, GCM&#xff09;&#xff0c;也被称为全球环流模型或全球大气模型&#xff0c;是一种用于模拟地球的气候系统的数值模型。这种模型使用一系列的数学公式来描述气候系统的主要组成部分…...

超级电容-电池-超级电容混合储能系统能量管理simulink仿真建模模型

建立混合储能系统模型 在Simulink中&#xff0c;首先需要建立一个超级电容和蓄电池并联的混合储能系统模型。其中&#xff0c;超级电容和蓄电池的荷电状态&#xff08;SOC&#xff09;需要根据实际情况进行管理。荷电状态可以通过对电池和超级电容的电压、电流等进行测量&…...

最新仿闲鱼链接+独立后台管理 跳转APP

2024最新仿xy链接源码 后台一键生成链接&#xff0c;后台管理教程&#xff1a;解压源码&#xff0c;修改数据库config/Congig 不会可以看源码里有教程 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3...

DoIP协议——汽车以太网应用介绍

DoIP目录 前言一、DoIP术语和缩写二、网络拓扑三、DoIP数据基本结构四、应用场景4.1 直接点对点连接4.2 多台外部测试设备分别和多台汽车在局域网内通过交换机点对点连接4.3 一台外部测试设备跨越本地网络与多台车辆连接4.4 外部测试设备的多个应用层实体(在一台硬件或多台硬件…...

标准C++day1——名字空间和堆内存管理

一、C介绍 本贾尼.斯特劳斯特卢普&#xff0c;于1979年在贝尔实验室负责分析UNIX系统内核流量的分布情况时&#xff0c;特别希望有一种更加模块化的工具&#xff0c;于1979.10开始着手研发一款新的编程语言&#xff0c;在C语言的基础上增加了面向对象的机制&#xff0c;也就是C…...

草图大师SketchUp Pro 2023 for Mac

SketchUp Pro 2023 for Mac&#xff08;草图大师&#xff09;是一款专业的三维建模软件&#xff0c;由Trimble Inc.开发。它可以用于创建、修改和分享3D模型&#xff0c;包括建筑、家具、景观等。 SketchUp Pro 2023 for Mac提供了简单易学的用户界面和强大的工具集&#xff0…...

doris docker环境编译部署

1.准备doris docker环境 xiuchenggongxiuchengdeMacBook-Pro bin % docker pull apache/doris:build-env-ldb-toolchain-latestbuild-env-ldb-toolchain-latest: Pulling from apache/doris eeedae70be19: Pull complete a3ed95caeb02: Pull complete Digest: sha256:63d9a9…...

java封装国密SM4为 jar包,PHP调用

java封装国密SM4为 jar包,PHP调用 创建java工程引入SM4 jar包封装CMD可调用jar包PHP 传参调用刚用java弄了个class给php调用,本以为项目上用到java封装功能的事情就结束了,没想到又来了java的加密需求,这玩意上头,毕竟不是强项,没办法,只好再次封装。 但是这次的有点不…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

模型参数、模型存储精度、参数与显存

模型参数量衡量单位 M&#xff1a;百万&#xff08;Million&#xff09; B&#xff1a;十亿&#xff08;Billion&#xff09; 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的&#xff0c;但是一个参数所表示多少字节不一定&#xff0c;需要看这个参数以什么…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

Python:操作 Excel 折叠

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...