当前位置: 首页 > news >正文

Python 全栈系列266 Kafka服务的Docker搭建

说明

在大量数据处理任务下的缓存与分发

这个算是来自顾同学的助攻+1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素,可以简单认为kafka的速度是10万/秒级的。

本次文章的目的是:

  • 1 搭建一个平时工作中常用的队列服务
  • 2 方便自己或者其他同事再次搭建

内容

1 搭建过程

共要搭建两个服务:zookeeper和kafka。

1.1 创建zookeeper

这个是基础服务,必须要最先启动

docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

通常来说,这个服务启动后就不用管了,但是偶尔如果需要debug的时候:

docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids

1.2 创建持久化路径

这个会实际保存kafka的消息

mkdir -p /data/kafka-logs

1.3 创建kafka

一种场景是只监听外网IP(WAN_IP),另一种场景是同时监听内外网(LAN_IP)。

只监听外网的比较简单

WAN_IP=111
LAN_IP=222
docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${WAN_IP}:24666  \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

同时监听内外网的比较麻烦(且要求端口不同)

WAN_IP=111
LAN_IP=222
docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAME=INTERNAL \-e KAFKA_LISTENER_NAME=EXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

配置解释
KAFKA_LISTENERS:

  • INTERNAL://0.0.0.0:9092 用于所有网络接口监听。

  • EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。

  • KAFKA_ADVERTISED_LISTENERS:

  • INTERNAL://IP:9092 用于内网客户端。

  • EXTERNAL://IP:24666 用于外网客户端。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:

  • INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。

注释
• docker run -d --name kafka:启动一个名为 kafka 的容器,并在后台运行。
• -p 9092:9092:将主机的 9092 端口映射到容器的 9092 端口,这是 Kafka 的默认端口。
• --link zookeeper:zk:将名为 zookeeper 的容器链接到当前容器,并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IP=localhost:设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID=1:设置 Kafka 的 broker ID 为 1。【如果有多个,应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECT=zk:2181:指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx:9092:设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS=/data/kafka-logs:指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs:将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录,以持久化存储 Kafka 日志。

2 测试

2.1 生产者测试

from pydantic import BaseModel, field_validator
import json 
import pandas as pd 
class KafkaJsonMsgList(BaseModel):json_list : list@propertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json
from confluent_kafka import Producer
# @func_set_timeout(60)def send_messages(bootstrap_servers = None, topic= None, messages= None):"""发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表"""# 创建 Producer 实例producer = Producer(**{'bootstrap.servers': bootstrap_servers,'acks': 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满,等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(3)]
topic = 'my_test6'
# 外网
## bootstrap_servers = 'WAN_IP:24666'
# 内网
bootstrap_servers = 'LAN_IP:9092'send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list)

2.2 消费者测试

from confluent_kafka import Consumer# 如果是非json的,直接拿到就可以了
# @func_set_timeout(60)def consume_messages(config = None, topic = None, max_messages = 3):# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count = 0res_list = []try:while consumed_count < max_messages:msg = consumer.poll(1.0)if msg is None:print('Empty Q')break else:res_list.append(msg.value().decode('utf-8'))consumed_count += 1if consumed_count >= max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'WAN_IP:24666',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
# 内网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'LAN_IP:9092',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()

2.3 性能测试

发送端,1.48秒发送10万条消息,稍微弱了点,不过考虑这个是一台仅仅4核8G且繁忙的机器,那就还好(我默认的方式是需要json序列化的)。


tick1 = time.time()
msg_list_10w = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(100000)]
topic = 'my_test6'
send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list_10w)
tick2 = time.time()
print('takes %.2f to send 100000' % (tick2-tick1))
takes 1.48 to send 100000
```接收端
````python
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100000  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
print(tick2-tick1, 'get_time')
print(tick3-tick2, 'parse-time')1.3391587734222412 get_time
0.24841904640197754 parse-time
```总体上还是满意的,可以了。

相关文章:

Python 全栈系列266 Kafka服务的Docker搭建

说明 在大量数据处理任务下的缓存与分发 这个算是来自顾同学的助攻1&#xff0c;我有点java绝缘体的体质&#xff0c;碰到和java相关的安装部署总会碰到点奇怪的问题&#xff0c;不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素&#xff0…...

