kafka-消费者组(SpringBoot整合Kafka)
文章目录
- 1、消费者组
- 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
- 1.2、创建生产者发送消息
- 1.3、application.yml配置
- 1.4、创建消费者监听器
- 1.5、创建SpringBoot启动类
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、消费者控制台:
1、消费者组
1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

1.2、创建生产者发送消息
package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);}}
}


1.3、application.yml配置
server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.4、创建消费者监听器
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage2(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")public void onMessage3(ConsumerRecord<String, String> record) {System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}
1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、消费者控制台:
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6

相关文章:
kafka-消费者组(SpringBoot整合Kafka)
文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...
Redisson知识
使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁: lock.lock(); 特点: 1.使用Redisson加的锁,具有自动续期机制,如果业务运行时间较长,运行…...
0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...
如何把几个pdf文件合成在一个pdf文件
PDF合并,作为一种常见的文件处理方式,无论是在学术研究、工作汇报还是日常生活中,都有着广泛的应用。本文将详细介绍PDF合并的多种方法,帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站,然后上传pdf。 pd…...
Stream与MLC测试CPU内存DDR5的原理与方法详解
在高性能计算和服务器领域,内存性能是决定整体系统性能的关键因素之一,特别是随着DDR5内存的普及,其更高的带宽和更低的延迟特性使得内存性能测试变得更加重要。本文将详细介绍使用Stream和MLC两种工具对CPU内存DDR5进行性能测试的原理和实施…...
linux业务代码性能优化点
planning优化的一些改动----------> 减少值传递,多用引用来传递 <---------- // ----------> 减少值传递,多用引用来传递 <---------- // 例1: class A{}; std::vector<A> v; // for(auto elem : v) {} // 不建议ÿ…...
Shell脚本学习_字符串变量
目录 1.Shell字符串变量:格式介绍 2.Shell字符串变量:拼接 3.Shell字符串变量:字符串截取 4.Shell索引数组变量:定义-获取-拼接-删除 1.Shell字符串变量:格式介绍 1、目标: 能够使用字符串的三种方式 …...
spring-kafka-生产者服务搭建测试(SpringBoot整合Kafka)
文章目录 1、生产者服务搭建1.1、引入spring-kafka依赖1.2、application.yml配置----v1版1.3、使用Java代码创建主题分区副本1.4、发送消息 1、生产者服务搭建 1.1、引入spring-kafka依赖 <?xml version"1.0" encoding"UTF-8"?> <project xml…...
JVM学习-内存泄漏
内存泄漏的理解和分类 可达性分析算法来判断对象是否是不再使用的对象,本质都是判断一上对象是否还被引用,对于这种情况下,由于代码的实现不同就会出现很多内存泄漏问题(让JVM误以为此对象还在引用,无法回收,造成内存泄…...
Go微服务: 分布式之通过本地消息实现最终一致性和最大努力通知方案
通过本地消息实现最终一致性 1 )概述 我们的业务场景是可以允许我们一段时间有不一致的消息的状态的,并没有说必须特别高的这个消息的一致性比如说在TCC这个架构中,如果采用了消息的最终一致性,整体架构设计要轻松好多即便我们库…...
BC C language
题目汇总 No.1 打印有规律的字符(牛牛的字符菱形) 代码展示 #include<stdio.h> int main() {char ch0;scanf("%c",&ch);for(int i0;i<5;i){for(int j0;j<5;j){if((i0||i4)&&j2)printf("%c", ch);else if ((i 1||i3) &&…...
算法训练营第四十九天 | LeetCode 139单词拆分
LeetCode 139 单词拆分 基本还是完全背包的思路,不过用了三重循环,第三重循环是用于判断当前字符串尾部指定长度字符是否和列表中某一字符串相同,是的话可以将当前dp[j]或上当前下标减去该单词长度后的下标值。 代码如下: clas…...
阿里云一键登录号码认证服务
阿里云文档:号码认证SDK_号码认证服务(PNVS)-阿里云帮助中心 对于后端大概流程 前端App会传一个token过来 后端通过下面方法解析 如果解析可以获得号码,说明号码认证成功,如果无法正确解析则认证失败 /*** actoken来换取电话号码* param token app端用户授权actok…...
【UML用户指南】-05-对基本结构建模-类
目录 1、名称(name) 2、属性 (attribute) 3、操作(operation) 4、对属性和操作的组织 4.1、衍型 4.2、职责 (responsibility) 4.3、其他特征 4.4、对简单类型建模 5、结构良…...
【C++ 初阶】引用 () 实际的一些用法、常引用问题 详解!
文章目录 1. 常引用的背景2. 字符 a 与 整形 97 是相同的,但是具体是怎么比较的呢 ? 1. 常引用的背景 注意: 🐧① 权限可以平移、可以缩小,但是权限 不可以放大。 🐧 类型转换中间会产生临时变量 2. 字…...
adb dump当前可见的窗口
1、窗口信息 adb shell dumpsys window windows > w.txt2、dump当前可见的窗口activity windows系统 adb shell dumpsys activity | findStr mFocusmac系统 adb shell dumpsys activity | grep mFocus3、dump当前处于栈顶的activity windows系统 adb shell dumpsys activi…...
Java Web学习笔记27——对话框、表单组件
常见组件对话框: Dialog对话框:在保留当前页面状态下,告知用户并承载相关操作。 dialogTableVisible: false 默认是不可见的。 在按钮属性中设置为true的意思,点击按钮的时候,才会true,对话框才会显示。 …...
使用vue3+ts封装一个Slider滑块组件
创建一个名为 Slider.vue 的文件 <template><div class"slider-container"><inputtype"range":value"value"input"handleInput"change"handleChange"/><div class"slider-value">{{ val…...
关于科技的总结与思考
文章目录 互联网时代有趣的数字数据驱动大数据的两个特性数据保护互联网免费模式的再探讨平台互联网的意义人工智能伦理的思考语言理性人梅特卡夫定律冲浪的神奇之处AR的恐怖之处叙词表、受控词表和大众分类法六度/十九度的解读知识图谱是真正的仿生智能幂次法则和优先连接现代…...
2024年几款优秀的SQL IDE优缺点分析
SQL 工具在数据库管理、查询优化和数据分析中扮演着重要角色。 以下是常见的 SQL 工具及其优缺点。 1. SQLynx 优点: 智能代码补全和建议:采用AI技术提供高级代码补全、智能建议和自动错误检测,大幅提高编写和调试SQL查询的效率。跨平台和…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...
