当前位置: 首页 > news >正文

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例

什么是 Kafka?

Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名,被广泛应用于实时数据传输、日志收集、事件处理和流式分析等场景。Kafka 的设计目标在于处理大规模的数据流,使其成为构建现代分布式应用的理想选择。

Kafka 的核心概念

在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:

  • Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。

  • Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。

  • Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。

  • Producer: 数据的发布者,将消息发送到一个或多个 Topic。

  • Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。

  • Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。

  • Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。

如何使用 Kafka?

以下是一个详细的 Kafka 使用教程,从安装到实际示例,全面介绍了 Kafka 的用法:

1. 安装和启动 Kafka

首先,你需要安装 Kafka。你可以从官方网站(https://kafka.apache.org/downloads)下载最新版本,并按照指南进行安装。在安装完成后,你需要启动 Kafka 服务器和 ZooKeeper。

启动 ZooKeeper(Kafka 依赖于 ZooKeeper):

bin/zookeeper-server-start.sh config/zookeeper.properties

然后,启动 Kafka 服务器:

bin/kafka-server-start.sh config/server.properties

2. 创建 Topic

在 Kafka 中,你需要创建一个或多个 Topic 来存储消息。使用以下命令创建一个名为 my-topic 的 Topic:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

这将创建一个名为 my-topic 的 Topic,拥有 3 个分区和 1 个副本。

3. 使用 Kafka 生产者

Kafka 生产者用于将消息发布到指定的 Topic 中。以下是一个简单的 Java 生产者示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "my-topic";for (int i = 0; i < 10; i++) {String message = "Message " + i;producer.send(new ProducerRecord<>(topic, message));System.out.println("Sent: " + message);}producer.close();}
}

4. 使用 Kafka 消费者

Kafka 消费者从 Topic 中订阅并处理消息。以下是一个简单的 Java 消费者示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(properties);String topic = "my-topic";consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {System.out.println("Received: " + record.value());});}}
}

5. 运行示例

首先,打开一个终端窗口,运行 Kafka 生产者示例:

java KafkaProducerExample

然后,打开另一个终端窗口,运行 Kafka 消费者示例:

java KafkaConsumerExample

你将会看到生产者发送的消息被消费者接收和处理。

总结

Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。

相关文章:

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例 什么是 Kafka&#xff1f; Kafka 是一个分布式的流处理平台&#xff0c;最初由 LinkedIn 开发&#xff0c;现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名&#xff0c;被广泛应用于实时数据传输、日志收集、事件处…...

【LeetCode周赛】LeetCode第358场周赛

LeetCode第358场周赛 数组中的最大数对和翻倍以链表形式表示的数字限制条件下元素之间的最小绝对差 数组中的最大数对和 给你一个下标从0开始的整数数组nums。请你从nums中找出和最大的一对数&#xff0c;且这两个数数位上最大的数字相等。 返回最大和&#xff0c;如果不存在满…...

Node.js学习笔记-04

这第九章也是个大重点 九、玩转进程 Node在选型时决定在V8引擎之上构建&#xff0c;也就意味着它的模型与浏览器类似。 本章关于进程的介绍和讨论将会解决如下两个问题&#xff1a; 单进程单线程并非完美&#xff0c;如今CPU基本均是多核的&#xff0c;真正的服务器&#xf…...

基于dbn+svr的交通流量预测,dbn详细原理

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN+SVR的交通流量预测 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN+SVR用于交通流量预测…...

【第一阶段】kotlin中反引号中的函数名特点

在kotlin中可以直接中文定义函数&#xff0c;使用反引号进行调用 eg: fun main() {2023年8月9日定义的函数(5) }private fun 2023年8月9日定义的函数(num:Int){println("反引号的用法$num") }执行结果 在Java中is,in可以定义方法&#xff0c;但是在kotlin中is,in是…...

数据分析-python学习 (1)numpy相关

内容为&#xff1a;https://juejin.cn/book/7240731597035864121的学习笔记 导包 import numpy as np numpy数组创建 创建全0数组&#xff0c;正态分布、随机数组等就不说了&#xff0c;提供了相应的方法通过已有数据创建有两种 arr1np.array([1,2,3,4,5]) 或者datanp.loadt…...

数据库的游标

数据库的游标&#xff08;Cursor&#xff09;是用于在数据库中进行数据操作的一个控制结构。它类似于在编程语言中使用的指针或迭代器&#xff0c;用于遍历数据库结果集并在结果集上执行各种操作。 游标允许我们在数据库查询的结果集中逐行移动&#xff0c;并对每一行执行特定…...

【设计模式】前端控制器模式

前端控制器模式&#xff08;Front Controller Pattern&#xff09;是用来提供一个集中的请求处理机制&#xff0c;所有的请求都将由一个单一的处理程序处理。该处理程序可以做认证/授权/记录日志&#xff0c;或者跟踪请求&#xff0c;然后把请求传给相应的处理程序。以下是这种…...

