当前位置: 首页 > news >正文

flink StreamGraph 构造flink任务

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurationsConfiguration configuration = new Configuration();ExecutionConfig executionConfig = new ExecutionConfig();CheckpointConfig checkpointConfig = new CheckpointConfig();SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();// Step 2: Create a new StreamGraph instanceStreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);// Step 3: Add a source operatorGeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");// Step 4: Add a map operator to transform the dataStreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value;}});SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");// Step 5: Connect source and map operatorstreamGraph.addEdge(1, 2, 0);// Step 6: Add a sink operator to consume the dataStreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {System.out.println(value);return value;}});SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");// Step 7: Connect map and sink operatorstreamGraph.addEdge(2, 3, 0);streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);streamGraph.setMaxParallelism(1,1);streamGraph.setMaxParallelism(2,1);streamGraph.setMaxParallelism(3,1);streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);// Step 8: Convert StreamGraph to JobGraphJobGraph jobGraph = streamGraph.getJobGraph();// Step 9: Set up a MiniCluster for local executionMiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder().setNumTaskManagers(10).setNumSlotsPerTaskManager(10).build();MiniCluster miniCluster = new MiniCluster(miniClusterConfig);// Step 10: Start the MiniClusterminiCluster.start();// Step 11: Submit the job to the MiniClusterJobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);System.out.println("Job completed with result: " + result);// Step 12: Stop the MiniClusterminiCluster.close();

相关文章:

flink StreamGraph 构造flink任务

文章目录 背景主要步骤代码 背景 通常使用flink 提供的高级算子来编写flink 任务&#xff0c;对底层不是很了解&#xff0c;尤其是如何生成作业图的细节 下面通过构造一个有向无环图&#xff0c;来实际看一下 主要步骤 1.增加source 2.增加operator 3. 增加一条边&#xff0…...

【51单片机】LCD1602液晶显示屏

学习使用的开发板&#xff1a;STC89C52RC/LE52RC 编程软件&#xff1a;Keil5 烧录软件&#xff1a;stc-isp 开发板实图&#xff1a; 文章目录 LCD1602存储结构时序结构 编码 —— 显示字符、数字 LCD1602 LCD1602&#xff08;Liquid Crystal Display&#xff09;液晶显示屏是…...

理解 HTML5 Canvas 中逻辑像素与物理像素的关系

理解 HTML5 Canvas 中逻辑像素与物理像素的关系 在使用 HTML5 Canvas 时&#xff0c;开发者经常会遇到一个困惑&#xff1a;为什么鼠标的 offsetX 和 offsetY 和我绘制的图形坐标对不上&#xff1f;这通常是因为 Canvas 的逻辑像素大小和物理像素大小不一致。本文将详细解释这…...

7.揭秘C语言输入输出内幕:printf与scanf的深度剖析

揭秘C语言输入输出内幕&#xff1a;printf与scanf的深度剖析 C语言往期系列文章目录 往期回顾&#xff1a; VS 2022 社区版C语言的安装教程&#xff0c;不要再卡在下载0B/s啦C语言入门&#xff1a;解锁基础概念&#xff0c;动手实现首个C程序C语言概念之旅&#xff1a;解锁关…...

数据分析-系统认识数据分析

目录 数据分析的全貌 观测 实验 应用 数据分析的全貌 观测 实验 应用...

蓝桥杯介绍

赛事背景与历程 自2009年举办以来&#xff0c;蓝桥杯已经连续举行了多届&#xff0c;成为国内领先的信息技术赛事。2022年&#xff0c;蓝桥杯被教育部确定为2022—2025学年面向中小学生的全国性竞赛活动&#xff0c;并入选国家级A类学科竞赛。 参赛对象与组别 蓝桥杯的参赛对…...

鸿蒙加载网络图片并转换成PixelMap

鸿蒙加载网络图片并转换成PixelMap 参考文档 基于API12. 有一些图片功能需要使用 PixelMap 类型的参数&#xff0c;但是使用Image组件之类的时候无法获取到 PixelMap 类型数据。 因此只能是把图片下载下来然后加在并转换一下。 实现方式 一下封装了一个函数。使用的 rcp 模…...

hive搭建

1.准备环境 三台节点主机已安装hadoopmysql数据库 2.环境 2.1修改三台节点上hadoop的core-site.xml <!-- 配置 HDFS 允许代理任何主机和组 --> <property><name>hadoop.proxyuser.hadoop.hosts</name><value>*</value> </property&…...

51c扩散模型~合集1

