当前位置: 首页 > news >正文

【大数据技术基础 | 实验十四】Kafka实验:订阅推送示例

在这里插入图片描述

文章目录

    • 一、实验目的
    • 二、实验要求
    • 三、实验原理
      • (一)Kafka简介
      • (二)Kafka使用场景
    • 四、实验环境
    • 五、实验内容和步骤
      • (一)配置各服务器之间的免密登录
      • (二)安装ZooKeeper集群
      • (三)安装Kafka集群
      • (四)验证消息推送
    • 六、实验结果
    • 七、实验心得


一、实验目的

  1. 掌握Kafka的安装部署
  2. 掌握Kafka的topic创建及如何生成消息和消费消息
  3. 掌握Kafka和Zookeeper之间的关系
  4. 了解Kafka如何保存数据及加深对Kafka相关概念的理解

二、实验要求

在两台机器上(以slave1,slave2为例),分别部署一个broker,Zookeeper使用的是单独的集群,然后创建一个topic,启动模拟的生产者和消费者脚本,在生产者端向topic里写数据,在消费者端观察读取到的数据。

三、实验原理

(一)Kafka简介

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。如图下所示:

在这里插入图片描述

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。

基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为“leader”;leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可……由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个“leader”,kafka会将“leader”均衡的分散在每个实例上,来确保整体的性能稳定。

生产者:Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于“round-robin”方式或者通过其他的一些算法等。

消费者:本质上kafka只支持Topic,每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡。

如果所有的consumer都具有不同的group,那这就是“发布-订阅”;消息将会广播给所有的消费者。

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个“订阅”者,一个Topic中的每个partions,只会被一个“订阅者”中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

Guarantees
(1)发送到partitions中的消息将会按照它接收的顺序追加到日志中。
(2)对于消费者而言,它们消费消息的顺序和日志中消息顺序一致。
(3)如果Topic的“replicationfactor”为N,那么允许N-1个kafka实例失效。

(二)Kafka使用场景

1. Messaging

对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势。不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的“事务性”、“消息传输担保(消息确认机制)”、“消息分组”等企业级特性;kafka只能使用作为“常规”的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如:消息重发,消息发送丢失等)。

2. Websit activity tracking

kafka可以作为“网站活性跟踪”的最佳工具;可以将网页/用户操作等信息发送到kafka中。并实时监控,或者离线统计分析等。

3. Log Aggregation

kafka的特性决定它非常适合作为“日志收集中心”,application可以将操作日志“批量”“异步”的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支。此时consumer端可以使hadoop等其他系统化的存储和分析系统。

四、实验环境

  • 云创大数据实验平台:
    在这里插入图片描述
  • Java 版本:jdk1.7.0_79
  • Hadoop 版本:hadoop-2.7.1
  • ZooKeeper 版本:zookeeper-3.4.6
  • Kafka 版本:kafka_2.10-0.9.0.1

五、实验内容和步骤

(一)配置各服务器之间的免密登录

首先配置master,slave1和slave2之间的免密登录和各虚拟机的/etc/hosts文件,具体步骤参考:【大数据技术基础 | 实验一】配置SSH免密登录

(二)安装ZooKeeper集群

配置完免密登录之后我们还需要安装Zookeeper集群,具体步骤参考:【大数据技术基础 | 实验五】ZooKeeper实验:部署ZooKeeper

(三)安装Kafka集群

首先我们将Kafka安装包解压到slave1的/usr/cstor目录:

tar -zxvf kafka_2.10-0.9.0.1.tar.gz -c /usr/cstor

并将kafka目录所属用户改成root:root

chown -R root:root /usr/cstor/kafka

然后将kafka目录传到其他机器上:

scp -r /usr/cstor/kafka hadoop@slave2:/usr/cstor

两台机器上分别进入解压目录下,在config目录修改server.properties文件:

cd /usr/cstor/kafka/config/
vim server.properties

然后修改其中的内容,首先是slave1配置:

#broker.id
broker.id=1
#broker.port
port=9092
#host.name
host.name=slave1
#本地日志文件位置
log.dirs=/usr/cstor/kafka/logs
#Zookeeper地址
zookeeper.connect=slave1:2181,slave2:2181,master:2181

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

然后修改slave2的配置:

#broker.id
broker.id=2
#broker.port
port=9092
#host.name
host.name=slave2
#本地日志文件位置
log.dirs=/usr/cstor/kafka/logs
#Zookeeper地址
zookeeper.connect=slave1:2181,slave2:2181,master:2181

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

然后,启动Kafka,并验证Kafka功能,进入安装目录下的bin目录,两台机器上分别执行以下命令启动各自的Kafka服务:

cd /usr/cstor/kafka/bin
nohup ./kafka-server-start.sh ../config/server.properties &

在任意一台机器上,执行以下命令(以下三行命令不要换行,是一整行)创建topic:

./kafka-topics.sh --create \
--zookeeper slave1:2181,slave2:2181,master:2181 \
--replication-factor 2 --partitions 2 --topic test

在这里插入图片描述

在任意一台机器上(这里我选择的是slave1),执行以下命令(以下三行命令不要换行,是一整行)启动模拟producer:

