当前位置: 首页 > article >正文

Spark Streaming的背压机制的原理与实现代码及分析

Spark Streaming的背压机制是一种根据JobScheduler反馈的作业执行信息来动态调整Receiver数据接收率的机制。

在Spark 1.5.0及以上版本中,可以通过设置spark.streaming.backpressure.enabled为true来启用背压机制。当启用背压机制时,Spark Streaming会自动根据系统的处理能力来调整数据的输入速率,从而在流量高峰时保证最大的吞吐量和性能。

背压机制中涉及的关键组件包括RateController和RateEstimator。RateController负责监听作业的执行情况,并从BatchInfo实例中获取相关信息交给RateEstimator进行速率估算。RateEstimator则根据收集到的数据和设定值进行比较,估算出一个合适的用于下一批次的流量阈值。这个阈值用于更新每秒能够处理的最大记录数,从而实现对数据输入速率的动态调整。

需要注意的是,在背压机制真正起作用之前,应至少保证处理一个批次的数据,以便根据当前批次的速率预估新批次的速率。同时,为了控制每个批次的最大摄入速率,可以通过设置相关参数(如spark.streaming.kafka.maxRatePerPartition对于Kafka Direct Stream)来限制每秒每个分区最大摄入的数据条数。

以下是基于Spark Streaming背压机制的测试代码及分析:

测试PySpark代码示例

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
import time
import logging# 配置SparkSession
spark = SparkSession.builder \.appName("BackpressureDemo") \.config("spark.streaming.backpressure.enabled", "true") \.config("spark.streaming.backpressure.initialRate", "100") \.config("spark.streaming.receiver.maxRate", "1000") \.getOrCreate()sc = spark.sparkContext
ssc = StreamingContext(sc, batchDuration=1)  # 1秒批次间隔
logging.basicConfig(level=logging.INFO)# 模拟数据源(建议使用生产环境数据源如Kafka)
lines = ssc.socketTextStream("localhost", 9999)# 简单处理逻辑(添加人工延迟模拟处理压力)
def process_batch(rdd):start_time = time.time()if not rdd.isEmpty():# 模拟处理延迟processed = rdd.flatMap(lambda line: line.split()) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b)processed.count()duration = time.time() - start_timelogging.info(f"Batch processed in {duration:.2f}s")lines.foreachRDD(process_batch)# 启动流处理
ssc.start()# 运行120秒(包含初始阶段和背压阶段)
time.sleep(120)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

测试执行步骤

  1. 启动数据生成器(另启终端执行):
nc -lk 9999 | while true; do sleep 0.1; echo "test data $(date)"; done
  1. 观察日志中以下关键指标:
INFO: Batch processed in 0.85s
INFO: Current rate: 850 records/s
INFO: New rate estimated: 800 records/s

背压机制原理分析(结合测试结果)

  1. 初始化阶段
  • 初始速率由spark.streaming.backpressure.initialRate控制(测试设置为100条/秒)
  • 前几个批次显示处理时间逐渐增加:
Batch 1 processed in 0.2s
Batch 2 processed in 0.5s
Batch 3 processed in 0.8s
  1. 速率调整阶段
  • RateController通过PID算法(默认)动态调整速率
  • 当处理时间接近批次间隔时触发降速:
[Batch 4] Processing time: 0.95s → New rate: 700 records/s
[Batch 5] Processing time: 0.92s → New rate: 750 records/s
  1. 稳定阶段
  • 系统找到可持续处理速率(示例稳定在800-850条/秒)
  • 处理时间保持在批次间隔的50-70%范围:
Batch processed in 0.65s (Target: 1s batch)
Current rate: 820 records/s

关键参数对照表

参数名称测试值作用说明
spark.streaming.backpressure.enabledtrue背压机制总开关
spark.streaming.backpressure.initialRate100初始接收速率(条/秒)
spark.streaming.receiver.maxRate1000接收器最大速率限制
batchDuration1s微批处理时间窗口