我自己的原文哦~ https://blog.51cto.com/whaosoft/11541675 #Diffusion Forcing 无限生成视频&#xff0c;还能规划决策&#xff0c;扩散强制整合下一token预测与全序列扩散 当前&#xff0c;采用下一 token 预测范式的自回归大型语言模型已经风靡全球&#xff0c;同时互联…...

从零开始深度学习:全连接层、损失函数与梯度下降的详尽指南

引言 在深度学习的领域&#xff0c;全连接层、损失函数与梯度下降是三块重要的基石。如果你正在踏上深度学习的旅程&#xff0c;理解它们是迈向成功的第一步。这篇文章将从概念到代码、从基础到进阶&#xff0c;详细剖析这三个主题&#xff0c;帮助你从小白成长为能够解决实际…...

Liebherr利勃海尔 EDI 需求分析

Liebherr 使用 EDI 技术来提高业务流程的效率、降低错误率、加快数据交换速度&#xff0c;并优化与供应商、客户和其他合作伙伴之间的业务沟通。通过 EDI&#xff0c;Liebherr 实现了与全球交易伙伴的自动化数据交换&#xff0c;提升了供应链管理和订单处理的透明度。 Liebher…...

java小练习

小练1.用while语句计算11/2!1/3!1/4!...1/20!的和 public class test_11_17_2 {public static void main(String[] args) {double sum 0;double item 1;int n 20;int i 1;while(i<n){sum item;i i1;item item*(1.0/i);}System.out.println(sum);} } 小练2.计算88888…...

go语言中的占位符有哪些

在Go语言中&#xff0c;占位符主要用于格式化字符串输出&#xff0c;特别是在使用fmt包中的Printf系列函数时。以下是Go语言中常用的占位符&#xff1a; %v&#xff1a;代表值的默认格式&#xff0c;对于字符串是直接输出&#xff0c;对于整型是十进制形式。%v&#xff1a;扩展…...

基于Windows安装opus python库

项目中需要用到一些opus格式的编解码功能&#xff0c;找到网上有opus的开源库。网址&#xff1a;Opus Codec 想着人生苦短&#xff0c;没想到遇上了错误&#xff01;在这里记录一下过程 过程 安装python库 pip3 install opuslib验证 >>> import opuslib Tracebac…...

【设计模式】行为型模式(五):解释器模式、访问者模式、依赖注入

《设计模式之行为型模式》系列&#xff0c;共包含以下文章&#xff1a; 行为型模式&#xff08;一&#xff09;&#xff1a;模板方法模式、观察者模式行为型模式&#xff08;二&#xff09;&#xff1a;策略模式、命令模式行为型模式&#xff08;三&#xff09;&#xff1a;责…...

使用nossl模式连接MySQL数据库详解

使用nossl模式连接MySQL数据库详解 摘要一、引言二、nossl模式概述2.1 SSL与nossl模式的区别2.2 选择nossl模式的场景三、在nossl模式下连接MySQL数据库3.1 准备工作3.2 C++代码示例3.3 代码详解3.3.1 初始化MySQL连接对象3.3.2 连接到MySQL数据库3.3.3 执行查询操作3.3.4 处理…...

【MySQL】ubantu 系统 MySQL的安装与免密码登录的配置

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;MySQL初阶探索&#xff1a;构建数据库基础 欢迎大家点赞收藏评论&#x1f60a; 目录 &#x1f4da;mysql的安装&#x1f4d5;MySQL的登录&#x1f30f;MySQL配置免密码登录 &#x1f4da;mysql的…...

高级 SQL 技巧讲解

​ 大家好&#xff0c;我是程序员小羊&#xff01; 前言&#xff1a; SQL&#xff08;结构化查询语言&#xff09;是管理和操作数据库的核心工具。从基本的查询语句到复杂的数据处理&#xff0c;掌握高级 SQL 技巧不仅能显著提高数据分析的效率&#xff0c;还能解决业务中的复…...

浅论AI大模型在电商行业的发展未来

随着人工智能&#xff08;AI&#xff09;技术的快速发展&#xff0c;AI大模型在电商行业中扮演着越来越重要的角色。本文旨在探讨AI大模型如何赋能电商行业&#xff0c;包括提升销售效率、优化用户体验、增强供应链管理等方面。通过分析AI大模型在电商领域的应用案例和技术进展…...

【python笔记03】《类》

文章目录 面向对象基本概念对象的概念类的概念 类的定义类的创建&#xff08;实例的模板&#xff09;类的实例化--获取对象对象方法中的self关键字面试题请描述什么是对象&#xff0c;什么是类。请观阅读如下代码&#xff0c;判断是否能正常运行&#xff0c;如果不能正常运行&a…...

