当前位置: 首页 > news >正文

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例

什么是 Kafka?

Kafka 是一个分布式的流处理平台,最初由 LinkedIn 开发,现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名,被广泛应用于实时数据传输、日志收集、事件处理和流式分析等场景。Kafka 的设计目标在于处理大规模的数据流,使其成为构建现代分布式应用的理想选择。

Kafka 的核心概念

在深入了解 Kafka 的使用教程之前,让我们先介绍一些 Kafka 的核心概念,这些概念是理解 Kafka 的基础:

  • Broker: Kafka 集群中的每个服务器节点称为 Broker,它们负责存储和处理数据。

  • Topic: 消息发布的主题,是数据流的类别。生产者将消息发布到主题,消费者从主题中订阅消息。

  • Partition: 每个 Topic 可以分成多个 Partition,每个 Partition 是一个有序的消息队列。分区允许数据水平分布和并行处理。

  • Producer: 数据的发布者,将消息发送到一个或多个 Topic。

  • Consumer: 数据的订阅者,从一个或多个 Topic 中消费消息。

  • Consumer Group: 一组消费者的集合,共同消费一个 Topic 的消息。每个分区只能由一个消费者组中的一个消费者消费。

  • Offset: 每个消息在 Partition 中的唯一标识,消费者使用 Offset 来追踪已消费的消息。

如何使用 Kafka?

以下是一个详细的 Kafka 使用教程,从安装到实际示例,全面介绍了 Kafka 的用法:

1. 安装和启动 Kafka

首先,你需要安装 Kafka。你可以从官方网站(https://kafka.apache.org/downloads)下载最新版本,并按照指南进行安装。在安装完成后,你需要启动 Kafka 服务器和 ZooKeeper。

启动 ZooKeeper(Kafka 依赖于 ZooKeeper):

bin/zookeeper-server-start.sh config/zookeeper.properties

然后,启动 Kafka 服务器:

bin/kafka-server-start.sh config/server.properties

2. 创建 Topic

在 Kafka 中,你需要创建一个或多个 Topic 来存储消息。使用以下命令创建一个名为 my-topic 的 Topic:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

这将创建一个名为 my-topic 的 Topic,拥有 3 个分区和 1 个副本。

3. 使用 Kafka 生产者

Kafka 生产者用于将消息发布到指定的 Topic 中。以下是一个简单的 Java 生产者示例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(properties);String topic = "my-topic";for (int i = 0; i < 10; i++) {String message = "Message " + i;producer.send(new ProducerRecord<>(topic, message));System.out.println("Sent: " + message);}producer.close();}
}

4. 使用 Kafka 消费者

Kafka 消费者从 Topic 中订阅并处理消息。以下是一个简单的 Java 消费者示例:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(properties);String topic = "my-topic";consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));records.forEach(record -> {System.out.println("Received: " + record.value());});}}
}

5. 运行示例

首先,打开一个终端窗口,运行 Kafka 生产者示例:

java KafkaProducerExample

然后,打开另一个终端窗口,运行 Kafka 消费者示例:

java KafkaConsumerExample

你将会看到生产者发送的消息被消费者接收和处理。

总结

Kafka 是一个强大的分布式流处理平台,用于实时数据传输和处理。通过本文详细的介绍、使用教程和示例,你可以了解 Kafka 的核心概念、安装、创建 Topic、使用生产者和消费者,从而为构建现代分布式应用打下坚实的基础。无论是构建实时数据流平台、日志收集系统还是事件驱动架构,Kafka 都是一个可靠、高效的解决方案。

相关文章:

Kafka: 详解、使用教程和示例

Kafka: 详细介绍、使用教程和示例 什么是 Kafka&#xff1f; Kafka 是一个分布式的流处理平台&#xff0c;最初由 LinkedIn 开发&#xff0c;现已成为 Apache 基金会的顶级项目。它以高吞吐量、可靠性和可扩展性而闻名&#xff0c;被广泛应用于实时数据传输、日志收集、事件处…...

