基于Docker的Kafka分布式集群
目录
1. 说明
2. 服务器规划
3. docker-compose文件
kafka{i}.yaml
kafka-ui.yaml
4. kafka-ui配置集群监控
5. 参数表
6. 测试脚本
生产者-异步生产: AsyncKafkaProducer1.py
消费者-异步消费: AsyncKafkaConsumer1.py
7. 参考
1. 说明
- 创建一个本地开发环境所需的kafka集群
- 分布在3个虚拟机上,以docker容器方式互联互通
2. 服务器规划
| Host | 端口 | 备注 |
|---|---|---|
| host001.dev.sb | 9092, 9093, 9081 | kafka ui 访问 kafka0 节点 |
| host002.dev.sb | 9092, 9093 | kafka1 节点 |
| host003.dev.sb | 9092, 9093 | kafka2 节点 |
3. docker-compose文件
kafka{i}.yaml
- 其中 {i} 对应0,1,2
- 用户密码都配在文件里面
services:kafka:image: 'bitnami/kafka:3.6.2'container_name: kafka{i}hostname: kafka{i}restart: alwaysports:- 9092:9092- 9093:9093environment:# KRaft- KAFKA_CFG_NODE_ID={i}- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093- KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka0:9094,CLIENT://:9095,EXTERNAL://kafka0:9092- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_NUM_PARTITIONS=3- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2# Log- KAFKA_CFG_LOG_RETENTION_HOURS = 72# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN- KAFKA_CONTROLLER_USER=kfkuser- KAFKA_CONTROLLER_PASSWORD=youknow- KAFKA_INTER_BROKER_USER=kfkuser- KAFKA_INTER_BROKER_PASSWORD=youknow- KAFKA_CLIENT_USERS=kfkuser- KAFKA_CLIENT_PASSWORDS=youknow# Others- TZ=Asia/Shanghaivolumes:- '/data0/Server/Db/kafka0:/bitnami/kafka'extra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"
kafka-ui.yaml
services:kafka-ui:image: 'provectuslabs/kafka-ui:master'container_name: kafka-uirestart: alwaysports:- 9081:8080environment:- KAFKA_CLUSTERS_0_NAME=local- DYNAMIC_CONFIG_ENABLED=true- AUTH_TYPE=LOGIN_FORM- SPRING_SECURITY_USER_NAME=admin- SPRING_SECURITY_USER_PASSWORD=youknowextra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"
4. kafka-ui配置集群监控


