云原生时代 Kafka 深度实践:02快速上手与环境搭建
2.1 本地开发环境搭建
单机模式安装
- 下载与解压:前往Apache Kafka 官网,下载最新稳定版本的 Kafka 二进制包(如
kafka_2.13-3.6.0.tgz
,其中2.13
为 Scala 版本)。解压到本地目录,例如/opt/kafka
:
tar -xzf kafka\_2.13-3.6.0.tgz
mv kafka\_2.13-3.6.0 /opt/kafka
- 配置文件调整:Kafka 的核心配置文件位于
/opt/kafka/config
目录下。
server.properties
:修改关键参数,如listeners=PLAINTEXT://``localhost:9092
指定 Broker 监听地址和端口;log.dirs=/var/lib/kafka-logs
设置消息存储目录;zookeeper.connect=``localhost:2181
(若使用 Zookeeper)配置元数据管理地址。zookeeper.properties
(若未单独安装 Zookeeper):可保持默认配置,默认数据存储目录为/tmp/zookeeper
,端口为2181
。
- 启动服务:依次启动 Zookeeper 和 Kafka Broker:
# 启动Zookeeper(若未单独安装)
/opt/kafka/bin/zookeeper-server-start.sh
/opt/kafka/config/zookeeper.properties# 启动Kafka Broker
/opt/kafka/bin/kafka-server-start.sh
/opt/kafka/config/server.properties
启动后,Kafka 将在localhost:9092
监听 Producer 和 Consumer 的请求。
Docker 容器化部署
使用 Docker Compose 可快速搭建多节点 Kafka 集群,并简化环境管理:
- 创建
docker-compose.yml
文件:
version: '3' # 指定Docker Compose文件版本为3services:zookeeper:image: confluentinc/cp-zookeeper:7.3.0 # 使用Confluent提供的Zookeeper镜像,版本7.3.0environment:ZOOKEEPER_CLIENT_PORT: 2181 # 设置Zookeeper客户端连接端口为2181ZOOKEEPER_TICK_TIME: 2000 # 设置Zookeeper的心跳时间(单位:毫秒)ports:- "2181:2181" # 将容器内的2181端口映射到主机的2181端口kafka:image: confluentinc/cp-kafka:7.3.0 # 使用Confluent提供的Kafka镜像,版本7.3.0depends_on:- zookeeper # 指定Kafka服务依赖于Zookeeper服务environment:KAFKA_BROKER_ID: 1 # 设置Kafka broker的唯一IDKAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 指定Kafka连接的Zookeeper地址KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 定义监听器安全协议映射KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_INTERNAL://localhost:9093 # 定义对外广播的监听器地址KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:9093 # 定义Kafka监听的地址和端口KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL # 指定broker间通信使用的监听器名称ports:- "9092:9092" # 将容器内的9092端口映射到主机的9092端口
- 启动集群:在包含
docker-compose.yml
的目录下执行:
docker-compose up -d
此配置启动一个单节点 Zookeeper 和一个 Kafka Broker,通过映射本地端口9092
实现外部访问。如需扩展集群,可增加kafka
服务实例并调整配置。
2.2 基础操作入门
命令行工具实战
- 创建 Topic:使用
kafka-topics.sh
命令创建一个名为test_topic
,包含 3 个分区、2 个副本的 Topic:
/opt/kafka/bin/kafka-topics.sh --create \--topic test_topic \--bootstrap-server localhost:9092 \--partitions 3 \--replication-factor 2
- 生产与消费消息:
- 生产者:通过
kafka-console-producer.sh
向test_topic
发送消息:
/opt/kafka/bin/kafka-console-producer.sh \--topic test_topic \--bootstrap-server localhost:9092
输入消息内容(如Hello, Kafka!
)并回车发送。
- 消费者:使用
kafka-console-consumer.sh
从test_topic
拉取消息,支持从头开始消费或从最新位置消费:
# 从头开始消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--from-beginning \--bootstrap-server localhost:9092# 从最新位置消费
/opt/kafka/bin/kafka-console-consumer.sh \--topic test_topic \--bootstrap-server localhost:9092
- 查看 Topic 元数据:使用
--describe
参数查看test_topic
的分区分布、Leader 副本等信息:
/opt/kafka/bin/kafka-topics.sh --describe \--topic test_topic \--bootstrap-server localhost:9092
- 消费位移管理:默认情况下,Consumer 自动提交 Offset。如需手动提交,可在消费时添加
--enable-auto-commit=false
参数,并通过commitSync()
或commitAsync()
方法控制提交时机。
2.3 首个 Java 程序:Producer & Consumer
Maven 依赖配置
在pom.xml
中添加 Kafka 客户端依赖:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>
Producer 代码示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException; public class KafkaProducerExample {public static void main(String[] args) {// 1. 配置Kafka生产者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置值的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 设置消息确认机制:等待所有副本确认(最可靠但最慢)props.put(ProducerConfig.ACKS_CONFIG, "all");// 设置发送失败时的重试次数props.put(ProducerConfig.RETRIES_CONFIG, 3);// 2. 创建Kafka生产者实例Producer<String, String> producer = new KafkaProducer<>(props);// 3. 创建要发送的消息记录// 参数:topic名称,消息key,消息valueProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key1", "message1");try {// 4. 发送消息(同步方式)// send()返回Future,get()会阻塞直到收到响应RecordMetadata metadata = producer.send(record).get();// 5. 打印消息发送成功的元数据System.out.println("Message sent to partition " + metadata.partition() + " with offset " + metadata.offset());} catch (InterruptedException | ExecutionException e) {// 6. 处理发送过程中可能出现的异常e.printStackTrace();} finally {// 7. 关闭生产者(重要!避免资源泄漏)producer.close();}}
}
Consumer 代码示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties; public class KafkaConsumerExample {public static void main(String[] args) {// 1. 配置Kafka消费者属性Properties props = new Properties();// 设置Kafka集群地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置消费者组ID(同一组内的消费者共享消息)props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");// 设置键的反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 设置值的反序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");// 注意:默认是自动提交offset,这里我们改为手动提交(见下方commitSync())// 2. 创建Kafka消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅主题(可以订阅多个主题,这里用单例集合订阅单个主题)consumer.subscribe(Collections.singletonList("test_topic"));// 4. 持续轮询消息while (true) {// poll()方法获取消息,参数是等待时间(避免CPU空转)// 返回一批记录(可能包含0到多条消息)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));// 5. 处理收到的每条消息for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d%n",record.key(), record.value(), record.partition(), record.offset());}// 6. 手动同步提交offset(确保消息被成功处理后再提交)// 注意:生产环境应考虑错误处理和异步提交(commitAsync)consumer.commitSync(); }// 实际应用中应该添加关闭逻辑(如通过ShutdownHook)// consumer.close();}
}
上述 Java 程序分别实现了消息的生产与消费,通过配置 Producer 和 Consumer 的参数,可灵活控制消息发送策略与消费行为。
相关文章:
云原生时代 Kafka 深度实践:02快速上手与环境搭建
2.1 本地开发环境搭建 单机模式安装 下载与解压:前往Apache Kafka 官网,下载最新稳定版本的 Kafka 二进制包(如kafka_2.13-3.6.0.tgz,其中2.13为 Scala 版本)。解压到本地目录,例如/opt/kafka:…...
Redis7 新增数据结构深度解析:ListPack 的革新与优化
Redis 作为高性能的键值存储系统,其核心优势之一在于丰富的数据结构。随着版本迭代,Redis 不断优化现有结构并引入新特性。在 Redis 7.0 中,ListPack 作为新一代序列化格式正式登场,替代了传统的 ZipList(压缩列表&…...
分布式爬虫架构设计
随着互联网数据的爆炸式增长,单机爬虫已经难以满足大规模数据采集的需求。分布式爬虫应运而生,它通过多节点协作,实现了数据采集的高效性和容错性。本文将深入探讨分布式爬虫的架构设计,包括常见的架构模式、关键技术组件、完整项…...

汽配快车道:助力汽车零部件行业的产业重构与数字化出海
汽配快车道:助力汽车零部件行业的数字化升级与出海解决方案。 在当今快速发展的汽车零部件市场中,随着消费者对汽车性能、安全和舒适性的要求不断提高,汽车刹车助力系统作为汽车安全的关键部件之一,其市场需求也在持续增长。汽车…...

Windows 11 家庭版 安装Docker教程
Windows 家庭版需要通过脚本手动安装 Hyper-V 一、前置检查 1、查看系统 快捷键【winR】,输入“control” 【控制面板】—>【系统和安全】—>【系统】 2、确认虚拟化 【任务管理器】—【性能】 二、安装Hyper-V 1、创建并运行安装脚本 在桌面新建一个 .…...

PyQt6基础_QtCharts绘制横向柱状图
前置: pip install PyQt6-Charts 结果: 代码: import sysfrom PyQt6.QtCharts import (QBarCategoryAxis, QBarSet, QChart,QChartView, QValueAxis,QHorizontalBarSeries) from PyQt6.QtCore import Qt,QSize from PyQt6.QtGui import QP…...

《TCP/IP 详解 卷1:协议》第2章:Internet 地址结构
基本的IP地址结构 分类寻址 早期Internet采用分类地址(Classful Addressing),将IPv4地址划分为五类: A类和B类网络号通常浪费太多主机号,而C类网络号不能为很多站点提供足够的主机号。 子网寻址 子网(Su…...
Python学习(5) ----- Python的JSON处理
下面是关于 Python 中如何全面处理 JSON 的详细说明,包括模块介绍、数据类型映射、常用函数、文件操作、异常处理、进阶技巧等。 🧩 一、什么是 JSON? JSON(JavaScript Object Notation)是一种轻量级的数据交换格式&a…...

如何通过一次需求评审,让项目效率提升50%?
想象一下,你的团队启动了一个新项目,但需求模糊不清,开发到一半才发现方向错了,返工、加班、客户投诉接踵而至……听起来像噩梦?一次完美的需求评审就能避免这一切!它就像项目的“导航仪”,确保…...

再见Notepad++,你好Notepad--
Notepad-- 是一款国产开源的轻量级、跨平台文本编辑器,支持 Window、Linux、macOS 以及国产 UOS、麒麟等操作系统。 除了具有常用编辑器的功能之外,Notepad-- 还内置了专业级的代码对比功能,支持文件、文件夹、二进制文件的比对,支…...

element-plus bug整理
1.el-table嵌入el-image标签预览时,显示错乱 解决:添加preview-teleported属性 <el-table-column label"等级图标" align"center" prop"icon" min-width"80"><template #default"scope"&g…...

技术-工程-管用养修保-智能硬件-智能软件五维黄金序位模型
融智学工程技术体系:五维协同架构 基于邹晓辉教授的框架,工程技术体系重构为:技术-工程-管用养修保-智能硬件-智能软件五维黄金序位模型: math \mathbb{E}_{\text{技}} \underbrace{\prod_{\text{Dis}} \text{TechnoCore}}_{\…...

LangChain-自定义Tool和Agent结合DeepSeek应用实例
除了调用LangChain内置工具外,也可以自定义工具 实例1: 自定义多个工具 from langchain.agents import initialize_agent, AgentType from langchain_community.agent_toolkits.load_tools import load_tools from langchain_core.tools import tool, …...

用 3D 可视化颠覆你的 JSON 数据体验
大家好,这里是架构资源栈!点击上方关注,添加“星标”,一起学习大厂前沿架构! 复杂的 JSON 数据结构常常让人头疼:层层嵌套的对象、错综复杂的数组关系,用传统的树状视图或表格一览千头万绪&…...
联想小新笔记本电脑静电问题导致无法开机/充电的解决方案
一、问题背景 近期部分用户反馈联想小新系列笔记本电脑在特定环境下(如秋冬干燥季节)出现无法开机或充电的问题。经分析,此类现象多由静电积累触发主板保护机制导致,少数情况可能与电源适配器、电池老化或环境因素相关。本文将从技…...

MVCC(多版本并发控制)机制
1. MVCC(多版本并发控制)机制 MVCC 的核心就是 Undo Log Read View,“MV”就是通过 Undo Log 来保存数据的历史版本,实现多版本的管理,“CC”是通过 Read View 来实现管理,通过 Read View 原则来决定数据是…...

Mac M1 安装 ffmpeg
1.前言 官网那货没有准备m系列的静态包,然后我呢,不知道怎么想的就从maven项目中的 javacv-platform,且版本为1.5.11依赖里面将这个静态包把了出来,亲测能用,感觉比那些网上说的用什么wget编译安装、brew安装快多了。…...

Spring框架学习day3--Spring数据访问层管理(IOC)
开发步骤 Spring 是个一站式框架:Spring 自身也提供了web层的 SpringWeb 和 持 久层的 SpringJdbcTemplate。 开发步骤 1.导入jar包 pom.xml <!-- spring-jdbc--> <dependency><groupId>org.springframework</groupId><artifactId>…...
什么是集群(Cluster)?如何保证集群的高可用性?
一、什么是Elasticsearch集群(Cluster)? 集群是指由一个或多个节点(Node)组成的集合,这些节点共同存储数据、处理请求,并协调工作以提供统一的搜索服务。一个集群有唯一的集群名称(默认名为elasticsearch),节点通过名称加入对应的集群。集群的核心目标是: 扩展存储…...
React从基础入门到高级实战:React 核心技术 - 动画与过渡效果:提升 UI 交互体验
React 动画与过渡效果:提升 UI 交互体验 在现代 Web 开发中,动画和过渡效果不仅仅是视觉上的点缀,它们在提升用户体验、引导用户注意力以及增强交互性方面扮演着重要角色。作为一款广受欢迎的前端框架,React 提供了多种实现动画的…...

重读《人件》Peopleware -(13)Ⅱ 办公环境 Ⅵ 电话
当你开始收集有关工作时间质量的数据时,你的注意力自然会集中在主要的干扰源之一——打进来的电话。一天内接15个电话并不罕见。虽然这看似平常,但由于重新沉浸所需的时间,它可能会耗尽你几乎一整天的时间。当一天结束时,你会纳闷…...
Free2AI:企业智能化转型的加速器
随着数字化与智能化的深度交融,企业的竞争舞台已悄然转变为数据处理能力和智能服务水平的竞技场。Free2AI以其三大核心功能——智能数据采集、多格式文档解析、智能FAQ构建,为企业铺设了一条从数据洞察到智能服务的全链路升级之路,成为推动企…...

Python训练营打卡Day40
DAY 40 训练和测试的规范写法 知识点回顾: 1.彩色和灰度图片测试和训练的规范写法:封装在函数中 2.展平操作:除第一个维度batchsize外全部展平 3.dropout操作:训练阶段随机丢弃神经元,测试阶段eval模式关闭dropout 作…...

制作一款打飞机游戏63:自动保存
1.编辑器的自动保存实现 目标:将自动保存功能扩展到所有编辑器,包括脑编辑器、模式编辑器、敌人编辑器和动画/精灵编辑器。实现方式: 代码复制:将关卡编辑器中的自动保存代码复制到其他编辑器中。标记数据变更&a…...
使用animation.css库快速实现CSS3旋转动画效果
CSS3旋转动画效果实现(使用Animate.css) 下面我将展示如何使用Animate.css库快速实现各种CSS3旋转动画效果,同时提供一个直观的演示界面。 思路分析 引入Animate.css库创建不同旋转动画的展示区域添加控制面板自定义动画效果实现实时预览功…...
基于NetWork的类FNAF游戏DEMO框架
脑洞大开 想做个fnaf1并加入自己的设计.. 开干!!!! #include <stdio.h> #include <iostream> #include <random> #include <ctime>bool leftdoor true, rightdoor true, camddoor true; float power 900,fanusepower 0;typedef struct movement…...
湖北理元理律师事务所:债务优化中的生活保障实践
在债务压力与生活质量失衡的普遍困境中,法律服务的价值不仅在于解决债务问题,更在于帮助债务人重建生活秩序。湖北理元理律师事务所通过其债务优化服务,探索出一条“法律生活”的双轨路径。 债务规划的核心矛盾:还款能力与生存需…...

golang连接sm3认证加密(app)
文章目录 环境文档用途详细信息 环境 系统平台:Linux x86-64 Red Hat Enterprise Linux 7 版本:4.5 文档用途 golang连接安全版sm3认证加密数据库,驱动程序详见附件。 详细信息 1.下载Linux golang安装包 go1.17.3.linux-amd64.tar.gz 1.1. 解压安…...

【Zephyr 系列 2】用 Zephyr 玩转 Arduino UNO / MEGA,实现串口通信与 CLI 命令交互
🎯 本篇目标 在 Ubuntu 下将 Zephyr 运行在 Arduino UNO / MEGA 上 打通串口通信,实现通过串口发送命令与反馈 使用 Zephyr Shell 模块,实现 CLI 命令处理 🪧 为什么 Arduino + Zephyr? 虽然 Arduino 开发板通常用于简单的 C/C++ 开发,但 Zephyr 的支持范围远超 STM32…...
AIS常见问题解答(AIS知识补充)
AIS常见问题解答 什么是 AIS? AIS 是“自动识别系统”的缩写。AIS 是一种基于甚高频 (VHF) 的导航和防撞工具,可以实现船舶之间的信息交换。这些信息(AIS 数据)还会被丹麦海事局运营的岸基 AIS 系统收集。因此,在提及 …...