Flink之Partitioner(分区规则)
Flink之Partitioner(分区规则)
方法 | 注释 |
---|---|
global() | 全部发往1个task |
broadcast() | 广播(前面的文章讲解过,这里不做阐述) |
forward() | 上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事 |
shuffle() | 随机分配(只是随机,同Spark的shuffle不同) |
rebalance() | 轮询分配,默认机制就是rebalance() |
recale() | 一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask. |
partitionCustom | 自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容) |
keyBy() | 根据数据key的HashCode进行Hash分配 |
-
global
global
在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global
就是将上游的数据全部发往下游的第一个subtask
中,也就是说下游设置再多的并行度是没意义的,所以使用global
的时候,下游的task
的并行度都是1
.
这里结合代码看一下:public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置并行度为3,且设置数据分区方式为globalDataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();// 切分字符串,设置并行度为1SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(1);//......env.execute("Flink Partitioner");} }
WebUI
界面查看代码中upperCaseMapStream
和splitFlatMapStream之间数据的发送方式
-
forward
forward
其实就是一对一发送数据,和之前讲解Task
的文章中提到的算子之间OneToOne
的模式是一样的,就是可以将forward
理解为同一个task chain
[算子链]中算子之间的数据传输方式,但是使用forward
的前提是上下游的算子并行度是一致的也就是上下游的subtask
数量保持一致,图解如下:
代码内容如下:
public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为forward分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.// ...env.execute("Flink Partitioner");} }
WebUI
界面查看upperCaseMapStream
和splitFlatMapStream
的数据发送方式,如下:
-
shuffle
通过前面
WebUI
的图片我们可以看到,从Socket
数据源将数据发送到第一个map
的时候使用的是默认的rebalance
方式,也就是轮询发送的方式,而这里说的shuffle
虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3
个subtask
,下游有5
个subtask
,数据源有5
条数据,上游中的某一个subtask
向下游发送数据时,是随机发送的,下游的5
个subtask
并不是每个都一定能接受到数据,可能有的接收到1
条,有的接收到2
条,有的接收到3
条数据,这就是shuffle
发送数据的方式.如果说上两个
operator
并行度一致,上游选择了shuffle
发送数据的方式,那么两个operator
会绑定成一个task chain
么?不会,因为shuffle
的数据发送方式就已经导致两个operator
不是OneToOne
的模式了.
代码示例:public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为shuffle分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(7)// ...env.execute("Flink Partitioner");} }
WebUI
界面查看upperCaseMapStream
和splitFlatMapStream
的数据发送方式,如下:
-
Rebalance
rebalance
就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
-
recale
关于
recale
前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
当上下游对应的
subtask
分组后,上下游组内的subtak
就会以组内轮询的方式发送数据,如下图:
-
keyBy
keyBy
使用的HASH
分区方式,实际是hashCode()
+murmurHash()
的组合方式,这个在源码的KeyGroupRangeAssignment
类中是可以看到的,简单来说根据key
的hash
值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;
).关于
keyBy
的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
以上就是对Flink中分区规则的讲解.
相关文章:

Flink之Partitioner(分区规则)
Flink之Partitioner(分区规则) 方法注释global()全部发往1个taskbroadcast()广播(前面的文章讲解过,这里不做阐述)forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事shuffle()随机分配(只是随机,同Spark的shuffle不同)rebalance()轮询分配,默认机…...

tk切换到mac的code分享
文章目录 前言一、基础环境配置二、开发软件与扩展1.用到的开发软件与平替、扩展情况 总结 前言 最近换上了coding人生的第一台mac,以前一直偏好tk,近来身边的朋友越来越多的用mac了,win的自动更新越来越占磁盘了,而且win11抛弃了…...

spark的standalone 分布式搭建
一、环境准备 集群环境hadoop11,hadoop12 ,hadoop13 安装 zookeeper 和 HDFS 1、启动zookeeper -- 启动zookeeper(11,12,13都需要启动) xcall.sh zkServer.sh start -- 或者 zk.sh start -- xcall.sh 和zk.sh都是自己写的脚本-- 查看进程 jps -- 有…...

浅析基于视频汇聚与AI智能分析的新零售方案设计
一、行业背景 近年来,随着新零售概念的提出,国内外各大企业纷纷布局智慧零售领域。从无人便利店、智能售货机,到线上线下融合的电商平台,再到通过大数据分析实现精准推送的个性化营销,智慧零售的触角已经深入各个零售…...

SpringMVC之异常处理
SpringMVC之异常处理 异常分为编译时异常和运行时异常,编译时异常我们trycatch捕获,捕获后自行处理,而运行时异常是不可预期的,就需要规范编码来避免,在SpringMVC中,不管是编译异常还是运行时异常ÿ…...

保险龙头科技进化论:太保的六年
如果从2013年中国首家互联网保险公司——众安在线的成立算起,保险科技在我国的发展已走进第十个年头。十年以来,在政策指引、技术发展和金融机构数字化转型的大背景下,科技赋能保险业高质量发展转型已成为行业共识。 大数据、云计算、人工智…...

升级STM32电机PID速度闭环编程:从F1到F4的移植技巧与实例解析
引言: 在嵌入式系统开发中,STM32系列微控制器广泛应用于各种应用领域。而对于直流有刷电机的控制,PID速度闭环是一种常用的控制方式。本文将以此为例,探讨如何从STM32F1系列移植到STM32F4系列,并详细介绍HAL库在不同型…...

