当前位置: 首页 > news >正文

WSL下的Kafka开发容器:Docker搭建、API、整合

背景介绍

Kafka是一个分布式流处理平台,可以处理大规模数据流并支持实时数据流的处理。

本文介绍了如何在WSL下使用Docker搭建Kafka容器,并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时,还将该服务整合到一个整体的docker-compose中。文章详细介绍了Docker网络、Kafka环境变量配置、Python连接Kafka的方法以及API的开发。

实验环境

WSL2 Ubuntu18.04 | Docker

⚙️容器配置与搭建

镜像选择

bitnami/kafka镜像

在这里插入图片描述

Bitnami是一个提供开发、部署和管理应用程序的软件公司。Bitnami提供了Kafka的Docker镜像,并有非常详细的文档。我们将使用这个镜像来搭建Kafka容器。

由于Kafka需要Zookeeper支持,我们可以通过docker-compose来快速组合多个容器。

按照官方的文档,我们可以通过如下的配置快速搭建一个Kafka+Zookeeper的中间件:

# docker-compose.ymlversion: "3"networks:app-tier:driver: bridgeservices:zookeeper:restart: alwaysimage: 'bitnami/zookeeper:latest'networks:- app-tierports:- '2181:2181'environment:- ALLOW_ANONYMOUS_LOGIN=yeskafka:restart: alwaysimage: 'bitnami/kafka:latest'networks:- app-tierports:- '9092:9092'- '9093:9093'environment:- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENTdepends_on:- zookeeper

在network部分,我们一个定义了一个 Docker 网络,名称为 “app-tier”,驱动程序为 “bridge”。

在 Docker 中,网络是一种虚拟网络,使容器之间可以进行通信。 “bridge” 驱动程序是 Docker 网络的默认驱动程序,它在单个 Docker 主机内创建一个内部网络,允许容器使用其 IP 地址相互通信。通过定义名称为 “app-tier”,驱动程序为 “bridge” 的网络,连接到该网络的任何容器都将能够使用其在网络内的 IP 地址相互通信。这可以用于创建微服务架构或其他分布式系统,其中多个容器需要彼此通信。

在Kafka的部分,我们设置了 Kafka 的环境变量:包括 Zookeeper 地址、允许明文监听器、监听器安全协议映射、监听器和广告监听器等。

注意,当前的配置将允许明文监听Kafka,这在实际生产环境中是不被允许的,我们在此为了便于开发环境的测试,允许直接监听。

剩余的环境配置可以在官网详细查询

在这里插入图片描述

容器使用

使用如下命令启动容器

docker-compose up

使用如下命令进入容器

sudo docker ps
sudo docker exec -it xx bash

注:其中xx是ps命令得到的Kafka容器的id前两位

进入容器后我们可以通过如下命令开启生产者和消费者

// 确保你在/opt/bitnami/kafka目录下// 创建topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test// 启动生产者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning// 启动消费者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

你可以打开两个终端,分别开启一个生产者和消费者。如果环境运行正常,就可以在消费者的端口同步查看到生产的输入。

基于此,我们快速地搭建了一个Kafka的开发环境,并且成功地启动了生产者和消费者。在实际的开发中,我们可以使用这个环境来进行Kafka相关的开发和测试工作。同时,在生产环境中,我们需要根据实际情况来进行更加严格和安全的配置,以确保Kafka的安全和可靠性。


🌐API构建与测试

除了命令行,我们还可以使用第三方框架实现对Kafka的连接和操作。下面用Python连接kafka并搭建一个简单的API供调试。

安装Python依赖

kafka-python
fastapi
uvicorn
pydantic

使用懒汉式的单例模式构建一个用于连接Kafka的上下文对象

