当前位置: 首页 > news >正文

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例

什么是 Kafka?

Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名,被广泛应用于实时数据传输、日志收集、事件处理和流式分析等场景。Kafka 的设计目标在于处理大规模的数据流,使其成为构建现代分布式应用的理想选择。

Kafka 的核心概念

在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:

  • Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。

  • Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。

  • Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。

  • Producer: 数据的发布者,将消息发送到一个或多个 Topic。

  • Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。

  • Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。

  • Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。

如何使用 Kafka?

以下是一个详细的 Kafka 使用教程,从安装到实际示例,全面介绍了 Kafka 的用法:

1. 安装和启动 Kafka

首先,你需要安装 Kafka。你可以从官方网站(https://kafka.apache.org/downloads)下载最新版本,并按照指南进行安装。在安装完成后,你需要启动 Kafka 服务器和 ZooKeeper。

启动 ZooKeeper(Kafka 依赖于 ZooKeeper):

bin/zookeeper-server-start.sh config/zookeeper.properties

然后,启动 Kafka 服务器:

bin/kafka-server-start.sh config/server.properties

2. 创建 Topic

在 Kafka 中,你需要创建一个或多个 Topic 来存储消息。使用以下命令创建一个名为 my-topic 的 Topic:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

这将创建一个名为 my-topic 的 Topic,拥有 3 个分区和 1 个副本。

3. 使用 Kafka 生产者

Kafka 生产者用于将消息发布到指定的 Topic 中。以下是一个简单的 Java 生产者示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "my-topic";for (int i = 0; i < 10; i++) {String message = "Message " + i;producer.send(new ProducerRecord<>(topic, message));System.out.println("Sent: " + message);}producer.close();}
}

4. 使用 Kafka 消费者

Kafka 消费者从 Topic 中订阅并处理消息。以下是一个简单的 Java 消费者示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(properties);String topic = "my-topic";consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {System.out.println("Received: " + record.value());});}}
}

5. 运行示例

首先,打开一个终端窗口,运行 Kafka 生产者示例:

java KafkaProducerExample

然后,打开另一个终端窗口,运行 Kafka 消费者示例:

java KafkaConsumerExample

你将会看到生产者发送的消息被消费者接收和处理。

总结

Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。

相关文章:

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例 什么是 Kafka&#xff1f; Kafka 是一个分布式的流处理平台&#xff0c;最初由 LinkedIn 开发&#xff0c;现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名&#xff0c;被广泛应用于实时数据传输、日志收集、事件处…...

【LeetCode周赛】LeetCode第358场周赛

LeetCode第358场周赛 数组中的最大数对和翻倍以链表形式表示的数字限制条件下元素之间的最小绝对差 数组中的最大数对和 给你一个下标从0开始的整数数组nums。请你从nums中找出和最大的一对数&#xff0c;且这两个数数位上最大的数字相等。 返回最大和&#xff0c;如果不存在满…...

Node.js学习笔记-04

这第九章也是个大重点 九、玩转进程 Node在选型时决定在V8引擎之上构建&#xff0c;也就意味着它的模型与浏览器类似。 本章关于进程的介绍和讨论将会解决如下两个问题&#xff1a; 单进程单线程并非完美&#xff0c;如今CPU基本均是多核的&#xff0c;真正的服务器&#xf…...

基于dbn+svr的交通流量预测,dbn详细原理

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN+SVR的交通流量预测 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN+SVR用于交通流量预测…...

【第一阶段】kotlin中反引号中的函数名特点

在kotlin中可以直接中文定义函数&#xff0c;使用反引号进行调用 eg: fun main() {2023年8月9日定义的函数(5) }private fun 2023年8月9日定义的函数(num:Int){println("反引号的用法$num") }执行结果 在Java中is,in可以定义方法&#xff0c;但是在kotlin中is,in是…...

数据分析-python学习 (1)numpy相关

内容为&#xff1a;https://juejin.cn/book/7240731597035864121的学习笔记 导包 import numpy as np numpy数组创建 创建全0数组&#xff0c;正态分布、随机数组等就不说了&#xff0c;提供了相应的方法通过已有数据创建有两种 arr1np.array([1,2,3,4,5]) 或者datanp.loadt…...

数据库的游标

数据库的游标&#xff08;Cursor&#xff09;是用于在数据库中进行数据操作的一个控制结构。它类似于在编程语言中使用的指针或迭代器&#xff0c;用于遍历数据库结果集并在结果集上执行各种操作。 游标允许我们在数据库查询的结果集中逐行移动&#xff0c;并对每一行执行特定…...

【设计模式】前端控制器模式

