当前位置: 首页 > news >正文

SparkStreaming之04:调优

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//处理的逻辑,就是简单的进行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//将结果输出到控制台wordCounts.print()//启动Streaming处理流ssc.start()//等待Streaming程序终止ssc.awaitTermination()ssc.stop(false)}
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

相关文章:

SparkStreaming之04:调优

SparkStreaming调优 一 、要点 4.1 SparkStreaming运行原理 深入理解 4.2 调优策略 4.2.1 调整BlockReceiver的数量 案例演示: object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf new SparkConf().setAppName("Networ…...

勿以危小而为之勿以避率而不为

《故事汇之:所见/所闻/所历/所想》:《公园散步与小雨遇记》(二) 就差一点到山顶了,路上碰到一阿姨,她说等会儿要下大雨了,让我不要往上走了,我犹豫了一会儿,还是听劝地返…...

JavaWeb后端基础(4)

这一篇就开始是做一个项目了,在项目里学习,我主要记录在学习过程中遇到的问题,以及一些知识点 Restful风格 一种软件架构风格 在REST风格的URL中,通过四种请求方式,来操作数据的增删改查。 GET : 查询 …...

SpringBoot调用DeepSeek

引入依赖 <dependency><groupId>io.github.pig-mesh.ai</groupId><artifactId>deepseek-spring-boot-starter</artifactId><version>1.4.5</version> </dependency>配置 deepseek:api-key: sk-******base-url: https://api.…...

记录一下本地部署Dify的坑

1. 截止2025-3-4为止&#xff0c;请注意&#xff0c;不要直接拉Dify的1.0.0版本。请先试用0.15.3版本。1.0.0有一个bug需要解决。[PANIC]failed to init dify plugin db: failed to connect to hostdb userpostgres databasepostgres Issue #14707 langgenius/dify GitHub …...

LC109. 有序链表转换平衡二叉搜索树

LC109. 有序链表转换平衡二叉搜索树 题目要求(一)快慢指针1. 理解问题2. 解决思路3. 具体步骤4. 代码实现5. 复杂度分析6. 示例解释7. 总结 LC109. 有序链表转换平衡二叉搜索树 题目要求 (一)快慢指针 要将一个按升序排列的单链表转换为平衡的二叉搜索树&#xff08;BST&…...

Hutool一个类型转换工具类 `Convert`,

Hutool 是一个非常实用的Java工具库&#xff0c;旨在简化Java开发中的常见任务。它包含了一个类型转换工具类 Convert&#xff0c;可以帮助开发者轻松地进行各种类型之间的转换。以下是一些使用 Convert 类进行类型转换的例子&#xff1a; 基本类型转换 假设你需要将一个字符…...

基于eRDMA实测DeepSeek开源的3FS

DeepSeek昨天开源了3FS分布式文件系统, 通过180个存储节点提供了 6.6TiB/s的存储性能, 全面支持大模型的训练和推理的KVCache转存以及向量数据库等能力, 每个客户端节点支持40GB/s峰值吞吐用于KVCache查找. 发布后, 我们在阿里云ECS上进行了快速的复现, 并进行了性能测试, ECS…...

【Linux篇】第一个系统程序 - 进度条

文章目录 1.回车与换行2.行缓冲区3.倒计时程序4.进度条 1.回车与换行 回车的概念: 回到当前行的最开始 \r换行的概念: 换到当前行的下一行\n 2.行缓冲区 当我们运行下面这段程序时&#xff0c;我们会发现屏幕上首先会打印出hello world!,再过两秒后程序结束。 当我们把\n去掉…...

VLM-E2E:通过多模态驾驶员注意融合增强端到端自动驾驶

25年2月来自香港科大广州分校、理想汽车和厦门大学的论文“VLM-E2E: Enhancing End-to-End Autonomous Driving with Multimodal Driver Attention Fusion”。 人类驾驶员能够利用丰富的注意语义&#xff0c;熟练地应对复杂场景&#xff0c;但当前的自动驾驶系统难以复制这种能…...

如何将飞书多维表格与DeepSeek R1结合使用:效率提升的完美搭档

将飞书的多维表格与DeepSeek R1结合使用&#xff0c;就像为你的数据管理和分析之旅装上一台涡轮增压器。两者的合作&#xff0c;不仅仅在速度上让人耳目一新&#xff0c;更是将智能化分析带入了日常的工作场景。以下是它们如何相辅相成并改变我们工作方式的一些分享。 --- 在…...

Kali CentOs 7代理

工具v2↓ kali_IP段v2端口例子<1> kali_IP段v2端口例子<2> CentOs 7 //编辑配置文件 vi /etc/profile//在该配置文件的最后添加代理配置 export http_proxyhttp://ip:port //代理服务器ip地址和端口号 export https_proxyhttp://ip:port //代理服务器ip地址和…...

Zookeeper 的核心引擎:深入解析 ZAB 协议

#作者&#xff1a;张桐瑞 文章目录 前言ZAB 协议算法崩溃恢复选票结构选票筛选消息广播 前言 ZooKeeper 最核心的作用就是保证分布式系统的数据一致性&#xff0c;而无论是处理来自客户端的会话请求时&#xff0c;还是集群 Leader 节点发生重新选举时&#xff0c;都会产生数据…...

L3-001 凑零钱

L3-001 凑零钱 - 团体程序设计天梯赛-练习集 n, m map(int, input().split()) a list(map(int, input().split())) a.sort() f [[] for _ in range(m 1)] f[0] [0] for i in a:for j in range(m, i - 1, -1):if f[j - i]:if not f[j] or f[j] > f[j - i] [i]:f[j] f…...

命名管道(用命名管道模拟server和client之间的通信)

目录 命名管道创建命名管道使用命令行创建命名管道&#xff08;FIFO&#xff09;在程序中创建 命名管道的打开规则用命名管道实现server和client通信 命名管道 bash进程并不会给我们写的两个不同的程序创建通信的管道&#xff0c;即使这两个进程看起来好像都是bash的子进程&am…...

【AI深度学习基础】Pandas完全指南入门篇:数据处理的瑞士军刀 (含完整代码)

&#x1f4da; Pandas 系列文章导航 入门篇 &#x1f331;进阶篇 &#x1f680;终极篇 &#x1f30c; &#x1f4cc; 一、引言 在大数据与 AI 驱动的时代&#xff0c;数据预处理和分析是深度学习与机器学习的基石。Pandas 作为 Python 生态中最强大的数据处理库&#xff0c;以…...

关于opencv中solvepnp中UPNP与DLS与EPNP的参数

The methods SOLVEPNP_DLS and SOLVEPNP_UPNP cannot be used as the current implementations are unstable and sometimes give completely wrong results. If you pass one of these two flags, SOLVEPNP_EPNP method will be used instead.、 由于当前的实现不稳定&#x…...

金融项目实战

测试流程 测试流程 功能测试流程 功能测试流程 需求评审制定测试计划编写测试用例和评审用例执行缺陷管理测试报告 接口测试流程 接口测试流程 需求评审制定测试计划分析api文档编写测试用例搭建测试环境编写脚本执行脚本缺陷管理测试报告 测试步骤 测试步骤 需求评审 需求评…...

大模型小白入门

【课前篇】大模型从0到1指南 【基础篇】大模型的演变与概念 大模型的演变 人工智能&#xff1a;人工智能是一个广泛涉及计算机科学、数据分析、统计学、机器工程、语言学、神 经科学、哲学和心理学等多个学科的领域。 机器学习&#xff1a;机器学习可以分为监督学习&…...

从零到一:快速上手 Poetry——Python 项目管理的利器

在 Python 项目开发中&#xff0c;包管理、依赖管理和虚拟环境的创建一直是开发者们经常面对的难题。传统上&#xff0c;开发者通常会使用 pip、virtualenv 或者 conda 来处理这些问题。然而&#xff0c;随着 Python 项目复杂度的增加&#xff0c;传统工具往往显得力不从心&…...

C++_核心编程_多态案例二-制作饮品

#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为&#xff1a;煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例&#xff0c;提供抽象制作饮品基类&#xff0c;提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...

uniapp手机号一键登录保姆级教程(包含前端和后端)

目录 前置条件创建uniapp项目并关联uniClound云空间开启一键登录模块并开通一键登录服务编写云函数并上传部署获取手机号流程(第一种) 前端直接调用云函数获取手机号&#xff08;第三种&#xff09;后台调用云函数获取手机号 错误码常见问题 前置条件 手机安装有sim卡手机开启…...

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…...

【深度学习新浪潮】什么是credit assignment problem?

Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...

【Qt】控件 QWidget

控件 QWidget 一. 控件概述二. QWidget 的核心属性可用状态&#xff1a;enabled几何&#xff1a;geometrywindows frame 窗口框架的影响 窗口标题&#xff1a;windowTitle窗口图标&#xff1a;windowIconqrc 机制 窗口不透明度&#xff1a;windowOpacity光标&#xff1a;cursor…...