# kafkaContext.pyimport random
from kafka import KafkaProducer, KafkaClient
import timedef _get_kafka_producer_connection(host, port) -> KafkaProducer:while True:try:producer = KafkaProducer(bootstrap_servers=f'{host}:{port}')breakexcept Exception as e:print('producer failed to connect, retrying', e)time.sleep(5)print('producer connected', producer)return producerdef _get_kafka_client_connection(host, port) -> KafkaClient:while True:try:client = KafkaClient(bootstrap_servers=f'{host}:{port}')breakexcept Exception as e:print('client failed to connect, retrying', e)time.sleep(5)print('client connected', client)return clientclass KafkaContext:# 构建一个单例模式的producerdef __init__(self, host='kafka', port=9092):self.__host = hostself.__port = portself.__client = Noneself.__producer = Nonedef __connect_client(self):self.__client = _get_kafka_client_connection(self.__host, self.__port)def __connect_producer(self):self.__producer = _get_kafka_producer_connection(self.__host, self.__port)def is_client_connected(self) -> bool:if self.__client is None:return Falsereturn self.__client.bootstrap_connected()def is_producer_connected(self) -> bool:return self.__producer is not Nonedef add_topic(self, topic: str):if self.__client is None:self.__connect_client()self.__client.add_topic(topic)def send_msg(self, topic: str, msg: str) -> str:if self.__producer is None:self.__connect_producer()# convert msg to bytesmsg_bytes = bytes(msg, encoding='utf-8')self.__producer.send(topic, msg_bytes)return msgkafkaContext = KafkaContext()

注意到我们使用了kafka 的解析去连接Kafka,是因为我们后续要将该服务放在一个Docker容器内,并且接入到和上文提到的Kafka模块的网络中

使用FastAPI构建一个API,提供基本的状态检测、增加topic和生产消息的接口

# api.pyfrom datetime import datetime
from pydantic import BaseModelfrom KafkaContext import kafkaContext
from fastapi import FastAPIapp = FastAPI(title="kafka-server", description="kafka-server", version="0.1.0")class Message(BaseModel):  # 继承了BaseModel,定义了People的数据格式topic: strmsg: str@app.get("/")
def read_root():return {"time": datetime.now(), "status": "ok"}@app.get("/health/client")
def health_client():return {"data": kafkaContext.is_client_connected()}@app.get("/health/producer")
def health_producer():return {"data": kafkaContext.is_producer_connected()}@app.get("/producer/add_topic/{topic}")
def add_topic(topic: str):kafkaContext.add_topic(topic)return {"status": "ok","data": topic}@app.post("/producer/send_msg")
def send_msg(message: Message):return {"status": "ok","data": kafkaContext.send_msg(message.topic, message.msg)}

如果希望开发更多的接口,可以阅读官方的文档 kafka-python · PyPI

在命令行输入命令启动服务

uvicorn api:app --host 0.0.0.0 --port 8000 --reload

打开浏览器访问http://<your_wsl_ip>:8000/docs即可看到接口文档,如果Kafka的消费者还在运行,则可以尝试接口是否运行正常。

在这里插入图片描述

至此,我们已经实现了用Docker搭建一个Kafka模块并使用FastAPI结合python-kafka实现了接口的开发。


📦容器整合

最后,我们将该服务整合到一个整体的docker-compose

首先我们先构建后端API的镜像,该镜像使用了Python的环境。

FROM python
LABEL author="chene2000"
ENV PYTHONIOENCODING=utf-8RUN mkdir -p /app
WORKDIR /app
COPY requirements.txt /app
RUN pip3 install -r requirements.txt -i https://pypi.doubanio.com/simple --trusted-host pypi.doubanio.comCOPY . /appCMD bash start-server.sh

在docker-compose.yml中追加一个server的服务

server:restart: always# image: 'kafka-server'container_name: 'kafka-server'build:dockerfile: Dockerfilecontext: ./server/networks:- app-tierports:- '8002:8000'depends_on:- zookeeper- kafka

注意,我们采用了如下的项目结构

├── docker-compose.yml
└── server├── api.py├── Dockerfile├── KafkaContext.py├── requirements.txt└── start-server.sh

在docker-compose.yml的build处我们配置了Docker容器构建时的目录位置和构建所用的Dockerfile配置。

回到根目录,运行如下命令

# 构建容器
sudo docker compose build# 启动容器
sudo docker compose up
# 启动容器(后台运行)
sudo docker compose up -d

构建完毕


🗒️ 小结

在实际生产环境中,应注意Kafka的安全性和可靠性。在使用第三方框架连接Kafka时,需要使用Kafka的解析进行连接。最后,将Kafka模块整合到docker-compose中,方便进行部署和使用。

项目代码
https://github.com/ChenE2000/thesis-kafka-server

相关文章:

WSL下的Kafka开发容器:Docker搭建、API、整合

