【Kafka】SpringBoot整合Kafka详细介绍及代码示例
Kafka介绍
Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。
- Producer:生产者,负责将数据发送到Kafka集群。
- Consumer:消费者,从Kafka集群中读取数据。
- Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
- Topic:主题,数据按主题进行分类。
- Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
- Offset:偏移量,每个消息在其分区中的唯一标识。
使用场景
Kafka适用于以下场景:
- 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
- 消息队列:作为高吞吐量、低延迟的消息队列系统。
- 数据流处理:实时处理数据流,用于实时分析、监控和处理。
- 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
- 流数据管道:构建数据管道,连接数据源和数据存储系统。
Spring Boot整合Kafka
项目结构
springboot-kafka
│
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com.example.kafka
│ │ │ ├── KafkaApplication.java
│ │ │ ├── config
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ └── controller
│ │ │ └── KafkaController.java
│ │ └── resources
│ │ ├── application.yml
│ │ └── logback-spring.xml (可选)
│ └── test
│ └── java
│ └── com.example.kafka
│ └── KafkaApplicationTests.java
└── pom.xml
1. 创建Spring Boot项目并添加依赖
pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
2. 配置Kafka
application.yml
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
3. 创建Kafka配置类
KafkaConfig.java
package com.example.kafka.config;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class KafkaConfig {@Beanpublic NewTopic myTopic() {return new NewTopic("my-topic", 1, (short) 1);}
}
4. 创建Kafka生产者
KafkaProducer.java
package com.example.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
5. 创建Kafka消费者
KafkaConsumer.java
package com.example.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}
6. 创建控制器发送消息
KafkaController.java
package com.example.kafka.controller;import com.example.kafka.producer.KafkaProducer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final KafkaProducer kafkaProducer;public KafkaController(KafkaProducer kafkaProducer) {this.kafkaProducer = kafkaProducer;}@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducer.sendMessage("my-topic", message);return "Message sent";}
}
7. 创建Spring Boot主类
KafkaApplication.java
package com.example.kafka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
}
8. 测试应用
通过访问以下URL来发送消息:
http://localhost:8080/send?message=HelloKafka
9. 日志配置(可选)
为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:
logback-spring.xml
<configuration><springProfile name="default"><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss} - %msg%n</pattern></encoder></appender><logger name="org.apache.kafka" level="INFO"/><root level="INFO"><appender-ref ref="STDOUT"/></root></springProfile>
</configuration>
10. 测试类(可选)
KafkaApplicationTests.java
package com.example.kafka;import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class KafkaApplicationTests {@Testvoid contextLoads() {}
}
至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。
相关文章:
【Kafka】SpringBoot整合Kafka详细介绍及代码示例
Kafka介绍 Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。 Producer:生产者&a…...
C++ 质数因子分解
描述 功能:输入一个正整数,按照从小到大的顺序输出它的所有质因子(重复的也要列举)(如180的质因子为2 2 3 3 5 ) 输入描述: 输入一个整数 输出描述: 按照从小到大的顺序输出它的所有质数的…...
laravel版本≥ 8.1
laravel10 php ≥ 8.1 且 ≤ 8.3? 8.1 < php < 8.3PHP版本要求在 8.1 到 8.3 之间,包括这两个版本。具体来说:"≥ 8.1" 表示 PHP 的版本至少是 8.1,也就是说 8.1 及以上的版本都可以。 "≤ 8.3" 表示 P…...
【iOS】MRC下的单例模式批量创建单例
单例模式的介绍和ARC下的单例请见这篇:【iOS】单例模式 目录 关闭ARC环境MRC下的单例ARC下的单例批量创建单例Demo 关闭ARC环境 首先关闭ARC环境,即打开MRC: 或是指定某特定目标文件为非ARC环境: 双击某个类文件,指定…...
计算机网络期末复习
今天考专四,环境都蛮好的,试卷也很新,老师人也不错,明年再来。 又到期末考试咯,大家复习没有?还没复习啊?还不复???? 目录 第一章 1-02 试简述…...
python写一个获取竞品信息报告
要编写一个获取竞品信息报告的Python程序,首先需要明确您想要获取的竞品信息以及数据来源。在这个示例中,我将展示如何从网页提取竞品信息,并编写一个简单的报告。 假设您想要获取以下竞品信息: 1. 产品名称 2. 产品价格 3. 产品特…...
一文彻底理解机器学习 ROC-AUC 指标
在机器学习和数据科学的江湖中,评估模型的好坏是非常关键的一环。而 ROC(Receiver Operating Characteristic)曲线和 AUC(Area Under Curve)正是评估分类模型性能的重要工具。 这个知识点在面试中也很频繁的出现。尽管…...
【二】【动态规划NEW】91. 解码方法,62. 不同路径,63. 不同路径 II
91. 解码方法 一条包含字母 A-Z 的消息通过以下映射进行了 编码 : ‘A’ -> “1” ‘B’ -> “2” … ‘Z’ -> “26” 要 解码 已编码的消息,所有数字必须基于上述映射的方法,反向映射回字母(可能有多种方法ÿ…...
Python闯LeetCode--第3题:无重复字符的最长子串
Problem: 3. 无重复字符的最长子串 文章目录 思路解题方法复杂度Code 思路 一上来马上想到两层for循环暴力枚举,但是又立马想到复杂度是 O ( n 2 ) O(n^2) O(n2),思考了一下能否有更优解,于是想到用头尾两个指针来指定滑动窗口(主…...
HTML DOM 对象
HTML DOM 对象 1. 概述 HTML DOM(文档对象模型)是一个跨平台和语言独立的接口,它允许程序和脚本动态地访问和更新文档的内容、结构和样式。在HTML DOM中,文档被表示为节点树,其中每个节点代表文档中的一个部分,例如元素、文本或属性。HTML DOM对象是构成这个节点树的基…...
如何解决 BeautifulSoup 安装问题:从 BeautifulSoup 3 到 BeautifulSoup 4
在使用 Python 的过程中,解析 HTML 和 XML 数据是一项常见任务。BeautifulSoup 是一个非常流行的解析库。然而,最近在安装 BeautifulSoup 时,遇到了一些问题。本文将介绍如何解决这些问题,并成功安装 BeautifulSoup 4。 问题描述 …...
原型模式--深复制/浅复制
原型模式用于克隆复杂对象,由于new一个实例对象会消耗大部分时间,所以原型模式可以节约大量时间 1 public class Sheep implements Cloneable{2 private String name;3 private Date birth;4 public Sheep(String name, Date birth) {5 …...
C# TextBox模糊查询及输入提示
在程序中,我们经常会遇到文本框中不知道输入什么内容,这时我们可以在文本框中显示提示词提示用户;或者需要查询某个内容却记不清完整信息,通常可以通过文本框列出与输入词相匹配的信息,帮助用户快速索引信息。 文本框…...
Node入门以及express创建项目
前言 记录学习NodeJS 一、NodeJS是什么? Node.js 是一个开源和跨平台的 JavaScript 运行时环境 二、下载NodeJs 1.下载地址(一直点击next即可,记得修改安装地址) https://nodejs.p2hp.com/download/ 2.查看是否安装成功,打开命令行 nod…...
Cheat Engine CE v7.5 安装教程(专注于游戏的修改器)
前言 Cheat Engine是一款专注于游戏的修改器。它可以用来扫描游戏中的内存,并允许修改它们。它还附带了调试器、反汇编器、汇编器、变速器、作弊器生成、Direct3D操作工具、系统检查工具等。 一、下载地址 下载链接:http://dygod/source 点击搜索&…...
【实例分享】访问后端服务超时,银河麒麟服务器操作系统分析及处理建议
1.服务器环境以及配置 【机型】 处理器: Intel 32核 内存: 128G 整机类型/架构: x86_64虚拟机 【内核版本】 4.19.90-25.22.v2101.kylin.x86_64 【OS镜像版本】 kylin server V10 SP2 【第三方软件】 开阳k8s 2.问题现象描述 …...
Java中和的区别
在Java中,& 和 && 都是逻辑运算符,但它们之间存在一些重要的区别,特别是在它们如何评估其操作数以及它们的性能影响方面。 短路评估(Short-Circuit Evaluation): &&(逻辑…...
深入理解计算机系统 CSAPP 家庭作业6.34
第一步先求(S,E,B,m) 题目说共C32个字节,块大小B为16个字节,那就是分为两组:0,1.然后每组存4个int 每个4字节 CB*E*S .B16 ,直接映射的E就是1,所以S2 m为啥等于7? 通过写出两个数组所有的地址可以得出m7. 得出高速缓存的参数:(S,E,B,m)(2,1,16,7),注意图6-26每个参数的定义…...
[leetcode 141环形链表]双指针解决环形链表
Problem: 141. 环形链表 文章目录 思路Code 思路 首先想到如果链表为空直接返回false 其次想到用双指针,一个一回走一步,另一个一回走两步 如果是环形,总有一个时刻,两指针会指向同一个节点,而且该结点不能为空(空是快指针遍历完单链表了) Code /*** Definition for singly-li…...
【深度学习】Precision、Accuracy的区别,精确率与准确率:深度学习多分类问题中的性能评估详解
在深度学习的多分类问题中,Precision(精确率)和Accuracy(准确率)是两种常用的性能评估指标,它们各自有不同的定义和用途。 Precision(精确率)的中文发音是:pǔ rēi xī…...
XML Group端口详解
在XML数据映射过程中,经常需要对数据进行分组聚合操作。例如,当处理包含多个物料明细的XML文件时,可能需要将相同物料号的明细归为一组,或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码,增加了开…...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
