当前位置: 首页 > article >正文

Flink算子

一、基础转换算子最常用这类算子用于对数据流进行基础的格式转换、过滤、映射是处理数据的第一步。1. map一对一转换作用将数据流中的每个元素转换为另一个元素输入 1 个输出 1 个。场景字段提取、格式转换如字符串转对象。java运行// 示例提取订单金额并转为Double类型 DataStreamDouble amountStream orderStream .map(line - { // 按逗号拆分每行数据 String[] fields line.split(,); // 提取第3个字段金额并转为Double return Double.parseDouble(fields[2]); }); // 输出100.0, 200.0, 150.0, 300.02. flatMap一对多转换作用将一个元素转换为 0 个、1 个或多个元素输入 1 个输出多个。场景数据拆分如一行拆多行、脏数据过滤。java运行// 示例拆分订单信息为Tuple2用户ID, 金额并过滤支付失败的订单 DataStreamTuple2String, Double userAmountStream orderStream .flatMap(new FlatMapFunctionString, Tuple2String, Double() { Override public void flatMap(String line, CollectorTuple2String, Double out) throws Exception { String[] fields line.split(,); String userId fields[1]; double amount Double.parseDouble(fields[2]); String status fields[3]; // 只保留支付成功的订单 if (pay_success.equals(status)) { out.collect(Tuple2.of(userId, amount)); } } }); // 输出(user_01,100.0), (user_01,150.0), (user_03,300.0)3. filter数据过滤作用根据条件筛选出符合要求的元素。场景脏数据过滤、业务规则筛选如只保留大额订单。java运行// 示例过滤出金额大于200的成功订单 DataStreamString highAmountStream orderStream .filter(line - { String[] fields line.split(,); double amount Double.parseDouble(fields[2]); String status fields[3]; // 条件支付成功 且 金额200 return pay_success.equals(status) amount 200; }); // 输出order_004,user_03,300,pay_success二、聚合算子核心统计聚合算子需结合keyBy使用先分组再聚合是实时统计的核心。1. keyBy数据分组作用按指定字段将数据流分组类似 SQL 的 GROUP BY是聚合的前提。注意keyBy返回KeyedStream只能在 KeyedStream 上执行聚合。java运行// 示例按用户ID分组 KeyedStreamTuple2String, Double, String keyedStream userAmountStream // 按Tuple2的第一个字段用户ID分组 .keyBy(tuple - tuple.f0);2. sum/avg/max/min基础聚合作用对分组后的数据进行求和、平均值、最大值、最小值计算。场景实时统计用户累计消费、订单最大金额等。java运行// 示例统计每个用户的累计消费金额 DataStreamTuple2String, Double sumStream keyedStream // 对Tuple2的第二个字段金额求和 .sum(1); // 输出(user_01,100.0) → (user_01,250.0) → (user_03,300.0) // 示例统计每个用户的平均消费金额 DataStreamTuple2String, Double avgStream keyedStream .avg(1); // 输出(user_01,100.0) → (user_01,125.0) → (user_03,300.0)3. reduce自定义聚合作用自定义聚合逻辑比 sum/avg 更灵活支持增量聚合。场景复杂统计如累计金额 订单数。java运行// 示例统计每个用户的累计金额和订单数Tuple3用户ID, 累计金额, 订单数 DataStreamTuple3String, Double, Integer reduceStream keyedStream .reduce((t1, t2) - { // t1历史聚合结果t2新到来的元素 String userId t1.f0; double totalAmount t1.f1 t2.f1; // 累计金额 int orderCount 1 (t1.f2 null ? 0 : t1.f2); // 订单数 return Tuple3.of(userId, totalAmount, orderCount); }, () - Tuple3.of(, 0.0, 0)); // 初始值 // 输出(user_01,100.0,1) → (user_01,250.0,2) → (user_03,300.0,1)三、窗口算子实时统计核心Flink 是流式计算窗口用于将无限流切分为有限的 “批次” 进行统计结合keyBy使用。1. 滚动窗口Tumbling Window作用窗口大小固定无重叠如每 5 分钟统计一次。场景固定周期统计如每小时用户消费总额。java运行// 示例5秒滚动窗口统计每个用户的消费总额 DataStreamTuple2String, Double tumblingWindowStream keyedStream // 5秒滚动窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 对金额求和 .sum(1);2. 滑动窗口Sliding Window作用窗口大小固定有重叠如每 2 分钟统计最近 5 分钟的数据。场景高频统计如实时监控每 10 秒统计最近 1 分钟的订单量。java运行// 示例滑动窗口窗口5秒滑动2秒 DataStreamTuple2String, Double slidingWindowStream keyedStream .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))) .sum(1);3. 会话窗口Session Window作用按用户会话划分窗口如用户连续操作 30 秒内为一个会话。场景用户行为分析如统计用户一次会话内的消费金额。java运行// 示例会话窗口超时时间3秒无操作3秒则窗口关闭 DataStreamTuple2String, Double sessionWindowStream keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))) .sum(1);四、连接 / 拆分算子1. union合并同类型数据流作用将多个同类型的数据流合并为一个字段结构必须完全一致。场景合并多来源的同类型数据如多个省份的订单流。java运行// 模拟第二个订单流 DataStreamTuple2String, Double orderStream2 env.fromElements( Tuple2.of(user_02, 180.0), Tuple2.of(user_03, 50.0) ); // 合并两个数据流 DataStreamTuple2String, Double unionStream userAmountStream.union(orderStream2);2. connect连接不同类型数据流作用连接两个不同类型的数据流支持自定义处理逻辑。场景关联补充数据如订单流 用户信息流。java运行// 模拟用户信息流用户ID, 用户名 DataStreamTuple2String, String userInfoStream env.fromElements( Tuple2.of(user_01, 张三), Tuple2.of(user_02, 李四) ); // 连接订单流和用户信息流 ConnectedStreamsTuple2String, Double, Tuple2String, String connectedStreams userAmountStream.connect(userInfoStream) // 按用户ID分组两个流的关联键 .keyBy(t1 - t1.f0, t2 - t2.f0); // 处理连接后的数据流关联用户名和消费金额 DataStreamString resultStream connectedStreams .map(new CoMapFunctionTuple2String, Double, Tuple2String, String, String() { Override public String map1(Tuple2String, Double order) throws Exception { // 处理订单流暂时无用户名先返回默认值 return order.f0 ,未知用户, order.f1; } Override public String map2(Tuple2String, String user) throws Exception { // 处理用户信息流暂时无金额先返回默认值 return user.f0 , user.f1 ,0.0; } });总结基础转换map一对一、flatMap一对多、filter过滤是数据预处理的核心几乎所有 Flink 任务都会用到聚合统计先keyBy分组再用sum/avg/reduce做聚合是实时统计的基础窗口核心滚动窗口无重叠、滑动窗口有重叠、会话窗口按会话是流式统计的关键需结合业务场景选择。

