当前位置: 首页 > news >正文

kafka-消费者组(SpringBoot整合Kafka)

文章目录

  • 1、消费者组
    • 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
    • 1.2、创建生产者发送消息
    • 1.3、application.yml配置
    • 1.4、创建消费者监听器
    • 1.5、创建SpringBoot启动类
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、消费者控制台:

1、消费者组

1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本

在这里插入图片描述

1.2、创建生产者发送消息

package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);}}
}

在这里插入图片描述
在这里插入图片描述

1.3、application.yml配置

server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest  #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

1.4、创建消费者监听器

package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage2(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")public void onMessage3(ConsumerRecord<String, String> record) {System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}

1.5、创建SpringBoot启动类

package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、消费者控制台:

  .   ____          _            __ _ _/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/  ___)| |_)| | | | | || (_| |  ) ) ) )'  |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot ::                (v3.0.5)my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6

在这里插入图片描述

相关文章:

kafka-消费者组(SpringBoot整合Kafka)

文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...

Redisson知识

使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁&#xff1a; lock.lock(); 特点&#xff1a; 1.使用Redisson加的锁&#xff0c;具有自动续期机制&#xff0c;如果业务运行时间较长&#xff0c;运行…...

0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南

【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...

如何把几个pdf文件合成在一个pdf文件

PDF合并&#xff0c;作为一种常见的文件处理方式&#xff0c;无论是在学术研究、工作汇报还是日常生活中&#xff0c;都有着广泛的应用。本文将详细介绍PDF合并的多种方法&#xff0c;帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站&#xff0c;然后上传pdf。 pd…...

Stream与MLC测试CPU内存DDR5的原理与方法详解

在高性能计算和服务器领域&#xff0c;内存性能是决定整体系统性能的关键因素之一&#xff0c;特别是随着DDR5内存的普及&#xff0c;其更高的带宽和更低的延迟特性使得内存性能测试变得更加重要。本文将详细介绍使用Stream和MLC两种工具对CPU内存DDR5进行性能测试的原理和实施…...

linux业务代码性能优化点

planning优化的一些改动----------> 减少值传递&#xff0c;多用引用来传递 <---------- // ----------> 减少值传递&#xff0c;多用引用来传递 <---------- // 例1&#xff1a; class A{}; std::vector<A> v; // for(auto elem : v) {} // 不建议&#xff…...

Shell脚本学习_字符串变量

目录 1.Shell字符串变量&#xff1a;格式介绍 2.Shell字符串变量&#xff1a;拼接 3.Shell字符串变量&#xff1a;字符串截取 4.Shell索引数组变量&#xff1a;定义-获取-拼接-删除 1.Shell字符串变量&#xff1a;格式介绍 1、目标&#xff1a; 能够使用字符串的三种方式 …...

spring-kafka-生产者服务搭建测试(SpringBoot整合Kafka)

文章目录 1、生产者服务搭建1.1、引入spring-kafka依赖1.2、application.yml配置----v1版1.3、使用Java代码创建主题分区副本1.4、发送消息 1、生产者服务搭建 1.1、引入spring-kafka依赖 <?xml version"1.0" encoding"UTF-8"?> <project xml…...

JVM学习-内存泄漏

内存泄漏的理解和分类 可达性分析算法来判断对象是否是不再使用的对象&#xff0c;本质都是判断一上对象是否还被引用&#xff0c;对于这种情况下&#xff0c;由于代码的实现不同就会出现很多内存泄漏问题(让JVM误以为此对象还在引用&#xff0c;无法回收&#xff0c;造成内存泄…...

Go微服务: 分布式之通过本地消息实现最终一致性和最大努力通知方案

通过本地消息实现最终一致性 1 &#xff09;概述 我们的业务场景是可以允许我们一段时间有不一致的消息的状态的&#xff0c;并没有说必须特别高的这个消息的一致性比如说在TCC这个架构中&#xff0c;如果采用了消息的最终一致性&#xff0c;整体架构设计要轻松好多即便我们库…...

BC C language

题目汇总 No.1 打印有规律的字符(牛牛的字符菱形) 代码展示 #include<stdio.h> int main() {char ch0;scanf("%c",&ch);for(int i0;i<5;i){for(int j0;j<5;j){if((i0||i4)&&j2)printf("%c", ch);else if ((i 1||i3) &&…...

算法训练营第四十九天 | LeetCode 139单词拆分

LeetCode 139 单词拆分 基本还是完全背包的思路&#xff0c;不过用了三重循环&#xff0c;第三重循环是用于判断当前字符串尾部指定长度字符是否和列表中某一字符串相同&#xff0c;是的话可以将当前dp[j]或上当前下标减去该单词长度后的下标值。 代码如下&#xff1a; clas…...

阿里云一键登录号码认证服务

阿里云文档&#xff1a;号码认证SDK_号码认证服务(PNVS)-阿里云帮助中心 对于后端大概流程 前端App会传一个token过来 后端通过下面方法解析 如果解析可以获得号码,说明号码认证成功,如果无法正确解析则认证失败 /*** actoken来换取电话号码* param token app端用户授权actok…...