【LeetCode周赛】LeetCode第358场周赛

LeetCode第358场周赛 数组中的最大数对和翻倍以链表形式表示的数字限制条件下元素之间的最小绝对差 数组中的最大数对和 给你一个下标从0开始的整数数组nums。请你从nums中找出和最大的一对数&#xff0c;且这两个数数位上最大的数字相等。 返回最大和&#xff0c;如果不存在满…...

Node.js学习笔记-04

这第九章也是个大重点 九、玩转进程 Node在选型时决定在V8引擎之上构建&#xff0c;也就意味着它的模型与浏览器类似。 本章关于进程的介绍和讨论将会解决如下两个问题&#xff1a; 单进程单线程并非完美&#xff0c;如今CPU基本均是多核的&#xff0c;真正的服务器&#xf…...

基于dbn+svr的交通流量预测,dbn详细原理

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN+SVR的交通流量预测 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN+SVR用于交通流量预测…...

【第一阶段】kotlin中反引号中的函数名特点

在kotlin中可以直接中文定义函数&#xff0c;使用反引号进行调用 eg: fun main() {2023年8月9日定义的函数(5) }private fun 2023年8月9日定义的函数(num:Int){println("反引号的用法$num") }执行结果 在Java中is,in可以定义方法&#xff0c;但是在kotlin中is,in是…...

数据分析-python学习 (1)numpy相关

内容为&#xff1a;https://juejin.cn/book/7240731597035864121的学习笔记 导包 import numpy as np numpy数组创建 创建全0数组&#xff0c;正态分布、随机数组等就不说了&#xff0c;提供了相应的方法通过已有数据创建有两种 arr1np.array([1,2,3,4,5]) 或者datanp.loadt…...

数据库的游标

数据库的游标&#xff08;Cursor&#xff09;是用于在数据库中进行数据操作的一个控制结构。它类似于在编程语言中使用的指针或迭代器&#xff0c;用于遍历数据库结果集并在结果集上执行各种操作。 游标允许我们在数据库查询的结果集中逐行移动&#xff0c;并对每一行执行特定…...

【设计模式】前端控制器模式

前端控制器模式&#xff08;Front Controller Pattern&#xff09;是用来提供一个集中的请求处理机制&#xff0c;所有的请求都将由一个单一的处理程序处理。该处理程序可以做认证/授权/记录日志&#xff0c;或者跟踪请求&#xff0c;然后把请求传给相应的处理程序。以下是这种…...

SQL | 过滤数据

4-过滤数据 4.1-使用WHERE子句 数据根据 WHERE 子句中指定的搜索条件进行过滤。WHERE 子句在表名&#xff08; FROM 子句&#xff09;之后给出。 select prod_name,prod_price from products where prod_price 3.49; 上述语句查询价格为3.49的行&#xff0c;然后输出名字和…...

【力扣每日一题】2023.8.13 合并两个有序数组

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 题目给我们两个升序数组&#xff0c;让我们合并它们&#xff0c;要求合并之后仍然是升序&#xff0c;并且这个合并操作是在数组1原地修改…...

数据结构篇七:排序

文章目录 前言1.插入排序1.1 基本思想1.2 代码实现1.3 特性总结 2.希尔排序2.1 基本思想2.2 代码实现2.3 特性总结 3. 选择排序3.1 基本思想3.2 代码实现3.3 特性总结 4. 堆排序4.1 基本思想4.2 代码实现4.3 特性总结 5. 冒泡排序5.1 基本思想5.2 代码实现5.3 特性总结 6. 快速…...

Vue组件的边界情况

01.$root&#xff1b; 访问组件的根实例&#xff1b;用的不多&#xff0c;基本上在vuex上进行数据操作&#xff1b; 02.$parent/$children; 可以获得父组件或者子组件上边的数据&#xff1b;一般不建议使用$parent,因为如果获取这个值进行修改的话&#xff0c;也会更改父组件上…...

less、sass的使用及其区别

