Kafka 快速实战及基本原理详解解析-01
一、Kafka 介绍
1. MQ 的作用
消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:
异步处理
在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。
- 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦
解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。
- 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷
在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。
- 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。
2. 为什么要用 Kafka
Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:
日志聚合场景
在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:
- 数据量大:需要快速收集和处理来自各个渠道的海量日志。
- 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
- 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
- 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
- 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
- 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
- 容错性:通过副本机制保证消息的高可用性。
- 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。
二、Kafka 快速上手
1. 实验环境准备
要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:
- 虚拟机数量:3 台
- 操作系统:CentOS 7
- Java 版本:Java 8
环境配置步骤
- 下载 Kafka 和 Zookeeper。
- 将 Kafka 解压到
/app/kafka
目录,将 Zookeeper 解压到/app/zookeeper
目录。 - 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
- 关闭防火墙,以避免端口阻塞:
systemctl stop firewalld.service
2. 单机服务体验
为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。
步骤 1:启动 Zookeeper
Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。
启动 Zookeeper 服务:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
检查 Zookeeper 是否正常启动:
jps
确认输出中有 QuorumPeerMain
进程。
步骤 2:启动 Kafka
启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。
启动 Kafka 服务:
nohup bin/kafka-server-start.sh config/server.properties &
确认 Kafka 是否正常启动:
jps
检查输出中是否包含 Kafka
进程。
步骤 3:创建和使用 Topic
Kafka 的基础工作机制是通过 Topic 进行消息的传递。
-
创建 Topic
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
-
发送消息 启动生产者端并发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > 这是一条测试消息
-
消费消息 启动消费者端并接收消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3. 理解 Kafka 的消息传递机制
Kafka 的消息传递机制可以通过以下核心组件来理解:
- 生产者(Producer):将消息发送到指定的 Topic。
- 消费者(Consumer):从指定的 Topic 消费消息。
- Topic:逻辑概念,表示一类业务消息的集合。
- Partition:物理概念,实际存储消息的分区。
- Broker:Kafka 服务器实例,存储和管理 Partition。
Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。
四、Kafka 集群服务
1. 为什么要使用集群
单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:
1.1 解决海量数据存储问题
单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。
1.2 提高系统容错能力
单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。
五、理解服务端的 Topic、Partition 和 Broker
Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:
- Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
- Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
- Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。
5.1 创建分布式 Topic 示例
bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic
5.2 查看 Topic 信息
bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic
六、章节总结:Kafka 集群的整体结构
通过前面的学习,我们可以总结 Kafka 集群的整体结构:
- Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
- Partition 是实际存储单元,保证数据分散存储和负载均衡。
- Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
- Zookeeper 管理 Kafka 集群的元数据和选举过程。
- Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。
七、Spring Boot 实现 Kafka 消息有序性
为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。
7.1 依赖配置
在 Maven 项目中,引入 Kafka 的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
7.2 配置 KafkaProducer
创建 Kafka 的生产者配置类:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
7.3 发送有序消息
创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String message) {kafkaTemplate.send(TOPIC, key, message);}
}
7.4 自定义分区器(可选)
如果有更复杂的分区逻辑,可以自定义分区器:
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}
7.5 设置一个 Topic 对应一个 Partition 的方法
如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。
创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {private static final String TOPIC = "singlePartitionTopic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}
通过这种方式,所有发送到 singlePartitionTopic
的消息都会进入同一个 Partition,确保消息顺序性。
相关文章:
Kafka 快速实战及基本原理详解解析-01
一、Kafka 介绍 1. MQ 的作用 消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面&…...

wujie无界微前端框架初使用
先说一下项目需求:将单独的四套系统的登录操作统一放在一个入口页面进行登录,所有系统都使用的是vue3,(不要问我为啥会这样设计,产品说的客户要求) 1.主系统下载wujie 我全套都是vue3,所以直接…...
C++ 设计模式:职责链模式(Chain of Responsibility)
链接:C 设计模式 链接:C 设计模式 - 组合模式 链接:C 设计模式 - 迭代器模式 职责链模式(Chain of Responsibility Pattern)是一种行为型设计模式,它允许多个对象都有机会处理请求,从而避免请求…...

Yocto项目 - 详解PACKAGECONFIG机制
引言 Yocto项目是一个强大的嵌入式Linux开发工具,广泛应用于创建定制的嵌入式Linux发行版。在Yocto中,配置和定制化构建系统、软件包、以及生成适用于特定硬件的平台镜像是非常重要的。PACKAGECONFIG是Yocto项目中用于灵活启用或禁用软件包特性的强大工…...

Linux下部署ElasticSearch集群
Elasticsearch7.17.8集群的搭建 节点host名称节点ip节点部署内容k8s-m192.168.40.142主节点 数据节点k8s-w1192.168.40.141主节点 数据节点k8s-w2192.168.40.140数据节点 一、准备安装环境 1.下载安装包 官网 www.elastic.co 下载所有版本地址 点击跳转 下载elasticsearch-7.…...
超高分辨率 图像 分割处理
文章大纲 制造业半导体领域高分辨率图像半导体数据集开源的高分辨率晶圆图像数据集1. WM-811K数据集2. Kaggle上的WM-811K Clean Subset数据集医疗 病理领域高分辨率图像1. Camelyon+2. CAMELYON173. CPIA Dataset4. UCF-WSI-Dataset航拍 遥感中的高分辨率 图像航拍遥感领域高分…...

【含文档+PPT+源码】基于springboot的农贸菜市场租位管理系统的设计与实现
开题报告 本文旨在探讨基于SpringBoot框架构建的农贸菜市场租位管理系统的设计与实现。系统结合了现代化信息技术与农贸市场管理需求,为用户提供了注册登录、查看系统公告、分类搜索店铺、查看店铺详情、填写租赁信息、在线租赁、我的订单管理以及用户信息和密码修…...

信息科技伦理与道德1:绪论
1 问题描述 1.1 信息科技的进步给人类生活带来的是什么呢? 功能?智能?陪伴?乐趣?幸福? 基于GPT-3的对话Demo DeepFake 深伪技术:通过神经网络技术进行大样本学习,将个人的声音、面…...

Linux实验报告15-添加系统调用
目录 一:实验目的 二:实验内容 (1)查看系统内核版本 (2)安装内核版本源码 (3)修改注册表 (4)添加系统调用头文件 (5)实现系统调…...
logback之配置文件使用详解
目录 (一)配置文件的加载 (二)使用介绍 1、configuration:配置文件的跟元素 2、contextName:设置日志上下文名称 3、contextListener:设置上下文监听事件 4、property/variable/substituti…...

壁纸样机神器,这个工具适合专业设计师用吗?
壁纸样机神器在一定程度上适合专业设计师使用,但是否适合具体取决于设计师的需求和使用场景: 适合专业设计师的方面 快速实现设计想法:专业设计师在创作过程中,有时需要快速将设计想法变为可视化的效果图,以便进行初…...

MySQL秘籍之索引与查询优化实战指南
MySQL秘籍之索引与查询优化实战指南 目录 MySQL秘籍之索引与查询优化实战指南相关阅读索引相关EXPLAIN 版本 1. 初级篇1.1 【练体术】基础1.1.1 库操作1.1.1 表操作创建一个表增加表字段 1.1.2 增删改插入一条数据删除一条数据更新一条数据库 1.1.3 查询查询所有数据条件查询&a…...
【AI日记】25.01.03 kaggle 比赛 3-2 未来的命运
【AI论文解读】【AI知识点】【AI小项目】【AI战略思考】【AI日记】 工作 参加:kaggle 比赛 Forecasting Sticker Sales时间:8 小时 读书 书名:秦制两千年时间:1.5 小时评估:读完,非常不错,很…...

Linux(Centos 7.6)命令详解:ls
1.命令作用 列出目录内容(list directory contents) 2.命令语法 Usage: ls [OPTION]... [FILE]... 3.参数详解 OPTION: -l,long list 使用长列表格式-a,all 不忽略.开头的条目(打印所有条目,包括.开头的隐藏条目)…...

【Unity3D】UGUI Canvas画布渲染流程
参考文档:画布 - Unity 手册 Canvas组件:画布组件是进行 UI 布局和渲染的抽象空间。所有 UI 元素都必须是附加了画布组件的游戏对象的子对象。 参数: Render Mode 渲染模式:Screen Space - Overlay、Screen Spa…...
minikube安装k8s
一、安装k8s版本 export REGISTRY_MIRRORhttps://registry.cn-hangzhou.aliyuncs.com curl -sSL https://kuboard.cn/install-script/v1.30.x/install_kubelet.sh | sh -s 1.30.0 二、安装docker及minikube useradd docker passwd docker 密码也设置为docker #创建docker组…...

Docker图形化界面工具Portainer最佳实践
前言 安装Portainer 实践-基于Portainer安装redis-sentinel部署 Spring Boot集成Redis Sentinel 前言 本篇文章笔者推荐一个笔者最常用的docker图形化管理工具——Portainer。 安装Portainer 编写docker-compose文件 Portainer部署的步骤比较简单,我们还是以…...

借助 FinClip 跨端技术探索鸿蒙原生应用开发之旅
在当今数字化浪潮汹涌澎湃的时代,移动应用开发领域正经历着深刻的变革与创新。鸿蒙操作系统的崛起,以其独特的分布式架构和强大的性能表现,吸引了众多开发者的目光。而FinClip 跨端技术的出现,为开发者涉足鸿蒙原生应用开发提供了…...
【网络】ARP表、MAC表、路由表
ARP表 网络设备存储IP-MAC映射关系的表项,便于快速查找和转发数据包 ARP协议工作原理 ARP(Address Resolution Protocol),地址解析协议,能够将网络层的IP地址解析为数据链路层的MAC地址。 1.主机在自己的ARP缓冲区中建…...

Linux驱动开发学习准备(Linux内核源码添加到工程-Workspace)
Linux内核源码添加到VsCode工程 下载Linux-4.9.88源码: 没有处理同名文件的压缩包: https://pan.baidu.com/s/1yjIBXmxG9pwP0aOhW8VAVQ?pwde9cv 已把同名文件中以大写命名的文件加上_2后缀的压缩包: https://pan.baidu.com/s/1RIRRUllYFn2…...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

基于TurtleBot3在Gazebo地图实现机器人远程控制
1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...

Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...

mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...