当前位置: 首页 > article >正文

大数据实时计算:Flink+AI 融合实战

一、为什么需要 FlinkAI 融合在大数据实时计算场景中传统的Flink作业往往只负责数据清洗、聚合、流转等标准化处理但业务需求早已不满足于计算出结果而是需要从结果中产生智能决策电商场景实时识别用户异常下单行为需要结合用户历史行为特征做AI判断运维场景服务器监控指标实时异常检测需要用AI模型替代固定阈值规则金融场景实时交易反欺诈需要在毫秒级窗口内完成多维度风险评分传统方案中通常是Flink计算完数据后再调用外部AI服务这种架构存在三个核心痛点延迟高跨服务调用的网络开销会将端到端延迟从毫秒级拉长到秒级一致性差AI模型的版本更新与Flink作业状态无法同步资源浪费独立部署的AI服务无法共享Flink的计算资源而FlinkAI的融合方案核心是将AI模型嵌入Flink作业内部实现计算与推理的一体化。二、FlinkAI 融合的核心技术路径Flink提供了两条主要的AI融合路径分别对应不同的业务场景1. 基于Flink ML的原生集成推荐用于离线训练在线推理场景Flink ML是Flink官方提供的机器学习库支持在Flink集群上完成模型训练、模型导出、在线推理全流程。其核心优势是与Flink的状态管理、窗口计算天然兼容。2. 基于UDF的模型嵌入推荐用于第三方预训练模型场景对于已经在TensorFlow/PyTorch中训练好的AI模型可以通过Flink的用户自定义函数UDF将模型加载到TaskManager节点中实现推理逻辑与计算逻辑的融合。三、实战FlinkTensorFlow 实时异常检测我们以服务器CPU指标实时异常检测为例完整实现一个FlinkAI融合的作业前置准备环境依赖Flink 1.17TensorFlow 2.13Flink TensorFlow Connector 1.17.0Java 11预训练模型使用LSTM模型训练CPU指标异常检测模型导出为SavedModel格式数据源模拟产生服务器CPU使用率的实时数据流0-100的浮点值步骤1将TensorFlow模型打包为Flink UDFimportorg.apache.flink.streaming.api.functions.ProcessFunction;importorg.apache.flink.util.Collector;importorg.tensorflow.SavedModelBundle;importorg.tensorflow.Tensor;importorg.tensorflow.ndarray.StdArrays;importorg.tensorflow.types.TFloat32;publicclassAnomalyDetectionUDFextendsProcessFunction{// 模型对象使用 transient 避免序列化问题privatetransientSavedModelBundlemodel;Overridepublicvoidopen(Configurationparameters)throwsException{super.open(parameters);// 从本地或分布式文件系统加载SavedModel模型modelSavedModelBundle.load(/path/to/saved_model,serve);}OverridepublicvoidprocessElement(DoublecpuValue,Contextctx,Collectorout)throwsException{// 1. 构造模型输入将CPU值转换为模型需要的输入格式此处假设模型输入是[1, 10]的时间窗口double[]inputArraynewdouble;// 简化处理用当前值填充数组实际场景应使用Flink的窗口缓存最近10个值for(inti0;i inputTFloat32.tensorOf(StdArrays.ndCopyOf(newfloat[][]{(float[])Arrays.stream(inputArray).mapToObj(Float::valueOf).toArray()}))){// 3. 执行模型推理varresultmodel.session().runner().feed(serving_default_input,input).fetch(StatefulPartitionedCall).run().get(0).expect(TFloat32.class).copyTo(newfloat);// 4. 解析推理结果0表示正常1表示异常booleanisAnomalyresult0.5;if(isAnomaly){out.collect(String.format(检测到异常CPU指标%.2f时间戳%d,cpuValue,ctx.timestamp()));}}}Overridepublicvoidclose()throwsException{super.close();if(model!null){model.close();}}}步骤2构建Flink实时数据流作业importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.functions.source.SourceFunction;publicclassFlinkAIDemoJob{publicstaticvoidmain(String[]args)throwsException{// 1. 初始化Flink执行环境StreamExecutionEnvironmentenvStreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 2. 模拟实时CPU指标数据源DataStreamcpuStreamenv.addSource(newSourceFunction(){privatevolatilebooleanisRunningtrue;Overridepublicvoidrun(SourceContextctx)throwsException{while(isRunning){// 模拟正常CPU值0-80每100ms产生一条数据doublenormalCpuMath.random()*80;ctx.collectWithTimestamp(normalCpu,System.currentTimeMillis());// 随机插入异常值90-100概率10%if(Math.random()anomalyStreamcpuStream.process(newAnomalyDetectionUDF());// 4. 输出异常结果到控制台anomalyStream.print(异常告警);// 5. 执行作业env.execute(FlinkAI 实时异常检测作业);}}步骤3作业部署与验证打包作业将代码和模型文件打包为Fat Jar提交作业./bin/flink run-ccom.example.FlinkAIDemoJob ./flink-ai-demo.jar预期输出异常告警 检测到异常CPU指标95.23时间戳1698765432100异常告警 检测到异常CPU指标98.71时间戳1698765433200四、FlinkAI 融合的性能优化技巧模型并行化将模型加载到每个TaskManager的内存中避免跨节点调用同时设置合理的并行度建议与CPU核心数匹配输入批量处理通过Flink的窗口将多条数据打包成批量输入充分利用TensorFlow的批量推理性能模型轻量化对预训练模型进行量化、剪枝处理减少内存占用和推理延迟状态管理优化对于需要保留历史特征的AI模型使用Flink的ValueState或ListState进行状态缓存避免重复计算资源隔离通过Flink的资源组Resource Group为AI推理任务分配独立的CPU资源避免与计算任务抢资源五、生产环境落地注意事项模型版本管理实现模型热加载机制支持在不重启Flink作业的情况下更新AI模型监控与告警实时监控模型推理的QPS、延迟、准确率等指标当模型性能下降时及时告警容错与恢复将模型文件存储在分布式文件系统如HDFS、S3中确保TaskManager节点故障重启后能重新加载模型数据一致性使用Flink的Exactly-Once语义保证数据处理的一致性避免AI模型重复推理相同数据六、总结与展望FlinkAI的融合本质是将实时计算的算力与AI的智力深度结合让实时数据在产生的瞬间就能转化为智能决策。当前Flink ML还在快速迭代中未来会支持更多的AI算法和模型格式同时Flink与云原生AI服务的集成也会更加紧密比如与Kubernetes上的TensorFlow Serving、PyTorch Serving的无缝对接。对于开发者而言掌握FlinkAI融合技术将从实时计算工程师升级为实时智能工程师这也是大数据领域未来的核心竞争力之一。

相关文章:

大数据实时计算:Flink+AI 融合实战

一、为什么需要 FlinkAI 融合? 在大数据实时计算场景中,传统的Flink作业往往只负责数据清洗、聚合、流转等标准化处理,但业务需求早已不满足于"计算出结果",而是需要"从结果中产生智能决策": 电…...

GeoDa 空间回归分析

GeoDa 空间回归分析 前置知识:[[GeoDa空间自相关分析]] 难度等级:⭐⭐⭐⭐⭐ 更新日期:2026-03-16 📋 目录 1. 空间回归基础2. 空间滞后模型(SLM)3. 空间误差模型(SEM)4. 空间杜宾模…...

初探 MindSpore(一):PyTorch 用户先从哪里开始

初探 MindSpore(一):先建立最基本的框架认识 对 PyTorch 用户来说,MindSpore 不是一套需要从头理解的框架,但也绝不是“把 API 名字改掉就能迁过去”的另一层皮。MindSpore 官方文档本身就是按这个思路组织的&#xff…...

OpenClaw 安全公告激增暴露 GitHub 与 CVE 漏洞跟踪体系间的鸿沟

自托管AI Agent项目OpenClaw在发布数周后便成为GitHub星标最多的代码库,吸引了大量开发者社区和研究人员关注。但没人预料到,其快速增长很快成为全球漏洞跟踪体系的意外压力测试。 安全公告爆发式增长 2月下旬,该项目开始以开源项目罕见的速度发布安全公告,迅速暴露出两大…...

申论素材资源合集

26行政执法专项资料 文件大小: 31.8GB内容特色: 31.8GB行政执法专项资料,覆盖法规、案例与高频考点适用人群: 备考公务员行政执法岗、法检书记员、执法勤务辅警核心价值: 一站式掌握执法依据、程序与高频考题,快速提升应试能力下载链接: https://pan.qu…...

openclaw运维

这里写目录标题常用命令配置管理更新管理斜杠命令常用命令 #### Gateway 管理 # 启动 Gateway openclaw gateway# 启动并显示详细日志 openclaw gateway --verbose# 指定端口启动 openclaw gateway --port 18789配置管理 # 运行配置向导 openclaw onboard# 系统健康检查 open…...

[连载] C++ 零基础入门-5.C++ if else 条件判断(小白必看)

【C 零基础入门】第5篇:if else 条件判断(小白必看) 作者:咏方舟-长江支流 | 日期:2026-03-16 ✅ 标准C跨平台说明 本系列免费,敬请关注!所有代码均采用标准C,不依赖任何平台…...

Gemini 3 flash架构深度拆解:从稀疏MoE到原生多模态的工程实现

Gemini 3 Pro是谷歌于2025年11月发布的旗舰级大语言模型,其技术内核远非“参数更大”所能概括——稀疏专家混合(MoE)架构、原生多模态统一语义空间、可配置思考深度与思维签名机制,共同构成了其性能跃迁的底层逻辑。国内技术爱好者…...

PD协议物理层深度解析:SOP在充电中的关键作用

近日,有大师级人物成功完成了PD快充的Only Source端软件开发,这一庞大工程目前展现出良好的兼容性,经过测试的笔记本和手机均无异常。 在技术细节上,他采用了ZR的SW3526 buck芯片、安森美的FUSB302物理层芯片,并辅以ST…...

Camera ISP 之 镜头阴影矫正(lens_shading_correction)

1、Lens Shading Lens Shading指画面四角由于入射光线不足形成暗角,同时由于不同频率的光折射率不同,导致Color Shading,因此需要进行镜头阴影矫正(Lens Shading Correction) 。 Lens shading分为两种 luma shading和color shadi…...

一区级光伏功率预测创新模型!CEEMDAN-KPCA-PINN多变量时序预测!完全自适应噪声集合经验模态分解+核主成份降维+物理信息神经网络

SCI配图创新模型!完全自适应噪声集合经验模态分解核主成份降维物理信息神经网络!CEEMDAN-KPCA-PINN多变量时序光伏功率预测,MATLAB代码。以下是对代码的全面分析: 一、主要功能 该代码用于光伏功率时间序列预测,结合了…...

在 CentOS Stream 9 上部署 OpenClaw(小龙虾)

在 CentOS Stream 9 上部署 OpenClaw(小龙虾) 注意:本人使用的普通用户安装 环境准备 # 1. 更新系统 sudo dnf update -y# 2. 安装基础工具 sudo dnf install -y gcc-c make cmake git curl wget vim执行官方安装脚本 脚本会自动安装 Node.js…...

C# 语言测验

C# 语言测验 引言 C#(读作“C sharp”)是一种由微软开发的高级编程语言,它旨在提供跨平台的开发能力,并广泛应用于桌面应用、移动应用、Web应用以及云服务等领域。为了帮助读者更好地理解和掌握C#语言,本文将提供一份全面的C#语言测验,旨在检验读者对C#基础知识的掌握程…...

迅雷怎么加快下载速度_现在迅雷下载怎么这么慢

迅雷限速怎么破解这个很简单,这个方法我还是在我朋友那里找到的。下载速度也是非常可以的。我让大家看一下。点我打开方法 这个就是我测试的速度。速度基本能跑到10M左右。宽带问题。下面开始今天的教学环节 打开上面图片中的地址,你会看到一个获取文件列…...

前端面试基础知识整理【Day-11】

前言 前端面试基础知识整理【Day-1】-CSDN博客 前端面试基础知识整理【Day-2】-CSDN博客 前端面试基础知识整理【Day-3】-CSDN博客 前端面试基础知识整理【Day-4】-CSDN博客 前端面试基础知识整理【Day-5】-CSDN博客 前端面试基础知识整理【Day-6】-CSDN博客 前端面试基…...

前端实现网页转PDF矢量文件,高清还原网页内容

前端:Vue3 后端:Node.js Express 接口 核心 PDF 引擎:Puppeteer(谷歌 Chrome 官方无头浏览器) 中文 100% 不乱码 图片 100% 显示 样式 1:1 还原 A4 自动分页,完美排版 文字可选中,矢量高清 ✅ …...

网络安全的进一步学习

了解基础网安知识分析第三方应用,进一步了解向日葵低版本被利用的条件,和木马能隐藏的原因(通过计划任务定时运行实现持久化的运行)和发现异常登录的记录并进行排查。...

JavaScript性能优化实战烈嘿

JavaScript性能优化实战技术文章大纲 性能优化的核心原则 减少代码执行时间 降低内存占用 优化网络请求 提升用户体验 代码层面的优化 避免全局变量污染,使用模块化或闭包 减少DOM操作,批量更新或使用文档片段 使用事件委托减少事件监听器数量 优化循环结…...

木马的排除与防护

作为学习者,我仅将所学知识进行系统梳理和总结。如有任何疏漏或错误,敬请指正进程、服务、启动项、计划任务的定义进程:操作系统中程序的一次执行实例,是资源分配和调度的基本单位。 服务:在后台运行的程序&#xff0c…...

我用 OpenClaw 7 天,砍掉了 80% 的重复沟通

我用 OpenClaw 7 天,砍掉了 80% 的重复沟通 很多人第一次接触 AI 助手,期待的是“无所不能”。 但真正把 AI 用起来之后,你会发现,最先产生价值的不是那些酷炫能力,而是那些你早就烦透了、却每天都还得做的重复工作。 …...

IDEA各版本支持的Java 版本和功能

https://www.jetbrains.com.cn/help/idea/supported-java-versions.html...

2.【.NET10 实战--孢子记账--产品智能化】--升级前的准备工作:项目依赖梳理与升级计划制定

我们在日常产品维护时,往往会遇到底层基础框架需要升级的情况,尤其是当底层框架升级到一个新的大版本时,可能会带来一些不兼容的变更,这时候我们就需要做好充分的准备工作,以确保升级过程顺利进行。从本文开始&#xf…...

064远程教育网站系统-springboot+vue

文末领取项目源码springbootvue 1.登录2.注册3.首页请文末卡片dd我获取源码...

Android 多进程开发 - FileDescriptor、Uri、AIDL 接口定义不能抛出异常

FileDescriptor 1、AIDL IMyAidlInterface.aidl,这里是位于 src/main/java/com/my/common 包下 package com.my.common;import android.os.ParcelFileDescriptor;interface IMyAidlInterface {ParcelFileDescriptor getFileDescriptor();void setFileDescriptor(in …...

KMP算法详解 [c++]

目录 前言 朴素的模式匹配算法 KMP模式匹配算法 KMP模式匹配算法的原理 next数组值的推导 KMP模式匹配算法的实现 KMP模式匹配算法的改进 nextval的推导 优化后的KMP模式匹配算法代码 零、前言 每年新闻周刊都会发布年度十大热词,这其实查询某个字符串在其…...

AD7685的SPI接口调试过程(附完整代码)

该系列的ADC主要差别是在转换速率上,AD7685的最大转换速率是250kSPS。我们主要是看芯片SPI接口和主机的通信:单个ADC和兼容SPI接口的主机通信时,一般会用三线且无繁忙指示模式,该模式的时序图如下所示:主要注意以下几点…...

L298N 直流电机驱动模块与 Arduino 的接口

虽然您最终需要学习控制直流电机才能构建自己的机器人,但您可能需要一些更容易上手的东西 - 这就是 L298N 电机驱动器的用武之地。它可以控制速度和旋转两个直流电机的方向。此外,它还可以 控制直流电机 只有能够控制直流电机的速度和旋转方向,我们才能完全控制它。通过结…...

【LLM infra】Megatron-LM | deepspeed | 量化/推理框架

note LLM推理过程: prefill:每层都得到历史token的kv cache,最后一个位置输出 logits;decode:对刚才新生成的token,计算它的Q/K/V,用它的 Q 去 attend 历史所有 K/V cache,输出下一…...

mimic数据库提取小问题解决

sql学艺不精,所以基本上自己开发一套“专属sql”后后面都是套用。首先是拼接问题,正常提取出目标人群后,需要不断拼接demo,treat,lab等数据,像demo,treat这些可能还好,但lab这些短时…...

推荐:Jib — 容器化你的Java应用的新选择!

推荐:Jib — 容器化你的Java应用的新选择! 【免费下载链接】jib GoogleContainerTools/jib: 是一个基于 Java 的 Docker 镜像构建工具,支持多种容器镜像构建选项和插件。该项目提供了一个简单易用的 Docker 镜像构建工具,可以方便…...