GaussDB 实验篇+openGauss的4种1级分区案例
✔ 范围分区/range分区 -- 创建表 drop table if exists zzt.par_range; create table if not exists zzt.par_range (empno integer,ename char(10),job char(9),mgr integer(4),hiredate date,sal numeric(7,2),comm numeric(7,2),deptno integer,constraint pk_par_emp pri…...

Ruby软件外包开发语言特点
Ruby 是一种动态、开放源代码的编程语言,它注重简洁性和开发人员的幸福感。在许多方面都具有优点,但由于其动态类型和解释执行的特性,它可能不适合某些对性能和类型安全性要求较高的场景。下面和大家分享 Ruby 语言的一些主要特点以及适用的场…...

《系统架构设计师教程》重点章节思维导图
内容来自《系统架构设计师教程》,筛选系统架构设计师考试中分值重点分布的章节,根据章节的内容整理出相关思维导图。 重点章节 第2章:计算机系统知识第5章:软件工程基础知识第7章:系统架构设计基础知识第8章࿱…...

mac录屏工具,录屏没有声音的解决办法
mac录屏工具,录屏没有声音的解决办法 在使用macbook录制屏幕时,发现自带的录屏工具QuickTime Player没有声音,于是尝试了多款录屏工具,对其做一些经验总结(省流:APP Store直接可以免费下载使用Omi录屏专家…...
神经网络基础-神经网络补充概念-33-偏差与方差
概念 偏差(Bias): 偏差是模型预测值与实际值之间的差距,它反映了模型对训练数据的拟合能力。高偏差意味着模型无法很好地拟合训练数据,通常会导致欠拟合。欠拟合是指模型过于简单,不能捕捉数据中的复杂模式…...

单片机第一季:零基础13——AD和DA转换
1,AD转换基本概念 51 单片机系统内部运算时用的全部是数字量,即0 和1,因此对单片机系统而言,无法直接操作模拟量,必须将模拟量转换成数字量。所谓数字量,就是用一系列0 和1 组成的二进制代码表示某个信号大…...
小区外卖跑腿,解决最后100米配送难题
小区外卖跑腿,解决最后100米配送难题 小区外卖跑腿作为新市场环境下的创业模式,通过选择小区里的闲散人员作为骑手,解决了最后100米配送的问题。这项业务不仅包括小区业主的取快递、寄快递等日常需求,还能提供小区帮忙、小区外卖…...

ZooKeeper的应用场景(命名服务、分布式协调通知)
3 命名服务 命名服务(NameService)也是分布式系统中比较常见的一类场景,在《Java网络高级编程》一书中提到,命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等一这…...

网络套接字
网络套接字 文章目录 网络套接字认识端口号初识TCP协议初识UDP协议网络字节序 socket编程接口socket创建socket文件描述符bind绑定端口号sockaddr结构体netstat -nuap:查看服务器网络信息 代码编译运行展示 实现简单UDP服务器开发 认识端口号 端口号(port)是传输层协…...

对话 4EVERLAND:Web3 是云计算的新基建吗?
在传统云计算的发展过程中,数据存储与计算的中心化问题,对用户来说一直存在着潜在的安全与隐私风险——例如单点故障可能会导致网络瘫痪和数据泄露等危险。同时,随着越来越多 Web3 项目应用的落地,对于数据云计算的性能要求也越来…...

iOS申请证书(.p12)和描述文件(.mobileprovision)
打包app时,经常会用到ios证书,但很多人都苦于没有苹果电脑,即使有苹果电脑的,也会觉得苹果电脑操作也很麻烦,这里记录一下,用香蕉云编,申请证书及描述文件的过程。 香蕉云编的地址:…...

Java:PO、VO、BO、DO、DAO、DTO、POJO
💗wei_shuo的个人主页 💫wei_shuo的学习社区 🌐Hello World ! Java:PO、VO、BO、DO、DAO、DTO、POJO PO持久化对象(Persistent Object) PO是持久化对象,用于表示数据库中的实体或表…...

c语言每日一练(8)
前言:每日一练系列,每一期都包含5道选择题,2道编程题,博主会尽可能详细地进行讲解,令初学者也能听的清晰。每日一练系列会持续更新,暑假时三天之内必有一更,到了开学之后,将看学业情…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门 
《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》
引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...

04-初识css
一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
高防服务器价格高原因分析
高防服务器的价格较高,主要是由于其特殊的防御机制、硬件配置、运营维护等多方面的综合成本。以下从技术、资源和服务三个维度详细解析高防服务器昂贵的原因: 一、硬件与技术投入 大带宽需求 DDoS攻击通过占用大量带宽资源瘫痪目标服务器,因此…...

海云安高敏捷信创白盒SCAP入选《中国网络安全细分领域产品名录》
近日,嘶吼安全产业研究院发布《中国网络安全细分领域产品名录》,海云安高敏捷信创白盒(SCAP)成功入选软件供应链安全领域产品名录。 在数字化转型加速的今天,网络安全已成为企业生存与发展的核心基石,为了解…...

C++--string的模拟实现
一,引言 string的模拟实现是只对string对象中给的主要功能经行模拟实现,其目的是加强对string的底层了解,以便于在以后的学习或者工作中更加熟练的使用string。本文中的代码仅供参考并不唯一。 二,默认成员函数 string主要有三个成员变量,…...