当前位置: 首页 > article >正文

使用 Python、Kafka 和 Faust 进行流处理

原文towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c?sourcecollection_archive---------2-----------------------#2024-02-18如何在高吞吐量时间序列数据上进行流处理并应用实时预测模型https://medium.com/aliosia?sourcepost_page---byline--a11740d0910c--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--a11740d0910c-------------------------------- Ali Osia·发表于Towards Data Science ·阅读时长7 分钟·2024 年 2 月 18 日–https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ef4f06938ee58f847fdbc6bc1a65aaf2.png图片来自JJ Ying于Unsplash大多数流处理库并不适合 Python而大多数机器学习和数据挖掘库却是基于 Python 的。尽管Faust库旨在将 Kafka 流处理理念引入 Python 生态系统但在易用性方面可能会带来挑战。本文件作为一个教程提供了有效使用 Faust 的最佳实践。在第一部分我介绍了流处理概念的基本概述广泛借鉴了《设计数据密集型应用》一书[1]。随后我探讨了 Faust 库的关键功能特别是 Faust 窗口这部分通常在现有文档中难以理解且难以高效利用。因此我提出了一种通过利用库自身函数的替代方法来使用 Faust 窗口。最后我分享了在 Google Cloud Platform 上实现类似管道的经验。流处理流stream指的是随着时间推移逐步可用的无界数据。事件event是一个小型的、独立的对象包含某一时刻发生的事情的详细信息例如用户交互。事件由生产者producer生成例如温度传感器并可能被一些消费者consumers消费例如在线仪表盘。传统的数据库不适合存储高吞吐量的事件流。这是因为消费者需要定期轮询数据库以识别新事件从而产生显著的开销。相反最好是让消费者在新事件出现时收到通知而消息系统messaging systems正是为此而设计的。消息代理message broker是一种广泛采用的消息传递系统其中生产者将消息写入代理消费者由代理通知并接收这些消息。基于AMQP 的消息代理AMQP-based message brokers如RabbitMQ常用于服务之间的异步消息传递和任务队列。与数据库不同它们采用瞬时消息的理念仅在消息被消费者确认后才会删除消息。当消息处理变得资源密集时可以通过使用多个消费者以负载均衡的方式从相同主题读取来实现并行处理。在这种方法中消息会被随机分配给消费者进行处理这可能导致处理的顺序与接收的顺序不同。另一方面基于日志的消息代理log-based message brokers如Apache Kafka将数据库存储的持久性与消息系统的低延迟通知能力结合在一起。它们利用分区日志结构其中每个分区表示按追加顺序存储在磁盘上的记录序列。这一设计使得重新读取旧消息成为可能。Kafka 中的负载均衡是通过将每个消费者分配给一个分区来实现的从而消息处理的顺序与接收的顺序一致但消费者的数量受限于可用分区的数量。流处理stream processing涉及对流执行操作如处理流并生成新的流、将事件数据存储在数据库中或在仪表盘上可视化数据。流分析stream analytics是一个常见的使用案例其中我们在定义的时间窗口内聚合来自一系列事件的信息。滚动窗口Tumbling windows非重叠和跳动窗口Hopping windows重叠是流分析中常用的窗口类型。流分析使用案例的例子可以是简单地计算过去一小时内的事件数或者对事件应用复杂的时间序列预测模型。流分析面临着区分事件创建时间*(事件时间)和事件处理时间的挑战因为事件处理可能由于排队或网络问题引入延迟。基于处理时间定义窗口是一种更简单的方法特别是当处理延迟较小时。然而基于事件时间定义窗口则更具挑战性。这是因为无法确定窗口内的所有数据是否已经接收完毕或者是否还有未处理的事件。因此需要处理在窗口被认为已完成后仍然到达的滞后事件*。在涉及复杂流分析的应用中如时间序列预测通常需要将一系列有序的消息作为一个整体在窗口内进行处理。在这种情况下消息之间存在强烈的相互依赖关系导致很难从代理中确认并移除单个消息。因此基于日志的消息代理成为了一种更可取的选择。此外在这种情况下由于窗口中的所有消息需要一起考虑平行处理可能不可行或实现过于复杂。然而应用复杂的机器学习模型来处理数据可能需要大量计算资源因此必须采取替代的平行处理方法。本文旨在提出一种解决方案以在高吞吐量流处理应用中有效地使用资源密集型机器学习模型。Faust 流处理有多个流处理库可供选择例如 Apache Kafka Streams、Flink、Samza、Storm 和 Spark Streaming。每个库都有自己的优缺点但其中许多并不是特别适合 Python。不过Faust是一个基于 Python 的流处理库使用 Kafka 作为底层消息系统旨在将 Kafka Streams 的理念引入 Python 生态系统。不幸的是Faust 的文档可能会让人困惑源代码也可能难以理解。例如理解 Faust 中窗口的工作方式在不参考复杂的源代码的情况下是具有挑战性的。此外Faustv1和Faust-Streamingv2仓库中存在许多开放问题解决这些问题并非一件简单的事情。接下来将提供有关 Faust 底层结构的必要知识并附上代码片段帮助有效利用 Faust 库。使用 Faust 的第一步是创建一个应用并配置项目通过指定代理和其他必要的参数。一个有用的参数是table_cleanup_interval将在后续讨论。appfaust.App(app_name,brokerbroker_address,storerocksdb_address,table_cleanup_intervaltable_cleanup_interval)然后你可以使用agent装饰器定义一个流处理器从 Kafka 主题中消费数据并对每个接收到的事件执行某些操作。schemafaust.Schema(value_serializerjson)topicapp.topic(topic_name,schemaschema)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:print(event)为了在流处理器中保持状态我们可以使用 Faust 的Table。表是一个分布式的内存字典由 Kafka 变更日志主题支持。你可以将table视为一个可以在流处理器中设置的 Python 字典。tableapp.Table(table_name,defaultint)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:table[key]eventFaust 窗口让我们考虑一个时间序列问题每秒我们需要从前 10 秒钟的样本中进行预测。因此我们需要 10 秒重叠的窗口重叠时间为 1 秒。为了实现这个功能我们可以利用 Faust 的windowed tables但在 Faust 文档中对它们的解释不够充分常常导致困惑。理想情况下流处理库应该自动执行以下任务为每个窗口保持状态事件列表确定新事件的相关窗口最后 10 个窗口更新这些窗口的状态将新事件附加到它们各自列表的末尾在窗口关闭时应用一个函数使用窗口的状态作为输入。在下面的代码片段中你可以观察到 Faust 文档中建议的构建窗口并在流处理器中使用它的方法参考 Faust 库中的这个示例# Based on Fuast example# Do not use thiswindow_wrapperapp.Table(table_name,defaultlist,on_window_closewindow_close).hopping(10,1,expiresexpire_time)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:window_setwindow_wrapper[key]prevwindow_set.value()prev.append(event)window_wrapper[key]prev在提供的代码中window_wrapper对象是WindowWrapper类的一个实例提供了一些所需的功能。expires参数决定了窗口生命周期的持续时间从创建开始计算。一旦这个指定的时间过去窗口就被视为关闭。Faust 会定期检查table_cleanup_interval持续时间以识别已关闭的窗口。然后它会应用window_close函数使用窗口状态作为输入。当你调用window_wrapper[key]时它返回一个类型为WindowSet的对象该对象内部包含所有相关的窗口。通过调用window_set.value()你可以访问最新窗口的状态另外通过调用window_set.delta(30)你可以访问 30 秒前的窗口状态。此外你还可以通过为window_wrapper[key]赋新值来更新最新窗口的状态。这种方法适用于滚动窗口但不适用于跳跃窗口跳跃窗口需要更新多个窗口的状态。[Faust 文档] 此时在访问跳跃表中的数据时我们总是访问给定时间戳的最新窗口而且我们无法修改这种行为。虽然 Faust 支持维护窗口状态、识别相关窗口并在已关闭的窗口上应用函数但它并没有完全解决第三个功能即更新所有相关窗口的状态。Google Cloud 解决方案我想简要讨论一下我在使用 Google Cloud PlatformGCP时的负面体验。GCP 推荐使用 Google Pub/Sub 作为消息代理Apache Beam 作为流处理库Google Dataflow 作为执行工具Google BigQuery 作为数据库。然而当我尝试使用这个技术栈时我遇到了许多问题导致使用起来非常具有挑战性。在 Python 中使用 Google Pub/Sub 证明是比较慢的可以查看这个和这个这让我放弃了它转而使用 Kafka。Apache Beam 是一个文档齐全的库但与 Kafka 一起使用时却遇到了自己的一些问题。直接运行器有漏洞需要使用 Dataflow且由于等待机器配置导致了显著的时间延迟。此外我还遇到了窗口触发延迟的问题尽管我尝试过解决这个问题但都没有成功可以查看这个GitHub 问题和这个Stack Overflow 贴文。而且由于多个组件的复杂集成调试整个系统是一个巨大的挑战这让我对日志的控制非常有限也使得很难 pinpoint定位Pub/Sub、Beam、Dataflow 或 BigQuery 中问题的根本原因。总的来说我在使用 Google Cloud Platform 的过程中遇到了 Python 中 Google Pub/Sub 性能慢、使用 Apache Beam 与 Kafka 时的 bugs 以及调试这些互联系统的整体困难。[1] Kleppmann, Martin.设计数据密集型应用可靠、可扩展和可维护系统背后的核心理念。 “ O’Reilly Media, Inc.”, 2017。

