当前位置: 首页 > news >正文

kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

相关文章:

kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录 1、指定offset消费1.1、创建消费者监听器‘1.2、application.yml配置1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本1.4、创建生产者发送消息1.4.1、分区0中的数据 1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1…...

JavaWeb2-Vue

Vue 前端框架&#xff0c;免除原生JS中的DOM操作简化书写 &#xff08;以前学过又忘了&#xff0c;现在才知道原来vue是前端的&#xff09; 基于MVVM思想&#xff08;model-view -viewModel&#xff09;实现数据双向绑定 model是数据模型 view负责数据展示 即DOM 中间这个负责…...

《广告数据定量分析》读书笔记之统计原理2

3.相关分析&#xff1a;描述的是两个数值变量间关系的强度。&#xff08;两个数值型变量之间的关系&#xff09; &#xff08;1&#xff09;图表表示&#xff1a;散点图 &#xff08;2&#xff09;衡量关系强度指标&#xff1a;相关系数r。 &#xff08;r的取值为-1到 1&…...

计算机视觉与模式识别实验2-2 SIFT特征提取与匹配

文章目录 &#x1f9e1;&#x1f9e1;实验流程&#x1f9e1;&#x1f9e1;SIFT算法原理总结&#xff1a;实现SIFT特征检测和匹配通过RANSAC 实现图片拼接更换其他图片再次测试效果&#xff08;依次进行SIFT特征提取、RANSAC 拼接&#xff09; &#x1f9e1;&#x1f9e1;全部代…...

kerberos: Clock skew too great (37) - PROCESS_TGS

kerberos认证失败错误信息&#xff1a; Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Clock skew too great (37) - PROCESS_TGS)at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)at sun.security.j…...

【MATLAB高级编程】入门篇 | 向量化编程

【入门篇】向量化编程 1. 什么是向量?2. 向量的创建2.1 行向量2.2 列向量2.3 使用冒号运算符2.4 使用`linspace`和`logspace`3. 向量的基本操作3.1 向量元素访问3.2 向量的长度3.3 向量的加法和减法3.4 向量的点乘和叉乘3.5 向量的元素乘法和除法4. 向量的高级操作4.1 逻辑索引…...

Debezium日常分享系列之:Debezium 2.7.0.Beta1发布

Debezium日常分享系列之&#xff1a;Debezium 2.7.0.Beta1发布 一、重大变化1.快照工件2.Oracle 二、新功能和改进1.在 z/OS 上支持 Db22.NATS JetStream 接收器身份验证改进3.JDBC 接收器 MariaDB 方言支持4.JMX 导出器添加到 Debezium 服务器5.使用 Debezium Operator 启用 J…...

eNSP学习——RIP的水平分割和触发更新

目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、搭建RIP网络 3、验证触发更新 4&#xff0e;验证水平分割 5、验证毒性逆转 需要eNSP各种配置命令的点击链接自取&#xff1a;华为&#xff45;NSP各种设备配置命令大全PDF版_…...

华为面经整理

文章目录 实习第一面准备提问相关算法相关 第一面结果提问环节 总结 实习 第一面准备 提问相关 操作系统有哪些功能 进程管理&#xff1a; 进程调度、进程同步和通信、多任务处理 内存管理&#xff1a; 内存分配、虚拟内存技术、内存保护 文件系统管理&#xff1a; 文件存储…...

数据恢复工具推荐:电脑回收站删除的文件怎么恢复?8个回收站恢复软件,收藏!

当文件从电脑的回收站被删除后&#xff0c;许多用户可能认为这些文件已永久丢失。然而&#xff0c;实际上&#xff0c;在数据被新数据覆盖之前&#xff0c;这些删除的文件仍然可以通过使用专门的数据恢复软件来恢复。本文将介绍8款顶级的文件恢复软件&#xff0c;恢复电脑回收站…...

Java 执行字符串 GroovyShell

1...

前端之npm运行时配置文件.npmrc(可用于配置npm淘宝源)

文章目录 前端之npm运行时配置文件.npmrc什么是.npmrc设置项目配置文件设置用户配置文件设置全局配置文件给npm 命令添加注册源选项 前端之npm运行时配置文件.npmrc 什么是.npmrc 官网&#xff1a;https://nodejs.cn/npm/cli/v7/configuring-npm/npmrc/ .npmrc&#xff0c;可…...

如何充分利用代理IP扩大网络接触面

目录 前言 第一部分&#xff1a;什么是代理IP&#xff1f; 第二部分&#xff1a;如何获取代理IP&#xff1f; 1. IP质量 2. 匿名性 3. 限制 第三部分&#xff1a;如何使用代理IP&#xff1f; 第四部分&#xff1a;如何充分利用代理IP&#xff1f; 总结&#xff1a; 前…...

StableDiffusion Windows本地部署

检查电脑环境 启动CMD命令窗。 如上图&#xff0c;在CMD窗口输入python命令&#xff0c;可查看本地安装的python版本信息等。输入exit()退出python命令行 执行where命令&#xff0c;可查看python安装目录。 必须安装Python3.10.x&#xff0c;因为stable-diffusion-webui的一…...

OpenCV学习(4.5) 图像的形态转换

1.目标 在本教程中&#xff1a; 我们将学习不同的形态操作&#xff0c;如腐蚀、膨胀、开、闭等。我们将看到不同的函数&#xff0c;如&#xff1a; cv.erode()**、 **cv.dilate()**、 **cv.morphologyEx() 等。 理论&#xff1a; 图像的形态转换是图像处理中的一个重要领域…...

MFC设置窗口在Z轴上的位置