CSS预处理器 CSS 预处理器是一种扩展了原生 CSS 的工具&#xff0c;它们添加了一些编程语言的特性&#xff0c;以便更有效地编写、组织和维护样式代码。预处理器允许开发者使用变量、嵌套、函数、混合等功能&#xff0c;从而使 CSS 更具可读性、可维护性和重用性&#xff0c;特…...

[保研/考研机试] 猫狗收容所 C++实现

题目描述&#xff1a; 输入&#xff1a; 第一个是n&#xff0c;它代表操作序列的次数。接下来是n行&#xff0c;每行有两个值m和t&#xff0c;分别代表题目中操作的两个元素。 输出&#xff1a; 按顺序输出收养动物的序列&#xff0c;编号之间以空格间隔。 源代码&#xff…...

Kotlin 基础教程一

Kotlin 基本数据类型 Java | Kotlin byte Byte short Short int Int long Long float Float double Double boolean Boolean c…...

数据结构笔记--前缀树的实现

1--前缀树的实现 前缀树的每一个节点拥有三个成员变量&#xff0c;pass表示有多少个字符串经过该节点&#xff0c;end表示有多少个字符串以该节点结尾&#xff0c;nexts表示该字符串可以走向哪些节点&#xff1b; #include <iostream> #include <unordered_map>str…...

C/C++时间获取函数

time.h包含C/C中用于获取时间&#xff0c;和时间转换方面的函数。 1、time() 函数 time_t time(time_t *seconds) 返回自&#xff08;1970-01-01 00:00:00 UTC&#xff09;起经过的时间&#xff0c;以秒为单位。如果 seconds 不为空&#xff0c;则返回值也存储在变量 seconds …...

sql中判断日期是否是同一天

sql中判断日期是否是同一天的sql sql: select id,product_id,seckill_price,stock_count,time,intergral,start_date from t_seckill_product where to_days(start_date) to_days(now()) to_days函数&#xff1a; 使用to_days(start_date) to_days(now())的方式是一种常见的…...

NAS搭建指南一——服务器的选择与搭建

一、服务器的选择 有自己的本地的公网 IP 的请跳过此篇文章按需求选择一个云服务器&#xff0c;目的就是为了进行 frp 的搭建&#xff0c;完成内网穿透我选择的是腾讯云服务器&#xff0c;我的配置如下&#xff0c;仅供参考&#xff1a; 4. 腾讯云服务器官网地址 二、服务器…...

豪越HYDO智能运维助力智慧医院信息化建设

随着国家政策的推动与支持&#xff0c;医疗行业信息化应用不断普及&#xff0c;大数据、AI、医疗物联网等技术的应用&#xff0c;快速推动了电子病历、智慧服务、智慧管理的智慧医院建设和医院信息标准化建设&#xff0c;通过不断探索创新“智慧医院”服务模式&#xff0c;实现…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

黑马Mybatis

Mybatis 表现层&#xff1a;页面展示 业务层&#xff1a;逻辑处理 持久层&#xff1a;持久数据化保存 在这里插入图片描述 Mybatis快速入门 ![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/6501c2109c4442118ceb6014725e48e4.png //logback.xml <?xml ver…...

大型活动交通拥堵治理的视觉算法应用

大型活动下智慧交通的视觉分析应用 一、背景与挑战 大型活动&#xff08;如演唱会、马拉松赛事、高考中考等&#xff09;期间&#xff0c;城市交通面临瞬时人流车流激增、传统摄像头模糊、交通拥堵识别滞后等问题。以演唱会为例&#xff0c;暖城商圈曾因观众集中离场导致周边…...

visual studio 2022更改主题为深色

visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中&#xff0c;选择 环境 -> 常规 &#xff0c;将其中的颜色主题改成深色 点击确定&#xff0c;更改完成...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

自然语言处理——Transformer

自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效&#xff0c;它能挖掘数据中的时序信息以及语义信息&#xff0c;但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN&#xff0c;但是…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...