当前位置: 首页 > news >正文

SparkStreaming之04:调优

SparkStreaming调优

一 、要点

4.1 SparkStreaming运行原理

在这里插入图片描述

深入理解

在这里插入图片描述

4.2 调优策略

4.2.1 调整BlockReceiver的数量

在这里插入图片描述

案例演示:

object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf = new SparkConf().setAppName("NetworkWordCount")val sc = new SparkContext(sparkConf)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sc, Seconds(5))//创建多个接收器(ReceiverInputDStream),这个接收器接收一台机器上的某个端口通过socket发送过来的数据并处理val lines1 = ssc.socketTextStream("master", 9998, StorageLevel.MEMORY_AND_DISK_SER)val lines2 = ssc.socketTextStream("master", 9997, StorageLevel.MEMORY_AND_DISK_SER)val lines = lines1.union(lines2)lines.repartition(100)//处理的逻辑,就是简单的进行word countval words = lines.repartition(100).flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))//将结果输出到控制台wordCounts.print()//启动Streaming处理流ssc.start()//等待Streaming程序终止ssc.awaitTermination()ssc.stop(false)}
}
⭐️4.2.2 调整Block的数量

batchInterval : 触发批处理的时间间隔
blockInterval :将接收到的数据生成Block的时间间隔,spark.streaming.blockInterval(默认是200ms),那么,BlockRDD的分区数 = batchInterval / blockInterval,即一个Block就是RDD的一个分区,就是一个task
比如,batchInterval是2秒,而blockInterval是200ms,那么task数为10,如果task的数量太少,比一个executor的core数还少的话,那么可以减少blockInterval,blockInterval最好不要小于50ms,太小的话导致task数太多,那么launch task的时间久多了

4.2.3 调整Receiver的接受速率

pps:permits per second 每秒允许接受的数据量(QPS -> queries per second)
Spark Streaming默认的PPS是没有限制的,可以通过参数spark.streaming.receiver.maxRate来控制,默认是Long.Maxvalue

⭐️4.2.3 调整数据处理的并行度

BlockRDD的分区数

a. 通过Receiver接受数据的特点决定

b. 也可以自己通过repartition设置

ShuffleRDD的分区数

a. 默认的分区数为spark.default.parallelism(core的大小)

b. 通过我们自己设置决定

val wordCounts = words.map(x => (x, 1)).reduceByKey((a: Int, b: Int) => a + b, new HashPartitioner(10))
4.2.4 数据的序列化

SparkStreaming两种需要序列化的数据:
a. 输入的数据:默认是以StorageLevel.MEMORY_AND_DISK_SER_2的形式存储在executor上的内存中
b. 缓存的数据:默认是以StorageLevel.MEMORY_ONLY_SER的形式存储的内存中
使用Kryo序列化机制,比Java序列化机制性能好

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
4.2.5 内存调优
(1)需要内存大小

和transformation的类型有关,如果使用的是updateStateByKey,Window这样的算子,那么内存就要设置得偏大

(2)数据存储级别

如果把接收到的数据设置的存储级别是MEMORY_DISK这种级别,也就是说如果内存不够可以把数据存储到磁盘上,其实性能还是不好的,性能最好的就是所有的数据都在内存里面,所以如果在资源允许的情况下,把内存调大一点,让所有的数据都存在内存里面。

4.2.6 Outout性能
(1)MySQL,HBase

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

(2)Kafka(0.8版本)

虽然现在的Kafka的版本已经到2.x版本了,但是很多公司因为历史遗留的原因,公司里面还是会有0.8x的Kafka。比如本人公司里面有两个Kafka集群,一个是0.8x的kafka,一个是1.x的Kafka。开发的时候有时候需要我们使用SparkStreaming做实时的ETL,然后再把数据打回Kafka,0.8版本的kafka默认是没有批量提交的功能的。本人公司里面一个真实的案例,一位同学写的SparkStreaming程序将数据处理完了以后通过ForeachRDD把数据写回到0.8Kafka。但是数据处理得很慢,经常会收到延时告警。最终发现他把数据写到Kafka的时候是一条数据一条数据提交的性能很差。最终手动实现了批量提交的功能。从此再也没有收到过告警。

4.2.7 Backpressure(压力反馈)

在这里插入图片描述
在这里插入图片描述

Feedback Loop : 动态使得Streaming app从unstable状态回到stable状态

在这里插入图片描述

从Spark1.5版本开始:spark.streaming.backpressure.enabled = true

4.2.8 Elastic Scaling(资源动态分配)

动态分配资源:

