springboot整合kafka-笔记
springboot整合kafka-笔记
配置pom.xml
这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.8.RELEASE</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency></dependencies>
配置application.yml
spring:kafka:bootstrap-servers: 10.1.5.212:9092
配置EnableKafka注解
@EnableKafka
@EnableScheduling
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class,args);}
}
定义kafka消息生产者
package cn.test.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private String topic= "my816topic";public void sendMessage(String message){log.info("向kafka发送消息:{}",message);kafkaTemplate.send(topic,message);}
}
定义kafka消息消费者
package cn.test.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumerService {@KafkaListener(topics = "my816topic",groupId = "mygroup")public void listen(String message){log.info("rec msg:{}",message);}
}
测试发送kafka消息
@Component
@Slf4j
public class TestJob {@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate KafkaProducerService kafkaProducerService;@Scheduled(cron = "0/13 * * * * ?")public void hwreg() {String s = null;try {s = objectMapper.writeValueAsString(new CityInfo("hefei"+System.currentTimeMillis(), 117.17, 31.52));} catch (JsonProcessingException e) {e.printStackTrace();}//测试发送字符串消息kafkaProducerService.sendMessage(s);}
测试发送kafka消息-控制台日志
2023-08-16 19:16:50,832 INFO [17324] [main] [] o.a.k.c.c.ConsumerConfig [AbstractConfig.java : 347] ConsumerConfig values: allow.auto.create.topics = trueauto.commit.interval.ms = 5000auto.offset.reset = latestbootstrap.servers = [10.1.5.212:9092]check.crcs = trueclient.dns.lookup = defaultclient.id = client.rack = connections.max.idle.ms = 540000default.api.timeout.ms = 60000enable.auto.commit = falseexclude.internal.topics = truefetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = mygroupgroup.instance.id = nullheartbeat.interval.ms = 3000interceptor.classes = []internal.leave.group.on.close = trueisolation.level = read_uncommittedkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializermax.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsend.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKSvalue.deserializer = class org.apache.kafka.common.serialization.StringDeserializer2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 117] Kafka version: 2.3.1
2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 118] Kafka commitId: 18a913733fb71c01
2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 119] Kafka startTimeMs: 1692184610882
2023-08-16 19:16:50,887 INFO [17324] [main] [] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java : 964] [Consumer clientId=consumer-1, groupId=mygroup] Subscribed to topic(s): my816topic
2023-08-16 19:16:51,094 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.Metadata [Metadata.java : 261] [Consumer clientId=consumer-1, groupId=mygroup] Cluster ID: mUe0Z0StRd2_X-_m_S557A
2023-08-16 19:16:51,096 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 728] [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator 10.1.5.212:9092 (id: 2147483647 rack: null)
2023-08-16 19:16:51,099 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 476] [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2023-08-16 19:16:51,099 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java : 292] mygroup: partitions revoked: []
2023-08-16 19:16:51,100 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 505] [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2023-08-16 19:16:51,114 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 505] [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2023-08-16 19:16:51,126 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 469] [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 9
2023-08-16 19:16:51,130 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 283] [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions: my816topic-0
2023-08-16 19:16:51,141 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 525] [Consumer clientId=consumer-1, groupId=mygroup] Setting offset for partition my816topic-0 to the committed offset FetchPosition{offset=288, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.1.5.212:9092 (id: 0 rack: null), epoch=0}}
2023-08-16 19:16:51,152 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java : 292] mygroup: partitions assigned: [my816topic-0]
2023-08-16 19:16:52,049 INFO [17324] [scheduling-1] [] c.t.k.KafkaProducerService [KafkaProducerService.java : 19] 向kafka发送消息:{"city":"hefei1692184612007","longitude":117.17,"latitude":31.52}
2023-08-16 19:16:52,053 INFO [17324] [scheduling-1] [] o.a.k.c.p.ProducerConfig [AbstractConfig.java : 347] ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [10.1.5.212:9092]buffer.memory = 33554432client.dns.lookup = defaultclient.id = compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 2147483647retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsend.buffer.bytes = 131072ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 117] Kafka version: 2.3.1
2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 118] Kafka commitId: 18a913733fb71c01
2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 119] Kafka startTimeMs: 1692184612071
2023-08-16 19:16:52,079 INFO [17324] [kafka-producer-network-thread | producer-1] [] o.a.k.c.Metadata [Metadata.java : 261] [Producer clientId=producer-1] Cluster ID: mUe0Z0StRd2_X-_m_S557A2023-08-16 19:16:52,115 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] c.t.k.KafkaConsumerService [KafkaConsumerService.java : 14] rec msg:{"city":"hefei1692184612007","longitude":117.17,"latitude":31.52}
相关文章:
springboot整合kafka-笔记
springboot整合kafka-笔记 配置pom.xml 这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 <dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>s…...