相关文章:

使用 Python、Kafka 和 Faust 进行流处理

原文:towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c?sourcecollection_archive---------2-----------------------#2024-02-18 如何在高吞吐量时间序列数据上进行流处理并应用实时预测模型 https://medium.com/aliosia?source…...

保姆级教程:在Ubuntu上为RK3588开发板配置交叉编译环境(含完整脚本)

保姆级教程:在Ubuntu上为RK3588开发板配置交叉编译环境(含完整脚本) 刚拿到RK3588开发板时,最让人头疼的就是如何快速搭建开发环境。不同于x86平台的直接编译,嵌入式开发需要面对处理器架构差异、工具链配置、库依赖等…...

实战指南:通达信缠论量化分析插件的智能化解决方案

实战指南:通达信缠论量化分析插件的智能化解决方案 【免费下载链接】Indicator 通达信缠论可视化分析插件 项目地址: https://gitcode.com/gh_mirrors/ind/Indicator 在金融市场技术分析领域,缠论以其严谨的数学结构和完整的理论体系而备受推崇。…...

WarcraftHelper:5大核心优化功能让魔兽争霸3重获新生

WarcraftHelper:5大核心优化功能让魔兽争霸3重获新生 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 你是否还记得当年在网吧鏖战魔兽争霸…...

