当前位置: 首页 > news >正文

Flink之Partitioner(分区规则)

Flink之Partitioner(分区规则)

方法注释
global()全部发往1个task
broadcast()广播(前面的文章讲解过,这里不做阐述)
forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事
shuffle()随机分配(只是随机,同Spark的shuffle不同)
rebalance()轮询分配,默认机制就是rebalance()
recale()一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask.
partitionCustom自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容)
keyBy()根据数据key的HashCode进行Hash分配
  • global

    global在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global就是将上游的数据全部发往下游的第一个subtask中,也就是说下游设置再多的并行度是没意义的,所以使用global的时候,下游的task的并行度都是1.
    在这里插入图片描述
    这里结合代码看一下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置并行度为3,且设置数据分区方式为globalDataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();// 切分字符串,设置并行度为1SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(1);//......env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看代码中upperCaseMapStreamsplitFlatMapStream之间数据的发送方式
    在这里插入图片描述

  • forward

    forward其实就是一对一发送数据,和之前讲解Task的文章中提到的算子之间OneToOne的模式是一样的,就是可以将forward理解为同一个task chain[算子链]中算子之间的数据传输方式,但是使用forward的前提是上下游的算子并行度是一致的也就是上下游的subtask数量保持一致,图解如下:
    在这里插入图片描述

    代码内容如下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为forward分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • shuffle

    通过前面WebUI的图片我们可以看到,从Socket数据源将数据发送到第一个map的时候使用的是默认的rebalance方式,也就是轮询发送的方式,而这里说的shuffle虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3subtask,下游有5subtask,数据源有5条数据,上游中的某一个subtask向下游发送数据时,是随机发送的,下游的5subtask并不是每个都一定能接受到数据,可能有的接收到1条,有的接收到2条,有的接收到3条数据,这就是shuffle发送数据的方式.

    如果说上两个operator并行度一致,上游选择了shuffle发送数据的方式,那么两个operator会绑定成一个task chain么?不会,因为shuffle的数据发送方式就已经导致两个operator不是OneToOne的模式了.
    在这里插入图片描述
    代码示例:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为shuffle分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(7)// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • Rebalance

    rebalance就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
    在这里插入图片描述

  • recale

    关于recale前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.

    Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
    在这里插入图片描述

    当上下游对应的subtask分组后,上下游组内的subtak就会以组内轮询的方式发送数据,如下图:
    在这里插入图片描述

  • keyBy

    keyBy使用的HASH分区方式,实际是hashCode() + murmurHash()的组合方式,这个在源码的KeyGroupRangeAssignment类中是可以看到的,简单来说根据keyhash值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;).

    关于keyBy的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
    在这里插入图片描述

以上就是对Flink中分区规则的讲解.

相关文章:

Flink之Partitioner(分区规则)

Flink之Partitioner(分区规则) 方法注释global()全部发往1个taskbroadcast()广播(前面的文章讲解过,这里不做阐述)forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事shuffle()随机分配(只是随机,同Spark的shuffle不同)rebalance()轮询分配,默认机…...

tk切换到mac的code分享

文章目录 前言一、基础环境配置二、开发软件与扩展1.用到的开发软件与平替、扩展情况 总结 前言 最近换上了coding人生的第一台mac&#xff0c;以前一直偏好tk&#xff0c;近来身边的朋友越来越多的用mac了&#xff0c;win的自动更新越来越占磁盘了&#xff0c;而且win11抛弃了…...

spark的standalone 分布式搭建

一、环境准备 集群环境hadoop11&#xff0c;hadoop12 &#xff0c;hadoop13 安装 zookeeper 和 HDFS 1、启动zookeeper -- 启动zookeeper(11,12,13都需要启动) xcall.sh zkServer.sh start -- 或者 zk.sh start -- xcall.sh 和zk.sh都是自己写的脚本-- 查看进程 jps -- 有…...

浅析基于视频汇聚与AI智能分析的新零售方案设计

一、行业背景 近年来&#xff0c;随着新零售概念的提出&#xff0c;国内外各大企业纷纷布局智慧零售领域。从无人便利店、智能售货机&#xff0c;到线上线下融合的电商平台&#xff0c;再到通过大数据分析实现精准推送的个性化营销&#xff0c;智慧零售的触角已经深入各个零售…...

SpringMVC之异常处理

SpringMVC之异常处理 异常分为编译时异常和运行时异常&#xff0c;编译时异常我们trycatch捕获&#xff0c;捕获后自行处理&#xff0c;而运行时异常是不可预期的&#xff0c;就需要规范编码来避免&#xff0c;在SpringMVC中&#xff0c;不管是编译异常还是运行时异常&#xff…...

