当前位置: 首页 > article >正文

Flink 流处理核心算子深度剖析

一、ProcessFunction 与 MapFunction 区别1、功能和区别MapFunction:纯数据转换,一条进一条出,无状态、无时间、无侧输出,只能做简单映射。ProcessFunction:全能处理,一条进可以 0/1/N 条出,支持状态、定时器、侧输出、访问时间,能实现复杂业务逻辑。简单说:Map 能干的,Process 全能干;Process 能干的,Map 干不了。2、processFunction用法2.1、初级:基础过滤与数据增强场景:监控传感器读数,过滤异常值并添加标记package com.self.map; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; public class BasicProcessFunctionExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 简化输出 // 模拟传感器数据流 (传感器ID, 温度值) DataStreamSensorReading sensorStream = env .fromElements( new SensorReading("sensor_1", 25.5), new SensorReading("sensor_1", 95.0), // 异常高温 new SensorReading("sensor_2", 22.3), new SensorReading("sensor_2", -5.0) // 异常低温 ); // 使用 ProcessFunction 进行异常检测和标记 DataStreamEnrichedReading processedStream = sensorStream .process(new AnomalyDetectionProcessFunction()); processedStream.print(); env.execute("Basic ProcessFunction Demo"); } // 传感器数据类 public static class SensorReading { public String sensorId; public double temperature; public SensorReading(String sensorId, double temperature) { this.sensorId = sensorId; this.temperature = temperature; } @Override public String toString() { return String.format("SensorReading{sensor='%s', temp=%.1f}", sensorId, temperature); } } // 增强后的数据类 public static class EnrichedReading { public String sensorId; public double temperature; public boolean isAnomaly; public String status; public EnrichedReading(String sensorId, double temperature, boolean isAnomaly, String status) { this.sensorId = sensorId; this.temperature = temperature; this.isAnomaly = isAnomaly; this.status = status; } @Override public String toString() { return String.format("EnrichedReading{sensor='%s', temp=%.1f, anomaly=%s, status='%s'}", sensorId, temperature, isAnomaly, status); } } /** * 初级 ProcessFunction:过滤异常值并添加标记 * 核心 API: * - processElement():处理每条记录 * - ctx.output():可输出到侧输出流(本例未使用) * - ctx.timestamp():获取事件时间戳 */ public static class AnomalyDetectionProcessFunction extends ProcessFunctionSensorReading, EnrichedReading { private static final double MAX_NORMAL_TEMP = 80.0; private static final double MIN_NORMAL_TEMP = 0.0; @Override public void processElement( SensorReading reading, Context ctx, CollectorEnrichedReading out) throws Exception { boolean isAnomaly = reading.temperature MAX_NORMAL_TEMP || reading.temperature MIN_NORMAL_TEMP; String status; if (reading.temperature MAX_NORMAL_TEMP) { status = "HIGH_TEMP_ALERT"; } else if (reading.temperature MIN_NORMAL_TEMP) { status = "LOW_TEMP_ALERT"; } else { status = "NORMAL"; } // 可以访问事件时间戳(如果存在) Long timestamp = ctx.timestamp(); System.out.println("Processing at timestamp: " + timestamp); // 输出处理后的结果 out.collect(new EnrichedReading( reading.sensorId, reading.temperature, isAnomaly, status )); } } }关键学习点:processElement的基础用法Context对象获取时间戳Collector输出数据2.2、中级:使用定时器和状态进行延迟告警场景:监测传感器在 5 秒内没有收到数据,触发告警package com.self.map; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import java.time.Duration; public class IntermediateProcessFunctionExample { public static void main(String[] args) throws Exception { // 获取流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(1); // 设置水位线 = 最大时间 - 2s;使用数据自带的时间作为事件事件 DataStreamSensorEvent eventStream = env .addSource(new SensorSource()) .assignTimestampsAndWatermarks( WatermarkStrategy.SensorEventforBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((event, timestamp) - event.timestamp) ); // 根据id分组 + 处理数据 DataStreamAlert alertStream = eventStream .keyBy(event - event.sensorId) .process(new InactivityAlertProcessFunction(5000L)); //打印数据 alertStream.print(); //执行 env.execute("Intermediate ProcessFunction Demo"); } public static class SensorEvent { public String sensorId; public double value; public long timestamp; public SensorEvent(String sensorId, double value, long timestamp) { this.sensorId = sensorId; this.value = value; this.timestamp = timestamp; } @Override public String toString() { return String.format("Event{sensor='%s', value=%.2f, time=%d}", sensorId, value, timestamp); } } public static class Alert { public String sensorId; public String message; public long alertTime; public Alert(String sensorId, String message, long alertTime) { this.sensorId = sensorId; this.message = message; this.alertTime = alertTime; } @Override public String toString() { return String.format("ALERT! sensor='%s', msg='%s', at=%d", sensorId, message, alertTime); } } /** * 使用 KeyedProcessFunction 而不是 ProcessFunction * 这样才能使用 getCurrentKey() 方法 */ public static class InactivityAlertProcessFunction extends KeyedProcessFunctionString, SensorEvent, Alert { private final long inactivityThresholdMs; //最大静默时间 private transient ValueStateLong lastEventTimeState; //最后活跃时间 private transient ValueStateLong registeredTimerState; //闹钟时间 // 构造函数 public InactivityAlertProcessFunction(long inactivityThresholdMs) { this.inactivityThresholdMs = inactivityThresholdMs; } @Override public void open(Configuration parameters) throws Exception { //状态说明书 获取最后活跃时间 ValueStateDescriptorLong lastEventDesc = new ValueStateDescriptor( "lastEventTime", Long.class ); //从上下文获取状态值 lastEventTimeState = getRuntimeContext().getState(lastEventDesc); //状态说明书 获取闹钟时间 ValueStateDescriptorLong timerDesc = new ValueStateDescriptor( "registeredTimer", Long.class ); //从上下文获取状态值 registeredTimerState = getRuntimeContext().getState(timerDesc); } /** * 更新闹钟和事件时间 * @param event * @param ctx * @param out * @throws Exception */ @Override public void processElement( SensorEvent event, Context ctx, CollectorAlert out) throws Exception { // 获取getCurrentKey() String currentKey = ctx.getCurrentKey(); // 获取/更新最后活跃时间 long currentTime = event.timestamp; lastEventTimeState.update(currentTime); //获取旧闹钟,若存在删除 Long oldTimer = registeredTimerState.value(); if (oldTimer != null) { ctx.timerService().deleteEventTimeTimer(oldTimer); } //设置新闹钟时间 long alarmTime = currentTime + inactivityThresholdMs; //注册闹钟 ctx.timerService().registerEventTimeTimer(alarmTime); //更新闹钟时间 registeredTimerState.update(alarmTime); //打印数据 System.out.println(String.format( "[%s] Processed event at %d, registered alarm at %d", currentKey, currentTime, alarmTime )); } /** * 定时器,定时检测数据有没有到 * @param timestamp * @param ctx * @param out * @throws Exception */ @Override public void onTimer(long timestamp, OnTimerContext ctx, CollectorAlert out) throws Exception { // 获取当前数据的key String key = ctx.getCurrentKey(); //获取最后一次活跃时间和闹钟时间 Long lastTime = lastEventTimeState.value(); Long registeredTimer = registeredTimerState.value(); //若没有闹钟,则无须继续 if (registeredTimer == null || timestamp != registeredTimer) { return; } //若没有最后活跃时间,则无须继续 if (lastTime == null) { return; } //最后一次活跃时间 + 静默时间 = 当前时间 ,报警 if (lastTime + inactivityThresholdMs = timestamp) { out.collect(new Alert( key, String.format("No data received for %d ms!", inactivityThresholdMs), timestamp )); } //清除闹钟 registeredTimerState.clear(); } } // 模拟数据源 public static class SensorSource implements SourceFunctionSensorEvent { private volatile boolean running = true; @Override public void run(SourceContextSensorEvent ctx) throws Exception { long startTime = System.currentTimeMillis(); System.out.println("=== Starting data source ==="); for (int i = 0; i 3 running; i++) { long eventTime = startTime + i * 1000; ctx.collectWithTimestamp(new SensorEvent("sensor_1", 100 + i, eventTime), eventTime); System.out.println("Sent: sensor_1 at " + eventTime); Thread.sleep(500); } System.out.println("=== sensor_1 going silent for 6 seconds ==="); Thread.sleep(6000); long resumeTime = System.currentTimeMillis(); ctx.collectWithTimestamp(new SensorEvent("sensor_1", 103, resumeTime), resumeTime); System.out.println("Sent: sensor_1 at " + resum

