当前位置: 首页 > news >正文

大数据学习之Flink算子、了解DataStream API(基础篇一)

DataStream API (基础篇)


注: 本文只涉及DataStream

  • 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSet成为了主流的数据处理方式。

目录

DataStream API (基础篇)

前摘:

一、执行环境

1. 创建执行环境

2. 执行模式

3. 触发程序执行

二、源算子(source)

三、转换算子(Transformation)

四、输出算子(sink)


前摘:

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 部分构成,如图所示:

  • 获取执行环境(Execution Environment)
  • 读取数据源(Source)
  • 定义基于数据的转换操作(Transformations)
  • 定义计算结果的输出位置(Sink)
  • 触发程序执行(Execute)

其中,获取环境和触发执行,都可以认为是针对执行环境的操作。所以本章我们就从执行 环境、数据源(source)、转换操作(Transformation)、输出(Sink)四大部分,对常用的 DataStream API 做基本介绍。

一、执行环境

1. 创建执行环境

  • 编写Flink程序的第一步就是创建执行环境。
  • 我 们 要 获 取 的 执 行 环 境 , 是 StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础
  • 在代码中创建执行环境的 方式,就是调用这个类的静态方法,具体有以下三种。
  1. getExecutionEnvironment
    最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果;
    //此处的 env 是 StreamExecutionEnvironment 对象
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. createLocalEnvironment
    这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。
    //此处的 localEnvironment 是 StreamExecutionEnvironment 对象
    val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    
  3. createRemoteEnvironment
    这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。

    //此处的 remoteEnv 是 StreamExecutionEnvironment 对象
    val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
    )
    

2. 执行模式

而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。

  • 流执行模式(STREAMING) 这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 况下,程序使用的就是 STREAMING 执行模式。
  • 批执行模式(BATCH) 专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。
  • 自动模式(AUTOMATIC) 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式

由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。 主要有两种方式:

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。

(2)通过代码配置

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)

3. 触发程序执行

我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等 待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute()

二、源算子(source)

Source源算子(基础篇二)

三、转换算子(Transformation)

Transformation转换算子(基础篇三)

四、输出算子(sink)

持续更新中

相关文章:

大数据学习之Flink算子、了解DataStream API(基础篇一)

DataStream API (基础篇) 注: 本文只涉及DataStream 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSe…...

js中字符串string,遍历json/Object【匹配url、邮箱、电话,版本号,千位分割,判断回文】

目录 正则 合法的URL 邮箱、电话 字符串方法 千位分割:num.slice(render, len).match(/\d{3}/g).join(,) 版本号比较 判断回文 json/Object 遍历 自身属性 for...inhasOwnProperty(key) Object.获取数组(obj):Object.keys,Object…...

字符串和C预处理器

本文参考C Primer Plus第四章学习 文章目录 常量和预处理器const限定符 1. 常量和预处理器 有时,在程序中要使用常量。例如,可以这样计算圆的周长: circumference 3.14159 * diameter; 这里,常量3.14159 代表著名的常量 pi(π)。…...

Ultraleap 3Di新建项目之给所有的Joint挂载物体

工程文件 Ultraleap 3Di给所有的Joint挂载物体 前期准备 参考上一期文章,进行正确配置 Ultraleap 3Di配置以及在 Unity 中使用 Ultraleap 3Di手部跟踪 新建项目 初始项目如下: 新建Create Empty 将新建的Create Empty,重命名为LeapPro…...

关于session每次请求都会改变的问题

这几天在部署一个前后端分离的项目,使用docker进行部署,在本地测试没有一点问题没有,前脚刚把后端部署到服务器,后脚测试就出现了问题!查看控制台报错提示跨域错误?但是对于静态资源请求,包括登…...

【leetcode题解C++】150.逆波兰表达式求值 and 239.滑动窗口最大值 and 347.前k个高频元素

150.逆波兰表达式求值 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 、-、* 和 / 。每个操作数(运算对象)都可以是一个整数…...

【计网·湖科大·思科】实验三 总线型以太网的特性、集线器和交换机的区别、交换机的自学习算法

🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的很重要&…...

API设计模式:REST、GraphQL、gRPC与tRPC全面解析

一、引言 在现代Web和微服务架构中,API(应用程序编程接口)的设计和实现方式至关重要。本文将探讨四种流行的API设计模式:REST(Representational State Transfer)、GraphQL、gRPC以及新兴的tRPC。每种模式都…...

C/C++ protobuf与json互转

测试环境 ubuntu16.04 64bitprotocbuf:3.9.1 (支持json转换需>3.0.0) 协议 syntax "proto2";message Person{optional string name 1;optional uint32 age 2;optional string address 3; }测试代码 //protobuf > 3.0.0#…...

Open CASCADE学习|圆柱螺旋线绘制原理探究

