当前位置: 首页 > news >正文

Flink学习之旅:(四)Flink转换算子(Transformation)

1.基本转换算子

基本转换算子说明
映射(map)将数据流中的数据进行转换,形成新的数据流
过滤(filter)将数据流中的数据根据条件过滤
扁平映射(flatMap)将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素

package com.qiyu.Transformation;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 11:00*/
public class Trans {/****  映射 map 算子* @param env*/public static void map(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的元素值都 加上 100DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer integer) throws Exception {return integer+100;}});map.print();}/**** 过滤 filter 算子* @param env*/public static void filter(StreamExecutionEnvironment env){DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);//将集合中的值取模,不等于1的通行,反之过滤DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer integer) throws Exception {if (integer % 2 != 1) {return true;}return false;}});filter.print();}/**** 扁平化 flatMap 算子* @param env*/public static void flatMap(StreamExecutionEnvironment env){DataStream<String> stream = env.fromElements("Flink is a powerful framework for stream and batch processing","It provides support for event time processing");//将字符串以空格分隔,拆成多个字符串个体stream.flatMap(new FlatMapFunction<String, Object>() {@Overridepublic void flatMap(String s, Collector<Object> collector) throws Exception {String[] words = s.split(" ");for (String word : words){collector.collect(word);}}}).print();}/*** 主程序类* @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//map(env);//filter(env);flatMap(env);env.execute();}
}

2.聚合算子

聚合算子说明
按键分区(keyBy)通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task solt)
简单聚合

sum():在输入流上,对指定的字段做叠加求和的操作。

min():在输入流上,对指定的字段求最小值。

max():在输入流上,对指定的字段求最大值。

minBy():在输入流上针对指定字段求最小值。

maxBy():在输入流上针对指定字段求最大值。

归约聚合(reduce)可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算

package com.qiyu.Transformation;import com.qiyu.Source.Student;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 14:45*/
public class Aggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 3),Tuple2.of("b", 3),Tuple2.of("b", 4));stream.keyBy(r -> r.f0).print();stream.keyBy(r -> r.f0).sum(1).print();stream.keyBy(r -> r.f0).min(1).print();stream.keyBy(r -> r.f0).max(1).print();stream.keyBy(r -> r.f0).maxBy(1).print();stream.keyBy(r -> r.f0).minBy(1).print();stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {return Tuple2.of(t1.f0, t1.f1 + t2.f1);}}).print();env.execute();}
}

相关文章:

Flink学习之旅:(四)Flink转换算子(Transformation)

1.基本转换算子 基本转换算子说明映射&#xff08;map&#xff09;将数据流中的数据进行转换&#xff0c;形成新的数据流过滤&#xff08;filter&#xff09;将数据流中的数据根据条件过滤扁平映射&#xff08;flatMap&#xff09;将数据流中的整体&#xff08;如&#xff1a;集…...

CesiumJS 中绘制大多边形

本文翻译自Cesium官方&#xff0c;有改动。 本文中提及到的“大多边形”就如下图所示。 在Cesium的早期版本和一些引擎中&#xff0c;我们绘制这种跨度比较大的多边形&#xff0c;经常会看到一些奇怪的冲突问题&#xff0c;如下图所示。 要渲染任何几何体&#xff0c;我们必…...

FreeRTOS移植以及任务

FreeRTOS移植 1.在sys.h中需要把SYSTEM_SUPPORT_OS 改为 1&#xff0c;支持我们使用 FreeRTOS //0,不支持 os //1,支持 os #define SYSTEM_SUPPORT_OS 1 //定义系统文件夹是否支持 OS2.出现报错 …\SYSTEM\usart\usart.c(6): error: #5: cannot open source input file “incl…...

笙默考试管理系统-MyExamTest----codemirror(41)

笙默考试管理系统-MyExamTest----codemirror&#xff08;40&#xff09; 目录 一、 笙默考试管理系统-MyExamTest 二、 笙默考试管理系统-MyExamTest 三、 笙默考试管理系统-MyExamTest 四、 笙默考试管理系统-MyExamTest 五、 笙默考试管理系统-MyExamTest 笙默考试…...

C#数据结构--数组和ArrayList

目录 本章目录&#xff1a; 2.1 数组基本概念 2.1.1 数组的声明和初始化 2.1.2 数组元素的设置和存取访问 2.1.4 多维数组 2.1.5 参数数组 2.2ArrayList 类 2.2.1ArrayList 类的成员 2.2.2 应用 ArrayList 类 数组和ArrayList之间的区别以及使用的场景 数组&#xf…...

Stable Diffusion WebUI扩展adetailer安装及功能介绍

ADetailer是Stable Diffusion WebUI的一个扩展,类似于检测细节器。 目录 安装地址 如何安装 1. Windows系统 (1)手动安装 (2)一体机...

Linux安装MINIO

MINIO简介MINIO目录 mkdir -p /opt/minio/data && cd /opt/minio MINIO下载 wget https://dl.minio.org.cn/server/minio/release/linux-amd64/minio MINIO授权 chmod x minio MINIO端口 firewall-cmd --zonepublic --add-port7171/tcp --permanent && firewal…...

Java架构师分布式搜索架构

