当前位置: 首页 > news >正文

基于Docker的Kafka分布式集群

目录

1. 说明

2. 服务器规划

3. docker-compose文件

kafka{i}.yaml

 kafka-ui.yaml

4. kafka-ui配置集群监控

5. 参数表

6. 测试脚本

生产者-异步生产: AsyncKafkaProducer1.py

消费者-异步消费: AsyncKafkaConsumer1.py

7. 参考


1. 说明

  • 创建一个本地开发环境所需的kafka集群
  • 分布在3个虚拟机上,以docker容器方式互联互通

2. 服务器规划

Host端口备注

host001.dev.sb

9092, 9093, 9081

kafka ui 访问

kafka0 节点

host002.dev.sb9092, 9093kafka1 节点
host003.dev.sb9092, 9093kafka2 节点

3. docker-compose文件

kafka{i}.yaml

- 其中 {i} 对应0,1,2

- 用户密码都配在文件里面

services:kafka:image: 'bitnami/kafka:3.6.2'container_name: kafka{i}hostname: kafka{i}restart: alwaysports:- 9092:9092- 9093:9093environment:# KRaft- KAFKA_CFG_NODE_ID={i}- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093- KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka0:9094,CLIENT://:9095,EXTERNAL://kafka0:9092- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_NUM_PARTITIONS=3- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2# Log- KAFKA_CFG_LOG_RETENTION_HOURS = 72# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN- KAFKA_CONTROLLER_USER=kfkuser- KAFKA_CONTROLLER_PASSWORD=youknow- KAFKA_INTER_BROKER_USER=kfkuser- KAFKA_INTER_BROKER_PASSWORD=youknow- KAFKA_CLIENT_USERS=kfkuser- KAFKA_CLIENT_PASSWORDS=youknow# Others- TZ=Asia/Shanghaivolumes:- '/data0/Server/Db/kafka0:/bitnami/kafka'extra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"
 kafka-ui.yaml
services:kafka-ui:image: 'provectuslabs/kafka-ui:master'container_name: kafka-uirestart: alwaysports:- 9081:8080environment:- KAFKA_CLUSTERS_0_NAME=local- DYNAMIC_CONFIG_ENABLED=true- AUTH_TYPE=LOGIN_FORM- SPRING_SECURITY_USER_NAME=admin- SPRING_SECURITY_USER_PASSWORD=youknowextra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"

4. kafka-ui配置集群监控

5. 参数表

参数说明
KAFKA_CFG_PROCESS_ROLES

kafka角色,做broker, controller

示例:
KAFKA_CFG_PROCESS_ROLES=controller,broker

KAFKA_KRAFT_CLUSTER_ID集群id, 同属节点需一样
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS投票选举列表
KAFKA_CFG_CONTROLLER_LISTENER_NAMES控制器名称
KAFKA_CFG_NUM_PARTITIONS默认分区数
KAFKA_CFG_LISTENERS监听器的地址和端口
KAFKA_CFG_ADVERTISED_LISTENERS发布监听器的地址和端口
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP监听器的协议 这里sasl_plain表示   仅认证加密 传输不加密
KAFKA_CLIENT_USERS加密客户端账号
KAFKA_CLIENT_PASSWORDS加密客户端密码
#Clustering
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTORKafka 内部使用的 __consumer_offsets 主题的复制因子。这个主题是用来存储消费者偏移
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTORKafka 内部使用的 __transaction_state 主题的复制因子。这个主题是用来存储事务日志
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISRKafka 内部使用的 __transaction_state 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与
leader 保持同步的副本集合
#Log
KAFKA_CFG_LOG_DIRS日志目录
KAFKA_CFG_LOG_RETENTION_HOURS数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理,默认168小时,一周时间

6. 测试脚本

生产者-异步生产: AsyncKafkaProducer1.py
from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","client.id": "PythonProducer","security.protocol": "SASL_PLAINTEXT","sasl.mechanisms": "PLAIN","sasl.username": "kfkuser","sasl.password": "youknow",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]# Produce messagesproduce_messages(async_producer, "zx001.msg.user", messages_to_send)
消费者-异步消费: AsyncKafkaConsumer1.py
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime# 设置日志格式,'%()'表示日志参数
log_format = "%(message)s"
logging.basicConfig(filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)async def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()# print(f"Raw message: {raw_message}")str_msg = raw_message.decode("utf-8")parsed_message = json.loads(str_msg)parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"Received message: {type(parsed_message)} : {parsed_message}")json_data = json.dumps(parsed_message, ensure_ascii=False)logging.info("{}".format(json_data))await asyncio.sleep(0.01)  # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","group.id": "MsgGroup2","auto.offset.reset": "earliest","client.id" :  "PythonConsumer","security.protocol" :  "SASL_PLAINTEXT","sasl.mechanisms" :  "PLAIN","sasl.username" :  "kfkuser","sasl.password" :  "youknow"}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["zx001.msg.user"])if __name__ == "__main__":asyncio.run(consume())

