当前位置: 首页 > news >正文

Flink学习之旅:(四)Flink转换算子(Transformation)

1.基本转换算子

基本转换算子说明
映射(map)将数据流中的数据进行转换,形成新的数据流
过滤(filter)将数据流中的数据根据条件过滤
扁平映射(flatMap)将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素

package com.qiyu.Transformation;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 11:00*/
public class Trans {/****  映射 map 算子* @param env*/public static void map(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的元素值都 加上 100DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer integer) throws Exception {return integer+100;}});map.print();}/**** 过滤 filter 算子* @param env*/public static void filter(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的值取模,不等于1的通行,反之过滤DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer integer) throws Exception {if (integer % 2 != 1) {return true;}return false;}});filter.print();}/**** 扁平化 flatMap 算子* @param env*/public static void flatMap(StreamExecutionEnvironment env){DataStream<String> stream = env.fromElements("Flink is a powerful framework for stream and batch processing","It provides support for event time processing");//将字符串以空格分隔,拆成多个字符串个体stream.flatMap(new FlatMapFunction<String, Object>() {@Overridepublic void flatMap(String s, Collector<Object> collector) throws Exception {String[] words = s.split(" ");for (String word : words){collector.collect(word);}}}).print();}/*** 主程序类* @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//map(env);//filter(env);flatMap(env);env.execute();}
}

2.聚合算子

聚合算子说明
按键分区(keyBy)通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task solt)
简单聚合

sum():在输入流上,对指定的字段做叠加求和的操作。

min():在输入流上,对指定的字段求最小值。

max():在输入流上,对指定的字段求最大值。

minBy():在输入流上针对指定字段求最小值。

maxBy():在输入流上针对指定字段求最大值。

归约聚合(reduce)可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算

package com.qiyu.Transformation;import com.qiyu.Source.Student;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 14:45*/
public class Aggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 3),Tuple2.of("b", 3),Tuple2.of("b", 4));stream.keyBy(r -> r.f0).print();stream.keyBy(r -> r.f0).sum(1).print();stream.keyBy(r -> r.f0).min(1).print();stream.keyBy(r -> r.f0).max(1).print();stream.keyBy(r -> r.f0).maxBy(1).print();stream.keyBy(r -> r.f0).minBy(1).print();stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1 + t2.f1);}}).print();env.execute();}
}

相关文章:

Flink学习之旅:(四)Flink转换算子(Transformation)

1.基本转换算子 基本转换算子说明映射&#xff08;map&#xff09;将数据流中的数据进行转换&#xff0c;形成新的数据流过滤&#xff08;filter&#xff09;将数据流中的数据根据条件过滤扁平映射&#xff08;flatMap&#xff09;将数据流中的整体&#xff08;如&#xff1a;集…...

CesiumJS 中绘制大多边形

本文翻译自Cesium官方&#xff0c;有改动。 本文中提及到的“大多边形”就如下图所示。 在Cesium的早期版本和一些引擎中&#xff0c;我们绘制这种跨度比较大的多边形&#xff0c;经常会看到一些奇怪的冲突问题&#xff0c;如下图所示。 要渲染任何几何体&#xff0c;我们必…...

FreeRTOS移植以及任务

FreeRTOS移植 1.在sys.h中需要把SYSTEM_SUPPORT_OS 改为 1&#xff0c;支持我们使用 FreeRTOS //0,不支持 os //1,支持 os #define SYSTEM_SUPPORT_OS 1 //定义系统文件夹是否支持 OS2.出现报错 …\SYSTEM\usart\usart.c(6): error: #5: cannot open source input file “incl…...

笙默考试管理系统-MyExamTest----codemirror(41)

笙默考试管理系统-MyExamTest----codemirror&#xff08;40&#xff09; 目录 一、 笙默考试管理系统-MyExamTest 二、 笙默考试管理系统-MyExamTest 三、 笙默考试管理系统-MyExamTest 四、 笙默考试管理系统-MyExamTest 五、 笙默考试管理系统-MyExamTest 笙默考试…...

C#数据结构--数组和ArrayList

目录 本章目录&#xff1a; 2.1 数组基本概念 2.1.1 数组的声明和初始化 2.1.2 数组元素的设置和存取访问 2.1.4 多维数组 2.1.5 参数数组 2.2ArrayList 类 2.2.1ArrayList 类的成员 2.2.2 应用 ArrayList 类 数组和ArrayList之间的区别以及使用的场景 数组&#xf…...

Stable Diffusion WebUI扩展adetailer安装及功能介绍

ADetailer是Stable Diffusion WebUI的一个扩展,类似于检测细节器。 目录 安装地址 如何安装 1. Windows系统 (1)手动安装 (2)一体机...

Linux安装MINIO

MINIO简介MINIO目录 mkdir -p /opt/minio/data && cd /opt/minio MINIO下载 wget https://dl.minio.org.cn/server/minio/release/linux-amd64/minio MINIO授权 chmod x minio MINIO端口 firewall-cmd --zonepublic --add-port7171/tcp --permanent && firewal…...

Java架构师分布式搜索架构

