消息队列 Kafka
Kafka
Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域
为什么使用消息队列MQ
在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many connection错误,引发雪崩效应。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多
我们使用消息队列,通过异步请求,缓解系统压力,消息队列经常应用于异步处理,流量削峰,应用解耦,消息通讯等场景
当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等
使用消息队列的好处
- 解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
- 可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂了,加入队列的消息仍然可以在系统恢复后被处理
- 缓冲
有助于控制和优化数据流结果系统的速度,解决生产消息和消费消息的处理速度不一致的情况
- 灵活性,峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见
如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费
使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃
- 异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它
想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们
消息队列的两种模式
- 点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息
消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
- 发布/订阅模式(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息和点对点方式不同,发布到 topic 的消息会被所有订阅者消费
发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目对标象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新
Kafka 概述
基于 Zookeeper
Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replicar 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎,nginx 访问日志,消息服务等等,用 scala 语言编写
Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目
Kafka 特性
- 高吞吐量、低延迟
Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力。
- 可扩展性
kafka 集群支持热扩展
- 持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性
允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)
- 高并发
支持数千个客户端同时读写
Kafka 系统架构
- Broker 服务器
一台 kafka 服务器就是一个 broker
一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic
- Topic 主题
可以理解为一个队列,生产者和消费者面向的都是一个 topic
类似于数据库的表名或者 ES 的 index
物理上不同 topic 的消息分开存储
- Partition 分区
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上
一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列
Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序
每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区
然后将消息追加到指定的分区的队列末尾
分区的原因
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了
- 可以提高并发,因为可以以Partition为单位读写了
基础架构
1、Replica
副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失
且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本
一个 leader 和若干个 follower
2、Leader
每个 partition 有多个副本,其中有且仅有一个作为 Leader
Leader 是当前负责数据的读写的 partition
3、Follower
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower
Follower 与 Leader 保持数据同步。Follower 只负责备份,不负责数据的读写。
如果 Leader 故障,则从 Follower 中选举出一个新的 Leader
当 Follower 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR
(Leader 维护的一个和 Leader 保持同步的 Follower 集合) 列表中删除,重新创建一个 Follower
4、producer
生产者即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中。
broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中
生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition
5、Consumer
消费者可以从 broker 中 pull 拉取数据。消费者可以消费多个 topic 中的数据
6、Consumer Group(CG)
消费者组,由多个 consumer 组成
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
可为每个消费者指定组名,若不指定组名则属于默认的组
将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,
防止数据被重复读取,消费者组之间互不影响
7、offset 偏移量
可以唯一的标识一条消息
偏移量决定读取数据的位置,不会有线程安全的问题
消费者通过偏移量来决定下次读取的消息(即消费位置)
消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制
消息最终是会还被删除的,默认生命周期为 1 周(7*24小时)
8、Zookeeper
Kafka 通过 Zookeeper 来存储集群的 meta 信息
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,
需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,
以便故障恢复后继续消费
部署 Kafka 集群
安装 Kafka
//官方下载地址:http://kafka.apache.org/downloads.html
cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz//安装 Kafka
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka//修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties
修改 Kafka 配置文件

broker.id=0
#21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置
broker.id=1、broker.id=2listeners=PLAINTEXT://192.168.10.17:9092
#31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量log.retention.hours=168
#103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除log.segment.bytes=1073741824
#110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件zookeeper.connect=192.168.54.10:2181,192.168.154.20:2181,192.168.154.30:2181
#123行,配置连接Zookeeper集群地址
修改环境变量
//修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile
配置 Zookeeper 启动脚本

//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka//分别启动 Kafka
service kafka start
Kafka 命令行操作
创建topic


