kafka-消费者组(SpringBoot整合Kafka)
文章目录
- 1、消费者组
- 1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
- 1.2、创建生产者发送消息
- 1.3、application.yml配置
- 1.4、创建消费者监听器
- 1.5、创建SpringBoot启动类
- 1.6、屏蔽 kafka debug 日志 logback.xml
- 1.7、引入spring-kafka依赖
- 1.8、消费者控制台:
1、消费者组
1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本
1.2、创建生产者发送消息
package com.atguigu.spring.kafka.consumer;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootTest
class SpringKafkaConsumerApplicationTests {@ResourceKafkaTemplate kafkaTemplate;@Testvoid contextLoads() {for (int i = 0; i < 10; i++) {kafkaTemplate.send("my_topic1",i%6,"", "消费者组"+i);}}
}
1.3、application.yml配置
server:port: 8120# v1
spring:Kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097consumer:# read-committed读事务已提交的消息 解决脏读问题isolation-level: read-committed # 消费者的事务隔离级别:read-uncommitted会导致脏读,可以读取生产者事务还未提交的消息# 消费者是否自动ack :true自动ack 消费者获取到消息后kafka提交消费者偏移量enable-auto-commit: true # 消费者提交ack时多长时间批量提交一次auto-commit-interval: 1000# 消费者第一次消费主题消息时从哪个位置开始auto-offset-reset: earliest #指定Offset消费:earliest | latest | nonekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
1.4、创建消费者监听器
package com.atguigu.spring.kafka.consumer.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyKafkaListener {@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage1(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者1获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group1")public void onMessage2(ConsumerRecord<String, String> record) {System.out.println("my_group1消费者2获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}@KafkaListener(topics ={"my_topic1"},groupId = "my_group2")public void onMessage3(ConsumerRecord<String, String> record) {System.out.println("my_group2消费者获取到消息:topic = "+ record.topic()+",partition:"+record.partition()+",offset = "+record.offset()+",key = "+record.key()+",value = "+record.value());}}
1.5、创建SpringBoot启动类
package com.atguigu.spring.kafka.consumer;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// Generated by https://start.springboot.io
// 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn
@SpringBootApplication
public class SpringKafkaConsumerApplication {public static void main(String[] args) {SpringApplication.run(SpringKafkaConsumerApplication.class, args);}}
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration> <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu</groupId><artifactId>spring-kafka-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-kafka-consumer</name><description>spring-kafka-consumer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
1.8、消费者控制台:
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v3.0.5)my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group2消费者获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group2消费者获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
my_group2消费者获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group2消费者获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group2消费者获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group2消费者获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 0,key = ,value = 消费者组2
my_group1消费者2获取到消息:topic = my_topic1,partition:2,offset = 1,key = ,value = 消费者组8
my_group1消费者1获取到消息:topic = my_topic1,partition:5,offset = 0,key = ,value = 消费者组5
my_group1消费者1获取到消息:topic = my_topic1,partition:4,offset = 0,key = ,value = 消费者组4
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 0,key = ,value = 消费者组3
my_group1消费者1获取到消息:topic = my_topic1,partition:3,offset = 1,key = ,value = 消费者组9
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 0,key = ,value = 消费者组1
my_group1消费者2获取到消息:topic = my_topic1,partition:1,offset = 1,key = ,value = 消费者组7
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 消费者组0
my_group1消费者2获取到消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 消费者组6
相关文章:

kafka-消费者组(SpringBoot整合Kafka)
文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...
Redisson知识
使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁: lock.lock(); 特点: 1.使用Redisson加的锁,具有自动续期机制,如果业务运行时间较长,运行…...
0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...

如何把几个pdf文件合成在一个pdf文件
PDF合并,作为一种常见的文件处理方式,无论是在学术研究、工作汇报还是日常生活中,都有着广泛的应用。本文将详细介绍PDF合并的多种方法,帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站,然后上传pdf。 pd…...
Stream与MLC测试CPU内存DDR5的原理与方法详解
在高性能计算和服务器领域,内存性能是决定整体系统性能的关键因素之一,特别是随着DDR5内存的普及,其更高的带宽和更低的延迟特性使得内存性能测试变得更加重要。本文将详细介绍使用Stream和MLC两种工具对CPU内存DDR5进行性能测试的原理和实施…...

linux业务代码性能优化点
planning优化的一些改动----------> 减少值传递,多用引用来传递 <---------- // ----------> 减少值传递,多用引用来传递 <---------- // 例1: class A{}; std::vector<A> v; // for(auto elem : v) {} // 不建议ÿ…...

Shell脚本学习_字符串变量
目录 1.Shell字符串变量:格式介绍 2.Shell字符串变量:拼接 3.Shell字符串变量:字符串截取 4.Shell索引数组变量:定义-获取-拼接-删除 1.Shell字符串变量:格式介绍 1、目标: 能够使用字符串的三种方式 …...

spring-kafka-生产者服务搭建测试(SpringBoot整合Kafka)
文章目录 1、生产者服务搭建1.1、引入spring-kafka依赖1.2、application.yml配置----v1版1.3、使用Java代码创建主题分区副本1.4、发送消息 1、生产者服务搭建 1.1、引入spring-kafka依赖 <?xml version"1.0" encoding"UTF-8"?> <project xml…...