ESP32-S3 OV2640摄像头从AP模式到STA模式的保姆级切换教程(附完整代码)

ESP32-S3 OV2640摄像头从AP模式到STA模式的保姆级切换教程&#xff08;附完整代码&#xff09; 当你第一次拿到ESP32-S3开发板和OV2640摄像头模块时&#xff0c;可能会被官方例程中的AP&#xff08;热点&#xff09;模式所困扰。虽然AP模式让设备快速上线&#xff0c;但在实际家…...

效率革命:设计师必备的Sketch批量命名神器RenameIt完全指南

效率革命&#xff1a;设计师必备的Sketch批量命名神器RenameIt完全指南 【免费下载链接】RenameIt Keep your Sketch files organized, batch rename layers and artboards. 项目地址: https://gitcode.com/gh_mirrors/re/RenameIt 在现代UI/UX设计流程中&#xff0c;保…...

FJSP:蛇鹫优化算法(SBOA)求解柔性作业车间调度问题(FJSP),提供MATLAB代码

FJSP&#xff1a;蛇鹫优化算法&#xff08;SBOA&#xff09;求解柔性作业车间调度问题&#xff08;FJSP&#xff09;&#xff0c;提供MATLAB代码当车间调度遇上非洲大草原的蛇鹄&#xff0c;会碰撞出什么样的火花&#xff1f;今天咱们用MATLAB实现一种新颖的群智能算法——蛇鹄…...

如何将TaskWeaver与LangChain无缝集成:扩展AI代理能力边界的终极指南

如何将TaskWeaver与LangChain无缝集成&#xff1a;扩展AI代理能力边界的终极指南 【免费下载链接】TaskWeaver A code-first agent framework for seamlessly planning and executing data analytics tasks. 项目地址: https://gitcode.com/gh_mirrors/ta/TaskWeaver T…...

戴尔Precision Pro商务笔记本回归,新一代产品聚焦便携性

看起来2026年是戴尔按下重启键的一年——在消费者和商用产品线中&#xff0c;戏剧性的品牌重塑都在被回归。有时候&#xff0c;老品牌确实是好品牌&#xff1a;熟悉的XPS名称和设计今年回归是有原因的&#xff0c;绝对不是因为戴尔Premium有正面的品牌认知度。在戴尔商务产品方…...

Docker镜像的制作

什么是Docker镜像&#xff1f; Docker镜像是一个轻量级、独立的可执行软件包&#xff0c;包含运行应用程序所需的一切&#xff1a;代码、运行时、系统工具、系统库和设置。镜像是容器的基础&#xff0c;容器是镜像的运行实例。 准备工作 安装Docker 首先确保你的系统已安装D…...

手把手教你用深信服备份系统做整机恢复:从PXE到U盘启动的保姆级避坑指南

深信服整机恢复实战&#xff1a;PXE与U盘启动的深度避坑手册 当服务器突然宕机&#xff0c;硬盘彻底损坏时&#xff0c;整机恢复能力就是IT工程师的救命稻草。深信服备份系统的裸机恢复功能&#xff0c;能在没有操作系统的"裸机"上直接还原整个系统环境——但实际操作…...

如何使用Compiler Explorer实时编译原理:揭秘代码到汇编的转换过程

如何使用Compiler Explorer实时编译原理&#xff1a;揭秘代码到汇编的转换过程 【免费下载链接】compiler-explorer Run compilers interactively from your web browser and interact with the assembly 项目地址: https://gitcode.com/gh_mirrors/co/compiler-explorer …...

如何快速部署Uvicorn ASGI服务器到AWS Lightsail:终极云服务器配置指南 [特殊字符]

如何快速部署Uvicorn ASGI服务器到AWS Lightsail&#xff1a;终极云服务器配置指南 &#x1f680; 【免费下载链接】uvicorn An ASGI web server, for Python. &#x1f984; 项目地址: https://gitcode.com/GitHub_Trending/uv/uvicorn Uvicorn是一个轻量级、高性能的A…...

Maxwell16.0实战:如何用实验电流数据搞定电机仿真(附.tab文件制作技巧)

Maxwell16.0实战&#xff1a;实验电流数据驱动电机仿真的全流程解析 电机仿真作为现代工业设计的重要环节&#xff0c;其准确性直接影响产品性能评估。而将实测电流数据融入仿真流程&#xff0c;往往是工程师突破"理想模型"局限的关键一步。本文将系统性地拆解从实验…...