查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
查看某个 topic 详情
kafka-topics.sh --describe --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
发布消息
kafka-console-producer.sh --broker-list
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181 --topic test
消费消息
kafka-console-consumer.sh --bootstrap-server
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
--topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
修改分区数
kafka-topics.sh --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181
--alter --topic test --partitions 6
删除 topic
kafka-topics.sh --delete --zookeeper
192.168.154.10:2181,192.168.154.20:2181,192.168.154.30:2181 --topic test
相关文章:
消息队列 Kafka
Kafka Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据实时处理领域 为什么使用消息队列MQ 在高并发环境下,同步请求来不及处理会发生堵塞,从而触发too many conne…...
抽象轻松的java-mybatis简单入门
第一步:用IDEA新建一个java包 第二步:在IDEA中添加数据库(ps:自己百度) 点击数据库 第二步,新建数据库 选择你使用的数据库 用户与密码根据自己的设置进行配置 为了更方便的查看数据库,可以像图…...
012-第二代硬件选型
第二代硬件选型 文章目录 第二代硬件选型项目介绍重新换平台缘由X86 && Arm 架构切换 ARM Linux 硬件选型系统确定Qt 版本确定总结一下 关键字: Qt、 Qml、 Arm、 X86、 linux 项目介绍 欢迎来到我们的 QML & C 项目!这个项目结合了 QM…...
Spring中的设计模式
目录 工厂模式 组合模式 适配器模式 代理模式 单例模式 观察者模式 模板方法模式 责任链模式 Spring有着非常优雅的设计,很多地方都遵循SOLID原则,里面的设计模式更是数不胜数大概有以下几种: 工厂模式 所谓的工厂模式,核…...
软考 系统架构设计师系列知识点之软件质量属性(1)
这个十一注定是一个不能放松、保持“紧”的十一。由于报名了全国计算机技术与软件专业技术资格(水平)考试,11月4号就要考试,因此8天长假绝不能荒废,必须要好好利用起来。现在将各个核心知识点一一进行提炼并做记录。 所…...
GPT系列论文解读:GPT-1
GPT系列 GPT(Generative Pre-trained Transformer)是一系列基于Transformer架构的预训练语言模型,由OpenAI开发。以下是GPT系列的主要模型: GPT:GPT-1是于2018年发布的第一个版本,它使用了12个Transformer…...
数学分析:含参变量的积分
同样很多收敛性的证明不是重点,但里面的知识还是需要适当掌握,知道中间的大致思考和解决路径即可。 本质还是极限的可交换性,求导可以换到积分里面去操作。 这里要注意变量的区别,首先积分的被积变量是x,但是函数的变量…...
关于一篇ElementUI之CUD+表单验证
目录 一.CUD增删改查简述 1.1.增删改功能实现 二.表单验证 前端所有代码: 好啦今天就分享到这了,希望能帮到你哦!!! 以下的代码基于我博客中的代码进行续写 : 关于ElementUI之动态树数据表格分页实例 一.CUD增删改…...
VUE模板编译的实现原理
前言 在Vue.js 2.0中,模板编译是通过将模板转换为渲染函数来实现的。渲染函数是一个函数,它返回虚拟DOM节点,用于渲染实际的DOM。Vue.js的模板编译过程可以分为以下几个步骤: 将模板解析为抽象语法树(AST)…...
基础算法之——【动态规划之路径问题】1
今天更新动态规划路径问题1,后续会继续更新其他有关动态规划的问题!动态规划的路径问题,顾名思义,就是和路径相关的问题。当然,我们是从最简单的找路径开始! 动态规划的使用方法: 1.确定状态并…...
三十三、【进阶】索引的分类
1、索引的分类 (1)总分类 主键索引、唯一索引、常规索引、全文索引 (2)InnoDB存储引擎中的索引分类 2、 索引的选取规则(InnoDB存储引擎) 如果存在主键,主键索引就是聚集索引; 如果不存在主键ÿ…...
VBox启动失败、Genymotion启动失败、Vagrant迁移
VBox启动失败、Genymotion启动失败、Vagrant迁移 2023.10.9 最新版本vbox7.0.10、Genymotion3.5.0 Vbox启动失败 1、查看日志 Error -610 in supR3HardenedMainInitRuntime! (enmWhat4) Failed to locate ‘vcruntime140.dll’ 日志信息查看方法->找到虚拟机所在位置->…...
一篇短小精悍的文章让你彻底明白KMP算法中next数组的原理
以后保持每日一更,由于兴趣较多,更新内容不限于数据结构,计算机组成原理,数论,拓扑学......,所谓:深度围绕职业发展,广度围绕兴趣爱好。往下看今日内容 一.什么是KMP算法 KMP&#x…...
CSS盒子定位的扩张
定位的扩展 绝对定位(固定定位)会完全压住盒子 浮动元素不会压住下面标准流的文字,而绝对定位或固定位会压住下面标准流的所有内容 如果一个盒子既有向左又有向右,则执行左,同理执行上 显示隐藏 display: none&…...
SpringBoot整合POI实现Excel文件读写操作
1.环境准备 1、导入sql脚本: create database if not exists springboot default charset utf8mb4;use springboot;create table if not exists user (id bigint(20) primary key auto_increment comment 主键id,username varchar(255) not null comment 用…...
从零开始的力扣刷题记录-第八十七天
力扣每日四题 129. 求根节点到叶节点数字之和-中等130. 被围绕的区域-中等437. 路径总和 III-中等376. 摆动序列-中等总结 129. 求根节点到叶节点数字之和-中等 题目描述: 给你一个二叉树的根节点 root ,树中每个节点都存放有一个 0 到 9 之间的数字。 …...
【1】c++设计模式——>UML类图的画法
UML介绍 UML:unified modeling language 统一建模语言 面向对象设计主要就是使用UML类图,类图用于描述系统中所包含的类以及他们之间的相互关系,帮助人们简化对系统的理解,他是系统分析和设计阶段的重要产物,也是系统编码和测试的…...
SAP UI5 指定 / 变更版本
SAP UI5 指定 / 变更版本 Currently, SAP Fiori tools support SAP Fiori elements and SAPUI5 freestyle projects with minimum SAPUI5 versions 1.65 or higher. In case there’s a need to test an existing projects with a lower SAPUI5 version, the following worka…...
SpringMVC中异常处理详解
单个控制器异常处理 // 添加ExceptionHandler,表示该方法是处理异常的方法,属性为处理的异常类ExceptionHandler({java.lang.NullPointerException.class,java.lang.ArithmeticException.class})public String exceptionHandle1(Exception ex, Model mo…...
PPT课件培训视频生成系统实现全自动化
前言 困扰全动自化的重要环节,AI语音合成功能,终于可以实现自动化流程,在此要感谢团队不懈的努力和韧性的精神! 实现原理 请参照我的文章《Craneoffice云PPT课件培训视频生成系统》 基本流程 演示视频 PPT全自动 总结 过去实…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
【论文阅读28】-CNN-BiLSTM-Attention-(2024)
本文把滑坡位移序列拆开、筛优质因子,再用 CNN-BiLSTM-Attention 来动态预测每个子序列,最后重构出总位移,预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵(S…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
