Kubernetes kafka系列 | k8s部署kafka+zookeepe集群
一、kafka.zookeeper介绍
Kafka
简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。
特点:
高吞吐量: Kafka 能够处理大规模的消息流,并具有很高的吞吐量。
持久性: 它将消息持久化到磁盘上,因此即使消费者不在线,也能保证消息不会丢失。
可伸缩性: Kafka 可以很容易地水平扩展以处理大量数据。
实时性: Kafka 可以提供几乎实时的消息传递,适用于大多数实时数据处理需求。
用途:
日志收集: Kafka 可以用作集中式的日志收集系统,收集来自不同源头的日志数据。
消息队列: Kafka 可以用作分布式应用程序之间的消息队列,用于解耦和异步通信。
流处理: Kafka 可以与流处理框架(如Apache Spark、Apache Flink等)结合使用,用于实时数据处理和分析。
ZooKeeper
简介: Apache ZooKeeper 是一个开源的分布式协调服务,最初也是由Yahoo开发的,并于2010年成为Apache软件基金会的顶级项目。
特点:
分布式协调: ZooKeeper 提供了分布式应用程序的协调服务,包括配置管理、命名服务、分布式锁等。
高可用性: ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。
一致性: ZooKeeper 提供了严格的一致性,确保所有的客户端在同一时间看到相同的数据视图。
用途:
配置管理: ZooKeeper 可以用于分布式系统的配置管理,例如动态配置更新。
命名服务: ZooKeeper 可以提供命名服务,帮助分布式系统中的节点发现和通信。
分布式锁: ZooKeeper 可以用于实现分布式锁,确保在分布式系统中对共享资源的互斥访问。
Kafka 和 ZooKeeper 的关系
在 Kafka 中,ZooKeeper 主要用于管理集群的元数据(如主题、分区、副本分配等)、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下,Kafka 和 ZooKeeper 会一起部署,但它们是两个独立的项目,各自提供不同的功能。
二、创建存储卷
nfs动态供给直通车
三、搭建Kafka集群
# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10 kafka镜像版本0.11(嫌低可以自己换)
kafka需要依赖zookeeper
kafka的生产者与消费者需要在zookeeper中注册,不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说,直接上干货!
本文用的是statefulset和动态存储部署zookeeper和kafka集群。
zookeeper.yaml
apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"containers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: "zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10"resources:requests:memory: "0.5Gi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ "ReadWriteOnce" ]resources:requests:storage: 1Gi
for i in 0 1 2; do kubectl exec zk-$i – hostname -f; done
zk-0.zk-headless.default.svc.cluster.local
zk-1.zk-headless.default.svc.cluster.local
zk-2.zk-headless.default.svc.cluster.local
kafka.yaml
---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"podAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"terminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: "1Gi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} --override listeners=PLAINTEXT://:9092 --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir=/var/lib/kafka --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10 --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300 --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000 --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168 --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824 --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1 --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1 --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1 --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000 --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50 --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500 --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1 --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000 --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000 --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400 --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000 --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000 --override group.min.session.timeout.ms=6000 --override inter.broker.protocol.version=0.10.2-IV0 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728 --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9 --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5 --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807 --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000 --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000 --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760 --override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx1G -Xms1G"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: "nfs-client"spec:accessModes: [ "ReadWriteMany" ]resources:requests:storage: 5Gi
四、验证集群
验证kafka是否可用:
1、进入kafka-0命令: kubectl exec -it kafka-0 bash
进入容器目录:cd /opt/kafka/config
2、创建一个名为aaa的topc命令:kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2
结果为:
Created topic “aaa”.
3、进入topic为aaa的生产者消息中心:kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092
4、复制新的会话,进入另一个容器kafka-1:kubectl exec -it kafka-1 bash
进入消费者,输入命令:kafka-console-producer.sh --topic aaa --broker-list localhost:9092
输入:
hello
i lovle you
回车后,可在生产者消息中心看到消息
最新文章链接,含镜像制作,v3.5.2
相关文章:
Kubernetes kafka系列 | k8s部署kafka+zookeepe集群
一、kafka.zookeeper介绍 Kafka 简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 特点: 高吞吐量: Kafka 能够处理大规模的消息流…...
ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播
ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播 SV-704XT是深圳锐科达电子有限公司的一款壁挂式网络有源号角,具有10/100M以太网接口,可将网络音源通过自带的功放和号角喇叭输出播放,可达到功率50W。SV-704XT内置有…...
B端系统优化,可不是换个颜色和图标,看看与大厂系统的差距。
、不要被流于表面的需求描述迷惑。 很多人找我们优化系统界面,对需求总是轻描淡写,比如:换个颜色、换个图标、换个皮肤,甚至还有的说,随便改下就行。 这些需求都是听起来简单,实现起来难,你如…...
【LeetCode热题100】240. 搜索二维矩阵 II
一.题目要求 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 ‘每列的元素从上到下升序排列。 二.题目难度 中等 三.输入样例 示例 1: 输入:matrix [[1,4,7…...
three.js 鼠标左右拖动改变玩家视角
这里主要用到了 一个方法 obj.getWorldDirection(); obj.getWorldDirection()表示的获取obj对象自身z轴正方向在世界坐标空间中的方向。 按下 W键前进运动; <template><div><el-container><el-main><div class"box-card-left…...
Pycharm jupyter server process exited with code 1
Pycharm jupyter server process exited with code 1 1. 问题描述2. 原因和解决方法 1. 问题描述 使用 Pycharm 启动 Jupyter 时,报错如下, jupyter server process exited with code 12. 原因和解决方法 Pycharm 启动 jupyter 时,默认的 …...
ubuntu 20.04 Python pip 配置 pip.conf
1. 状况描述 $ pip install timm WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by ProxyError(Cannot connect to proxy., RemoteDisconnected(Remote end closed connection without response)): /simple/t…...
GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!
大家好,我是木易,一个持续关注AI领域的互联网技术产品经理,国内Top2本科,美国Top10 CS研究生,MBA。我坚信AI是普通人变强的“外挂”,所以创建了“AI信息Gap”这个公众号,专注于分享AI全维度知识…...
Ubuntu 中 desktop-amd64 和 live-server-amd64 的区别
一、Ubuntu的操作系统镜像 Ubuntu的操作系统镜像主要有两种:desktop-amd64和live-server-amd64 这两者的主要区别在于使用场景和安装方式 1. Desktop-amd64: * 这是Ubuntu的桌面版本,用于安装具有图形用户界面的Ubuntu系统。 * 它包含了用于日常使…...
第10集《天台教观纲宗》
请大家打开讲义第十七页。我们讲到己二、结申正义。 己二、结申正义 《法华经》把我们修行人修行的相貌,比喻作一个车乘。车乘就是一种交通工具,它能够让我们从此岸超越到彼岸去。所以修行它是可以超越的,你今天比昨天超越了,就好…...
每日学习笔记:C++ STL 的forward_list
定义 特点 操作函数 元素查找、移除或安插 forward_list::emplace_after arg...指的是元素构造函数的参数(0~N个) #include <iostream> #include <memory> #include <list> #include <forward_list> using namespace std;class…...
【Java,Redis】Redis 数据库存取字符串数据以及类数据
1、 字符串存取数据 Resource private StringRedisTemplate stringRedisTemplate;//从Redis中获取string字符串 stringRedisTemplate.opsForValue().get("cache:shop:"id); //Json -> class Shop shop JSONUtil.toBean(ShopJson,Shop.class); //字符串写入redis…...
OpenCV 图像重映射函数remap()实例详解
OpenCV 图像重映射函数remap()对图像应用通用几何变换。其原型如下: void remap(InputArray src, OutputArray dst, InputArray map1, InputArray map2, int interpolation, int borderMode BORDER_CONSTANT, const Scalar & borde…...
Python基础课堂最后一课23——正则对象
文章目录 前言一、正则对象是什么?二、正则表达式基本分类1.普通字符2.元字符 总结 前言 很开心能和你们一些学习进步,在这一个多月的时间中,是你们让我坚持了下来,完成了python基础课堂编写,不管如何,我们…...
【算法训练营】凸包,图(Python实现)
凸包 描述 给定n个二维平面上的点,求他们的凸包。 输入 第一行包含一个正整数n。 接下来n行,每行包含两个整数x,y,表示一个点的坐标。 输出 令所有在凸包极边上的点依次为p1,p2,...,pm(序号),其中m表…...
webpack5零基础入门-6webpack处理图片资源
1.在webpack5中file-loader和url-loader为内置模块 通过在加载器中配置rule即可激活 {test: /\.(png|jpe?g|gif|webp)$/,type: asset} 2.使用webpack进行打包 执行npx webpack 可以看到图片资源打包后都被放到了dist文件目录下 3.使用webpack进行图片格式转换为base64 优势…...
计算机基础知识QA
目录 数据库 --mysql 关联查询 唯一索引如何创建,语句 更新表字段语句 查看字段类型 --redis 使用场景 数据结构 设置超时时间 linux 常用命令 发布版本 安装一个东西,发现一个东西安装的很慢,如何切换ip地的源?-&g…...
微信小程序一次性订阅requestSubscribeMessage授权和操作详解
一次性订阅:用户订阅一次发一次通知 一、授权 — requestSubscribeMessage Taro.requestSubscribeMessage({tmplIds: [], // 需要订阅的消息模板的id的集合success (res) {console.log("同意授权", res)},fail(res) {console.log(拒绝授权, res)}})点击或…...
ARM 汇编指令:(三)运算处理指令
目录 一.add指令 二.sub指令 三.MUL指令 一.add指令 add用于执行实现两个寄存器或寄存机或寄存器与立即数的相加操作。它可以用于整数、浮点数等各种数据类型的加法运算。 ADD{cond}{S} Rd,操作数,操作数 1.不带进位加法指令add add r1, r2, #4 //r1 r2 4 add r1, r2 …...
【C++庖丁解牛】STL简介 | string容器初次见面
🍁你好,我是 RO-BERRY 📗 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 🎄感谢你的陪伴与支持 ,故事既有了开头,就要画上一个完美的句号,让我们一起加油 目录 1. 什么是STL2. STL的版本…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
pikachu靶场通关笔记19 SQL注入02-字符型注入(GET)
目录 一、SQL注入 二、字符型SQL注入 三、字符型注入与数字型注入 四、源码分析 五、渗透实战 1、渗透准备 2、SQL注入探测 (1)输入单引号 (2)万能注入语句 3、获取回显列orderby 4、获取数据库名database 5、获取表名…...
消息队列系统设计与实践全解析
文章目录 🚀 消息队列系统设计与实践全解析🔍 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡💡 权衡决策框架 1.3 运维复杂度评估🔧 运维成本降低策略 🏗️ 二、典型架构设计2.1 分布式事务最终一致…...
