当前位置: 首页 > news >正文

Kubernetes kafka系列 | k8s部署kafka+zookeepe集群

一、kafka.zookeeper介绍

Kafka
简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。

特点:

高吞吐量: Kafka 能够处理大规模的消息流,并具有很高的吞吐量。
持久性: 它将消息持久化到磁盘上,因此即使消费者不在线,也能保证消息不会丢失。
可伸缩性: Kafka 可以很容易地水平扩展以处理大量数据。
实时性: Kafka 可以提供几乎实时的消息传递,适用于大多数实时数据处理需求。
用途:

日志收集: Kafka 可以用作集中式的日志收集系统,收集来自不同源头的日志数据。
消息队列: Kafka 可以用作分布式应用程序之间的消息队列,用于解耦和异步通信。
流处理: Kafka 可以与流处理框架(如Apache Spark、Apache Flink等)结合使用,用于实时数据处理和分析。
ZooKeeper
简介: Apache ZooKeeper 是一个开源的分布式协调服务,最初也是由Yahoo开发的,并于2010年成为Apache软件基金会的顶级项目。

特点:

分布式协调: ZooKeeper 提供了分布式应用程序的协调服务,包括配置管理、命名服务、分布式锁等。
高可用性: ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。
一致性: ZooKeeper 提供了严格的一致性,确保所有的客户端在同一时间看到相同的数据视图。
用途:

配置管理: ZooKeeper 可以用于分布式系统的配置管理,例如动态配置更新。
命名服务: ZooKeeper 可以提供命名服务,帮助分布式系统中的节点发现和通信。
分布式锁: ZooKeeper 可以用于实现分布式锁,确保在分布式系统中对共享资源的互斥访问。
Kafka 和 ZooKeeper 的关系

在 Kafka 中,ZooKeeper 主要用于管理集群的元数据(如主题、分区、副本分配等)、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下,Kafka 和 ZooKeeper 会一起部署,但它们是两个独立的项目,各自提供不同的功能。

二、创建存储卷

nfs动态供给直通车

三、搭建Kafka集群

# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10  kafka镜像版本0.11(嫌低可以自己换)

kafka需要依赖zookeeper
kafka的生产者与消费者需要在zookeeper中注册,不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说,直接上干货!

本文用的是statefulset和动态存储部署zookeeper和kafka集群。

zookeeper.yaml

apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"containers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: "zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10"resources:requests:memory: "0.5Gi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ "ReadWriteOnce" ]resources:requests:storage: 1Gi

for i in 0 1 2; do kubectl exec zk-$i – hostname -f; done
zk-0.zk-headless.default.svc.cluster.local

zk-1.zk-headless.default.svc.cluster.local

zk-2.zk-headless.default.svc.cluster.local
kafka.yaml

---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
--- 
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"podAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"terminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: "1Gi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} --override listeners=PLAINTEXT://:9092 --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir=/var/lib/kafka --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10 --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300 --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000 --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168 --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824 --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1 --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1 --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1 --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000 --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50 --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500 --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1 --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000 --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000 --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400 --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000 --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000 --override group.min.session.timeout.ms=6000 --override inter.broker.protocol.version=0.10.2-IV0 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728 --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9 --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5 --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807 --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000 --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000 --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760 --override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx1G -Xms1G"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: "nfs-client"spec:accessModes: [ "ReadWriteMany" ]resources:requests:storage: 5Gi

四、验证集群
验证kafka是否可用:

1、进入kafka-0命令: kubectl exec -it kafka-0 bash
进入容器目录:cd /opt/kafka/config

2、创建一个名为aaa的topc命令:kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2
结果为:
Created topic “aaa”.

3、进入topic为aaa的生产者消息中心:kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092

4、复制新的会话,进入另一个容器kafka-1:kubectl exec -it kafka-1 bash

进入消费者,输入命令:kafka-console-producer.sh --topic aaa --broker-list localhost:9092

输入:

hello

i lovle you

回车后,可在生产者消息中心看到消息

最新文章链接,含镜像制作,v3.5.2

相关文章:

Kubernetes kafka系列 | k8s部署kafka+zookeepe集群

一、kafka.zookeeper介绍 Kafka 简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 特点: 高吞吐量: Kafka 能够处理大规模的消息流&#xf…...

ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播

ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播 SV-704XT是深圳锐科达电子有限公司的一款壁挂式网络有源号角,具有10/100M以太网接口,可将网络音源通过自带的功放和号角喇叭输出播放,可达到功率50W。SV-704XT内置有…...

B端系统优化,可不是换个颜色和图标,看看与大厂系统的差距。

、不要被流于表面的需求描述迷惑。 很多人找我们优化系统界面,对需求总是轻描淡写,比如:换个颜色、换个图标、换个皮肤,甚至还有的说,随便改下就行。 这些需求都是听起来简单,实现起来难,你如…...