JVM学习-内存泄漏
内存泄漏的理解和分类 可达性分析算法来判断对象是否是不再使用的对象,本质都是判断一上对象是否还被引用,对于这种情况下,由于代码的实现不同就会出现很多内存泄漏问题(让JVM误以为此对象还在引用,无法回收,造成内存泄…...

Go微服务: 分布式之通过本地消息实现最终一致性和最大努力通知方案
通过本地消息实现最终一致性 1 )概述 我们的业务场景是可以允许我们一段时间有不一致的消息的状态的,并没有说必须特别高的这个消息的一致性比如说在TCC这个架构中,如果采用了消息的最终一致性,整体架构设计要轻松好多即便我们库…...

BC C language
题目汇总 No.1 打印有规律的字符(牛牛的字符菱形) 代码展示 #include<stdio.h> int main() {char ch0;scanf("%c",&ch);for(int i0;i<5;i){for(int j0;j<5;j){if((i0||i4)&&j2)printf("%c", ch);else if ((i 1||i3) &&…...
算法训练营第四十九天 | LeetCode 139单词拆分
LeetCode 139 单词拆分 基本还是完全背包的思路,不过用了三重循环,第三重循环是用于判断当前字符串尾部指定长度字符是否和列表中某一字符串相同,是的话可以将当前dp[j]或上当前下标减去该单词长度后的下标值。 代码如下: clas…...
阿里云一键登录号码认证服务
阿里云文档:号码认证SDK_号码认证服务(PNVS)-阿里云帮助中心 对于后端大概流程 前端App会传一个token过来 后端通过下面方法解析 如果解析可以获得号码,说明号码认证成功,如果无法正确解析则认证失败 /*** actoken来换取电话号码* param token app端用户授权actok…...

【UML用户指南】-05-对基本结构建模-类
目录 1、名称(name) 2、属性 (attribute) 3、操作(operation) 4、对属性和操作的组织 4.1、衍型 4.2、职责 (responsibility) 4.3、其他特征 4.4、对简单类型建模 5、结构良…...

【C++ 初阶】引用 () 实际的一些用法、常引用问题 详解!
文章目录 1. 常引用的背景2. 字符 a 与 整形 97 是相同的,但是具体是怎么比较的呢 ? 1. 常引用的背景 注意: 🐧① 权限可以平移、可以缩小,但是权限 不可以放大。 🐧 类型转换中间会产生临时变量 2. 字…...
adb dump当前可见的窗口
1、窗口信息 adb shell dumpsys window windows > w.txt2、dump当前可见的窗口activity windows系统 adb shell dumpsys activity | findStr mFocusmac系统 adb shell dumpsys activity | grep mFocus3、dump当前处于栈顶的activity windows系统 adb shell dumpsys activi…...

Java Web学习笔记27——对话框、表单组件
常见组件对话框: Dialog对话框:在保留当前页面状态下,告知用户并承载相关操作。 dialogTableVisible: false 默认是不可见的。 在按钮属性中设置为true的意思,点击按钮的时候,才会true,对话框才会显示。 …...
使用vue3+ts封装一个Slider滑块组件
创建一个名为 Slider.vue 的文件 <template><div class"slider-container"><inputtype"range":value"value"input"handleInput"change"handleChange"/><div class"slider-value">{{ val…...

关于科技的总结与思考
文章目录 互联网时代有趣的数字数据驱动大数据的两个特性数据保护互联网免费模式的再探讨平台互联网的意义人工智能伦理的思考语言理性人梅特卡夫定律冲浪的神奇之处AR的恐怖之处叙词表、受控词表和大众分类法六度/十九度的解读知识图谱是真正的仿生智能幂次法则和优先连接现代…...

2024年几款优秀的SQL IDE优缺点分析
SQL 工具在数据库管理、查询优化和数据分析中扮演着重要角色。 以下是常见的 SQL 工具及其优缺点。 1. SQLynx 优点: 智能代码补全和建议:采用AI技术提供高级代码补全、智能建议和自动错误检测,大幅提高编写和调试SQL查询的效率。跨平台和…...

Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...

dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

ui框架-文件列表展示
ui框架-文件列表展示 介绍 UI框架的文件列表展示组件,可以展示文件夹,支持列表展示和图标展示模式。组件提供了丰富的功能和可配置选项,适用于文件管理、文件上传等场景。 功能特性 支持列表模式和网格模式的切换展示支持文件和文件夹的层…...

高保真组件库:开关
一:制作关状态 拖入一个矩形作为关闭的底色:44 x 22,填充灰色CCCCCC,圆角23,边框宽度0,文本为”关“,右对齐,边距2,2,6,2,文本颜色白色FFFFFF。 拖拽一个椭圆,尺寸18 x 18,边框为0。3. 全选转为动态面板状态1命名为”关“。 二:制作开状态 复制关状态并命名为”开…...
当下AI智能硬件方案浅谈
背景: 现在大模型出来以后,打破了常规的机械式的对话,人机对话变得更聪明一点。 对话用到的技术主要是实时音视频,简称为RTC。下游硬件厂商一般都不会去自己开发音视频技术,开发自己的大模型。商用方案多见为字节、百…...