当前位置: 首页 > news >正文

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

  1. 自动提交Offset:
    1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
    2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
  2. 手动提交Offset
    1. 消费者在消费消息时/后,再提交offset,在消费者中实现
    2. 手动提交Offset分为:手动同步提交、手动异步提交
  3. 什么是Offset
    1. 参考文章:Linux:【Kafka三】组件介绍

二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,自动提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerAutoSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时// 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}}}
}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动同步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//  业务逻辑处理for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 当for循环业务逻辑处理结束以后,再手动提交offset// 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。// 一般使用同步提交,在同步提交后不再做其他逻辑处理consumer.commitAsync();// do anything}}
}

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

package com.demo.lxb.kafka;import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动异步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset2 {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 异步提交,不影响后续的内容。// new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e != null){// 可将提交失败的消息记录到日志System.out.println("记录提交offset失败的消息到日志");System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));}}});// 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。//do anything}}
}

相关文章:

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

一、Kafka消费者提交Offset的策略 Kafka消费者提交Offset的策略有 自动提交Offset&#xff1a; 消费者将消息拉取下来以后未被消费者消费前&#xff0c;直接自动提交offset。自动提交可能丢失数据&#xff0c;比如消息在被消费者消费前已经提交了offset&#xff0c;有可能消息…...

Python 训练集、测试集以及验证集切分方法:sklearn及手动切分

目录 方法一 方法二 需求目的&#xff1a;针对模型训练输入&#xff0c;按照6:2:2的比例进行训练集、测试集和验证集的划分。当前数据量约10万条。如果针对的是记录条数达上百万的数据集&#xff0c;可按照98:1:1的比例进行切分。 方法一&#xff1a;切分训练集和测试集&…...

数据结构,及分类(存储分类、逻辑分类)介绍

一、数据结构&#xff1a; 数据是软件开发的核心。在软件开发过程中基本上就是对数据的新增、删除、修改、查看的操作。 如何合理存储数据&#xff0c;如何有效提升数据操作开发效率&#xff0c;都是软件开发中的重中之重。使用合理的数据结构是非常重要的。 1.1简介&#xff…...

Powershell脚本自动备份dhcp数据库

文章目录 为什么要备份DHCP数据库呢&#xff1f;在PowerShell中自动备份DHCP数据库1&#xff0c;创建备份目录2&#xff0c;判断备份路径是否存在3&#xff0c;备份DHCP数据库4&#xff0c;完整自动备份脚本5&#xff0c;安排定期备份 推荐阅读 为什么要备份DHCP数据库呢&#…...

第十六章总结:反射和注解

.1.1&#xff1a;访问构造方法 反射&#xff1a; 1.class类 2.获取构造方法 3.获取成员属性 4.获取成员方法 注解 1.内置注解 2.反射注解 3 创建Class对象的三种方式 1.使用getClass&#xff08;&#xff09;方法 object str new object&#xff08;&#xff09;…...

mysql 切割字符串函数

93、mysql 切割字符串函数 需求&#xff0c;使用in 匹配多个参数&#xff0c;name字段值类型&#xff1a;1234(小明) 结果&#xff1a; select * from user where SUBSTRING_INDEX(REPLACE(name, ), ), (, -1) in ( 小明,小李)使用的函数如下 1、使用SUBSTRING_INDEX函数 SU…...

汽车发动机电机右盖设计

摘要 随着我国微型电子技术和社会经济的发展&#xff0c;目前行业内为满足客户需求出现了大量的电器设备&#xff0c;而大多数的电气设备的重要组成中都有电机&#xff0c;并且电机端盖成为电机研发人员重点关注和研究的对象&#xff0c;逐渐成为电机的重要组成部分&#xff0c…...

ETHERNET/IP从站转CANOPEN主站连接AB系统的配置方法

你还在为配置网关的ETHERNET/IP从站和CANOPEN主站发愁吗&#xff1f;今天来教你解决办法&#xff01; 一&#xff0c;首先&#xff0c;配置网关的ETHERNET/IP从站&#xff0c;需要使用AB系统的配置方法&#xff0c;具体步骤如下 1&#xff0c;使用 AB 系统配置网关的 ETHERNET/…...

人工智能和机器学习:走向智能未来的关键

人工智能&#xff08;AI&#xff09;和机器学习&#xff08;ML&#xff09;是当今IT领域中最令人振奋的发展方向之一。从自动驾驶汽车到智能助手&#xff0c;AI技术的应用正在不断扩展&#xff0c;重新定义着我们的生活方式和商业模式。在这个文章中&#xff0c;我们将深入探讨…...

openGauss本地Centos7.6单机安装和简单应用

