当前位置: 首页 > news >正文

大数据-玩转数据-Flink-Transform

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();elements.filter(value -> value % 2 != 0).print();}
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);DataStreamSource<String> data2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> data3 = data1.connect(data2);data3.getFirstInput().print("data1");data3.getSecondInput().print("data2");env.execute();}
}

4.2、union(合并)

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> data2 = env.fromElements(555,666);DataStreamSource<Integer> data3 = env.fromElements(999);DataStream<Integer> data = data1.union(data2).union(data3);data.print();env.execute();}
}

相关文章:

大数据-玩转数据-Flink-Transform

一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map&#xff08;映射&#xff09; 将数据流中的数据进行转换, 形成新的数据流&#xff0c;消费一个元素并产出一个元素…...

Java泛型集合简明教程

前言 我们编写一个数组并对数组进行排序&#xff0c;不管是对浮点型数组、整型数组、字符串数组或者是其他任何类型的数组进行排序&#xff0c;我们可以利用方法重载的方式&#xff0c;针对每种类型的数组分别编写一个排序方法&#xff0c;需要为几种类型的数组排序&#xff0…...

Prometheus-RabbitMQ Exporter

文章目录 一、介绍监控插件两个插件的区别一、 官方插件 rabbitmq_prometheus1 配置 RabbitMQ 集群名称2 授权使用插件2.1 配置文件方式2.2 命令行方式3 监听地址和端口4 RabbitMQ 插件获取指标的频率5 配置到 Prometheus6 关于聚合指标和每个对象指标6.1 获取聚合指标 `/metri…...

flink读取kafka数据存储iceberg

1、说明 使用flink实时的读取kafka的数据&#xff0c;并且实时的存储到iceberg中。好处是可以一边存数据&#xff0c;一边查询数据。当然使用clickhouse也可以实现数据的既存既取。而hive数据既存既读则会有问题。iceberg中数据读写数据都是从快照中开始的&#xff0c;读和写对…...

文章二:分支管理策略 - 分支玩转:Git分支管理实战

开始本篇文章之前先推荐一个好用的学习工具&#xff0c;AIRIght&#xff0c;借助于AI助手工具&#xff0c;学习事半功倍。欢迎访问&#xff1a;http://airight.fun 概述 在软件开发中&#xff0c;版本控制是一项至关重要的工作。Git作为目前最受欢迎的分布式版本控制系统&…...

JS dom元素和鼠标位置之间的一系列属性快速参考

clientHeight 获取对象的高度&#xff0c;不计算任何边距、边框、滚动条&#xff0c;但包括该对象的补白。 clientLeft 获取 offsetLeft 属性和客户区域的实际左边之间的距离。 clientTop 获取 offsetTop 属性和客户区域的实际顶端之间的距离。 clie…...

【剑指 Offer 39】数组中超过一半的数字

题目&#xff1a; 数组中有一个数字出现的次数超过数组长度的一半&#xff0c;请找出这个数字。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例&#xff1a; 输入: [1, 2, 3, 2, 2, 2, 5, 4, 2] 输出: 2 思考&#xff1a; 方法一&#xff1a;投…...

list.stream.filter,List<List>转换为List

1.filter过滤 返回符合查询条件的集合//过滤所有deviceType为1的List<DeviceWorkTimeEntity> list entities.stream().filter(a -> "1".equals(a.getDeviceType())).toList(); 2.List<List>转换为List 可以使用流(Stream)的flatMap操作 public cl…...

手机里视频太大怎么压缩?压缩教程分享

现在视频文件的体积越来越大了&#xff0c;动不动就是几个GB起步&#xff0c;如果后期再剪辑处理一下&#xff0c;更是会占据更多的设备空间了&#xff0c;还会导致我们传输受到限制&#xff0c;这时候就需要我们对视频进行压缩处理&#xff0c;下面给大家分享几个简单的方法&a…...

Spring-Cloud-Loadblancer详细分析_3

前两篇文章介绍了加载过程&#xff0c;本文从Feign的入口开始分析执行过程&#xff0c;还是从FeignBlockingLoadBalancerClient.execute来入手 public class FeignBlockingLoadBalancerClient implements Client {private static final Log LOG LogFactory.getLog(FeignBlock…...

使用 VScode 开发 ROS 的Python程序(简例)

一、任务介绍 本篇作为ROS学习的第二篇&#xff0c;是关于如何在Ubuntu18.04中使用VSCode编写一个Python程序&#xff0c;输出“Hello&#xff01;”的内容介绍。 首先我们来了解下ROS的文件系统&#xff0c;ROS文件系统级指的是在硬盘上ROS源代码的组织形式&#xff0c;其结构…...

2022年03月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

第1题&#xff1a;双精度浮点数的输入输出 输入一个双精度浮点数&#xff0c;保留8位小数&#xff0c;输出这个浮点数。 时间限制&#xff1a;1000 内存限制&#xff1a;65536 输入 只有一行&#xff0c;一个双精度浮点数。 输出 一行&#xff0c;保留8位小数的浮点数。 样例输…...

HarmonyOS/OpenHarmony应用开发-ArkTSAPI系统能力SystemCapability列表

SysCap&#xff0c;全称SystemCapability&#xff0c;即系统能力&#xff0c;指操作系统中每一个相对独立的特性。 开发者使用某个接口进行开发前&#xff0c;建议先阅读系统能力使用说明&#xff0c;了解Syscap的定义和使用指导。 说明 当前列表枚举出3.1 Beta版本中支持的…...

【01】基础知识:typescript安装及使用,开发工具vscode配置

一、typescript 了解 typeScript 是由微软开发的一款开源的编程语言。 typeScript 是 javascript 的超级&#xff0c;遵循最新的 es6、es5规范。 typeScript 扩展了 javaScript 的语法。 typeScript 更像后端 java、C# 这样的面向对象语言&#xff0c;可以让 js 开发大型企…...

用C++实现的RTS游戏的路径查找算法(A*、JPS、Wall-tracing)

在实时策略&#xff08;RTS&#xff09;游戏中&#xff0c;路径查找是一个关键的问题。游戏中的单位需要能够找到从一个地方到另一个地方的最佳路径。这个问题在计算机科学中被广泛研究&#xff0c;有许多已经存在的算法可以解决这个问题。在本文中&#xff0c;我们将探讨三种在…...

helm 制作应用的离线安装包

helm 制作应用的离线安装包 1、安装helm 到helm下载对应的压缩包&#xff1a;https://github.com/helm/helm/releases 解压&#xff0c;将helm文件cp到/usr/local/bin/ 文件夹下&#xff0c;查看helm版本&#xff1b;不同的k8s对应不同的helm版本&#xff0c;下载时留心注意…...

RN实现混合式开发-内嵌html

介绍 React Native WebView是一个用于在React Native应用中嵌入Web内容的组件。它允许你在应用中显示网页、加载HTML字符串、运行JavaScript代码等。 使用 首先&#xff0c;你需要在你的React Native项目中安装React Native WebView库。可以使用以下命令进行安装&#xff1a;…...

2000-2022年全国各地级市绿色金融指数数据

2000-2022年全国各地级市绿色金融指数数据 1、时间&#xff1a;2000-2022年 2、来源&#xff1a;来源&#xff1a;统计局、科技部、中国人民银行等权威机构网站及各种权威统计年鉴&#xff0c;包括全国及各省市统计年鉴、环境状况公报及一些专业统计年鉴&#xff0c;如 《中国…...

MachineLearningWu_13/P60-P64_Tensorflow

P60-P64的学习目录如下&#xff0c; x.1 TF网络模型实现 以一个简单的TF的分类网络为例&#xff0c;将模型翻译成框架下的语义&#xff0c;即如右侧所表达的。 当然上面对于分类网络的解释是一个简洁的解释&#xff0c;我们来进行更加具象的了解一下。左边是机器学习的三步骤&…...

centos7实现负载均衡

目录 一、基于 CentOS 7 构建 LVS-DR 集群。 1.1 配置lvs负载均衡服务 1.1.1 下载ipvsadm 1.1.2 增加vip 1.1.3 配置ipvsadm 1.2 配置rs1 1.2.1 编写测试页面 1.2.2 手工在RS端绑定VIP、添加路由 1.2.3 抑制arp响应 1.3 配置rs2 1.4 测试 二、配置nginx负载…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件

今天呢&#xff0c;博主的学习进度也是步入了Java Mybatis 框架&#xff0c;目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学&#xff0c;希望能对大家有所帮助&#xff0c;也特别欢迎大家指点不足之处&#xff0c;小生很乐意接受正确的建议&…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止

<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet&#xff1a; https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

Go语言多线程问题

打印零与奇偶数&#xff08;leetcode 1116&#xff09; 方法1&#xff1a;使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...