集合框架,List常用API,栈和队列初识

回顾 集合框架 两个重点——ArrayList和HashSet. Vector/ArraysList/LinkedList区别 VectorArraysListLinkedList底层实现数组数组链表线程安全安全不安全不安全增删效率较低较低高扩容*2*1.5-------- &#xff08;>>&#xff09;运算级最低&#xff0c;记得加括号。 常…...

构建全景式智慧文旅生态:EasyCVR视频汇聚平台与AR/VR技术的深度融合实践

在科技日新月异的今天&#xff0c;AR&#xff08;增强现实&#xff09;和VR&#xff08;虚拟现实&#xff09;技术正以前所未有的速度改变着我们的生活方式和工作模式。而EasyCVR视频汇聚平台&#xff0c;作为一款基于云-边-端一体化架构的视频融合AI智能分析平台&#xff0c;可…...

C++结构体声明时初始化

提示&#xff1a;文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 前期疑问&#xff1a; 本文目标&#xff1a; 一、背景 最近 二、 2.1 c 结构体默认初始化 在C中&#xff0c;结构体的默认成员初始化可以通过构造函数来完成。如果没有为结构体提供构造函数&#x…...

基于微信的热门景点推荐小程序的设计与实现(论文+源码)_kaic

摘 要 近些年来互联网迅速发展人们生活水平也稳步提升&#xff0c;人们也越来越热衷于旅游来提高生活品质。互联网的应用与发展也使得人们获取旅游信息的方法也更加丰富&#xff0c;以前的景点推荐系统现在已经不足以满足用户的要求了&#xff0c;也不能满足不同用户自身的个…...

9、设计模式

设计模式 1、工厂模式 在工厂模式中&#xff0c;我们在创建对象时不会对客户端暴露创建逻辑&#xff0c;并且是通过使用一个共同的接口来指向新创建的对象。工厂模式作为一种创建模式&#xff0c;一般在创建复杂对象时&#xff0c;考虑使用&#xff1b;在创建简单对象时&…...

数学专题.

数论 1.判断质数 定义&#xff1a;在大于1的整数中&#xff0c;如果只包含1和本身这两个约数&#xff0c;就称为质数or素数 Acwing 866.试除法判断质数 2.预处理质数&#xff08;筛质数&#xff09; Acwing 868.筛质数 3.质因数分解 Acwing 867.分解质因数 4.阶乘分解 5.因…...

如何提升网站的收录率?

要提升网站的收录率&#xff0c;其中一个特别有效的工具就是GPC爬虫池&#xff0c;这个工具通过深度研究谷歌SEO算法&#xff0c;吸引谷歌爬虫。 GPC爬虫池的基本原理是构建一个庞大的站群系统&#xff0c;并创建复杂的内链和外链结构&#xff0c;以吸引并留住谷歌蜘蛛 使用GP…...

HALCON根据需要创建自定义函数

在HALCON中&#xff0c;根据需要创建自定义函数是扩展其图像处理和分析功能的有效方式。HALCON支持通过其高级编程接口&#xff08;HDevelop和C/C、C#、Python等&#xff09;来创建自定义函数。这里将主要讨论在HDevelop环境中如何创建自定义函数&#xff0c;因为HDevelop是HAL…...

力扣SQL仅数据库(196~569)

196. 删除重复的电子邮箱 题目&#xff1a;编写解决方案 删除 所有重复的电子邮件&#xff0c;只保留一个具有最小 id 的唯一电子邮件。 &#xff08;对于 SQL 用户&#xff0c;请注意你应该编写一个 DELETE 语句而不是 SELECT 语句。&#xff09; &#xff08;对于 Pandas …...

网络基础:理解IP地址、默认网关与网段(IP地址是什么,默认网关是什么,网段是什么,IP地址、默认网关与网段)

前言 在计算机网络中&#xff0c;IP地址、默认网关和网段&#xff08;也称为子网&#xff09;之间有着密切的关系。它们是网络通信中的至关重要的概念&#xff0c;但它们并不相同。这里来介绍一下它们之间的关系&#xff0c;简单记录一下 一. IP地址 1. 介绍 IP 地址&#xf…...

windows安装php7.4