Depth Anything深度估计:ComfyUI中AI图像处理的终极指南

Depth Anything深度估计:ComfyUI中AI图像处理的终极指南 【免费下载链接】comfyui_controlnet_aux ComfyUIs ControlNet Auxiliary Preprocessors 项目地址: https://gitcode.com/gh_mirrors/co/comfyui_controlnet_aux 在AI图像生成的世界中,深度…...

Tiny11Builder终极指南:如何快速构建精简版Windows 11系统镜像

Tiny11Builder终极指南:如何快速构建精简版Windows 11系统镜像 【免费下载链接】tiny11builder Scripts to build a trimmed-down Windows 11 image. 项目地址: https://gitcode.com/GitHub_Trending/ti/tiny11builder Tiny11Builder是一个开源的PowerShell脚…...

B站CC字幕高效提取工具:3分钟掌握免费下载与格式转换

B站CC字幕高效提取工具:3分钟掌握免费下载与格式转换 【免费下载链接】BiliBiliCCSubtitle 一个用于下载B站(哔哩哔哩)CC字幕及转换的工具; 项目地址: https://gitcode.com/gh_mirrors/bi/BiliBiliCCSubtitle 还在为B站视频中的精彩内容无法保存为文字而烦恼…...

用GPT-4当老师,手把手教你复现LLaVA多模态模型(附代码与数据集)

从零构建LLaVA多模态助手:GPT-4数据生成与模型训练全流程实战 在人工智能领域,多模态模型正迅速成为技术前沿的焦点。当ChatGPT展现强大文本理解能力时,研究者们开始思考:如何让AI同时理解图像和语言?LLaVA&#xff08…...

告别数据跳动!用STM32F103驱动ADS1220进行精密电压测量的5个关键配置与调试技巧

