Flink 并行度的设置
在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下:
🧩 一、Flink 并行度的四个设置层级
层级 | 描述 | 设置方式 |
---|---|---|
Operator Level | 为某个具体的算子设置并行度 | operator.setParallelism(n) |
Execution Environment Level | 为整个流处理环境设置默认并行度 | env.setParallelism(n) |
Client Level(提交作业时) | 通过命令行指定全局并行度 | flink run -p n |
System Level(系统配置) | 在 flink-conf.yaml 中定义全局默认值 | parallelism.default: n |
✅ 二、各层级设置详解与示例
1. Operator Level(算子级别)
- 优先级最高
- 可以为特定算子设置不同并行度,适用于数据倾斜或资源敏感操作
🔧 示例:
DataStream<String> stream = env.fromElements("a", "b", "c");// 单独为 map 算子设置并行度为4
stream.map(new MyMapFunction()).setParallelism(4).print();
✅ 适用场景:
- 某个算子计算密集,需要更多资源
- 数据源分区数较少,但后续算子可并行化处理
2. Execution Environment Level(执行环境级别)
- 设置整个 Job 的默认并行度
- 如果未对某些算子单独设置,并使用此值
🔧 示例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 所有算子默认并行度为4DataStream<String> stream = env.fromElements("a", "b", "c");
stream.map(new MyMapFunction()).print(); // 默认并行度为4
✅ 适用场景:
- 多数算子使用相同并行度
- 统一配置便于管理和维护
3. Client Level(客户端提交作业时)
- 使用命令行参数动态设置并行度
- 不修改代码即可适配不同运行环境(如测试/生产)
🔧 示例:
flink run -p 4 -c com.example.MyJob ./myjob.jar
✅ 适用场景:
- 快速调整不同集群资源配置
- 测试阶段快速验证性能
4. System Level(系统级别)
- 在
flink-conf.yaml
中设置全局默认并行度 - 对所有提交的作业生效(除非被更高级别覆盖)
🔧 示例(flink-conf.yaml
):
parallelism.default: 4
✅ 适用场景:
- 所有作业共享相同的默认资源配置
- 避免手动重复设置
📊 三、并行度优先级对比表
设置方式 | 是否推荐 | 场景 | 覆盖关系 |
---|---|---|---|
Operator Level | ✅✅✅ | 特定算子优化 | 最高优先级 |
Execution Environment Level | ✅✅ | 整体统一配置 | 被 Operator 覆盖 |
Client Level (-p) | ✅ | 动态部署 | 被前两者覆盖 |
System Level (flink-conf.yaml) | ⚠️ | 兜底默认值 | 最低优先级 |
💡 四、并行度设置建议
✅ 推荐做法:
- 开发/测试环境:使用
.setParallelism()
或-p
命令行设置较小值(如1~4) - 生产环境:
- 使用
flink-conf.yaml
设置基础并行度 - 使用
env.setParallelism()
明确控制默认值 - 为关键算子单独设置更高并行度(如窗口聚合、复杂逻辑)
- 使用
⚙️ 示例组合:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // 默认并行度env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source").setParallelism(8) // Kafka Source 并行度设为8(等于topic分区数).map(new MyMapFunction()) // 使用默认并行度4.keyBy(keySelector).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new MyProcessWindowFunction()) // 可选 setParallelism().print();
🧠 五、并行度与资源的关系
并行度 | TaskManager 数量 | Slot 数量 | 资源要求 |
---|---|---|---|
≤ TM × slot | ✅ 正常运行 | ✅ 正常运行 | 资源充足 |
> TM × slot | ❌ 无法启动 | ❌ 无法启动 | 资源不足 |
✅ 建议:确保总并行度 ≤ 总 slot 数量
📈 六、实际调优建议
场景 | 建议设置 |
---|---|
Kafka Source | 并行度 = Kafka Topic 分区数 |
Map / FlatMap | 根据 CPU 利用率设置 |
Keyed Window Aggregation | 可适当提高并行度提升吞吐 |
Join / CoGroup | 视数据分布决定是否提高并行度 |
Sink | 若写入慢可适当增加并行度 |
✅ 七、完整示例(Java + Shell)
Java 设置(Env + Operator):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);env.fromElements("a", "b", "c").map(x -> x).setParallelism(2) // 覆盖默认值.print();env.execute("Parallelism Example");
Shell 设置(Client Level):
flink run -p 8 -c com.example.MyJob ./myjob.jar
✅ 八、总结
层级 | 用途 | 是否推荐使用 |
---|---|---|
Operator Level | 控制单个算子并行度 | ✅✅✅ 强烈推荐用于关键路径优化 |
Execution Environment Level | 设置默认并行度 | ✅✅ 推荐作为基础配置 |
Client Level | 动态设置并行度 | ✅ 适合多环境部署 |
System Level | 全局兜底配置 | ⚠️ 推荐配合其他方式使用 |
相关文章:
Flink 并行度的设置
在 Apache Flink 中,并行度(Parallelism) 是控制任务并发执行的核心参数之一。Flink 提供了 多个层级设置并行度的方式,优先级从高到低如下: 🧩 一、Flink 并行度的四个设置层级 层级描述设置方式Operator…...
【微服务】SpringBoot + Docker 实现微服务容器多节点负载均衡详解
目录 一、前言 二、前置准备 2.1 基本环境 2.2 准备一个springboot工程 2.2.1 准备几个测试接口 2.3 准备Dockerfile文件 2.4 打包上传到服务器 三、制作微服务镜像与运行服务镜像 3.1 拷贝Dockerfile文件到服务器 3.2 制作服务镜像 3.3 启动镜像服务 3.4 访问一下服…...