相关文章:

Flink算子

一、基础转换算子(最常用)这类算子用于对数据流进行基础的格式转换、过滤、映射,是处理数据的第一步。1. map:一对一转换作用:将数据流中的每个元素转换为另一个元素(输入 1 个,输出 1 个&#…...

ANIMATEDIFF PRO实战教程:批量生成不同风格(赛博/水墨/油画)动态作品

ANIMATEDIFF PRO实战教程:批量生成不同风格(赛博/水墨/油画)动态作品 1. 快速了解ANIMATEDIFF PRO ANIMATEDIFF PRO是一个专业的文生视频工具,它能让你用简单的文字描述,快速生成高质量的动态视频作品。无论你是想制…...

Phi-3-Mini-128K效果展示:处理带Markdown表格的API文档并生成测试用例

Phi-3-Mini-128K效果展示:处理带Markdown表格的API文档并生成测试用例 1. 工具核心能力概览 Phi-3-Mini-128K作为微软最新推出的轻量化对话模型,在处理结构化技术文档方面展现出惊人的能力。本次重点展示其两大核心能力: 复杂文档解析&…...

分支循环语句

总引 一.if语句 1.if 2.if…else… 3.分支中包含多条语句 一般直接加括号 4.if嵌套 5.else悬空问题 二.关系表达式 三.条件操作符 四.逻辑操作符 1.逻辑取反运算符 2.逻辑与运算符 3.逻辑或运算符 4.练习 5.练习 a a变成1,&&左边是0为假,直…...

BUCK输出响应不及时问题分析及解决

本文以问题原理分析解决措施形式,以系统休眠唤醒时导致BUCK电压跌落、负载瞬态响应慢问题为例,提供分析过程及工程化解决方案。 一、Buck电路输出电容如何选型?核心计算公式是什么? 问题分析 输出电容直接决定纹波大小、瞬态电流支…...

E = M * V * V / 2

中学动能公式 E M * V * V / 21500kg * 33m/s * 33m/s / 2 816750 J逆向思维,当然人家乐意,换我们肯定不干这事,这些都是噱头吹牛增加曝光没啥问题;最大的问题在于产品质量或者产品问题比较严峻,套路一老&#xff0…...

CRM [Customer Rating Score]

CRM [Customer Rating Score] 客户评级评分...

基于Python的工作量统计系统毕业设计

博主介绍:✌ 专注于Java,python,✌关注✌私信我✌具体的问题,我会尽力帮助你。一、研究目的本研究旨在设计并实现一个基于Python的工作量统计系统,以实现对计算机科学领域科研人员工作量的有效统计和分析。具体而言,研究目的可概括…...

【电路笔记 STM32】Cortex-M3 Cortex-M4 Cortex-M7 ARM架构区别+关键不同+图示对比+代码兼容性

文章目录 内核特性Cortex-M3架构特性:Cortex-M4架构特性:Cortex-M7架构特性: Cortex-M3 和 Cortex-M4关键不同点图示对比代码兼容性 Cortex-M4 和 Cortex-M7关键不同点图示对比代码兼容性 CG 内核特性 Cortex-M3架构特性: 特性 …...

智慧工地巡检 混凝土结构损伤检测数据集混凝土裂缝检测数据集 检测混凝土出现的裂缝露筋、剥落 YOLO模型数据集 目标检测算法

智慧工地巡检 混凝土结构损伤检测数据集混凝土裂缝检测数据集 检测混凝土出现的裂缝露筋、剥落 YOLO模型数据集 目标检测算法 数据集信息表项目内容数据集中文名混凝土结构损伤检测数据集图片数量3072 张类别裂缝、露筋、剥落数据集格式YOLO目标检测格式图片尺寸未明确标注 11…...

改进鲸鱼优化算法性能深度解析:多策略融合、参数优化与测试函数波形报告

改进鲸鱼优化算法(IWOA,自己融合了多策略改进,名字自己取的[破涕为笑]),具体改进公式会在readme说明文件中详细给出。 与鲸鱼算法,灰狼算法,麻雀算法,北方苍鹰算法,在初始种群为30,独…...

3步解决方案:ncmdump实现NCM音乐格式转换与跨平台播放自由

3步解决方案:ncmdump实现NCM音乐格式转换与跨平台播放自由 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 你是否遇到过网易云音乐下载的NCM文件无法在车载音响、专业音频软件或其他播放器中使用的困扰?ncmdu…...

Lychee-Rerank效果展示:多场景文本匹配精度对比分析

Lychee-Rerank效果展示:多场景文本匹配精度对比分析 最近在折腾几个RAG应用,发现检索质量总是差那么点意思。用传统的BM25这类关键词匹配方法,查准率时高时低,尤其是面对一些表述灵活或者语义复杂的查询时,经常“答非…...

Qwen3.5-9B惊艳案例:工业图纸理解+故障描述生成真实项目复现

Qwen3.5-9B惊艳案例:工业图纸理解故障描述生成真实项目复现 1. 项目背景与模型特性 在工业制造领域,设备维护人员每天需要处理大量机械图纸和技术文档。传统的人工解读方式效率低下,且对经验要求极高。Qwen3.5-9B模型的出现为这一场景带来了…...

ozon小白入行指南:用CaptainAI解锁俄罗斯电商新蓝海

在俄罗斯电商市场持续火热的当下,Ozon平台凭借其覆盖全俄的物流网络和精准的本土化运营策略,成为跨境卖家掘金的新阵地。但对于初入行的“小白”而言,如何突破选品、物流、运营三重困局?而CaptainAI作为专为对俄电商设计的智能工具…...

弦音墨影实战教程:用自然语言‘识物于林间光影’完成视频片段定位

弦音墨影实战教程:用自然语言‘识物于林间光影’完成视频片段定位 1. 引言:当AI遇见水墨丹青 想象一下,你正在观看一部自然纪录片,画面中猎豹在草原上追逐羚羊。突然,你想找到"猎豹从右侧快速跑过草丛"的那…...

南北阁Nanbeige 4.1-3B实战:基于STM32CubeMX的嵌入式AI项目文档生成

南北阁Nanbeige 4.1-3B实战:基于STM32CubeMX的嵌入式AI项目文档生成 1. 引言:当嵌入式开发遇上AI助手 如果你用过STM32CubeMX,肯定对那个图形化界面又爱又恨。爱的是,点点鼠标就能配置好时钟树、外设引脚,生成初始化…...

形式化验证工具选型生死战:CBMC vs. KLEE vs. Serval——20年裸机开发老兵用17类中断场景压测结果说话

第一章:形式化验证工具选型生死战:CBMC vs. KLEE vs. Serval——20年裸机开发老兵用17类中断场景压测结果说话真实战场:17类ARM Cortex-M4中断驱动场景建模 在无OS裸机环境中,我们构建了覆盖NVIC优先级抢占、嵌套中断返回、PendSV…...

3步突破信息壁垒:面向研究者的开源内容解锁工具全指南

3步突破信息壁垒:面向研究者的开源内容解锁工具全指南 【免费下载链接】bypass-paywalls-chrome-clean 项目地址: https://gitcode.com/GitHub_Trending/by/bypass-paywalls-chrome-clean 在数字化阅读时代,付费墙已成为知识获取的主要障碍。据2…...

Qwen-Ranker Pro实战教程:结合Milvus/FAISS向量库构建完整RAG

Qwen-Ranker Pro实战教程:结合Milvus/FAISS向量库构建完整RAG 1. 引言:为什么需要语义重排序? 想象一下这样的场景:你在电商平台搜索"适合夏天穿的轻薄透气运动鞋",向量搜索引擎返回了100个结果&#xff0…...

RSL10 dongle 驱动识别不到

RSL10 USB Dongle(PN: RSL10-USB001GEVK ) 可作为central 设备对peripheral 设备进行确认与诊断也可在开发E7160sl presuite产品作为无线验配编程器使用。 有客户反馈在使用RSL10 USB Dongle作为无线验配编程器时,无法搜索到设备。...

ESRGAN实战:如何用Python快速提升模糊图片分辨率(附完整代码)

ESRGAN实战:用Python将模糊照片秒变高清的完整指南 每次翻看老照片或低分辨率截图时,那种"要是能再清晰一点就好了"的遗憾感,相信很多人都有体会。传统图像放大技术往往让图片变得更模糊或出现锯齿,而基于深度学习的超分…...

Qwen3与Unity引擎联动:为游戏过场动画实时生成字幕

Qwen3与Unity引擎联动:为游戏过场动画实时生成字幕 最近在琢磨一个挺有意思的事儿:怎么让游戏里的过场动画和角色对话,能自动配上精准的字幕。这事儿听起来简单,做起来可有不少门道。特别是对于开放世界或者剧情丰富的游戏&#…...

CVPR 2026 即插即用 | 卷积篇 | DEGConv:方向引导门控卷积,动态掩码强化结构区域,边缘/纹理/小目标结构全捕捉!

VX: shixiaodayyds,备注【即插即用】,添加即插即用模块交流群。 文章目录 模块出处 模块介绍 模块提出的动机(Motivation) 适用范围与模块效果 模块代码及使用方式 模块出处 Paper:MixerCSeg: An Efficient Mixer Architecture for Crack Segmentation via Decoupled Mamb…...

黑马LangChain4j - AI志愿填报顾问

认识AI AI发展史 AI, 人工智能, 使机器能够像人类一样思考、学习和解决问题的技术。 PS: 本节课主要讲了一些机器学习, 深度学习相关的概念知识, 可以先去看一下鱼书。鱼书真的手把手教会新手深度学习相关的所有知识。 AI市场分布 AI应用开发就是框起来部分需要做的事。 大模…...

Qwen3.5-9B作品集:支持红外热成像图+可见光图双模输入的工业设备诊断

Qwen3.5-9B作品集:支持红外热成像图可见光图双模输入的工业设备诊断 1. 模型核心能力展示 Qwen3.5-9B作为新一代多模态大模型,在工业设备诊断领域展现出独特优势。该模型能够同时处理红外热成像图和可见光图像,为设备状态监测提供双重验证。…...

C语言程序设计第四版(何钦铭、颜晖)第九章结构之输出平均分

【练习9-3】例9-1中,如果要计算的是三门课程的课程平均分,应该如何改写程序? #include<stdio.h> struct Student{int num;char name[10];int computer,english,math;double average; }; int main(){struct Student stu;int n,i;scanf("%d",&n);for(i1;i&…...

计算机图形学入门(openGL)持续更新

OpenGL概览 图形API(Application Programming Interface) 跨平台&#xff0c;跨编程语言的图形程序接口。用于调用GPU上的指令功能 游戏引擎底层都是由图形API制作出来的 OpenGL&#xff1a;是一个由Khronos组织制定并维护的规范(Specification) OpenGL实现&#xff1a;各个…...

Redis学习笔记(实战篇3)

一、分布式锁-redission 1. 存在的问题 (1) 不可重入&#xff1a; // 方法A加了分布式锁 public void methodA() {lock(); // 线程拿到锁methodB(); // 方法B也加了同一个分布式锁unlock(); }// 方法B也加了同一个分布式锁 public void methodB() {lock(); // 同一个线程再次…...

思维方式变革是指个体或群体在认知模式、问题解决路径、价值判断逻辑等方面发生的根本性转变

思维方式变革是指个体或群体在认知模式、问题解决路径、价值判断逻辑等方面发生的根本性转变。它不仅涉及知识更新或技能提升&#xff0c;更深层的是对“如何思考”本身的反思与重构。这种变革常由技术革命&#xff08;如人工智能普及&#xff09;、社会结构转型&#xff08;如…...