当前位置: 首页 > news >正文

Spring Boot集成kafka的相关配置

引入依赖:

额外依赖只需要这一个,kafka-client 不是springboot 的东西,那是原生的 kafka 客户端, kafka-test也不需要,是用代码控制broker的东西。

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml配置:

也可以用java类Config 方式配置,如果没有特殊要求,可以只用spring配置的方式

server:port: 8080
spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 生产者配置producer:# 消息发送重试次数,注意会引起重复消费,消费者需要做幂等retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# 消费者配置consumer:# 消费组ID,同一个消费组不会重复消费数据group-id: testGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000#批量提交#      type: batch

业务代码:

  1. 简单生产

    @RestController
    public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send("testTopic", message);}
    }
    
  2. 简单消费

注意加上@Component,被spring管理监听才有效

@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"testTopic"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

主题使用配置的方式:

  1. 在yml配置里加上topic

    server:port: 8080
    spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 自定义配置myconfig:topic: testTopic
  2. 简单生产+配置主题

    @RestController
    public class kafkaProducer {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@Value("${spring.kafka.myconfig.topic}")private String topic;@GetMapping("/kafka/normal/{message}")public void sendNormalMessage(@PathVariable("message") String message) {kafkaTemplate.send(topic, message);}
    }
  3. 简单消费+配置主题

注意这里不能用@Value注解的方式,会报错

@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"${spring.kafka.myconfig.topic}"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

如果是多个主题需要消费, 可以使用Spring的SpEl表达式

server:port: 8080
spring:kafka:# Kafka服务器,支持集群bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092# 自定义配置myconfig:topic: testTopic1,testTopic2
@KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})
@Component
public class KafkaConsumer {//监听消费@KafkaListener(topics = {"#{'${spring.kafka.myconfig.topic}'.split(',')}"})public void onNormalMessage(ConsumerRecord<String, Object> record) {System.out.println("简单消费:" + record.topic() + "-" + record.partition() + "=" +record.value());}}