SQL | 过滤数据

4-过滤数据 4.1-使用WHERE子句 数据根据 WHERE 子句中指定的搜索条件进行过滤。WHERE 子句在表名&#xff08; FROM 子句&#xff09;之后给出。 select prod_name,prod_price from products where prod_price 3.49; 上述语句查询价格为3.49的行&#xff0c;然后输出名字和…...

【力扣每日一题】2023.8.13 合并两个有序数组

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个升序数组&#xff0c;让我们合并它们&#xff0c;要求合并之后仍然是升序&#xff0c;并且这个合并操作是在数组1原地修改…...

数据结构篇七:排序

文章目录 前言1.插入排序1.1 基本思想1.2 代码实现1.3 特性总结 2.希尔排序2.1 基本思想2.2 代码实现2.3 特性总结 3. 选择排序3.1 基本思想3.2 代码实现3.3 特性总结 4. 堆排序4.1 基本思想4.2 代码实现4.3 特性总结 5. 冒泡排序5.1 基本思想5.2 代码实现5.3 特性总结 6. 快速…...

Vue组件的边界情况

01.$root&#xff1b; 访问组件的根实例&#xff1b;用的不多&#xff0c;基本上在vuex上进行数据操作&#xff1b; 02.$parent/$children; 可以获得父组件或者子组件上边的数据&#xff1b;一般不建议使用$parent,因为如果获取这个值进行修改的话&#xff0c;也会更改父组件上…...

less、sass的使用及其区别

CSS预处理器 CSS 预处理器是一种扩展了原生 CSS 的工具&#xff0c;它们添加了一些编程语言的特性&#xff0c;以便更有效地编写、组织和维护样式代码。预处理器允许开发者使用变量、嵌套、函数、混合等功能&#xff0c;从而使 CSS 更具可读性、可维护性和重用性&#xff0c;特…...

[保研/考研机试] 猫狗收容所 C++实现

题目描述&#xff1a; 输入&#xff1a; 第一个是n&#xff0c;它代表操作序列的次数。接下来是n行&#xff0c;每行有两个值m和t&#xff0c;分别代表题目中操作的两个元素。 输出&#xff1a; 按顺序输出收养动物的序列&#xff0c;编号之间以空格间隔。 源代码&#xff…...

Kotlin 基础教程一

Kotlin 基本数据类型 Java | Kotlin byte Byte short Short int Int long Long float Float double Double boolean Boolean c…...

数据结构笔记--前缀树的实现

1--前缀树的实现 前缀树的每一个节点拥有三个成员变量&#xff0c;pass表示有多少个字符串经过该节点&#xff0c;end表示有多少个字符串以该节点结尾&#xff0c;nexts表示该字符串可以走向哪些节点&#xff1b; #include <iostream> #include <unordered_map>str…...

C/C++时间获取函数

time.h包含C/C中用于获取时间&#xff0c;和时间转换方面的函数。 1、time() 函数 time_t time(time_t *seconds) 返回自&#xff08;1970-01-01 00:00:00 UTC&#xff09;起经过的时间&#xff0c;以秒为单位。如果 seconds 不为空&#xff0c;则返回值也存储在变量 seconds …...

sql中判断日期是否是同一天

sql中判断日期是否是同一天的sql sql: select id,product_id,seckill_price,stock_count,time,intergral,start_date from t_seckill_product where to_days(start_date) to_days(now()) to_days函数&#xff1a; 使用to_days(start_date) to_days(now())的方式是一种常见的…...

NAS搭建指南一——服务器的选择与搭建

一、服务器的选择 有自己的本地的公网 IP 的请跳过此篇文章按需求选择一个云服务器&#xff0c;目的就是为了进行 frp 的搭建&#xff0c;完成内网穿透我选择的是腾讯云服务器&#xff0c;我的配置如下&#xff0c;仅供参考&#xff1a; 4. 腾讯云服务器官网地址 二、服务器…...

豪越HYDO智能运维助力智慧医院信息化建设

随着国家政策的推动与支持&#xff0c;医疗行业信息化应用不断普及&#xff0c;大数据、AI、医疗物联网等技术的应用&#xff0c;快速推动了电子病历、智慧服务、智慧管理的智慧医院建设和医院信息标准化建设&#xff0c;通过不断探索创新“智慧医院”服务模式&#xff0c;实现…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

力扣热题100 k个一组反转链表题解

题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...

Razor编程中@Html的方法使用大全

文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机&#xff0c;它可以执行Java字节码。Java虚拟机是Java平台的一部分&#xff0c;Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...

GO协程(Goroutine)问题总结

在使用Go语言来编写代码时&#xff0c;遇到的一些问题总结一下 [参考文档]&#xff1a;https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现&#xff1a; 今天在看到这个教程的时候&#xff0c;在自己的电…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; Spring框架的核心容器是IoC&#xff08;控制反转&#xff09;容器。它的主要作用是管理对…...