windows安装php7.4 1.通过官网下载所需的php版本 首先从PHP官网&#xff08;https://www.php.net/downloads.php&#xff09;或者Windows下的PHP官网&#xff08;http://windows.php.net/download/&#xff09;下载Windows版本的PHP安装包。下载后解压到一个路径下。 2.配…...

【代码随想录|图论part03之后】

代码随想录|数组 704. 二分查找,27. 移除元素 一、part031、101. 孤岛的总面积1.1 dfs版本1.2 BFS版本2.102. 沉没孤岛3、103. 水流问题4、104.建造最大岛屿二、part041、110. 字符串接龙2、105.有向图的完全可达性3、106. 岛屿的周长三、part05-06 并查集理论1、107. 寻找存在…...

【项目一】基于pytest的自动化测试框架day1

day1不涉及编写代码&#xff0c;只简单梳理接口测试相关的概念。 day1接口测试的本质&#xff1a;功能测试的一部分测试用例的设计与实现接口调试与自动化&#xff1a;从postman到持续集成补充概念 day1 接口测试的本质&#xff1a;功能测试的一部分 接口测试是功能测试的一部…...

如何下载和安装 Notepad++

Notepad 是一款功能强大的开源文本编辑器&#xff0c;广泛用于代码编写和文本编辑。以下是 Notepad 的下载安装教程&#xff1a; 下载 Notepad 访问官方网站 打开你的网络浏览器&#xff0c;访问 Notepad 的官方网站&#xff1a;https://notepad-plus-plus.org/ 选择下载选项…...

笔记:如何使用Process Explorer分析句柄泄露溢出问题

一、目的&#xff1a;如何使用Process Explorer分析句柄泄露溢出问题 使用 Process Explorer 分析句柄泄漏问题是一个非常有效的方法。句柄泄漏通常是由于应用程序在创建系统资源&#xff08;如文件、注册表项、GDI 对象等&#xff09;后没有正确释放这些资源。以下是使用 二、…...

HTTP/2

http相关知识点 HTTP/2是超文本传输协议&#xff08;HTTP&#xff09;的第二个主要版本&#xff0c;旨在解决HTTP/1.x版本中存在的一些性能限制和效率问题。HTTP/2由互联网工程任务组&#xff08;IETF&#xff09;的HTTP工作组开发&#xff0c;最终在2015年作为RFC 7540正式发…...

如何在算家云搭建ComfyUI(AI绘画)

一、ComfyUI简介 ComfyUI 是一个强大的、模块化的 Stable Diffusion 界面与后端项目。该用户界面将允许用户使用基于图形/节点/流程图的界面设计和执行高级稳定的扩散管道。该项目部分其它特点如下&#xff1a; 全面支持 SD1.x&#xff0c;SD2.x&#xff0c;SDXL&#xff0c;…...

公司的企业画册如何制作?

企业画册是公司形象和产品服务展示的重要载体&#xff0c;一个制作精良的企业画册不仅能展示公司的实力&#xff0c;也能提升客户对公司专业度的认可。以下是制作企业画册的步骤和要点&#xff0c;帮助你的公司画册既美观又实用。 1.要制作电子杂志,首先需要选择一款适合自己的…...

13、Django Admin创建两个独立的管理站点

admin文件 from .models import Epic, Event, EventHero, EventVillain from django.contrib.admin import AdminSiteclass EventAdminSite(AdminSite):site_header "Events管理"site_title "欢迎您&#xff01;"index_title "管理员"even…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站&#xff0c;会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后&#xff0c;网站没有变化的情况。 不熟悉siteground主机的新手&#xff0c;遇到这个问题&#xff0c;就很抓狂&#xff0c;明明是哪都没操作错误&#x…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

深入浅出:JavaScript 中的 `window.crypto.getRandomValues()` 方法

深入浅出&#xff1a;JavaScript 中的 window.crypto.getRandomValues() 方法 在现代 Web 开发中&#xff0c;随机数的生成看似简单&#xff0c;却隐藏着许多玄机。无论是生成密码、加密密钥&#xff0c;还是创建安全令牌&#xff0c;随机数的质量直接关系到系统的安全性。Jav…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java&#xff08;供 Kotlin 使用&#xff09; 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...