kafka-消费者-指定offset消费(SpringBoot整合Kafka)
文章目录
- 1、指定offset消费
- 1.1、创建消费者监听器‘
- 1.2、application.yml配置
- 1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
- 1.4、创建生产者发送消息
- 1.4.1、分区0中的数据
- 1.5、创建SpringBoot启动类
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、消费者控制台:
1、指定offset消费
1.1、创建消费者监听器‘
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaPartitionListener {//初始化偏移量指定后,每次重启都会从该位置消费一轮,所以一般是调式解决问题时才使用@KafkaListener(topicPartitions = {@TopicPartition(topic = "my_topic1",partitionOffsets = {@PartitionOffset(partition = "0",initialOffset = "2")})}, groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}
1.2、application.yml配置
server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本
package com.atguigu.spring.kafka.consumer.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class MyKafkaConfig {@Beanpublic NewTopic springTestPartitionTopic() {return TopicBuilder.name("my_topic1") //主题名称.partitions(3) //分区数量.replicas(3) //副本数量.build();}
}


1.4、创建生产者发送消息
package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%3,"", "指定分区消费"+i);}}}


1.4.1、分区0中的数据
[[{"partition": 0,"offset": 0,"msg": "指定offset消费0","timespan": 1717660785962,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 1,"msg": "指定offset消费3","timespan": 1717660785974,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 2,"msg": "指定offset消费6","timespan": 1717660785975,"date": "2024-06-06 07:59:45"},{"partition": 0,"offset": 3,"msg": "指定offset消费9","timespan": 1717660785975,"date": "2024-06-06 07:59:45"}]
]
1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、消费者控制台:
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
此时如果重新启动 SpringKafkaConsumerApplication 消费者还是会消费数据,重复消费
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定offset消费6
my_group1消费者1获取到消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定offset消费9
相关文章:
kafka-消费者-指定offset消费(SpringBoot整合Kafka)
文章目录 1、指定offset消费1.1、创建消费者监听器‘1.2、application.yml配置1.3、使用 Java代码 创建 主题 my_topic1 并建立3个分区并给每个分区建立3个副本1.4、创建生产者发送消息1.4.1、分区0中的数据 1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1…...
JavaWeb2-Vue
Vue 前端框架,免除原生JS中的DOM操作简化书写 (以前学过又忘了,现在才知道原来vue是前端的) 基于MVVM思想(model-view -viewModel)实现数据双向绑定 model是数据模型 view负责数据展示 即DOM 中间这个负责…...
《广告数据定量分析》读书笔记之统计原理2
3.相关分析:描述的是两个数值变量间关系的强度。(两个数值型变量之间的关系) (1)图表表示:散点图 (2)衡量关系强度指标:相关系数r。 (r的取值为-1到 1&…...
计算机视觉与模式识别实验2-2 SIFT特征提取与匹配
文章目录 🧡🧡实验流程🧡🧡SIFT算法原理总结:实现SIFT特征检测和匹配通过RANSAC 实现图片拼接更换其他图片再次测试效果(依次进行SIFT特征提取、RANSAC 拼接) 🧡🧡全部代…...
kerberos: Clock skew too great (37) - PROCESS_TGS
kerberos认证失败错误信息: Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Clock skew too great (37) - PROCESS_TGS)at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:772)at sun.security.j…...
【MATLAB高级编程】入门篇 | 向量化编程
【入门篇】向量化编程 1. 什么是向量?2. 向量的创建2.1 行向量2.2 列向量2.3 使用冒号运算符2.4 使用`linspace`和`logspace`3. 向量的基本操作3.1 向量元素访问3.2 向量的长度3.3 向量的加法和减法3.4 向量的点乘和叉乘3.5 向量的元素乘法和除法4. 向量的高级操作4.1 逻辑索引…...
Debezium日常分享系列之:Debezium 2.7.0.Beta1发布
Debezium日常分享系列之:Debezium 2.7.0.Beta1发布 一、重大变化1.快照工件2.Oracle 二、新功能和改进1.在 z/OS 上支持 Db22.NATS JetStream 接收器身份验证改进3.JDBC 接收器 MariaDB 方言支持4.JMX 导出器添加到 Debezium 服务器5.使用 Debezium Operator 启用 J…...
eNSP学习——RIP的水平分割和触发更新
目录 主要命令 原理概述 实验目的 实验内容 实验拓扑 实验编址 实验步骤 1、基本配置 2、搭建RIP网络 3、验证触发更新 4.验证水平分割 5、验证毒性逆转 需要eNSP各种配置命令的点击链接自取:华为eNSP各种设备配置命令大全PDF版_…...
华为面经整理
文章目录 实习第一面准备提问相关算法相关 第一面结果提问环节 总结 实习 第一面准备 提问相关 操作系统有哪些功能 进程管理: 进程调度、进程同步和通信、多任务处理 内存管理: 内存分配、虚拟内存技术、内存保护 文件系统管理: 文件存储…...
数据恢复工具推荐:电脑回收站删除的文件怎么恢复?8个回收站恢复软件,收藏!
当文件从电脑的回收站被删除后,许多用户可能认为这些文件已永久丢失。然而,实际上,在数据被新数据覆盖之前,这些删除的文件仍然可以通过使用专门的数据恢复软件来恢复。本文将介绍8款顶级的文件恢复软件,恢复电脑回收站…...
前端之npm运行时配置文件.npmrc(可用于配置npm淘宝源)
文章目录 前端之npm运行时配置文件.npmrc什么是.npmrc设置项目配置文件设置用户配置文件设置全局配置文件给npm 命令添加注册源选项 前端之npm运行时配置文件.npmrc 什么是.npmrc 官网:https://nodejs.cn/npm/cli/v7/configuring-npm/npmrc/ .npmrc,可…...
如何充分利用代理IP扩大网络接触面
目录 前言 第一部分:什么是代理IP? 第二部分:如何获取代理IP? 1. IP质量 2. 匿名性 3. 限制 第三部分:如何使用代理IP? 第四部分:如何充分利用代理IP? 总结: 前…...
StableDiffusion Windows本地部署
检查电脑环境 启动CMD命令窗。 如上图,在CMD窗口输入python命令,可查看本地安装的python版本信息等。输入exit()退出python命令行 执行where命令,可查看python安装目录。 必须安装Python3.10.x,因为stable-diffusion-webui的一…...
OpenCV学习(4.5) 图像的形态转换
1.目标 在本教程中: 我们将学习不同的形态操作,如腐蚀、膨胀、开、闭等。我们将看到不同的函数,如: cv.erode()**、 **cv.dilate()**、 **cv.morphologyEx() 等。 理论: 图像的形态转换是图像处理中的一个重要领域…...
MFC设置窗口在Z轴上的位置
函数原型: BOOL CWnd::SetWindowPos(const CWnd* pWndInsertAfter, int x, int y, int cx, int cy, UINT nFlags);返回值: 如果函数成功,则返回非零值;否则返回0。 参数: pWndInsertAfter:标识了在Z轴次…...
STM32项目分享:智能门禁锁系统
目录 一、前言 二、项目简介 1.功能详解 2.主要器件 三、原理图设计 四、PCB硬件设计 1.PCB图 2.PCB板及元器件图 五、程序设计 六、实验效果 七、资料内容 项目分享 一、前言 项目成品图片: 哔哩哔哩视频链接: https://www.bilibili.c…...
PostgreSQL中有没有类似Oracle的dba_objects系统视图
PostgreSQL中有没有类似Oracle的dba_objects系统视图 在PostgreSQL中,没有一个完全集成了所有对象信息的视图(类似于Oracle中的DBA_OBJECTS)。但是,PostgreSQL提供了一些系统目录表和视图,可以用来获取数据库对象的信…...
【kubernetes】探索k8s集群的配置资源(secret和configma)
目录 一、Secret 1.1Secret 有四种类型 1.2Pod 有 3 种方式来使用 secret 1.3应用场景:凭据 1.4创建 Secret 1.4.1用kubectl create secret命令创建Secret 1.4.2内容用 base64 编码,创建Secret 1.4.2.1Base64编码 1.4.2.2创建YAML文件 1.4.2.3…...
基于springboot实现社区养老服务系统项目【项目源码+论文说明】计算机毕业设计
基于springboot实现社区养老服务系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本社区养老服务系统就是在这样的大环境下诞生,其可以帮助…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
算法笔记2
1.字符串拼接最好用StringBuilder,不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...
多模态图像修复系统:基于深度学习的图片修复实现
多模态图像修复系统:基于深度学习的图片修复实现 1. 系统概述 本系统使用多模态大模型(Stable Diffusion Inpainting)实现图像修复功能,结合文本描述和图片输入,对指定区域进行内容修复。系统包含完整的数据处理、模型训练、推理部署流程。 import torch import numpy …...
抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...