 Kafka数据重复和数据丢失的解决方案_kafka相同数据_谢小涛的博客-CSDN博客

相关文章:

Spring Boot集成kafka的相关配置

引入依赖&#xff1a; 额外依赖只需要这一个&#xff0c;kafka-client 不是springboot 的东西&#xff0c;那是原生的 kafka 客户端&#xff0c; kafka-test也不需要&#xff0c;是用代码控制broker的东西。 <dependency><groupId>org.springframework.kafka</g…...

Git(11)——Git相关问题解答以及常用命令总结

目录 一、简介 二、问题 三、常用命令总结 一、简介 本篇文章将介绍作者在学习Git的过程所遇到的困惑以及熟悉Git后总结的常用命令 二、问题 ①Git配置的邮箱和用户名和Git的ssh密钥有什么联系&#xff1f;假如我使用Gitlab在张三这个账户上配置了ssh公钥&#xff0c;但是…...

【LeetCode高频SQL50题-基础版】打卡第7天:第36~40题

文章目录 【LeetCode高频SQL50题-基础版】打卡第7天&#xff1a;第36~40题⛅前言按分类统计薪水&#x1f512;题目&#x1f511;题解 上级经理已离职的公司员工&#x1f512;题目&#x1f511;题解 换座位&#x1f512;题目&#x1f511;题解 电影评分&#x1f512;题目&#x…...

C++入门1

C入门1 1.前言2.命名空间1.C语言对于命名空间方面的缺陷2.命名空间的语法特性1.域作用限定符2.命名空间的可嵌套性 3.声明与定义分离的命名空间4.命名空间的展开5.多个命名空间中命名冲突6.对于命名空间的推荐写法 3.iostream1.cout和endl2.cin 3.缺省参数1.缺省参数的形式2.缺…...

Matlab论文插图绘制模板第118期—进阶气泡图

之前的文章中&#xff0c;分享过Matlab气泡图的绘制模板&#xff1a; 图虽说好看&#xff0c;但有一个缺点&#xff1a;需要手动调节两个图例的位置。 为了解决这一问题&#xff0c;我们不妨结合前段时间分享的紧凑排列多子图的绘制模板&#xff1a; 从而达到自动对齐排列的效…...

grafana接入OpenTSDB设置大盘语法

目录 1、filter过滤语法1.1 精准匹配1.2 正则匹配1.3 通配符匹配 完整示例1、 展示应用app的CPU利用率监控2&#xff09;展示应用app的在线核数 1、filter过滤语法 1.1 精准匹配 literal_or &#xff1a; tagv的过滤规则: 精确匹配多项迭代值&#xff0c;多项迭代值以’|分隔&a…...

HarmonyOS 远端状态订阅开发实例

IPC/RPC 提供对远端 Stub 对象状态的订阅机制&#xff0c; 在远端 Stub 对象消亡时&#xff0c;可触发消亡通知告诉本地 Proxy 对象。这种状态通知订阅需要调用特定接口完成&#xff0c;当不再需要订阅时也需要调用特定接口取消。使用这种订阅机制的用户&#xff0c;需要实现消…...

实战一:Http轮询弹幕拦截

系列文章目录 训练地址:https://www.qiulianmao.com websocket逆向http拦截websocket拦截视频号直播弹幕采集实战一:Http轮询更新中实战一:Http轮询 系列文章目录前言一、判断消息传输技术二、用户进入直播间三、 用户发言四、 用户送礼五、点赞事件六、用户唯一id的获取七…...

虚拟机独立 IP 配置

虚拟机独立 IP 配置 1. 点击虚拟网络编辑器 2. 点击更改设置 3. 查看本地电脑网卡型号并设置虚拟网络编辑器桥接网卡为同型号网卡 4. 设置有限网络信息 5. 点击网络编辑按钮并点击身份 6. 编辑名称并选择MAC地址 7. 配置 IPv4 地址后点击应用即可...

升级教育技术软件的多合一解决方案

当今时代技术和教育联系越来越紧密&#xff0c;教育机构对强大、安全、灵活的 IT 解决方案的探索至关重要。 全球事件、技术进步以及学生和教职员工不断变化的需求影响着不断变化的教育格局&#xff0c;我们要采取变革性的方法来确保教育的连续性和质量提升。 Splashtop Ente…...

c++视觉检测-----角点检测

角点检测&#xff1a;cornerHarris() cornerHarris()函数是OpenCV中用于执行Harris角点检测的函数。Harris角点检测是一种用于检测图像中角点的技术&#xff0c;通常用于特征检测和图像匹配。以下是cornerHarris()函数的用法&#xff1a; void cornerHarris(InputArray src, …...

虚拟机安装Docker

安装Docker Docker 分为 CE 和 EE 两大版本。CE 即社区版&#xff08;免费&#xff0c;支持周期 7 个月&#xff09;&#xff0c;EE 即企业版&#xff0c;强调安全&#xff0c;付费使用&#xff0c;支持周期 24 个月。 Docker CE 分为 stable test 和 nightly 三个更新频道。…...

虚幻引擎5:增强输入的使用方法

一、基本配置 1.创建一个输入映射上下文&#xff08;映射表&#xff09; 2.创建自己需要的操作映射或者轴映射 3.创建完成之后进入这个映射&#xff0c;来设置类型&#xff0c;共有4个类型 1.Digital:是旧版操作映射类型&#xff0c;一般是按下抬起来使用&#xff0c;像跳跃…...

buffer overflow detected

背景 在应用上云改造中&#xff0c;业务场景如下&#xff1a; 在使用ecs的场景中&#xff0c;应用的ip都是固定的&#xff1b;在使用k8s之后pod的ip就变的不固定了&#xff0c;k8s提供了statefulset的模式来支持这种场景&#xff0c;以固定域名的方式支持。 问题 在平台pod开…...

【c++源码】老飞飞源码完整v15源码(包含数据库前端后端源文件)

老飞飞源码完整v15源码&#xff08;包含数据库前端后端源文件&#xff09;程序来源于国外网站。程序仅供参考学习游戏开发流程。以及框架内容。 测试环境搭建 Visual Studio 2013 SQL Server 2008r Windows 10 和 11 专业版 这些文件已经过测试&#xff0c;搭建&#xff0c;运行…...

MySQL创建数据库、创建表操作和用户权限

1、创建数据库school&#xff0c;字符集为utf8 2、在school数据库中创建Student和Score表 3、授权用户tom&#xff0c;密码Mysql123&#xff0c;能够从任何地方登录并管理数据库school 4、使用mysql客户端登录服务器&#xff0c;重置root密码...

时间序列分析基础篇

**时间序列分析&#xff08;time series analysis&#xff09;是量化投资中的一门基本技术。时间序列是指在一定时间内按时间顺序测量的某个变量的取值序列。**比如变量是股票价格&#xff0c;那么它随时间的变化就是一个时间序列&#xff1b;同样的&#xff0c;如果变量是股票…...

Idea JavaWeb项目,继承自HttpFilter的过滤器,启动Tomcat时部署工件出错

JDK版本&#xff1a;1.8 Tomcat版本&#xff1a;8.5 10-Oct-2023 13:55:17.586 严重 [RMI TCP Connection(3)-127.0.0.1] org.apache.catalina.core.StandardContext.startInternal One or more Filters failed to start. Full details will be found in the appropriate conta…...

02Maven核心程序的下载与settings.xml文件的配置,环境变量的配置

Maven核心程序的解压与配置 Maven的下载与解压 Maven官网下载安装包 将下载的Maven核心程序压缩包apache-maven-3.8.4-bin.zip解压到一个非中文且没有空格的目录 Maven的核心配置文件 在Maven的解压目录conf中我们需要配置Maven的核心配置文件settings.xml 配置本地仓库位置…...

栈实现深度优先搜索

引言 之前刚学DFS的时候并不完全理解为什么递归可以一直往下做&#xff0c;后来直到了递归的本质是栈&#xff0c;就想着能不能手写栈来代替递归呢。当时刚学&#xff0c;自己觉得水平不够就搁置了这个想法&#xff0c;今天上数据结构老师正好讲了栈的应用&#xff0c;其中就有…...

Java 基于SpringBoot的某家乡美食系统

1 简介 《Java 基于SpringBoot的某家乡美食系统》该项目含有源码、文档等资料、配套开发软件、软件安装教程等。系统功能完整&#xff0c;适合作为毕业设计、课程设计、数据库大作业学习使用。 功能介绍 这个项目是基于 SpringBoot和 Vue 开发的地方美食系统&#xff0c;包括…...

splice 和 slice 会改变原数组吗? 怎么删除数组最后一个元素?

1、splice 和 slice 会改变原数组吗? splice() 会改变原数组&#xff0c;返回的是改变的内容&#xff1b; splice() 方法可能是数组中的最强大的方法之一了&#xff0c;使用它的形式有很多种&#xff0c;它会向/从数组中添加/删除项目&#xff0c;然后返回被删除的项目。 该方…...

解锁互联网安全的新钥匙:JWT(JSON Web Token)

目录 前言 一、JWT简介 1. 什么是JWT&#xff1f; ​编辑 2. JWT的工作原理 3.JWT如何工作的 4. JWT的优势 5. 在实际应用中使用JWT 6.传统Session和JWT认证的区别 6.1.session认证方式 6.2.JWT认证方式 7.基于Token的身份认证 与 基于服务器的身份认证 二、JWT的…...

alsa音频pcm设备之i2c调试

i2cdetect 列举 I2C bus i2cdetect -l ls /dev/i2c* 列出I2C bus i2c-7 上面连接的所有设备,并得到i2c设备地址 i2cdetect -y 7 发现i2c设备的位置显示为UU或表示设备地址的数值,UU表示设备在driver中被使用. I2cdump i2c设备大量register的值 i2cdump -y 7 0x40 I2cset设置…...

1. Windows平台下如何编译C++版本的Redis库hiredis

Redis是一个key-value存储系统。和Memcached类似&#xff0c;它支持存储的value类型相对更多&#xff0c;包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash&#xff08;哈希类型&#xff09;。这些数据类型都支持push/pop、add/remove及取交集并…...

Centos中利用自带的定时器Crontab_实现mysql数据库自动备份_linux中mysql自动备份脚本---Linux运维工作笔记056

这个经常需要,怕出问题因而需要经常备份数据库,可以利用centos自带的定时器,配合脚本实现自动备份. 1.首先查看一下,这个crontab服务有没有打开: 执行:ntsysv 可以看到已经开机自启动了. 注意这个操作界面,用鼠标不行,需要用,tab按键,直接tab到确定,或取消,然后按回车回到命…...

完美解决Android adb install 安装提示 INSTALL_FAILED_TEST_ONLY

完美解决Android adb install 安装提示 INSTALL_FAILED_TEST_ONLY 目录 所遇问题 有些时候我们用命令进行安装apk如下&#xff1a; adb install xxx.apk但是会安装不成功&#xff0c;报如下错误&#xff1a; 错误现象&#xff1a;提示&#xff1a;Failed to install app-d…...

[清华大学]漏洞挖掘之状态敏感的模糊测试StateFuzz

Dr.赵博栋 Prof.张超 清华大学 网络研究院 INSC 本文主要介绍了通过State Fuzz对Linux驱动程序进行模糊测试&#xff0c;该Fuzz方法由赵博栋博士在InForSec会议上分享&#xff0c;并在USENIX Security上发布.StateFuzz :System Call-Based State-Aware Linux Driver Fuzzing.该…...

嵌入式养成计划-40----C++菱形继承--虚继承--多态--模板--异常

九十四、菱形继承 94.1 概念 菱形继承又称为钻石继承&#xff0c;是由公共基类派生出多个中间子类&#xff0c;又由中间子类共同派生出汇聚子类&#xff0c;汇聚子类会得到多份中间子类从公共基类继承下来的数据成员&#xff0c;会造成空间浪费&#xff0c;没有必要。 所以存…...

C++入门指南:类和对象总结友元类笔记(下)

C入门指南:类和对象总结友元类笔记&#xff08;下&#xff09; 一、深度剖析构造函数1.1 构造函数体赋值1.2 初始化列表1.3 explicit关键字 二、static成员2.1 概念2.2 特性 三、友元3.1 友元函数3.2 友元类 四、 内部类4.1 概念4.2 特征 五、拷贝对象时的一些编译器优化六、深…...