保险龙头科技进化论:太保的六年

如果从2013年中国首家互联网保险公司——众安在线的成立算起&#xff0c;保险科技在我国的发展已走进第十个年头。十年以来&#xff0c;在政策指引、技术发展和金融机构数字化转型的大背景下&#xff0c;科技赋能保险业高质量发展转型已成为行业共识。 大数据、云计算、人工智…...

升级STM32电机PID速度闭环编程:从F1到F4的移植技巧与实例解析

引言&#xff1a; 在嵌入式系统开发中&#xff0c;STM32系列微控制器广泛应用于各种应用领域。而对于直流有刷电机的控制&#xff0c;PID速度闭环是一种常用的控制方式。本文将以此为例&#xff0c;探讨如何从STM32F1系列移植到STM32F4系列&#xff0c;并详细介绍HAL库在不同型…...

GaussDB 实验篇+openGauss的4种1级分区案例

✔ 范围分区/range分区 -- 创建表 drop table if exists zzt.par_range; create table if not exists zzt.par_range (empno integer,ename char(10),job char(9),mgr integer(4),hiredate date,sal numeric(7,2),comm numeric(7,2),deptno integer,constraint pk_par_emp pri…...

Ruby软件外包开发语言特点

Ruby 是一种动态、开放源代码的编程语言&#xff0c;它注重简洁性和开发人员的幸福感。在许多方面都具有优点&#xff0c;但由于其动态类型和解释执行的特性&#xff0c;它可能不适合某些对性能和类型安全性要求较高的场景。下面和大家分享 Ruby 语言的一些主要特点以及适用的场…...

《系统架构设计师教程》重点章节思维导图

内容来自《系统架构设计师教程》&#xff0c;筛选系统架构设计师考试中分值重点分布的章节&#xff0c;根据章节的内容整理出相关思维导图。 重点章节 第2章&#xff1a;计算机系统知识第5章&#xff1a;软件工程基础知识第7章&#xff1a;系统架构设计基础知识第8章&#xff1…...

mac录屏工具,录屏没有声音的解决办法

mac录屏工具&#xff0c;录屏没有声音的解决办法 在使用macbook录制屏幕时&#xff0c;发现自带的录屏工具QuickTime Player没有声音&#xff0c;于是尝试了多款录屏工具&#xff0c;对其做一些经验总结&#xff08;省流&#xff1a;APP Store直接可以免费下载使用Omi录屏专家…...

神经网络基础-神经网络补充概念-33-偏差与方差

概念 偏差&#xff08;Bias&#xff09;&#xff1a; 偏差是模型预测值与实际值之间的差距&#xff0c;它反映了模型对训练数据的拟合能力。高偏差意味着模型无法很好地拟合训练数据&#xff0c;通常会导致欠拟合。欠拟合是指模型过于简单&#xff0c;不能捕捉数据中的复杂模式…...

单片机第一季:零基础13——AD和DA转换

1&#xff0c;AD转换基本概念 51 单片机系统内部运算时用的全部是数字量&#xff0c;即0 和1&#xff0c;因此对单片机系统而言&#xff0c;无法直接操作模拟量&#xff0c;必须将模拟量转换成数字量。所谓数字量&#xff0c;就是用一系列0 和1 组成的二进制代码表示某个信号大…...

小区外卖跑腿,解决最后100米配送难题

小区外卖跑腿&#xff0c;解决最后100米配送难题 小区外卖跑腿作为新市场环境下的创业模式&#xff0c;通过选择小区里的闲散人员作为骑手&#xff0c;解决了最后100米配送的问题。这项业务不仅包括小区业主的取快递、寄快递等日常需求&#xff0c;还能提供小区帮忙、小区外卖…...

ZooKeeper的应用场景(命名服务、分布式协调通知)

3 命名服务 命名服务(NameService)也是分布式系统中比较常见的一类场景&#xff0c;在《Java网络高级编程》一书中提到&#xff0c;命名服务是分布式系统最基本的公共服务之一。在分布式系统中&#xff0c;被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等一这…...

网络套接字

网络套接字 文章目录 网络套接字认识端口号初识TCP协议初识UDP协议网络字节序 socket编程接口socket创建socket文件描述符bind绑定端口号sockaddr结构体netstat -nuap&#xff1a;查看服务器网络信息 代码编译运行展示 实现简单UDP服务器开发 认识端口号 端口号(port)是传输层协…...

对话 4EVERLAND:Web3 是云计算的新基建吗?

在传统云计算的发展过程中&#xff0c;数据存储与计算的中心化问题&#xff0c;对用户来说一直存在着潜在的安全与隐私风险——例如单点故障可能会导致网络瘫痪和数据泄露等危险。同时&#xff0c;随着越来越多 Web3 项目应用的落地&#xff0c;对于数据云计算的性能要求也越来…...

iOS申请证书(.p12)和描述文件(.mobileprovision)

打包app时&#xff0c;经常会用到ios证书&#xff0c;但很多人都苦于没有苹果电脑&#xff0c;即使有苹果电脑的&#xff0c;也会觉得苹果电脑操作也很麻烦&#xff0c;这里记录一下&#xff0c;用香蕉云编&#xff0c;申请证书及描述文件的过程。 香蕉云编的地址&#xff1a;…...

Java:PO、VO、BO、DO、DAO、DTO、POJO

&#x1f497;wei_shuo的个人主页 &#x1f4ab;wei_shuo的学习社区 &#x1f310;Hello World &#xff01; Java&#xff1a;PO、VO、BO、DO、DAO、DTO、POJO PO持久化对象&#xff08;Persistent Object&#xff09; PO是持久化对象&#xff0c;用于表示数据库中的实体或表…...

c语言每日一练(8)

前言&#xff1a;每日一练系列&#xff0c;每一期都包含5道选择题&#xff0c;2道编程题&#xff0c;博主会尽可能详细地进行讲解&#xff0c;令初学者也能听的清晰。每日一练系列会持续更新&#xff0c;暑假时三天之内必有一更&#xff0c;到了开学之后&#xff0c;将看学业情…...

周期 角频率 频率 振幅 初相角

周期 角频率 频率 振幅 初相角 当我们谈论傅里叶级数或波形分析时&#xff0c;以下术语经常出现&#xff1a; 周期 T T T: 函数在其图形上重复的时间或空间的长度。周期的倒数是频率。 频率 f f f: 周期的倒数&#xff0c;即一秒内波形重复的次数。单位通常为赫兹&#xff…...

根据一棵树的两种遍历构造二叉树

题目 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵树的中序遍历&#xff0c;请构造二叉树并返回其根节点。 示例 1: 输入: preorder [3,9,20,15,7], inorder [9,3,15,20,7] 输出: [3,9,20,null,null,…...

stack 、 queue的语法使用及底层实现以及deque的介绍【C++】

文章目录 stack的使用queue的使用适配器queue的模拟实现stack的模拟实现deque stack的使用 stack是一种容器适配器&#xff0c;具有后进先出&#xff0c;只能从容器的一端进行元素的插入与提取操作 #include <iostream> #include <vector> #include <stack&g…...

没学C++,如何从C语言丝滑过度到python【python基础万字详解】

大家好&#xff0c;我是纪宁。 文章将从C语言出发&#xff0c;深入介绍python的基础知识&#xff0c;也包括很多python的新增知识点详解。 文章目录 1.python的输入输出&#xff0c;重新认识 hello world&#xff0c;重回那个激情燃烧的岁月1.1 输出函数print的规则1.2 输入函…...

haproxy负载均衡

1、配置环境 作用环境windows测试  192.168.33.158 172.25.0.11 haproxy负载均衡haproxy&#xff1a;2.8.1&#xff0c;centos7172.25.0.31web服务器1--rs1Apache&#xff1a;2.4&#xff0c;redhat9172.25.0.32web服务器2--rs2Apache&#xff1a;2.4 &#xff0c; redhat9 2、…...

【数据结构】顺序队列模拟实现

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …...

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六&#xff1a;使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka 一、技术流程二、搭建环境三、创建Kafka changefeed四、写入数据以产生变更日志五、配置 Flink 消费 Kafka 数据 一、技术流程 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群创建 c…...

Spring对象装配

在spring中&#xff0c;Bean的执行流程为启动spring容器&#xff0c;实例化bean&#xff0c;将bean注册到spring容器中&#xff0c;将bean装配到需要的类中。 既然我们需要将bea装配到需要的类中&#xff0c;那么如何实现呢&#xff1f;这篇文章&#xff0c;将来阐述一下如何实…...

bigemap如何添加mapbox地图?

第一步 打开浏览器&#xff0c;找到你要访问的地图的URL地址&#xff0c;并且确认可以正常在浏览器中访问&#xff1b;浏览器中不能访问&#xff0c;同样也不能在软件中访问。 以下为常用地图源地址&#xff1a; 天地图&#xff1a; http://map.tianditu.gov.cn 包含&…...

python爬虫6:lxml库

python爬虫6&#xff1a;lxml库 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生不好…...