大数据学习之Flink算子、了解DataStream API(基础篇一)
DataStream API (基础篇)
注: 本文只涉及DataStream
- 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSet成为了主流的数据处理方式。
目录
DataStream API (基础篇)
前摘:
一、执行环境
1. 创建执行环境
2. 执行模式
3. 触发程序执行
二、源算子(source)
三、转换算子(Transformation)
四、输出算子(sink)
前摘:
一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几 部分构成,如图所示:
- 获取执行环境(Execution Environment)
- 读取数据源(Source)
- 定义基于数据的转换操作(Transformations)
- 定义计算结果的输出位置(Sink)
- 触发程序执行(Execute)
其中,获取环境和触发执行,都可以认为是针对执行环境的操作。所以本章我们就从执行 环境、数据源(source)、转换操作(Transformation)、输出(Sink)四大部分,对常用的 DataStream API 做基本介绍。

一、执行环境
1. 创建执行环境
- 编写Flink程序的第一步就是创建执行环境。
- 我 们 要 获 取 的 执 行 环 境 , 是 StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础
- 在代码中创建执行环境的 方式,就是调用这个类的静态方法,具体有以下三种。
- getExecutionEnvironment
最简单的方式,就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果;//此处的 env 是 StreamExecutionEnvironment 对象 val env = StreamExecutionEnvironment.getExecutionEnvironment - createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果 不传入,则默认并行度就是本地的 CPU 核心数。//此处的 localEnvironment 是 StreamExecutionEnvironment 对象 val localEnvironment = StreamExecutionEnvironment.createLocalEnvironment() -
createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定 要在集群中运行的 Jar 包。//此处的 remoteEnv 是 StreamExecutionEnvironment 对象 val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager 主机名1234, // JobManager 进程端口号"path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包 )
2. 执行模式
而从 1.12.0 版本起,Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来,DataSet API 也就没有存在的必要了。
- 流执行模式(STREAMING) 这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情 况下,程序使用的就是 STREAMING 执行模式。
- 批执行模式(BATCH) 专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据,我们用这种模式处理会更方便。
- 自动模式(AUTOMATIC) 在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式
由于 Flink 程序默认是 STREAMING 模式,我们这里重点介绍一下 BATCH 模式的配置。 主要有两种方式:
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH。
(2)通过代码配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
3. 触发程序执行
我们需要显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等 待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute()
二、源算子(source)
Source源算子(基础篇二)
三、转换算子(Transformation)
Transformation转换算子(基础篇三)
四、输出算子(sink)
持续更新中
相关文章:
大数据学习之Flink算子、了解DataStream API(基础篇一)
DataStream API (基础篇) 注: 本文只涉及DataStream 原因:随着大数据和流式计算需求的增长,处理实时数据流变得越来越重要。因此,DataStream由于其处理实时数据流的特性和能力,逐渐替代了DataSe…...
js中字符串string,遍历json/Object【匹配url、邮箱、电话,版本号,千位分割,判断回文】
目录 正则 合法的URL 邮箱、电话 字符串方法 千位分割:num.slice(render, len).match(/\d{3}/g).join(,) 版本号比较 判断回文 json/Object 遍历 自身属性 for...inhasOwnProperty(key) Object.获取数组(obj):Object.keys,Object…...
字符串和C预处理器
本文参考C Primer Plus第四章学习 文章目录 常量和预处理器const限定符 1. 常量和预处理器 有时,在程序中要使用常量。例如,可以这样计算圆的周长: circumference 3.14159 * diameter; 这里,常量3.14159 代表著名的常量 pi(π)。…...
Ultraleap 3Di新建项目之给所有的Joint挂载物体
工程文件 Ultraleap 3Di给所有的Joint挂载物体 前期准备 参考上一期文章,进行正确配置 Ultraleap 3Di配置以及在 Unity 中使用 Ultraleap 3Di手部跟踪 新建项目 初始项目如下: 新建Create Empty 将新建的Create Empty,重命名为LeapPro…...
关于session每次请求都会改变的问题
这几天在部署一个前后端分离的项目,使用docker进行部署,在本地测试没有一点问题没有,前脚刚把后端部署到服务器,后脚测试就出现了问题!查看控制台报错提示跨域错误?但是对于静态资源请求,包括登…...
【leetcode题解C++】150.逆波兰表达式求值 and 239.滑动窗口最大值 and 347.前k个高频元素
150.逆波兰表达式求值 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 、-、* 和 / 。每个操作数(运算对象)都可以是一个整数…...
【计网·湖科大·思科】实验三 总线型以太网的特性、集线器和交换机的区别、交换机的自学习算法
🕺作者: 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux 😘欢迎关注:👍点赞🙌收藏✍️留言 🏇码字不易,你的👍点赞🙌收藏❤️关注对我真的很重要&…...
API设计模式:REST、GraphQL、gRPC与tRPC全面解析
一、引言 在现代Web和微服务架构中,API(应用程序编程接口)的设计和实现方式至关重要。本文将探讨四种流行的API设计模式:REST(Representational State Transfer)、GraphQL、gRPC以及新兴的tRPC。每种模式都…...
C/C++ protobuf与json互转
测试环境 ubuntu16.04 64bitprotocbuf:3.9.1 (支持json转换需>3.0.0) 协议 syntax "proto2";message Person{optional string name 1;optional uint32 age 2;optional string address 3; }测试代码 //protobuf > 3.0.0#…...
Open CASCADE学习|圆柱螺旋线绘制原理探究
1、圆柱螺旋线绘制原理 在OCC中,圆柱面的参数方程为: 设P为(x0,y0,z0),则 xx0r*cos(u) yy0r*sin(u) zz0v 但u、v之间有关系时,此方程表达为圆柱螺旋线,u、v之间为线性关系时是等螺距螺旋线࿰…...
Python学习笔记--认识sys.argv
sys.argv 是 Python 的一个内置模块 sys 中的一个属性。它是一个列表,包含了从命令行传递给脚本的参数。 例如,如果你有一个名为 script.py 的脚本,并且你从终端窗口命令行这样运行它: >>>python script.py arg1 arg2 …...
【C++】入门基础
前言:C是在C的基础之上,容纳进去了面向对象编程思想,并增加了许多有用的库,以及编程范式等。熟悉C语言之后,对C学习有一定的帮助,因此从今天开始们将进入C的学习。 💖 博主CSDN主页:…...
Nginx与keepalived实现集群
提醒一下:下面实例讲解是在mac虚拟机里的Ubuntu系统演示的; Nginx与keepalived实现集群实现的效果 两台服务器都安装Nginx与keepalived: master服务器的ip(192.168.200.2) backup服务器的ip(192.168.200.4) 将 master服务器Nginx与keepalive…...
初识MQRabbitMQ快速入门
一、同步和异步通讯 微服务间通讯有同步和异步两种方式: 同步通讯:就像打电话,需要实时响应。 异步通讯:就像发邮件,不需要马上回复。 两种方式各有优劣,打电话可以立即得到响应,但是你却不能…...
javaMailSender 发送邮件,基于Spring Boot
目录 引入依赖 配置文件配置 具体代码 MultipartFile 转 File 工具类 引入依赖 <!--邮件--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--日…...
【汇总】解决Spring-Web与Spring-WebFlux冲突
【汇总】解决Spring-Web与Spring-WebFlux冲突 问题发现问题解决问题一:The bean requestMappingHandlerMapping, defined in class path resource [org/springframework/web/reactive/config/DelegatingWebFluxConfiguration.class],问题二:The Java/XML…...
maven 依赖配置补充
依赖配置补充 依赖范围 import 管理依赖最基本的办法是继承父工程,但是和 Java 类一样,Maven 也是单继承的。如果不同体系的依赖信息封装在不同 POM 中了,没办法继承多个父工程怎么办?这时就可以使用 import 依赖范围。 典型案…...
Pandas ------ 向 Excel 文件中写入含有合并表头的数据
Pandas ------ 向 Excel 文件中写入含有合并表头的数据 推荐阅读引言正文 推荐阅读 Pandas ------ 向 Excel 文件中写入含有 multi-index 和 Multi-column 表头的数据 引言 这里给大家介绍一下如何向 Excel 中写入带有合并表头的数据。 正文 import pandas as pddf1 pd.D…...
kafka summary
最近整体梳理之前用到的一些东西,回顾Kafka的时候好多东西都忘记了,把一些自己记的比较模糊并且感觉有用的东西整理一遍并且记忆一遍,仅用于记录以备后续回顾 Kafka的哪些场景中使用了零拷贝 生产者发送消息:在 Kafka 生产者发送…...
【新书推荐】2.6节 原码、反码和补码
回顾上一节中,我们讲解了整数的编码规则。 无符号整数编码规则:无符号整数全部都是正数,是什么就存什么。 有符号整数编码规则:有符号整数最高有效位为0是正数,最高有效位为1是负数。 本节内容:原码、反…...
深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用
文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么?1.1.2 感知机的工作原理 1.2 感知机的简单应用:基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...
API网关Kong的鉴权与限流:高并发场景下的核心实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中,API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关,Kong凭借其插件化架构…...
xmind转换为markdown
文章目录 解锁思维导图新姿势:将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件(ZIP处理)2.解析JSON数据结构3:递归转换树形结构4:Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...
