当前位置: 首页 > news >正文

Kubernetes kafka系列 | k8s部署kafka+zookeepe集群

一、kafka.zookeeper介绍

Kafka
简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。

特点:

高吞吐量: Kafka 能够处理大规模的消息流,并具有很高的吞吐量。
持久性: 它将消息持久化到磁盘上,因此即使消费者不在线,也能保证消息不会丢失。
可伸缩性: Kafka 可以很容易地水平扩展以处理大量数据。
实时性: Kafka 可以提供几乎实时的消息传递,适用于大多数实时数据处理需求。
用途:

日志收集: Kafka 可以用作集中式的日志收集系统,收集来自不同源头的日志数据。
消息队列: Kafka 可以用作分布式应用程序之间的消息队列,用于解耦和异步通信。
流处理: Kafka 可以与流处理框架(如Apache Spark、Apache Flink等)结合使用,用于实时数据处理和分析。
ZooKeeper
简介: Apache ZooKeeper 是一个开源的分布式协调服务,最初也是由Yahoo开发的,并于2010年成为Apache软件基金会的顶级项目。

特点:

分布式协调: ZooKeeper 提供了分布式应用程序的协调服务,包括配置管理、命名服务、分布式锁等。
高可用性: ZooKeeper 通过在集群中保持多个节点的复制来实现高可用性和容错性。
一致性: ZooKeeper 提供了严格的一致性,确保所有的客户端在同一时间看到相同的数据视图。
用途:

配置管理: ZooKeeper 可以用于分布式系统的配置管理,例如动态配置更新。
命名服务: ZooKeeper 可以提供命名服务,帮助分布式系统中的节点发现和通信。
分布式锁: ZooKeeper 可以用于实现分布式锁,确保在分布式系统中对共享资源的互斥访问。
Kafka 和 ZooKeeper 的关系

在 Kafka 中,ZooKeeper 主要用于管理集群的元数据(如主题、分区、副本分配等)、领导者选举以及生产者和消费者的协调。Kafka 依赖于 ZooKeeper 来确保分布式系统的稳定运行。通常情况下,Kafka 和 ZooKeeper 会一起部署,但它们是两个独立的项目,各自提供不同的功能。

二、创建存储卷

nfs动态供给直通车

三、搭建Kafka集群

# 操作系统
# CentOS Linux release 7.9.2009 (Core)
lsb_release -a# 内核版本
# 3.10.0-1160.90.1.el7.x86_64
uname -a
# k8s 版本 1.21
# zookeeper 版本 3.4.10  kafka镜像版本0.11(嫌低可以自己换)

kafka需要依赖zookeeper
kafka的生产者与消费者需要在zookeeper中注册,不然消费者怎么知道生产者是否存活之类的哈哈。废话不多说,直接上干货!

本文用的是statefulset和动态存储部署zookeeper和kafka集群。

zookeeper.yaml

apiVersion: v1
kind: Service
metadata:name: zk-hslabels:app: zk
spec:ports:- port: 2888name: server- port: 3888name: leader-electionclusterIP: Noneselector:app: zk
---
apiVersion: v1
kind: Service
metadata:name: zk-cslabels:app: zk
spec:ports:- port: 2181targetPort: 2181name: clientselector:app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: zk-pdb
spec:selector:matchLabels:app: zkmaxUnavailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zk
spec:selector:matchLabels:app: zkserviceName: zk-hsreplicas: 3updateStrategy:type: RollingUpdatepodManagementPolicy: OrderedReadytemplate:metadata:labels:app: zkspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"containers:- name: kubernetes-zookeeperimagePullPolicy: IfNotPresentimage: "zhaoguanghui6/kubernetes-zookeeper:1.0-3.4.10"resources:requests:memory: "0.5Gi"cpu: "0.5"ports:- containerPort: 2181name: client- containerPort: 2888name: server- containerPort: 3888name: leader-electioncommand:- sh- -c- "start-zookeeper \--servers=3 \--data_dir=/var/lib/zookeeper/data \--data_log_dir=/var/lib/zookeeper/data/log \--conf_dir=/opt/zookeeper/conf \--client_port=2181 \--election_port=3888 \--server_port=2888 \--tick_time=2000 \--init_limit=10 \--sync_limit=5 \--heap=512M \--max_client_cnxns=60 \--snap_retain_count=3 \--purge_interval=12 \--max_session_timeout=40000 \--min_session_timeout=4000 \--log_level=INFO"readinessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5livenessProbe:exec:command:- sh- -c- "zookeeper-ready 2181"initialDelaySeconds: 10timeoutSeconds: 5volumeMounts:- name: datadirmountPath: /var/lib/zookeepersecurityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirspec:storageClassName: nfs-clientaccessModes: [ "ReadWriteOnce" ]resources:requests:storage: 1Gi

