大数据-玩转数据-Flink-Transform
一、Transform
转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.

二、基本转换算子
2.1、map(映射)
将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}
2.2、filter(过滤)
根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
// elements.filter(new FilterFunction<Integer>() {
// @Override
// public boolean filter(Integer integer) throws Exception {
// if (integer % 2 == 0 )
// return false;
// else {
// return true;
// }
// }
// }).print();elements.filter(value -> value % 2 != 0).print();}
}
2.3、flatMap(扁平映射)
消费一个元素并产生零个或多个元素
package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}
三、聚合算子
3.1、keyBy(按键分区)
把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中
package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}
3.2、sum,min,max,minBy,maxBy(简单聚合)
KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream
package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}
3.3、reduce(归约聚合)
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}
3.4、process(底层处理)
process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。
package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}
四、合流算子
4.1、connect(连接)
在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);DataStreamSource<String> data2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> data3 = data1.connect(data2);data3.getFirstInput().print("data1");data3.getSecondInput().print("data2");env.execute();}
}
4.2、union(合并)
package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> data2 = env.fromElements(555,666);DataStreamSource<Integer> data3 = env.fromElements(999);DataStream<Integer> data = data1.union(data2).union(data3);data.print();env.execute();}
}
相关文章:
大数据-玩转数据-Flink-Transform
一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map(映射) 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素…...
Java泛型集合简明教程
前言 我们编写一个数组并对数组进行排序,不管是对浮点型数组、整型数组、字符串数组或者是其他任何类型的数组进行排序,我们可以利用方法重载的方式,针对每种类型的数组分别编写一个排序方法,需要为几种类型的数组排序࿰…...
Prometheus-RabbitMQ Exporter
文章目录 一、介绍监控插件两个插件的区别一、 官方插件 rabbitmq_prometheus1 配置 RabbitMQ 集群名称2 授权使用插件2.1 配置文件方式2.2 命令行方式3 监听地址和端口4 RabbitMQ 插件获取指标的频率5 配置到 Prometheus6 关于聚合指标和每个对象指标6.1 获取聚合指标 `/metri…...
flink读取kafka数据存储iceberg
1、说明 使用flink实时的读取kafka的数据,并且实时的存储到iceberg中。好处是可以一边存数据,一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的,读和写对…...
文章二:分支管理策略 - 分支玩转:Git分支管理实战
开始本篇文章之前先推荐一个好用的学习工具,AIRIght,借助于AI助手工具,学习事半功倍。欢迎访问:http://airight.fun 概述 在软件开发中,版本控制是一项至关重要的工作。Git作为目前最受欢迎的分布式版本控制系统&…...
JS dom元素和鼠标位置之间的一系列属性快速参考
clientHeight 获取对象的高度,不计算任何边距、边框、滚动条,但包括该对象的补白。 clientLeft 获取 offsetLeft 属性和客户区域的实际左边之间的距离。 clientTop 获取 offsetTop 属性和客户区域的实际顶端之间的距离。 clie…...
【剑指 Offer 39】数组中超过一半的数字
题目: 数组中有一个数字出现的次数超过数组长度的一半,请找出这个数字。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示例: 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输出: 2 思考: 方法一:投…...
list.stream.filter,List<List>转换为List
1.filter过滤 返回符合查询条件的集合//过滤所有deviceType为1的List<DeviceWorkTimeEntity> list entities.stream().filter(a -> "1".equals(a.getDeviceType())).toList(); 2.List<List>转换为List 可以使用流(Stream)的flatMap操作 public cl…...
手机里视频太大怎么压缩?压缩教程分享
现在视频文件的体积越来越大了,动不动就是几个GB起步,如果后期再剪辑处理一下,更是会占据更多的设备空间了,还会导致我们传输受到限制,这时候就需要我们对视频进行压缩处理,下面给大家分享几个简单的方法&a…...
Spring-Cloud-Loadblancer详细分析_3
前两篇文章介绍了加载过程,本文从Feign的入口开始分析执行过程,还是从FeignBlockingLoadBalancerClient.execute来入手 public class FeignBlockingLoadBalancerClient implements Client {private static final Log LOG LogFactory.getLog(FeignBlock…...
使用 VScode 开发 ROS 的Python程序(简例)
一、任务介绍 本篇作为ROS学习的第二篇,是关于如何在Ubuntu18.04中使用VSCode编写一个Python程序,输出“Hello!”的内容介绍。 首先我们来了解下ROS的文件系统,ROS文件系统级指的是在硬盘上ROS源代码的组织形式,其结构…...
2022年03月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试
第1题:双精度浮点数的输入输出 输入一个双精度浮点数,保留8位小数,输出这个浮点数。 时间限制:1000 内存限制:65536 输入 只有一行,一个双精度浮点数。 输出 一行,保留8位小数的浮点数。 样例输…...
HarmonyOS/OpenHarmony应用开发-ArkTSAPI系统能力SystemCapability列表
SysCap,全称SystemCapability,即系统能力,指操作系统中每一个相对独立的特性。 开发者使用某个接口进行开发前,建议先阅读系统能力使用说明,了解Syscap的定义和使用指导。 说明 当前列表枚举出3.1 Beta版本中支持的…...
【01】基础知识:typescript安装及使用,开发工具vscode配置
一、typescript 了解 typeScript 是由微软开发的一款开源的编程语言。 typeScript 是 javascript 的超级,遵循最新的 es6、es5规范。 typeScript 扩展了 javaScript 的语法。 typeScript 更像后端 java、C# 这样的面向对象语言,可以让 js 开发大型企…...
用C++实现的RTS游戏的路径查找算法(A*、JPS、Wall-tracing)
在实时策略(RTS)游戏中,路径查找是一个关键的问题。游戏中的单位需要能够找到从一个地方到另一个地方的最佳路径。这个问题在计算机科学中被广泛研究,有许多已经存在的算法可以解决这个问题。在本文中,我们将探讨三种在…...
helm 制作应用的离线安装包
helm 制作应用的离线安装包 1、安装helm 到helm下载对应的压缩包:https://github.com/helm/helm/releases 解压,将helm文件cp到/usr/local/bin/ 文件夹下,查看helm版本;不同的k8s对应不同的helm版本,下载时留心注意…...
RN实现混合式开发-内嵌html
介绍 React Native WebView是一个用于在React Native应用中嵌入Web内容的组件。它允许你在应用中显示网页、加载HTML字符串、运行JavaScript代码等。 使用 首先,你需要在你的React Native项目中安装React Native WebView库。可以使用以下命令进行安装:…...
2000-2022年全国各地级市绿色金融指数数据
2000-2022年全国各地级市绿色金融指数数据 1、时间:2000-2022年 2、来源:来源:统计局、科技部、中国人民银行等权威机构网站及各种权威统计年鉴,包括全国及各省市统计年鉴、环境状况公报及一些专业统计年鉴,如 《中国…...
MachineLearningWu_13/P60-P64_Tensorflow
P60-P64的学习目录如下, x.1 TF网络模型实现 以一个简单的TF的分类网络为例,将模型翻译成框架下的语义,即如右侧所表达的。 当然上面对于分类网络的解释是一个简洁的解释,我们来进行更加具象的了解一下。左边是机器学习的三步骤&…...
centos7实现负载均衡
目录 一、基于 CentOS 7 构建 LVS-DR 集群。 1.1 配置lvs负载均衡服务 1.1.1 下载ipvsadm 1.1.2 增加vip 1.1.3 配置ipvsadm 1.2 配置rs1 1.2.1 编写测试页面 1.2.2 手工在RS端绑定VIP、添加路由 1.2.3 抑制arp响应 1.3 配置rs2 1.4 测试 二、配置nginx负载…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
C++中string流知识详解和示例
一、概览与类体系 C 提供三种基于内存字符串的流,定义在 <sstream> 中: std::istringstream:输入流,从已有字符串中读取并解析。std::ostringstream:输出流,向内部缓冲区写入内容,最终取…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
