Python 全栈系列266 Kafka服务的Docker搭建
说明
在大量数据处理任务下的缓存与分发
这个算是来自顾同学的助攻+1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素,可以简单认为kafka的速度是10万/秒级的。
本次文章的目的是:
- 1 搭建一个平时工作中常用的队列服务
- 2 方便自己或者其他同事再次搭建
内容
1 搭建过程
共要搭建两个服务:zookeeper和kafka。
1.1 创建zookeeper
这个是基础服务,必须要最先启动
docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100
通常来说,这个服务启动后就不用管了,但是偶尔如果需要debug的时候:
docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids
1.2 创建持久化路径
这个会实际保存kafka的消息
mkdir -p /data/kafka-logs
1.3 创建kafka
一种场景是只监听外网IP(WAN_IP),另一种场景是同时监听内外网(LAN_IP)。
只监听外网的比较简单
WAN_IP=111
LAN_IP=222
docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${WAN_IP}:24666 \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100
同时监听内外网的比较麻烦(且要求端口不同)
WAN_IP=111
LAN_IP=222
docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAME=INTERNAL \-e KAFKA_LISTENER_NAME=EXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100
配置解释
KAFKA_LISTENERS:
-
INTERNAL://0.0.0.0:9092 用于所有网络接口监听。
-
EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。
-
KAFKA_ADVERTISED_LISTENERS:
-
INTERNAL://IP:9092 用于内网客户端。
-
EXTERNAL://IP:24666 用于外网客户端。
-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
-
INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。
注释
• docker run -d --name kafka:启动一个名为 kafka 的容器,并在后台运行。
• -p 9092:9092:将主机的 9092 端口映射到容器的 9092 端口,这是 Kafka 的默认端口。
• --link zookeeper:zk:将名为 zookeeper 的容器链接到当前容器,并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IP=localhost:设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID=1:设置 Kafka 的 broker ID 为 1。【如果有多个,应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECT=zk:2181:指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx:9092:设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS=/data/kafka-logs:指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs:将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录,以持久化存储 Kafka 日志。
2 测试
2.1 生产者测试
from pydantic import BaseModel, field_validator
import json
import pandas as pd
class KafkaJsonMsgList(BaseModel):json_list : list@propertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json
from confluent_kafka import Producer
# @func_set_timeout(60)def send_messages(bootstrap_servers = None, topic= None, messages= None):"""发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表"""# 创建 Producer 实例producer = Producer(**{'bootstrap.servers': bootstrap_servers,'acks': 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满,等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(3)]
topic = 'my_test6'
# 外网
## bootstrap_servers = 'WAN_IP:24666'
# 内网
bootstrap_servers = 'LAN_IP:9092'send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list)
2.2 消费者测试
from confluent_kafka import Consumer# 如果是非json的,直接拿到就可以了
# @func_set_timeout(60)def consume_messages(config = None, topic = None, max_messages = 3):# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count = 0res_list = []try:while consumed_count < max_messages:msg = consumer.poll(1.0)if msg is None:print('Empty Q')break else:res_list.append(msg.value().decode('utf-8'))consumed_count += 1if consumed_count >= max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'WAN_IP:24666',
'group.id':'group1',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
# 内网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'LAN_IP:9092',
'group.id':'group1',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True
}
topic = 'my_test6'
import time
tick1 = time.time()
max_messages = 100 # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
2.3 性能测试
发送端,1.48秒发送10万条消息,稍微弱了点,不过考虑这个是一台仅仅4核8G且繁忙的机器,那就还好(我默认的方式是需要json序列化的)。
tick1 = time.time()
msg_list_10w = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(100000)]
topic = 'my_test6'
send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list_10w)
tick2 = time.time()
print('takes %.2f to send 100000' % (tick2-tick1))
takes 1.48 to send 100000
```接收端
````python
topic = 'my_test6'
import time
tick1 = time.time()
max_messages = 100000 # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
print(tick2-tick1, 'get_time')
print(tick3-tick2, 'parse-time')1.3391587734222412 get_time
0.24841904640197754 parse-time
```总体上还是满意的,可以了。
相关文章:
Python 全栈系列266 Kafka服务的Docker搭建
说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素࿰…...
集合框架,List常用API,栈和队列初识
回顾 集合框架 两个重点——ArrayList和HashSet. Vector/ArraysList/LinkedList区别 VectorArraysListLinkedList底层实现数组数组链表线程安全安全不安全不安全增删效率较低较低高扩容*2*1.5-------- (>>)运算级最低,记得加括号。 常…...
构建全景式智慧文旅生态:EasyCVR视频汇聚平台与AR/VR技术的深度融合实践
在科技日新月异的今天,AR(增强现实)和VR(虚拟现实)技术正以前所未有的速度改变着我们的生活方式和工作模式。而EasyCVR视频汇聚平台,作为一款基于云-边-端一体化架构的视频融合AI智能分析平台,可…...
C++结构体声明时初始化
提示:文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 前期疑问: 本文目标: 一、背景 最近 二、 2.1 c 结构体默认初始化 在C中,结构体的默认成员初始化可以通过构造函数来完成。如果没有为结构体提供构造函数&#x…...
基于微信的热门景点推荐小程序的设计与实现(论文+源码)_kaic
摘 要 近些年来互联网迅速发展人们生活水平也稳步提升,人们也越来越热衷于旅游来提高生活品质。互联网的应用与发展也使得人们获取旅游信息的方法也更加丰富,以前的景点推荐系统现在已经不足以满足用户的要求了,也不能满足不同用户自身的个…...
9、设计模式
设计模式 1、工厂模式 在工厂模式中,我们在创建对象时不会对客户端暴露创建逻辑,并且是通过使用一个共同的接口来指向新创建的对象。工厂模式作为一种创建模式,一般在创建复杂对象时,考虑使用;在创建简单对象时&…...
数学专题.
数论 1.判断质数 定义:在大于1的整数中,如果只包含1和本身这两个约数,就称为质数or素数 Acwing 866.试除法判断质数 2.预处理质数(筛质数) Acwing 868.筛质数 3.质因数分解 Acwing 867.分解质因数 4.阶乘分解 5.因…...
如何提升网站的收录率?
要提升网站的收录率,其中一个特别有效的工具就是GPC爬虫池,这个工具通过深度研究谷歌SEO算法,吸引谷歌爬虫。 GPC爬虫池的基本原理是构建一个庞大的站群系统,并创建复杂的内链和外链结构,以吸引并留住谷歌蜘蛛 使用GP…...
HALCON根据需要创建自定义函数
在HALCON中,根据需要创建自定义函数是扩展其图像处理和分析功能的有效方式。HALCON支持通过其高级编程接口(HDevelop和C/C、C#、Python等)来创建自定义函数。这里将主要讨论在HDevelop环境中如何创建自定义函数,因为HDevelop是HAL…...
力扣SQL仅数据库(196~569)
196. 删除重复的电子邮箱 题目:编写解决方案 删除 所有重复的电子邮件,只保留一个具有最小 id 的唯一电子邮件。 (对于 SQL 用户,请注意你应该编写一个 DELETE 语句而不是 SELECT 语句。) (对于 Pandas …...
网络基础:理解IP地址、默认网关与网段(IP地址是什么,默认网关是什么,网段是什么,IP地址、默认网关与网段)
前言 在计算机网络中,IP地址、默认网关和网段(也称为子网)之间有着密切的关系。它们是网络通信中的至关重要的概念,但它们并不相同。这里来介绍一下它们之间的关系,简单记录一下 一. IP地址 1. 介绍 IP 地址…...
windows安装php7.4
windows安装php7.4 1.通过官网下载所需的php版本 首先从PHP官网(https://www.php.net/downloads.php)或者Windows下的PHP官网(http://windows.php.net/download/)下载Windows版本的PHP安装包。下载后解压到一个路径下。 2.配…...
【代码随想录|图论part03之后】
代码随想录|数组 704. 二分查找,27. 移除元素 一、part031、101. 孤岛的总面积1.1 dfs版本1.2 BFS版本2.102. 沉没孤岛3、103. 水流问题4、104.建造最大岛屿二、part041、110. 字符串接龙2、105.有向图的完全可达性3、106. 岛屿的周长三、part05-06 并查集理论1、107. 寻找存在…...
【项目一】基于pytest的自动化测试框架day1
day1不涉及编写代码,只简单梳理接口测试相关的概念。 day1接口测试的本质:功能测试的一部分测试用例的设计与实现接口调试与自动化:从postman到持续集成补充概念 day1 接口测试的本质:功能测试的一部分 接口测试是功能测试的一部…...
如何下载和安装 Notepad++
Notepad 是一款功能强大的开源文本编辑器,广泛用于代码编写和文本编辑。以下是 Notepad 的下载安装教程: 下载 Notepad 访问官方网站 打开你的网络浏览器,访问 Notepad 的官方网站:https://notepad-plus-plus.org/ 选择下载选项…...
笔记:如何使用Process Explorer分析句柄泄露溢出问题
一、目的:如何使用Process Explorer分析句柄泄露溢出问题 使用 Process Explorer 分析句柄泄漏问题是一个非常有效的方法。句柄泄漏通常是由于应用程序在创建系统资源(如文件、注册表项、GDI 对象等)后没有正确释放这些资源。以下是使用 二、…...
HTTP/2
http相关知识点 HTTP/2是超文本传输协议(HTTP)的第二个主要版本,旨在解决HTTP/1.x版本中存在的一些性能限制和效率问题。HTTP/2由互联网工程任务组(IETF)的HTTP工作组开发,最终在2015年作为RFC 7540正式发…...
如何在算家云搭建ComfyUI(AI绘画)
一、ComfyUI简介 ComfyUI 是一个强大的、模块化的 Stable Diffusion 界面与后端项目。该用户界面将允许用户使用基于图形/节点/流程图的界面设计和执行高级稳定的扩散管道。该项目部分其它特点如下: 全面支持 SD1.x,SD2.x,SDXL,…...
公司的企业画册如何制作?
企业画册是公司形象和产品服务展示的重要载体,一个制作精良的企业画册不仅能展示公司的实力,也能提升客户对公司专业度的认可。以下是制作企业画册的步骤和要点,帮助你的公司画册既美观又实用。 1.要制作电子杂志,首先需要选择一款适合自己的…...
13、Django Admin创建两个独立的管理站点
admin文件 from .models import Epic, Event, EventHero, EventVillain from django.contrib.admin import AdminSiteclass EventAdminSite(AdminSite):site_header "Events管理"site_title "欢迎您!"index_title "管理员"even…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
EtherNet/IP转DeviceNet协议网关详解
一,设备主要功能 疆鸿智能JH-DVN-EIP本产品是自主研发的一款EtherNet/IP从站功能的通讯网关。该产品主要功能是连接DeviceNet总线和EtherNet/IP网络,本网关连接到EtherNet/IP总线中做为从站使用,连接到DeviceNet总线中做为从站使用。 在自动…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
Linux操作系统共享Windows操作系统的文件
目录 一、共享文件 二、挂载 一、共享文件 点击虚拟机选项-设置 点击选项,设置文件夹共享为总是启用,点击添加,可添加需要共享的文件夹 查询是否共享成功 ls /mnt/hgfs 如果显示Download(这是我共享的文件夹)&…...
linux设备重启后时间与网络时间不同步怎么解决?
linux设备重启后时间与网络时间不同步怎么解决? 设备只要一重启,时间又错了/偏了,明明刚刚对时还是对的! 这在物联网、嵌入式开发环境特别常见,尤其是开发板、树莓派、rk3588 这类设备。 解决方法: 加硬件…...
Centos 7 服务器部署多网站
一、准备工作 安装 Apache bash sudo yum install httpd -y sudo systemctl start httpd sudo systemctl enable httpd创建网站目录 假设部署 2 个网站,目录结构如下: bash sudo mkdir -p /var/www/site1/html sudo mkdir -p /var/www/site2/html添加测试…...