【UML用户指南】-05-对基本结构建模-类

目录 1、名称&#xff08;name&#xff09; 2、属性 &#xff08;attribute&#xff09; 3、操作&#xff08;operation&#xff09; 4、对属性和操作的组织 4.1、衍型 4.2、职责 &#xff08;responsibility&#xff09; 4.3、其他特征 4.4、对简单类型建模 5、结构良…...

【C++ 初阶】引用 () 实际的一些用法、常引用问题 详解!

文章目录 1. 常引用的背景2. 字符 a 与 整形 97 是相同的&#xff0c;但是具体是怎么比较的呢 &#xff1f; 1. 常引用的背景 注意&#xff1a; &#x1f427;① 权限可以平移、可以缩小&#xff0c;但是权限 不可以放大。 &#x1f427; 类型转换中间会产生临时变量 2. 字…...

adb dump当前可见的窗口

1、窗口信息 adb shell dumpsys window windows > w.txt2、dump当前可见的窗口activity windows系统 adb shell dumpsys activity | findStr mFocusmac系统 adb shell dumpsys activity | grep mFocus3、dump当前处于栈顶的activity windows系统 adb shell dumpsys activi…...

Java Web学习笔记27——对话框、表单组件

常见组件对话框&#xff1a; Dialog对话框&#xff1a;在保留当前页面状态下&#xff0c;告知用户并承载相关操作。 dialogTableVisible: false 默认是不可见的。 在按钮属性中设置为true的意思&#xff0c;点击按钮的时候&#xff0c;才会true&#xff0c;对话框才会显示。 …...

使用vue3+ts封装一个Slider滑块组件

创建一个名为 Slider.vue 的文件 <template><div class"slider-container"><inputtype"range":value"value"input"handleInput"change"handleChange"/><div class"slider-value">{{ val…...

关于科技的总结与思考

文章目录 互联网时代有趣的数字数据驱动大数据的两个特性数据保护互联网免费模式的再探讨平台互联网的意义人工智能伦理的思考语言理性人梅特卡夫定律冲浪的神奇之处AR的恐怖之处叙词表、受控词表和大众分类法六度/十九度的解读知识图谱是真正的仿生智能幂次法则和优先连接现代…...

2024年几款优秀的SQL IDE优缺点分析

SQL 工具在数据库管理、查询优化和数据分析中扮演着重要角色。 以下是常见的 SQL 工具及其优缺点。 1. SQLynx 优点&#xff1a; 智能代码补全和建议&#xff1a;采用AI技术提供高级代码补全、智能建议和自动错误检测&#xff0c;大幅提高编写和调试SQL查询的效率。跨平台和…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

Map相关知识

数据结构 二叉树 二叉树&#xff0c;顾名思义&#xff0c;每个节点最多有两个“叉”&#xff0c;也就是两个子节点&#xff0c;分别是左子 节点和右子节点。不过&#xff0c;二叉树并不要求每个节点都有两个子节点&#xff0c;有的节点只 有左子节点&#xff0c;有的节点只有…...

Spring数据访问模块设计

前面我们已经完成了IoC和web模块的设计&#xff0c;聪明的码友立马就知道了&#xff0c;该到数据访问模块了&#xff0c;要不就这俩玩个6啊&#xff0c;查库势在必行&#xff0c;至此&#xff0c;它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据&#xff08;数据库、No…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

Kafka入门-生产者

生产者 生产者发送流程&#xff1a; 延迟时间为0ms时&#xff0c;也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于&#xff1a;异步发送不需要等待结果&#xff0c;同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...

区块链技术概述

区块链技术是一种去中心化、分布式账本技术&#xff0c;通过密码学、共识机制和智能合约等核心组件&#xff0c;实现数据不可篡改、透明可追溯的系统。 一、核心技术 1. 去中心化 特点&#xff1a;数据存储在网络中的多个节点&#xff08;计算机&#xff09;&#xff0c;而非…...

绕过 Xcode?使用 Appuploader和主流工具实现 iOS 上架自动化

iOS 应用的发布流程一直是开发链路中最“苹果味”的环节&#xff1a;强依赖 Xcode、必须使用 macOS、各种证书和描述文件配置……对很多跨平台开发者来说&#xff0c;这一套流程并不友好。 特别是当你的项目主要在 Windows 或 Linux 下开发&#xff08;例如 Flutter、React Na…...

jdbc查询mysql数据库时,出现id顺序错误的情况

我在repository中的查询语句如下所示&#xff0c;即传入一个List<intager>的数据&#xff0c;返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致&#xff0c;会导致返回的id是从小到大排列的&#xff0c;但我不希望这样。 Query("SELECT NEW com…...

【笔记】AI Agent 项目 SUNA 部署 之 Docker 构建记录

#工作记录 构建过程记录 Microsoft Windows [Version 10.0.27871.1000] (c) Microsoft Corporation. All rights reserved.(suna-py3.12) F:\PythonProjects\suna>python setup.py --admin███████╗██╗ ██╗███╗ ██╗ █████╗ ██╔════╝…...