3.flinkDateStreamAPI介绍env与source
执行环境
Flink可以在不同的环境上下文中运行.可以本地集成开发环境中运行也可以提交到远程集群环境运行.
不同的运行环境对应的flink的运行过程不同,需要首先获取flink的运行环境,才能将具体的job调度到不同的TaskManager
在flink中可以通过StreamExecutionEnvironment类获取不同的环境
- 自适应方式 getExecutionEnvironment
flink会根据运行的上下文自动推断出创建什么样的环境,也是开发中最常用的方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 创建本地环境 createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果
不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment.createLocalEnvironment();
- 创建远程集群运行环境 createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定
要在集群中运行的 Jar 包。
// 创建远程执行环境// job manager hostString host = "node1";// job manager portint port = 6123;// 默认并行度int parallelism = 1;// jar包存在位置String jarFiles = "hdfs://flink/data/wordCount.class";StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(host, port, parallelism, jarFiles);
-
自定义SourceFunction 实现SourceFunction可以通过我们自定义方式加载数据
- SourceFunction 并发度只能是1
- ParallelSourceFunction支持setParallelism
public class FlinkCustomSourceOperatorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource ds = env.addSource(new MyCustomSourceOperator());ds.print();env.execute();}/*** 实现SourceFunction接口的run 方法 与 cancel*/static class MyCustomSourceOperator implements SourceFunction<Integer> {private boolean flag = true;private ThreadLocalRandom random = ThreadLocalRandom.current();/*** 数据收集方法* @param ctx* @throws Exception*/@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {// flag标志位表示数据的生成是否停止while (flag) {// ctx source上下文 collect可以收集生成的数据流向下游ctx.collect(random.nextInt(3000));Thread.sleep(1000);}}/*** 任务停止方法*/@Overridepublic void cancel() {flag = false;}} }
运行模式
flink 在1.12.0版本上统一了批处理与流处理的API,两种数据都可以使用DataStreamAPI进行处理.默认都是以STREAM流式模式进行处理
设置方式
- 命令行设置
bin/flink run -Dexecution.runtime-mode=BATCH
- 代码设置
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
推荐通过命令行模式进行设置运行模式,而通过代码硬编码的形式灵活度较差
关于批与流处理的选择
批处理会等到数据全部就位之后一次性输出结果,流式处理会一直等待数据写入来一条处理一条,在如果数据有界的情况下直接输出效率更高,如果数据无界就只能使用流式处理
最后在编写完成flink程序之后需要显示调用execute方法程序才会真正执行
Flink支持的数据类型
Flink支持大部分Java与Scala数据类型
- 基本数据类型及其包装类
- 数组类型 包含基本类型数组和对象类型数组
- 复合数据类型
- POJO
- 元组
- 行类型ROW
- 辅助类型 Optional Either List Map等
- 泛型
元组类型和 POJO 类型最为灵活,复杂类型。而相比之 下,POJO 还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。
flink 对 POJO 类型的要求如下:
- 类是公共的(public)和独立的(没有非静态的内部类)
- 类有一个公共的无参构造方法
- 类中的所有字段是 public 且非 final 的;或者有一个公共的 getter 和 setter 方法,这些方法需要符合 Java bean 的命名规范
类型提示TypeHints
由于Java存在泛型擦除,还有一些lambda表达式的情况,flink无法推断出返回类型,此时可以通过类型提示在编译的时候就告诉flink泛型类型
flink提供改了TypeHints 与 Types两个类作为返回值类型提示明确告诉转换后的DataStream的数据类型
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String filePath = FileUtil.getAbsolutePath("classpath:input/wordcount.txt");DataStreamSource<String> ds = env.readTextFile(filePath);ds.flatMap((String data, Collector<Tuple2<String, Integer>> collector) -> {String[] word = data.split(" ");Arrays.stream(word).forEach(w -> {collector.collect(Tuple2.of(w, 1));});})// TypeHint 或者 Types.returns(new TypeHint<Tuple2<String, Integer>>() {}).keyBy(data -> data.f0).sum(1).print();env.execute();}
相关文章:
3.flinkDateStreamAPI介绍env与source
执行环境 Flink可以在不同的环境上下文中运行.可以本地集成开发环境中运行也可以提交到远程集群环境运行. 不同的运行环境对应的flink的运行过程不同,需要首先获取flink的运行环境,才能将具体的job调度到不同的TaskManager 在flink中可以通过StreamExecutionEnvironment类获取…...
$ 2 :数据类型
1.数据类型 1.1基本类型 a、整型int b、浮点型float c、字符型char 1.2构造类型 a、数组[ ] b、结构体struct 1.3指针类型 * 1.4空类型(void) 2.关键字 autoconstdoublefloatintshortstructunsignedbreakcontinueelseforlongsignedswitchvoidcasedefaultenumgotoregistersiz…...
类和对象 - 上
本文已收录至《C语言》专栏! 作者:ARMCSKGT 目录 前言 正文 面向过程与面向对象 面向过程的解决方法 面向对象的解决方法 面向对象的优势 类的引入 早期C类的实现 class定义类 class定义规则 类成员的两种定义方式 类的访问限定符及封装 访…...
补档:红黑树代码实现+简略讲解
红黑树讲解和实现1 红黑树介绍1.1 红黑树特性1.2 红黑树的插入1.3 红黑树的删除2 完整代码实现2.1 rtbtree.h头文件2.2 main.c源文件1 红黑树介绍 红黑树( Red-Black tree,简称RB树)是一种自平衡二叉查找树,是计算机科学中常见的一种数据结构,…...
FirePower X2 14.0.1 for RAD Studio Alexandria
介绍 FirePower X2 FirePower X2 集成了 RAD Studio 11.0 Alexandria 中的新功能,并预览了我们的新特色组件 TwwDataGrouper。 FirePower X2 还允许您为 Apple 的新 M1 芯片构建应用程序,这样您就可以进一步利用 M1 芯片来提高本机应用程序的性能&#x…...
二十九、MongoDB 恢复数据( mongorestore )
MongoDB mongorestore 脚本命令可以用来恢复备份的数据 语法 MongoDB mongorestore 命令脚本语法如下 $ mongorestore -h <hostname><:port> -d dbname <path> 参数说明 -h <:port>, -h<:port> MongoDB 所在服务器地址,默认为 l…...
【数据分析】缺失数据如何处理?pandas
本文目录1. 基础概念1.1. 缺失值分类1.2. 缺失值处理方法2. 缺失观测及其类型2.1. 了解缺失信息2.2. 三种缺失符号2.3. Nullable类型与NA符号2.4. NA的特性2.5. convert_dtypes方法3. 缺失数据的运算与分组 3.1. 加号与乘号规则3.2. groupby方法中的缺失值4. 填充与剔除4.1. fi…...
嵌入式开发--STM32H750VBT6开发中,新版本CubeMX的时钟问题,不能设置到最高速度480MHZ
嵌入式开发–STM32H750VBT6开发中,新版本CubeMX的时钟问题,不能设置到最高速度480MHZ 问题描述 之前开发的项目,开发环境是CubeMX6.6.1,H7系列的支持包版本是1.10.0。跑得没问题,最近需要对项目做修改,同…...
一文读懂PaddleSpeech中英混合语音识别技术
语音识别技术能够让计算机理解人类的语音,从而支持多种语音交互的场景,如手机应用、人车协同、机器人对话、语音转写等。然而,在这些场景中,语音识别的输入并不总是单一的语言,有时会出现多语言混合的情况。例如&#…...
问题三十四:傅立叶变换——高通滤波
高通滤波器是一种可以通过去除图像低频信息来增强高频信息的滤波器。在图像处理中,高通滤波器常常用于去除模糊或平滑效果,以及增强边缘或细节。在本篇回答中,我们将使用Python和OpenCV实现高通滤波器。 Step 1:加载图像并进行傅…...
flink 键控状态(keyed state)
github开源项目flink-note的笔记。本博客的实现代码都写在项目的flink-state/src/main/java/state/keyed/KeyedStateDemo.java文件中。 项目github地址: github 1. flink键控状态 flink键控状态是作用与flink KeyedStream上的,也就是说需要将DataStream先进行keyby之后才能使…...
【ChatGPT】sqlachmey 多表连表查询语句
感受下科技带来的魅力,这篇文章是通过ChatGPT自动生成的,不得不说技术强大!!! 在SQLAlchemy中进行多表连接查询可以使用join()方法或join()函数,具体用法如下: join()方法 join()方法可以在SQLAlchemy ORM中的查询中使用。假设…...
win11 系统登录问题,PIN 设置问题
我的电脑配置是华为MateBook X Pro 12,i7处理器,16G,1T,win11 系统通过微软账户登录,下午一直登录不进去,网络能连外网,分析应该是连微软服务器不行。连续登录几十次,偶尔可能有一次…...
数据结构六大排序
1.插入排序 思路: 从第一个元素开始认为是有序的,去一个元素tem从有序序列从后往前扫描,如果该元素大于tem,将该元素一刀下一位,循环步骤3知道找到有序序列中小于等于的元素将tem插入到该元素后,如果已排序…...
快速生成QR码的方法:教你变成QR Code Master
目录 简介: 具体实现步骤: 一、可以使用Python中的qrcode和tkinter模块来生成QR码。以下是一个简单的例子,演示如何在Tkinter窗口中获取用户输入并使用qrcode生成QR码。 1)首先需要安装qrcode模块,可以使用以下命令在终端或命令…...
tensorflow1.14.0安装教程--保姆级
//方法不止一种,下面仅展示一种。 注:本人电脑为win11,anaconda的python版本为3.9,但tensorflow需要python版本为3.7,所以下面主要阐述将python版本改为3.7后的安装过程以及常遇到的问题。 1.首先电脑安装好anaconda…...
AcWing算法提高课-3.1.3香甜的黄油
宣传一下算法提高课整理 <— CSDN个人主页:更好的阅读体验 <— 题目传送门点这里 题目描述 农夫John发现了做出全威斯康辛州最甜的黄油的方法:糖。 把糖放在一片牧场上,他知道 N 只奶牛会过来舔它,这样就能做出能卖好价…...
私库搭建1:Nexus 安装 Docker 版
本文内容以语雀为准 文档 https://hub.docker.com/r/sonatype/nexus3Docker 安装:https://www.yuque.com/xuxiaowei-com-cn/gitlab-k8s/docker-install 安装 创建文件夹 由于 Nexus 的数据可能会很大,比如:作为 Docker、Maven 私库时&…...
LeetCode-面试题 05.02. 二进制数转字符串【数学,字符串,位运算】
LeetCode-面试题 05.02. 二进制数转字符串【数学,字符串,位运算】题目描述:解题思路一:简单暴力。小数点后面的二进制,now首先从0.5开始之和每次除以2。然后依次判断当前数是否大于now,是则答案加1。若等于…...
pandas: 三种算法实现递归分析Excel中各列相关性
目录 前言 目的 思路 代码实现 1. 循环遍历整个SDGs列,两两拿到数据 2. 调用pandas库函数直接进行分析 完整源码 运行效果 总结 前言 博主之前刚刚被学弟邀请参与了2023美赛,这也是第一次正式接触数学建模竞赛,现在已经提交等待结果…...
BAAI/bge-m3应用案例:在文档检索系统中实现精准语义匹配
BAAI/bge-m3应用案例:在文档检索系统中实现精准语义匹配 1. 项目背景与核心价值 在当今信息爆炸的时代,企业和个人都面临着海量文档管理的挑战。传统的关键词搜索方式已经无法满足精准检索的需求,特别是在处理专业术语、同义词和跨语言文档…...
OpenClaw+nanobot镜像:3步配置QQ聊天机器人触发AI任务
OpenClawnanobot镜像:3步配置QQ聊天机器人触发AI任务 1. 为什么选择OpenClawnanobot组合? 去年冬天,当我第一次尝试用QQ机器人自动处理群消息时,经历了漫长的环境配置地狱。直到发现星图平台的nanobot镜像,这个开箱即…...
7.企业级开发
一.软件开发的流程二.系统开发环境三.分支设计规范Git Flow模型四.企业级项目管理https://gitee.com/enterprises1.创建项目2.创建项目对应的仓库3.添加成员还可以进行(项目/仓库)成员管理五.开发实战场景1.创建仓库时,一般选生产和开发模型,其他的分支自己创建2.创建新分支:3.…...
深度解析开源工具如何实现游戏性能优化:Genshin FPS Unlocker专业实战指南
深度解析开源工具如何实现游戏性能优化:Genshin FPS Unlocker专业实战指南 【免费下载链接】genshin-fps-unlock unlocks the 60 fps cap 项目地址: https://gitcode.com/gh_mirrors/ge/genshin-fps-unlock Genshin FPS Unlocker 是一款专注于游戏性能优化的…...
SEO_资深从业者的高级SEO策略与实战技巧
前言:SEO的进阶之道 在当今互联网时代,搜索引擎优化(SEO)已经不再是一个简单的任务。对于资深从业者来说,SEO不仅仅是一门技术,更是一门艺术。本文将从多个角度探讨资深从业者的高级SEO策略与实战技巧&…...
软件工程师如何转型AI工程师 第三章 技术路线的选择——不要从头学起
第三章 技术路线的选择——不要从头学起 在转型的技术路径上,我见过最多的弯路长这个样子:某个工程师下定决心要搞AI,于是买了一本《深度学习》(花书),从第一章线性代数开始硬啃,啃到反向传播…...
基于OpenStack的毕业设计:从零搭建私有云平台的入门实战与避坑指南
最近在帮学弟学妹们看毕业设计,发现不少同学对云计算方向很感兴趣,尤其是想用OpenStack做个私有云平台。但一上手就懵了,组件多、文档杂,环境动不动就崩,最后时间都花在折腾部署上了。我自己当初也踩过不少坑ÿ…...
STM32L0待机模式唤醒后程序跑飞?用LL库/HAL库正确处理系统复位与初始化
STM32L0待机模式唤醒后的系统复位陷阱与实战解决方案 引言:被忽视的唤醒后世界 当你按下STM32L0的唤醒按键,看到电流表指针从微安级跳回毫安级,内心是否涌起一阵成就感?但紧接着,OLED屏幕不再刷新,蓝牙模块…...
3步接入钉钉机器人:OpenClaw+百川2-13B打造部门问答助手
3步接入钉钉机器人:OpenClaw百川2-13B打造部门问答助手 1. 为什么选择这个组合? 去年我们部门开始尝试用大模型解决内部知识检索问题。最初直接使用网页版对话工具,但遇到三个痛点:一是敏感业务数据不敢上传公有云;二…...
c++ 20 有什么新的功能
C20 是继 C11 之后最具革命性的 C 标准更新之一,引入了许多强大的新特性,旨在提高代码的表达力、类型安全性、编译效率和开发体验。以下是 C20 的主要新功能分类总结:一、四大核心语言特性1. 模块(Modules)目的&#x…...
