Flink学习之旅:(四)Flink转换算子(Transformation)
1.基本转换算子
基本转换算子 说明 映射(map) 将数据流中的数据进行转换,形成新的数据流 过滤(filter) 将数据流中的数据根据条件过滤 扁平映射(flatMap) 将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素
package com.qiyu.Transformation;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 11:00*/
public class Trans {/**** 映射 map 算子* @param env*/public static void map(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的元素值都 加上 100DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer integer) throws Exception {return integer+100;}});map.print();}/**** 过滤 filter 算子* @param env*/public static void filter(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的值取模,不等于1的通行,反之过滤DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer integer) throws Exception {if (integer % 2 != 1) {return true;}return false;}});filter.print();}/**** 扁平化 flatMap 算子* @param env*/public static void flatMap(StreamExecutionEnvironment env){DataStream<String> stream = env.fromElements("Flink is a powerful framework for stream and batch processing","It provides support for event time processing");//将字符串以空格分隔,拆成多个字符串个体stream.flatMap(new FlatMapFunction<String, Object>() {@Overridepublic void flatMap(String s, Collector<Object> collector) throws Exception {String[] words = s.split(" ");for (String word : words){collector.collect(word);}}}).print();}/*** 主程序类* @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//map(env);//filter(env);flatMap(env);env.execute();}
}
2.聚合算子
聚合算子 说明 按键分区(keyBy) 通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task solt) 简单聚合 sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():在输入流上针对指定字段求最小值。
maxBy():在输入流上针对指定字段求最大值。
归约聚合(reduce) 可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算
package com.qiyu.Transformation;import com.qiyu.Source.Student;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 14:45*/
public class Aggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 3),Tuple2.of("b", 3),Tuple2.of("b", 4));stream.keyBy(r -> r.f0).print();stream.keyBy(r -> r.f0).sum(1).print();stream.keyBy(r -> r.f0).min(1).print();stream.keyBy(r -> r.f0).max(1).print();stream.keyBy(r -> r.f0).maxBy(1).print();stream.keyBy(r -> r.f0).minBy(1).print();stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1 + t2.f1);}}).print();env.execute();}
}
相关文章:
Flink学习之旅:(四)Flink转换算子(Transformation)
1.基本转换算子 基本转换算子说明映射(map)将数据流中的数据进行转换,形成新的数据流过滤(filter)将数据流中的数据根据条件过滤扁平映射(flatMap)将数据流中的整体(如:集…...

CesiumJS 中绘制大多边形
本文翻译自Cesium官方,有改动。 本文中提及到的“大多边形”就如下图所示。 在Cesium的早期版本和一些引擎中,我们绘制这种跨度比较大的多边形,经常会看到一些奇怪的冲突问题,如下图所示。 要渲染任何几何体,我们必…...
FreeRTOS移植以及任务
FreeRTOS移植 1.在sys.h中需要把SYSTEM_SUPPORT_OS 改为 1,支持我们使用 FreeRTOS //0,不支持 os //1,支持 os #define SYSTEM_SUPPORT_OS 1 //定义系统文件夹是否支持 OS2.出现报错 …\SYSTEM\usart\usart.c(6): error: #5: cannot open source input file “incl…...
笙默考试管理系统-MyExamTest----codemirror(41)
笙默考试管理系统-MyExamTest----codemirror(40) 目录 一、 笙默考试管理系统-MyExamTest 二、 笙默考试管理系统-MyExamTest 三、 笙默考试管理系统-MyExamTest 四、 笙默考试管理系统-MyExamTest 五、 笙默考试管理系统-MyExamTest 笙默考试…...
C#数据结构--数组和ArrayList
目录 本章目录: 2.1 数组基本概念 2.1.1 数组的声明和初始化 2.1.2 数组元素的设置和存取访问 2.1.4 多维数组 2.1.5 参数数组 2.2ArrayList 类 2.2.1ArrayList 类的成员 2.2.2 应用 ArrayList 类 数组和ArrayList之间的区别以及使用的场景 数组…...
Stable Diffusion WebUI扩展adetailer安装及功能介绍
ADetailer是Stable Diffusion WebUI的一个扩展,类似于检测细节器。 目录 安装地址 如何安装 1. Windows系统 (1)手动安装 (2)一体机...

Linux安装MINIO
MINIO简介MINIO目录 mkdir -p /opt/minio/data && cd /opt/minio MINIO下载 wget https://dl.minio.org.cn/server/minio/release/linux-amd64/minio MINIO授权 chmod x minio MINIO端口 firewall-cmd --zonepublic --add-port7171/tcp --permanent && firewal…...
Java架构师分布式搜索架构
目录 1 导学1.1 初识Elasticsearch1.1.1 Elasticsearch的作用1.1.2 ELK技术栈1.1.3 Elasticsearch和lucene1.1.4.为什么不是其他搜索技术?1.1.5.总结2 Elasticsearch快速建立知识体系3 Elasticsearch和MySQL实体建立联系3.1.mapping映射属性3.2 数据分组聚合3.2.1.聚合的种类3…...

简单宿舍管理系统(springboot+vue)
简单宿舍管理系统(springbootvue) 1.创建项目1.前端2.数据库3.后端 2.登陆1.前端1.准备工作2.登陆组件3.配置 2.后端1.链接数据库2.创建用户实体类3.数据操作持久层1.配置2.内容3.测试 4.中间业务层1.异常2.业务实现3.测试 5.响应前端控制层 3.前后对接4…...
Socks5代理、IP代理的关键作用
Socks5代理与SK5代理:网络安全的卫士 Socks5代理作为一项先进的代理协议,其多协议支持、身份验证功能以及UDP支持使其成为网络安全的重要支持者。 IP代理:隐私保护与无限访问的利器 IP代理技术通过隐藏真实IP地址,保护用户隐私…...
前端 CSS 经典:box-shadow
1. 基础属性 /* box-shadow: h-shadow v-shadow blur spread color inset; */ box-shadow: 10px 10px 2px 2px red inset; h-shadow: 必填,水平阴影的位置,允许负值 v-shadow: 必填,垂直阴影的位置,允许负值 blur: 可选ÿ…...
使用数组方法打印出 1 - 10000 之间的所有对称数。例如:121、1331等
(我从别的人那复制的,原文章请点击此处) 源代码: function getNum (start, end) {var arr [];for(var i start; i < end; i) {if (i.toString() i.toString().split().reverse().join() && i.toString().length &…...

DELM深度极限学习机回归预测研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

Spark大数据分析与实战笔记(第一章 Scala语言基础-5)
文章目录 每日一句正能量章节概要1.5 Scala的模式匹配与样例类1.5.1 模式匹配字符匹配匹配字符串守卫匹配类型匹配数组、元组、集合 1.5.2 样例类 课外补充偏函数 每日一句正能量 “成功的秘诀,在于对目标的执着追求。”——爱迪生 无论是在工作、学习、还是生活中&…...

shell学习脚本04(小滴课堂)
他就可以直接读出来了。不需要在sh后面加参数。 可以用-s隐藏内容: 可以用-t进行指定几秒后显示。 -n限制内容长度。 输入到长度为5自动打印。 我们把-s放到-p后面的话: 这样会出错。 如果最后加5m会一直闪烁。 大家可以按照需求自行使用。...

Python数据结构(链表)
Python数据结构(链表) 单向链表 单向链表也叫单链表,是链表中最简单的一种形式,它的每个节点包含两个域,一个信息域(元素域)和一个链接域。这个链接指向链表中的下一个节点,而最后一个节点的链接域则指向…...

连续/离散的控制系统阶跃测试(包括MATLAB里的step()函数)
阶跃测试 只要是连续时间系统,无论是传递函数还是连续状态空间形式的模型,直接可以用**step()**做阶跃测试;但是对于离散系统而言,不能用step()函数,可以自行编写代码,如下。 1、离散系统:x(k…...

【六:pytest框架介绍】
常见的请求对象requests.get()requests.post()requests.delete()requests.put()requests.request()常见的响应对象reprequests.request()//返回字符串格式数据print(req.text)//返回字节格式数据print(req.content)//返回字典格式数据print(req.json)#状态码print(req.status_c…...

提升医院安全的关键利器——医院安全(不良)事件报告系统源码
医院是人们寻求医疗服务和康复的场所,安全是医院运营的基石。然而,医疗过程中不可避免地会出现不良事件,如药物错误、手术事故等。为了及时发现、评估和解决这些问题,医院安全(不良)事件报告系统应运而生。…...

【瑞吉外卖部分功能补充】
瑞吉外卖部分功能补充 菜品的启售和停售 在浏览器控制台点击对应功能后可以看到前端发送的请求是:http://localhost:9999/dish/status/1?ids1413342036832100354,请求方式为POST。 接收到前端参数后,进行controller层代码补全,…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...

超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
Leetcode 3577. Count the Number of Computer Unlocking Permutations
Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接:3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯,要想要能够将所有的电脑解锁&#x…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...
鸿蒙(HarmonyOS5)实现跳一跳小游戏
下面我将介绍如何使用鸿蒙的ArkUI框架,实现一个简单的跳一跳小游戏。 1. 项目结构 src/main/ets/ ├── MainAbility │ ├── pages │ │ ├── Index.ets // 主页面 │ │ └── GamePage.ets // 游戏页面 │ └── model │ …...
flow_controllers
关键点: 流控制器类型: 同步(Sync):发布操作会阻塞,直到数据被确认发送。异步(Async):发布操作非阻塞,数据发送由后台线程处理。纯同步(PureSync…...
Java并发编程实战 Day 11:并发设计模式
【Java并发编程实战 Day 11】并发设计模式 开篇 这是"Java并发编程实战"系列的第11天,今天我们聚焦于并发设计模式。并发设计模式是解决多线程环境下常见问题的经典解决方案,它们不仅提供了优雅的设计思路,还能显著提升系统的性能…...