告别数据跳动!用STM32F103驱动ADS1220进行精密电压测量的5个关键配置与调试技巧 在工业测量和传感器信号采集领域,ADS1220作为TI推出的24位精密ADC,凭借其低噪声PGA和灵活的配置选项,成为许多工程师的首选。但实际应用中&#xff…...

PHP订单幂等性设计失效全复盘(2024真实生产事故溯源)

更多请点击: https://intelliparadigm.com 第一章:PHP订单幂等性设计失效全复盘(2024真实生产事故溯源) 某电商平台在 2024 年“618”大促期间突发重复扣款与订单爆炸式生成,核心支付服务 3 小时内创建超 17 万笔状态…...

【PHP 8.9错误处理终极指南】:5大精准管控机制+3个生产环境避坑实战案例

更多请点击: https://intelliparadigm.com 第一章:PHP 8.9错误处理演进与核心理念 PHP 8.9(当前为前瞻规范草案)在错误处理机制上引入了“可恢复类型错误协议”(Recoverable Type Error Protocol, RTEP)&a…...

生信分析实战:用MetaPhlAn4处理完测序数据后,这些结果文件怎么用?(附常用脚本)

MetaPhlAn4结果文件深度解析:从数据提取到高级可视化的完整指南 当你第一次拿到MetaPhlAn4生成的.txt结果文件时,可能会被那些看似晦涩的clade_name和relative_abundance搞得一头雾水。别担心,这篇文章将带你从零开始理解这些数据&#xff0…...

手把手教你用51单片机和ADC0832做个CO2监测仪(附Proteus仿真和Keil源码)

51单片机实战:从零搭建高精度CO2监测仪(含仿真与源码解析) 在空气质量日益受到关注的今天,二氧化碳浓度监测已成为智能家居、农业大棚和工业环境中的重要需求。本文将带您完整实现一个基于51单片机的CO2监测系统,不仅…...

FanControl终极指南:5分钟学会Windows风扇精准控制,告别噪音烦恼

FanControl终极指南:5分钟学会Windows风扇精准控制,告别噪音烦恼 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.c…...

Go语言高效开发实战:并发模式、性能优化与工程化实践

1. 项目概述与核心价值最近在GitHub上看到一个挺有意思的项目,叫cxuu/golang-skids。乍一看标题,可能会让人联想到“技能”或者“技巧”,但点进去你会发现,它其实是一个精心整理的Go语言(Golang)学习资源与…...

多核处理器与高速互连技术在雷达信号处理中的应用

1. 现代雷达系统的计算挑战与架构演进 雷达信号处理领域正经历着前所未有的计算需求增长。十年前,单通道雷达系统可能只需要单个处理器就能完成所有实时处理任务。但如今,即使是基础型号的雷达系统,也需要多个处理器协同工作才能满足实时性要…...

终极Windows清理方案:用Windows Cleaner彻底告别C盘爆红困扰

终极Windows清理方案:用Windows Cleaner彻底告别C盘爆红困扰 【免费下载链接】WindowsCleaner Windows Cleaner——专治C盘爆红及各种不服! 项目地址: https://gitcode.com/gh_mirrors/wi/WindowsCleaner 你是否经常遇到C盘空间不足的警告&#x…...

别再手动算权重了!用SPSSAU搞定面板数据财务排名(熵权TOPSIS保姆级教程)

财务分析新范式:如何用SPSSAU实现面板数据的智能排名决策 财务分析领域正在经历一场静默的革命。当大多数分析师还在Excel中手动计算权重、反复核对公式时,前沿的数据处理工具已经能够将原本需要数天的工作压缩到几分钟内完成。本文将揭示如何利用SPSSAU…...

ChatGPT Adapter:统一AI接口网关,轻松集成多模型服务

1. 项目概述与核心价值最近在折腾AI应用开发,发现一个挺头疼的问题:市面上的AI模型和API接口五花八门,OpenAI有它的标准,Coze有它的玩法,DeepSeek、Cursor、Bing Copilot又各自为政。想在自己的项目里灵活切换或者同时…...

ROS机器人Web控制面板:从架构设计到安全部署的完整实践

1. 项目概述:一个为机器人打造的“驾驶舱”如果你玩过机器人,或者接触过自动化设备,你肯定知道,让机器人动起来只是第一步。真正让人头疼的,往往是后续的“驾驶”和“管理”。代码写好了,硬件连上了&#x…...

