Kubernetes kafka系列 | k8s部署kafka+zookeepe集群
一、kafka.zookeeper介绍
Kafka
简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。
特点:
高吞吐量: Kafka 能够处理大规模的消息流,并具有很高的吞吐量。
持久性: 它将消息持久化到磁盘上,因此即使消费者不在线,也能保证消息不会丢失。
可伸缩性: Kafka 可以很容易地水平扩展以处理大量数据。
实时性: Kafka 可以提供几乎实时的消息传递,适用于大多数实时数据处理需求。
用途:
日志收集: Kafka 可以用作集中式的日志收集系统,收集来自不同源头的日志数据。
消息队列: Kafka 可以用作分布式应用程序之间的消息队列,用于解耦和异步通信。
流处理: Kafka 可以与流处理框架(如Apache Spark、Apache Flink等)结合使用,用于实时数据处理和分析。
ZooKeeper
简介: Apache ZooKeeper 是一个开源的分布式协调服务,最初也是由Yahoo开发的,并于2010年成为Apache软件基金会的顶级项目。
特点:
分布式协调: ZooKeeper 提供了分布式应用程序的协调服务,包括配置管理、命名服务、分布式锁等。
高可用性: ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。
一致性: ZooKeeper 提供了严格的一致性,确保所有的客户端在同一时间看到相同的数据视图。
用途:
配置管理: ZooKeeper 可以用于分布式系统的配置管理,例如动态配置更新。
命名服务: ZooKeeper 可以提供命名服务,帮助分布式系统中的节点发现和通信。
分布式锁: ZooKeeper 可以用于实现分布式锁,确保在分布式系统中对共享资源的互斥访问。
Kafka 和 ZooKeeper 的关系
在 Kafka 中,ZooKeeper 主要用于管理集群的元数据(如主题、分区、副本分配等)、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下,Kafka 和 ZooKeeper 会一起部署,但它们是两个独立的项目,各自提供不同的功能。
二、创建存储卷
nfs动态供给直通车
三、搭建Kafka集群
# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10 kafka镜像版本0.11(嫌低可以自己换)
kafka需要依赖zookeeper
kafka的生产者与消费者需要在zookeeper中注册,不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说,直接上干货!
本文用的是statefulset和动态存储部署zookeeper和kafka集群。
zookeeper.yaml
apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"containers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: "zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10"resources:requests:memory: "0.5Gi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ "ReadWriteOnce" ]resources:requests:storage: 1Gi
for i in 0 1 2; do kubectl exec zk-$i – hostname -f; done
zk-0.zk-headless.default.svc.cluster.local
zk-1.zk-headless.default.svc.cluster.local
zk-2.zk-headless.default.svc.cluster.local
kafka.yaml
---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"podAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"terminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: "1Gi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} --override listeners=PLAINTEXT://:9092 --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir=/var/lib/kafka --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10 --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300 --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000 --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168 --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824 --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1 --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1 --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1 --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000 --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50 --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500 --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1 --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000 --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000 --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400 --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000 --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000 --override group.min.session.timeout.ms=6000 --override inter.broker.protocol.version=0.10.2-IV0 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728 --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9 --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5 --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807 --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000 --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000 --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760 --override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx1G -Xms1G"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: "nfs-client"spec:accessModes: [ "ReadWriteMany" ]resources:requests:storage: 5Gi
四、验证集群
验证kafka是否可用:
1、进入kafka-0命令: kubectl exec -it kafka-0 bash
进入容器目录:cd /opt/kafka/config
2、创建一个名为aaa的topc命令:kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2
结果为:
Created topic “aaa”.
3、进入topic为aaa的生产者消息中心:kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092
4、复制新的会话,进入另一个容器kafka-1:kubectl exec -it kafka-1 bash
进入消费者,输入命令:kafka-console-producer.sh --topic aaa --broker-list localhost:9092
输入:
hello
i lovle you
回车后,可在生产者消息中心看到消息
最新文章链接,含镜像制作,v3.5.2
相关文章:
Kubernetes kafka系列 | k8s部署kafka+zookeepe集群
一、kafka.zookeeper介绍 Kafka 简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 特点: 高吞吐量: Kafka 能够处理大规模的消息流…...

ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播
ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播 SV-704XT是深圳锐科达电子有限公司的一款壁挂式网络有源号角,具有10/100M以太网接口,可将网络音源通过自带的功放和号角喇叭输出播放,可达到功率50W。SV-704XT内置有…...

