spark支持深度学习批量推理
背景
在数据量较大的业务场景中,spark在数据处理、传统机器学习训练、
深度学习相关业务,能取得较明显的效率提升。
本篇围绕spark大数据背景下的推理,介绍一些优雅的使用方式。
spark适用场景
- 大数据量自定义方法处理、类sql处理
- 传统机器学习方法(k-means、xgboost、lr…)
- 分布式深度学习推理

目前在10亿+数据量的推理场景中使用,需要用户自己实现批数据准备,基于RDD的方法完成模型推理输出。
业务使用中的问题:
- 模型文件重复导入加载
- 自定义批数据准备,脱离深度学习dataloader框架,操作略显麻烦,有性能和内存oom等问题。
实践
spark加速深度学习推理
spark加速深度学习推理,基本思路为:
- 开启不定量worker并行执行(cpu或gpu)推理任务
- 所有worker共享同一份模型参数
- 依赖spark pandas udf功能,方便并行处理 dataframe 数据
- 依赖深度学习框架,方便实现最优批数据划分
下面以pytorch resnet 为实践demo
加载&&广播模型参数
广播模型参数,不仅能减少模型重复加载带来的流量和io,而且能加速推理前模型加载的速度。
driver广播模型参数:
# Load ResNet50 on driver node and broadcast its state.
model_state = models.resnet50(pretrained=True).state_dict()
bc_model_state = sc.broadcast(model_state)
worker读取模型参数:
def get_model_for_eval():"""Gets the broadcasted model."""model = models.resnet50(pretrained=True)model.load_state_dict(bc_model_state.value)model.eval()return model
实现基于dataframe的dataset
目前主流的深度学习框架,dataset的实现大多基于本地存储,在读取分布式存储的场景 需要用户自定义实现。
自定义实现有2个方法:
- 使用分布式存储的api接口读取文件内容
- dataset读取dataframe二进制文件内容
方法一迭代与使用的存储类型会保持同步,且每次使用前需要明确使用的分布式存储,虽然实现方法容易但是使用流程略显麻烦。
方法二不需要关心分布式存储类型,只要需要获取并解析spark dataframe列传入内容即可。
本文采用方法二实现dataset:
# 从二进制流中解析图片信息
def pil_loader(binary_file):# open path as file to avoid ResourceWarning (https://github.com/python-pillow/Pillow/issues/835)image_io = io.BytesIO(binary_file)img = Image.open(image_io)return img.convert('RGB')# Create a custom PyTorch dataset class.
class ImageDataset(Dataset):def __init__(self, data, transform=None):self.data = dataself.transform = transformdef __len__(self):return len(self.data)def __getitem__(self, index):image = pil_loader(self.data[index])if self.transform is not None:image = self.transform(image)return image
实现批量推理的pandas udf
Pandas udf是基于RDD的一个低门槛高性能的实现方法,pandas udf能自定义处理逻辑,以列的方式操作datafrme内容。
这是社区目前推荐的自定义处理方式。
# Define the function for model inference.
# PyArrow >= 1.0.0 must be installed;
@pandas_udf(ArrayType(FloatType()))
def predict_batch_udf(binaray_data: pd.Series) -> pd.Series:transform = transforms.Compose([transforms.Resize(224),transforms.CenterCrop(224),transforms.ToTensor(),transforms.Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])images = ImageDataset(binaray_data, transform=transform)loader = torch.utils.data.DataLoader(images, batch_size=500, num_workers=8)model = get_model_for_eval()model.to(device)all_predictions = []with torch.no_grad():for batch in loader:predictions = list(model(batch.to(device)).cpu().numpy())for prediction in predictions:all_predictions.append(prediction)return pd.Series(all_predictions)
# 调用pandas udf
predictions_df = df. \select(col("filename"), predict_batch_udf(col("data")).alias("prediction"))
更多代码细节:
https://github.com/Crazybean-lwb/deeplearning-pyspark/blob/master/examples/pytorch-inference.py
模型仓加速推理
打通到模型仓mlflow功能:
- 模型存储和版本管理
- 便捷取用
- 适用spark datarame更高阶的pandas udf实现

