flink StreamGraph 构造flink任务
文章目录
- 背景
- 主要步骤
- 代码
背景
通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下
主要步骤
1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink
代码
// Step 1: Create basic configurationsConfiguration configuration = new Configuration();ExecutionConfig executionConfig = new ExecutionConfig();CheckpointConfig checkpointConfig = new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");// Step 4: Add a map operator to transform the dataStreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph = streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster = new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);System.out.println("Job completed with result: " + result);// Step 12: Stop the MiniClusterminiCluster.close();
相关文章:
flink StreamGraph 构造flink任务
文章目录 背景主要步骤代码 背景 通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节 下面通过构造一个有向无环图,来实际看一下 主要步骤 1.增加source 2.增加operator 3. 增加一条边࿰…...
【51单片机】LCD1602液晶显示屏
学习使用的开发板:STC89C52RC/LE52RC 编程软件:Keil5 烧录软件:stc-isp 开发板实图: 文章目录 LCD1602存储结构时序结构 编码 —— 显示字符、数字 LCD1602 LCD1602(Liquid Crystal Display)液晶显示屏是…...
理解 HTML5 Canvas 中逻辑像素与物理像素的关系
理解 HTML5 Canvas 中逻辑像素与物理像素的关系 在使用 HTML5 Canvas 时,开发者经常会遇到一个困惑:为什么鼠标的 offsetX 和 offsetY 和我绘制的图形坐标对不上?这通常是因为 Canvas 的逻辑像素大小和物理像素大小不一致。本文将详细解释这…...
7.揭秘C语言输入输出内幕:printf与scanf的深度剖析
揭秘C语言输入输出内幕:printf与scanf的深度剖析 C语言往期系列文章目录 往期回顾: VS 2022 社区版C语言的安装教程,不要再卡在下载0B/s啦C语言入门:解锁基础概念,动手实现首个C程序C语言概念之旅:解锁关…...
数据分析-系统认识数据分析
目录 数据分析的全貌 观测 实验 应用 数据分析的全貌 观测 实验 应用...
蓝桥杯介绍
赛事背景与历程 自2009年举办以来,蓝桥杯已经连续举行了多届,成为国内领先的信息技术赛事。2022年,蓝桥杯被教育部确定为2022—2025学年面向中小学生的全国性竞赛活动,并入选国家级A类学科竞赛。 参赛对象与组别 蓝桥杯的参赛对…...
鸿蒙加载网络图片并转换成PixelMap
鸿蒙加载网络图片并转换成PixelMap 参考文档 基于API12. 有一些图片功能需要使用 PixelMap 类型的参数,但是使用Image组件之类的时候无法获取到 PixelMap 类型数据。 因此只能是把图片下载下来然后加在并转换一下。 实现方式 一下封装了一个函数。使用的 rcp 模…...
hive搭建
1.准备环境 三台节点主机已安装hadoopmysql数据库 2.环境 2.1修改三台节点上hadoop的core-site.xml <!-- 配置 HDFS 允许代理任何主机和组 --> <property><name>hadoop.proxyuser.hadoop.hosts</name><value>*</value> </property&…...
51c扩散模型~合集1
我自己的原文哦~ https://blog.51cto.com/whaosoft/11541675 #Diffusion Forcing 无限生成视频,还能规划决策,扩散强制整合下一token预测与全序列扩散 当前,采用下一 token 预测范式的自回归大型语言模型已经风靡全球,同时互联…...
从零开始深度学习:全连接层、损失函数与梯度下降的详尽指南
引言 在深度学习的领域,全连接层、损失函数与梯度下降是三块重要的基石。如果你正在踏上深度学习的旅程,理解它们是迈向成功的第一步。这篇文章将从概念到代码、从基础到进阶,详细剖析这三个主题,帮助你从小白成长为能够解决实际…...
Liebherr利勃海尔 EDI 需求分析
Liebherr 使用 EDI 技术来提高业务流程的效率、降低错误率、加快数据交换速度,并优化与供应商、客户和其他合作伙伴之间的业务沟通。通过 EDI,Liebherr 实现了与全球交易伙伴的自动化数据交换,提升了供应链管理和订单处理的透明度。 Liebher…...
java小练习
小练1.用while语句计算11/2!1/3!1/4!...1/20!的和 public class test_11_17_2 {public static void main(String[] args) {double sum 0;double item 1;int n 20;int i 1;while(i<n){sum item;i i1;item item*(1.0/i);}System.out.println(sum);} } 小练2.计算88888…...
go语言中的占位符有哪些
在Go语言中,占位符主要用于格式化字符串输出,特别是在使用fmt包中的Printf系列函数时。以下是Go语言中常用的占位符: %v:代表值的默认格式,对于字符串是直接输出,对于整型是十进制形式。%v:扩展…...
基于Windows安装opus python库
项目中需要用到一些opus格式的编解码功能,找到网上有opus的开源库。网址:Opus Codec 想着人生苦短,没想到遇上了错误!在这里记录一下过程 过程 安装python库 pip3 install opuslib验证 >>> import opuslib Tracebac…...
【设计模式】行为型模式(五):解释器模式、访问者模式、依赖注入
《设计模式之行为型模式》系列,共包含以下文章: 行为型模式(一):模板方法模式、观察者模式行为型模式(二):策略模式、命令模式行为型模式(三):责…...
使用nossl模式连接MySQL数据库详解
使用nossl模式连接MySQL数据库详解 摘要一、引言二、nossl模式概述2.1 SSL与nossl模式的区别2.2 选择nossl模式的场景三、在nossl模式下连接MySQL数据库3.1 准备工作3.2 C++代码示例3.3 代码详解3.3.1 初始化MySQL连接对象3.3.2 连接到MySQL数据库3.3.3 执行查询操作3.3.4 处理…...
【MySQL】ubantu 系统 MySQL的安装与免密码登录的配置
🍑个人主页:Jupiter. 🚀 所属专栏:MySQL初阶探索:构建数据库基础 欢迎大家点赞收藏评论😊 目录 📚mysql的安装📕MySQL的登录🌏MySQL配置免密码登录 📚mysql的…...
高级 SQL 技巧讲解
大家好,我是程序员小羊! 前言: SQL(结构化查询语言)是管理和操作数据库的核心工具。从基本的查询语句到复杂的数据处理,掌握高级 SQL 技巧不仅能显著提高数据分析的效率,还能解决业务中的复…...
浅论AI大模型在电商行业的发展未来
随着人工智能(AI)技术的快速发展,AI大模型在电商行业中扮演着越来越重要的角色。本文旨在探讨AI大模型如何赋能电商行业,包括提升销售效率、优化用户体验、增强供应链管理等方面。通过分析AI大模型在电商领域的应用案例和技术进展…...
【python笔记03】《类》
文章目录 面向对象基本概念对象的概念类的概念 类的定义类的创建(实例的模板)类的实例化--获取对象对象方法中的self关键字面试题请描述什么是对象,什么是类。请观阅读如下代码,判断是否能正常运行,如果不能正常运行&a…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
LLM基础1_语言模型如何处理文本
基于GitHub项目:https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken:OpenAI开发的专业"分词器" torch:Facebook开发的强力计算引擎,相当于超级计算器 理解词嵌入:给词语画"…...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...
Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
Xcode 16 集成 cocoapods 报错
基于 Xcode 16 新建工程项目,集成 cocoapods 执行 pod init 报错 ### Error RuntimeError - PBXGroup attempted to initialize an object with unknown ISA PBXFileSystemSynchronizedRootGroup from attributes: {"isa">"PBXFileSystemSynchro…...