for i in 0 1 2; do kubectl exec zk-$i – hostname -f; done
zk-0.zk-headless.default.svc.cluster.local

zk-1.zk-headless.default.svc.cluster.local

zk-2.zk-headless.default.svc.cluster.local
kafka.yaml

---
apiVersion: v1
kind: Service
metadata:name: kafka-hslabels:app: kafka
spec:ports:- port: 9092name: serverclusterIP: Noneselector:app: kafka
--- 
apiVersion: v1
kind: Service
metadata:name: kafka-cslabels:app: kafka
spec:selector:app: kafkatype: NodePortports:- name: clientport: 9092nodePort: 30092
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:name: kafka-pdb
spec:selector:matchLabels:app: kafkaminAvailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka
spec:serviceName: kafka-hsreplicas: 3selector:matchLabels:app: kafkatemplate:metadata:labels:app: kafkaspec:affinity:podAntiAffinity:requiredDuringSchedulingIgnoredDuringExecution:- labelSelector:matchExpressions:- key: "app"operator: Invalues:- kafkatopologyKey: "kubernetes.io/hostname"podAffinity:preferredDuringSchedulingIgnoredDuringExecution:- weight: 1podAffinityTerm:labelSelector:matchExpressions:- key: "app"operator: Invalues:- zktopologyKey: "kubernetes.io/hostname"terminationGracePeriodSeconds: 300containers:- name: kafkaimagePullPolicy: IfNotPresentimage: registry.cn-hangzhou.aliyuncs.com/jaxzhai/k8skafka:v1resources:requests:memory: "1Gi"cpu: 500mports:- containerPort: 9092name: servercommand:- sh- -c- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} --override listeners=PLAINTEXT://:9092 --override zookeeper.connect=zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181,zk-0.zk-hs.default.svc.cluster.local:2181 --override log.dir=/var/lib/kafka --override auto.create.topics.enable=true --override auto.leader.rebalance.enable=true --override background.threads=10 --override compression.type=producer --override delete.topic.enable=true --override leader.imbalance.check.interval.seconds=300 --override leader.imbalance.per.broker.percentage=10 --override log.flush.interval.messages=9223372036854775807 --override log.flush.offset.checkpoint.interval.ms=60000 --override log.flush.scheduler.interval.ms=9223372036854775807 --override log.retention.bytes=-1 --override log.retention.hours=168 --override log.roll.hours=168 --override log.roll.jitter.hours=0 --override log.segment.bytes=1073741824 --override log.segment.delete.delay.ms=60000 --override message.max.bytes=1000012 --override min.insync.replicas=1 --override num.io.threads=8 --override num.network.threads=3 --override num.recovery.threads.per.data.dir=1 --override num.replica.fetchers=1 --override offset.metadata.max.bytes=4096 --override offsets.commit.required.acks=-1 --override offsets.commit.timeout.ms=5000 --override offsets.load.buffer.size=5242880 --override offsets.retention.check.interval.ms=600000 --override offsets.retention.minutes=1440 --override offsets.topic.compression.codec=0 --override offsets.topic.num.partitions=50 --override offsets.topic.replication.factor=3 --override offsets.topic.segment.bytes=104857600 --override queued.max.requests=500 --override quota.consumer.default=9223372036854775807 --override quota.producer.default=9223372036854775807 --override replica.fetch.min.bytes=1 --override replica.fetch.wait.max.ms=500 --override replica.high.watermark.checkpoint.interval.ms=5000 --override replica.lag.time.max.ms=10000 --override replica.socket.receive.buffer.bytes=65536 --override replica.socket.timeout.ms=30000 --override request.timeout.ms=30000 --override socket.receive.buffer.bytes=102400 --override socket.request.max.bytes=104857600 --override socket.send.buffer.bytes=102400 --override unclean.leader.election.enable=true --override zookeeper.session.timeout.ms=6000 --override zookeeper.set.acl=false --override broker.id.generation.enable=true --override connections.max.idle.ms=600000 --override controlled.shutdown.enable=true --override controlled.shutdown.max.retries=3 --override controlled.shutdown.retry.backoff.ms=5000 --override controller.socket.timeout.ms=30000 --override default.replication.factor=1 --override fetch.purgatory.purge.interval.requests=1000 --override group.max.session.timeout.ms=300000 --override group.min.session.timeout.ms=6000 --override inter.broker.protocol.version=0.10.2-IV0 --override log.cleaner.backoff.ms=15000 --override log.cleaner.dedupe.buffer.size=134217728 --override log.cleaner.delete.retention.ms=86400000 --override log.cleaner.enable=true --override log.cleaner.io.buffer.load.factor=0.9 --override log.cleaner.io.buffer.size=524288 --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 --override log.cleaner.min.cleanable.ratio=0.5 --override log.cleaner.min.compaction.lag.ms=0 --override log.cleaner.threads=1 --override log.cleanup.policy=delete --override log.index.interval.bytes=4096 --override log.index.size.max.bytes=10485760 --override log.message.timestamp.difference.max.ms=9223372036854775807 --override log.message.timestamp.type=CreateTime --override log.preallocate=false --override log.retention.check.interval.ms=300000 --override max.connections.per.ip=2147483647 --override num.partitions=1 --override producer.purgatory.purge.interval.requests=1000 --override replica.fetch.backoff.ms=1000 --override replica.fetch.max.bytes=1048576 --override replica.fetch.response.max.bytes=10485760 --override reserved.broker.max.id=1000 "env:- name: KAFKA_HEAP_OPTSvalue : "-Xmx1G -Xms1G"- name: KAFKA_OPTSvalue: "-Dlogging.level=INFO"volumeMounts:- name: datadirmountPath: /var/lib/kafkareadinessProbe:exec:command:- sh- -c- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9092"timeoutSeconds: 5periodSeconds: 5initialDelaySeconds: 70securityContext:runAsUser: 1000fsGroup: 1000volumeClaimTemplates:- metadata:name: datadirannotations:volume.beta.kubernetes.io/storage-class: "nfs-client"spec:accessModes: [ "ReadWriteMany" ]resources:requests:storage: 5Gi