1、圆柱螺旋线绘制原理 在OCC中,圆柱面的参数方程为: 设P为(x0,y0,z0),则 xx0r*cos(u) yy0r*sin(u) zz0v 但u、v之间有关系时,此方程表达为圆柱螺旋线,u、v之间为线性关系时是等螺距螺旋线&#xff0…...

Python学习笔记--认识sys.argv

sys.argv 是 Python 的一个内置模块 sys 中的一个属性。它是一个列表,包含了从命令行传递给脚本的参数。 例如,如果你有一个名为 script.py 的脚本,并且你从终端窗口命令行这样运行它: >>>python script.py arg1 arg2 …...

【C++】入门基础

前言:C是在C的基础之上,容纳进去了面向对象编程思想,并增加了许多有用的库,以及编程范式等。熟悉C语言之后,对C学习有一定的帮助,因此从今天开始们将进入C的学习。 💖 博主CSDN主页:…...

Nginx与keepalived实现集群

提醒一下:下面实例讲解是在mac虚拟机里的Ubuntu系统演示的; Nginx与keepalived实现集群实现的效果 两台服务器都安装Nginx与keepalived: master服务器的ip(192.168.200.2) backup服务器的ip(192.168.200.4) 将 master服务器Nginx与keepalive…...

初识MQRabbitMQ快速入门

一、同步和异步通讯 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能…...

javaMailSender 发送邮件,基于Spring Boot

目录 引入依赖 配置文件配置 具体代码 MultipartFile 转 File 工具类 引入依赖 <!--邮件--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--日…...

【汇总】解决Spring-Web与Spring-WebFlux冲突

【汇总】解决Spring-Web与Spring-WebFlux冲突 问题发现问题解决问题一&#xff1a;The bean requestMappingHandlerMapping, defined in class path resource [org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.class],问题二&#xff1a;The Java/XML…...

maven 依赖配置补充

依赖配置补充 依赖范围 import 管理依赖最基本的办法是继承父工程&#xff0c;但是和 Java 类一样&#xff0c;Maven 也是单继承的。如果不同体系的依赖信息封装在不同 POM 中了&#xff0c;没办法继承多个父工程怎么办&#xff1f;这时就可以使用 import 依赖范围。 典型案…...

Pandas ------ 向 Excel 文件中写入含有合并表头的数据

Pandas ------ 向 Excel 文件中写入含有合并表头的数据 推荐阅读引言正文 推荐阅读 Pandas ------ 向 Excel 文件中写入含有 multi-index 和 Multi-column 表头的数据 引言 这里给大家介绍一下如何向 Excel 中写入带有合并表头的数据。 正文 import pandas as pddf1 pd.D…...

kafka summary

最近整体梳理之前用到的一些东西&#xff0c;回顾Kafka的时候好多东西都忘记了&#xff0c;把一些自己记的比较模糊并且感觉有用的东西整理一遍并且记忆一遍&#xff0c;仅用于记录以备后续回顾 Kafka的哪些场景中使用了零拷贝 生产者发送消息&#xff1a;在 Kafka 生产者发送…...

【新书推荐】2.6节 原码、反码和补码

回顾上一节中&#xff0c;我们讲解了整数的编码规则。 无符号整数编码规则&#xff1a;无符号整数全部都是正数&#xff0c;是什么就存什么。 有符号整数编码规则&#xff1a;有符号整数最高有效位为0是正数&#xff0c;最高有效位为1是负数。 本节内容&#xff1a;原码、反…...

springboot 百货中心供应链管理系统小程序

一、前言 随着我国经济迅速发展&#xff0c;人们对手机的需求越来越大&#xff0c;各种手机软件也都在被广泛应用&#xff0c;但是对于手机进行数据信息管理&#xff0c;对于手机的各种软件也是备受用户的喜爱&#xff0c;百货中心供应链管理系统被用户普遍使用&#xff0c;为方…...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

【笔记】WSL 中 Rust 安装与测试完整记录

#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统&#xff1a;Ubuntu 24.04 LTS (WSL2)架构&#xff1a;x86_64 (GNU/Linux)Rust 版本&#xff1a;rustc 1.87.0 (2025-05-09)Cargo 版本&#xff1a;cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看

文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...

ZYNQ学习记录FPGA(一)ZYNQ简介

一、知识准备 1.一些术语,缩写和概念&#xff1a; 1&#xff09;ZYNQ全称&#xff1a;ZYNQ7000 All Pgrammable SoC 2&#xff09;SoC:system on chips(片上系统)&#xff0c;对比集成电路的SoB&#xff08;system on board&#xff09; 3&#xff09;ARM&#xff1a;处理器…...

倒装芯片凸点成型工艺

UBM&#xff08;Under Bump Metallization&#xff09;与Bump&#xff08;焊球&#xff09;形成工艺流程。我们可以将整张流程图分为三大阶段来理解&#xff1a; &#x1f527; 一、UBM&#xff08;Under Bump Metallization&#xff09;工艺流程&#xff08;黄色区域&#xff…...