get请求使用数组进行传参
get请求使用数组进行传参,无需添加中括号 mvc接口要添加参数名,使用array承接。不能用list, 否则会报错 这里是用apifox模拟前端调用。 前端调用代码 // 根据项目ID和角色ID查询相关审批人 export function findRelativeApproverByProjectIdAndRoleId(roleIds, p…...
20. 自动化测试框架开发之Excel配置文件的IO开发
20.自动化测试框架开发之Excel配置文件的IO开发 一、核心架构解析 1.1 类继承体系 class File: # 文件基类# 基础文件验证和路径管理class ExcelReader(File): # Excel读取器# 实现Excel数据解析逻辑1.2 版本依赖说明 # 必须安装1.2.0版本(支持xlsx格式&#…...

【MySQL成神之路】MySQL常用语法总结
目录 MySQL 语法总结 数据库操作 表操作 数据操作 查询语句 索引操作 约束 事务控制 视图操作 存储过程和函数 触发器 用户和权限管理 数据库操作 创建数据库: CREATE DATABASE database_name; 选择数据库: USE database_name; 删除数…...

Linux动静态库制作与原理
什么是库 库是写好的现有的,成熟的,可以复用的代码。现实中每个程序都要依赖很多基础的底层库,不可能每个人的代码都从零开始,因此库的存在意义非同寻常。 本质上来说库是一种可执行代码的二进制形式,可以被操作系统…...
确保高质量的音视频通话,如何最大化利用视频带宽
在当今数字时代,音视频内容随处可见,对于开发者来说,理解互联网带宽变得至关重要。我们的在线体验质量,无论是观看高清电影还是演唱会直播,都严重依赖于互联网带宽的概念。在本文中,我们将揭示视频带宽的复…...

ffmpeg 把一个视频复制3次
1. 起因, 目的: 前面我写过,使用 python 把一个视频复制3次但是速度太慢了,我想试试看能否改进。而且我想换一种新的视频处理思路,并试试看速度如何。 2. 先看效果 效果就是能行,而且速度也快。 3. 过程: 代码 1…...