相关文章:

Flink 流处理核心算子深度剖析

一、ProcessFunction 与 MapFunction 区别 1、功能和区别 MapFunction:纯数据转换,一条进一条出,无状态、无时间、无侧输出,只能做简单映射。 ProcessFunction:全能处理,一条进可以 0/1/N 条出,支持状态、定时器、侧输出、访问时间,能实现复杂业务逻辑。 简单说:Map …...

基于RAG的个人知识库AI助手:从原理到部署实战

1. 项目概述:当RAG遇上个人知识库最近几年,大语言模型(LLM)的能力边界不断被拓展,但一个核心痛点始终存在:它无法记住你私有的、非公开的、不断更新的知识。比如,你想让AI助手帮你分析上周的团队…...

研扬EPIC-RPS9工控主板解析:4英寸板载13代酷睿,赋能边缘AI与机器视觉

1. 项目概述:当“小钢炮”遇上工业严苛环境在工业自动化、边缘计算和嵌入式视觉这些领域里,我们常常面临一个经典矛盾:既要强大的算力来处理海量数据、运行复杂算法,又要设备足够紧凑、坚固,能塞进各种空间受限、环境恶…...

数据与大语言模型融合:从NL2SQL到RAG架构的实践指南

1. 项目概述:当数据遇见大语言模型如果你是一名数据工程师、数据分析师,或者任何需要和数据打交道的开发者,最近肯定被“大语言模型”和“数据智能”这两个词轮番轰炸。我们手里有海量的数据,从结构化的业务表到非结构化的日志、文…...