openGauss本地Centos7.6单机安装和简单应用 openGauss基础环境配置openGauss安装openGauss使用测试openGauss常用命令 openGauss基础环境配置 在VMware Workstation中安装一台 centos7.6 内存&#xff1a;8GB&#xff0c;亲测4GB安装不够 磁盘&#xff1a;测试50GB-100GB够用 …...

LeetCode--1 两数之和

文章目录 1 题目描述2 解题思路2.1 暴力破解2.2 使用 Map 1 题目描述 给定一个整数数组 nums 和一个整数目标值 target, 请你在该数组中找出 和为目标值 target 的那 两个 整数, 并返回它们的数组下标 你可以假设每种输入只会对应一个答案。但是, 数组中同一个元素在答案里不…...

Hafnium安全分区管理器和示例参考软件栈

安全之安全(security)博客目录导读 目录 一、安全分区管理器 1、术语 2、对旧平台的支持 二、示例参考软件栈 一、安全分区管理器 安全分区管理器的三种实现在TF-A代码库并存&#xff1a; 1.基于FF-A规范的S-EL2 SPMC&#xff08;SPM Core&#xff09;&#xff0c;使能安全…...

Python解读市场趋势:LSTM 和 GRU 在预测 Google 股价方面的探索

我将向您展示如何使用 LSTM 和 GRU 预测股票价格。 导入库 import pandas as pd import numpy as np import seaborn as sns import matplotlib.pyplot as plt import matplotlib.dates as mdates import plotly.express as pxfrom keras.preprocessing.sequence import Timese…...

vue源码分析(二)——vue的入口发生了什么

文章目录 前言&#xff08;1&#xff09;vue 项目构建的时候&#xff0c;通过package.json文件看到构建入口&#xff08;2&#xff09; 构建入口页面&#xff1a;导入同级模块config的getAllbuilds方法&#xff08;3&#xff09; 通过传入参数中的builds对象使用map获取&#x…...

系统架构师论文总结【持续更新】

系统架构师考试是对计算机从业人员&#xff0c;以考代评的重要考试&#xff0c;近几年一直在参加考试&#xff0c;屡战屡败&#xff0c;后又屡败屡战&#xff0c;记录总结论文相关的知识点&#xff0c;方便考前查看。 一、2010年论文 1&#xff09;论软件的静态演化和动态演化…...

STM32-LCD中英文显示及应用

目录 字符编码 ASCII码&#xff08;8位&#xff09; 中文编码&#xff08;16位&#xff09; GB2312标准 GBK编码 GB18030标准&#xff08;32位&#xff09; Big5编码 Unicode字符集和编码 UTF-32&#xff08;32位&#xff09; UTF-16&#xff08;16位/32位&#xff0…...

13.4web自动化测试(Selenium3+Java)

一.定义 用来做web自动化测试的框架. 二.特点 1.支持各种浏览器. 2.支持各种平台(操作系统). 3.支持各种编程语言. 4.有丰富的api. 三.工作原理 四.搭环境 1.对照Chrome浏览器版本号,下载ChromeDriver,配置环境变量,我直接把.exe文件放在了jdk安装路径的bin文件夹下了(j…...

P1966 [NOIP2013 提高组] 火柴排队

洛谷的一道原题&#xff0c;方法有很多&#xff0c;树状数组以及排序&#xff0c;对刚学树状数组的人来说用排序会比较好理解。 本题最重要的结论就是&#xff0c;要保证两个数组中相同位置的差最小&#xff0c;但是不一定两个数组中数值相同&#xff0c;所以只需要保证相同位…...

Linux文件I/O

下面的内容需要了解系统调用&#xff0c;可看下面的链接&#xff1a; 系统调用来龙去脉-CSDN博客 1.底层文件IO和标准IO 这里指的是操作系统提供的IO服务&#xff0c;不同于ANSI建立的标准IO。 底层IO和标准IO各自所使用的函数&#xff1a; 区别&#xff1a; 1.底层文件IO不…...

卡巴斯基2009杀毒软件

下载地址&#xff1a;https://user.qzone.qq.com/512526231/main https://user.qzone.qq.com/3503787372/main...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

【解密LSTM、GRU如何解决传统RNN梯度消失问题】

解密LSTM与GRU&#xff1a;如何让RNN变得更聪明&#xff1f; 在深度学习的世界里&#xff0c;循环神经网络&#xff08;RNN&#xff09;以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而&#xff0c;传统RNN存在的一个严重问题——梯度消失&#…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

USB Over IP专用硬件的5个特点

USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中&#xff0c;从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备&#xff08;如专用硬件设备&#xff09;&#xff0c;从而消除了直接物理连接的需要。USB over IP的…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

Python Einops库:深度学习中的张量操作革命

Einops&#xff08;爱因斯坦操作库&#xff09;就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库&#xff0c;用类似自然语言的表达式替代了晦涩的API调用&#xff0c;彻底改变了深度学习工程…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...