背压机制核心流程

  1. 监控阶段
  • JobScheduler收集BatchInfo包含:
    • 调度延迟(schedulingDelay)
    • 处理时间(processingDelay)
    • 记录总数(numRecords)
  1. 速率估算
  • 使用PID控制器计算新速率:
    newRate = oldRate * (1 / (processingDelay / batchDuration))
    
  • 加入积分项(历史误差)和微分项(误差变化率)
  1. 动态调整
  • 限制调整幅度不超过±15%(默认)
  • 最终速率不超过spark.streaming.receiver.maxRate

性能优化建议

  1. 初始值设置
.config("spark.streaming.backpressure.initialRate", "实际TPS的50%")
  1. 高级调参
.config("spark.streaming.backpressure.pid.proportional", "0.5")  # 比例系数
.config("spark.streaming.backpressure.pid.integral", "0.1")      # 积分系数
.config("spark.streaming.backpressure.pid.derived", "0.2")       # 微分系数
  1. 监控指标
# 通过Spark UI观察:
Streaming Statistics → Avg Input Rate / Avg Processing Time / Total Delay

结论分析

测试数据显示背压机制有效时的特征:

  • 处理时间稳定在批次间隔的60-80%
  • 输入速率呈现阶梯式调整(示例从100 → 700 → 800)
  • 系统延迟(Total Delay)保持在秒级以下

当关闭背压时(设置spark.streaming.backpressure.enabled=false):

  • 处理时间逐渐超过批次间隔
  • 调度延迟持续增长
  • 最终可能导致Executor OOM

该机制通过动态平衡输入速率与处理能力,有效防止了流处理系统的级联故障(cascading failure),是Spark Streaming实现稳定低延迟处理的关键设计。

相关文章:

Spark Streaming的背压机制的原理与实现代码及分析

Spark Streaming的背压机制是一种根据JobScheduler反馈的作业执行信息来动态调整Receiver数据接收率的机制。 在Spark 1.5.0及以上版本中,可以通过设置spark.streaming.backpressure.enabled为true来启用背压机制。当启用背压机制时,Spark Streaming会自…...

刷题记录 贪心算法-2:455. 分发饼干

题目:455. 分发饼干 难度:简单 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],这是能让孩子们满足胃口的饼干的最小尺寸&a…...

360大数据面试题及参考答案

数据清理有哪些方法? 数据清理是指发现并纠正数据文件中可识别的错误,包括检查数据一致性,处理无效值和缺失值等。常见的数据清理方法有以下几种: 去重处理:数据中可能存在重复的记录,这不仅会占用存储空间,还可能影响分析结果。通过对比每条记录的关键属性,若所有关键…...

【大模型】Ollama+AnythingLLM搭建RAG大模型私有知识库

文章目录 一、AnythingLLM简介二、搭建本地智能知识库2.1 安装Ollama2.2 安装AnythingLLM 参考资料 一、AnythingLLM简介 AnythingLLM是由Mintplex Labs Inc.开发的一个全栈应用程序,是一款高效、可定制、开源的企业级文档聊天机器人解决方案。AnythingLLM能够将任…...

深入MapReduce——从MRv1到Yarn

引入 我们前面篇章有提到,和MapReduce的论文不太一样。在Hadoop1.0实现里,每一个MapReduce的任务并没有一个独立的master进程,而是直接让调度系统承担了所有的worker 的master 的角色,这就是Hadoop1.0里的 JobTracker。在Hadoop1…...

arkui-x 前端布局编码模板

