Kafka消费者相关
Kafka生产者相关-CSDN博客
消费者消费数据基本流程
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-1");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));//从Kafka主题中获取数据while (true){ConsumerRecords<String, String> poll = consumer.poll(100);for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}

消费数据偏移量问题
Kafka中的 偏移量(offset)是用于标识每个消费者在某个分区内消费到的位置。每个分区的消息都有一个唯一的偏移量,消费者会根据这个偏移量来读取消息。
Kafka偏移量的管理
Kafka默认提供两种方式来管理偏移量:
- 自动提交偏移量(默认方式)
- 手动提交偏移量(需要显式配置)
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //默认设置
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); //默认设置 每5秒提交一次
也就是说默认情况下 消费者如果后启动 无法读取到生产者已经发送的消息
偏移量的重置
如果需要重新消费数据,可以通过 auto.offset.reset 配置项来控制消费者的偏移量重置行为。这个配置项有几个常用的值:
earliest:如果没有找到偏移量(比如第一次消费),消费者会从最早的消息开始消费。latest:如果没有找到偏移量,消费者会从最新的消息开始消费。none:如果没有找到偏移量,消费者会抛出异常。
例如,设置为从最早的消息开始消费:
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

如果想从指定偏移量获取消息

解决消费者重复消费的问题(不能完全解决)
以上示例,偏移量默认都是5秒一次提交
例如先启动消费者
然后生产者发送了10000条数据 不好演示的话可以在生产者那边每发送一条数据然后
Thread.sleep 1秒
如果消费者在消费到一定程度之后 突然停止 观察再次启动消费者 存在消费者重复消费的情况
原因就是消费者偏移量默认5秒提交一次的原因
那么可以将消费者默认5秒提交偏移量缩短为1秒 但是这样不能完全解决问题
示例
消费者

代码
package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取数据
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 检查分配的分区
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 将偏移量设置为2
// consumer.seek(topicPartition, 2);
// // 停止循环
// flg = false;
// }
// }
// }
// }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}//关闭消费者对象 因上面在无线循环//consumer.close();}
}
生产者

