当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(手动提交offset)

目录

    • 一、消费者(手动提交 offset)的概述
      • 1.1、手动提交offset的两种方式
      • 1.2、手动提交offset两种方式的区别
      • 1.3、手动提交offset的图解
    • 二、消费者(手动提交 offset)的代码示例
      • 2.1、手动提交 offset(采用同步提交的方式)代码
      • 2.1、手动提交 offset(采用异步提交的方式)代码

一、消费者(手动提交 offset)的概述

1.1、手动提交offset的两种方式

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1.2、手动提交offset两种方式的区别

  • 相同点:都会将本次提交的一批数据最高的偏移量提交。
  • 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

1.3、手动提交offset的图解

在这里插入图片描述

二、消费者(手动提交 offset)的代码示例

2.1、手动提交 offset(采用同步提交的方式)代码

  • 同步提交代码
    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}}
    }
    

2.1、手动提交 offset(采用异步提交的方式)代码

  • 异步提交代码
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(异步提交)
    kafkaConsumer.commitAsync();
    
  • 异步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}}
    }
    

相关文章:

Kafka3.0.0版本——消费者(手动提交offset)

目录 一、消费者&#xff08;手动提交 offset&#xff09;的概述1.1、手动提交offset的两种方式1.2、手动提交offset两种方式的区别1.3、手动提交offset的图解 二、消费者&#xff08;手动提交 offset&#xff09;的代码示例2.1、手动提交 offset&#xff08;采用同步提交的方式…...

【AIGC专题】Stable Diffusion 从入门到企业级实战0403

一、前言 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第03节&#xff0c; 利用Stable Diffusion ControlNet Canny模型精准控制图像生成。本部分内容&#xff0c;位于整个Stable Diffusion生态…...

linux提权

目录 一、linux提权靶场下载与安装 二、基础提权 1.sudo提权 2.suid提权 3.taskset执行bash 三、内核提权 相关网站 https://gtfobins.github.io/#sudohttps://blog.csdn.net/weixin_43873557/article/details/113784146 一、linux提权靶场下载与安装 #下载链接 http…...

Excel VSTO开发7 -可视化界面开发

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 7 可视化界面开发 前面的代码都是基于插件启动或者退出时&#xff0c;以及Excel Application的相关事件&#xff0c;在用户实际操作…...

英文科技论文写作与发表-投稿到发表(第6章)

1 投稿到发表 本章介绍典型会议和期刊从投稿到最终录用或退稿的全过程&#xff0c;期刊从投稿到最终录用或退稿的过程在各种不同学科领域差别不大。会议主要针对计算机科学及其相关领域&#xff08;如电子、信息、其他工程类&#xff09;的会议。最后总结几条怎样提高论文命中…...

2.4.3 【MySQL】设置系统变量

2.4.3.1 通过启动选项设置 大部分的系统变量都可以通过启动服务器时传送启动选项的方式来进行设置。如何填写启动选项就是下面两种方式&#xff1a; 通过命令行添加启动选项。 在启动服务器程序时用这个命令&#xff1a; mysqld --default-storage-engineMyISAM --max-conn…...

【Redis】2、Redis持久化和性能管理

Redis 高可用 在web服务器中&#xff0c;高可用是指服务器可以正常访问的时间&#xff0c;衡量的标准是在多长时间内可以提供正常服务&#xff08;99.9%、99.99%、99.999%等等&#xff09;。 但是在Redis语境中&#xff0c;高可用的含义似乎要宽泛一些&#xff0c;除了保证提供…...

MIT6.S081实验环境搭建

MIT6.S081 lab 环境搭建 本文参考了MIT的官方指南和知乎文章环境搭建 step1 首先需要一个ubuntu20.04的系统&#xff0c;我使用的是vscode的WSL2连接的ubuntu20.04&#xff0c;使用virtual box建一个ubuntu20.04的虚拟机应该也可以。 可以用 lsb_release -a 查看一下自己ub…...

spring spring-boot spring-cloud spring-cloud-alibaba之间版本对应关系

spring 版本与 jdk 的对应关系 https://github.com/spring-projects/spring-framework/wiki/Spring-Framework-Versions 从 spring 6.0 开始使用 jdk 17 进行编译 对应的相关 servlet 容器&#xff08;tomcat、undertow、jetty等&#xff09;的 servlet 规范转移到 eclipse&…...

Docker技术入门 | Part01:Docker简介