日本麻将助手HTTPS配置终极指南:安全连接与本地证书完整教程

日本麻将助手HTTPS配置终极指南:安全连接与本地证书完整教程 【免费下载链接】mahjong-helper 日本麻将助手:牌效防守记牌(支持雀魂、天凤) 项目地址: https://gitcode.com/gh_mirrors/ma/mahjong-helper 日本麻将助手&…...

APKMirror:安全高效的安卓应用管理开源解决方案

APKMirror:安全高效的安卓应用管理开源解决方案 【免费下载链接】APKMirror 项目地址: https://gitcode.com/gh_mirrors/ap/APKMirror 在安卓生态系统中,应用版本管理、安全下载和历史版本追溯一直是普通用户和开发者面临的三大核心痛点。APKMir…...

如何零基础掌握SVG在线编辑器:告别专业软件的高门槛创作

如何零基础掌握SVG在线编辑器:告别专业软件的高门槛创作 【免费下载链接】svgedit Powerful SVG-Editor for your browser 项目地址: https://gitcode.com/gh_mirrors/sv/svgedit 你是否曾经因为复杂的矢量图形软件而望而却步?是否在寻找一款简单…...

别再只调阈值了!用OpenCV的Sobel梯度法提升低对比度图像缺陷检出率

别再只调阈值了!用OpenCV的Sobel梯度法提升低对比度图像缺陷检出率 在工业质检和医学影像领域,低对比度图像中的缺陷检测一直是令人头疼的难题。许多开发者第一反应是反复调整二值化阈值参数,却常常陷入"调高漏检、调低误报"的死循…...

从飞控模拟到游戏UI:Qt姿态仪(ADI)的二次开发与数据接入指南(附源码)

从飞控模拟到科幻游戏:Qt姿态仪组件的跨领域开发实战 在无人机地面站软件中,姿态仪(Attitude Director Indicator)是飞行员判断飞行状态的核心仪表;而在科幻游戏里,类似的仪表盘却可能成为太空舱控制台的视…...

重庆大学LaTeX论文模板终极指南:3步完成专业论文排版

重庆大学LaTeX论文模板终极指南:3步完成专业论文排版 【免费下载链接】CQUThesis :pencil: 重庆大学毕业论文LaTeX模板---LaTeX Thesis Template for Chongqing University 项目地址: https://gitcode.com/gh_mirrors/cq/CQUThesis CQUThesis是专为重庆大学学…...

别再只会拖模块了!用MATLAB Function模块在Simulink里写自定义逻辑(附if/for/persistent实战)

从图形化到代码化:MATLAB Function模块在Simulink中的高阶应用 当Simulink的图形化模块无法满足复杂算法需求时,MATLAB Function模块就像一把瑞士军刀,让工程师能够直接在仿真模型中嵌入自定义代码逻辑。这种从拖拽模块到编写代码的思维转变&…...

基于Next.js的多模型AI聊天界面:统一集成OpenAI、Claude、Gemini与Ollama

1. 项目概述:一个统一的多模型AI聊天界面 如果你和我一样,经常需要在OpenAI的GPT、Anthropic的Claude、Google的Gemini,甚至本地运行的Ollama模型之间来回切换,那你一定体会过那种在多个浏览器标签页、不同风格的界面和API控制台…...

硬件工程师的宝藏工具:手把手教你搭建Part-DB,实现元器件扫码入库与KiCAD联动

硬件工程师的元器件管理革命:Part-DB与KiCAD联动实战指南 作为一名长期与电阻电容打交道的硬件工程师,我最头疼的不是画板子调电路,而是每次打开元件柜时面对的那堆杂乱无章的料盘和标签。直到发现了Part-DB这个开源神器,我的工作…...

安桥TX-NR515功放ARC功能折腾记:从吃灰到点亮DTS,一根HDMI线搞定电视声音

安桥TX-NR515功放ARC功能实战指南:让老设备焕发新声 去年整理客厅时,那台积灰多年的安桥TX-NR515功放再次闯入我的视线。2013年花了大价钱购入这台支持ARC(音频回传通道)的功放,本想着用一根HDMI线就能解决电视声音输出…...