GPT/Claude3国内免费镜像站更新 亲测可用
无限次使用:无限制的提问次数,不设上限,随心所欲。 无需魔法、稳定流畅:操作简便,无需复杂设置,即可享受稳定流畅的服务。 手机和电脑均能用:轻松适配手机和电脑,使用体验更佳。 …...
AI自动化工作流:开启当下智能生产力的价值
举手之言:AI自动化工作流创造了什么呢? AI自动化工作流 ,顾名思义,是将人工智能(AI)技术与自动化流程相结合,通过智能化的方式来完成复杂的任务和操作。简单来说,它就是利用AI的强大…...
stm32——EXTI外部中断
NVIC优先级分组 抢占优先级 可以进行中断嵌套的优先级,即可以不等上一个中断执行完成就进入下一个中断 响应优先级 决定中断发生的顺序,但不可嵌套 程序实现 对射式红外传感计次 #include "stm32f10x.h" // Device head…...

Python:操作Excel按行写入
Python按行写入Excel数据,5种实用方法大揭秘! 在日常的数据处理和分析工作中,我们经常需要将数据写入到Excel文件中。Python作为一门强大的编程语言,提供了多种库和方法来实现将数据按行写入Excel文件的功能。本文将详细介绍5种常见的Python按行写入Excel数据的方法,并附上…...

Redis进阶知识
Redis 1.事务2. 主从复制2.1 如何启动多个Redis服务器2.2 监控主从节点的状态2.3 断开主从复制关系2.4 额外注意2.5拓扑结构2.6 复制过程2.6.1 数据同步 3.哨兵选举原理注意事项 4.集群4.1 数据分片算法4.2 故障检测 5. 缓存5.1 缓存问题 6. 分布式锁 1.事务 Redis的事务只能保…...
Python机器学习笔记(二十三 模型评估与改进-网格搜索)
上一次学习了评估一个模型的泛化能力,现在继续学习通过调参来提升模型的泛化性能。scikit-learn中许多算法的参数设置,在尝试调参之前,重要的是要理解参数的含义。找到一个模型的重要参数(提供最佳泛化性能的参数)的取值是一项棘手的任务,但对于几乎所有模型和数据集来说…...

12.vue整合springboot首页显示数据库表-实现按钮:【添加修改删除查询】
vue整合springboot首页显示数据库表:【添加修改删除查询】 提示:帮帮志会陆续更新非常多的IT技术知识,希望分享的内容对您有用。本章分享的是node.js和vue的使用。前后每一小节的内容是存在的有:学习and理解的关联性。【帮帮志系…...

bisheng系列(一)- 本地部署(Docker)
目录 一、导读 二、说明 1、镜像说明 2、本节内容 三、docker部署 1、克隆代码 2、运行镜像 3、可能的错误信息 四、页面测试 1、注册用户 2、登陆成功 3、添加模型 一、导读 环境:Ubuntu 24.04、Windows 11、WSL 2、Python 3.10 、bisheng 1.1.1 背景…...