Rust软件外包开发语言的特点
Rust 是一种系统级编程语言,强调性能、安全性和并发性的编程语言,适用于广泛的应用领域,特别是那些需要高度可靠性和高性能的场景。下面和大家分享 Rust 语言的一些主要特点以及适用的场合,希望对大家有所帮助。北京木奇移动技术有…...

Spring Boot业务代码中使用@Transactional事务失效踩坑点总结
1.概述 接着之前我们对Spring AOP以及基于AOP实现事务控制的上文,今天我们来看看平时在项目业务开发中使用声明式事务Transactional的失效场景,并分析其失效原因,从而帮助开发人员尽量避免踩坑。 我们知道 Spring 声明式事务功能提供了极其…...

知识体系总结(九)设计原则、设计模式、分布式、高性能、高可用
文章目录 架构设计为什么要进行技术框架的设计 六大设计原则一、单一职责原则二、开闭原则三、依赖倒置原则四、接口分离原则五、迪米特法则(又称最小知道原则)六、里氏替换原则案例诠释 常见设计模式构造型单例模式工厂模式简单工厂工厂方法 生成器模式…...

Springboot 集成Beetl模板
一、在启动类下的pom.xml中导入依赖: <!--beetl模板引擎--><dependency><groupId>com.ibeetl</groupId><artifactId>beetl</artifactId><version>2.9.8</version></dependency> 二、 配置 beetl需要的Beetl…...
RabbitMQ查询队列使用情况和消费者详情实现
spring-boot-starter-amqp spring-boot-starter-amqp是Spring Boot框架中与AMQP(高级消息队列协议)相关的自动配置启动器。它提供了使用AMQP进行消息传递和异步通信的功能。 以下是spring-boot-starter-amqp的主要特性和功能: 自动配置:spring-boot-starter-amqp通过自动…...

Spark第二课RDD的详解
1.前言 RDD JAVA中的IO 1.小知识点穿插 1. 装饰者设计模式 装饰者设计模式:本身功能不变,扩展功能. 举例: 数据流的读取 一层一层的包装,进而将功能进行进一步的扩展 2.sleep和wait的区别 本质区别是字体不一样,sleep斜体,wait正常 斜体是静态方法…...

人工智能学习框架—飞桨Paddle人工智能
1.人工智能框架 机器学习的三要素:模型、学习策略、优化算法。 当我们用机器学习来解决一些模式识别任务时,一般的流程包含以下几个步骤: 1.1.浅层学习和深度学习 浅层学习(Shallow Learning):不涉及特征学习,其特征…...

SElinux 导致 Keepalived 检测脚本无法执行
哈喽大家好,我是咸鱼 今天我们来看一个关于 Keepalived 检测脚本无法执行的问题 一位粉丝后台私信我,说他部署的 keepalived 集群 vrrp_script 模块中的脚本执行失败了,但是手动执行这个脚本却没有任何问题 这个问题也是咸鱼第一次遇到&…...

2022年电赛C题——小车跟随行驶系统——做题记录以及经验分享
前言 自己打算将做过的电赛真题,主要包含控制组的,近几年出现的小车控制题目,自己做过的真题以及在准备电赛期间刷真题出现的问题以及经验分享给大家 这次带来的是22年电赛C题——小车跟随行驶系统,这道题目指定使用的是TI的单片…...