代码
package com.hrui;import com.hrui.interceptor.KafkaProducerInterceptorTest;
import com.hrui.interceptor.ValueInterceptor;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/*** @author hrui* @date 2025/2/26 13:36*/
public class KafkaProducerTest {public static void main(String[] args) throws ExecutionException, InterruptedException {//创建配置对象Map<String,Object> configMap=new HashMap<>();//如果是集群随意指定一个configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//对Key Value进行序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());configMap.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptor.class.getName());//可以配置ACKSconfigMap.put(ProducerConfig.ACKS_CONFIG,"-1");//配置幂等性configMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//配置重试次数configMap.put(ProducerConfig.RETRIES_CONFIG,3);//配置超时configMap.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);//配置事务 事务基于幂等性configMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-tx-id");//创建生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<>(configMap);//初始化事务kafkaProducer.initTransactions();try {//开启事务kafkaProducer.beginTransaction();for(int i=0;i<10000;i++){//key的作用是通过某种算法,放到topic的某个分区中//可以不设置key 默认是按照轮询的方式ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1","hello kafka" + i);//发送数据 send方法还可以接收一个参数,就是回调函数 kafkaProducer.send(record);是异步的Future<RecordMetadata> send = kafkaProducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {// 处理发送失败的情况e.printStackTrace();} else {// 处理发送成功的情况System.out.println("发送成功:" + recordMetadata);}}});//这样变成同步了send.get();Thread.sleep(1000);}//提交事务kafkaProducer.commitTransaction();}catch (Exception e){e.printStackTrace();//中止事务kafkaProducer.abortTransaction();}finally {//关闭生产者对象kafkaProducer.close();}}
}
先启动消费者
然后启动生产者
观察消费者控制台

过会我再次启动消费者

1.缩短自动提交偏移量的时间
因为默认消费者每5秒自动提交提交
可以缩短自动提交偏移量的时间 但这样只能减少重复消费的量 并不能彻底解决重复消费的问题
2.手动提交偏移量


package com.hrui;import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.*;/*** @author hrui* @date 2025/2/26 17:35*/
public class KafkaConsumerTest {public static void main(String[] args) {Map<String,Object> consumerConfig=new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//消费者反序列化consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test-3");//默认自动提交 改成false 变成手动提交偏移量consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//设置从最早的消息读取//consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//创建消费者对象KafkaConsumer<String,String> consumer=new KafkaConsumer<>(consumerConfig);//订阅主题consumer.subscribe(Collections.singletonList("test"));
// boolean flg = true;
// while (flg) {
// // 拉取数据
// consumer.poll(Duration.ofMillis(100));
// final Set<TopicPartition> assignment = consumer.assignment();
//
// if (assignment != null && !assignment.isEmpty()) {
// // 检查分配的分区
// for (TopicPartition topicPartition : assignment) {
// if ("test".equals(topicPartition.topic())) {
// // 将偏移量设置为2
// consumer.seek(topicPartition, 2);
// // 停止循环
// flg = false;
// }
// }
// }
// }//从Kafka主题中获取数据while (true){//ConsumerRecords<String, String> poll = consumer.poll(100);ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}//设置自动提交偏移量之后 这里要手动去保存偏移量//这里有两种方式 同步提交 和异步提交偏移量consumer.commitAsync();//异步consumer.commitSync();//同步}//关闭消费者对象 因上面在无线循环//consumer.close();}
}
说明以上方式都不能彻底解决重复消费问题
重复消费问题还是存在
如果要进行原子绑定 并非做不到,Kafka本身没有提供相关功能
例如把拉取到的数据全部处理完了,才进行事务提交
一旦出现意外,业务数据恢复 但是Kafka本身没有提供相关功能 和与其他支持事务处理的应用结合使用
消费数据-事务隔离级别
生产者事务图

这个报错 写在提交之前即可

消费者组介绍
如果两个应用都是同一个消费者组
生产者A生产消息 消费者B和C在同一个消费者组 那么A的消息如果被B消费过了那么C是消费不到的 B和C默认是竞争关系
如果生产者A生产消息 消费者B和C在不同消费者组 那么消息会被B和C都消费
第一个场景:消费者B和C在同一个消费者组
如果消费者B和消费者C在同一个消费者组内,消息会按照负载均衡的方式分配给它们。这意味着生产者A生产的消息会被消费者B或消费者C中的一个消费,而不是同时被两个消费者消费。所以,如果B已经消费了某条消息,消费者C就无法再消费到这条消息。这种行为是消费者组的基本特性,主要用于确保每条消息只被某个消费者处理一次。
第二个场景:消费者B和C在不同的消费者组
如果消费者B和消费者C在不同的消费者组中,那么生产者A生产的消息会分别被B和C都消费到。因为每个消费者组有自己独立的消费进度(每个组有独立的偏移量),所以每个消费者组都能独立消费该消息。
第三个场景:消费者B和C同时开启从头消费
如果B和C都在同一个消费者组,并且设置了从头消费,那么它们将从消息队列的最开始位置开始消费。这种情况下,B和C是共享消费队列的,它们会根据负载均衡规则交替消费消息,而不是同时消费同一条消息。因此,A的消息仍然不会同时被B和C消费。每条消息仍然只会被消费者组内的某个消费者消费,并且消息的消费是共享的,但并不是同时共享。
总结来说,在同一个消费者组内,消息的消费是竞争式的。即使B和C同时开启从头消费,它们也不会同时消费同一条消息。每条消息只会由其中一个消费者处理。
相关文章:
Kafka消费者相关
Kafka生产者相关-CSDN博客 消费者消费数据基本流程 package com.hrui;import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache…...
Vue nextTick原理回顾
nextTick就是将异步函数放在下一次实践循环的微任务队列中执行 实现原理比较简单,极简版本: function myNextTick(cb){let p;pPromise.resolve().then(cb)return cb?p:Promise.resolve() }复杂版本,考虑异步函数入队、执行锁、兼容处理 l…...
JavaWeb登录认证
在Web系统中,如果没有登录功能和登录认证,是可以直接访问到Web系统的后台的。 这是不安全的,所以我们今天的主题就是登录认证。最终要实现的效果是: 如果用户名密码错误,不允许登录系统。如果用户名和密码都正确&…...
半导体制造工艺(二)光刻工艺—掩模版
在上文中我们已经简单概述了光刻工艺的大致流程。接下来将会介绍在光刻工艺中所需用到的必备材料以及设备。例如掩模版、光刻胶、匀胶机、光刻机等等。由于需要保持讲述工艺的完整性以及流畅,每一个都需要涉及,所以每次仅是侧重点不同。此篇主要讲述的是…...
计算机视觉算法实战——高精度分割(主页有源码)
✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连 ✨ ✨个人主页欢迎您的访问 ✨期待您的三连✨ 1. 高精度分割领域简介✨✨ 图像分割是计算机视觉中的核心任务之一,其目标是将图像划分为多个语义区域,并为…...
DeepSeek-R1-Zero:基于基础模型的强化学习
注:此文章内容均节选自充电了么创始人,CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》(人工智能科学与技术丛书)【陈敬雷编著】【清华大学出版社】 文章目录 DeepSeek大模型技术系列四DeepSeek大模型技术系列四》DeepSeek-…...
判断一个文件中以三个#号开头有多少行的shell脚本怎么写
在Linux中,你可以使用grep命令结合正则表达式来统计一个文件中以三个#号开头的行数。以下是一个简单的命令: grep -c ^### filename这里的grep是搜索工具,-c选项表示统计匹配的行数,###是正则表达式,表示行…...
PHP如何与HTML结合使用?
PHP与HTML结合使用的主要方式是通过在HTML文件中嵌入PHP代码,从而实现动态内容的生成和网页的交互性。以下是详细的方法和最佳实践: 1. 嵌入PHP代码到HTML中 PHP代码可以直接嵌入到HTML文件中,通过<?php ?>标签来包裹PHP代码。服务…...
计算机网络之传输层(传输层的功能)
一、数据分段与重组 传输层从会话层接收数据,并将其分割成较小的数据段,以适应网络层的最大传输单元(MTU)限制。在目的端,传输层负责将这些数据段重新组合成原始数据,确保数据的完整性和正确性。 二、端口…...
矩阵碰一碰发视频源码搭建之,支持OEM
引言 阵碰一碰发视频" 技术凭借其便捷的交互方式和高效的传播能力,已成为品牌推广和内容创作的重要工具。为进一步提升视频传播效果,本文将深入探讨如何在矩阵碰一碰系统中集成 AI 文案生成功能,实现 "一碰即传 智能文案" 的…...
DeepSeek 2月27日技术突破:三大核心功能解析与行业影响
DeepSeek 2月27日技术突破:三大核心功能解析与行业影响 一、最新发布功能全景图 1. DualPipe:双向流水线并行革命 DualPipe是一项极具创新性的双向管道并行算法,旨在解决大规模模型训练过程中计算与通信效率低下的关键问题。在传统的模型训…...
【实战 ES】实战 Elasticsearch:快速上手与深度实践-1.2.2倒排索引原理与分词器(Analyzer)
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 1.2.2倒排索引原理与分词器(Analyzer)1. 倒排索引:搜索引擎的基石1.1 正排索引 vs 倒排索引示例数据对比: 1.2 倒排索引核心结…...
Vue.js响应式基础
响应式基础 API 参考 本页和后面很多页面中都分别包含了选项式 API 和组合式 API 的示例代码。现在你选择的是 组合式 API。你可以使用左侧侧边栏顶部的“API 风格偏好”开关在 API 风格之间切换。 声明响应式状态 ref() 在组合式 API 中,推荐使用 ref() 函数来声明…...
DeepSeek-OpenSourceWeek-第四天-Optimized Parallelism Strategies
DeepSeek 在 #OpenSourceWeek(开源周) 的第四天推出了两项新工具,旨在让深度学习更快、更高效:**DualPipe** 和 **EPLB**。 DualPipe 定义:DualPipe 是一种用于 V3/R1 训练中计算与通信重叠的双向pipline并行算法。 作用:它通过实现前向和后向计算-通信阶段的完全重叠,减…...
深入浅出:插入排序算法完全解析
1. 什么是插入排序? 插入排序(Insertion Sort)是一种简单的排序算法,其基本思想与我们整理扑克牌的方式非常相似。我们将扑克牌从第二张开始依次与前面已排序的牌进行比较,将其插入到合适的位置,直到所有牌…...
【Keras图像处理入门:图像加载与预处理全解析】
本文将全面讲解如何使用Keras进行图像加载、预处理和数据增强,为深度学习模型准备高质量的图像数据。 一、单张图像处理基础 1. 图像加载与尺寸调整 from keras.preprocessing import image# 加载图像并调整尺寸 img image.load_img(example.jpg, target_size(1…...
企业级AI办公落地实践:基于钉钉/飞书的标准产品解决方案
一、平台化AI的崛起:开箱即用的智能革命 2024年企业AI应用调研数据显示: 73%的中型企业选择平台标准产品而非自研头部SaaS平台AI功能渗透率达89%典型ROI周期从18个月缩短至3-6个月 核心优势对比: 维度自研方案平台标准产品部署周期6-12个…...
对于邮箱地址而言,短中划线(Hyphen, -)和长中划线(Em dash, —)有区别吗
对于邮箱地址而言,**短中划线(Hyphen, -)和长中划线(Em dash, —)**有明确的区别: 短中划线(Hyphen, -): 在邮箱地址中,短中划线是可以使用的,通常…...
C++ STL(三)list
目录 list是什么 构造函数 元素访问 容量操作 修改 迭代器 code实例 实现简单的list forward_list是什么 构造函数 元素访问 容量 修改 迭代器 code实例 实现一个简单的forward_list list是什么 std::list 是 C 标准模板库(STL)中的一个…...
Vue3+TypeScript 封装一个好用的防抖节流自定义指令
一、前言:为什么需要防抖节流? 在前端开发中,高频触发的事件(如滚动、输入、点击等)容易导致性能问题。防抖(debounce) 和 节流(throttle) 是两种常用的优化手段&#x…...
无机布防火卷帘门价格怎么算?按尺寸定制,按需报价
无机布防火卷帘门作为建筑防火分区的核心设备,价格一直是工程采购的关注重点。很多用户在询价时,会发现不同厂家的报价差异较大,这是因为无机布防火卷帘门的价格并非按统一单价计算,而是完全根据项目的实际需求定制化核算。 &…...
从怀疑到真香!2026我日常办公离不开的这款在线文字转换器太好用了
刚入职那半年我踩过太多坑:一周三次新人培训,怕漏记知识点全程录音,下课手动整理1小时录音要熬3小时,知识点散得根本没法复习;部门周会做完记录,散会就要我出整理好的纪要,赶工赶得饭都吃不上&a…...
智能检索新范式,让AIAgent自主决策,提升RAG效率100%!
市面上的 RAG 系统,不管叫什么名字,本质上只有两种做法: 第一种,一次性检索。把用户的 query 向量化,从语料库里捞出 Top-K 个文档片段,拼成一个大 prompt 塞给模型。GraphRAG、HippoRAG、LightRAG 都属于…...
用STM32CubeMX和HAL库快速上手WS2812B:告别手动计算延时,一键生成驱动框架
基于STM32CubeMX的WS2812B智能灯光控制:从零构建现代化驱动方案在智能硬件和物联网设备快速发展的今天,WS2812B可编程LED灯带因其丰富的色彩表现和简单的单线控制方式,成为创客和工程师们最喜爱的显示组件之一。然而,传统的寄存器…...
内网环境下Win7系统批量离线补丁部署实战指南
1. 内网Win7补丁部署的挑战与解决方案老旧Win7系统在内网环境中的安全隐患就像漏雨的屋顶,看似不影响日常使用,但随时可能引发严重后果。我经手过几十家单位的系统加固项目,发现这些场景存在三个典型痛点:首先是补丁来源问题&…...
我靠这个测试设计方法,把漏测率降低了80%
当“直觉测试”撞上南墙很长一段时间里,我和许多测试同行一样,测试用例的设计主要依靠两样东西:需求文档和“测试直觉”。这种模式在业务逻辑相对简单、迭代速度平缓时还能勉强应付。一旦面对复杂的企业级应用、高频的敏捷迭代,或…...
内存占用3KB!极致瘦身释放MCU无限可能
极致小体积,给工业领域带来了无限的可能:更低硬件成本,更小芯片体积,更低功耗,更高可靠性,让每一颗小MCU都拥有大系统的完整能力。 https://www.bilibili.com/video/BV1eZLi6PEjc/?spm_id_from333.1387.ho…...
InVideo插件深度解析:如何在Unreal Engine中实现高效视频流播放与录制
InVideo插件深度解析:如何在Unreal Engine中实现高效视频流播放与录制 【免费下载链接】InVideo 基于UE4实现的rtsp的视频播放插件 项目地址: https://gitcode.com/gh_mirrors/in/InVideo InVideo是一个基于Unreal Engine 5开发的RTSP视频播放插件࿰…...
3分钟搞定专业短视频!Pixelle-Video终极AI创作指南
3分钟搞定专业短视频!Pixelle-Video终极AI创作指南 【免费下载链接】Pixelle-Video 🚀 AI 全自动短视频引擎 | AI Fully Automated Short Video Engine 项目地址: https://gitcode.com/GitHub_Trending/pi/Pixelle-Video 还在为视频制作发愁吗&am…...
终极音乐解锁指南:3步让加密音乐在任何设备自由播放
终极音乐解锁指南:3步让加密音乐在任何设备自由播放 【免费下载链接】unlock-music 在浏览器中解锁加密的音乐文件。原仓库: 1. https://github.com/unlock-music/unlock-music ;2. https://git.unlock-music.dev/um/web 项目地址: https:/…...

