基于Docker的Kafka分布式集群
目录
1. 说明
2. 服务器规划
3. docker-compose文件
kafka{i}.yaml
kafka-ui.yaml
4. kafka-ui配置集群监控
5. 参数表
6. 测试脚本
生产者-异步生产: AsyncKafkaProducer1.py
消费者-异步消费: AsyncKafkaConsumer1.py
7. 参考
1. 说明
- 创建一个本地开发环境所需的kafka集群
- 分布在3个虚拟机上,以docker容器方式互联互通
2. 服务器规划
| Host | 端口 | 备注 |
|---|---|---|
| host001.dev.sb | 9092, 9093, 9081 | kafka ui 访问 kafka0 节点 |
| host002.dev.sb | 9092, 9093 | kafka1 节点 |
| host003.dev.sb | 9092, 9093 | kafka2 节点 |
3. docker-compose文件
kafka{i}.yaml
- 其中 {i} 对应0,1,2
- 用户密码都配在文件里面
services:kafka:image: 'bitnami/kafka:3.6.2'container_name: kafka{i}hostname: kafka{i}restart: alwaysports:- 9092:9092- 9093:9093environment:# KRaft- KAFKA_CFG_NODE_ID={i}- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka0:9093,1@kafka1:9093,2@kafka2:9093- KAFKA_KRAFT_CLUSTER_ID=sbcluster01-mnopqrstuv# Listeners- KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://:9093,EXTERNAL://:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka0:9094,CLIENT://:9095,EXTERNAL://kafka0:9092- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_NUM_PARTITIONS=3- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL# Clustering- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2# Log- KAFKA_CFG_LOG_RETENTION_HOURS = 72# SASL- KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN- KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN- KAFKA_CONTROLLER_USER=kfkuser- KAFKA_CONTROLLER_PASSWORD=youknow- KAFKA_INTER_BROKER_USER=kfkuser- KAFKA_INTER_BROKER_PASSWORD=youknow- KAFKA_CLIENT_USERS=kfkuser- KAFKA_CLIENT_PASSWORDS=youknow# Others- TZ=Asia/Shanghaivolumes:- '/data0/Server/Db/kafka0:/bitnami/kafka'extra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"
kafka-ui.yaml
services:kafka-ui:image: 'provectuslabs/kafka-ui:master'container_name: kafka-uirestart: alwaysports:- 9081:8080environment:- KAFKA_CLUSTERS_0_NAME=local- DYNAMIC_CONFIG_ENABLED=true- AUTH_TYPE=LOGIN_FORM- SPRING_SECURITY_USER_NAME=admin- SPRING_SECURITY_USER_PASSWORD=youknowextra_hosts: - "kafka0:172.16.20.60"- "kafka1:172.16.20.61"- "kafka2:172.16.20.62"
4. kafka-ui配置集群监控


