当前位置: 首页 > news >正文

kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录

  • 1、指定offset消费
    • 1.1、创建消费者监听器‘
    • 1.2、application.yml配置
    • 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
    • 1.4、创建生产者发送消息
      • 1.4.1、分区0中的数据
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、指定offset消费

1.1、创建消费者监听器‘

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.2、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本

package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}

在这里插入图片描述
在这里插入图片描述

1.4、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}

在这里插入图片描述
在这里插入图片描述

1.4.1、分区0中的数据

[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9

相关文章:

kafka-消费者-指定offset消费(SpringBoot整合Kafka)

文章目录 1、指定offset消费1.1、创建消费者监听器‘1.2、application.yml配置1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本1.4、创建生产者发送消息1.4.1、分区0中的数据 1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1…...

JavaWeb2-Vue

Vue 前端框架&#xff0c;免除原生JS中的DOM操作简化书写 &#xff08;以前学过又忘了&#xff0c;现在才知道原来vue是前端的&#xff09; 基于MVVM思想&#xff08;model-view -viewModel&#xff09;实现数据双向绑定 model是数据模型 view负责数据展示 即DOM 中间这个负责…...

《广告数据定量分析》读书笔记之统计原理2

3.相关分析&#xff1a;描述的是两个数值变量间关系的强度。&#xff08;两个数值型变量之间的关系&#xff09; &#xff08;1&#xff09;图表表示&#xff1a;散点图 &#xff08;2&#xff09;衡量关系强度指标&#xff1a;相关系数r。 &#xff08;r的取值为-1到 1&…...

计算机视觉与模式识别实验2-2 SIFT特征提取与匹配

文章目录 &#x1f9e1;&#x1f9e1;实验流程&#x1f9e1;&#x1f9e1;SIFT算法原理总结&#xff1a;实现SIFT特征检测和匹配通过RANSAC 实现图片拼接更换其他图片再次测试效果&#xff08;依次进行SIFT特征提取、RANSAC 拼接&#xff09; &#x1f9e1;&#x1f9e1;全部代…...

kerberos: Clock skew too great (37) - PROCESS_TGS

kerberos认证失败错误信息&#xff1a; Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Clock skew too great (37) - PROCESS_TGS)at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)at sun.security.j…...

【MATLAB高级编程】入门篇 | 向量化编程

【入门篇】向量化编程 1. 什么是向量?2. 向量的创建2.1 行向量2.2 列向量2.3 使用冒号运算符2.4 使用`linspace`和`logspace`3. 向量的基本操作3.1 向量元素访问3.2 向量的长度3.3 向量的加法和减法3.4 向量的点乘和叉乘3.5 向量的元素乘法和除法4. 向量的高级操作4.1 逻辑索引…...

Debezium日常分享系列之:Debezium 2.7.0.Beta1发布

Debezium日常分享系列之&#xff1a;Debezium 2.7.0.Beta1发布 一、重大变化1.快照工件2.Oracle 二、新功能和改进1.在 z/OS 上支持 Db22.NATS JetStream 接收器身份验证改进3.JDBC 接收器 MariaDB 方言支持4.JMX 导出器添加到 Debezium 服务器5.使用 Debezium Operator 启用 J…...

eNSP学习——RIP的水平分割和触发更新

目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、搭建RIP网络 3、验证触发更新 4&#xff0e;验证水平分割 5、验证毒性逆转 需要eNSP各种配置命令的点击链接自取&#xff1a;华为&#xff45;NSP各种设备配置命令大全PDF版_…...

华为面经整理

文章目录 实习第一面准备提问相关算法相关 第一面结果提问环节 总结 实习 第一面准备 提问相关 操作系统有哪些功能 进程管理&#xff1a; 进程调度、进程同步和通信、多任务处理 内存管理&#xff1a; 内存分配、虚拟内存技术、内存保护 文件系统管理&#xff1a; 文件存储…...

数据恢复工具推荐:电脑回收站删除的文件怎么恢复?8个回收站恢复软件,收藏!

当文件从电脑的回收站被删除后&#xff0c;许多用户可能认为这些文件已永久丢失。然而&#xff0c;实际上&#xff0c;在数据被新数据覆盖之前&#xff0c;这些删除的文件仍然可以通过使用专门的数据恢复软件来恢复。本文将介绍8款顶级的文件恢复软件&#xff0c;恢复电脑回收站…...

Java 执行字符串 GroovyShell

1...

前端之npm运行时配置文件.npmrc(可用于配置npm淘宝源)

文章目录 前端之npm运行时配置文件.npmrc什么是.npmrc设置项目配置文件设置用户配置文件设置全局配置文件给npm 命令添加注册源选项 前端之npm运行时配置文件.npmrc 什么是.npmrc 官网&#xff1a;https://nodejs.cn/npm/cli/v7/configuring-npm/npmrc/ .npmrc&#xff0c;可…...

如何充分利用代理IP扩大网络接触面

目录 前言 第一部分&#xff1a;什么是代理IP&#xff1f; 第二部分&#xff1a;如何获取代理IP&#xff1f; 1. IP质量 2. 匿名性 3. 限制 第三部分&#xff1a;如何使用代理IP&#xff1f; 第四部分&#xff1a;如何充分利用代理IP&#xff1f; 总结&#xff1a; 前…...

StableDiffusion Windows本地部署

检查电脑环境 启动CMD命令窗。 如上图&#xff0c;在CMD窗口输入python命令&#xff0c;可查看本地安装的python版本信息等。输入exit()退出python命令行 执行where命令&#xff0c;可查看python安装目录。 必须安装Python3.10.x&#xff0c;因为stable-diffusion-webui的一…...

OpenCV学习(4.5) 图像的形态转换

1.目标 在本教程中&#xff1a; 我们将学习不同的形态操作&#xff0c;如腐蚀、膨胀、开、闭等。我们将看到不同的函数&#xff0c;如&#xff1a; cv.erode()**、 **cv.dilate()**、 **cv.morphologyEx() 等。 理论&#xff1a; 图像的形态转换是图像处理中的一个重要领域…...

MFC设置窗口在Z轴上的位置

函数原型&#xff1a; BOOL CWnd::SetWindowPos(const CWnd* pWndInsertAfter, int x, int y, int cx, int cy, UINT nFlags);返回值&#xff1a; 如果函数成功&#xff0c;则返回非零值&#xff1b;否则返回0。 参数&#xff1a; pWndInsertAfter&#xff1a;标识了在Z轴次…...

STM32项目分享:智能门禁锁系统

目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板及元器件图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片&#xff1a; 哔哩哔哩视频链接&#xff1a; https://www.bilibili.c…...

PostgreSQL中有没有类似Oracle的dba_objects系统视图

PostgreSQL中有没有类似Oracle的dba_objects系统视图 在PostgreSQL中&#xff0c;没有一个完全集成了所有对象信息的视图&#xff08;类似于Oracle中的DBA_OBJECTS&#xff09;。但是&#xff0c;PostgreSQL提供了一些系统目录表和视图&#xff0c;可以用来获取数据库对象的信…...

【kubernetes】探索k8s集群的配置资源(secret和configma)

目录 一、Secret 1.1Secret 有四种类型 1.2Pod 有 3 种方式来使用 secret 1.3应用场景&#xff1a;凭据 1.4创建 Secret 1.4.1用kubectl create secret命令创建Secret 1.4.2内容用 base64 编码&#xff0c;创建Secret 1.4.2.1Base64编码 1.4.2.2创建YAML文件 1.4.2.3…...

基于springboot实现社区养老服务系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现社区养老服务系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本社区养老服务系统就是在这样的大环境下诞生&#xff0c;其可以帮助…...

中国象棋AlphaZero:从零构建强化学习象棋AI的完整指南

中国象棋AlphaZero&#xff1a;从零构建强化学习象棋AI的完整指南 【免费下载链接】ChineseChess-AlphaZero Implement AlphaZero/AlphaGo Zero methods on Chinese chess. 项目地址: https://gitcode.com/gh_mirrors/ch/ChineseChess-AlphaZero 中国象棋AlphaZero是一个…...

如何在Windows下使用Rufus轻松格式化ext文件系统:完整指南

如何在Windows下使用Rufus轻松格式化ext文件系统&#xff1a;完整指南 【免费下载链接】rufus The Reliable USB Formatting Utility 项目地址: https://gitcode.com/GitHub_Trending/ru/rufus 还在为在Windows系统下无法直接创建Linux文件系统而烦恼吗&#xff1f;&…...

Mermaid图表绘制终极指南:用Markdown代码快速创建专业图表

Mermaid图表绘制终极指南&#xff1a;用Markdown代码快速创建专业图表 【免费下载链接】mermaid mermaid-js/mermaid: 是一个用于生成图表和流程图的 Markdown 渲染器&#xff0c;支持多种图表类型和丰富的样式。适合对 Markdown、图表和流程图以及想要使用 Markdown 绘制图表和…...

OpenClaw 3.28重磅发布:Grok搜索内置,高危操作迎来“保命”拦截机制

引言&#xff1a; 不仅仅是“草台”后的补救&#xff0c;更是智能体操作系统的成人礼 就在前两天&#xff0c;OpenClaw 之父 Peter 的一次“漏打包”操作&#xff0c;直接导致 3.22 版本大面积白屏&#xff0c;让无数开发者以为自己辛辛苦苦养了一周的“赛博小龙虾”就这么“死…...

Rolify 项目部署指南:从开发环境到生产环境的完整迁移流程

Rolify 项目部署指南&#xff1a;从开发环境到生产环境的完整迁移流程 【免费下载链接】rolify Role management library with resource scoping 项目地址: https://gitcode.com/gh_mirrors/ro/rolify Rolify 是一款功能强大的角色管理库&#xff0c;支持资源范围的权限…...

从数据到洞察:如何利用2024版建筑高度SHP数据,5步完成城市热岛效应初步分析

从数据到洞察&#xff1a;如何利用2024版建筑高度SHP数据&#xff0c;5步完成城市热岛效应初步分析 城市热岛效应是城市化进程中普遍存在的环境问题&#xff0c;表现为城市中心区域温度明显高于周边郊区的现象。这种现象不仅影响居民的生活质量&#xff0c;还会加剧能源消耗和空…...

AviatorScript函数扩展避坑指南:固定参数vs可变参数的选择与实现差异

AviatorScript函数扩展避坑指南&#xff1a;固定参数vs可变参数的选择与实现差异 在AviatorScript的深度开发中&#xff0c;函数扩展是提升脚本灵活性的核心手段。但许多开发者在面对固定参数&#xff08;AbstractFunction&#xff09;和可变参数&#xff08;AbstractVariadicF…...

2步实现格式自由:Save Image as Type让网页图片转换体验升级10倍

2步实现格式自由&#xff1a;Save Image as Type让网页图片转换体验升级10倍 【免费下载链接】Save-Image-as-Type Save Image as Type is an chrome extension which add Save as PNG / JPG / WebP to the context menu of image. 项目地址: https://gitcode.com/gh_mirrors…...

Audio Pixel Studio语音合成实战:正则表达式预处理文本标点停顿

Audio Pixel Studio语音合成实战&#xff1a;正则表达式预处理文本标点停顿 1. 引言&#xff1a;为什么需要文本预处理 在语音合成应用中&#xff0c;文本预处理是一个经常被忽视但至关重要的环节。Audio Pixel Studio作为一款轻量级音频处理工具&#xff0c;虽然内置了强大的…...

MidScene:让AI成为你的自动化协作者 副标题:无需编程的多平台智能操作解决方案

MidScene&#xff1a;让AI成为你的自动化协作者 副标题&#xff1a;无需编程的多平台智能操作解决方案 【免费下载链接】midscene Let AI be your browser operator. 项目地址: https://gitcode.com/GitHub_Trending/mid/midscene 在数字化时代&#xff0c;重复性操作和…...