Kafka在Java项目中的应用
Kafka在Java项目中的应用
Docker 安装Kafka
一.首先需要安装docker,可看这篇文章安装docker
二.拉取zookeeper和KafKa镜像
docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafka
Kafka组件需要向zookeeper进行注册,所以也需要安装zookeeper
三.启动zookeeper、kafka组件
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
启动成功界面如下,status即为running(运行中)

四.创建Springboot项目
4.1 添加依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency></dependencies>
4.2 application.yml文件
server:port: 9090
spring:kafka:bootstrap-servers: localhost:9092consumer:# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:value-serializer: org.springframework.kafka.support.serializer.JsonSerializerretries: 3 # 重试次数
kafka:topic:my-topic: my-topicmy-topic2: my-topic2
4.3 创建实体类Book
public class Book {private Long id;private String name;public Book() {}public Book(Long id, String name) {this.id = id;this.name = name;}public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "Book{" +"id=" + id +", name='" + name + '\'' +'}';}
}
4.4 配置KafKa信息
@Configuration
public class KafkaConfig {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;/*** JSON消息转换器*/@Beanpublic RecordMessageConverter jsonConverter() {return new StringJsonMessageConverter();}/*** 通过注入一个 NewTopic 类型的 Bean 来创建 topic,如果 topic 已存在,则会忽略。*/@Beanpublic NewTopic myTopic() {return new NewTopic(myTopic, 2, (short) 1);}@Beanpublic NewTopic myTopic2() {return new NewTopic(myTopic2, 1, (short) 1);}
}
4.5 controller代码
@RestController
@RequestMapping(value = "/book")
public class BookController {@Value("${kafka.topic.my-topic}")String myTopic;@Value("${kafka.topic.my-topic2}")String myTopic2;BookProducerService producer;private AtomicLong atomicLong = new AtomicLong();BookController(BookProducerService producer) {this.producer = producer;}@GetMapping("/send")public String sendMessageToKafkaTopic(@RequestParam("name") String name) {this.producer.sendMessage(myTopic, new Book(atomicLong.addAndGet(1), name));this.producer.sendMessage(myTopic2, new Book(atomicLong.addAndGet(1), name));return name+" : 消息已经发送!";}
}
4.6 book 的生成者业务
@Service
public class BookProducerService {private static final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final KafkaTemplate<String, Object> kafkaTemplate;//通过构造方法进行注入public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, Object o) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息",result.getRecordMetadata().topic(),result.getRecordMetadata().partition()),ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));}}
4.7 book的消费者业务
@Service
public class BookConsumerService {@Value("${kafka.topic.my-topic}")private String myTopic;@Value("${kafka.topic.my-topic2}")private String myTopic2;private final Logger logger = LoggerFactory.getLogger(BookProducerService.class);private final ObjectMapper objectMapper = new ObjectMapper();@KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {try {Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());} catch (JsonProcessingException e) {e.printStackTrace();}}@KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")public void consumeMessage2(Book book,ConsumerRecord<String,String> bookConsumerRecord) throws JsonProcessingException {Book value = objectMapper.readValue(bookConsumerRecord.value(), Book.class);logger.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), value.toString());logger.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());}
}
代码整体目录如下

4.8 启动成功界面

4.9 浏览器访问

4.10 控制台显示