7. 参考

- Apache Kafka® Quick Start - Local Install With Docker

- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub

- https://juejin.cn/post/7187301063832109112

相关文章:

基于Docker的Kafka分布式集群

目录 1. 说明 2. 服务器规划 3. docker-compose文件 kafka{i}.yaml kafka-ui.yaml 4. kafka-ui配置集群监控 5. 参数表 6. 测试脚本 生产者-异步生产: AsyncKafkaProducer1.py 消费者-异步消费: AsyncKafkaConsumer1.py 7. 参考 1. 说明 创建一个本地开发环境所需的k…...

【博客之星】年度总结:在云影与墨香中探寻成长的足迹

🐇明明跟你说过:个人主页 🔖行路有良友,便是天堂🔖 目录 一、年度回顾 1、创作历程 2、个人成长 3、个人生活与博客事业 二、技术总结 1、赛道选择 2、技术工具 3、实战项目 三、前景与展望 1、云原生未来…...

SpringBoot的Swagger配置

一、Swagger配置 1.添加依赖 <dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.2</version> </dependency> 2.修改WebMvcConfig Slf4j Configurat…...

machine learning knn算法之使用KNN对鸢尾花数据集进行分类

通过导入必要的scikit-learn导入必要的库&#xff0c;加载给定的数据&#xff0c;划分测试集和训练集之后训练预测和评估即可 具体代码如下&#xff1a; import numpy as np from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split f…...

C语言练习(16)

猴子吃桃问题。猴子第一天摘下若干个桃子&#xff0c;当即吃了一半&#xff0c;还不过瘾&#xff0c;又多吃了一个。第二天早上又将剩下的桃子吃掉一半&#xff0c;又多吃了一个。以后每天早上都吃了前一天剩下的一半加一个。到第10天早上想再吃时&#xff0c;见只剩一个桃子了…...

SOAFEE 技术研讨会:汽车软件定义与自动驾驶技术探讨

在本次技术研讨会上&#xff0c;来自汽车与科技领域的专家们围绕汽车软件定义及自动驾驶技术展开了深入交流与探讨。从 SOAFEE 蓝图计划的创新性理念&#xff0c;到 Autoware 开源项目及 Open AD Kit 在实际应用中的探索&#xff0c;再到 Edge Workload Abstraction and Orches…...

R语言学习笔记之开发环境配置

一、概要 整个安装过程及遇到的问题记录 操作步骤备注&#xff08;包含遇到的问题&#xff09;1下载安装R语言2下载安装RStudio3离线安装pacman提示需要安装Rtools4安装Rtoolspacman、tidyfst均离线安装完成5加载tidyfst报错 提示需要安装依赖&#xff0c;试错逐步下载并安装…...

多版本并发控制:MVCC的作用和基本原理

多版本并发控制&#xff1a;MVCC的作用和基本原理 1、MVCC简介1.1 快照读与当前读的区别1.1.1 快照读1.1.2 当前读 1.2 数据库的读写问题1.3 MVCC的作用 2、MVCC实现原理之ReadView2.1 什么是ReadView2.2 ReadView的设计思路2.3 MVCC整体操作流程 1、MVCC简介 1.1 快照读与当前…...

ubuntu18.04安装nvm管理本机node和npm

ubuntu18.04安装nvm管理本机node和npm nvm的使用方法1. 安装nvm2. 加载nvm3. 安装执行版本4. 设置默认版本(可选)5. 检查:6. 将配置加入到shell配置文件中(默认已经加入) 如果系统全局的 Node.js 存在&#xff0c;但被 nvm 覆盖了&#xff0c;可以通过禁用或卸载 nvm 恢复到系统…...

【数据结构进阶】红黑树超详解 + 实现(附源码)

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;数据结构 目录 前言 一、红黑树介绍 二、红黑树原理详解 三、红黑树的实现 1. 节点定义 2. 红黑树类型定义及接口声明 3. 红黑树的插入&#xff08;重点&a…...

