【Flink-scala】DataSet编程模型介绍及数据源
DataStream 学习
1.DataStream编程模型总结
文章目录
- DataStream 学习
- 介绍
- 一、DataSet编程模型
- 二、数据源
- 1.文件类数据源
- 2.集合类数据源
- 3.通用类数据源
- 4第三方文件系统
介绍
Flink把批处理看成是一个流处理的特例,因此可以在底层统一的流处理引擎上,同时提供了STREAM API和SET API,经典的有限数据流处理方式有:
由于批处理的对象是有界数据集,因此批处理不需要时间和窗口机制
一、DataSet编程模型
link批处理程序的基本运行流程包括以下4个步骤:
- 创建执行环境;
- 创建数据源;
- 指定对数据进行的转换操作;
- 指定数据计算的输出结果方式。
上面第1步中创建批处理执行环境的方式如下:
val env = ExecutionEnvironment.getExecutionEnvironment
此外,还需要在pom.xml文件中引入flink-scala_2.12依赖库,具体如下:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.2</version>
</dependency>
编程模型如图:
数据的处理过程:
读取数据源-进行转换操作-获取结果数据。
批处理数据的基本流程:
二、数据源
1.文件类数据源
Flink提供了从文件中读取数据生成DataSet的多种方法,具体如下:
readTextFile(path):逐行读取文件并将文件内容转换成DataSet类型数据集;
readTextFileWithValue(path):读取文本文件内容,并将文件内容转换成DataSet[StringValue]类型数据集。
该方法与readTextFile(String)不同的是,其泛型是StringValue,是一种可变的String类型,通过StringValue存储文本数据可以有效降低String对象创建数量,减小垃圾回收的压力;
readCsvFile(path):解析以逗号(或其他字符)分隔字段的文件,返回元组或POJO对象;
readSequenceFile(Key, Value, path):读取SequenceFile,以Tuple2<Key, Value>类型返回。
以readTextFile(path)为例,可以使用如下语句读取文本文件内容:
val dataSet : DataSet[String] = env.readTextFile("file:///home/hadoop/word.txt")
假设有一个CSV格式文件sales.csv,内容如下:
transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
则可以使用如下程序读取该CSV文件:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._object ReadCSVFile{def main(args: Array[String]): Unit = {val bEnv = ExecutionEnvironment.getExecutionEnvironmentval filePath="file:///home/hadoop/sales.csv"val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)//这里csv.print()}case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double)//这里定义的类型
}
结果如下:
SalesLog(111,1,1,100.0)
SalesLog(112,2,2,505.0)
SalesLog(113,1,3,510.0)
SalesLog(114,2,4,600.0)
SalesLog(115,3,2,500.0)
2.集合类数据源
Flink提供了fromCollection()、fromElements()和generateSequence()
等方法,来构建集合类数据源,具体如下:
fromCollection()
:从集合中创建DataSet数据集,集合中的元素数据类型相同;
fromElements()
:从给定数据元素序列中创建DataSet数据集,且所有的数据对象类型必须一致;
generateSequence()
:指定一个范围区间,然后在区间内部生成数字序列数据集,由于是并行处理的,所以最终的顺序不能保证一致。
val myArray = Array("hello world","hadoop spark flink")
val collectionSet = env.fromCollection(myArray)//从集合中获取val dataSet = env.fromElements("hadoop","spark","flink")//一个个元素获取val numSet = env.generateSequence(1,10)//生成的数据 1 2 3 4 ... 10 包含10
3.通用类数据源
以Flink内置的JDBCInputFormat类为实例,介绍通用类数据源的用法。
假设已经在Linux系统中安装了MySQL数据库,在Linux终端中执行如下命令启动MySQL:
输入数据库登录密码以后,就可以启动MySQL了,然后,执行如下命令创建数据库,并添加数据:
$ create database flink
$ use flink
$ create table student(sno char(8),cno char(2),grade int);
$ insert into student values('95001','1',96);
$ insert into student values('95002','1',94);
新建代码文件InputFromMySQL.scala,内容如下:
i
mport org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._object InputFromMySQL{def main(args: Array[String]): Unit = {//创建执行环境val env = ExecutionEnvironment.getExecutionEnvironment//使用JDBC输入格式从关系数据库读取数据val inputMySQL = env.createInput(JDBCInputFormat.buildJDBCInputFormat()//数据库连接驱动名称.setDrivername("com.mysql.jdbc.Driver")//数据库连接驱动名称.setDBUrl("jdbc:mysql://localhost:3306/flink")//数据库连接用户名.setUsername("root")//数据库连接密码.setPassword("123456")//数据库连接查询SQL.setQuery("select sno,cno,grade from student")//字段类型、顺序和个数必须与SQL保持一致.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.INT_TYPE_INFO)).finish())inputMySQL.print()}
}
新建pom.xml文件,在里面添加与访问MySQL相关的依赖包,内容如下:
<project><groupId>cn.edu.xmu.dblab</groupId><artifactId>simple-project</artifactId><modelVersion>4.0.0</modelVersion><name>Simple Project</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.2</version></dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.40</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.11.2</version></dependency></dependencies><build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
使用Maven工具对程序进行编译打包,然后,提交到Flink中运行(请确认Flink已经启动)。运行结束以后,可以在屏幕上看到如下的输出结果:
95001,1,96
95002,1,94
4第三方文件系统
Flink通过FileSystem类来抽象自己的文件系统,这个抽象提供了各类文件系统实现的通用操作和最低保证。
每种数据源(比如HDFS、S3、Alluxio、XtreemFS、FTP等)可以继承和实现FileSystem类,将数据从各个系统读取到Flink中。
DataSet API中内置了HDFS数据源,这里给出一个读取HDFS文件系统的一个实例,代码如下:
import org.apache.flink.api.scala.ExecutionEnvironmentobject ReadHDFS{def main(args: Array[String]): Unit = {//获取执行环境val env = ExecutionEnvironment.getExecutionEnvironment//创建数据源
val inputHDFS = env.readTextFile("hdfs://localhost:9000/word.txt")//打印输出
inputHDFS.print()}
}
获取数据源就1行代码 ,但是在pom中需要添加依赖。
在pom.xml文件中,需要添加与访问HDFS相关的依赖包,内容如下:
<project><groupId>cn.edu.xmu.dblab</groupId><artifactId>simple-project</artifactId><modelVersion>4.0.0</modelVersion><name>Simple Project</name><packaging>jar</packaging><version>1.0</version><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url></repository></repositories>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.11.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.11.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.11.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency></dependencies><build><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>compile</goal></goals></execution></executions></plugin>
<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
使用Maven工具对程序进行编译打包。
为了让Flink能够顺利访问HDFS,需要修改环境变量
如果环境变量已经完成了修改,这里就不需要重复操作;如果还没有则修改,添加hadoop环境变量
修改如下。
$ vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
$ source ~/.bashrc
使用flink run命令把ReadHDFS程序提交到Flink中运行(请确认Flink和Hadoop已经启动),如果运行成功,就可以在屏幕上看到"hdfs://localhost:9000/word.txt"文件里面的内容了。
相关文章:

【Flink-scala】DataSet编程模型介绍及数据源
DataStream 学习 1.DataStream编程模型总结 文章目录 DataStream 学习介绍一、DataSet编程模型二、数据源1.文件类数据源2.集合类数据源3.通用类数据源4第三方文件系统 介绍 Flink把批处理看成是一个流处理的特例,因此可以在底层统一的流处理引擎上,同…...

Odrive源码分析(四) 位置爬坡算法
Odrive中自带一个简单的梯形速度爬坡算法,本文分析下这部分代码。 代码如下: #include <cmath> #include "odrive_main.h" #include "utils.hpp"// A sign function where input 0 has positive sign (not 0) float sign_ha…...

[Unity Shader][图形渲染] Shader数学基础11 - 复合变换详解
在图形学与Shader编程中,复合变换是将平移、旋转和缩放等基本几何变换组合在一起,从而实现更复杂的物体变换效果。复合变换的本质是通过矩阵的串联操作,依次应用多个变换。 本文将介绍复合变换的数学原理、矩阵计算方法及注意事项,并结合实际编程中的实现细节帮助你掌握其…...

使用Python实现智能家居控制系统:开启智慧生活的钥匙
友友们好! 我的新专栏《Python进阶》正式启动啦!这是一个专为那些渴望提升Python技能的朋友们量身打造的专栏,无论你是已经有一定基础的开发者,还是希望深入挖掘Python潜力的爱好者,这里都将是你不可错过的宝藏。 在这个专栏中,你将会找到: ● 深入解析:每一篇文章都将…...

使用 HTML5 Canvas 实现动态蜈蚣动画
使用 HTML5 Canvas 实现动态蜈蚣动画 1. 项目概述 我们将通过 HTML 和 JavaScript 创建一个动态蜈蚣。蜈蚣由多个节段组成,每个节段看起来像一个小圆形,并且每个节段上都附带有“脚”。蜈蚣的头部会在画布上随机移动。 完整代码在底部!&…...

计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)
计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers) 文章目录 计算机视觉目标检测——DETR(End-to-End Object Detection with Transformers)摘要Abstract一、DETR算法1. 摘要(Abstract)2. 引言(Introduction&#…...

uniapp .gitignore
打开HBuilderX,在项目根目录下新建文件 .gitignore复制下面内容 #忽略unpackge目录下除了res目录的所有目录 unpackage/* !unpackage/res/#忽略.hbuilderx目录 .hbuilderx# 忽略node_modules目录下的所有文件 node_modules/# 忽略锁文件 package-lock.json yarn.l…...

JavaWeb Servlet的反射优化、Dispatcher优化、视图(重定向)优化、方法参数值获取优化
目录 1. 背景2. 实现2.1 pom.xml2.2 FruitController.java2.3 DispatcherServlet.java2.4 applicationContext.xml 3. 测试 1. 背景 前面我们做了Servlet的一个案例。但是存在很多问题,现在我们要做优化,优化的步骤如下: 每个Fruit请求都需…...

备忘一个FDBatchMove数据转存的问题
使用FDBatchMove的SQL导入excel表到sql表,设置条件时一头雾水,函数不遵守sql的规则。 比如替换字段的TAB键值为空,replace(字段名,char(9),)竟然提示错误,百思不得其解。 试遍了几乎所有的函数,竟然是chr(9)。 这个…...

CEF127 编译指南 MacOS 篇 - 编译 CEF(六)
1. 引言 经过前面的准备工作,我们已经完成了所有必要的环境配置。本文将详细介绍如何在 macOS 系统上编译 CEF127。通过正确的编译命令和参数配置,我们将完成 CEF 的构建工作,最终生成可用的二进制文件。 2. 编译前准备 2.1 确认环境变量 …...

【更新】LLM Interview
课程链接:BV1o217YeELo 文章目录 LLM基础相关1. LLMs概述2. 大语言模型尺寸3. LLMs的优势与劣势4. 常见的大模型分类5. 目前主流的LLMs开源模型体系有哪些(Prefix Decoder,Causal Decoder,Encoder-Decoder的区别是什么)…...

Django 视图中使用 Redis 缓存优化查询性能
在 Web 应用程序开发中,查询数据库是一个常见的操作,但如果查询过于频繁或耗时,就会影响应用程序的性能。为了解决这个问题,我们可以使用缓存技术,将查询结果暂时存储在内存中,从而减少对数据库的访问。本文将介绍如何在 Django 视图中使用 Redis 缓存来优化查询性能。 © …...

正则表达式解析与功能说明
正则表达式解析与功能说明 表达式说明 String regex "\\#\\{TOASRTRINNG\\((.*?)((.*?))\\)(\\})";该正则表达式的作用是匹配形如 #{TOASRTRINNG(...)} 的字符串格式。以下是正则表达式的详细解析: 拆解与解析 1. \\# 匹配:# 字符。说明…...

STUN服务器实现NAT穿透
NAT穿透的问题 在现代网络环境中,大多数设备都位于NAT(网络地址转换)设备后面。这给点对点(P2P)通信带来了挑战,因为NAT会阻止外部网络直接访问内部设备。STUN(Session Traversal Utilities for NAT)服务器就是为了解决这个问题而设计的。 STUN是什么?…...

音视频入门基础:MPEG2-TS专题(19)——FFmpeg源码中,解析TS流中的PES流的实现
一、引言 FFmpeg源码在解析完PMT表后,会得到该节目包含的视频和音频信息,从而找到音视频流。TS流的音视频流包含在PES流中。FFmpeg源码通过调用函数指针tss->u.pes_filter.pes_cb指向的回调函数解析PES流的PES packet: /* handle one TS…...

tomcat的安装以及配置(基于linuxOS)
目录 安装jdk环境 yum安装 验证JDK环境 安装tomcat应用 yum安装 编辑 使用yum工具进行安装 配置tomcat应用 关闭防火墙和selinux 查看端口开启情况 编辑 访问tomcat服务 安装扩展包 重启服务 查看服务 源码安装 进入tomcat官网进行下载 查找自己要用的to…...

因子分解(递归)
1.素分解式(简单版) 任务描述 编写函数,输出一个正整数的素数分解式。主函数的功能为输入若干正整数(大于1),输出每一个数的素分解式。素数分解式是指将整数写成若干素数(从小到大)乘积的形式。例如: 202*2*5 362*2*…...

【Python】pandas库---数据分析
大学毕业那年,你成了社会底层群众里,受教育程度最高的一批人。 前言 这是我自己学习Python的第四篇博客总结。后期我会继续把Python学习笔记开源至博客上。 上一期笔记有关Python的NumPy数据分析,没看过的同学可以去看看:【Pyt…...

RabbitMQ 的7种工作模式
RabbitMQ 共提供了7种⼯作模式,进⾏消息传递,. 官⽅⽂档:RabbitMQ Tutorials | RabbitMQ 1.Simple(简单模式) P:⽣产者,也就是要发送消息的程序 C:消费者,消息的接收者 Queue:消息队列,图中⻩⾊背景部分.类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从其中取出消息…...

负载均衡式在线OJ
文章目录 项目介绍所用技术与开发环境所用技术开发环境 项目框架compiler_server模块compiler编译功能comm/util.hpp 编译时的临时文件comm/log.hpp 日志comm/util.hpp 时间戳comm/util.hpp 检查文件是否存在compile_server/compiler.hpp 编译功能总体编写 runner运行功能资源设…...

【3D打印机】启庞KP3S热床加热失败报错err6
最近天冷,打印机预热突然失败,热床无法加热,过了一段时间报错err6,查看另一篇资料说是天气冷原因,导致代码的PID控温部分达不到预期加热效果,从而自检报错,然后资料通过修改3D打印机代码的方式进…...

基于 MATLAB 的图像增强技术分享
一、引言 图像增强是数字图像处理中的重要环节,其目的在于改善图像的视觉效果,使图像更清晰、细节更丰富、对比度更高,以便于后续的分析、识别与理解等任务。MATLAB 作为一款功能强大的科学计算软件,提供了丰富的图像处理工具和函…...

前端知识补充—HTML
1. HTML 1.1 什么是HTML HTML(Hyper Text Markup Language), 超⽂本标记语⾔ 超⽂本: ⽐⽂本要强⼤. 通过链接和交互式⽅式来组织和呈现信息的⽂本形式. 不仅仅有⽂本, 还可能包含图⽚, ⾳频, 或者⾃已经审阅过它的学者所加的评注、补充或脚注等等 标记语⾔: 由标签构成的语⾔…...

安卓从Excel文件导入数据到SQLite数据库的实现
在现代的移动应用开发中,数据的处理和管理是至关重要的一环。有时候,我们需要从外部文件(如Excel文件)中导入数据,以便在应用程序中使用。本文将介绍如何在Android应用中使用Java代码从一个Excel文件中导入数据到SQLit…...

C/C++基础知识复习(44)
1) C 中多态性在实际项目中的应用场景 多态性是面向对象编程(OOP)中的一个重要特性,指的是不同的对象可以通过相同的接口来表现不同的行为。在 C 中,多态通常通过虚函数(virtual)和继承机制来实现。实际项…...

【day13】深入面向对象编程
【day12】回顾 在正文开始之前,先让我们回顾一下【day12】中的关键内容: 接口(Interface): interface关键字用于定义接口。implements关键字用于实现接口。 接口成员: 抽象方法:需要在实现类中…...

《 火星人 》
题目描述 人类终于登上了火星的土地并且见到了神秘的火星人。人类和火星人都无法理解对方的语言,但是我们的科学家发明了一种用数字交流的方法。这种交流方法是这样的,首先,火星人把一个非常大的数字告诉人类科学家,科学家破解这…...

盒子模型(内边距的设置)
所有元素都可以设置内边距属性和外边距属性大体相同,可参考上一篇,但有区别 内边距不能设置为负值padding-方向:尺寸 注意:使用内边距padding之后元素整体会变大,因为他是直接加上了内边距的大小,不改变元素…...

CentOS7网络配置,解决不能联网、ping不通外网、主机的问题
1. 重置 关闭Centos系统 编辑->虚拟网络编辑器 还原默认设置 2. 记录基本信息 查看网关地址,并记录在小本本上 查看网段,记录下 3. 修改网卡配置 启动Centos系统 非root用户,切换root su root查看Mac地址 ifconfig 或 ip addr记录下来 修改配置文件 vim /et…...

如何测继电器是否正常
继电器是一种电控制器件,广泛应用于自动控制、电力保护等领域。为了确保继电器的正常工作,定期检测其状态是非常必要的。以下是一些常用的方法来测试继电器是否正常工作: 1. 视觉检查: - 观察继电器的外观是否有损坏、变形或烧焦…...