./kafka-console-producer.sh \
--broker-list slave1:9092,slave2:9092,master:9092 \
--topic test

在另一台机器上(slave2),执行以下命令(以下三行命令不要换行,是一整行)启动模拟consumer:

./kafka-console-consumer.sh \
--zookeeper slave1:2181,slave2:2181,master:2181 \
--topic test --from-beginning

(四)验证消息推送

我们在producer端输入任意信息,然后观察consumer端接收到的数据:

This is Kafka producer
Hello, Kafka

在slave1上输入信息:

在这里插入图片描述

然后slave2上也收到了信息:

在这里插入图片描述

六、实验结果

我们在producer端输入任意信息,然后观察consumer端接收到的数据:

This is Kafka producer
Hello, Kafka

在slave1上输入信息:

在这里插入图片描述

然后slave2上也收到了信息:

在这里插入图片描述

七、实验心得

  通过本次Kafka实验,我深入理解了分布式消息队列的核心概念及其实现方式。Kafka作为一种高吞吐量、低延迟的分布式发布订阅消息系统,其设计思想和实现细节让我受益匪浅。实验从Kafka与Zookeeper的安装部署入手,通过配置两个broker的Kafka集群,帮助我掌握了Kafka集群的基本搭建过程。同时,通过配置文件的修改,我更加清晰地认识到Kafka集群中broker.idzookeeper.connectlog.dirs等配置项的作用,为后续的生产环境部署打下了基础。

  实验中的生产者和消费者模拟验证让我直观地感受到了Kafka的高效数据处理能力。在生产者端输入消息后,消费者端能够实时接收到消息,这充分展示了Kafka在消息传递中的低延迟特点。此外,通过创建带有多个分区和副本的Topic,我理解了Kafka的分区机制及其在分布式环境中保证数据高可用性的策略。分区的Leader和Follower模型也让我体会到Kafka在负载均衡和容错性上的精巧设计,尤其是当Leader失效后,Follower能够及时接管,确保服务的稳定运行。

  与此同时,我也意识到Kafka在实际应用中并非完美。例如,Kafka虽然具有一定的容错能力,但对于数据的绝对可靠性保证(如消息丢失或重复发送)还有一定的局限性。这让我认识到,在实际项目中,需根据具体场景搭配其他机制来保证消息传递的可靠性和一致性。

  总之,本次实验帮助我从理论走向实践,不仅熟悉了Kafka的基本操作,还加深了对其内部工作原理的理解。在未来的学习和工作中,我希望能够进一步探索Kafka在日志收集、实时数据流处理等场景中的深度应用,为分布式系统的设计与优化积累更多经验。

:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/8f386ae8b871
提取码:EPKB

相关文章:

【大数据技术基础 | 实验十四】Kafka实验:订阅推送示例

文章目录 一、实验目的二、实验要求三、实验原理(一)Kafka简介(二)Kafka使用场景 四、实验环境五、实验内容和步骤(一)配置各服务器之间的免密登录(二)安装ZooKeeper集群&#xff08…...

SpringAi整合大模型(进阶版)

进阶版是在基础的对话版之上进行新增功能。 如果还没弄出基础版的,请参考 https://blog.csdn.net/weixin_54925172/article/details/144143523?sharetypeblogdetail&sharerId144143523&sharereferPC&sharesourceweixin_54925172&spm1011.2480.30…...

为什么爱用低秩矩阵

目录 为什么爱用低秩矩阵 一、定义与性质 二、区别与例子 为什么爱用低秩矩阵 我们更多地提及低秩分解而非满秩分解,主要是因为低秩分解在数据压缩、噪声去除、模型简化和特征提取等方面具有显著的优势。而满秩分解虽然能够保持数据的完整性,但在实际应用中的场景较为有限…...

React 自定义钩子:useOnlineStatus

我们今天的重点是 “useOnlineStatus” 钩子,这是 React 自定义钩子集合中众多精心制作的钩子之一。 Github 的:https://github.com/sergeyleschev/react-custom-hooks import { useState } from "react" import useEventListener from &quo…...

uniapp 小程序 监听全局路由跳转 获取路由参数