vscode + python
序 参考链接: 【教程】VScode中配置Python运行环境_哔哩哔哩_bilibili Python部分 Python Releases for Windows | Python.org vscode部分 Visual Studio Code - Code Editing. Redefined 一路next,全部勾上: 就可以了: 安装插…...

badgerdb里面的事务
事务的ACID A 原子性(Atomicity) 多步骤操作,只能是两种状态,要么所有的步骤都成功执行,要么所有的步骤都不执行,举例说明就是小明向小红转账30元的场景,拆分成两个步骤,步骤1&#…...
C# this.Invoke(new Action(() => { /* some code */ }))用法说明
在 C# 中,this.Invoke(new Action(() > { /* some code */ })) 是一种用于在 UI 线程上执行代码的方法,通常用于在后台线程中更新 UI 控件的值或执行其他需要在 UI 线程上执行的操作。 在 Windows Forms 或 WPF 等图形界面应用程序中,UI …...
MongoDB:MySQL,Redis,ES,MongoDB的应用场景
简单明了说明MySQL,ES,MongoDB的各自特点,应用场景,以及MongoDB如何使用的第一章节. 一. SQL与NoSQL SQL被称为结构化查询语言.是传统意义上的数据库,数据之间存在很明确的关联关系,例如主外键关联,这种结构可以确保数据的完整性(数据没有缺失并且正确).但是正因为这种严密的结…...
leetcode每日一题_2682.找出转圈游戏输家
2682.找出转圈游戏输家 题目: n 个朋友在玩游戏。这些朋友坐成一个圈,按 顺时针方向 从 1 到 n 编号。从第 i 个朋友的位置开始顺时针移动 1 步会到达第 (i 1) 个朋友的位置(1 < i < n),而从第 n 个朋友的位置开始顺时针移…...
OpenCV之薄板样条插值(ThinPlateSpline)
官方文档:OpenCV: cv::ThinPlateSplineShapeTransformer Class Reference 使用方法: 头文件:#include <opencv2/shape/shape_transformer.hpp> (1)点匹配 一般根据有多少个样本(或者点)…...

034_小驰私房菜_[问题复盘] Qcom平台,某些三方相机拍照旋转90度
全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 【一、问题】 某些三方相机,预览正常,拍照旋转90度 【二、问题排查】 1 ) HAL这边Jpeg编码数据在哪个地方…...

【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总
【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总 TI编译器分类 在CCS按照目录下 有个名为${CG_TOOL_ROOT}的目录 其下就是当前工程的编译器 存放目录为: C:\ti\ccs1240\ccs\tools\compiler按类型分为五种: ti-cgt-arm…...

深入探索Java中的File类与IO操作:从路径到文件的一切
文章目录 1. File类的作用与构造方法2. File类常用方法:获取、判断和创建2.1 获取功能方法2.2 判断功能方法2.3 创建和删除功能方法2.4 目录的遍历方法 3. 递归:探索更深的层次代码示例:递归遍历文件夹 结论 🎉欢迎来到Java学习路…...

Python 处理 Excel 表格的 14 个常用操作
目录 1. 安装依赖库 2. 导入库 3. 读取Excel文件 4. 写入Excel文件 5. 创建工作表 6. 访问工作表 7. 读取单元格数据 8. 写入单元格数据 9. 获取行数和列数 10. 过滤数据 11. 排序数据 12. 添加新行 13. 删除行或列 14. 计算汇总统计 总结 无论是数据分析师、财…...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
五年级数学知识边界总结思考-下册
目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解:由来、作用与意义**一、知识点核心内容****二、知识点的由来:从生活实践到数学抽象****三、知识的作用:解决实际问题的工具****四、学习的意义:培养核心素养…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)
前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块,…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
学习一下用鸿蒙DevEco Studio HarmonyOS5实现百度地图
在鸿蒙(HarmonyOS5)中集成百度地图,可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API,可以构建跨设备的定位、导航和地图展示功能。 1. 鸿蒙环境准备 开发工具:下载安装 De…...