四、验证集群
验证kafka是否可用:

1、进入kafka-0命令: kubectl exec -it kafka-0 bash
进入容器目录:cd /opt/kafka/config

2、创建一个名为aaa的topc命令:kafka-topics.sh --create --topic aaa --zookeeper zk-0.zk-headless.default.svc.cluster.local:2181,zk-1.zk-headless.default.svc.cluster.local:2181,zk-2.zk-headless.default.svc.cluster.local:2181 --partitions 3 --replication-factor 2
结果为:
Created topic “aaa”.

3、进入topic为aaa的生产者消息中心:kafka-console-consumer.sh --topic aaa --bootstrap-server localhost:9092

4、复制新的会话,进入另一个容器kafka-1:kubectl exec -it kafka-1 bash

进入消费者,输入命令:kafka-console-producer.sh --topic aaa --broker-list localhost:9092

输入:

hello

i lovle you

回车后,可在生产者消息中心看到消息

最新文章链接,含镜像制作,v3.5.2

相关文章:

Kubernetes kafka系列 | k8s部署kafka+zookeepe集群

一、kafka.zookeeper介绍 Kafka 简介: Apache Kafka 是一个开源的分布式流处理平台和消息队列系统。它最初由LinkedIn开发,并于2011年成为Apache软件基金会的顶级项目。 特点: 高吞吐量: Kafka 能够处理大规模的消息流&#xf…...

ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播

ip广播智慧工地广播喊话号角 IP网络号角在塔吊中应用 通过寻呼话筒预案广播 SV-704XT是深圳锐科达电子有限公司的一款壁挂式网络有源号角,具有10/100M以太网接口,可将网络音源通过自带的功放和号角喇叭输出播放,可达到功率50W。SV-704XT内置有…...

