Kafka: 详解、使用教程和示例
Kafka: 详细介绍、使用教程和示例
什么是 Kafka?
Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名,被广泛应用于实时数据传输、日志收集、事件处理和流式分析等场景。Kafka 的设计目标在于处理大规模的数据流,使其成为构建现代分布式应用的理想选择。
Kafka 的核心概念
在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:
-
Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。
-
Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。
-
Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。
-
Producer: 数据的发布者,将消息发送到一个或多个 Topic。
-
Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。
-
Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。
-
Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。
如何使用 Kafka?
以下是一个详细的 Kafka 使用教程,从安装到实际示例,全面介绍了 Kafka 的用法:
1. 安装和启动 Kafka
首先,你需要安装 Kafka。你可以从官方网站(https://kafka.apache.org/downloads)下载最新版本,并按照指南进行安装。在安装完成后,你需要启动 Kafka 服务器和 ZooKeeper。
启动 ZooKeeper(Kafka 依赖于 ZooKeeper):
bin/zookeeper-server-start.sh config/zookeeper.properties
然后,启动 Kafka 服务器:
bin/kafka-server-start.sh config/server.properties
2. 创建 Topic
在 Kafka 中,你需要创建一个或多个 Topic 来存储消息。使用以下命令创建一个名为 my-topic 的 Topic:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
这将创建一个名为 my-topic 的 Topic,拥有 3 个分区和 1 个副本。
3. 使用 Kafka 生产者
Kafka 生产者用于将消息发布到指定的 Topic 中。以下是一个简单的 Java 生产者示例:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "my-topic";for (int i = 0; i < 10; i++) {String message = "Message " + i;producer.send(new ProducerRecord<>(topic, message));System.out.println("Sent: " + message);}producer.close();}
}
4. 使用 Kafka 消费者
Kafka 消费者从 Topic 中订阅并处理消息。以下是一个简单的 Java 消费者示例:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(properties);String topic = "my-topic";consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {System.out.println("Received: " + record.value());});}}
}
5. 运行示例
首先,打开一个终端窗口,运行 Kafka 生产者示例:
java KafkaProducerExample
然后,打开另一个终端窗口,运行 Kafka 消费者示例:
java KafkaConsumerExample
你将会看到生产者发送的消息被消费者接收和处理。
总结
Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。
相关文章:
Kafka: 详解、使用教程和示例
Kafka: 详细介绍、使用教程和示例 什么是 Kafka? Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名,被广泛应用于实时数据传输、日志收集、事件处…...
【LeetCode周赛】LeetCode第358场周赛
LeetCode第358场周赛 数组中的最大数对和翻倍以链表形式表示的数字限制条件下元素之间的最小绝对差 数组中的最大数对和 给你一个下标从0开始的整数数组nums。请你从nums中找出和最大的一对数,且这两个数数位上最大的数字相等。 返回最大和,如果不存在满…...
Node.js学习笔记-04
这第九章也是个大重点 九、玩转进程 Node在选型时决定在V8引擎之上构建,也就意味着它的模型与浏览器类似。 本章关于进程的介绍和讨论将会解决如下两个问题: 单进程单线程并非完美,如今CPU基本均是多核的,真正的服务器…...
基于dbn+svr的交通流量预测,dbn详细原理
目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN+SVR的交通流量预测 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN+SVR用于交通流量预测…...
【第一阶段】kotlin中反引号中的函数名特点
在kotlin中可以直接中文定义函数,使用反引号进行调用 eg: fun main() {2023年8月9日定义的函数(5) }private fun 2023年8月9日定义的函数(num:Int){println("反引号的用法$num") }执行结果 在Java中is,in可以定义方法,但是在kotlin中is,in是…...
数据分析-python学习 (1)numpy相关
内容为:https://juejin.cn/book/7240731597035864121的学习笔记 导包 import numpy as np numpy数组创建 创建全0数组,正态分布、随机数组等就不说了,提供了相应的方法通过已有数据创建有两种 arr1np.array([1,2,3,4,5]) 或者datanp.loadt…...
数据库的游标
数据库的游标(Cursor)是用于在数据库中进行数据操作的一个控制结构。它类似于在编程语言中使用的指针或迭代器,用于遍历数据库结果集并在结果集上执行各种操作。 游标允许我们在数据库查询的结果集中逐行移动,并对每一行执行特定…...
【设计模式】前端控制器模式
前端控制器模式(Front Controller Pattern)是用来提供一个集中的请求处理机制,所有的请求都将由一个单一的处理程序处理。该处理程序可以做认证/授权/记录日志,或者跟踪请求,然后把请求传给相应的处理程序。以下是这种…...
SQL | 过滤数据
4-过滤数据 4.1-使用WHERE子句 数据根据 WHERE 子句中指定的搜索条件进行过滤。WHERE 子句在表名( FROM 子句)之后给出。 select prod_name,prod_price from products where prod_price 3.49; 上述语句查询价格为3.49的行,然后输出名字和…...
【力扣每日一题】2023.8.13 合并两个有序数组
目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们两个升序数组,让我们合并它们,要求合并之后仍然是升序,并且这个合并操作是在数组1原地修改…...
数据结构篇七:排序
文章目录 前言1.插入排序1.1 基本思想1.2 代码实现1.3 特性总结 2.希尔排序2.1 基本思想2.2 代码实现2.3 特性总结 3. 选择排序3.1 基本思想3.2 代码实现3.3 特性总结 4. 堆排序4.1 基本思想4.2 代码实现4.3 特性总结 5. 冒泡排序5.1 基本思想5.2 代码实现5.3 特性总结 6. 快速…...
Vue组件的边界情况
01.$root; 访问组件的根实例;用的不多,基本上在vuex上进行数据操作; 02.$parent/$children; 可以获得父组件或者子组件上边的数据;一般不建议使用$parent,因为如果获取这个值进行修改的话,也会更改父组件上…...
less、sass的使用及其区别
CSS预处理器 CSS 预处理器是一种扩展了原生 CSS 的工具,它们添加了一些编程语言的特性,以便更有效地编写、组织和维护样式代码。预处理器允许开发者使用变量、嵌套、函数、混合等功能,从而使 CSS 更具可读性、可维护性和重用性,特…...
[保研/考研机试] 猫狗收容所 C++实现
题目描述: 输入: 第一个是n,它代表操作序列的次数。接下来是n行,每行有两个值m和t,分别代表题目中操作的两个元素。 输出: 按顺序输出收养动物的序列,编号之间以空格间隔。 源代码ÿ…...
Kotlin 基础教程一
Kotlin 基本数据类型 Java | Kotlin byte Byte short Short int Int long Long float Float double Double boolean Boolean c…...
数据结构笔记--前缀树的实现
1--前缀树的实现 前缀树的每一个节点拥有三个成员变量,pass表示有多少个字符串经过该节点,end表示有多少个字符串以该节点结尾,nexts表示该字符串可以走向哪些节点; #include <iostream> #include <unordered_map>str…...
C/C++时间获取函数
time.h包含C/C中用于获取时间,和时间转换方面的函数。 1、time() 函数 time_t time(time_t *seconds) 返回自(1970-01-01 00:00:00 UTC)起经过的时间,以秒为单位。如果 seconds 不为空,则返回值也存储在变量 seconds …...
sql中判断日期是否是同一天
sql中判断日期是否是同一天的sql sql: select id,product_id,seckill_price,stock_count,time,intergral,start_date from t_seckill_product where to_days(start_date) to_days(now()) to_days函数: 使用to_days(start_date) to_days(now())的方式是一种常见的…...
NAS搭建指南一——服务器的选择与搭建
一、服务器的选择 有自己的本地的公网 IP 的请跳过此篇文章按需求选择一个云服务器,目的就是为了进行 frp 的搭建,完成内网穿透我选择的是腾讯云服务器,我的配置如下,仅供参考: 4. 腾讯云服务器官网地址 二、服务器…...
豪越HYDO智能运维助力智慧医院信息化建设
随着国家政策的推动与支持,医疗行业信息化应用不断普及,大数据、AI、医疗物联网等技术的应用,快速推动了电子病历、智慧服务、智慧管理的智慧医院建设和医院信息标准化建设,通过不断探索创新“智慧医院”服务模式,实现…...
绵阳高新区小学晚托自习
在绵阳石桥铺,孩子在家写作业拖拉磨蹭、坐不住,手机干扰不断等问题让家长们头疼不已。而分小全AI智能学习体验中心旗下的分小全智习室,正是解决这些问题的专业之选。督学老师资质分小全智习室的督学老师均具备师范类或教育学专业背景…...
喔去,litellm 竟然被投毒了,赶紧检查你的机器中招了没有稳
一、什么是setuptools? setuptools 是一个用于创建、分发和安装 Python 包的核心库。 它可以帮助你: 定义 Python 包的元数据(如名称、版本、作者等)。 声明包的依赖项,确保你的包能够正确运行。 构建源代码分发包&…...
OpenClaw替代脚本:Qwen3.5-9B实现复杂自动化优势
OpenClaw替代脚本:Qwen3.5-9B实现复杂自动化优势 1. 为什么需要重新思考自动化工具链 三周前的一个深夜,我盯着屏幕上第17次报错的Python脚本发呆。这个用来批量重命名设计稿文件的脚本,因为文件名中突然出现的emoji字符再次崩溃。就在这个…...
深度解析PPP协议:定义、核心功能、工作流程与应用场景
深度解析PPP协议:定义、核心功能、工作流程与应用场景摘要一、PPP协议:基础定义1.1 PPP协议:是什么1.2 PPP协议:核心功能二、PPP协议:基本工作流程(完整版流程图)2.1 PPP工作流程图2.2 PPP完整工…...
从零入门性能测试:理论+JMETER实操,看完就能上手婪
一、环境准备 Free Spire.Doc for Python 是免费 Python 文档处理库,无需依赖 Microsoft Word,支持 Word 文档的创建、编辑、转换等操作,其中内置的 Markdown 解析能力,能高效实现 Markdown 到 Doc/Docx 格式的转换,且…...
WeChatMsg:如何从微信聊天记录中提取个人数据宝藏并生成年度报告?
WeChatMsg:如何从微信聊天记录中提取个人数据宝藏并生成年度报告? 【免费下载链接】WeChatMsg 提取微信聊天记录,将其导出成HTML、Word、CSV文档永久保存,对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/Gi…...
2025年项目管理工具深度评测:Gitee如何以“代码+管理“双核驱动重塑企业协作
数字化转型浪潮下,项目管理工具市场正迎来前所未有的变革。作为中国最大的代码托管平台,Gitee通过全面升级项目管理模块,在2025年的工具评测中展现出独特竞争力。本文将深入解析Gitee的核心优势、行业应用场景以及未来发展趋势,为…...
MySQL ER_IB_MSG_919报错解析,故障修复与远程处理指南
快速解决MySQL错误ER_IB_MSG_919 (MY-012744)的方法是备份数据文件,检查并修复表空间文件损坏,必要时使用innodb_force_recovery参数启动并导出数据重建数据库。 错误代码含义解析 ER_IB_MSG_919,对应内部错误代码MY-012744,是M…...
MySQL性能优化10大技巧:让你的数据库运行如飞[特殊字符]
MySQL性能优化10大技巧:让你的数据库运行如飞🚀 【免费下载链接】mysql-tutorial MySQL入门教程(MySQL tutorial book) 项目地址: https://gitcode.com/gh_mirrors/mys/mysql-tutorial MySQL作为最流行的开源数据库之一&am…...
2025届学术党必备的六大降重复率工具推荐榜单
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 需从文本特征方面来着手降低AIGC率,详而言之,对于高频词汇能够予以替…...
