Kafka在Java项目中的应用
Kafka在Java项目中的应用
Docker 安装Kafka
一.首先需要安装docker,可看这篇文章安装docker
二.拉取zookeeper和KafKa镜像
docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
Kafka组件需要向zookeeper进行注册,所以也需要安装zookeeper
三.启动zookeeper、kafka组件
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
启动成功界面如下,status即为running(运行中)
四.创建Springboot项目
4.1 添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>
4.2 application.yml文件
server:port: 9090
spring:kafka:bootstrap-servers: localhost:9092consumer:# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerretries: 3 # 重试次数
kafka:topic:my-topic: my-topicmy-topic2: my-topic2
4.3 创建实体类Book
public class Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "Book{" +"id=" + id +", name='" + name + '\'' +'}';}
}
4.4 配置KafKa信息
@Configuration
public class KafkaConfig {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;/*** JSON消息转换器*/@Beanpublic RecordMessageConverter jsonConverter() {return new StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。*/@Beanpublic NewTopic myTopic() {return new NewTopic(myTopic, 2, (short) 1);}@Beanpublic NewTopic myTopic2() {return new NewTopic(myTopic2, 1, (short) 1);}
}
4.5 controller代码
@RestController
@RequestMapping(value = "/book")
public class BookController {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;BookProducerService producer;private AtomicLong atomicLong = new AtomicLong();BookController(BookProducerService producer) {this.producer = producer;}@GetMapping("/send")public String sendMessageToKafkaTopic(@RequestParam("name") String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));return name+" : 消息已经发送!";}
}
4.6 book 的生成者业务
@Service
public class BookProducerService {private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final KafkaTemplate<String, Object> kafkaTemplate;//通过构造方法进行注入public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, Object o) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));}}
4.7 book的消费者业务
@Service
public class BookConsumerService {@Value("${kafka.topic.my-topic}")private String myTopic;@Value("${kafka.topic.my-topic2}")private String myTopic2;private final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final ObjectMapper objectMapper = new ObjectMapper();@KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {try {Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}@KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")public void consumeMessage2(Book book,ConsumerRecord<String,String> bookConsumerRecord) throws JsonProcessingException {Book value = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());logger.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());}
}
代码整体目录如下
4.8 启动成功界面
4.9 浏览器访问
4.10 控制台显示
至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!
相关文章:

Kafka在Java项目中的应用
Kafka在Java项目中的应用 Docker 安装Kafka 一.首先需要安装docker,可看这篇文章安装docker 二.拉取zookeeper和KafKa镜像 docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper 三.启动zookeeper…...
理解分布式id生成算法SnowFlake
理解分布式id生成算法SnowFlake 分布式id生成算法的有很多种,Twitter的SnowFlake就是其中经典的一种。 概述 SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图: } public function __construct(){ $this->rnew…...

光纤收发器可以连接光模块吗?
随着科技的进步发展,城市信息化速度的加快,光通信产品在数据中心和安防监控等场景中的运用越来越广泛,而这之间的连接则需要光模块和光纤收发器来实现。很多用户对光模块和光纤收发器的使用有些疑虑,两者该如何连接?又…...

一文快速了解浏览器Sui Explorer
Sui作为一条基于第一原理重新设计和构建而成的L1公链,所有区块和交易信息皆公开透明,每个人都能自行查看。通过Sui链上浏览器,用户可以迅速了解链上的交易情况,比如当前的TPS和Gas价格,也可以使用Digest来查看特定交易…...
python中lambda、yield、map、filter、reduce的使用
1、 匿名函数lambda python中允许使用lambda关键字定义一个匿名函数。所谓的匿名函数就是说使用一次或者几次之后就不再需要的函数,属于“一次性”函数。 #例1:求两数之和 f lambda x, y: x y print(f(5, 1))#例2:求平方和 print((lambda…...

第十八章 使用LNMP架构部署动态网站环境
文章目录 第十八章 使用LNMP架构部署动态网站环境一、源码包程序1、源码包的优势2、基本步骤(1)、下载及解压源码包文件(2)、编译源码包代码(3)、生成二进制安装程序(4)、运行二进制…...

无人值守的IDC机房动环综合运维方案
企业数字化转型以及5G、物联网、云计算、人工智能等新业态带动了数据中心的发展,在国家一体化大数据中心及“东数西算”节点布局的推动下,数据中心机房已成为各大企事业单位维持业务正常运营的重要组成部分,网络设备、系统、业务应用数量与日…...

桌面远程工具推荐
目前市面上的远程工具多如牛毛,很多人不知道怎么选择,下面小编介绍两种桌面远程工具,它们都是跨平台的,均支持Windows,Mac OS,IOS和安卓,分别是RayLink,VNC,好用…...

MySQL高级——第15章_锁
第15章_锁 1. 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在程序开发中会存在多线程同步的问题,当多个线程并发访问某个数据的时候,尤其是针对一-些敏感的数据(比如订单、金额等),我们就需要保证这个数据在任何 时刻最多只…...

【ROS】Ubuntu22.04安装ROS2(Humble Hawksbill)
0、版本说明 Ubuntu22.04对应的ROS2的版本为Humble Hawksbill(ros-humble) 如果不是在Ubuntu22.04中安装ROS,请参考下面Ubuntu和ROS的版本对应关系 1、更新apt包列表 $ sudo apt update2、设置编码 将ubuntu环境语言编码设置为en_US en_…...

【ChatGPT】体验一下ChatGPT
体验一下ChatGPT 可以帮你写代码、写邮件、编故事的神器 最近OpenAI 发布了备受期待的原型通用 ChatGPT,这是一种基于对话的 AI 聊天界面,算是GPT-3(Generative Pre-trained Transformer 3)的继承者,今天记录一下体验的过程,以前…...
Android 串口通信
可以使用开源usb-serial-for-android 库进行串口通信 添加 usb-serial-for-android 依赖项到项目中。在项目的 build.gradle 文件中添加以下内容: dependencies {// 其他依赖项...implementation com.github.mik3y:usb-serial-for-android:3.5.1// 其他依赖项... …...
Python3 日期和时间
Python 3 提供了强大的日期和时间处理模块,名为 datetime。它可以用于执行日期和时间的各种操作,包括创建、格式化、比较和计算等。 下面是一些常用的日期和时间操作的示例: ### 获取当前日期和时间 要获取当前日期和时间,可以使…...
Go 爬虫三种框架的基本使用介绍
目录 Go 爬虫三种框架的基本使用介绍1. Colly2. Golang.org/x/net/html3. GoQuery Go 爬虫示例使用Go中的http包进行爬虫Step 1:导入包Step 2:发送请求Step 3:读取响应Step 4:解析HTMLStep 5:总结 使用Colley爬虫 结语…...
python实现斐波那契数列详解(黄金分割)
今天给各位分享一个常见的题目:求斐波那契数列前n项分别是什么(也称为黄金分割数列),整个数列需满足一个条件即第三项的值等于前两项相加的和,如第一项是1、第二项是1、第三项是2、第四项是 3、第五项是5... 满足公式…...

整合营销和内容营销哪个好,有什么区别
如果想做自媒体运营,不管是品牌还是个体从业者,其实都要学会如何去营销。这个也分为很多种方式,比如整合营销和内容营销。今天,来和大家谈谈整合营销和内容营销哪个好,如何才能将他们应用好? 要想回答这个问题&#x…...

C# | [二进制字符串] 与 [字节数组] 互相转换,一行代码就搞定! - CodePlus系列
C#二进制字符串与字节数组互相转换 文章目录 C#二进制字符串与字节数组互相转换前言示例代码实现思路扩展方法说明引用CodePlus库结束语 前言 开发中有时需要将二进制数据转换为字符串或相反。虽然.NET提供了一些用于二进制数据操作的类库,但是它们的使用有时候会比…...
Java 细节汇总(5)-Comparator#compare() 升降序确定
文章目录 1. Comparator#compare() 升降序确定升序分析 1. Comparator#compare() 升降序确定 Java 语言中 Comparator#compare(T o1, T o2) 方法的实现可以决定排序元素的升序降序,但是许多人对升降序如何确定完全没有概念。要理解升降序是如何确定的,首…...
湖北棒球发展报告·棒球5号位
湖北棒球的发展报告与办法应该考虑以下几个因素: 1. 借助政策支持。湖北棒球要想发展,政策支持是必不可少的。政府需要提供足够的资金和政策支持,以帮助俱乐部提高运营能力和加强比赛的组织。获得政府的政策支持,可以促进湖北棒球…...

使用Eclipse 进行远程 Debug 调试
Eclipse远程调试 Java自身支持调试功能,并提供了一个简单的调试工具--JDB,类似于功能强大的GDB,JDB也是一个字符界面的调试环境,并支持设置断点,支持线程线级的调试。 由于部署环境的差异性&am…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...

基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...

AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...