build() {Column() {Row() {// 上侧页面布局实现}// 下侧页面布局实现}.width(Const.THOUSANDTH_1000).height(Const.THOUSANDTH_1000).justifyContent(FlexAlign.SpaceBetween).backgroundImage($r(app.media.background_xxx)).backgroundImageSize(ImageSize.Cover).backgrou…...

代理模式 -- 学习笔记

代理模式学习笔记 什么是代理? 代理是一种设计模式,用户可以通过代理操作,而真正去进行处理的是我们的目标对象,代理可以在方法增强(如:记录日志,添加事务,监控等) 拿一…...

sem_init的概念和使用案例

sem_init 是 POSIX 线程库中用于初始化未命名信号量&#xff08;unnamed semaphore&#xff09;的函数&#xff0c;常用于多线程或多进程间的同步。以下是其概念和使用案例的详细说明&#xff1a; 概念 函数原型&#xff1a; #include <semaphore.h>int sem_init(sem_t …...

JVM_类的加载、链接、初始化、卸载、主动使用、被动使用

①. 说说类加载分几步&#xff1f; ①. 按照Java虚拟机规范,从class文件到加载到内存中的类,到类卸载出内存为止,它的整个生命周期包括如下7个阶段: 第一过程的加载(loading)也称为装载验证、准备、解析3个部分统称为链接(Linking)在Java中数据类型分为基本数据类型和引用数据…...

ProfibusDP主机与从机交互

ProfibusDP 主机SD2索要数据下发&#xff1a;68 08 F7 68 01 02 03 21 05 06 07 08 1C 1668&#xff1a;SD2 08&#xff1a;LE F7&#xff1a;LEr 68&#xff1a;SD2 01:目的地址 02&#xff1a;源地址 03:FC_CYCLIC_DATA_EXCHANGE功能码 21&#xff1a;数据地址 05,06,07,08&a…...

Java设计模式:结构型模式→组合模式

Java 组合模式详解 1. 定义 组合模式&#xff08;Composite Pattern&#xff09;是一种结构型设计模式&#xff0c;它允许将对象组合成树形结构以表示“部分-整体”的层次。组合模式使得客户端能够以统一的方式对待单个对象和对象集合的一致性&#xff0c;有助于处理树形结构…...

【福州市AOI小区面】shp数据学校大厦商场等占地范围面数据内容测评

AOI城区小区面样图和数据范围查看&#xff1a; — 字段里面有name字段。分类比较多tpye&#xff1a;每个值代表一个类型。比如字段type中1549代表小区住宅&#xff0c;1563代表学校。小区、学校等占地面积范围数据 —— 小区范围占地面积面数据shp格式 无偏移坐标&#xff0c;只…...

【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR

【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR 1 算法原理 Tarun A K, Chundawat V S, Mandal M, et al. Fast yet effective machine unlearning[J]. IEEE Transactions on Neural Networks and Learning Systems, 2023. 本文提出了一种名为 UNSIR&#xff08;Un…...

基于SpringBoot的阳光幼儿园管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…...

【逻辑学导论第15版】A. 推理

识别下列语段中的前提与结论。有些前提确实支持结论&#xff0c;有些并不支持。请注意&#xff0c;前提可能直接或间接地支持结论&#xff0c;而简单的语段也可能包含不止一个论证。 例题&#xff1a; 1.管理得当的民兵组织对于一个自由国家的安全是必需的&#xff0c;因而人民…...

【开源免费】基于SpringBoot+Vue.JS景区民宿预约系统(JAVA毕业设计)

本文项目编号 T 162 &#xff0c;文末自助获取源码 \color{red}{T162&#xff0c;文末自助获取源码} T162&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

c++ map/multimap容器 学习笔记

1 map的基本概念 简介&#xff1a; map中所有的元素都是pair pair中第一个元素是key&#xff08;键&#xff09;&#xff0c;第二个元素是value&#xff08;值&#xff09; 所有元素都会根据元素的键值自动排序。本质&#xff1a; map/multimap 属于关联式容器&#xff0c;底…...

安卓逆向之脱壳-认识一下动态加载 双亲委派(一)

安卓逆向和脱壳是安全研究、漏洞挖掘、恶意软件分析等领域的重要环节。脱壳&#xff08;unpacking&#xff09;指的是去除应用程序中加固或保护措施的过程&#xff0c;使得可以访问应用程序的原始代码或者数据。脱壳的重要性&#xff1a; 分析恶意软件&#xff1a;很多恶意软件…...

64位的谷歌浏览器Chrome/Google Chrome

64位的谷歌浏览器Chrome/Google Chrome 在百度搜索关键字:chrome&#xff0c;即可下载官方的“谷歌浏览器Chrome/Google Chrome”&#xff0c;但它可能是32位的&#xff08;切记注意网址&#xff1a;https://www.google.cn/....&#xff0c; 即&#xff1a;google.cn&#xff…...

马尔科夫模型和隐马尔科夫模型区别

我用一个天气预报和海藻湿度观测的比喻来解释&#xff0c;保证你秒懂&#xff01; 1. 马尔可夫模型&#xff08;Markov Model, MM&#xff09; 特点&#xff1a;状态直接可见 场景&#xff1a;天气预报&#xff08;晴天→雨天→阴天…&#xff09;核心假设&#xff1a; 下一个…...

Python NumPy(7):连接数组、分割数组、数组元素的添加与删除

1 连接数组 函数描述concatenate连接沿现有轴的数组序列stack沿着新的轴加入一系列数组。hstack水平堆叠序列中的数组&#xff08;列方向&#xff09;vstack竖直堆叠序列中的数组&#xff08;行方向&#xff09; 1.1 numpy.concatenate numpy.concatenate 函数用于沿指定轴连…...

【LLM】deepseek多模态之Janus-Pro和JanusFlow框架

note 文章目录 note一、Janus-Pro&#xff1a;解耦视觉编码&#xff0c;实现多模态高效统一技术亮点模型细节 二、JanusFlow&#xff1a;融合生成流与语言模型&#xff0c;重新定义多模态技术亮点模型细节 Reference 一、Janus-Pro&#xff1a;解耦视觉编码&#xff0c;实现多模…...

2000-2021年 全国各地级市专利申请与获得情况、绿色专利申请与获得情况数据

2000-2021年 全国各地级市专利申请与获得情况、绿色专利申请与获得情况数据.ziphttps://download.csdn.net/download/2401_84585615/89575931 https://download.csdn.net/download/2401_84585615/89575931 2000至2021年&#xff0c;全国各地级市的专利申请与获得情况呈现出显著…...

51单片机(STC89C52)开发:点亮一个小灯

软件安装&#xff1a; 安装开发板CH340驱动。 安装KEILC51开发软件&#xff1a;C51V901.exe。 下载软件&#xff1a;PZ-ISP.exe 创建项目&#xff1a; 新建main.c 将main.c加入至项目中&#xff1a; main.c:点亮一个小灯 #include "reg52.h"sbit LED1P2^0; //P2的…...

数仓ETL测试

提取&#xff0c;转换和加载有助于组织使数据在不同的数据系统中可访问&#xff0c;有意义且可用。ETL工具是用于提取&#xff0c;转换和加载数据的软件。在当今数据驱动的世界中&#xff0c;无论大小如何&#xff0c;都会从各种组织&#xff0c;机器和小工具中生成大量数据。 …...

240. 搜索二维矩阵||

参考题解&#xff1a;https://leetcode.cn/problems/search-a-2d-matrix-ii/solutions/2361487/240-sou-suo-er-wei-ju-zhen-iitan-xin-qin-7mtf 将矩阵旋转45度&#xff0c;可以看作一个二叉搜索树。 假设以左下角元素为根结点&#xff0c; 当target比root大的时候&#xff…...

反向代理模块b

1 概念 1.1 反向代理概念 反向代理是指以代理服务器来接收客户端的请求&#xff0c;然后将请求转发给内部网络上的服务器&#xff0c;将从服务器上得到的结果返回给客户端&#xff0c;此时代理服务器对外表现为一个反向代理服务器。 对于客户端来说&#xff0c;反向代理就相当于…...

Kafka的内部通信协议

引言 kafka内部用到的常见协议和优缺点可以看看原文 Kafka用到的协议 本文奖详细探究kafka核心通信协议和高性能的关键 网络层通信的实现 基于 Java NIO&#xff1a;Kafka 的网络通信层主要基于 Java NIO 来实现&#xff0c;这使得它能够高效地处理大量的连接和数据传输。…...

Excel - Binary和Text两种Compare方法

Option Compare statement VBA里可以定义默认使用的compare方法&#xff1a; Set the string comparison method to Binary. Option Compare Binary That is, "AAA" is less than "aaa". Set the string comparison method to Text. Option Compare Tex…...

【Linux权限】—— 于虚拟殿堂,轻拨密钥启华章

欢迎来到ZyyOvO的博客✨&#xff0c;一个关于探索技术的角落&#xff0c;记录学习的点滴&#x1f4d6;&#xff0c;分享实用的技巧&#x1f6e0;️&#xff0c;偶尔还有一些奇思妙想&#x1f4a1; 本文由ZyyOvO原创✍️&#xff0c;感谢支持❤️&#xff01;请尊重原创&#x1…...