5. 参数表
| 参数 | 说明 |
|---|---|
| KAFKA_CFG_PROCESS_ROLES | kafka角色,做broker, controller 示例: |
| KAFKA_KRAFT_CLUSTER_ID | 集群id, 同属节点需一样 |
| KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | 投票选举列表 |
| KAFKA_CFG_CONTROLLER_LISTENER_NAMES | 控制器名称 |
| KAFKA_CFG_NUM_PARTITIONS | 默认分区数 |
| KAFKA_CFG_LISTENERS | 监听器的地址和端口 |
| KAFKA_CFG_ADVERTISED_LISTENERS | 发布监听器的地址和端口 |
| KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | 监听器的协议 这里sasl_plain表示 仅认证加密 传输不加密 |
| KAFKA_CLIENT_USERS | 加密客户端账号 |
| KAFKA_CLIENT_PASSWORDS | 加密客户端密码 |
| #Clustering | |
| KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Kafka 内部使用的 __consumer_offsets 主题的复制因子。这个主题是用来存储消费者偏移 量 |
| KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | Kafka 内部使用的 __transaction_state 主题的复制因子。这个主题是用来存储事务日志 |
| KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | Kafka 内部使用的 __transaction_state 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 保持同步的副本集合 |
| #Log | |
| KAFKA_CFG_LOG_DIRS | 日志目录 |
| KAFKA_CFG_LOG_RETENTION_HOURS | 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理,默认168小时,一周时间 |
6. 测试脚本
生产者-异步生产: AsyncKafkaProducer1.py
from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","client.id": "PythonProducer","security.protocol": "SASL_PLAINTEXT","sasl.mechanisms": "PLAIN","sasl.username": "kfkuser","sasl.password": "youknow",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]# Produce messagesproduce_messages(async_producer, "zx001.msg.user", messages_to_send)
消费者-异步消费: AsyncKafkaConsumer1.py
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime# 设置日志格式,'%()'表示日志参数
log_format = "%(message)s"
logging.basicConfig(filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)async def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()# print(f"Raw message: {raw_message}")str_msg = raw_message.decode("utf-8")parsed_message = json.loads(str_msg)parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"Received message: {type(parsed_message)} : {parsed_message}")json_data = json.dumps(parsed_message, ensure_ascii=False)logging.info("{}".format(json_data))await asyncio.sleep(0.01) # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","group.id": "MsgGroup2","auto.offset.reset": "earliest","client.id" : "PythonConsumer","security.protocol" : "SASL_PLAINTEXT","sasl.mechanisms" : "PLAIN","sasl.username" : "kfkuser","sasl.password" : "youknow"}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["zx001.msg.user"])if __name__ == "__main__":asyncio.run(consume())
7. 参考
- Apache Kafka® Quick Start - Local Install With Docker
- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub
- https://juejin.cn/post/7187301063832109112
相关文章:
基于Docker的Kafka分布式集群
目录 1. 说明 2. 服务器规划 3. docker-compose文件 kafka{i}.yaml kafka-ui.yaml 4. kafka-ui配置集群监控 5. 参数表 6. 测试脚本 生产者-异步生产: AsyncKafkaProducer1.py 消费者-异步消费: AsyncKafkaConsumer1.py 7. 参考 1. 说明 创建一个本地开发环境所需的k…...
【博客之星】年度总结:在云影与墨香中探寻成长的足迹
🐇明明跟你说过:个人主页 🔖行路有良友,便是天堂🔖 目录 一、年度回顾 1、创作历程 2、个人成长 3、个人生活与博客事业 二、技术总结 1、赛道选择 2、技术工具 3、实战项目 三、前景与展望 1、云原生未来…...
SpringBoot的Swagger配置
一、Swagger配置 1.添加依赖 <dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.2</version> </dependency> 2.修改WebMvcConfig Slf4j Configurat…...
machine learning knn算法之使用KNN对鸢尾花数据集进行分类
通过导入必要的scikit-learn导入必要的库,加载给定的数据,划分测试集和训练集之后训练预测和评估即可 具体代码如下: import numpy as np from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split f…...
C语言练习(16)
猴子吃桃问题。猴子第一天摘下若干个桃子,当即吃了一半,还不过瘾,又多吃了一个。第二天早上又将剩下的桃子吃掉一半,又多吃了一个。以后每天早上都吃了前一天剩下的一半加一个。到第10天早上想再吃时,见只剩一个桃子了…...
SOAFEE 技术研讨会:汽车软件定义与自动驾驶技术探讨
在本次技术研讨会上,来自汽车与科技领域的专家们围绕汽车软件定义及自动驾驶技术展开了深入交流与探讨。从 SOAFEE 蓝图计划的创新性理念,到 Autoware 开源项目及 Open AD Kit 在实际应用中的探索,再到 Edge Workload Abstraction and Orches…...
R语言学习笔记之开发环境配置
一、概要 整个安装过程及遇到的问题记录 操作步骤备注(包含遇到的问题)1下载安装R语言2下载安装RStudio3离线安装pacman提示需要安装Rtools4安装Rtoolspacman、tidyfst均离线安装完成5加载tidyfst报错 提示需要安装依赖,试错逐步下载并安装…...
多版本并发控制:MVCC的作用和基本原理
多版本并发控制:MVCC的作用和基本原理 1、MVCC简介1.1 快照读与当前读的区别1.1.1 快照读1.1.2 当前读 1.2 数据库的读写问题1.3 MVCC的作用 2、MVCC实现原理之ReadView2.1 什么是ReadView2.2 ReadView的设计思路2.3 MVCC整体操作流程 1、MVCC简介 1.1 快照读与当前…...
ubuntu18.04安装nvm管理本机node和npm
ubuntu18.04安装nvm管理本机node和npm nvm的使用方法1. 安装nvm2. 加载nvm3. 安装执行版本4. 设置默认版本(可选)5. 检查:6. 将配置加入到shell配置文件中(默认已经加入) 如果系统全局的 Node.js 存在,但被 nvm 覆盖了,可以通过禁用或卸载 nvm 恢复到系统…...
【数据结构进阶】红黑树超详解 + 实现(附源码)
🌟🌟作者主页:ephemerals__ 🌟🌟所属专栏:数据结构 目录 前言 一、红黑树介绍 二、红黑树原理详解 三、红黑树的实现 1. 节点定义 2. 红黑树类型定义及接口声明 3. 红黑树的插入(重点&a…...
leetcode_3092. 最高频率的 ID
https://leetcode.cn/problems/most-frequent-ids/description/ 看到这个数据范围 最极端情况 如果nums全为一个数 并且数量取到最大 那么范围是10的10次方 需要longlong储存 这题主要运用了哈希表配合multiset实现 哈希表主要用作存储某个数的出现次数 mst则用于记录出现次…...
鸿蒙仓颉环境配置(仓颉SDK下载,仓颉VsCode开发环境配置,仓颉DevEco开发环境配置)
目录 1)仓颉的SDK下载 1--进入仓颉的官网 2--点击图片中的下载按钮 3--在新跳转的页面点击即刻下载 4--下载 5--找到你们自己下载好的地方 6--解压软件 2)仓颉编程环境配置 1--找到自己的根目录 2--进入命令行窗口 3--输入 envsetup.bat 4--验证是否安…...
数据统计–图形报表(day11)
Apache ECharts 介绍 Apache ECharts 介绍 Apache ECharts 是一款基于 Javascript 的数据可视化图表库,提供直观,生动,可交互,可个性化定制的数据可视化图表。 官网地址:Apache ECharts 入门案例 Apache Echarts官方…...
源码分析之Openlayers样式篇CircleStyle类
访问Openlayers网站(https://jinuss.github.io/Openlayers_map_pages/,网站是基于Vue3 Openlayers,里面有大量的实践和案例。觉得还不错,可以 给个小星星Star,鼓励一波 https://github.com/Jinuss/OpenlayersMap哦~ 概述 在 Ope…...
解决CentOS9系统下Zabbix 7.2图形中文字符乱码问题
操作系统:CentOS 9 Zabbix版本:Zabbix7.2 问题描述:主机图形中文字符乱码 解决方案: # 安装字体配置和中文语言包 sudo yum install -y fontconfig langpacks-zh_CN.noarch # 检查是否已有中文字体: fc-list :lan…...
AF3 FourierEmbedding类源码解读
FourierEmbedding 是一个用于扩散条件的傅里叶嵌入类,其核心是将输入的时间步噪声强度或控制参数(timestep)转换为高维的周期性特征。 源代码: class FourierEmbedding(nn.Module):"""Fourier embedding for diffusion conditioning."""de…...
vsftpd虚拟用户部署
vsftpd虚拟用户部署 案例提供两个用户如下,使用centos7验证可行。 test *AO9ih&7 ftp DTx4zp_shell脚本运行一键安装vsftp #!/bin/bash yum -y install vsftpd ftp >/etc/vsftpd/vsftpd.conf cat <<EOL >> /etc/vsftpd/vsftpd.conf anonymous_enableNO l…...
MySQL 容器已经停止(但仍然存在),但希望重新启动它,并使它的 3306 端口映射到宿主机的 3306 端口是不可行的
重新启动容器并映射端口是不行的 由于你已经有一个名为 mysql-container 的 MySQL 容器,你可以使用 docker start 启动它。想要让3306 端口映射到宿主机是不行的,实际上,端口映射是在容器启动时指定的。你无法在容器已经创建的情况下直接修改…...
汇编实验·顺序程序设计
一、实验目的: 1.能够熟练的进行顺序程序的编写,掌握基本的汇编语言指令的用法 2.通过程序设计理解掌握不同类型的数据混合运算的基本规则 3.熟练掌握各种寻址方式,深入理解逻辑地址和物理地址的相关概念 二、实验内容 有三个长度分别为1、2、4个字节的数据,编写程序求…...
AIGC视频扩散模型新星:Video 版本的SD模型
大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍慕尼黑大学携手 NVIDIA 等共同推出视频生成模型 Video LDMs。NVIDIA 在 AI 领域的卓越成就家喻户晓,而慕尼黑大学同样不容小觑,…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...