B端系统优化,可不是换个颜色和图标,看看与大厂系统的差距。

、不要被流于表面的需求描述迷惑。 很多人找我们优化系统界面,对需求总是轻描淡写,比如:换个颜色、换个图标、换个皮肤,甚至还有的说,随便改下就行。 这些需求都是听起来简单,实现起来难,你如…...

【LeetCode热题100】240. 搜索二维矩阵 II

一.题目要求 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 ‘每列的元素从上到下升序排列。 二.题目难度 中等 三.输入样例 示例 1: 输入:matrix [[1,4,7…...

three.js 鼠标左右拖动改变玩家视角

这里主要用到了 一个方法 obj.getWorldDirection(); obj.getWorldDirection()表示的获取obj对象自身z轴正方向在世界坐标空间中的方向。 按下 W键前进运动&#xff1b; <template><div><el-container><el-main><div class"box-card-left…...

Pycharm jupyter server process exited with code 1

Pycharm jupyter server process exited with code 1 1. 问题描述2. 原因和解决方法 1. 问题描述 使用 Pycharm 启动 Jupyter 时&#xff0c;报错如下&#xff0c; jupyter server process exited with code 12. 原因和解决方法 Pycharm 启动 jupyter 时&#xff0c;默认的 …...

ubuntu 20.04 Python pip 配置 pip.conf

1. 状况描述 $ pip install timm WARNING: Retrying (Retry(total4, connectNone, readNone, redirectNone, statusNone)) after connection broken by ProxyError(Cannot connect to proxy., RemoteDisconnected(Remote end closed connection without response)): /simple/t…...

GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…...

Ubuntu 中 desktop-amd64 和 live-server-amd64 的区别

一、Ubuntu的操作系统镜像 Ubuntu的操作系统镜像主要有两种&#xff1a;desktop-amd64和live-server-amd64 这两者的主要区别在于使用场景和安装方式 1. Desktop-amd64: * 这是Ubuntu的桌面版本&#xff0c;用于安装具有图形用户界面的Ubuntu系统。 * 它包含了用于日常使…...

第10集《天台教观纲宗》

请大家打开讲义第十七页。我们讲到己二、结申正义。 己二、结申正义 《法华经》把我们修行人修行的相貌&#xff0c;比喻作一个车乘。车乘就是一种交通工具&#xff0c;它能够让我们从此岸超越到彼岸去。所以修行它是可以超越的&#xff0c;你今天比昨天超越了&#xff0c;就好…...

每日学习笔记:C++ STL 的forward_list

定义 特点 操作函数 元素查找、移除或安插 forward_list::emplace_after arg...指的是元素构造函数的参数&#xff08;0~N个&#xff09; #include <iostream> #include <memory> #include <list> #include <forward_list> using namespace std;class…...

【Java,Redis】Redis 数据库存取字符串数据以及类数据

1、 字符串存取数据 Resource private StringRedisTemplate stringRedisTemplate;//从Redis中获取string字符串 stringRedisTemplate.opsForValue().get("cache:shop:"id); //Json -> class Shop shop JSONUtil.toBean(ShopJson,Shop.class); //字符串写入redis…...

OpenCV 图像重映射函数remap()实例详解

OpenCV 图像重映射函数remap()对图像应用通用几何变换。其原型如下&#xff1a; void remap(InputArray src, OutputArray dst, InputArray map1, InputArray map2, int interpolation&#xff0c; int borderMode BORDER_CONSTANT&#xff0c; const Scalar & borde…...

Python基础课堂最后一课23——正则对象

文章目录 前言一、正则对象是什么&#xff1f;二、正则表达式基本分类1.普通字符2.元字符 总结 前言 很开心能和你们一些学习进步&#xff0c;在这一个多月的时间中&#xff0c;是你们让我坚持了下来&#xff0c;完成了python基础课堂编写&#xff0c;不管如何&#xff0c;我们…...

【算法训练营】凸包,图(Python实现)

凸包 描述 给定n个二维平面上的点&#xff0c;求他们的凸包。 输入 第一行包含一个正整数n。 接下来n行&#xff0c;每行包含两个整数x,y&#xff0c;表示一个点的坐标。 输出 令所有在凸包极边上的点依次为p1,p2,...,pm&#xff08;序号&#xff09;&#xff0c;其中m表…...

webpack5零基础入门-6webpack处理图片资源

1.在webpack5中file-loader和url-loader为内置模块 通过在加载器中配置rule即可激活 {test: /\.(png|jpe?g|gif|webp)$/,type: asset} 2.使用webpack进行打包 执行npx webpack 可以看到图片资源打包后都被放到了dist文件目录下 3.使用webpack进行图片格式转换为base64 优势…...

计算机基础知识QA

目录 数据库 --mysql 关联查询 唯一索引如何创建&#xff0c;语句 更新表字段语句 查看字段类型 --redis 使用场景 数据结构 设置超时时间 linux 常用命令 发布版本 安装一个东西&#xff0c;发现一个东西安装的很慢&#xff0c;如何切换ip地的源&#xff1f;-&g…...

微信小程序一次性订阅requestSubscribeMessage授权和操作详解

一次性订阅&#xff1a;用户订阅一次发一次通知 一、授权 — requestSubscribeMessage Taro.requestSubscribeMessage({tmplIds: [], // 需要订阅的消息模板的id的集合success (res) {console.log("同意授权", res)},fail(res) {console.log(拒绝授权, res)}})点击或…...

ARM 汇编指令:(三)运算处理指令

目录 一.add指令 二.sub指令 三.MUL指令 一.add指令 add用于执行实现两个寄存器或寄存机或寄存器与立即数的相加操作。它可以用于整数、浮点数等各种数据类型的加法运算。 ADD{cond}{S} Rd,操作数,操作数 1.不带进位加法指令add add r1, r2, #4 //r1 r2 4 add r1, r2 …...

【C++庖丁解牛】STL简介 | string容器初次见面

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1. 什么是STL2. STL的版本…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...

c# 局部函数 定义、功能与示例

C# 局部函数&#xff1a;定义、功能与示例 1. 定义与功能 局部函数&#xff08;Local Function&#xff09;是嵌套在另一个方法内部的私有方法&#xff0c;仅在包含它的方法内可见。 • 作用&#xff1a;封装仅用于当前方法的逻辑&#xff0c;避免污染类作用域&#xff0c;提升…...

土建施工员考试:建筑施工技术重点知识有哪些?

《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目&#xff0c;核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容&#xff0c;附学习方向和应试技巧&#xff1a; 一、施工组织与进度管理 核心目标&#xff1a; 规…...

【实施指南】Android客户端HTTPS双向认证实施指南

&#x1f510; 一、所需准备材料 证书文件&#xff08;6类核心文件&#xff09; 类型 格式 作用 Android端要求 CA根证书 .crt/.pem 验证服务器/客户端证书合法性 需预置到Android信任库 服务器证书 .crt 服务器身份证明 客户端需持有以验证服务器 客户端证书 .crt 客户端身份…...

数据库——redis

一、Redis 介绍 1. 概述 Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的、高性能的内存键值数据库系统&#xff0c;具有以下核心特点&#xff1a; 内存存储架构&#xff1a;数据主要存储在内存中&#xff0c;提供微秒级的读写响应 多数据结构支持&…...

Springboot 高校报修与互助平台小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;高校报修与互助平台小程序被用户普遍使用&#xff0c;为…...

MLP实战二:MLP 实现图像数字多分类

任务 实战&#xff08;二&#xff09;&#xff1a;MLP 实现图像多分类 基于 mnist 数据集&#xff0c;建立 mlp 模型&#xff0c;实现 0-9 数字的十分类 task: 1、实现 mnist 数据载入&#xff0c;可视化图形数字&#xff1b; 2、完成数据预处理&#xff1a;图像数据维度转换与…...

【笔记】结合 Conda任意创建和配置不同 Python 版本的双轨隔离的 Poetry 虚拟环境

如何结合 Conda 任意创建和配置不同 Python 版本的双轨隔离的Poetry 虚拟环境&#xff1f; 在 Python 开发中&#xff0c;为不同项目配置独立且适配的虚拟环境至关重要。结合 Conda 和 Poetry 工具&#xff0c;能高效创建不同 Python 版本的 Poetry 虚拟环境&#xff0c;接下来…...