如何用Python批量解压ZIP文件?快速解决方案
如何用Python批量解压ZIP文件?快速解决方案 文章目录 **如何用Python批量解压ZIP文件?快速解决方案**代码结果详细解释 话不多说,先上干货!!! 代码 import os import zipfiledef unzip_file(dir_path: str…...

DriveGenVLM:基于视觉-语言模型的自动驾驶真实世界视频生成
《DriveGenVLM: Real-world Video Generation for Vision Language Model based Autonomous Driving》2024年8月发表,来自哥伦比亚大学的论文。 自动驾驶技术的进步需要越来越复杂的方法来理解和预测现实世界的场景。视觉语言模型(VLM)正在成…...
JavaScript 中的五种继承方式进行深入对比
文章目录 前言JavaScript 五种继承方式对比原型链继承构造函数继承组合继承寄生组合继承ES6 class extends 继承五种继承方式对比表前言 对 JavaScript 中的五种继承方式进行深入对比:原型链继承、构造函数继承、组合继承、寄生组合继承、以及 ES6 的 class extends。 内容将…...

企业标准信息公共服务平台已开放标准通编辑器访问入口
标准通 数字化标准编辑器 专业、高效、便捷 企业标准信息公共服务平台 近日,企业标准信息公共服务平台已开放标准通编辑器访问入口,可进入官网指定版块使用! 核心功能亮点 解决企业痛点 传统标准编制,需反复核对格式、逐条…...
[Linux]安装吧!我的软件包管理器!
一、常见安装方式 在 Linux 中,有 3 种常见的软件安装方式: (1)yam、apt (2).rpm 安装包安装 (3)源码安装 二、什么是软件包 在 Linux 下安装软件,通常的办法是下载…...
Spring Boot 与 RabbitMQ 的深度集成实践(三)
高级特性实现 消息持久化 在实际的生产环境中,消息的可靠性是至关重要的。消息持久化是确保 RabbitMQ 在发生故障或重启后,消息不会丢失的关键机制。它涉及到消息、队列和交换机的持久化配置。 首先,配置队列持久化。在创建队列时…...

进阶-数据结构部分:1、数据结构入门
飞书文档https://x509p6c8to.feishu.cn/wiki/HRLkwznHiiOgZqkqhLrcZNqVnLd 一、存储结构 顺序存储 链式存储 二、常用数据结构 2.1、栈 先进后出 场景: 后退/前进功能:网页浏览器中的后退和前进按钮可以使用栈来实现。在浏览网页时,每次…...

React 19中useContext不需要Provider了。
文章目录 前言一、React 19中useContext移除了Provider?二、使用步骤总结 前言 在 React 19 中,useContext 的使用方式有所更新。开发者现在可以直接使用 作为提供者,而不再需要使用 <Context.Provider>。这一变化简化了代码结构&…...

Json schema校验json字符串(networknt/json-schema-validator库)
学习链接 json-schema官网 - 英文 jsonschemavalidator 可在线校验网站 networknt的json-schema-validator github地址 networknt的json-schema-validator 个人gitee地址 - 里面有md文档说明和代码示例 JSON Schema 入门指南:如何定义和验证 JSON 数据结构 JS…...

交易所开发:构建功能完备的金融基础设施全流程指南
交易所开发:构建功能完备的金融基础设施全流程指南 ——从技术架构到合规安全的系统性解决方案 一、开发流程:从需求分析到运维优化 开发一款功能完备的交易所需要遵循全生命周期管理理念,涵盖市场定位、技术实现、安全防护和持续迭代四大阶…...
Windows_Vs2022 C#语言开发环境构建
Windows_VisualStudio2022 C#语言开发环境构建 一、C#语言简介历史背景语言特点应用领域开发工具未来发展方向 二、Visual Studio 2022(一)开发语言支持(二)主要功能(三)适用场景(四)…...

Axure疑难杂症:统计分析页面引入Echarts示例动态效果
亲爱的小伙伴,在您浏览之前,烦请关注一下,在此深表感谢! Axure产品经理精品视频课已登录CSDN可点击学习https://edu.csdn.net/course/detail/40420 课程主题:统计分析页面引入Echarts示例动态效果 主要内容:echart示例引入、大小调整、数据导入 应用场景:统计分析页面…...

展锐Android14及更新版本split_build编译方法
更改split_build.py文件内容后按照下面方法编译: zip -r sys/vendor/sprd/release/split_build.zip sys/vendor/sprd/release/split_build/ rm -r sys/vendor/sprd/release/split_build/ cp -r vnd/vendor/sprd/release/split_build/ sys/vendor/sprd/release/cd s…...

青少年ctf平台应急响应-应急响应2
题目: 当前服务器被创建了一个新的用户,请提交新用户的用户名,得到的结果 ssh rootchallenge.qsnctf.com -p 30327 这个命令用于通过 SSH 协议连接到指定的远程服务器。具体解释如下: ssh:这是在 Unix-like 系统中…...