leetcode_3092. 最高频率的 ID

https://leetcode.cn/problems/most-frequent-ids/description/ 看到这个数据范围 最极端情况 如果nums全为一个数 并且数量取到最大 那么范围是10的10次方 需要longlong储存 这题主要运用了哈希表配合multiset实现 哈希表主要用作存储某个数的出现次数 mst则用于记录出现次…...

鸿蒙仓颉环境配置(仓颉SDK下载,仓颉VsCode开发环境配置,仓颉DevEco开发环境配置)

目录 ​1&#xff09;仓颉的SDK下载 1--进入仓颉的官网 2--点击图片中的下载按钮 3--在新跳转的页面点击即刻下载 4--下载 5--找到你们自己下载好的地方 6--解压软件 2&#xff09;仓颉编程环境配置 1--找到自己的根目录 2--进入命令行窗口 3--输入 envsetup.bat 4--验证是否安…...

数据统计–图形报表(day11)

Apache ECharts 介绍 Apache ECharts 介绍 Apache ECharts 是一款基于 Javascript 的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。 官网地址&#xff1a;Apache ECharts 入门案例 Apache Echarts官方…...

源码分析之Openlayers样式篇CircleStyle类

访问Openlayers网站(https://jinuss.github.io/Openlayers_map_pages/&#xff0c;网站是基于Vue3 Openlayers&#xff0c;里面有大量的实践和案例。觉得还不错&#xff0c;可以 给个小星星Star&#xff0c;鼓励一波 https://github.com/Jinuss/OpenlayersMap哦~ 概述 在 Ope…...

解决CentOS9系统下Zabbix 7.2图形中文字符乱码问题

操作系统&#xff1a;CentOS 9 Zabbix版本&#xff1a;Zabbix7.2 问题描述&#xff1a;主机图形中文字符乱码 解决方案&#xff1a; # 安装字体配置和中文语言包 sudo yum install -y fontconfig langpacks-zh_CN.noarch # 检查是否已有中文字体&#xff1a; fc-list :lan…...

AF3 FourierEmbedding类源码解读

FourierEmbedding 是一个用于扩散条件的傅里叶嵌入类,其核心是将输入的时间步噪声强度或控制参数(timestep)转换为高维的周期性特征。 源代码: class FourierEmbedding(nn.Module):"""Fourier embedding for diffusion conditioning."""de…...

vsftpd虚拟用户部署

vsftpd虚拟用户部署 案例提供两个用户如下,使用centos7验证可行。 test *AO9ih&7 ftp DTx4zp_shell脚本运行一键安装vsftp #!/bin/bash yum -y install vsftpd ftp >/etc/vsftpd/vsftpd.conf cat <<EOL >> /etc/vsftpd/vsftpd.conf anonymous_enableNO l…...

MySQL 容器已经停止(但仍然存在),但希望重新启动它,并使它的 3306 端口映射到宿主机的 3306 端口是不可行的

重新启动容器并映射端口是不行的 由于你已经有一个名为 mysql-container 的 MySQL 容器&#xff0c;你可以使用 docker start 启动它。想要让3306 端口映射到宿主机是不行的&#xff0c;实际上&#xff0c;端口映射是在容器启动时指定的。你无法在容器已经创建的情况下直接修改…...

汇编实验·顺序程序设计

一、实验目的: 1.能够熟练的进行顺序程序的编写,掌握基本的汇编语言指令的用法 2.通过程序设计理解掌握不同类型的数据混合运算的基本规则 3.熟练掌握各种寻址方式,深入理解逻辑地址和物理地址的相关概念 二、实验内容 有三个长度分别为1、2、4个字节的数据,编写程序求…...

AIGC视频扩散模型新星:Video 版本的SD模型

大家好&#xff0c;这里是好评笔记&#xff0c;公主号&#xff1a;Goodnote&#xff0c;专栏文章私信限时Free。本文详细介绍慕尼黑大学携手 NVIDIA 等共同推出视频生成模型 Video LDMs。NVIDIA 在 AI 领域的卓越成就家喻户晓&#xff0c;而慕尼黑大学同样不容小觑&#xff0c;…...

五层电梯MCGS7.7嵌入版与三菱PLC的联动编程实践

5五层电梯MCGS7.7嵌入版和三菱PLC联机程序调试电梯控制程序最头疼的莫过于通讯不稳定。上个月刚搞完一个五层电梯项目&#xff0c;MCGS7.7触摸屏和三菱FX3U的联机调试过程简直像坐过山车——楼层显示乱跳、按钮状态丢失这些幺蛾子接踵而来。今天咱就唠唠这个项目的实战经验。硬…...

告别窗口闪烁:用BLASTSyncEngine实现Android多窗口平滑过渡的完整指南

告别窗口闪烁&#xff1a;用BLASTSyncEngine实现Android多窗口平滑过渡的完整指南 在Android多窗口交互场景中&#xff0c;开发者经常面临一个棘手问题——当用户进行分屏切换、画中画调整或任务栈重组时&#xff0c;窗口内容会出现短暂闪烁或撕裂。这种视觉瑕疵不仅影响用户体…...

2026 靠谱网站建设公司推荐|中大型企业 / 上市公司建站避坑与优选指南

摘要 在 AI 与 GEO&#xff08;生成式引擎优化&#xff09;主导的 2026 年&#xff0c;企业官网早已不是简单的展示窗口&#xff0c;而是品牌信任、获客转化、合规披露、全球触达的核心数字资产。选择一家靠谱的网站建设公司&#xff0c;直接决定企业数字化成果与长期商业价值。…...

2026 年真正必备的 10 个 Claude 插件(以及它们的作用)

如何把 Claude 从聊天机器人&#xff0c;变成能写代码、联网、访问数据、自动化全流程的超级 AIClaude 刚刚获得了超能力。 而大多数人还以为它只是个聊天机器人。 2026 年 2 月 24 日&#xff0c;Anthropic 为企业用户推出了私有插件市场。而在此两周前&#xff0c;社区已经发…...

OpenClaw隐私保护方案:Qwen3-14B本地处理敏感数据

OpenClaw隐私保护方案&#xff1a;Qwen3-14B本地处理敏感数据 1. 为什么需要本地化隐私保护方案 去年我在处理一批医疗研究数据时&#xff0c;曾因使用某云端AI服务导致文件误传至公共存储桶。虽然及时删除了数据&#xff0c;但这次经历让我意识到&#xff1a;当涉及法律文书…...

数字化转型深水区:技术从“支撑”到“驱动”的蜕变

对于身处一线的软件测试从业者而言&#xff0c;“数字化转型”早已不是一个陌生的词汇。我们经历了从手工测试到自动化测试的转变&#xff0c;见证了敏捷与DevOps带来的流程革新。然而&#xff0c;当转型浪潮进入“深水区”&#xff0c;一种更为根本的变革正在发生&#xff1a;…...

保姆级教程:在若依框架里给你的系统加个AI客服(通义千问+流式响应)

企业级智能客服系统集成实战&#xff1a;若依框架与通义千问的完美结合 1. 智能客服系统架构设计 在当今数字化转型浪潮中&#xff0c;智能客服已成为企业提升服务效率、降低人力成本的关键工具。基于若依框架与通义千问构建的智能客服系统&#xff0c;能够无缝集成到现有企业应…...

手把手教你用FBRT-YOLO在VisDrone数据集上跑出SOTA:从环境配置到模型推理的保姆级教程

手把手教你用FBRT-YOLO在VisDrone数据集上跑出SOTA&#xff1a;从环境配置到模型推理的保姆级教程 航拍图像目标检测一直是计算机视觉领域的难点&#xff0c;尤其是小目标检测问题。无人机拍摄的图像分辨率高、目标密集且尺寸小&#xff0c;传统检测算法往往难以兼顾精度和速度…...

算法调试与错误处理终极指南:5个实用技巧确保C++算法正确性

算法调试与错误处理终极指南&#xff1a;5个实用技巧确保C算法正确性 【免费下载链接】algorithms Algorithms & Data structures in C. 项目地址: https://gitcode.com/gh_mirrors/algo/algorithms GitHub 加速计划 / algo / algorithms 项目提供了丰富的 C 算法与…...

解决Ubuntu中libc6-dev:i386依赖问题的完整指南

1. 理解libc6-dev:i386依赖问题的本质 当你正在愉快地使用Ubuntu系统&#xff0c;突然在执行sudo apt-get upgrade时遇到一堆红色错误提示&#xff0c;特别是看到"libc6-dev:i386 : 依赖: libc6:i386 ( 2.31-0ubuntu9.14) 但无法安装它"这样的报错&#xff0c;是不是…...