背景介绍 Kafka是一个分布式流处理平台&#xff0c;可以处理大规模数据流并支持实时数据流的处理。 本文介绍了如何在WSL下使用Docker搭建Kafka容器&#xff0c;并使用Python的kafka-python库和FastAPI框架实现了一个简单的API。同时&#xff0c;还将该服务整合到一个整体的d…...

cv2(OpenCV)下载安装

cv2对应库是OpenCV&#xff0c;官网下载链接&#xff1a;https://www.lfd.uci.edu/~gohlke/pythonlibs/#opencv 最好下载对应python版本的&#xff0c;通过pip命令安装可能会出现版本过高或者过低的问题&#xff0c;导致import cv2没问题&#xff0c;但是内部函数无法调用。 …...

【剑指 offer】旋转数组的最小数字

✨个人主页&#xff1a;bit me&#x1f447; ✨当前专栏&#xff1a;算法训练营&#x1f447; 旋 转 数 组 的 最 小 数 字核心考点&#xff1a;数组理解&#xff0c;二分查找&#xff0c;临界条件 描述&#xff1a; 有一个长度为 n 的非降序数组&#xff0c;比如[1,2,3,4,5]…...

GB 9706.1-2020 医用电气设备第1部分:基本安全和基本性能的通用要求-1

这是份什么文件 这是一份中华人民共和国国家标准&#xff0c;具体为GB9706.1—2020&#xff0c;标准适用于医用电气设备&#xff0c;并规定了医用电气设备基本安全和基本性能的通用要求。主要涵盖了医疗电器设备与患者接触的各种要求&#xff0c;包括电气安全、机械防护、防护辐…...

认识C++《共、枚、指1》

目录 前言: 1.共用体的基本知识 2.匿名共用体 3.枚举 3.1设置枚举值 3.2枚举的应用场景 3.3枚举变量的取值范围 4.地址和自由存储空间 5.指针的思想 6.指针的声明和初始化 前言: 指针内容比较多&#xff0c;还需要再出一篇。久等了&#xff01;&#xff01;我看了我的…...

vim 一键配置

PS&#xff1a;本文是为了以后为了方便&#xff0c;做备忘的&#xff0c;今天用的时候找了半天很麻烦。 vim编辑器一键配置 在非root用户下执行上面的语句即可&#xff0c;不要在root用户下直接安装&#xff01; 安装的时候需要输入root用户的密码&#xff0c;请找您的服主要一…...

如何成为一名成功的 PHP 开发者

当今的网络应用开发市场&#xff0c;PHP 一直是其中最受欢迎的语言之一&#xff0c;许多优秀的网络应用程序都是由 PHP 开发人员设计和开发的。如果你想成为一名成功的 PHP 开发者&#xff0c;以下是几个关键步骤&#xff1a; 1. 学习基础知识 首先&#xff0c;你需要掌握 PH…...

UHD安装教程

UHD Universal Hardware Driver&#xff0c;即USRP驱动。 UHD&#xff0c;Windows平台安装教程 uhd驱动安装 http://files.ettus.com/binaries/misc/erllc_uhd_winusb_driver.zip 安装LibUSBx http://files.ettus.com/binaries/uhd/latest_release 下载默认C盘 环境配置 将…...

Unity和UE有啥区别?哪个更适合游戏开发

游戏制作软件中最著名的两个游戏引擎是 Unity 和 Unreal Engine。从独立游戏到大型工作室&#xff0c;许多游戏开发商都在使用它们。如果你打算从事游戏行业工作&#xff0c;你肯定曾经问过自己“我的游戏应该使用 Unity 还是 Unreal Engine&#xff1f;” ” 让我们来了解和比…...

红队内网靶场

文章目录开篇介绍靶场介绍靶场下载以及配置Tomcat Get Shell突破DMZ防火墙拿下域内成员机器将内网机器上线到CS使用Adfind侦察子域信息控制子域DCRadmin登录子域进行权限维持(白银票据/ACL)子域bloodhound获取父域信息分析子域Krbtgt密钥创建跨域金票Dcsync父域PTH父域DC准备打…...

如何合并多个升序链表?

前言 本文主要介绍如何将多个小的升序链表合并一个大的升序链表。 需求描述 给出K个升序链接&#xff0c;要求把这K个升序链表合并成一个&#xff0c;并且这个链表也是升序的。 例如&#xff1a;A [1,5,6]&#xff0c; B [2,3,8], C [4,4,9] 将这3个链表合并成一个链表D…...