B端系统优化,可不是换个颜色和图标,看看与大厂系统的差距。
、不要被流于表面的需求描述迷惑。 很多人找我们优化系统界面,对需求总是轻描淡写,比如:换个颜色、换个图标、换个皮肤,甚至还有的说,随便改下就行。 这些需求都是听起来简单,实现起来难,你如…...

【LeetCode热题100】240. 搜索二维矩阵 II
一.题目要求 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 ‘每列的元素从上到下升序排列。 二.题目难度 中等 三.输入样例 示例 1: 输入:matrix [[1,4,7…...

three.js 鼠标左右拖动改变玩家视角
这里主要用到了 一个方法 obj.getWorldDirection(); obj.getWorldDirection()表示的获取obj对象自身z轴正方向在世界坐标空间中的方向。 按下 W键前进运动; <template><div><el-container><el-main><div class"box-card-left…...
Pycharm jupyter server process exited with code 1
Pycharm jupyter server process exited with code 1 1. 问题描述2. 原因和解决方法 1. 问题描述 使用 Pycharm 启动 Jupyter 时,报错如下, jupyter server process exited with code 12. 原因和解决方法 Pycharm 启动 jupyter 时,默认的 …...
ubuntu 20.04 Python pip 配置 pip.conf
1. 状况描述 $ pip install timm WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by ProxyError(Cannot connect to proxy., RemoteDisconnected(Remote end closed connection without response)): /simple/t…...

GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!
大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,所以创建了“AI信息Gap”这个公众号,专注于分享AI全维度知识…...
Ubuntu 中 desktop-amd64 和 live-server-amd64 的区别
一、Ubuntu的操作系统镜像 Ubuntu的操作系统镜像主要有两种:desktop-amd64和live-server-amd64 这两者的主要区别在于使用场景和安装方式 1. Desktop-amd64: * 这是Ubuntu的桌面版本,用于安装具有图形用户界面的Ubuntu系统。 * 它包含了用于日常使…...

第10集《天台教观纲宗》
请大家打开讲义第十七页。我们讲到己二、结申正义。 己二、结申正义 《法华经》把我们修行人修行的相貌,比喻作一个车乘。车乘就是一种交通工具,它能够让我们从此岸超越到彼岸去。所以修行它是可以超越的,你今天比昨天超越了,就好…...

每日学习笔记:C++ STL 的forward_list
定义 特点 操作函数 元素查找、移除或安插 forward_list::emplace_after arg...指的是元素构造函数的参数(0~N个) #include <iostream> #include <memory> #include <list> #include <forward_list> using namespace std;class…...

【Java,Redis】Redis 数据库存取字符串数据以及类数据
1、 字符串存取数据 Resource private StringRedisTemplate stringRedisTemplate;//从Redis中获取string字符串 stringRedisTemplate.opsForValue().get("cache:shop:"id); //Json -> class Shop shop JSONUtil.toBean(ShopJson,Shop.class); //字符串写入redis…...

OpenCV 图像重映射函数remap()实例详解
OpenCV 图像重映射函数remap()对图像应用通用几何变换。其原型如下: void remap(InputArray src, OutputArray dst, InputArray map1, InputArray map2, int interpolation, int borderMode BORDER_CONSTANT, const Scalar & borde…...

Python基础课堂最后一课23——正则对象
文章目录 前言一、正则对象是什么?二、正则表达式基本分类1.普通字符2.元字符 总结 前言 很开心能和你们一些学习进步,在这一个多月的时间中,是你们让我坚持了下来,完成了python基础课堂编写,不管如何,我们…...

【算法训练营】凸包,图(Python实现)
凸包 描述 给定n个二维平面上的点,求他们的凸包。 输入 第一行包含一个正整数n。 接下来n行,每行包含两个整数x,y,表示一个点的坐标。 输出 令所有在凸包极边上的点依次为p1,p2,...,pm(序号),其中m表…...

webpack5零基础入门-6webpack处理图片资源
1.在webpack5中file-loader和url-loader为内置模块 通过在加载器中配置rule即可激活 {test: /\.(png|jpe?g|gif|webp)$/,type: asset} 2.使用webpack进行打包 执行npx webpack 可以看到图片资源打包后都被放到了dist文件目录下 3.使用webpack进行图片格式转换为base64 优势…...
计算机基础知识QA
目录 数据库 --mysql 关联查询 唯一索引如何创建,语句 更新表字段语句 查看字段类型 --redis 使用场景 数据结构 设置超时时间 linux 常用命令 发布版本 安装一个东西,发现一个东西安装的很慢,如何切换ip地的源?-&g…...

微信小程序一次性订阅requestSubscribeMessage授权和操作详解
一次性订阅:用户订阅一次发一次通知 一、授权 — requestSubscribeMessage Taro.requestSubscribeMessage({tmplIds: [], // 需要订阅的消息模板的id的集合success (res) {console.log("同意授权", res)},fail(res) {console.log(拒绝授权, res)}})点击或…...
ARM 汇编指令:(三)运算处理指令
目录 一.add指令 二.sub指令 三.MUL指令 一.add指令 add用于执行实现两个寄存器或寄存机或寄存器与立即数的相加操作。它可以用于整数、浮点数等各种数据类型的加法运算。 ADD{cond}{S} Rd,操作数,操作数 1.不带进位加法指令add add r1, r2, #4 //r1 r2 4 add r1, r2 …...

【C++庖丁解牛】STL简介 | string容器初次见面
🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1. 什么是STL2. STL的版本…...
CSS篇-2
4. position 的值分别是相对于哪个位置定位的? position 属性是 CSS 布局中一个非常核心的概念,它允许我们精确控制元素在文档中的定位方式,从而脱离或部分脱离正常的文档流。理解 position 的不同值以及它们各自的定位基准,是实…...

从零打造算法题刷题助手:Agent搭建保姆级攻略
我用Trae 做了一个有意思的Agent 「大厂机试助手」。 点击 https://s.trae.com.cn/a/d2a596 立即复刻,一起来玩吧! Agent 简介 Agent名称为大厂机试助手,主要功能有以下三点。 解题: 根据用户给出的题目给出具体的解题思路引导做…...
div或button一些好看实用的 CSS 样式示例
1:现代渐变按钮 .count {width: 800px;background: linear-gradient(135deg, #72EDF2 0%, #5151E5 100%);padding: 12px 24px;border-radius: 10px;box-shadow: 0 4px 15px rgba(81, 81, 229, 0.3);color: white;font-weight: bold;border: none;cursor: pointer;t…...

20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载
20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载 2025/5/26 14:44 百度:AMD 7700 显卡驱动 amd APU 显卡驱动 https://item.jd.com/100054819707.html 惠普HP【国家补贴20%】锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U 16G 1T…...

GoldenDB管理节点zk部署
目录 1、准备阶段 1.1、部署规划 1.2、硬件准备 1.3、软件准备 1.4、网络端口开通 1.5、环境清理 2、实施阶段 2.1、操作系统配置 2.1.1、主机名修改 2.1.2、修改hosts文件 2.1.3、禁用防火墙 2.1.4、禁用selinux 2.1.5、禁用透明大页 2.1.6、资源限制调整 2.1.…...

asio之async_result
简介 async_result用来表示异步处理返回类型 async_result 是类模板 type:为类模板中声明的类型,对于不同的类型,可以使用类模板特例化,比如针对use_future...
网站缓存入门与实战:浏览器与Nginx/Apache服务器端缓存,让网站速度起飞!(2025)
更多服务器知识,尽在hostol.com 嘿,各位站长和网站管理员朋友们!咱们精心打造的网站,内容再好,设计再炫,如果用户打开它的时候,加载速度慢得像“老牛拉破车”,那体验可就大打折扣了…...
day41 python图像识别任务
目录 一、数据预处理:为模型打下坚实基础 二、模型构建:多层感知机的实现 三、训练过程:迭代优化与性能评估 四、测试结果:模型性能的最终检验 五、总结与展望 在深度学习的旅程中,多层感知机(MLP&…...

【Axure结合Echarts绘制图表】
1.绘制一个矩形,用于之后存放图表,将其命名为test: 2.新建交互 -> 载入时 -> 打开链接: 3.链接到URL或文件路径: 4.点击fx: 5.输入: javascript: var script document.createEleme…...

【Zephyr 系列 2】用 Zephyr 玩转 Arduino UNO / MEGA,实现串口通信与 CLI 命令交互
🎯 本篇目标 在 Ubuntu 下将 Zephyr 运行在 Arduino UNO / MEGA 上 打通串口通信,实现通过串口发送命令与反馈 使用 Zephyr Shell 模块,实现 CLI 命令处理 🪧 为什么 Arduino + Zephyr? 虽然 Arduino 开发板通常用于简单的 C/C++ 开发,但 Zephyr 的支持范围远超 STM32…...