uniapp 小程序 监听全局路由跳转 获取路由参数 app.vue中 api文档 onLaunch: function(options) {let that this;let event [navigateTo, redirectTo, switchTab, navigateBack];event.forEach(item > {uni.addInterceptor(item, { //监听跳转//监听跳转success(e) {tha…...

12.02 深度学习-卷积

# 卷积 是用于图像处理 能够保存图像的一些特征 卷积层 如果用全连接神经网络处理图像 计算价格太大了 图像也被转为线性的对象导致失去了图像的空间特征 只有在卷积神经网络cnn的最后一层使用全连接神经网络 # 图像处理的三大任务 # 目标检测 对图像中的目标进行框出来 # 图…...

MySQL 主从同步一致性详解

MySQL主从同步是一种数据复制技术,它允许数据从一个数据库服务器(主服务器)自动同步到一个或多个数据库服务器(从服务器)。这种技术主要用于实现读写分离、提升数据库性能、容灾恢复以及数据冗余备份等目的。下面将详细…...

Spring源码导入idea时gradle构建慢问题

当我们将spring源码导入到idea进行构建的时候,spring采用的是gradle进行构建,默认下注在依赖是从https://repo.maven.apache.org会特别慢,需要改为国内的镜像地址会加快速度。 将项目中build.gradle配置进行调整: repositories …...

Dockerfile 安装echarts插件给java提供服务

java调用echarts插件,生成图片保存到磁盘然后插入到pptx中报表。 Dockerfile文件内容: #基础镜像,如果本地仓库没有,会从远程仓库拉取 openjdk:8 FROM docker.io/centos:centos7 #暴露端口 EXPOSE 9311 # 避免centos 日志输出 …...

Springboot小知识(1):启动类与配置

一、启动类(引导类) 在通常情况下,你创建的Spring应用项目都会为你自动生成一个启动类,它是这个应用的起点。 在Spring Boot中,引导类(也称为启动类,通常是main方法所在的类)是整个…...

[CISCN 2019华东南]Web11

[CISCN 2019华东南]Web11 给了两个链接但是都无法访问 这里我们直接抓包试一下 我们插入X-Forwarded-For:127.0.0.1 发现可以修改了右上角的IP地址,从而可以进行注入 {$smarty.version} 查看版本号 if标签执行PHP命令 {if phpinfo()}{/if} 查看协议 {if system(…...

Cypress内存溢出奔溃问题汇总

内存溢出报错信息 <--- Last few GCs ---> [196:0xe58001bc000] 683925 ms: Scavenge 1870.7 (1969.9) -> 1865.6 (1969.9) MB, 6.07 / 0.00 ms (average mu 0.359, current mu 0.444) task; [196:0xe58001bc000] 683999 ms: Scavenge 1872.4 (1969.9) -> 1867.1…...

树莓派4B--OpenCV安装踩坑

报错&#xff1a; Source directory: /tmp/pip-install-pv7l9r25/opencv-python_08fdf5a130a5429f89b0e0eaab39a329 Working directory: /tmp/pip-install-pv7l9r25/opencv-python_08fdf5a130a5429f89b0e0eaab39a329/_skbuild/linux-armv7l-3.7/cmake-build Please check the i…...

电子电气架构 --- 面向服务的汽车诊断架构

我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 所有人的看法和评价都是暂时的,只有自己的经历是伴随一生的,几乎所有的担忧和畏惧,都是来源于自己的想象,只有你真的去做了,才会发现有多快乐。…...

Pytest --capture 参数详解:如何控制测试执行过程中的输出行为

--capture 选项用于控制测试用例执行过程中标准输出&#xff08;stdout&#xff09;和标准错误输出&#xff08;stderr&#xff09;的捕获行为。 --capture 的选项值&#xff1a; fd&#xff08;默认&#xff09; 捕获文件描述符级别的输出&#xff08;stdout 和 stderr&#x…...

IS-IS的原理

IS-IS的基本概念&#xff1a; 概述&#xff1a; IS-IS&#xff0c;中间系统到中间系统&#xff0c;是ISO国际标准化组织为它的无连接网络协议设计的一种动态路由协议 IS-IS支持CLNP网络和IP网络&#xff0c;采用数据链路层封装&#xff0c;区别于ospf只支持IP网络&#xff0…...

C++(4个类型转换)

1. C语言中的类型转换 1. 隐式 类型转换&#xff1a; 具有相近的类型才能进行互相转换&#xff0c;如&#xff1a;int,char,double都表示数值。 2. 强制类型转换&#xff1a;能隐式类型转换就能强制类型转换&#xff0c;隐式类型之间的转换类型强相关&#xff0c;强制类型转换…...

Ubuntu20.04安装NVIDIA显卡驱动

Ubuntu20.04安装NVIDIA显卡驱动 参考资料&#xff1a;https://blog.csdn.net/weixin_39244242/article/details/136282614?fromshareblogdetail&sharetypeblogdetail&sharerId136282614&sharereferPC&sharesourceqq_37397652&sharefromfrom_link 成功配置…...

速盾:介绍一下高防cdn的缓存响应事什么功能?

高防CDN&#xff08;Content Delivery Network&#xff09;是一种基于分布式缓存技术的网络加速服务&#xff0c;能够提供强大的缓存响应功能。它的缓存响应功能主要包括缓存加速和智能缓存两个方面。 首先&#xff0c;高防CDN的缓存加速功能是指通过在全球范围内部署大量的缓…...

Nuclei-快速漏洞扫描器

Nuclei-快速漏洞扫描器 声明 学习内容来自 B 站UP主泷羽sec&#xff0c;如涉及侵权马上删除文章。 笔记的只是方便各位师傅学习知识&#xff0c;以下网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 ✍&#x1f3f…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

【力扣数据库知识手册笔记】索引

索引 索引的优缺点 优点1. 通过创建唯一性索引&#xff0c;可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度&#xff08;创建索引的主要原因&#xff09;。3. 可以加速表和表之间的连接&#xff0c;实现数据的参考完整性。4. 可以在查询过程中&#xff0c;…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...