【LeetCode热题100】240. 搜索二维矩阵 II

一.题目要求 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 ‘每列的元素从上到下升序排列。 二.题目难度 中等 三.输入样例 示例 1: 输入:matrix [[1,4,7…...

three.js 鼠标左右拖动改变玩家视角

这里主要用到了 一个方法 obj.getWorldDirection(); obj.getWorldDirection()表示的获取obj对象自身z轴正方向在世界坐标空间中的方向。 按下 W键前进运动&#xff1b; <template><div><el-container><el-main><div class"box-card-left…...

Pycharm jupyter server process exited with code 1

Pycharm jupyter server process exited with code 1 1. 问题描述2. 原因和解决方法 1. 问题描述 使用 Pycharm 启动 Jupyter 时&#xff0c;报错如下&#xff0c; jupyter server process exited with code 12. 原因和解决方法 Pycharm 启动 jupyter 时&#xff0c;默认的 …...

ubuntu 20.04 Python pip 配置 pip.conf

1. 状况描述 $ pip install timm WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by ProxyError(Cannot connect to proxy., RemoteDisconnected(Remote end closed connection without response)): /simple/t…...

GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…...

Ubuntu 中 desktop-amd64 和 live-server-amd64 的区别

一、Ubuntu的操作系统镜像 Ubuntu的操作系统镜像主要有两种&#xff1a;desktop-amd64和live-server-amd64 这两者的主要区别在于使用场景和安装方式 1. Desktop-amd64: * 这是Ubuntu的桌面版本&#xff0c;用于安装具有图形用户界面的Ubuntu系统。 * 它包含了用于日常使…...

第10集《天台教观纲宗》

请大家打开讲义第十七页。我们讲到己二、结申正义。 己二、结申正义 《法华经》把我们修行人修行的相貌&#xff0c;比喻作一个车乘。车乘就是一种交通工具&#xff0c;它能够让我们从此岸超越到彼岸去。所以修行它是可以超越的&#xff0c;你今天比昨天超越了&#xff0c;就好…...

每日学习笔记:C++ STL 的forward_list

定义 特点 操作函数 元素查找、移除或安插 forward_list::emplace_after arg...指的是元素构造函数的参数&#xff08;0~N个&#xff09; #include <iostream> #include <memory> #include <list> #include <forward_list> using namespace std;class…...

【Java,Redis】Redis 数据库存取字符串数据以及类数据

1、 字符串存取数据 Resource private StringRedisTemplate stringRedisTemplate;//从Redis中获取string字符串 stringRedisTemplate.opsForValue().get("cache:shop:"id); //Json -> class Shop shop JSONUtil.toBean(ShopJson,Shop.class); //字符串写入redis…...

OpenCV 图像重映射函数remap()实例详解

OpenCV 图像重映射函数remap()对图像应用通用几何变换。其原型如下&#xff1a; void remap(InputArray src, OutputArray dst, InputArray map1, InputArray map2, int interpolation&#xff0c; int borderMode BORDER_CONSTANT&#xff0c; const Scalar & borde…...

Python基础课堂最后一课23——正则对象

文章目录 前言一、正则对象是什么&#xff1f;二、正则表达式基本分类1.普通字符2.元字符 总结 前言 很开心能和你们一些学习进步&#xff0c;在这一个多月的时间中&#xff0c;是你们让我坚持了下来&#xff0c;完成了python基础课堂编写&#xff0c;不管如何&#xff0c;我们…...

【算法训练营】凸包,图(Python实现)

凸包 描述 给定n个二维平面上的点&#xff0c;求他们的凸包。 输入 第一行包含一个正整数n。 接下来n行&#xff0c;每行包含两个整数x,y&#xff0c;表示一个点的坐标。 输出 令所有在凸包极边上的点依次为p1,p2,...,pm&#xff08;序号&#xff09;&#xff0c;其中m表…...

webpack5零基础入门-6webpack处理图片资源

1.在webpack5中file-loader和url-loader为内置模块 通过在加载器中配置rule即可激活 {test: /\.(png|jpe?g|gif|webp)$/,type: asset} 2.使用webpack进行打包 执行npx webpack 可以看到图片资源打包后都被放到了dist文件目录下 3.使用webpack进行图片格式转换为base64 优势…...

计算机基础知识QA

目录 数据库 --mysql 关联查询 唯一索引如何创建&#xff0c;语句 更新表字段语句 查看字段类型 --redis 使用场景 数据结构 设置超时时间 linux 常用命令 发布版本 安装一个东西&#xff0c;发现一个东西安装的很慢&#xff0c;如何切换ip地的源&#xff1f;-&g…...

微信小程序一次性订阅requestSubscribeMessage授权和操作详解

一次性订阅&#xff1a;用户订阅一次发一次通知 一、授权 — requestSubscribeMessage Taro.requestSubscribeMessage({tmplIds: [], // 需要订阅的消息模板的id的集合success (res) {console.log("同意授权", res)},fail(res) {console.log(拒绝授权, res)}})点击或…...

ARM 汇编指令:(三)运算处理指令

目录 一.add指令 二.sub指令 三.MUL指令 一.add指令 add用于执行实现两个寄存器或寄存机或寄存器与立即数的相加操作。它可以用于整数、浮点数等各种数据类型的加法运算。 ADD{cond}{S} Rd,操作数,操作数 1.不带进位加法指令add add r1, r2, #4 //r1 r2 4 add r1, r2 …...

【C++庖丁解牛】STL简介 | string容器初次见面

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1. 什么是STL2. STL的版本…...

CSS篇-2

4. position 的值分别是相对于哪个位置定位的&#xff1f; position 属性是 CSS 布局中一个非常核心的概念&#xff0c;它允许我们精确控制元素在文档中的定位方式&#xff0c;从而脱离或部分脱离正常的文档流。理解 position 的不同值以及它们各自的定位基准&#xff0c;是实…...

从零打造算法题刷题助手:Agent搭建保姆级攻略

我用Trae 做了一个有意思的Agent 「大厂机试助手」。 点击 https://s.trae.com.cn/a/d2a596 立即复刻&#xff0c;一起来玩吧&#xff01; Agent 简介 Agent名称为大厂机试助手&#xff0c;主要功能有以下三点。 解题&#xff1a; 根据用户给出的题目给出具体的解题思路引导做…...

div或button一些好看实用的 CSS 样式示例

1&#xff1a;现代渐变按钮 .count {width: 800px;background: linear-gradient(135deg, #72EDF2 0%, #5151E5 100%);padding: 12px 24px;border-radius: 10px;box-shadow: 0 4px 15px rgba(81, 81, 229, 0.3);color: white;font-weight: bold;border: none;cursor: pointer;t…...

20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载

20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载 2025/5/26 14:44 百度&#xff1a;AMD 7700 显卡驱动 amd APU 显卡驱动 https://item.jd.com/100054819707.html 惠普HP【国家补贴20%】锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U 16G 1T…...

GoldenDB管理节点zk部署

目录 1、准备阶段 1.1、部署规划 1.2、硬件准备 1.3、软件准备 1.4、网络端口开通 1.5、环境清理 2、实施阶段 2.1、操作系统配置 2.1.1、主机名修改 2.1.2、修改hosts文件 2.1.3、禁用防火墙 2.1.4、禁用selinux 2.1.5、禁用透明大页 2.1.6、资源限制调整 2.1.…...

asio之async_result

简介 async_result用来表示异步处理返回类型 async_result 是类模板 type&#xff1a;为类模板中声明的类型&#xff0c;对于不同的类型&#xff0c;可以使用类模板特例化&#xff0c;比如针对use_future...

网站缓存入门与实战:浏览器与Nginx/Apache服务器端缓存,让网站速度起飞!(2025)

更多服务器知识&#xff0c;尽在hostol.com 嘿&#xff0c;各位站长和网站管理员朋友们&#xff01;咱们精心打造的网站&#xff0c;内容再好&#xff0c;设计再炫&#xff0c;如果用户打开它的时候&#xff0c;加载速度慢得像“老牛拉破车”&#xff0c;那体验可就大打折扣了…...

day41 python图像识别任务

目录 一、数据预处理&#xff1a;为模型打下坚实基础 二、模型构建&#xff1a;多层感知机的实现 三、训练过程&#xff1a;迭代优化与性能评估 四、测试结果&#xff1a;模型性能的最终检验 五、总结与展望 在深度学习的旅程中&#xff0c;多层感知机&#xff08;MLP&…...

【Axure结合Echarts绘制图表】

1.绘制一个矩形&#xff0c;用于之后存放图表&#xff0c;将其命名为test&#xff1a; 2.新建交互 -> 载入时 -> 打开链接&#xff1a; 3.链接到URL或文件路径&#xff1a; 4.点击fx&#xff1a; 5.输入&#xff1a; javascript: var script document.createEleme…...

【Zephyr 系列 2】用 Zephyr 玩转 Arduino UNO / MEGA,实现串口通信与 CLI 命令交互

🎯 本篇目标 在 Ubuntu 下将 Zephyr 运行在 Arduino UNO / MEGA 上 打通串口通信,实现通过串口发送命令与反馈 使用 Zephyr Shell 模块,实现 CLI 命令处理 🪧 为什么 Arduino + Zephyr? 虽然 Arduino 开发板通常用于简单的 C/C++ 开发,但 Zephyr 的支持范围远超 STM32…...