函数原型&#xff1a; BOOL CWnd::SetWindowPos(const CWnd* pWndInsertAfter, int x, int y, int cx, int cy, UINT nFlags);返回值&#xff1a; 如果函数成功&#xff0c;则返回非零值&#xff1b;否则返回0。 参数&#xff1a; pWndInsertAfter&#xff1a;标识了在Z轴次…...

STM32项目分享:智能门禁锁系统

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板及元器件图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; https://www.bilibili.c…...

PostgreSQL中有没有类似Oracle的dba_objects系统视图

PostgreSQL中有没有类似Oracle的dba_objects系统视图 在PostgreSQL中&#xff0c;没有一个完全集成了所有对象信息的视图&#xff08;类似于Oracle中的DBA_OBJECTS&#xff09;。但是&#xff0c;PostgreSQL提供了一些系统目录表和视图&#xff0c;可以用来获取数据库对象的信…...

【kubernetes】探索k8s集群的配置资源(secret和configma)

目录 一、Secret 1.1Secret 有四种类型 1.2Pod 有 3 种方式来使用 secret 1.3应用场景&#xff1a;凭据 1.4创建 Secret 1.4.1用kubectl create secret命令创建Secret 1.4.2内容用 base64 编码&#xff0c;创建Secret 1.4.2.1Base64编码 1.4.2.2创建YAML文件 1.4.2.3…...

基于springboot实现社区养老服务系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现社区养老服务系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本社区养老服务系统就是在这样的大环境下诞生&#xff0c;其可以帮助…...

艾尔登法环帧率解锁终极指南:如何彻底解放游戏性能限制

艾尔登法环帧率解锁终极指南&#xff1a;如何彻底解放游戏性能限制 【免费下载链接】EldenRingFpsUnlockAndMore A small utility to remove frame rate limit, change FOV, add widescreen support and more for Elden Ring 项目地址: https://gitcode.com/gh_mirrors/el/El…...

中文大语言模型智能路由:统一接口调度多模型,实现降本增效

1. 项目概述&#xff1a;一个中文大语言模型路由器的诞生最近在折腾大语言模型应用开发的朋友&#xff0c;估计都遇到过这个头疼的问题&#xff1a;手头有好几个模型&#xff0c;比如智谱的GLM、百度的文心、阿里的通义&#xff0c;还有一堆开源的&#xff0c;每个模型都有自己…...

在Photoshop中解锁AVIF格式:开源插件深度应用指南

在Photoshop中解锁AVIF格式&#xff1a;开源插件深度应用指南 【免费下载链接】avif-format An AV1 Image (AVIF) file format plug-in for Adobe Photoshop 项目地址: https://gitcode.com/gh_mirrors/avi/avif-format 作为网页设计师、摄影师或数字内容创作者&#xf…...

3分钟掌握暗黑破坏神2存档编辑器:免费在线工具让你的游戏体验全面升级

3分钟掌握暗黑破坏神2存档编辑器&#xff1a;免费在线工具让你的游戏体验全面升级 【免费下载链接】d2s-editor 项目地址: https://gitcode.com/gh_mirrors/d2/d2s-editor 还在为刷不到心仪的装备而烦恼吗&#xff1f;想要快速体验不同职业的乐趣却不想重新练级&#x…...

6G网络中的流体天线与速率分割多址技术解析

1. 6G网络中的流体天线与速率分割多址技术解析 在移动通信技术快速迭代的今天&#xff0c;6G网络正逐渐从概念走向现实。作为下一代通信系统的核心候选技术&#xff0c;流体天线系统(FAS)与速率分割多址(RSMA)的结合展现出独特的优势。FAS通过动态调整天线位置提供灵活的空间自…...

智能体开发实战:从LLM工具调用到自主决策系统的架构指南

1. 项目概述与核心价值最近在开源社区里&#xff0c;一个名为DaMaxime/openclaw-agents-docs的项目引起了我的注意。乍一看&#xff0c;这像是一个围绕“OpenClaw Agents”的文档仓库&#xff0c;但当你深入进去&#xff0c;会发现它远不止是简单的API手册或使用说明。这个项目…...

网盘直链解析方案:如何通过浏览器脚本实现多平台文件下载优化

网盘直链解析方案&#xff1a;如何通过浏览器脚本实现多平台文件下载优化 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 &#xff0c;支持 百度网盘 / 阿里云盘 / 中国移动云盘…...

Python金融数据获取终极指南:3分钟掌握同花顺问财数据采集

Python金融数据获取终极指南&#xff1a;3分钟掌握同花顺问财数据采集 【免费下载链接】pywencai 获取同花顺问财数据 项目地址: https://gitcode.com/gh_mirrors/py/pywencai 想要快速获取同花顺问财的金融数据吗&#xff1f;pywencai是你需要了解的终极Python工具&…...

NotebookLM PDF解析失效?3步精准定位文档结构断层并重建语义锚点

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;NotebookLM PDF解析失效的本质归因 NotebookLM 在处理某些 PDF 文档时出现“无法提取文本”或“内容为空”的现象&#xff0c;并非偶然的前端报错&#xff0c;而是源于底层 PDF 解析链路中多个关键环节…...

LoRA微调工程化2026:从实验到生产的完整落地指南

LoRA&#xff08;Low-Rank Adaptation&#xff09;已经成为大模型微调的工业标准。不是因为它最先进&#xff0c;而是因为它在成本、效果、灵活性之间取得了最好的平衡。本文从工程实践角度&#xff0c;覆盖LoRA微调的完整流程——从数据准备、训练配置到生产部署。 LoRA的工程…...