Flink学习之旅:(四)Flink转换算子(Transformation)
1.基本转换算子
基本转换算子 说明 映射(map) 将数据流中的数据进行转换,形成新的数据流 过滤(filter) 将数据流中的数据根据条件过滤 扁平映射(flatMap) 将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素
package com.qiyu.Transformation;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 11:00*/
public class Trans {/**** 映射 map 算子* @param env*/public static void map(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的元素值都 加上 100DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer integer) throws Exception {return integer+100;}});map.print();}/**** 过滤 filter 算子* @param env*/public static void filter(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的值取模,不等于1的通行,反之过滤DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer integer) throws Exception {if (integer % 2 != 1) {return true;}return false;}});filter.print();}/**** 扁平化 flatMap 算子* @param env*/public static void flatMap(StreamExecutionEnvironment env){DataStream<String> stream = env.fromElements("Flink is a powerful framework for stream and batch processing","It provides support for event time processing");//将字符串以空格分隔,拆成多个字符串个体stream.flatMap(new FlatMapFunction<String, Object>() {@Overridepublic void flatMap(String s, Collector<Object> collector) throws Exception {String[] words = s.split(" ");for (String word : words){collector.collect(word);}}}).print();}/*** 主程序类* @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//map(env);//filter(env);flatMap(env);env.execute();}
}
2.聚合算子
聚合算子 说明 按键分区(keyBy) 通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task solt) 简单聚合 sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():在输入流上针对指定字段求最小值。
maxBy():在输入流上针对指定字段求最大值。
归约聚合(reduce) 可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算
package com.qiyu.Transformation;import com.qiyu.Source.Student;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 14:45*/
public class Aggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 3),Tuple2.of("b", 3),Tuple2.of("b", 4));stream.keyBy(r -> r.f0).print();stream.keyBy(r -> r.f0).sum(1).print();stream.keyBy(r -> r.f0).min(1).print();stream.keyBy(r -> r.f0).max(1).print();stream.keyBy(r -> r.f0).maxBy(1).print();stream.keyBy(r -> r.f0).minBy(1).print();stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1 + t2.f1);}}).print();env.execute();}
}
相关文章:
Flink学习之旅:(四)Flink转换算子(Transformation)
1.基本转换算子 基本转换算子说明映射(map)将数据流中的数据进行转换,形成新的数据流过滤(filter)将数据流中的数据根据条件过滤扁平映射(flatMap)将数据流中的整体(如:集…...

CesiumJS 中绘制大多边形
本文翻译自Cesium官方,有改动。 本文中提及到的“大多边形”就如下图所示。 在Cesium的早期版本和一些引擎中,我们绘制这种跨度比较大的多边形,经常会看到一些奇怪的冲突问题,如下图所示。 要渲染任何几何体,我们必…...
FreeRTOS移植以及任务
FreeRTOS移植 1.在sys.h中需要把SYSTEM_SUPPORT_OS 改为 1,支持我们使用 FreeRTOS //0,不支持 os //1,支持 os #define SYSTEM_SUPPORT_OS 1 //定义系统文件夹是否支持 OS2.出现报错 …\SYSTEM\usart\usart.c(6): error: #5: cannot open source input file “incl…...
笙默考试管理系统-MyExamTest----codemirror(41)
笙默考试管理系统-MyExamTest----codemirror(40) 目录 一、 笙默考试管理系统-MyExamTest 二、 笙默考试管理系统-MyExamTest 三、 笙默考试管理系统-MyExamTest 四、 笙默考试管理系统-MyExamTest 五、 笙默考试管理系统-MyExamTest 笙默考试…...
C#数据结构--数组和ArrayList
目录 本章目录: 2.1 数组基本概念 2.1.1 数组的声明和初始化 2.1.2 数组元素的设置和存取访问 2.1.4 多维数组 2.1.5 参数数组 2.2ArrayList 类 2.2.1ArrayList 类的成员 2.2.2 应用 ArrayList 类 数组和ArrayList之间的区别以及使用的场景 数组…...
Stable Diffusion WebUI扩展adetailer安装及功能介绍
ADetailer是Stable Diffusion WebUI的一个扩展,类似于检测细节器。 目录 安装地址 如何安装 1. Windows系统 (1)手动安装 (2)一体机...

Linux安装MINIO
MINIO简介MINIO目录 mkdir -p /opt/minio/data && cd /opt/minio MINIO下载 wget https://dl.minio.org.cn/server/minio/release/linux-amd64/minio MINIO授权 chmod x minio MINIO端口 firewall-cmd --zonepublic --add-port7171/tcp --permanent && firewal…...
Java架构师分布式搜索架构
目录 1 导学1.1 初识Elasticsearch1.1.1 Elasticsearch的作用1.1.2 ELK技术栈1.1.3 Elasticsearch和lucene1.1.4.为什么不是其他搜索技术?1.1.5.总结2 Elasticsearch快速建立知识体系3 Elasticsearch和MySQL实体建立联系3.1.mapping映射属性3.2 数据分组聚合3.2.1.聚合的种类3…...

简单宿舍管理系统(springboot+vue)
简单宿舍管理系统(springbootvue) 1.创建项目1.前端2.数据库3.后端 2.登陆1.前端1.准备工作2.登陆组件3.配置 2.后端1.链接数据库2.创建用户实体类3.数据操作持久层1.配置2.内容3.测试 4.中间业务层1.异常2.业务实现3.测试 5.响应前端控制层 3.前后对接4…...
Socks5代理、IP代理的关键作用
Socks5代理与SK5代理:网络安全的卫士 Socks5代理作为一项先进的代理协议,其多协议支持、身份验证功能以及UDP支持使其成为网络安全的重要支持者。 IP代理:隐私保护与无限访问的利器 IP代理技术通过隐藏真实IP地址,保护用户隐私…...
前端 CSS 经典:box-shadow
1. 基础属性 /* box-shadow: h-shadow v-shadow blur spread color inset; */ box-shadow: 10px 10px 2px 2px red inset; h-shadow: 必填,水平阴影的位置,允许负值 v-shadow: 必填,垂直阴影的位置,允许负值 blur: 可选ÿ…...
使用数组方法打印出 1 - 10000 之间的所有对称数。例如:121、1331等
(我从别的人那复制的,原文章请点击此处) 源代码: function getNum (start, end) {var arr [];for(var i start; i < end; i) {if (i.toString() i.toString().split().reverse().join() && i.toString().length &…...

DELM深度极限学习机回归预测研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

Spark大数据分析与实战笔记(第一章 Scala语言基础-5)
文章目录 每日一句正能量章节概要1.5 Scala的模式匹配与样例类1.5.1 模式匹配字符匹配匹配字符串守卫匹配类型匹配数组、元组、集合 1.5.2 样例类 课外补充偏函数 每日一句正能量 “成功的秘诀,在于对目标的执着追求。”——爱迪生 无论是在工作、学习、还是生活中&…...

shell学习脚本04(小滴课堂)
他就可以直接读出来了。不需要在sh后面加参数。 可以用-s隐藏内容: 可以用-t进行指定几秒后显示。 -n限制内容长度。 输入到长度为5自动打印。 我们把-s放到-p后面的话: 这样会出错。 如果最后加5m会一直闪烁。 大家可以按照需求自行使用。...

Python数据结构(链表)
Python数据结构(链表) 单向链表 单向链表也叫单链表,是链表中最简单的一种形式,它的每个节点包含两个域,一个信息域(元素域)和一个链接域。这个链接指向链表中的下一个节点,而最后一个节点的链接域则指向…...

连续/离散的控制系统阶跃测试(包括MATLAB里的step()函数)
阶跃测试 只要是连续时间系统,无论是传递函数还是连续状态空间形式的模型,直接可以用**step()**做阶跃测试;但是对于离散系统而言,不能用step()函数,可以自行编写代码,如下。 1、离散系统:x(k…...

【六:pytest框架介绍】
常见的请求对象requests.get()requests.post()requests.delete()requests.put()requests.request()常见的响应对象reprequests.request()//返回字符串格式数据print(req.text)//返回字节格式数据print(req.content)//返回字典格式数据print(req.json)#状态码print(req.status_c…...

提升医院安全的关键利器——医院安全(不良)事件报告系统源码
医院是人们寻求医疗服务和康复的场所,安全是医院运营的基石。然而,医疗过程中不可避免地会出现不良事件,如药物错误、手术事故等。为了及时发现、评估和解决这些问题,医院安全(不良)事件报告系统应运而生。…...

【瑞吉外卖部分功能补充】
瑞吉外卖部分功能补充 菜品的启售和停售 在浏览器控制台点击对应功能后可以看到前端发送的请求是:http://localhost:9999/dish/status/1?ids1413342036832100354,请求方式为POST。 接收到前端参数后,进行controller层代码补全,…...

龙虎榜——20250610
上证指数放量收阴线,个股多数下跌,盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型,指数短线有调整的需求,大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的:御银股份、雄帝科技 驱动…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
day52 ResNet18 CBAM
在深度学习的旅程中,我们不断探索如何提升模型的性能。今天,我将分享我在 ResNet18 模型中插入 CBAM(Convolutional Block Attention Module)模块,并采用分阶段微调策略的实践过程。通过这个过程,我不仅提升…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南
精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南 在数字化营销时代,邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天,我们将深入解析邮件打开率、网站可用性、页面参与时…...

听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...