前端控制器模式&#xff08;Front Controller Pattern&#xff09;是用来提供一个集中的请求处理机制&#xff0c;所有的请求都将由一个单一的处理程序处理。该处理程序可以做认证/授权/记录日志&#xff0c;或者跟踪请求&#xff0c;然后把请求传给相应的处理程序。以下是这种…...

SQL | 过滤数据

4-过滤数据 4.1-使用WHERE子句 数据根据 WHERE 子句中指定的搜索条件进行过滤。WHERE 子句在表名&#xff08; FROM 子句&#xff09;之后给出。 select prod_name,prod_price from products where prod_price 3.49; 上述语句查询价格为3.49的行&#xff0c;然后输出名字和…...

【力扣每日一题】2023.8.13 合并两个有序数组

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个升序数组&#xff0c;让我们合并它们&#xff0c;要求合并之后仍然是升序&#xff0c;并且这个合并操作是在数组1原地修改…...

数据结构篇七:排序

文章目录 前言1.插入排序1.1 基本思想1.2 代码实现1.3 特性总结 2.希尔排序2.1 基本思想2.2 代码实现2.3 特性总结 3. 选择排序3.1 基本思想3.2 代码实现3.3 特性总结 4. 堆排序4.1 基本思想4.2 代码实现4.3 特性总结 5. 冒泡排序5.1 基本思想5.2 代码实现5.3 特性总结 6. 快速…...

Vue组件的边界情况

01.$root&#xff1b; 访问组件的根实例&#xff1b;用的不多&#xff0c;基本上在vuex上进行数据操作&#xff1b; 02.$parent/$children; 可以获得父组件或者子组件上边的数据&#xff1b;一般不建议使用$parent,因为如果获取这个值进行修改的话&#xff0c;也会更改父组件上…...

less、sass的使用及其区别

CSS预处理器 CSS 预处理器是一种扩展了原生 CSS 的工具&#xff0c;它们添加了一些编程语言的特性&#xff0c;以便更有效地编写、组织和维护样式代码。预处理器允许开发者使用变量、嵌套、函数、混合等功能&#xff0c;从而使 CSS 更具可读性、可维护性和重用性&#xff0c;特…...

[保研/考研机试] 猫狗收容所 C++实现

题目描述&#xff1a; 输入&#xff1a; 第一个是n&#xff0c;它代表操作序列的次数。接下来是n行&#xff0c;每行有两个值m和t&#xff0c;分别代表题目中操作的两个元素。 输出&#xff1a; 按顺序输出收养动物的序列&#xff0c;编号之间以空格间隔。 源代码&#xff…...

Kotlin 基础教程一

Kotlin 基本数据类型 Java | Kotlin byte Byte short Short int Int long Long float Float double Double boolean Boolean c…...

数据结构笔记--前缀树的实现

1--前缀树的实现 前缀树的每一个节点拥有三个成员变量&#xff0c;pass表示有多少个字符串经过该节点&#xff0c;end表示有多少个字符串以该节点结尾&#xff0c;nexts表示该字符串可以走向哪些节点&#xff1b; #include <iostream> #include <unordered_map>str…...

C/C++时间获取函数

time.h包含C/C中用于获取时间&#xff0c;和时间转换方面的函数。 1、time() 函数 time_t time(time_t *seconds) 返回自&#xff08;1970-01-01 00:00:00 UTC&#xff09;起经过的时间&#xff0c;以秒为单位。如果 seconds 不为空&#xff0c;则返回值也存储在变量 seconds …...

sql中判断日期是否是同一天

sql中判断日期是否是同一天的sql sql: select id,product_id,seckill_price,stock_count,time,intergral,start_date from t_seckill_product where to_days(start_date) to_days(now()) to_days函数&#xff1a; 使用to_days(start_date) to_days(now())的方式是一种常见的…...

NAS搭建指南一——服务器的选择与搭建

一、服务器的选择 有自己的本地的公网 IP 的请跳过此篇文章按需求选择一个云服务器&#xff0c;目的就是为了进行 frp 的搭建&#xff0c;完成内网穿透我选择的是腾讯云服务器&#xff0c;我的配置如下&#xff0c;仅供参考&#xff1a; 4. 腾讯云服务器官网地址 二、服务器…...

豪越HYDO智能运维助力智慧医院信息化建设

随着国家政策的推动与支持&#xff0c;医疗行业信息化应用不断普及&#xff0c;大数据、AI、医疗物联网等技术的应用&#xff0c;快速推动了电子病历、智慧服务、智慧管理的智慧医院建设和医院信息标准化建设&#xff0c;通过不断探索创新“智慧医院”服务模式&#xff0c;实现…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

边缘计算医疗风险自查APP开发方案

核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...