至此.基于KafKa的Springboot项目简单应用已经完成,后续需要对Kafka进行更深的学习以及应用!
相关文章:
Kafka在Java项目中的应用
Kafka在Java项目中的应用 Docker 安装Kafka 一.首先需要安装docker,可看这篇文章安装docker 二.拉取zookeeper和KafKa镜像 docker pull wurstmeister/zookeeperdocker pull wurstmeister/kafkaKafka组件需要向zookeeper进行注册,所以也需要安装zookeeper 三.启动zookeeper…...
理解分布式id生成算法SnowFlake
理解分布式id生成算法SnowFlake 分布式id生成算法的有很多种,Twitter的SnowFlake就是其中经典的一种。 概述 SnowFlake算法生成id的结果是一个64bit大小的整数,它的结构如下图: } public function __construct(){ $this->rnew…...
光纤收发器可以连接光模块吗?
随着科技的进步发展,城市信息化速度的加快,光通信产品在数据中心和安防监控等场景中的运用越来越广泛,而这之间的连接则需要光模块和光纤收发器来实现。很多用户对光模块和光纤收发器的使用有些疑虑,两者该如何连接?又…...
一文快速了解浏览器Sui Explorer
Sui作为一条基于第一原理重新设计和构建而成的L1公链,所有区块和交易信息皆公开透明,每个人都能自行查看。通过Sui链上浏览器,用户可以迅速了解链上的交易情况,比如当前的TPS和Gas价格,也可以使用Digest来查看特定交易…...
python中lambda、yield、map、filter、reduce的使用
1、 匿名函数lambda python中允许使用lambda关键字定义一个匿名函数。所谓的匿名函数就是说使用一次或者几次之后就不再需要的函数,属于“一次性”函数。 #例1:求两数之和 f lambda x, y: x y print(f(5, 1))#例2:求平方和 print((lambda…...
第十八章 使用LNMP架构部署动态网站环境
文章目录 第十八章 使用LNMP架构部署动态网站环境一、源码包程序1、源码包的优势2、基本步骤(1)、下载及解压源码包文件(2)、编译源码包代码(3)、生成二进制安装程序(4)、运行二进制…...
无人值守的IDC机房动环综合运维方案
企业数字化转型以及5G、物联网、云计算、人工智能等新业态带动了数据中心的发展,在国家一体化大数据中心及“东数西算”节点布局的推动下,数据中心机房已成为各大企事业单位维持业务正常运营的重要组成部分,网络设备、系统、业务应用数量与日…...
桌面远程工具推荐
目前市面上的远程工具多如牛毛,很多人不知道怎么选择,下面小编介绍两种桌面远程工具,它们都是跨平台的,均支持Windows,Mac OS,IOS和安卓,分别是RayLink,VNC,好用…...
MySQL高级——第15章_锁
第15章_锁 1. 概述 锁是计算机协调多个进程或线程并发访问某一资源的机制。在程序开发中会存在多线程同步的问题,当多个线程并发访问某个数据的时候,尤其是针对一-些敏感的数据(比如订单、金额等),我们就需要保证这个数据在任何 时刻最多只…...
【ROS】Ubuntu22.04安装ROS2(Humble Hawksbill)
0、版本说明 Ubuntu22.04对应的ROS2的版本为Humble Hawksbill(ros-humble) 如果不是在Ubuntu22.04中安装ROS,请参考下面Ubuntu和ROS的版本对应关系 1、更新apt包列表 $ sudo apt update2、设置编码 将ubuntu环境语言编码设置为en_US en_…...
【ChatGPT】体验一下ChatGPT
体验一下ChatGPT 可以帮你写代码、写邮件、编故事的神器 最近OpenAI 发布了备受期待的原型通用 ChatGPT,这是一种基于对话的 AI 聊天界面,算是GPT-3(Generative Pre-trained Transformer 3)的继承者,今天记录一下体验的过程,以前…...
Android 串口通信
可以使用开源usb-serial-for-android 库进行串口通信 添加 usb-serial-for-android 依赖项到项目中。在项目的 build.gradle 文件中添加以下内容: dependencies {// 其他依赖项...implementation com.github.mik3y:usb-serial-for-android:3.5.1// 其他依赖项... …...
Python3 日期和时间
Python 3 提供了强大的日期和时间处理模块,名为 datetime。它可以用于执行日期和时间的各种操作,包括创建、格式化、比较和计算等。 下面是一些常用的日期和时间操作的示例: ### 获取当前日期和时间 要获取当前日期和时间,可以使…...
Go 爬虫三种框架的基本使用介绍
目录 Go 爬虫三种框架的基本使用介绍1. Colly2. Golang.org/x/net/html3. GoQuery Go 爬虫示例使用Go中的http包进行爬虫Step 1:导入包Step 2:发送请求Step 3:读取响应Step 4:解析HTMLStep 5:总结 使用Colley爬虫 结语…...
python实现斐波那契数列详解(黄金分割)
今天给各位分享一个常见的题目:求斐波那契数列前n项分别是什么(也称为黄金分割数列),整个数列需满足一个条件即第三项的值等于前两项相加的和,如第一项是1、第二项是1、第三项是2、第四项是 3、第五项是5... 满足公式…...
整合营销和内容营销哪个好,有什么区别
如果想做自媒体运营,不管是品牌还是个体从业者,其实都要学会如何去营销。这个也分为很多种方式,比如整合营销和内容营销。今天,来和大家谈谈整合营销和内容营销哪个好,如何才能将他们应用好? 要想回答这个问题&#x…...
C# | [二进制字符串] 与 [字节数组] 互相转换,一行代码就搞定! - CodePlus系列
C#二进制字符串与字节数组互相转换 文章目录 C#二进制字符串与字节数组互相转换前言示例代码实现思路扩展方法说明引用CodePlus库结束语 前言 开发中有时需要将二进制数据转换为字符串或相反。虽然.NET提供了一些用于二进制数据操作的类库,但是它们的使用有时候会比…...
Java 细节汇总(5)-Comparator#compare() 升降序确定
文章目录 1. Comparator#compare() 升降序确定升序分析 1. Comparator#compare() 升降序确定 Java 语言中 Comparator#compare(T o1, T o2) 方法的实现可以决定排序元素的升序降序,但是许多人对升降序如何确定完全没有概念。要理解升降序是如何确定的,首…...
湖北棒球发展报告·棒球5号位
湖北棒球的发展报告与办法应该考虑以下几个因素: 1. 借助政策支持。湖北棒球要想发展,政策支持是必不可少的。政府需要提供足够的资金和政策支持,以帮助俱乐部提高运营能力和加强比赛的组织。获得政府的政策支持,可以促进湖北棒球…...
使用Eclipse 进行远程 Debug 调试
Eclipse远程调试 Java自身支持调试功能,并提供了一个简单的调试工具--JDB,类似于功能强大的GDB,JDB也是一个字符界面的调试环境,并支持设置断点,支持线程线级的调试。 由于部署环境的差异性&am…...
云原生核心技术 (7/12): K8s 核心概念白话解读(上):Pod 和 Deployment 究竟是什么?
大家好,欢迎来到《云原生核心技术》系列的第七篇! 在上一篇,我们成功地使用 Minikube 或 kind 在自己的电脑上搭建起了一个迷你但功能完备的 Kubernetes 集群。现在,我们就像一个拥有了一块崭新数字土地的农场主,是时…...
基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?
uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件,用于在原生应用中加载 HTML 页面: 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...
Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