5. 参数表
| 参数 | 说明 |
|---|---|
| KAFKA_CFG_PROCESS_ROLES | kafka角色,做broker, controller 示例: |
| KAFKA_KRAFT_CLUSTER_ID | 集群id, 同属节点需一样 |
| KAFKA_CFG_CONTROLLER_QUORUM_VOTERS | 投票选举列表 |
| KAFKA_CFG_CONTROLLER_LISTENER_NAMES | 控制器名称 |
| KAFKA_CFG_NUM_PARTITIONS | 默认分区数 |
| KAFKA_CFG_LISTENERS | 监听器的地址和端口 |
| KAFKA_CFG_ADVERTISED_LISTENERS | 发布监听器的地址和端口 |
| KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP | 监听器的协议 这里sasl_plain表示 仅认证加密 传输不加密 |
| KAFKA_CLIENT_USERS | 加密客户端账号 |
| KAFKA_CLIENT_PASSWORDS | 加密客户端密码 |
| #Clustering | |
| KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR | Kafka 内部使用的 __consumer_offsets 主题的复制因子。这个主题是用来存储消费者偏移 量 |
| KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR | Kafka 内部使用的 __transaction_state 主题的复制因子。这个主题是用来存储事务日志 |
| KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR | Kafka 内部使用的 __transaction_state 主题的最小 ISR(In-Sync Replicas)数量。ISR 是与 leader 保持同步的副本集合 |
| #Log | |
| KAFKA_CFG_LOG_DIRS | 日志目录 |
| KAFKA_CFG_LOG_RETENTION_HOURS | 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理,默认168小时,一周时间 |
6. 测试脚本
生产者-异步生产: AsyncKafkaProducer1.py
from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","client.id": "PythonProducer","security.protocol": "SASL_PLAINTEXT","sasl.mechanisms": "PLAIN","sasl.username": "kfkuser","sasl.password": "youknow",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]# Produce messagesproduce_messages(async_producer, "zx001.msg.user", messages_to_send)
消费者-异步消费: AsyncKafkaConsumer1.py
from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json
import logging
from datetime import datetime# 设置日志格式,'%()'表示日志参数
log_format = "%(message)s"
logging.basicConfig(filename="logs/kafka_messages1.log", format=log_format, level=logging.INFO
)async def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()# print(f"Raw message: {raw_message}")str_msg = raw_message.decode("utf-8")parsed_message = json.loads(str_msg)parsed_message["time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")print(f"Received message: {type(parsed_message)} : {parsed_message}")json_data = json.dumps(parsed_message, ensure_ascii=False)logging.info("{}".format(json_data))await asyncio.sleep(0.01) # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.sb:9092,host002.dev.sb:9092,host003.dev.sb:9092","group.id": "MsgGroup2","auto.offset.reset": "earliest","client.id" : "PythonConsumer","security.protocol" : "SASL_PLAINTEXT","sasl.mechanisms" : "PLAIN","sasl.username" : "kfkuser","sasl.password" : "youknow"}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["zx001.msg.user"])if __name__ == "__main__":asyncio.run(consume())
7. 参考
- Apache Kafka® Quick Start - Local Install With Docker
- kafka-ui-docs/configuration/configuration-wizard.md at main · provectus/kafka-ui-docs · GitHub
- https://juejin.cn/post/7187301063832109112
相关文章:
基于Docker的Kafka分布式集群
目录 1. 说明 2. 服务器规划 3. docker-compose文件 kafka{i}.yaml kafka-ui.yaml 4. kafka-ui配置集群监控 5. 参数表 6. 测试脚本 生产者-异步生产: AsyncKafkaProducer1.py 消费者-异步消费: AsyncKafkaConsumer1.py 7. 参考 1. 说明 创建一个本地开发环境所需的k…...
【博客之星】年度总结:在云影与墨香中探寻成长的足迹
🐇明明跟你说过:个人主页 🔖行路有良友,便是天堂🔖 目录 一、年度回顾 1、创作历程 2、个人成长 3、个人生活与博客事业 二、技术总结 1、赛道选择 2、技术工具 3、实战项目 三、前景与展望 1、云原生未来…...
SpringBoot的Swagger配置
一、Swagger配置 1.添加依赖 <dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.2</version> </dependency> 2.修改WebMvcConfig Slf4j Configurat…...
machine learning knn算法之使用KNN对鸢尾花数据集进行分类
通过导入必要的scikit-learn导入必要的库,加载给定的数据,划分测试集和训练集之后训练预测和评估即可 具体代码如下: import numpy as np from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split f…...
C语言练习(16)
猴子吃桃问题。猴子第一天摘下若干个桃子,当即吃了一半,还不过瘾,又多吃了一个。第二天早上又将剩下的桃子吃掉一半,又多吃了一个。以后每天早上都吃了前一天剩下的一半加一个。到第10天早上想再吃时,见只剩一个桃子了…...
SOAFEE 技术研讨会:汽车软件定义与自动驾驶技术探讨
在本次技术研讨会上,来自汽车与科技领域的专家们围绕汽车软件定义及自动驾驶技术展开了深入交流与探讨。从 SOAFEE 蓝图计划的创新性理念,到 Autoware 开源项目及 Open AD Kit 在实际应用中的探索,再到 Edge Workload Abstraction and Orches…...
R语言学习笔记之开发环境配置
一、概要 整个安装过程及遇到的问题记录 操作步骤备注(包含遇到的问题)1下载安装R语言2下载安装RStudio3离线安装pacman提示需要安装Rtools4安装Rtoolspacman、tidyfst均离线安装完成5加载tidyfst报错 提示需要安装依赖,试错逐步下载并安装…...
多版本并发控制:MVCC的作用和基本原理
多版本并发控制:MVCC的作用和基本原理 1、MVCC简介1.1 快照读与当前读的区别1.1.1 快照读1.1.2 当前读 1.2 数据库的读写问题1.3 MVCC的作用 2、MVCC实现原理之ReadView2.1 什么是ReadView2.2 ReadView的设计思路2.3 MVCC整体操作流程 1、MVCC简介 1.1 快照读与当前…...
ubuntu18.04安装nvm管理本机node和npm
ubuntu18.04安装nvm管理本机node和npm nvm的使用方法1. 安装nvm2. 加载nvm3. 安装执行版本4. 设置默认版本(可选)5. 检查:6. 将配置加入到shell配置文件中(默认已经加入) 如果系统全局的 Node.js 存在,但被 nvm 覆盖了,可以通过禁用或卸载 nvm 恢复到系统…...
【数据结构进阶】红黑树超详解 + 实现(附源码)
🌟🌟作者主页:ephemerals__ 🌟🌟所属专栏:数据结构 目录 前言 一、红黑树介绍 二、红黑树原理详解 三、红黑树的实现 1. 节点定义 2. 红黑树类型定义及接口声明 3. 红黑树的插入(重点&a…...
leetcode_3092. 最高频率的 ID
https://leetcode.cn/problems/most-frequent-ids/description/ 看到这个数据范围 最极端情况 如果nums全为一个数 并且数量取到最大 那么范围是10的10次方 需要longlong储存 这题主要运用了哈希表配合multiset实现 哈希表主要用作存储某个数的出现次数 mst则用于记录出现次…...
鸿蒙仓颉环境配置(仓颉SDK下载,仓颉VsCode开发环境配置,仓颉DevEco开发环境配置)
目录 1)仓颉的SDK下载 1--进入仓颉的官网 2--点击图片中的下载按钮 3--在新跳转的页面点击即刻下载 4--下载 5--找到你们自己下载好的地方 6--解压软件 2)仓颉编程环境配置 1--找到自己的根目录 2--进入命令行窗口 3--输入 envsetup.bat 4--验证是否安…...
数据统计–图形报表(day11)
Apache ECharts 介绍 Apache ECharts 介绍 Apache ECharts 是一款基于 Javascript 的数据可视化图表库,提供直观,生动,可交互,可个性化定制的数据可视化图表。 官网地址:Apache ECharts 入门案例 Apache Echarts官方…...
源码分析之Openlayers样式篇CircleStyle类
访问Openlayers网站(https://jinuss.github.io/Openlayers_map_pages/,网站是基于Vue3 Openlayers,里面有大量的实践和案例。觉得还不错,可以 给个小星星Star,鼓励一波 https://github.com/Jinuss/OpenlayersMap哦~ 概述 在 Ope…...
解决CentOS9系统下Zabbix 7.2图形中文字符乱码问题
操作系统:CentOS 9 Zabbix版本:Zabbix7.2 问题描述:主机图形中文字符乱码 解决方案: # 安装字体配置和中文语言包 sudo yum install -y fontconfig langpacks-zh_CN.noarch # 检查是否已有中文字体: fc-list :lan…...
AF3 FourierEmbedding类源码解读
FourierEmbedding 是一个用于扩散条件的傅里叶嵌入类,其核心是将输入的时间步噪声强度或控制参数(timestep)转换为高维的周期性特征。 源代码: class FourierEmbedding(nn.Module):"""Fourier embedding for diffusion conditioning."""de…...
vsftpd虚拟用户部署
vsftpd虚拟用户部署 案例提供两个用户如下,使用centos7验证可行。 test *AO9ih&7 ftp DTx4zp_shell脚本运行一键安装vsftp #!/bin/bash yum -y install vsftpd ftp >/etc/vsftpd/vsftpd.conf cat <<EOL >> /etc/vsftpd/vsftpd.conf anonymous_enableNO l…...
MySQL 容器已经停止(但仍然存在),但希望重新启动它,并使它的 3306 端口映射到宿主机的 3306 端口是不可行的
重新启动容器并映射端口是不行的 由于你已经有一个名为 mysql-container 的 MySQL 容器,你可以使用 docker start 启动它。想要让3306 端口映射到宿主机是不行的,实际上,端口映射是在容器启动时指定的。你无法在容器已经创建的情况下直接修改…...
汇编实验·顺序程序设计
一、实验目的: 1.能够熟练的进行顺序程序的编写,掌握基本的汇编语言指令的用法 2.通过程序设计理解掌握不同类型的数据混合运算的基本规则 3.熟练掌握各种寻址方式,深入理解逻辑地址和物理地址的相关概念 二、实验内容 有三个长度分别为1、2、4个字节的数据,编写程序求…...
AIGC视频扩散模型新星:Video 版本的SD模型
大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍慕尼黑大学携手 NVIDIA 等共同推出视频生成模型 Video LDMs。NVIDIA 在 AI 领域的卓越成就家喻户晓,而慕尼黑大学同样不容小觑,…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
dify打造数据可视化图表
一、概述 在日常工作和学习中,我们经常需要和数据打交道。无论是分析报告、项目展示,还是简单的数据洞察,一个清晰直观的图表,往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server,由蚂蚁集团 AntV 团队…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关
在水泥厂的生产流程中,工业自动化网关起着至关重要的作用,尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关,为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多,其中不少设备采用Devicenet协议。Devicen…...
VisualXML全新升级 | 新增数据库编辑功能
VisualXML是一个功能强大的网络总线设计工具,专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑(如DBC、LDF、ARXML、HEX等),并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...
Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...
STM32标准库-ADC数模转换器
文章目录 一、ADC1.1简介1. 2逐次逼近型ADC1.3ADC框图1.4ADC基本结构1.4.1 信号 “上车点”:输入模块(GPIO、温度、V_REFINT)1.4.2 信号 “调度站”:多路开关1.4.3 信号 “加工厂”:ADC 转换器(规则组 注入…...
电脑桌面太单调,用Python写一个桌面小宠物应用。
下面是一个使用Python创建的简单桌面小宠物应用。这个小宠物会在桌面上游荡,可以响应鼠标点击,并且有简单的动画效果。 import tkinter as tk import random import time from PIL import Image, ImageTk import os import sysclass DesktopPet:def __i…...
