【企业级分布式系统】 Kafka集群
文章目录
- Kafka
- Kafka 概述
- 使用消息队列的好处
- Kafka 的特性
- Kafka 系统架构
- Kafka 的应用场景
- Kafka 的优缺点
- Kafka 集群部署
- 下载安装包
- 安装 Kafka
- Kafka 命令行操作
- Kafka 架构深入
- Filebeat+Kafka+ELK 部署指南~
- 部署 Zookeeper+Kafka 集群
- 部署 Filebeat
- 部署 ELK(Logstash 配置)
- Kibana 配置与查看日志
Kafka
Kafka 概述
Kafka 是一个分布式、基于发布/订阅模式的消息队列系统,由 Linkedin 开发并贡献给 Apache 基金会,现已成为顶级开源项目。它主要应用于大数据领域的实时计算以及日志收集,具有高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性和高并发的特性。
使用消息队列的好处
- 解耦:允许独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
- 可恢复性:系统的一部分组件失效时,不会影响到整个系统。
- 缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
- 灵活性 & 峰值处理能力:使用消息队列能够使关键组件顶住突发的访问压力。
- 异步通信:允许用户把一个消息放入队列,但并不立即处理它。
Kafka 的特性
- 高吞吐量、低延迟:每秒可以处理几十万条消息,延迟最低只有几毫秒。
- 可扩展性:Kafka 集群支持热扩展。
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
- 容错性:允许集群中节点失败。
- 高并发:支持数千个客户端同时读写。
Kafka 系统架构
- Broker:
- 一台 Kafka 服务器就是一个 broker。
- 一个集群由多个 broker 组成。
- 一个 broker 可以容纳多个 topic。
- Topic:
- 可以理解为一个队列,生产者和消费者面向的都是一个 topic。
- 类似于数据库的表名或者 ES 的 index。
- 物理上不同 topic 的消息分开存储。
- Partition:
- 为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上。
- 一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。
- Kafka 只保证 partition 内的记录是有序的。
- 数据路由规则:指定了 partition 则直接使用;未指定但指定 key,通过对 key 的 value 进行 hash 取模选出一个 partition;都未指定,使用轮询选出一个 partition。
- 每个 partition 中的数据使用多个 segment 文件存储。
- Replica:
- 副本机制,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 Kafka 仍然能够继续工作。
- 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
- Leader:
- 当前负责数据的读写的 partition。
- Follower:
- 跟随 Leader,所有写请求都通过 Leader 路由。
- 数据变更会广播给所有 Follower,与 Leader 保持数据同步。
- 只负责备份,不负责数据的读写。
- 如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
- Producer:
- 数据的发布者,将消息 push 发布到 Kafka 的 topic 中。
- Consumer:
- 从 broker 中 pull 拉取数据。
- 可以消费多个 topic 中的数据。
- Consumer Group(CG):
- 由多个 consumer 组成。
- 所有的消费者都属于某个消费者组。
- 消费者组内每个消费者负责消费不同分区的数据,防止数据被重复读取。
- 消费者组之间互不影响。
- Offset 偏移量:
- 唯一标识一条消息。
- 决定读取数据的位置。
- 消费者通过偏移量来决定下次读取的消息。
- 消息被消费之后,并不被马上删除。
- 某一个业务也可以通过修改偏移量达到重新读取消息的目的。
- 消息默认生命周期为 1 周(7*24小时)。
- Zookeeper:
- 在 Kafka 中,ZooKeeper 负责维护 Kafka 集群的一些元数据和 leader 选举等协调工作。
- 元数据存储:存储主题、分区、Broker 节点等信息。
- Leader 选举:参与领导者选举的过程。
- 健康监控:进行集群的健康监控。
- 消费者组协调:协调和追踪消费者的位置信息。
Kafka 的应用场景
- 日志收集:Kafka 可以被用作日志收集系统,将各种应用的日志数据集中收集起来,方便后续的处理和分析。
- 实时计算:Kafka 可以作为实时计算系统的数据源,如 Spark Streaming、Flink 等,用于实时数据处理和分析。
- 消息通讯:Kafka 可以作为消息通讯系统,实现不同系统之间的数据交换和通信。
- 流量削峰:在高并发场景下,Kafka 可以作为流量削峰的工具,将大量的请求缓存到 Kafka 中,然后按照一定的速率进行处理,避免系统崩溃。
Kafka 的优缺点
优点:
- 高吞吐量、低延迟。
- 可扩展性强。
- 持久性、可靠性高。
- 支持多副本、容错性强。
- 社区活跃、生态丰富。
缺点:
- 依赖 Zookeeper,如果 Zookeeper 出现故障,会影响 Kafka 的正常运行。
- 数据一致性方面,虽然 Kafka 提供了多副本机制,但是在极端情况下,仍然可能存在数据丢失的风险。
- 消息顺序问题,如果生产者发送消息到多个分区,那么消费者消费时可能无法保证消息的顺序性。
Kafka 集群部署
下载安装包
- 官方下载地址:Apache Kafka 下载页面
- 步骤:
- 切换到
/opt
目录。 - 使用
wget
从清华大学镜像站下载 Kafka 2.7.1 版本。
- 切换到
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz
安装 Kafka
- 步骤:
- 解压 Kafka 压缩包。
- 将解压后的目录移动到
/usr/local/kafka
。 - 备份并编辑
server.properties
文件,配置 Kafka。
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafkacd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
-
关键配置项:
broker.id
:每个 Kafka 实例的唯一标识,集群中每个实例的broker.id
必须不同。listeners
:指定 Kafka 监听的 IP 和端口。num.network.threads
和num.io.threads
:分别设置处理网络请求和磁盘 IO 的线程数。log.dirs
:Kafka 数据和日志的存放路径。zookeeper.connect
:指定 Zookeeper 集群的地址。
-
环境变量配置:
- 将 Kafka 的 bin 目录添加到 PATH 环境变量中。
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
- 配置启动脚本:
- 创建一个 Kafka 的启动脚本,并设置开机自启。
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)echo "---------- Kafka 启动 ------------"${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo "---------- Kafka 停止 ------------"${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo "---------- Kafka 状态 ------------"count=$(ps -ef | grep kafka | egrep -cv "grep|$$")if [ "$count" -eq 0 ];thenecho "kafka is not running"elseecho "kafka is running"fi
;;
*)echo "Usage: $0 {start|stop|restart|status}"
esacchmod +x /etc/init.d/kafka
chkconfig --add kafka
service kafka start
Kafka 命令行操作
- 创建 topic:
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
- 查看 topic:
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
- 发布和消费消息:
# 生产者
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test# 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
- 修改和删除 topic:
# 修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6# 删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
Kafka 架构深入
-
工作流程及文件存储机制:
- Kafka 以 topic 对消息进行分类,producer 和 consumer 都是面向 topic 的。
- Topic 是逻辑概念,partition 是物理概念,每个 partition 对应一个 log 文件。
- 为防止 log 文件过大,Kafka 采用分片和索引机制,将每个 partition 分为多个 segment,每个 segment 包含
.index
和.log
文件。
-
数据可靠性保证:
- Kafka 通过 ack 应答机制保证数据可靠性,producer 发送数据后需要等待 broker 的确认。
-
数据一致性问题:
- LEO:每个副本的最大 offset。
- HW:消费者能见到的最大 offset,所有副本中最小的 LEO。
- Leader 和 follower 故障时的数据恢复和同步机制。
-
ack 应答机制:
- Kafka 提供了三种可靠性级别(acks=0, 1, -1),用户可以根据需求选择。
- 幂等性:在 0.11 版本及以后,Kafka 引入了幂等性特性,保证 producer 发送重复数据时,server 端只持久化一条。
注释:
- Kafka 的安装和配置需要根据集群的实际环境进行调整,特别是 IP 地址和端口号。
- 在生产环境中,通常需要配置更多的参数以优化性能和可靠性。
- Kafka 的数据可靠性和一致性机制是其核心特性之一,理解这些机制对于保证数据的安全性和一致性至关重要。
Filebeat+Kafka+ELK 部署指南~
部署 Zookeeper+Kafka 集群
- 目的:搭建消息队列系统,用于日志数据的传输。
- 步骤:
- 安装并配置 Zookeeper 集群。
- 安装并配置 Kafka 集群,指定 Zookeeper 集群地址。
- 启动 Zookeeper 和 Kafka 服务,确保集群正常运行。
部署 Filebeat
- 目的:收集服务器上的日志数据。
- 步骤:
- 下载并解压 Filebeat 到指定目录(如
/usr/local/filebeat
)。 - 编辑
filebeat.yml
配置文件:filebeat.prospectors: - type: logenabled: truepaths:- /var/log/httpd/access_logtags: ["access"]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: ["error"]# 添加输出到 Kafka 的配置 output.kafka:enabled: truehosts: ["192.168.80.10:9092","192.168.80.11:9092","192.168.80.12:9092"]topic: "httpd"
- 启动 Filebeat,开始收集日志并发送到 Kafka。
- 下载并解压 Filebeat 到指定目录(如
部署 ELK(Logstash 配置)
- 目的:从 Kafka 拉取日志数据,并处理、存储到 Elasticsearch 中。
- 步骤:
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
kafka.conf
:input {kafka {bootstrap_servers => "192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092"topics => "httpd"type => "httpd_kafka"codec => "json"auto_offset_reset => "latest"decorate_events => true} }output {if "access" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.80.30:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug } }
- 启动 Logstash,开始从 Kafka 拉取日志并存储到 Elasticsearch。
- 在 Logstash 组件所在节点上,新建一个 Logstash 配置文件
Kibana 配置与查看日志
- 目的:通过 Kibana 可视化界面查看日志数据。
- 步骤:
- 在浏览器中访问 Kibana(如
http://192.168.80.30:5601
)。 - 登录 Kibana(如果设置了登录认证)。
- 单击“Create Index Pattern”按钮,添加索引模式,例如
httpd_access-*
和httpd_error-*
(注意:这里应与 Logstash 配置中的 index 名称匹配,但原笔记中的filebeat_test-*
是不正确的)。 - 单击“create”按钮创建索引模式。
- 单击“Discover”按钮,可查看图表信息及日志信息。
- 在浏览器中访问 Kibana(如
注释:
- 在配置 Filebeat 和 Logstash 时,确保 Kafka 集群的地址和 topic 名称正确无误。
- Logstash 的
auto_offset_reset
参数决定了从 Kafka 拉取数据的起始位置,latest
表示从最新的数据开始拉取,earliest
表示从头开始拉取。 - Kibana 中的索引模式应与 Logstash 配置中的 index 名称一致,以便正确显示日志数据。
- 在实际部署中,还需要考虑安全性、性能优化等方面的问题。
相关文章:
【企业级分布式系统】 Kafka集群
文章目录 KafkaKafka 概述使用消息队列的好处 Kafka 的特性Kafka 系统架构Kafka 的应用场景Kafka 的优缺点 Kafka 集群部署下载安装包安装 KafkaKafka 命令行操作Kafka 架构深入 FilebeatKafkaELK 部署指南~部署 ZookeeperKafka 集群部署 Filebeat部署 ELK(Logstash…...
MySQL 中有哪几种锁?
在 MySQL 中,锁(Locks)是为了保证数据的一致性和完整性而设计的机制。常见的锁可以从粒度和操作类型两个角度分类。以下是详细介绍: 按 粒度 分类 1. 全局锁 描述:锁定整个数据库实例。用途:主要用于备份…...
kafka中节点如何服役和退役
节点服役(添加新节点) 1.准备新节点: 安装 Kafka 和相关依赖。 配置 Kafka Broker 的 server.properties 文件,确保 broker.id 是唯一的,并且配置正确的 zookeeper.connect 地址。 重启网卡 2.启动新节点ÿ…...

HTML5实现剪刀石头布小游戏(附源码)
文章目录 1.设计来源1.1 主界面1.2 皮肤风格1.2 游戏中界面 2.效果和源码源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh 文章地址:https://blog.csdn.net/weixin_43151418/article/details/143798520 HTM…...

集群聊天服务器(3)muduo网络库
目录 基于muduo的客户端服务器编程 muduo只能装在linux中,依赖boost库 客户端并不需要高并发 基于muduo的客户端服务器编程 支持epoll线程池,muduo封装了线程池 而且还有完善的日志系统 使用muduo库代码非常固定,基本就只有chatserver的类名…...
解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题
解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题 要解决在Ubuntu 20.04中使用PyCharm时无法输入中文的问题,特别是当使用IBus作为输入法框架时,我们需要通过设置适当的环境变量来确保PyCharm可以正确调用IBus输入法。下面将详细说明原因及解决步骤…...
【jvm】HotSpot中方法区的演进
目录 1. 说明2. JDK1.6及以前3. JDK1.74. JDK1.8及以后 1. 说明 1.在HotSpot虚拟机中,方法区(Method Area)的演进是一个重要的内存管理优化过程。2.从JDK1.6到JDK1.8,HotSpot虚拟机中的方法区经历了从永久代到元空间的重大变化。…...

Win10/11 安装使用 Neo4j Community Edition
如果你下载的是 Neo4j Community Edition 的压缩包,意味着你需要手动解压并配置 Neo4j。以下是详细的使用步骤: 0. 下载压缩包 访问Neo4j官网,找到 Community Edition 版本并选择 4.x 或者 5.x 下载:https://neo4j.com/deployme…...

Ubuntu 22.04 上快速搭建 Samba 文件共享服务器
Samba 简介 Samba 是一个开源软件,它扮演着不同操作系统间沟通的桥梁。通过实现 SMB(Server Message Block)协议,Samba 让文件和打印服务在 Windows、Linux 和 macOS 之间自由流动。 以下是 Samba 的特点: 跨平台兼…...

JQuery 基础知识学习(详尽版)2024.11.17
一、jQuery简介及使用详解 1.1 jQuery简介 写更少的代码,做更多的事;jQuery可以做:HTML 元素选取 , HTML 元素操作 ,CSS 操作 ,HTML 事件函数 ,JavaScript 特效和动画 ,HTML DOM 遍…...
Spring Validation参数校验
Validation Validation是Spring提供的一个参数校验框架,使用预定义的注解完成参数校验 使用步骤 引入Spring Validation起步依赖在需要校验的参数所在的类上添加Validated注解在需要校验的参数前面加上Pattern注解 <!--参数校验依赖--><dependency>&l…...
高斯数据库Postgresql死锁和锁表解决方法
解决死锁进方法: 查询死锁进程列表 select * from pg_stat_activity where waiting‘t’ 发现有好几条挂起的记录,记录下所有或需要解锁的pid 解决死锁进程 select pg_cancel_backend(‘pid值’) 解决完后,刷新后测试,恢复正…...
【设计模式】模板方法模式 在java中的应用
设计模式: 设计模式是对软件设计中普遍存在(反复出现)的各种问题,所提出的解决方案。这个术语是由Erich Gamma等人在1995年的书《设计模式:可复用面向对象软件的基础》中首次引入的。设计模式可以加快开发过程&#x…...

PVE纵览-安装系统卡“Loading Driver”的快速解决方案
PVE纵览-安装系统卡“Loading Driver”的快速解决方案 文章目录 PVE纵览-安装系统卡“Loading Driver”的快速解决方案摘要通过引导参数解决PVE安装卡在“Loading Driver”问题官方解决方法 关键字: PVE、 显卡、 Loading、 Driver、 nomodeset 摘要 在虚拟机…...
Lua资料
Lua脚本语言 cheet sheet Lua & c Lua与C API交互全面解析 Lua语言:和C语言的交互 Lua进阶用法之Lua和C的接口设计 Lua C API 简介 C和Lua之间的相互调用 深入Lua:用户数据userdata 基本数据类型 之 UserData calling-lua-from-c/ Embedding Lua i…...

【C语言】值传递和地址传递
值传递 引用传递(传地址,传引用)的区别 传值,是把实参的值赋值给行参 ,那么对行参的修改,不会影响实参的值。 传地址,是传值的一种特殊方式,只是他传递的是地址,不是普通…...

PyTorch 中使用自动求导计算梯度
使用 PyTorch 进行自动求导和梯度计算 在 PyTorch 中,张量的 requires_grad 属性决定了是否需要计算该张量的梯度。设置为 True 的张量会在计算过程中记录操作,以便在调用 .backward() 方法时自动计算梯度。通过构建计算图,PyTorch 能够有效…...
Oracle Instant Client 23.5安装配置完整教程
Oracle Instant Client 23.5安装配置完整教程 简介环境要求安装步骤1. 准备工作目录2. 下载Oracle Instant Client3. 解压Instant Client4. 安装依赖包5. 配置系统环境5.1 配置库文件路径5.2 配置环境变量 6. 配置Oracle钱包(可选) 验证安装常见问题解决…...
【jvm】方法区的理解
目录 1. 说明2. 方法区的演进3. 内部结构4. 作用5.内存管理 1. 说明 1.方法区用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码缓存等数据。它是各个线程共享的内存区域。2.尽管《Java虚拟机规范》中把方法区描述为堆的一个逻辑部分,但它却…...

ES-针对某个字段去重后-获取某个字段值的所有值
针对上面表的数据,现在想根据age分组,并获取每个分组后的name有哪些(去重后)。 select age, GROUP_CONCAT(DISTINCT(name)) from testtable group by age ; 结果: 如果想要增加排序: SELECT age, GROUP_CONCAT(DISTINCT name)…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
ffmpeg(四):滤镜命令
FFmpeg 的滤镜命令是用于音视频处理中的强大工具,可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下: ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜: ffmpeg…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
pycharm 设置环境出错
pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...