springboot整合kafka-笔记
springboot整合kafka-笔记
配置pom.xml
这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.3.8.RELEASE</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.3.1</version></dependency></dependencies>
配置application.yml
spring:kafka:bootstrap-servers: 10.1.5.212:9092
配置EnableKafka注解
@EnableKafka
@EnableScheduling
@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class,args);}
}
定义kafka消息生产者
package cn.test.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private String topic= "my816topic";public void sendMessage(String message){log.info("向kafka发送消息:{}",message);kafkaTemplate.send(topic,message);}
}
定义kafka消息消费者
package cn.test.kafka;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class KafkaConsumerService {@KafkaListener(topics = "my816topic",groupId = "mygroup")public void listen(String message){log.info("rec msg:{}",message);}
}
测试发送kafka消息
@Component
@Slf4j
public class TestJob {@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate KafkaProducerService kafkaProducerService;@Scheduled(cron = "0/13 * * * * ?")public void hwreg() {String s = null;try {s = objectMapper.writeValueAsString(new CityInfo("hefei"+System.currentTimeMillis(), 117.17, 31.52));} catch (JsonProcessingException e) {e.printStackTrace();}//测试发送字符串消息kafkaProducerService.sendMessage(s);}
测试发送kafka消息-控制台日志
2023-08-16 19:16:50,832 INFO [17324] [main] [] o.a.k.c.c.ConsumerConfig [AbstractConfig.java : 347] ConsumerConfig values: allow.auto.create.topics = trueauto.commit.interval.ms = 5000auto.offset.reset = latestbootstrap.servers = [10.1.5.212:9092]check.crcs = trueclient.dns.lookup = defaultclient.id = client.rack = connections.max.idle.ms = 540000default.api.timeout.ms = 60000enable.auto.commit = falseexclude.internal.topics = truefetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = mygroupgroup.instance.id = nullheartbeat.interval.ms = 3000interceptor.classes = []internal.leave.group.on.close = trueisolation.level = read_uncommittedkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializermax.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsend.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKSvalue.deserializer = class org.apache.kafka.common.serialization.StringDeserializer2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 117] Kafka version: 2.3.1
2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 118] Kafka commitId: 18a913733fb71c01
2023-08-16 19:16:50,884 INFO [17324] [main] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 119] Kafka startTimeMs: 1692184610882
2023-08-16 19:16:50,887 INFO [17324] [main] [] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java : 964] [Consumer clientId=consumer-1, groupId=mygroup] Subscribed to topic(s): my816topic
2023-08-16 19:16:51,094 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.Metadata [Metadata.java : 261] [Consumer clientId=consumer-1, groupId=mygroup] Cluster ID: mUe0Z0StRd2_X-_m_S557A
2023-08-16 19:16:51,096 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 728] [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator 10.1.5.212:9092 (id: 2147483647 rack: null)
2023-08-16 19:16:51,099 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 476] [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2023-08-16 19:16:51,099 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java : 292] mygroup: partitions revoked: []
2023-08-16 19:16:51,100 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 505] [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2023-08-16 19:16:51,114 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 505] [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2023-08-16 19:16:51,126 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.AbstractCoordinator [AbstractCoordinator.java : 469] [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 9
2023-08-16 19:16:51,130 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 283] [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions: my816topic-0
2023-08-16 19:16:51,141 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java : 525] [Consumer clientId=consumer-1, groupId=mygroup] Setting offset for partition my816topic-0 to the committed offset FetchPosition{offset=288, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.1.5.212:9092 (id: 0 rack: null), epoch=0}}
2023-08-16 19:16:51,152 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] o.s.k.l.KafkaMessageListenerContainer [LogAccessor.java : 292] mygroup: partitions assigned: [my816topic-0]
2023-08-16 19:16:52,049 INFO [17324] [scheduling-1] [] c.t.k.KafkaProducerService [KafkaProducerService.java : 19] 向kafka发送消息:{"city":"hefei1692184612007","longitude":117.17,"latitude":31.52}
2023-08-16 19:16:52,053 INFO [17324] [scheduling-1] [] o.a.k.c.p.ProducerConfig [AbstractConfig.java : 347] ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [10.1.5.212:9092]buffer.memory = 33554432client.dns.lookup = defaultclient.id = compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 2147483647retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsend.buffer.bytes = 131072ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]ssl.endpoint.identification.algorithm = httpsssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 117] Kafka version: 2.3.1
2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 118] Kafka commitId: 18a913733fb71c01
2023-08-16 19:16:52,072 INFO [17324] [scheduling-1] [] o.a.k.c.u.AppInfoParser [AppInfoParser.java : 119] Kafka startTimeMs: 1692184612071
2023-08-16 19:16:52,079 INFO [17324] [kafka-producer-network-thread | producer-1] [] o.a.k.c.Metadata [Metadata.java : 261] [Producer clientId=producer-1] Cluster ID: mUe0Z0StRd2_X-_m_S557A2023-08-16 19:16:52,115 INFO [17324] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [] c.t.k.KafkaConsumerService [KafkaConsumerService.java : 14] rec msg:{"city":"hefei1692184612007","longitude":117.17,"latitude":31.52}相关文章:
springboot整合kafka-笔记
springboot整合kafka-笔记 配置pom.xml 这里我的springboot版本是2.3.8.RELEASE,使用的kafka-mq的版本是2.12 <dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>s…...
Rust软件外包开发语言的特点
Rust 是一种系统级编程语言,强调性能、安全性和并发性的编程语言,适用于广泛的应用领域,特别是那些需要高度可靠性和高性能的场景。下面和大家分享 Rust 语言的一些主要特点以及适用的场合,希望对大家有所帮助。北京木奇移动技术有…...
Spring Boot业务代码中使用@Transactional事务失效踩坑点总结
1.概述 接着之前我们对Spring AOP以及基于AOP实现事务控制的上文,今天我们来看看平时在项目业务开发中使用声明式事务Transactional的失效场景,并分析其失效原因,从而帮助开发人员尽量避免踩坑。 我们知道 Spring 声明式事务功能提供了极其…...
知识体系总结(九)设计原则、设计模式、分布式、高性能、高可用
文章目录 架构设计为什么要进行技术框架的设计 六大设计原则一、单一职责原则二、开闭原则三、依赖倒置原则四、接口分离原则五、迪米特法则(又称最小知道原则)六、里氏替换原则案例诠释 常见设计模式构造型单例模式工厂模式简单工厂工厂方法 生成器模式…...
Springboot 集成Beetl模板
一、在启动类下的pom.xml中导入依赖: <!--beetl模板引擎--><dependency><groupId>com.ibeetl</groupId><artifactId>beetl</artifactId><version>2.9.8</version></dependency> 二、 配置 beetl需要的Beetl…...
RabbitMQ查询队列使用情况和消费者详情实现
spring-boot-starter-amqp spring-boot-starter-amqp是Spring Boot框架中与AMQP(高级消息队列协议)相关的自动配置启动器。它提供了使用AMQP进行消息传递和异步通信的功能。 以下是spring-boot-starter-amqp的主要特性和功能: 自动配置:spring-boot-starter-amqp通过自动…...
Spark第二课RDD的详解
1.前言 RDD JAVA中的IO 1.小知识点穿插 1. 装饰者设计模式 装饰者设计模式:本身功能不变,扩展功能. 举例: 数据流的读取 一层一层的包装,进而将功能进行进一步的扩展 2.sleep和wait的区别 本质区别是字体不一样,sleep斜体,wait正常 斜体是静态方法…...
人工智能学习框架—飞桨Paddle人工智能
1.人工智能框架 机器学习的三要素:模型、学习策略、优化算法。 当我们用机器学习来解决一些模式识别任务时,一般的流程包含以下几个步骤: 1.1.浅层学习和深度学习 浅层学习(Shallow Learning):不涉及特征学习,其特征…...
SElinux 导致 Keepalived 检测脚本无法执行
哈喽大家好,我是咸鱼 今天我们来看一个关于 Keepalived 检测脚本无法执行的问题 一位粉丝后台私信我,说他部署的 keepalived 集群 vrrp_script 模块中的脚本执行失败了,但是手动执行这个脚本却没有任何问题 这个问题也是咸鱼第一次遇到&…...
2022年电赛C题——小车跟随行驶系统——做题记录以及经验分享
前言 自己打算将做过的电赛真题,主要包含控制组的,近几年出现的小车控制题目,自己做过的真题以及在准备电赛期间刷真题出现的问题以及经验分享给大家 这次带来的是22年电赛C题——小车跟随行驶系统,这道题目指定使用的是TI的单片…...
vscode + python
序 参考链接: 【教程】VScode中配置Python运行环境_哔哩哔哩_bilibili Python部分 Python Releases for Windows | Python.org vscode部分 Visual Studio Code - Code Editing. Redefined 一路next,全部勾上: 就可以了: 安装插…...
badgerdb里面的事务
事务的ACID A 原子性(Atomicity) 多步骤操作,只能是两种状态,要么所有的步骤都成功执行,要么所有的步骤都不执行,举例说明就是小明向小红转账30元的场景,拆分成两个步骤,步骤1&#…...
C# this.Invoke(new Action(() => { /* some code */ }))用法说明
在 C# 中,this.Invoke(new Action(() > { /* some code */ })) 是一种用于在 UI 线程上执行代码的方法,通常用于在后台线程中更新 UI 控件的值或执行其他需要在 UI 线程上执行的操作。 在 Windows Forms 或 WPF 等图形界面应用程序中,UI …...
MongoDB:MySQL,Redis,ES,MongoDB的应用场景
简单明了说明MySQL,ES,MongoDB的各自特点,应用场景,以及MongoDB如何使用的第一章节. 一. SQL与NoSQL SQL被称为结构化查询语言.是传统意义上的数据库,数据之间存在很明确的关联关系,例如主外键关联,这种结构可以确保数据的完整性(数据没有缺失并且正确).但是正因为这种严密的结…...
leetcode每日一题_2682.找出转圈游戏输家
2682.找出转圈游戏输家 题目: n 个朋友在玩游戏。这些朋友坐成一个圈,按 顺时针方向 从 1 到 n 编号。从第 i 个朋友的位置开始顺时针移动 1 步会到达第 (i 1) 个朋友的位置(1 < i < n),而从第 n 个朋友的位置开始顺时针移…...
OpenCV之薄板样条插值(ThinPlateSpline)
官方文档:OpenCV: cv::ThinPlateSplineShapeTransformer Class Reference 使用方法: 头文件:#include <opencv2/shape/shape_transformer.hpp> (1)点匹配 一般根据有多少个样本(或者点)…...
034_小驰私房菜_[问题复盘] Qcom平台,某些三方相机拍照旋转90度
全网最具价值的Android Camera开发学习系列资料~ 作者:8年Android Camera开发,从Camera app一直做到Hal和驱动~ 欢迎订阅,相信能扩展你的知识面,提升个人能力~ 【一、问题】 某些三方相机,预览正常,拍照旋转90度 【二、问题排查】 1 ) HAL这边Jpeg编码数据在哪个地方…...
【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总
【TI-CCS笔记】工程编译配置 bin文件的编译和生成 各种架构的Post-build配置汇总 TI编译器分类 在CCS按照目录下 有个名为${CG_TOOL_ROOT}的目录 其下就是当前工程的编译器 存放目录为: C:\ti\ccs1240\ccs\tools\compiler按类型分为五种: ti-cgt-arm…...
深入探索Java中的File类与IO操作:从路径到文件的一切
文章目录 1. File类的作用与构造方法2. File类常用方法:获取、判断和创建2.1 获取功能方法2.2 判断功能方法2.3 创建和删除功能方法2.4 目录的遍历方法 3. 递归:探索更深的层次代码示例:递归遍历文件夹 结论 🎉欢迎来到Java学习路…...
Python 处理 Excel 表格的 14 个常用操作
目录 1. 安装依赖库 2. 导入库 3. 读取Excel文件 4. 写入Excel文件 5. 创建工作表 6. 访问工作表 7. 读取单元格数据 8. 写入单元格数据 9. 获取行数和列数 10. 过滤数据 11. 排序数据 12. 添加新行 13. 删除行或列 14. 计算汇总统计 总结 无论是数据分析师、财…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...
第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)
第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10pip3.10) 一:前言二:安装编译依赖二:安装Python3.10三:安装PIP3.10四:安装Paddlepaddle基础框架4.1…...
spring Security对RBAC及其ABAC的支持使用
RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型,它将权限分配给角色,再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...
jdbc查询mysql数据库时,出现id顺序错误的情况
我在repository中的查询语句如下所示,即传入一个List<intager>的数据,返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致,会导致返回的id是从小到大排列的,但我不希望这样。 Query("SELECT NEW com…...