# Create the PySpark UDF
import mlflow.pyfunc
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)# 调用pandas udf
df = spark_df.withColumn("prediction", pyfunc_udf(struct([...])))
参考信息:
- pytorch分布式批量推理
- tensorflow分布式批量推理
- 模型仓mlflow协助分布式批量推理
相关文章:
spark支持深度学习批量推理
背景 在数据量较大的业务场景中,spark在数据处理、传统机器学习训练、 深度学习相关业务,能取得较明显的效率提升。 本篇围绕spark大数据背景下的推理,介绍一些优雅的使用方式。 spark适用场景 大数据量自定义方法处理、类sql处理传统机器…...
代码随想录打卡—day52—【子序列问题】— 8.31 最大子序列
共性 做完下面三题,发现三个的dp数组中i都是以 i 为结束的字串。 1 300. 最长递增子序列 300. 最长递增子序列 AC: class Solution { public:int dp[10010]; // 表示以i结束的子序列最大的长度/*if(nums[j] > nums[i])dp[j] max(dp[j],dp[i] …...
gcc4.8.5升级到gcc4.9.2
第1步:获取repo [rootlocalhost SPECS]# wget --no-check-certificate https://copr.fedoraproject.org/coprs/rhscl/devtoolset-3/repo/epel-6/rhscl-devtoolset-3-epel-6.repo -O /etc/yum.repos.d/devtoolset-3.repo --2021-12-07 20:53:26-- https://copr.fedo…...
Golang 中的 archive/zip 包详解(三):常用函数
Golang 中的 archive/zip 包用于处理 ZIP 格式的压缩文件,提供了一系列用于创建、读取和解压缩 ZIP 格式文件的函数和类型,使用起来非常方便,本文讲解下常用函数。 zip.OpenReader 定义如下: func OpenReader(name string) (*R…...
微服务架构七种模式
微服务架构七种模式 目录概述需求: 设计思路实现思路分析 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy,skip hardness,make a better result,wait for change,challenge Survive.…...
关于CICD流水线的前端项目运行错误,npm项目环境配置时出现报错:Not Found - GET https://registry.npm...
关于CICD流水线的前端项目运行错误,npm项目环境配置时出现报错:Not Found - GET https://registry.npm… 原因应该是某些jar包缓存中没有需要改变镜像将包拉下来 npm config set registry http://registry.npm.taobao.org npm install npm run build...
element-plus的周选择器 一周从周一开始
1、代码 1)、template中 <el-date-picker v-model"value1" type"week" format"[Week] ww" placeholder"巡访周" change"change"value-format"YYYY-MM-DD" /> 2)、方法中 import…...
Android 9.0 pms获取应用列表时过滤掉某些app功能实现
1.前言 在9.0的系统rom定制化开发中,对系统定制的功能也是很多的,在一次产品开发中,要求在第三方app获取应用列表的时候,需要过滤掉某些app,就是不显示在app应用列表中,这就需要在pms查询app列表时过滤掉这些app就可以了,接下来就实现这些功能 2.pms获取应用列表时过滤掉…...
HTML <thead> 标签
实例 带有 thead、tbody 以及 tfoot 元素的 HTML 表格: <table border="1"><thead><tr><th>Month</th><th>Savings</th></tr></thead><tfoot><tr><td>Sum</td><td>$180<…...
谷歌发布Gemini以5倍速击败GPT-4
在Covid疫情爆发之前,谷歌发布了MEENA模型,短时间内成为世界上最好的大型语言模型。谷歌发布的博客和论文非常可爱,因为它特别与OpenAI进行了比较。 相比于现有的最先进生成模型OpenAI GPT-2,MEENA的模型容量增加了1.7倍…...
力扣92. 局部反转链表
92. 反转链表 II 给你单链表的头指针 head 和两个整数 left 和 right ,其中 left < right 。请你反转从位置 left 到位置 right 的链表节点,返回 反转后的链表 。 示例 1: 输入:head [1,2,3,4,5], left 2, right 4 输出&am…...
九、适配器模式
一、什么是适配器模式 适配器模式(Adapter)的定义如下:将一个类的接口转换成客户希望的另外一个接口,使得原本由于接口不兼容而不能一起工作的那些类能一起工作。 适配器模式(Adapter)包含以下主要角色&…...
使用spring自带的发布订阅来实现发布订阅
背景 公司的项目以前代码里面有存在使用spring自带发布订阅的代码,因此稍微学习一下如何使用,并了解一下这种实现方式的优缺点。 优点 实现方便,代码方面基本只需要定义消息体和消费者,适用于小型应用程序。不依赖外部中间件&a…...
Walmart电商促销活动即将开始,如何做促销活动?需要注意什么?
近日,沃尔玛官宣Baby Days优惠活动将于9月1日正式开始!卖家可以把握机会,通过设置促销定价,以最优惠的婴儿相关产品价格吸引消费者,包括汽车座椅、婴儿车、尿布袋、家具、床上用品、消耗品、婴儿服装、孕妇装等。注意本…...
Matlab(画图进阶)
目录 大纲 1.特殊的Plots 1.1 loglog(双对数刻度图) 1.3 plotyy(创建具有两个y轴的图形) 1.4yyaxis(创建具有两个y轴的图) 1.5 bar 3D条形图(bar3) 1.6 pie(饼图) 3D饼图 1.7 polar 2.Stairs And Ste阶梯图 3.Boxplot 箱型图和Error Bar误差条形图 3.1 boxplot 3.2 …...
人生的回忆
回忆是人类宝贵的精神财富,它们像一串串珍珠,串联起我们生活中的每一个片段。 回忆是时间的见证者,它们承载着我们成长、经历、悲欢离合的点点滴滴。 回忆让我们重温过去的欢笑与眼泪,感受那些已经逝去的时光。它们就像一本翻开的…...
Spring之依赖注入源码解析
Spring之依赖注入源码解析 Spring依赖注入的方式 手动注入 在XML中定义Bean时,即为手动注入,因为是程序员手动给某个属性指定了值。 通过set方式进行注入 <bean name"userService" class"com.luban.service.UserService">…...
5G NR:RACH流程-- Msg1之生成PRACH Preamble
随机接入流程中的Msg1,即在PRACH信道上发送random access preamble。涉及到两个问题: 一个是如何产生preamble?一个是如何选择正确的PRACH时频资源发送所选的preamble? 一、PRACH Preamble是什么 PRACH Preamble从数学上来讲是一个长度为…...
高基数类别特征预处理:平均数编码 | 京东云技术团队
一 前言 对于一个类别特征,如果这个特征的取值非常多,则称它为高基数(high-cardinality)类别特征。在深度学习场景中,对于类别特征我们一般采用Embedding的方式,通过预训练或直接训练的方式将类别特征值编…...
高效利用隧道代理实现无阻塞数据采集
在当今信息时代,大量的有价值数据分散于各个网站和平台。然而,许多网站对爬虫程序进行限制或封禁,使得传统方式下的数据采集变得困难重重。本文将向您介绍如何通过使用隧道代理来解决这一问题,并帮助您成为一名高效、顺畅的数据采…...
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...
Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...