批处理动态的决定这个application中需要多少个Executors:

  1. 当一个Executor空闲的时候,将这个Executor杀掉
  2. 当task太多的时候,动态的启动Executors

Streaming分配Executor的原则是比对 process time / batchInterval 的比率

在这里插入图片描述

如果延迟了,那么就自动增加资源

在这里插入图片描述

在这里插入图片描述

从Spark2.0有这个功能:: spark.streaming.dynamicAllocation.enabled = true

⭐️4.2.8 数据倾斜调优(重要)

因为SparkStreaming的底层就是RDD,之前SparkCore的所有的数据倾斜的调优策略(见Spark之数据倾斜调优)都适合于SparkStreaming,需要灵活掌握,在实际开发的工作当中用得频率较高。

二 、总结

面试问题:你在工作当中有SparkStreaming调优过项目吗?怎么调优的?效果怎么样?

  1. 比如举foreachRDD的例子
  2. 比如举个数据倾斜的例子
  3. 用Xmind整理调优的策略

相关文章:

SparkStreaming之04:调优

SparkStreaming调优 一 、要点 4.1 SparkStreaming运行原理 深入理解 4.2 调优策略 4.2.1 调整BlockReceiver的数量 案例演示: object MultiReceiverNetworkWordCount {def main(args: Array[String]) {val sparkConf new SparkConf().setAppName("Networ…...

勿以危小而为之勿以避率而不为

《故事汇之:所见/所闻/所历/所想》:《公园散步与小雨遇记》(二) 就差一点到山顶了,路上碰到一阿姨,她说等会儿要下大雨了,让我不要往上走了,我犹豫了一会儿,还是听劝地返…...

JavaWeb后端基础(4)

这一篇就开始是做一个项目了,在项目里学习,我主要记录在学习过程中遇到的问题,以及一些知识点 Restful风格 一种软件架构风格 在REST风格的URL中,通过四种请求方式,来操作数据的增删改查。 GET : 查询 …...

SpringBoot调用DeepSeek

引入依赖 <dependency><groupId>io.github.pig-mesh.ai</groupId><artifactId>deepseek-spring-boot-starter</artifactId><version>1.4.5</version> </dependency>配置 deepseek:api-key: sk-******base-url: https://api.…...

记录一下本地部署Dify的坑

1. 截止2025-3-4为止&#xff0c;请注意&#xff0c;不要直接拉Dify的1.0.0版本。请先试用0.15.3版本。1.0.0有一个bug需要解决。[PANIC]failed to init dify plugin db: failed to connect to hostdb userpostgres databasepostgres Issue #14707 langgenius/dify GitHub …...

LC109. 有序链表转换平衡二叉搜索树

LC109. 有序链表转换平衡二叉搜索树 题目要求(一)快慢指针1. 理解问题2. 解决思路3. 具体步骤4. 代码实现5. 复杂度分析6. 示例解释7. 总结 LC109. 有序链表转换平衡二叉搜索树 题目要求 (一)快慢指针 要将一个按升序排列的单链表转换为平衡的二叉搜索树&#xff08;BST&…...

Hutool一个类型转换工具类 `Convert`,

Hutool 是一个非常实用的Java工具库&#xff0c;旨在简化Java开发中的常见任务。它包含了一个类型转换工具类 Convert&#xff0c;可以帮助开发者轻松地进行各种类型之间的转换。以下是一些使用 Convert 类进行类型转换的例子&#xff1a; 基本类型转换 假设你需要将一个字符…...

基于eRDMA实测DeepSeek开源的3FS

DeepSeek昨天开源了3FS分布式文件系统, 通过180个存储节点提供了 6.6TiB/s的存储性能, 全面支持大模型的训练和推理的KVCache转存以及向量数据库等能力, 每个客户端节点支持40GB/s峰值吞吐用于KVCache查找. 发布后, 我们在阿里云ECS上进行了快速的复现, 并进行了性能测试, ECS…...

【Linux篇】第一个系统程序 - 进度条

文章目录 1.回车与换行2.行缓冲区3.倒计时程序4.进度条 1.回车与换行 回车的概念: 回到当前行的最开始 \r换行的概念: 换到当前行的下一行\n 2.行缓冲区 当我们运行下面这段程序时&#xff0c;我们会发现屏幕上首先会打印出hello world!,再过两秒后程序结束。 当我们把\n去掉…...

VLM-E2E:通过多模态驾驶员注意融合增强端到端自动驾驶

25年2月来自香港科大广州分校、理想汽车和厦门大学的论文“VLM-E2E: Enhancing End-to-End Autonomous Driving with Multimodal Driver Attention Fusion”。 人类驾驶员能够利用丰富的注意语义&#xff0c;熟练地应对复杂场景&#xff0c;但当前的自动驾驶系统难以复制这种能…...

如何将飞书多维表格与DeepSeek R1结合使用:效率提升的完美搭档

将飞书的多维表格与DeepSeek R1结合使用&#xff0c;就像为你的数据管理和分析之旅装上一台涡轮增压器。两者的合作&#xff0c;不仅仅在速度上让人耳目一新&#xff0c;更是将智能化分析带入了日常的工作场景。以下是它们如何相辅相成并改变我们工作方式的一些分享。 --- 在…...

Kali CentOs 7代理

工具v2↓ kali_IP段v2端口例子<1> kali_IP段v2端口例子<2> CentOs 7 //编辑配置文件 vi /etc/profile//在该配置文件的最后添加代理配置 export http_proxyhttp://ip:port //代理服务器ip地址和端口号 export https_proxyhttp://ip:port //代理服务器ip地址和…...

Zookeeper 的核心引擎:深入解析 ZAB 协议

#作者&#xff1a;张桐瑞 文章目录 前言ZAB 协议算法崩溃恢复选票结构选票筛选消息广播 前言 ZooKeeper 最核心的作用就是保证分布式系统的数据一致性&#xff0c;而无论是处理来自客户端的会话请求时&#xff0c;还是集群 Leader 节点发生重新选举时&#xff0c;都会产生数据…...

L3-001 凑零钱

L3-001 凑零钱 - 团体程序设计天梯赛-练习集 n, m map(int, input().split()) a list(map(int, input().split())) a.sort() f [[] for _ in range(m 1)] f[0] [0] for i in a:for j in range(m, i - 1, -1):if f[j - i]:if not f[j] or f[j] > f[j - i] [i]:f[j] f…...

命名管道(用命名管道模拟server和client之间的通信)

目录 命名管道创建命名管道使用命令行创建命名管道&#xff08;FIFO&#xff09;在程序中创建 命名管道的打开规则用命名管道实现server和client通信 命名管道 bash进程并不会给我们写的两个不同的程序创建通信的管道&#xff0c;即使这两个进程看起来好像都是bash的子进程&am…...

【AI深度学习基础】Pandas完全指南入门篇:数据处理的瑞士军刀 (含完整代码)

&#x1f4da; Pandas 系列文章导航 入门篇 &#x1f331;进阶篇 &#x1f680;终极篇 &#x1f30c; &#x1f4cc; 一、引言 在大数据与 AI 驱动的时代&#xff0c;数据预处理和分析是深度学习与机器学习的基石。Pandas 作为 Python 生态中最强大的数据处理库&#xff0c;以…...

关于opencv中solvepnp中UPNP与DLS与EPNP的参数

The methods SOLVEPNP_DLS and SOLVEPNP_UPNP cannot be used as the current implementations are unstable and sometimes give completely wrong results. If you pass one of these two flags, SOLVEPNP_EPNP method will be used instead.、 由于当前的实现不稳定&#x…...

金融项目实战

测试流程 测试流程 功能测试流程 功能测试流程 需求评审制定测试计划编写测试用例和评审用例执行缺陷管理测试报告 接口测试流程 接口测试流程 需求评审制定测试计划分析api文档编写测试用例搭建测试环境编写脚本执行脚本缺陷管理测试报告 测试步骤 测试步骤 需求评审 需求评…...

大模型小白入门

【课前篇】大模型从0到1指南 【基础篇】大模型的演变与概念 大模型的演变 人工智能&#xff1a;人工智能是一个广泛涉及计算机科学、数据分析、统计学、机器工程、语言学、神 经科学、哲学和心理学等多个学科的领域。 机器学习&#xff1a;机器学习可以分为监督学习&…...

从零到一:快速上手 Poetry——Python 项目管理的利器

在 Python 项目开发中&#xff0c;包管理、依赖管理和虚拟环境的创建一直是开发者们经常面对的难题。传统上&#xff0c;开发者通常会使用 pip、virtualenv 或者 conda 来处理这些问题。然而&#xff0c;随着 Python 项目复杂度的增加&#xff0c;传统工具往往显得力不从心&…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验

一、多模态商品数据接口的技术架构 &#xff08;一&#xff09;多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如&#xff0c;当用户上传一张“蓝色连衣裙”的图片时&#xff0c;接口可自动提取图像中的颜色&#xff08;RGB值&…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

C#中的CLR属性、依赖属性与附加属性

CLR属性的主要特征 封装性&#xff1a; 隐藏字段的实现细节 提供对字段的受控访问 访问控制&#xff1a; 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性&#xff1a; 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑&#xff1a; 可以…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...