23上半年信息系统项目管理师新老教程兼顾使用备考策略

在离考试仅有50多天的时候&#xff0c;软考办发文&#xff1a;“为方便报考信息系统项目管理师的考生进行复习备考&#xff0c;2023年上半年信息系统项目管理师考试第3版、第4版教程兼顾使用”。 ​其实软考办发布这样一条信息&#xff0c;也是为了照顾那些在新版发布以前按第…...

Linux环境搭建SVN服务器并实现公网访问 - cpolar端口映射

文章目录前言1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6.2 …...

仿牛客网社区Web开发项目代码逐行精读(更新中)

仿牛客网社区Web开发项目怎么看项目&#xff1f;如何调试项目前瞻技术架构项目亮点开始看代码LoginControllerDiscussPostController怎么看项目&#xff1f; pom.xml看技术架构resource看配置文件&#xff0c;这个项目是前后端不分离的以调试为导向&#xff0c;从前端入手检查…...

5G NR调制阶数与EVM关系以及对系统SNR要求分析

移动通信技术对数据传输速率要求越来越高。一种提高传输速率的思路是使用更高阶的QAM 调制方式&#xff0c;例如5G NR 的256QAM PDSCH&#xff0c;微波的1024QAM&#xff0c;2048QAM和4096QAM 调制。更高阶的QAM 调制方式对系统也提出了更高的要求。例如某个系统的EVM 测试结果…...

【NAS群晖drive异地访问】远程连接drive挂载电脑硬盘「内网穿透」

文章目录前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用3. 结语转发自CSDN远程穿透的文章&#xff1a;【群晖…...

react:hooks为什么不能写在条件语句里

背景 最近朋友在面试&#xff0c;说面试官问到了一个问题不会&#xff0c;说为什么 react hooks为什么不能写在条件语句里&#xff0c;今天我们来研究一下这个问题。 我们在来简单实现一个 useState&#xff1a; const reRender () > {stateIndex -1 ReactDOM.render(&…...

模型优势缺陷整理

&#xff08;1&#xff09;BERT 1. 计算资源消耗&#xff1a;bert模型是一个相对较大的模型&#xff0c;具有数亿个参数。因此&#xff0c;为了训练和使用bert模型&#xff0c;需要大量的计算资源和时间。 2. 学习不足问题&#xff1a;尽管bert模型在大规模语料库上进行了预训…...

编写猫咪相册应用 HTML

文章目录1. 标题元素标签2. p元素用于在网站上创建一段文本3. 注释4. 页面主要部分标识标签5. 通过使用img元素来为你的网站添加图片6. 使用锚点元素(a)链接到另一个页面7. 使用 section 元素将照片内容与未来的内容分开8. 无序列表(ul)元素&#xff0c;列表项(li)元素在列表中…...

基于Arduino与LabVIEW的远程家庭监控系统

在基于Arduino与LabVIEW的远程家庭监控系统中&#xff0c;Arduino Uno控制器需要完成以下功能&#xff1a;1&#xff09;通过W5100网络模块接收并判断命令&#xff0c;采集和传输温度、煤气浓度、热释电传感器的数据&#xff0c;并通过W5100网络模块上传给LabVIEW软件。2&#…...

Unity3D中Gfx.WaitForPresent优化方案

前言 在Unity中&#xff0c;Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染&#xff08;即CPU被阻塞&#xff09;&#xff0c;这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案&#xff1a; 对惹&#xff0c;这里有一个游戏开发交流小组&…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

什么是EULA和DPA

文章目录 EULA&#xff08;End User License Agreement&#xff09;DPA&#xff08;Data Protection Agreement&#xff09;一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA&#xff08;End User License Agreement&#xff09; 定义&#xff1a; EULA即…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展&#xff0c;光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域&#xff0c;IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选&#xff0c;但在长期运行中&#xff0c;例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...

热烈祝贺埃文科技正式加入可信数据空间发展联盟

2025年4月29日&#xff0c;在福州举办的第八届数字中国建设峰会“可信数据空间分论坛”上&#xff0c;可信数据空间发展联盟正式宣告成立。国家数据局党组书记、局长刘烈宏出席并致辞&#xff0c;强调该联盟是推进全国一体化数据市场建设的关键抓手。 郑州埃文科技有限公司&am…...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...