目录 1 导学1.1 初识Elasticsearch1.1.1 Elasticsearch的作用1.1.2 ELK技术栈1.1.3 Elasticsearch和lucene1.1.4.为什么不是其他搜索技术?1.1.5.总结2 Elasticsearch快速建立知识体系3 Elasticsearch和MySQL实体建立联系3.1.mapping映射属性3.2 数据分组聚合3.2.1.聚合的种类3…...

简单宿舍管理系统(springboot+vue)

简单宿舍管理系统&#xff08;springbootvue&#xff09; 1.创建项目1.前端2.数据库3.后端 2.登陆1.前端1.准备工作2.登陆组件3.配置 2.后端1.链接数据库2.创建用户实体类3.数据操作持久层1.配置2.内容3.测试 4.中间业务层1.异常2.业务实现3.测试 5.响应前端控制层 3.前后对接4…...

Socks5代理、IP代理的关键作用

Socks5代理与SK5代理&#xff1a;网络安全的卫士 Socks5代理作为一项先进的代理协议&#xff0c;其多协议支持、身份验证功能以及UDP支持使其成为网络安全的重要支持者。 IP代理&#xff1a;隐私保护与无限访问的利器 IP代理技术通过隐藏真实IP地址&#xff0c;保护用户隐私…...

前端 CSS 经典:box-shadow

1. 基础属性 /* box-shadow: h-shadow v-shadow blur spread color inset; */ box-shadow: 10px 10px 2px 2px red inset; h-shadow: 必填&#xff0c;水平阴影的位置&#xff0c;允许负值 v-shadow: 必填&#xff0c;垂直阴影的位置&#xff0c;允许负值 blur: 可选&#xff…...

使用数组方法打印出 1 - 10000 之间的所有对称数。例如:121、1331等

&#xff08;我从别的人那复制的&#xff0c;原文章请点击此处&#xff09; 源代码&#xff1a; function getNum (start, end) {var arr [];for(var i start; i < end; i) {if (i.toString() i.toString().split().reverse().join() && i.toString().length &…...

DELM深度极限学习机回归预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

Spark大数据分析与实战笔记(第一章 Scala语言基础-5)

文章目录 每日一句正能量章节概要1.5 Scala的模式匹配与样例类1.5.1 模式匹配字符匹配匹配字符串守卫匹配类型匹配数组、元组、集合 1.5.2 样例类 课外补充偏函数 每日一句正能量 “成功的秘诀&#xff0c;在于对目标的执着追求。”——爱迪生 无论是在工作、学习、还是生活中&…...

shell学习脚本04(小滴课堂)

他就可以直接读出来了。不需要在sh后面加参数。 可以用-s隐藏内容&#xff1a; 可以用-t进行指定几秒后显示。 -n限制内容长度。 输入到长度为5自动打印。 我们把-s放到-p后面的话&#xff1a; 这样会出错。 如果最后加5m会一直闪烁。 大家可以按照需求自行使用。...

Python数据结构(链表)

Python数据结构&#xff08;链表&#xff09; 单向链表 单向链表也叫单链表&#xff0c;是链表中最简单的一种形式&#xff0c;它的每个节点包含两个域&#xff0c;一个信息域(元素域)和一个链接域。这个链接指向链表中的下一个节点&#xff0c;而最后一个节点的链接域则指向…...

连续/离散的控制系统阶跃测试(包括MATLAB里的step()函数)

阶跃测试 只要是连续时间系统&#xff0c;无论是传递函数还是连续状态空间形式的模型&#xff0c;直接可以用**step()**做阶跃测试&#xff1b;但是对于离散系统而言&#xff0c;不能用step()函数&#xff0c;可以自行编写代码&#xff0c;如下。 1、离散系统&#xff1a;x(k…...

【六:pytest框架介绍】

常见的请求对象requests.get()requests.post()requests.delete()requests.put()requests.request()常见的响应对象reprequests.request()//返回字符串格式数据print(req.text)//返回字节格式数据print(req.content)//返回字典格式数据print(req.json)#状态码print(req.status_c…...

提升医院安全的关键利器——医院安全(不良)事件报告系统源码

医院是人们寻求医疗服务和康复的场所&#xff0c;安全是医院运营的基石。然而&#xff0c;医疗过程中不可避免地会出现不良事件&#xff0c;如药物错误、手术事故等。为了及时发现、评估和解决这些问题&#xff0c;医院安全&#xff08;不良&#xff09;事件报告系统应运而生。…...

【瑞吉外卖部分功能补充】

瑞吉外卖部分功能补充 菜品的启售和停售 在浏览器控制台点击对应功能后可以看到前端发送的请求是&#xff1a;http://localhost:9999/dish/status/1?ids1413342036832100354&#xff0c;请求方式为POST。 接收到前端参数后&#xff0c;进行controller层代码补全&#xff0c…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天&#xff0c;再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至&#xff0c;这不仅是开发者的盛宴&#xff0c;更是全球数亿苹果用户翘首以盼的科技春晚。今年&#xff0c;苹果依旧为我们带来了全家桶式的系统更新&#xff0c;包括 iOS 26、iPadOS 26…...

ES6从入门到精通:前言

ES6简介 ES6&#xff08;ECMAScript 2015&#xff09;是JavaScript语言的重大更新&#xff0c;引入了许多新特性&#xff0c;包括语法糖、新数据类型、模块化支持等&#xff0c;显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var&#xf…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...