如何处理 Flink 作业中的数据倾斜问题?
分析&回答
什么是数据倾斜?
由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。
举例:一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间远超其他结果,并且随着数据量的持续增加,导致该计算节点挂掉,从而整个任务失败重启。我们可以在 Flink 的管理界面中看到任务的某一个 Task 数据量远超其他节点。
大数据框架的特性
- 不怕数据大,怕数据倾斜。
- jobs数比较多的作业运行效率相对比较低,如子查询比较多。
- sum,count,max,min等聚集函数,不会有数据倾斜问题
容易数据倾斜情况
- group by
- count(distinct ),在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by 字段分组,按distinct字段排序。
- 小表关联超大表
优化常用的手段
Flink 任务出现数据倾斜的直观表现是任务节点频繁出现反压,但是增加并行度后并不能解决问题;部分节点出现 OOM 异常,是因为大量的数据集中在某个节点上,导致该节点内存被爆,任务失败重启。
产生数据倾斜的原因主要有 2 个方面:
- 业务上有严重的数据热点,比如滴滴打车的订单数据中北京、上海等几个城市的订单量远远超过其他地区;
- 技术上大量使用了 KeyBy、GroupBy 等操作,错误的使用了分组 Key,人为产生数据热点。
因此解决问题的思路也很清晰:
- 业务上要尽量避免热点 key 的设计,例如我们可以把北京、上海等热点城市分成不同的区域,并进行单独处理;
- 技术上出现热点时,要调整方案打散原来的 key,避免直接聚合;此外 Flink 还提供了大量的功能可以避免数据倾斜。
解决数据倾斜问题
- 减少job数(合并MapReduce,用Multi-group by)
- 设置合理的mapreduce的task数,能有效提升性能。
- 数据量较大的情况下,慎用count(distinct)。
- 对小文件进行合并,针对文件数据源。
优化案例
- join原则 将条目少的表/子查询放在 Join的左边。 原因是在 Join 操作的 Reduce 阶段,位于 Join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存溢出的几率。
当一个小表关联一个超大表时,容易发生数据倾斜,可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。
如:SELECT /*+ MAPJOIN(user)*/ l.session_id, u.username from user u join page_views lon (u. id=l.user_id) ;
复制代码
- 笛卡尔积 当Hive设定为严格模式(hive.mapred.mode=strict)时,不允许在HQL语句中出现笛卡尔积。
当无法躲避笛卡尔积时,采用MapJoin,会在Map端完成Join操作,将Join操作的一个或多个表完全读入内存。
MapJoin的用法是在查询/子查询的SELECT关键字后面添加/*+MAPJOIN(tablelist) */提示优化器转化为MapJoin 。
其中tablelist可以是一个表,或以逗号连接的表的列表。tablelist中的表将会读入内存,应该将小表写在这里
- 控制Map数
同时可执行的map数是有限的。
通常情况下,作业会通过input的目录产生一个或者多个map任务
主要的决定因素有: input的文件总个数,input的文件大小。
举例:a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(block为128M,6个128m的块和1个12m的块),从而产生7个map数b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数
复制代码
两种方式控制Map数:即减少map数和增加map数
- 减少map数可以通过合并小文件来实现,这点是对文件数据源来讲。
- 增加map数的可以通过控制上一个job的reduer数来控制
反思&扩展
Flink 消费 Kafka 上下游并行度不一致导致的数据倾斜
通常我们在使用 Flink 处理实时业务时,上游一般都是消息系统,Kafka 是使用最广泛的大数据消息系统。当使用 Flink 消费 Kafka 数据时,也会出现数据倾斜。
需要十分注意的是,我们 Flink 消费 Kafka 的数据时,是推荐上下游并行度保持一致,即 Kafka 的分区数等于 Flink Consumer 的并行度。
但是会有一种情况,为了加快数据的处理速度,来设置 Flink 消费者的并行度大于 Kafka 的分区数。如果你不做任何的设置则会导致部分 Flink Consumer 线程永远消费不到数据。
这时候你需要设置 Flink 的 Redistributing,也就是数据重分配。
GroupBy + Aggregation 分组聚合热点问题
业务上通过 GroupBy 进行分组,然后紧跟一个 SUM、COUNT 等聚合操作是非常常见的。我们都知道 GroupBy 函数会根据 Key 进行分组,完全依赖 Key 的设计,如果 Key 出现热点,那么会导致巨大的 shuffle,相同 key 的数据会被发往同一个处理节点;如果某个 key 的数据量过大则会直接导致该节点成为计算瓶颈,引起反压。
两阶段聚合解决 KeyBy 热点
KeyBy 是我们经常使用的分组聚合函数之一。在实际的业务中经常会碰到这样的场景:双十一按照下单用户所在的省聚合求订单量最高的前 10 个省,或者按照用户的手机类型聚合求访问量最高的设备类型等。
喵呜面试助手:一站式解决面试问题,你可以搜索微信小程序 [喵呜面试助手] 或关注 [喵呜刷题] -> 面试助手 免费刷题。如有好的面试知识或技巧期待您的共享!
相关文章:
如何处理 Flink 作业中的数据倾斜问题?
分析&回答 什么是数据倾斜? 由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。 举例:一个 Flink 作业包含 200 个 Task 节点,其中有 199 个节点可以在很短的时间内完成计算。但是有一个节点执行时间…...
cobbler自动化安装CentOS、windows和ubuntu
环境介绍 同时玩cobbler3.3和cobbler2.8.5 cobbler3.3 系统CentOS8.3 VMware虚拟机 桥接到物理网络 IP: 192.168.1.33 cobbler2.8.5 系统CentOS7.9 VMWare虚拟机 桥接到物理网络 IP:192.168.1.33 安装cobbler3.3 yum源修改 cat /etc/yum.repo.d/Cento…...
springcloud3 GateWay章节-Nacos+gateway动态路由负载均衡4
一 工程结构 1.1 工程 1.2 搭建gatewayapi工程 1.pom文件 <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13</version><scope>test</scope></dependency><!--gateway--&g…...
RESTful API 面试必问
RESTful API是一种基于 HTTP 协议的 API 设计风格,它提供了一组规范和约束,使得客户端(如 Web 应用程序、移动应用等)和服务端之间的通信更加清晰、简洁和易于理解。 RESTful API 的设计原则 使用 HTTP 协议:RESTful …...
软件机器人助力行政审批局优化网约车业务流程,推动审批业务数字化转型
随着社会的进步和发展,行政审批业务逐渐趋向于智能化和自动化。近日,某市行政审批局在市场准入窗口引入博为小帮软件机器人大幅度提升了网约车办理业务的效率,创新了原有的业务模式。 软件机器人以其自动化、智能化的特性,优化了网…...
飞天使-python的字符串转义字符元组字典等
文章目录 基础语法数据类型python的字符串运算符输入和输出 数据结构列表与元组字典与集合 参考文档 基础语法 数据类型 数值型 ,整数 浮点型 布尔型, 真假, 假范围 字符型 类型转换python的字符串 了解转义字符一些基本的运算 \ 比如一行…...
stm32 uart dma方式接收不定长度字符
一般处理: stm32 uart使用dma接收时,会有自己的数据流中断,数据流中断会调用HAL_UART_RxCpltCallback。但是数据流中断只会在HAL_UART_Receive_DMA函数指定的buffer满时才会触发。 接收不定长度字符,需要和uart的UART_IT_IDLE结…...
SciencePub学术 | Elsevier出版社SCIEEI征稿中
SciencePub学术刊源推荐:Elsevier出版社SCIE&EI征稿中!信息如下,录满为止: 一、期刊概况: 计算机科学类SCI-01 【期刊简介】6.5-7.0,JCR1区,中科院2区; 【检索情况】正刊,SC…...
PHP小白搭建Kafka环境以及初步使用rdkafka
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、安装java(Kafka必须安装java,因为kafka依赖java核心)二、安装以及配置Kafka、zookeeper1.下载Kafka(无需下载…...
【Java Web】敏感词过滤
一、前缀树 假设有敏感词:b,abc,abd,bcd,abcd,efg,hii 那么前缀树可以构造为: 二、敏感词过滤器 package com.nowcoder.community.util;import org.apache.commons.lang3.CharUt…...
stable diffusion实践操作-提示词
本文专门开一节写提示词相关的内容,在看之前,可以同步关注: stable diffusion实践操作 正文 提示词是SD中非常重要,你生成的图片质量,基本就取决于提示词的好坏,提示词分为正向提示词和反向提示词。 模板…...
leetcode8.字符串转整数-Java
题目 请你来实现一个 myAtoi(string s) 函数,使其能将字符串转换成一个 32 位有符号整数(类似 C/C 中的 atoi 函数)。 函数 myAtoi(string s) 的算法如下: 读入字符串并丢弃无用的前导空格 检查下一个字符(假设还未到字…...
从零开始的Hadoop学习(四)| SSH无密登录配置、集群配置
1. SSH 无密登录配置 1.1 配置 ssh (1)基本语法 ssh 另一台电脑的IP地址 (2)ssh 连接时出现 Host key verification failed 的解决方法 [atguiguhadoop102 ~]$ ssh hadoop103(3)回退到 hadoop102 [at…...
微信小程序活动报名管理系统设计与实现
摘 要 随着当下的移动互联网技术的不断发展壮大,现在人们对于手机的应用已经非常的成熟,当下的时代基本上达到了人手一部手机,数字化、信息化已经成为了人们的主流生活。有数据统计,截止到2020年末我国的手机网民人数已经接近10亿…...
用Kubernetes(k8s)的ingress部署https应用
用Kubernetes的ingress部署https应用 环境准备Ingress安装域名证书准备 部署应用通过ingress暴露应用根据ssl证书生成对应的secret创建ingress暴露部署的应用确认自己安装了ingress创建ingress 访问你暴露的应用 环境准备 Ingress安装 我之前有一片文章写的是用ingress暴露应…...
【附安装包】MyEclipse2020安装教程
软件下载 软件:MyEclipse版本:2020语言:简体中文大小:1.61G安装环境:Win11/Win10/Win8/Win7硬件要求:CPU2.5GHz 内存4G(或更高)下载通道①百度网盘丨下载链接:https://pan.baidu.co…...
软件与软件工程
软件 软件的概念以及特点: 软件是计算机系统中不可或缺的一部分,与硬件共同构成特定的系统功能。 人们通常把各种不同功能的程序,包括系统程序、应用程序、用户自己编写的程序等称为软件 软件的概念: 软件不仅包括程序,还包括程序…...
记录一下:基于nginx配置的封禁真实IP
nginx Situation(背景)Task(任务)Action(行动)1:方法1:使用nginx 自带的deny 和 allow 来实现2:方法2:添加配置 Result(结果) Situati…...
【狂神】Spring5笔记(1-9)
目录 首页: 1.Spring 1.1 简介 1.2 优点 2.IOC理论推导 3.IOC本质 4.HelloSpring ERROR 5.IOC创建对象方式 5.1、无参构造 这个是默认的 5.2、有参构造 6.Spring配置说明 6.1、别名 6.2、Bean的配置 6.3、import 7.DL依赖注入环境 7.1 构造器注入 …...
Redis——急速安装并设置自启(CentOS)
现状 对于开发人员来说,部署服务器环境并不是一个高频操作。所以就导致绝大部分开发人员不会花太多时间去学习记忆,而是直接百度(有一些同学可能连链接都懒得收藏)。所以到了部署环境的时候就头疼,甚至是抗拒。除了每次…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...
OCR MLLM Evaluation
为什么需要评测体系?——背景与矛盾 能干的事: 看清楚发票、身份证上的字(准确率>90%),速度飞快(眨眼间完成)。干不了的事: 碰到复杂表格(合并单元…...
