Kafka在Java项目中的应用
Kafka在Java项目中的应用
Docker 安装Kafka
一.首先需要安装docker,可看这篇文章安装docker
二.拉取zookeeper和KafKa镜像
docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
Kafka组件需要向zookeeper进行注册,所以也需要安装zookeeper
三.启动zookeeper、kafka组件
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
启动成功界面如下,status即为running(运行中)

四.创建Springboot项目
4.1 添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>
4.2 application.yml文件
server:port: 9090
spring:kafka:bootstrap-servers: localhost:9092consumer:# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerretries: 3 # 重试次数
kafka:topic:my-topic: my-topicmy-topic2: my-topic2
4.3 创建实体类Book
public class Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "Book{" +"id=" + id +", name='" + name + '\'' +'}';}
}
4.4 配置KafKa信息
@Configuration
public class KafkaConfig {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;/*** JSON消息转换器*/@Beanpublic RecordMessageConverter jsonConverter() {return new StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。*/@Beanpublic NewTopic myTopic() {return new NewTopic(myTopic, 2, (short) 1);}@Beanpublic NewTopic myTopic2() {return new NewTopic(myTopic2, 1, (short) 1);}
}
4.5 controller代码
@RestController
@RequestMapping(value = "/book")
public class BookController {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;BookProducerService producer;private AtomicLong atomicLong = new AtomicLong();BookController(BookProducerService producer) {this.producer = producer;}@GetMapping("/send")public String sendMessageToKafkaTopic(@RequestParam("name") String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));return name+" : 消息已经发送!";}
}
4.6 book 的生成者业务
@Service
public class BookProducerService {private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final KafkaTemplate<String, Object> kafkaTemplate;//通过构造方法进行注入public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, Object o) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));}}
4.7 book的消费者业务
@Service
public class BookConsumerService {@Value("${kafka.topic.my-topic}")private String myTopic;@Value("${kafka.topic.my-topic2}")private String myTopic2;private final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final ObjectMapper objectMapper = new ObjectMapper();@KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {try {Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}@KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")public void consumeMessage2(Book book,ConsumerRecord<String,String> bookConsumerRecord) throws JsonProcessingException {Book value = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());logger.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());}
}
代码整体目录如下

4.8 启动成功界面

4.9 浏览器访问

4.10 控制台显示

至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!
相关文章:
Kafka在Java项目中的应用
Kafka在Java项目中的应用 Docker 安装Kafka 一.首先需要安装docker,可看这篇文章安装docker 二.拉取zookeeper和KafKa镜像 docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper 三.启动zookeeper…...
理解分布式id生成算法SnowFlake
理解分布式id生成算法SnowFlake 分布式id生成算法的有很多种,Twitter的SnowFlake就是其中经典的一种。 概述 SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图: } public function __construct(){ $this->rnew…...
光纤收发器可以连接光模块吗?
随着科技的进步发展,城市信息化速度的加快,光通信产品在数据中心和安防监控等场景中的运用越来越广泛,而这之间的连接则需要光模块和光纤收发器来实现。很多用户对光模块和光纤收发器的使用有些疑虑,两者该如何连接?又…...
一文快速了解浏览器Sui Explorer
Sui作为一条基于第一原理重新设计和构建而成的L1公链,所有区块和交易信息皆公开透明,每个人都能自行查看。通过Sui链上浏览器,用户可以迅速了解链上的交易情况,比如当前的TPS和Gas价格,也可以使用Digest来查看特定交易…...
python中lambda、yield、map、filter、reduce的使用
1、 匿名函数lambda python中允许使用lambda关键字定义一个匿名函数。所谓的匿名函数就是说使用一次或者几次之后就不再需要的函数,属于“一次性”函数。 #例1:求两数之和 f lambda x, y: x y print(f(5, 1))#例2:求平方和 print((lambda…...
第十八章 使用LNMP架构部署动态网站环境
文章目录 第十八章 使用LNMP架构部署动态网站环境一、源码包程序1、源码包的优势2、基本步骤(1)、下载及解压源码包文件(2)、编译源码包代码(3)、生成二进制安装程序(4)、运行二进制…...
无人值守的IDC机房动环综合运维方案
企业数字化转型以及5G、物联网、云计算、人工智能等新业态带动了数据中心的发展,在国家一体化大数据中心及“东数西算”节点布局的推动下,数据中心机房已成为各大企事业单位维持业务正常运营的重要组成部分,网络设备、系统、业务应用数量与日…...
桌面远程工具推荐
目前市面上的远程工具多如牛毛,很多人不知道怎么选择,下面小编介绍两种桌面远程工具,它们都是跨平台的,均支持Windows,Mac OS,IOS和安卓,分别是RayLink,VNC,好用…...
MySQL高级——第15章_锁
第15章_锁 1. 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在程序开发中会存在多线程同步的问题,当多个线程并发访问某个数据的时候,尤其是针对一-些敏感的数据(比如订单、金额等),我们就需要保证这个数据在任何 时刻最多只…...
【ROS】Ubuntu22.04安装ROS2(Humble Hawksbill)
0、版本说明 Ubuntu22.04对应的ROS2的版本为Humble Hawksbill(ros-humble) 如果不是在Ubuntu22.04中安装ROS,请参考下面Ubuntu和ROS的版本对应关系 1、更新apt包列表 $ sudo apt update2、设置编码 将ubuntu环境语言编码设置为en_US en_…...
【ChatGPT】体验一下ChatGPT
体验一下ChatGPT 可以帮你写代码、写邮件、编故事的神器 最近OpenAI 发布了备受期待的原型通用 ChatGPT,这是一种基于对话的 AI 聊天界面,算是GPT-3(Generative Pre-trained Transformer 3)的继承者,今天记录一下体验的过程,以前…...
Android 串口通信
可以使用开源usb-serial-for-android 库进行串口通信 添加 usb-serial-for-android 依赖项到项目中。在项目的 build.gradle 文件中添加以下内容: dependencies {// 其他依赖项...implementation com.github.mik3y:usb-serial-for-android:3.5.1// 其他依赖项... …...
Python3 日期和时间
Python 3 提供了强大的日期和时间处理模块,名为 datetime。它可以用于执行日期和时间的各种操作,包括创建、格式化、比较和计算等。 下面是一些常用的日期和时间操作的示例: ### 获取当前日期和时间 要获取当前日期和时间,可以使…...
Go 爬虫三种框架的基本使用介绍
目录 Go 爬虫三种框架的基本使用介绍1. Colly2. Golang.org/x/net/html3. GoQuery Go 爬虫示例使用Go中的http包进行爬虫Step 1:导入包Step 2:发送请求Step 3:读取响应Step 4:解析HTMLStep 5:总结 使用Colley爬虫 结语…...
python实现斐波那契数列详解(黄金分割)
今天给各位分享一个常见的题目:求斐波那契数列前n项分别是什么(也称为黄金分割数列),整个数列需满足一个条件即第三项的值等于前两项相加的和,如第一项是1、第二项是1、第三项是2、第四项是 3、第五项是5... 满足公式…...
整合营销和内容营销哪个好,有什么区别
如果想做自媒体运营,不管是品牌还是个体从业者,其实都要学会如何去营销。这个也分为很多种方式,比如整合营销和内容营销。今天,来和大家谈谈整合营销和内容营销哪个好,如何才能将他们应用好? 要想回答这个问题&#x…...
C# | [二进制字符串] 与 [字节数组] 互相转换,一行代码就搞定! - CodePlus系列
C#二进制字符串与字节数组互相转换 文章目录 C#二进制字符串与字节数组互相转换前言示例代码实现思路扩展方法说明引用CodePlus库结束语 前言 开发中有时需要将二进制数据转换为字符串或相反。虽然.NET提供了一些用于二进制数据操作的类库,但是它们的使用有时候会比…...
Java 细节汇总(5)-Comparator#compare() 升降序确定
文章目录 1. Comparator#compare() 升降序确定升序分析 1. Comparator#compare() 升降序确定 Java 语言中 Comparator#compare(T o1, T o2) 方法的实现可以决定排序元素的升序降序,但是许多人对升降序如何确定完全没有概念。要理解升降序是如何确定的,首…...
湖北棒球发展报告·棒球5号位
湖北棒球的发展报告与办法应该考虑以下几个因素: 1. 借助政策支持。湖北棒球要想发展,政策支持是必不可少的。政府需要提供足够的资金和政策支持,以帮助俱乐部提高运营能力和加强比赛的组织。获得政府的政策支持,可以促进湖北棒球…...
使用Eclipse 进行远程 Debug 调试
Eclipse远程调试 Java自身支持调试功能,并提供了一个简单的调试工具--JDB,类似于功能强大的GDB,JDB也是一个字符界面的调试环境,并支持设置断点,支持线程线级的调试。 由于部署环境的差异性&am…...
[智能体-69]:重新认知MCP:协议不生产智能,只是AI全域交互的标准化基石
MCP只是提供了大模型、编排调度、外部工具能够进行结构化交流的标准,而整个系统的智能主要依赖编排调度,与外部软件系统的交互取决于外部工具,包括外部语音交互、视觉交互、数字化交互。当下MCP(Model Context Protocol࿰…...
3步解锁专业级MMD创作:Blender插件如何重塑二次元动画工作流
3步解锁专业级MMD创作:Blender插件如何重塑二次元动画工作流 【免费下载链接】blender_mmd_tools MMD Tools is a blender addon for importing/exporting Models and Motions of MikuMikuDance. 项目地址: https://gitcode.com/gh_mirrors/bl/blender_mmd_tools …...
举一个具体例子说明为什么索引不是越多越好,举具体字段
文章目录1. 核心舞台:笔记表 (t_note) 结构设计🚨 错误的操作:2. 结合具体字段,拆解三大翻车现场现场一:给 view_count(浏览量)加索引 —— 导致写放大,拖垮数据库现场二:…...
巨量投放总结
巨量商务管理平台 : https://business.oceanengine.com 巨量广告投放平台: https://ad.oceanengine.com 商务管理平台 账户 广告组 计划 广告投放平台 层级关系: 广告组 -> 计划 -> 创意 对应FB: 系列 - > 广告组 -> 广告...
php有什么版本,php语言有几个版本
php有什么版本,php语言有几个版本PHP的大版本主要分四支:PHP4/PHP5/PHP6/PHP7 其中,PHP4由于太古老、对OO支持不力已基本被淘汰,请无视PHP4。 PHP6由于基本没有生产线上的应用,还基本只是一款概念产品,很多功能已在PHP…...
从NLP到RAG:AI标书生成系统的技术架构与落地路径深度剖析
引言2026年2月,国家发改委等八部门联合印发《关于加快招标投标领域人工智能推广应用的实施意见》,明确到2026年底招标文件检测、智能辅助评标、围串标识别等重点场景在部分省市实现全覆盖。同一时期,《招标投标法》修订草案经国务院常务会议原…...
在数据预处理与分析流水线中集成大模型API进行智能标注与摘要
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 在数据预处理与分析流水线中集成大模型API进行智能标注与摘要 对于数据工程师而言,处理海量非结构化文本数据是一项常见…...
星露谷物语SMAPI模组加载器:从新手到专家的完整使用指南
星露谷物语SMAPI模组加载器:从新手到专家的完整使用指南 【免费下载链接】SMAPI The modding API for Stardew Valley. 项目地址: https://gitcode.com/gh_mirrors/smap/SMAPI 星露谷物语SMAPI模组加载器是官方推荐的模组API,它为玩家和开发者提供…...
DeTikZify:基于AI的TikZ图形程序自动生成技术深度解析
DeTikZify:基于AI的TikZ图形程序自动生成技术深度解析 【免费下载链接】DeTikZify Synthesizing Graphics Programs for Scientific Figures and Sketches with TikZ. 项目地址: https://gitcode.com/gh_mirrors/de/DeTikZify DeTikZify是一款革命性的多模态…...
全方位梳理 OpenClaw 部署与使用干货
OpenClaw 一键安装包|可视化部署,简化环境配置流程 ✨适配系统:Windows10/11 64 位 当前版本:v2.7.5(虾壳云版) ✨核心优势:全程可视化操作,不用命令行、不用手动配置 Python/Node…...