Cursor3.3发布:Skill 自动转为快捷操作

想象一下:每次发版之前,你盯着一个庞大PR,脑子里同时跑着十几个线程——这个模块要重构、那个API要优化、还有安全扫描不能忘。以前你得像个孤独的指挥家,一根根指挥棒轮流挥。 现在,Cursor直接给你拉来一支AI交响乐团…...

Go语言工厂模式:对象创建封装

Go语言工厂模式:对象创建封装 1. 简单工厂 type Product interface {Operation() string }type ConcreteProductA struct{}func (p *ConcreteProductA) Operation() string {return "Product A" }type ConcreteProductB struct{}func (p *ConcreteProduct…...

Redis怎样配置不同环境下的内存淘汰机制

...

魔兽争霸3兼容性修复终极指南:5步解决现代系统闪退问题

魔兽争霸3兼容性修复终极指南:5步解决现代系统闪退问题 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 你是否还在为魔兽争霸3在现代Windo…...

ARM Cortex-A72浮点与SIMD寄存器架构详解

1. ARM Cortex-A72高级SIMD与浮点寄存器架构解析在嵌入式系统和高性能计算领域,ARM Cortex-A72处理器以其卓越的能效比和计算性能著称。作为其核心功能模块之一,高级SIMD(单指令多数据)和浮点运算单元为现代计算密集型应用提供了关…...

Go语言模板方法模式:算法骨架

Go语言模板方法模式:算法骨架 1. 模板方法实现 type AbstractClass struct{}func (a *AbstractClass) TemplateMethod() {a.Step1()a.Step2()a.Step3() }func (a *AbstractClass) Step1() {} func (a *AbstractClass) Step2() {} func (a *AbstractClass) Step3() {…...

Sunshine自托管游戏串流服务器:构建高性能私人云游戏平台的完整指南

Sunshine自托管游戏串流服务器:构建高性能私人云游戏平台的完整指南 【免费下载链接】Sunshine Self-hosted game stream host for Moonlight. 项目地址: https://gitcode.com/GitHub_Trending/su/Sunshine Sunshine是一款功能强大的自托管游戏串流服务器&am…...

ClawForgeAI:基于工作流编排的AIGC创意自动化平台解析

1. 项目概述:从“ClawForgeAI/clawforge”看AI驱动的创意工具新范式最近在GitHub上看到一个挺有意思的项目,叫“ClawForgeAI/clawforge”。光看这个名字,你可能会有点摸不着头脑——“ClawForge”听起来像是个游戏模组工具或者某种机械设计软…...

第5章 集群初始化

本章说明: 集群初始化是 Kubernetes 部署过程中最核心的一步。本章使用 kubeadm 在 master01 节点上初始化高可用集群控制平面。初始化时需要指定 VIP(192.168.3.59:6443)作为控制平面统一入口,这样后续加入的其他 Master 节点和 Worker 节点都通过 VIP 访问 API Server,…...

定制你的专属探针:PEG-锰基纳米材料,为精准科研而生

在纳米生物医学研究的前沿,标准化的材料往往难以完全契合你的实验设想。你是否正在为TME响应成像、MRI造影增强、化学动力学Treatment 或药物递送系统的构建而寻找一种可调控、生物相容性良好的纳米平台?现在,你可以完全掌控参数——PEG-锰基…...

NotebookLM智能体插件:AI驱动的自动化知识处理与任务执行

1. 项目概述:当NotebookLM遇上智能体,知识处理的范式革命最近在AI圈子里,一个名为“notebooklm-agent-plugin”的项目引起了我的注意。乍一看,这个名字结合了Google的NotebookLM和当下火热的“智能体”(Agent&#xff…...

从零开始:用PX4的uORB消息机制,手把手教你实现模块间通信(附代码示例)

从零构建PX4模块通信:uORB消息机制实战指南 在PX4飞控生态中,模块间通信如同无人机的神经系统,而uORB(微对象请求代理)正是这个系统的核心传输介质。当开发者尝试为飞控添加激光雷达或自定义IMU时,往往会遇…...

YOLOv8植物病害识别检测系统(项目源码+YOLO数据集+模型权重+UI界面+python+深度学习+环境配置)

摘要 植物病害是威胁全球农业产量与质量的主要因素之一,传统的人工识别方法依赖专家经验,效率低、主观性强。本文基于YOLOv8目标检测算法,构建了一套涵盖30类植物及其叶片病害的检测系统,包括苹果、玉米、马铃薯、番茄、葡萄等主…...

从零到一:构建与解析XTS测试环境的实战指南

1. 环境准备:搭建XTS测试环境的基础条件 第一次接触XTS测试环境搭建时,我完全被各种术语搞晕了。后来才发现,只要把基础环境准备好,后面的工作就会顺利很多。就像盖房子要先打地基一样,搭建XTS测试环境也需要先准备好几…...

大语言模型智能体长期记忆解决方案:LightMem架构解析与LangChain实战

1. 项目概述:轻量化记忆增强的智能体新范式最近在探索大语言模型智能体应用时,一个核心痛点始终绕不开:如何让智能体在长对话或多轮任务中,记住关键信息,并做出连贯、精准的决策?传统的做法要么是将整个对话…...

金蝶云星空日常使用功能

1、必录和锁定和隐藏 2、取多少位字符 FMaterialId <> null AND ( FMaterialId.FNumber[0:3] in (321) or FMaterialId.FNumber[0:1] in (P)) 3、设定指定值...

雷达系统原理与脉冲测量技术详解

1. 雷达系统基础原理与核心方程雷达&#xff08;RADAR&#xff09;是Radio Detection And Ranging的缩写&#xff0c;其基本原理是通过发射电磁波并接收目标反射信号来实现探测和测距。雷达方程是理解雷达系统性能的基础数学表达式&#xff1a;Pr (Pt * G * λ * σ) / ((4π)…...

基于TypeScript的MCP服务器开发指南:为AI助手构建安全工具调用能力

1. 项目概述&#xff1a;一个为TypeScript开发者打造的MCP服务器最近在折腾AI应用开发&#xff0c;特别是想给Claude、Cursor这类智能助手扩展更强大的工具调用能力时&#xff0c;不可避免地接触到了Model Context Protocol。如果你也在研究如何让AI助手安全、可控地访问文件系…...

利用 STM32F407 BKPSRAM 实现运行时变量监控 —— 从方案到 Keil 调试实战

利用 STM32F407 BKPSRAM 实现运行时变量监控 —— 从方案到 Keil 调试实战 一、什么是 BKPSRAM 1.1 先看一张图 STM32F407 的存储系统里有一个很特别的区域叫备份域&#xff08;Backup Domain&#xff09;。备份域里住着几个东西&#xff1a; ┌───────────────…...

GPTPortal:基于模型抽象层的AI应用快速部署与统一管理平台

1. 项目概述&#xff1a;一个面向开发者的AI应用快速部署门户 最近在GitHub上看到一个挺有意思的项目&#xff0c;叫GPTPortal。乍一看名字&#xff0c;可能会让人联想到某个特定的AI模型服务&#xff0c;但深入了解一下就会发现&#xff0c;它的定位其实更偏向于一个“门户”或…...

RePKG终极指南:如何深度解析Wallpaper Engine资源包与TEX纹理转换

RePKG终极指南&#xff1a;如何深度解析Wallpaper Engine资源包与TEX纹理转换 【免费下载链接】repkg Wallpaper engine PKG extractor/TEX to image converter 项目地址: https://gitcode.com/gh_mirrors/re/repkg RePKG是一款专为Wallpaper Engine设计的专业级资源包解…...

跨境直播进入“下半场”:2026年值得关注的几个新方向

很多人提到跨境直播&#xff0c;第一反应还是“流量”和“带货”。但如果这两年持续关注行业变化&#xff0c;会发现一个明显趋势&#xff1a;跨境直播正在从“内容竞争”转向“技术能力竞争”。尤其从2025年开始&#xff0c;行业越来越卷的不只是主播&#xff0c;而是整个直播…...

基于RAG的Obsidian智能知识库:本地部署与优化实战

1. 项目概述&#xff1a;当知识管理遇上大语言模型 如果你和我一样&#xff0c;是 Obsidian 的深度用户&#xff0c;同时又对大语言模型&#xff08;LLM&#xff09;的智能涌现能力感到着迷&#xff0c;那么你肯定也想过一个问题&#xff1a;能不能让我的知识库“活”起来&…...

WinGet安装工具:PowerShell自动化部署的架构解析与实践指南

WinGet安装工具&#xff1a;PowerShell自动化部署的架构解析与实践指南 【免费下载链接】winget-install Install WinGet using PowerShell! Prerequisites automatically installed. Works on Windows 10/11 and Server 2019/2022. 项目地址: https://gitcode.com/gh_mirror…...

【数字孪生实战案例】三维场景中怎样点击飞线,唤起弹窗并加载匹配的关联数据?~山海鲸可视化

在三维数据可视化场景中&#xff0c;飞线常用于呈现跨区域业务关联与流转关系。为增强交互体验与数据可读性&#xff0c;需实现点击飞线触发弹窗&#xff0c;并精准加载匹配的关联数据&#xff0c;让用户可实时查看单条飞线对应的业务信息&#xff0c;提升三维场景的数据交互与…...

AI技能工程框架解析:从模块化设计到智能体构建实战

1. 项目概述&#xff1a;一个面向技能复现与创造的AI工具集最近在GitHub上看到一个挺有意思的项目&#xff0c;叫“skill-creator-pro”。光看这个名字&#xff0c;你可能会有点摸不着头脑&#xff0c;这到底是做什么的&#xff1f;是教人学技能的&#xff0c;还是生成技能的&a…...