文章目录 1 虚拟化技术2 Docker概述2.1 Docker能解决的问题2.2 Docker介绍2.3 为什么使用Docker2.4 Docker特点2.5 Docker应用场景 3 Docker与虚拟机对比3.1 Docker和虚拟机组成结构3.2 Docker和虚拟机的不同点 4 Docker基本概念4.1 Docker引擎4.2 Docker基本架构4.3 Docker容器…...

Apache实现weblogic集群配置

安装apache&#xff0c;安装相对稳定的版本。如果安装后测试能否正常启动&#xff0c;可以通过访问http://localhost/进行测试。安装Weblogic&#xff0c;参见文档将bea安装目录 weblogic81/server/bin 下的 mod_wl_20.so 文件copy到 apache安装目录下Apache2/modules/目录下A…...

Java面试题总结2023

Java面试题总结2023 基础String中常用的方法 与 equals的区别值传递和引用传递数组和集合的区别成员变量和局部变量的区别final和finally和finalize的区别Cookie和Session的的区别接口分类接口和抽象类的区别说说你对抽象类的理解String/StringBuffer/StringBuilderjdk1.8的新特…...

采用ROUANT 方法对 nex-gddp-cmip6 数据进行精度校正

专题一 CMIP6中的模式比较计划 1.1 GCM介绍全球气候模型&#xff08;Global Climate Model, GCM&#xff09;&#xff0c;也被称为全球环流模型或全球大气模型&#xff0c;是一种用于模拟地球的气候系统的数值模型。这种模型使用一系列的数学公式来描述气候系统的主要组成部分…...

超级电容-电池-超级电容混合储能系统能量管理simulink仿真建模模型

建立混合储能系统模型 在Simulink中&#xff0c;首先需要建立一个超级电容和蓄电池并联的混合储能系统模型。其中&#xff0c;超级电容和蓄电池的荷电状态&#xff08;SOC&#xff09;需要根据实际情况进行管理。荷电状态可以通过对电池和超级电容的电压、电流等进行测量&…...

最新仿闲鱼链接+独立后台管理 跳转APP

2024最新仿xy链接源码 后台一键生成链接&#xff0c;后台管理教程&#xff1a;解压源码&#xff0c;修改数据库config/Congig 不会可以看源码里有教程 下载程序&#xff1a;https://pan.baidu.com/s/16lN3gvRIZm7pqhvVMYYecQ?pwd6zw3...

DoIP协议——汽车以太网应用介绍

DoIP目录 前言一、DoIP术语和缩写二、网络拓扑三、DoIP数据基本结构四、应用场景4.1 直接点对点连接4.2 多台外部测试设备分别和多台汽车在局域网内通过交换机点对点连接4.3 一台外部测试设备跨越本地网络与多台车辆连接4.4 外部测试设备的多个应用层实体(在一台硬件或多台硬件…...

标准C++day1——名字空间和堆内存管理

一、C介绍 本贾尼.斯特劳斯特卢普&#xff0c;于1979年在贝尔实验室负责分析UNIX系统内核流量的分布情况时&#xff0c;特别希望有一种更加模块化的工具&#xff0c;于1979.10开始着手研发一款新的编程语言&#xff0c;在C语言的基础上增加了面向对象的机制&#xff0c;也就是C…...

草图大师SketchUp Pro 2023 for Mac

SketchUp Pro 2023 for Mac&#xff08;草图大师&#xff09;是一款专业的三维建模软件&#xff0c;由Trimble Inc.开发。它可以用于创建、修改和分享3D模型&#xff0c;包括建筑、家具、景观等。 SketchUp Pro 2023 for Mac提供了简单易学的用户界面和强大的工具集&#xff0…...

doris docker环境编译部署

1.准备doris docker环境 xiuchenggongxiuchengdeMacBook-Pro bin % docker pull apache/doris:build-env-ldb-toolchain-latestbuild-env-ldb-toolchain-latest: Pulling from apache/doris eeedae70be19: Pull complete a3ed95caeb02: Pull complete Digest: sha256:63d9a9…...

java封装国密SM4为 jar包,PHP调用

java封装国密SM4为 jar包,PHP调用 创建java工程引入SM4 jar包封装CMD可调用jar包PHP 传参调用刚用java弄了个class给php调用,本以为项目上用到java封装功能的事情就结束了,没想到又来了java的加密需求,这玩意上头,毕竟不是强项,没办法,只好再次封装。 但是这次的有点不…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...