目录 1 导学1.1 初识Elasticsearch1.1.1 Elasticsearch的作用1.1.2 ELK技术栈1.1.3 Elasticsearch和lucene1.1.4.为什么不是其他搜索技术?1.1.5.总结2 Elasticsearch快速建立知识体系3 Elasticsearch和MySQL实体建立联系3.1.mapping映射属性3.2 数据分组聚合3.2.1.聚合的种类3…...

简单宿舍管理系统(springboot+vue)

简单宿舍管理系统&#xff08;springbootvue&#xff09; 1.创建项目1.前端2.数据库3.后端 2.登陆1.前端1.准备工作2.登陆组件3.配置 2.后端1.链接数据库2.创建用户实体类3.数据操作持久层1.配置2.内容3.测试 4.中间业务层1.异常2.业务实现3.测试 5.响应前端控制层 3.前后对接4…...

Socks5代理、IP代理的关键作用

Socks5代理与SK5代理&#xff1a;网络安全的卫士 Socks5代理作为一项先进的代理协议&#xff0c;其多协议支持、身份验证功能以及UDP支持使其成为网络安全的重要支持者。 IP代理&#xff1a;隐私保护与无限访问的利器 IP代理技术通过隐藏真实IP地址&#xff0c;保护用户隐私…...

前端 CSS 经典:box-shadow

1. 基础属性 /* box-shadow: h-shadow v-shadow blur spread color inset; */ box-shadow: 10px 10px 2px 2px red inset; h-shadow: 必填&#xff0c;水平阴影的位置&#xff0c;允许负值 v-shadow: 必填&#xff0c;垂直阴影的位置&#xff0c;允许负值 blur: 可选&#xff…...

使用数组方法打印出 1 - 10000 之间的所有对称数。例如:121、1331等

&#xff08;我从别的人那复制的&#xff0c;原文章请点击此处&#xff09; 源代码&#xff1a; function getNum (start, end) {var arr [];for(var i start; i < end; i) {if (i.toString() i.toString().split().reverse().join() && i.toString().length &…...

DELM深度极限学习机回归预测研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

Spark大数据分析与实战笔记(第一章 Scala语言基础-5)

文章目录 每日一句正能量章节概要1.5 Scala的模式匹配与样例类1.5.1 模式匹配字符匹配匹配字符串守卫匹配类型匹配数组、元组、集合 1.5.2 样例类 课外补充偏函数 每日一句正能量 “成功的秘诀&#xff0c;在于对目标的执着追求。”——爱迪生 无论是在工作、学习、还是生活中&…...

shell学习脚本04(小滴课堂)

他就可以直接读出来了。不需要在sh后面加参数。 可以用-s隐藏内容&#xff1a; 可以用-t进行指定几秒后显示。 -n限制内容长度。 输入到长度为5自动打印。 我们把-s放到-p后面的话&#xff1a; 这样会出错。 如果最后加5m会一直闪烁。 大家可以按照需求自行使用。...

Python数据结构(链表)

Python数据结构&#xff08;链表&#xff09; 单向链表 单向链表也叫单链表&#xff0c;是链表中最简单的一种形式&#xff0c;它的每个节点包含两个域&#xff0c;一个信息域(元素域)和一个链接域。这个链接指向链表中的下一个节点&#xff0c;而最后一个节点的链接域则指向…...

连续/离散的控制系统阶跃测试(包括MATLAB里的step()函数)

阶跃测试 只要是连续时间系统&#xff0c;无论是传递函数还是连续状态空间形式的模型&#xff0c;直接可以用**step()**做阶跃测试&#xff1b;但是对于离散系统而言&#xff0c;不能用step()函数&#xff0c;可以自行编写代码&#xff0c;如下。 1、离散系统&#xff1a;x(k…...

【六:pytest框架介绍】

常见的请求对象requests.get()requests.post()requests.delete()requests.put()requests.request()常见的响应对象reprequests.request()//返回字符串格式数据print(req.text)//返回字节格式数据print(req.content)//返回字典格式数据print(req.json)#状态码print(req.status_c…...

提升医院安全的关键利器——医院安全(不良)事件报告系统源码

医院是人们寻求医疗服务和康复的场所&#xff0c;安全是医院运营的基石。然而&#xff0c;医疗过程中不可避免地会出现不良事件&#xff0c;如药物错误、手术事故等。为了及时发现、评估和解决这些问题&#xff0c;医院安全&#xff08;不良&#xff09;事件报告系统应运而生。…...

【瑞吉外卖部分功能补充】

瑞吉外卖部分功能补充 菜品的启售和停售 在浏览器控制台点击对应功能后可以看到前端发送的请求是&#xff1a;http://localhost:9999/dish/status/1?ids1413342036832100354&#xff0c;请求方式为POST。 接收到前端参数后&#xff0c;进行controller层代码补全&#xff0c…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

基于Docker Compose部署Java微服务项目

一. 创建根项目 根项目&#xff08;父项目&#xff09;主要用于依赖管理 一些需要注意的点&#xff1a; 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件&#xff0c;否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3

一&#xff0c;概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本&#xff1a;2014.07&#xff1b; Kernel版本&#xff1a;Linux-3.10&#xff1b; 二&#xff0c;Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01)&#xff0c;并让boo…...

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要&#xff1a; 近期&#xff0c;在使用较新版本的OpenSSH客户端连接老旧SSH服务器时&#xff0c;会遇到 "no matching key exchange method found"​, "n…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

Python 训练营打卡 Day 47

注意力热力图可视化 在day 46代码的基础上&#xff0c;对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙

Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...