三十八、大数据技术之Kafka(1)
🌻🌻 目录
- 一、Kafka 概述
- 1.1 定义
- 1.2 消息队列
- 1.2.1 消息队列内部实现原理
- 1.2.2 传统消息队列的应用场景
- 1.2.3 消息队列的两种模式
- 1.3 Kafka 基础架构
- 二、 Kafka 快速入门
- 2.1 安装前的准备
- 2.2 安装部署
- 2.2.1 集群规划
- 2.2.2 单节点或集群部署
- 2.2.3 集群启停脚本
- 2.3 Kafka命令行操作
- 2.3.1 主题命令行操作
- 2.3.2 生产者命令行操作
- 2.3.3 消费者命令行操作
- 2.3.4 生产者生产消费者消费
学习本文技术需要已经有如下的基础要求:
- 熟悉
Javase
基础- 熟悉
Linux
常用命令- 熟悉
ldea
开发工具
一、Kafka 概述
1.1 定义
Kafka是
由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
1.2 消息队列
- 目前企业中比较常见的消息队列产品主要有
Kafka
、ActiveMQ
、RabbitMQ
、RocketMQ
等。- 在大数据场景主要采用Kafka作为消息队列。
在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ 作为消息队列。
1.2.1 消息队列内部实现原理
消息队列(非kafka)内部实现原理
1.2.2 传统消息队列的应用场景
传统的消息队列的主要应用场景包括:
缓存/消峰
(消去峰值)、解耦和异步通信
。
- 消息队列的应用场景——
缓存/消峰
- 消息队列的应用场景——
解耦
- 消息队列的应用场景——
异步通信
1.2.3 消息队列的两种模式
1.3 Kafka 基础架构
- (1)
Producer
:消息生产者,就是向Kafka broker发消息的客户端。- (2)
Consumer
:消息消费者,向Kafka broker取消息的客户端。- (3)
Consumer Group(CG)
:消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。- (4)
Broker
:一台Kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。- (5)
Topic
:可以理解为一个队列,生产者和消费者面向的都是一个topic。- (6)
Partition
:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。- (7)
Replica
:副本。一个topic的每个分区都有若干个副本,一个Leader和若干个Follower。- (8)
Leader
:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。- (9)
Follower
:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。
抽象理解 (生产者消费者):
- 生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。
- 再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了。
- 这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里。
上面的例子里面:
1.篮子就是kafka,鸡蛋其实就是数据流,系统之间的交互都是通过数据流来传输的(就是tcp、https什么的),也称为报文,或者消息。
2. 消息队列满了,其实就是篮子满了,鸡蛋放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。
producer
:生产者,就是它来生产“鸡蛋”的。consumer
:消费者,生出的“鸡蛋”它来消费。broker
:就是篮子了,鸡蛋生产出来后放在篮子里。topic
:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,有的只吃草鸡蛋,有的吃洋鸡蛋,篮子中分为一个个小盒子,草鸡蛋放一个盒子里,洋鸡蛋放另一个盒子里。这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。
大家一定要学会抽象的去思考,上面只是属于业务的角度,如果从技术角度,
topic
标签实际就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
二、 Kafka 快速入门
2.1 安装前的准备
- 先搭建一台虚拟机,再克隆三台出来
- 步骤在这里(三台上面仅需安装jdk,hadoop(可不装),zookeeper(必须装))
2.2 安装部署
安装前的准备:
资源获取:
通过百度网盘分享的文件获取:
kafka软件大全 提取码:yyds
- 1.VMware的安装:VMware-workstation-full-15.5.0
- 2.镜像的安装:CentOS-7.5-x86_64-DVD-1804.iso
- 3.JDK的安装: jdk-8u212-linux-x64.tar.gz
- 4.Hadoop的安装: hadoop-3.1.3.tar.gz
2.2.1 集群规划
linux-102 | linux-103 | linux-104 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
2.2.2 单节点或集群部署
- 0)官方下载 获取
- 1)本地资源下载获取(上面网盘获取)
- 2)上传压缩包到服务器 /usr/local下面并且进行解压
tar -zxvf kafka_2.12-3.0.0.tgz
3)修改解压后的文件名称到当前目录下
mv kafka_2.12-3.0.0 kafka
4)进入到/usr/local/kafka/config
目录,修改配置文件 server.properties
如下③ 处地方,并保存。
vi server.properties
①
②
③
创建文件夹 datas
5)分发安装包(针对上述集群配置,注:分发后配置里面的这个需要依次改动 broker.id=0
,broker.id不得重复,整个集群中唯一。)
scp -r /usr/local/kafka root@linux-103:/usr/local/
6)配置环境变量
(1)在/etc/profile
文件中增加kafka环境变量配置
增加如下内容:
#kafka
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
(2)刷新一下环境变量。
(3)分发环境(只针对集群
)变量文件到其他节点,并source。
7)启动集群
(1)先启动Zookeeper集群,然后启动Kafka。
cd /usr/local/zookeeper/bin/./zkServer.sh start./zkServer.sh statuscd /usr/local/kafka/bin/./kafka-server-start.sh -daemon /usr/local/kafka/config/server.propertiesjps
(2)依次在linux-102、linux-103、linux-104节点上启动Kafka。(针对集群
)(如同上述启动)
注意:配置文件的路径要能够到server.properties。
8)关闭(集群)
./kafka-server-stop.sh
2.2.3 集群启停脚本
1)在/home/bin
目录下创建文件kf.sh
脚本文件(没有bin文件夹则创建文件夹)
脚本如下:
#! /bin/bashcase $1 in
"start"){#集群#for i in linux-102 linux-103 linux-104#单节点for i in linux-102 doecho " --------启动 $i Kafka-------"ssh $i "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"done
};;
"stop"){#集群#for i in linux-102 linux-103 linux-104#单节点for i in linux-102doecho " --------停止 $i Kafka-------"ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh "done
};;
esac
2)添加执行权限
chmod +x kf.sh
3)启动集群命令
kf.sh start
4)停止集群命令
./kf.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
2.3 Kafka命令行操作
2.3.1 主题命令行操作
1)查看操作主题命令参数
./kafka-topics.sh
参数 | 描述 |
---|---|
- -bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
- -topic <String: topic> | 操作的topic名称。 |
- - create | 创建主题。 |
- - delete | 删除主题。 |
- - alter | 修改主题。 |
- - list | 查看所有主题。 |
- - describe | 查看主题详细描述。 |
- - partitions <Integer: # of partitions> | 设置分区数。 |
- - replication-factor<Integer: replication factor> | 设置分区副本。 |
- - config <String: name=value> | 更新系统默认的配置。 |
2)查看当前服务器中的所有 topic(其中 9092
是kafka的默认端口)
./kafka-topics.sh --bootstrap-server linux-102:9092 --list
#一般是生产环境
./kafka-topics.sh --bootstrap-server linux-102:9092,linux-103:9092 --list
3)创建 first topic
./kafka-topics.sh --bootstrap-server linux-102:9092 --topic first --create --partitions 1 --replication-factor 1./kafka-topics.sh --bootstrap-server linux-102:9092 --list
选项说明:
--topic 定义topic名
--partitions 定义分区数
--replication-factor 定义副本数
4)查看first主题的详情
./kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic first
5)修改分区数(注意:分区数只能增加,不能减少)
6)再次查看first主题的详情
./kafka-topics.sh --bootstrap-server linux-102:9092 --alter --topic first --partitions 2./kafka-topics.sh --bootstrap-server linux-102:9092 --describe --topic first
7)删除 topic,删除后再次查看还有没有 topic
./kafka-topics.sh --bootstrap-server linux-102:9092 --delete --topic first./kafka-topics.sh --bootstrap-server linux-102:9092 --list
2.3.2 生产者命令行操作
操作:开两个窗口,一个作为生产者,一个作为消费者,当在生产者窗口命令输入的时候,消费者窗口就会自动输出(附截图最后)。
1)查看操作生产者命令参数
kafka-console-producer.sh
参数 | 描述 |
---|---|
- -bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
- -topic <String: topic> | 操作的topic名称。 |
2)发送消息
2.3.3 消费者命令行操作
1)查看操作消费者命令参数
./kafka-console-consumer.sh
参数 | 描述 |
---|---|
- -bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
- -topic <String: topic> | 操作的topic名称。 |
- -from-beginning | 从头开始消费。 |
- -group <String: consumer group id> | 指定消费者组名称。 |
2)消费消息
(1)消费first主题中的数据。
./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
./kafka-console-consumer.sh --bootstrap-server linux-102:9092 --from-beginning --topic first
2.3.4 生产者生产消费者消费
相关文章:

三十八、大数据技术之Kafka(1)
🌻🌻 目录 一、Kafka 概述1.1 定义1.2 消息队列1.2.1 消息队列内部实现原理1.2.2 传统消息队列的应用场景1.2.3 消息队列的两种模式 1.3 Kafka 基础架构 二、 Kafka 快速入门2.1 安装前的准备2.2 安装部署2.2.1 集群规划2.2.2 单节点或集群部署2.2.3 集群…...

将 Tcpdump 输出内容重定向到 Wireshark
在 Linux 系统中使用 Tcpdump 抓包后分析数据包不是很方便。 通常 Wireshark 比 tcpdump 更容易分析应用层协议。 一般的做法是在远程主机上先使用 tcpdump 抓取数据并写入文件,然后再将文件拷贝到本地工作站上用 Wireshark 分析。 还有一种更高效的方法…...
【Python蓝屏程序(管理员)】
说明:该程序为临摹(😀)作品,源地址C蓝屏程序(非管理员) 我试图使用Python调用 NtRaiseHardError API ,实现类似的蓝屏效果。可惜我发现Python在普通权限下,直接调用 NtRaiseHardError API 是不被允许的,因为…...
OpenGL ES->GLSurfaceView绘制图形的流程
自定义View代码 class MyGLSurfaceView(context: Context, attrs: AttributeSet) : GLSurfaceView(context, attrs), GLSurfaceView.Renderer {var mProgrem 0init {// 设置 OpenGL ES 3.0 版本setEGLContextClientVersion(3)// 设置当前类为渲染器, 注册回调接口的实现类set…...
Linux OOM Killer详解
Linux OOM Killer详解 一、概述二、OOM Killer的技术原理1. 内存区域划分2. 内存耗尽与OOM Killer触发3. 选择被杀进程的策略4. 内存回收机制5. 内存分配策略 三、OOM Killer的工作机制1. 内存压力监测2. 触发条件3. 选择被杀进程4. 终止进程 四、实际场景举例场景一࿱…...
2024rk(案例二)
试题二(25分) 阅读以下关于数据库缓存的叙述,在答题纸上回答问题1至问题3。 【说明】 某大型电商平台建立了一个在线 B2B 商店系统,并在全国多地建设了货物仓储中心,通过提前备货的方式来提高货物的运送效率。但是在运营过程中,发现会出现很多跨仓储中心调货从而延误货物…...

小红书爆文秘籍:ChatGPT助你从0到1创造热门内容!
在小红书打造爆款文案的策略中,以下是一些调整和同义词替换的建议,以便达到文章去重的要求: 了解目标受众: 在撰写文案前,先深入分析目标读者的属性,如年龄层次、性别、爱好和购买行为。通过ChatGPT, 你能迅…...

django快速实现个人博客(附源码)
文章目录 一、工程目录组织结构二、模型及管理实现1、模型2、admin管理 三、博客展现实现1、视图实现2、模板实现 四、部署及效果五、源代码 Django作为一款成熟的Python Web开发框架提供了丰富的内置功能,如ORM(对象关系映射)、Admin管理界面…...

K8s部署篇之手动部署二进制高可用集群架构
一、系统环境初始化 一)架构设计 所有节点都操作:3个master(etcd集群三个节点)和2个node 1、K8s服务调用如图 2、各组件说明 1、API Server 供Kubernetes API接口,主要处理 REST操作以及更新ETCD中的对象所有资源增删…...
【Unity/XLua】xlua自带教程示例分析(6)—— lua协程
文章目录 工具准备协程测试 工具准备 首先是工具脚本,一个Coroutine_Runner.cs和一个cs_coroutine.lua 前者定义了一个继承自Monobehavior的脚本组件,后者则使用lua去在Unity中实例化一个挂载该组件的GameObject,并将其设置为DontDestroyOn…...

CV目标检测概述
文章目录 目标检测概述目标检测图像分割目标检测和图像分割的区别 目标检测概述 目标检测和图像分割是计算机视觉中的两个重要任务,它们有着不同的目的和应用。以下是它们的简要介绍和区别: 目标检测 目标检测(Object Detection࿰…...

如何在notebook中运行nodejs
在 Python 生态系统的推动下,机器学习和人工智能日益流行,这带来了计算笔记本的概念。这些交互式计算平台主要是为以 Python 为中心的数据科学应用而开发的,它们将代码、计算输出、解释性文本和多媒体合并成一个有内聚力的文档。 作为 JavaS…...

Mybatis学习-day19
Mybatis学习-day19 1. resultMap resultMap 是 MyBatis 中最复杂的元素,主要用于解决实体类属性名与数据库表中字段名不一致的情况,可以将查询结果映射成实体对象。 <resultMap id"staffAndDep" type"com.easy.bean.Staff">…...

IDEA构建SpringBoot多模块项目
前言 最近一直在思考一个问题,springboot的多模块项目到底是怎么运行和运作的? 一般我们大部分的springboot项目都是单模块的项目,但是如果后续有要求开发多模块的项目应该怎么处理?于是基于这点进行了研究。 本次文章将会带大…...
【前端】NodeJS:nvm
文章目录 1 介绍2 使用2.1 下载安装2.2 常用命令 1 介绍 nvm全称:Node Version Manager,顾名思义它是用来管理node版本的工具,方便切换不同版本的Node.js。 2 使用 nvm的使用非常的简单,跟npm的使用方法类似。 2.1 下载安装 …...

Docker网络模式及通信
一、Docker默认的网络通信 1.1 Docker安装后默认的网络设置 Docker服务器安装完成之后,默认在每个宿主机会生成一个名称为docker0的网卡,其IP地址都是172.17.0.1/16 [rootubuntu1804 ~]#apt -y install bridge-utils [rootubuntu1804 ~]#brctl show 另…...
类模板实现实现Qt click/hover自定义操作
一、场景 常常会需要实现点击/hover时修改图片,可能是一个QPushButton、QLabel、QToolButton…… 由于Qt bug,QIcon/QSS只能实现常规态、按下态的图标切换,hover态的图片设置无效。 解决思路无非是安装事件过滤器、自定义类并重实现事件。 …...
Arco Design:引领未来的Vue 3创意先锋,一键开启高效与美感并重的Web开发之旅!
Arco Design 是一个基于 Vue 3 的 UI 框架,它提供了丰富的组件和样式,可以帮助开发者快速构建高质量的 Web 应用程序。以下是 Arco Design 的一些详细特点: 完整的设计系统:Arco Design 提供了一套完整的设计系统,包括…...

【MySQL】Linux下用C/C++链接MySQL数据库
文章目录 一、准备工作二、验证库和接口的使用三、链接数据库四、对数据库进行增删查改增删改查 五、结尾 一、准备工作 要使用C链接数据库, 首先要去MySQL官网下载官网提供的库, MySQL 社区下载. 如图所示: 接着选择: 按需选择版本: 如果用的是云服务器, 那么在安装mysql时…...
Python金融量化专栏简介
量化分析实战 - 专栏大纲 👉👉👉 《玩转Python金融量化专栏》👈👈👈 订阅本专栏的可以下载对应的代码和数据集 专栏目标 本专栏旨在帮助读者全面掌握使用Python进行金融技术指标的计算与应用,从基础到高级,涵盖各种技术指标的实现、策略开发与回测等内容。通过…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...

从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...

【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...

逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
前端中slice和splic的区别
1. slice slice 用于从数组中提取一部分元素,返回一个新的数组。 特点: 不修改原数组:slice 不会改变原数组,而是返回一个新的数组。提取数组的部分:slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用
前言:我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM(Java Virtual Machine)让"一次编写,到处运行"成为可能。这个软件层面的虚拟化让我着迷,但直到后来接触VMware和Doc…...