当前位置: 首页 > news >正文

如何在Spark中使用gbdt模型分布式预测

这目录

  • 1 训练gbdt模型
  • 2 第三方包python环境打包
  • 3 Spark中使用gbdt模型
    • 3.1 spark配置文件
    • 3.2 主函数main.py
  • 4 spark任务提交

1 训练gbdt模型

我们可以基于lightgbm快速的训练一个gbdt模型,训练相对比较简单,只要把训练样本处理好,几行代码可以快速训练好模型,如下是训练一个多分类模型训练核心代码如下:

import lightgbm as lgb
import joblib
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
#假设处理好的训练样本为train.csv
df = pd.read_csv('./train.csv')
X = pd.drop(['label'],axis=1)
Y = df.label
# split data for val
x_train,x_val,y_train,y_val = train_test_split(X,Y,test_size=0.2,random_state=123)
# train model
cate_features=['sex','brand']
train_data = train_data = lgb.Dataset(x_train,label=y_train,categoryical_featrues=cate_features)
params = {'objective':'multiclass','learning_rate':0.1,'n_estimators':100,'num_class':23}
model = lgb.train(params, train_data,100)
#predict val
y_pred = model.predict(x_val)
y_pred = y_pred.argmax(axis=1)# acc
acc = accuracy_score(y_val, y_pred)
print(acc)# feature importance
feature_name = model.feature_name()
feature_importance = model.feature_importance()
feature_score = dict(zip(feature_name, feature_importance))
feature_score_sort = sorted(feature_score.items(),key=lambda x:x[1], reverse=True)# save model
joblib.dump(model, 'model.pkl')

上述就是基于lightgbm训练gbdt模型的代码,训练完后我们通过joblib保存了我们训练好的模型,这个模型接下来我们可以在spark进行分布式预测。

2 第三方包python环境打包

在使用spark的时候,我们可以自定义python环境,并且把我们需要的第三方包都可以安装该python环境里,这样在spark里我们就可以用python第三方包,比如等会我们需要的joblib, numpy等。具体如何配置python环境和第三方包,可以参考我上一篇博客:如何在spark中使用scikit-learn和tensorflow等第三方python包

3 Spark中使用gbdt模型

通过上述步骤,把需要的python环境和第三方包制作好了,包名为python39.zip,接下来我们介绍一下如何在spark中使用我们刚才训练好的gbdt模型进行分布式快速预测。

3.1 spark配置文件

提交spark任务的时候,配置文件这块也需要稍微修改一下,配置文件信息如下:

$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-memory 12G
--executor-memory 20G \
--executor-cores 4 \
--queue root.your_queue_name \
--archives ./python39.zip#python39 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python39/python39/bin/python3.9 \
--conf spark.yarn.appMasterEnv,HADOOP_USER_NAME=your_hduser_name \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enbled=true \
--conf spark.dynamicAllocation.maxExecutors=50 \
--conf spark.dynamicAllocation.minExecutors=50 \
--conf spark.braodcast.compress=True \
--conf saprk.network.timeout=1000s \
--conf spark.sql.hive.mergeFiles=true \
--conf spark.speculation=false \
--conf spark.yarn.executor.memoryOverhead=4096 \
--files $HIVE_CONF_DIR/hive-site.xml \
--py-files ./model.pkl \
$@

上述是基本的提交spark任务的配置文件,其中
–archives ./python39.zip#python39 \
–archives参数用于在Spark应用程序运行期间将本地压缩档案文件解压到YARN集群节点上。#python39 是为档案文件定义的别名,这将在Spark应用程序中使用。
这个参数的目的是将名为python39.zip的压缩文件解压到YARN集群节点,并将其路径设置为python39,以供Spark应用程序使用。这通常用于指定特定版本的Python环境,以便在Spark任务中使用。
–conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python39/python39/bin/python3.9
–conf参数用于设置Spark配置属性。
spark.yarn.appMasterEnv.PYSPARK_PYTHON 是一个Spark配置属性,它指定了YARN应用程序的主节点(ApplicationMaster)使用的Python解释器。
./python39/python39/bin/python3.9 是实际Python解释器的路径,它将在YARN应用程序的主节点上执行。
这个参数的目的是告诉Spark应用程序在YARN的主节点上使用特定的Python解释器即./python39/python39/bin/python3.9。这通常用于确保Spark应用程序使用正确的Python版本和环境来运行任务。
–py-files ./model.pkl \
–py-file是 Spark 提交任务时的一个参数,用于将指定的 .py 文件、.zip 文件或 .egg 文件分发到集群的所有 Worker 节点。Spark 会将这些文件自动添加到 Python 的模块路径中(即 sys.path),使得这些文件可以被任务中的代码引用。所以在这里我们将 model.pkl 模型文件分发到 Spark 集群的每个节点,确保每个节点在运行任务时都能访问并使用这个模型

3.2 主函数main.py

接下来,我们来看下如何在在spark调用我们训练好的gbdt模型进行预测,核心代码主要如下:
1)import基础函数功能包等

# -*-coding:utf8 -*-
import sys
from pyspark.sql.types import Row
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext, HiveContext
import datetime
import numpy as np
import joblibsave_table='your_target_table_name'
source_table = 'your_source_predict_table_name'
index_table = 'your_index_table_name'
# define saved table schema
schema = StructType([StructFiled('userid', StringType(), True),StructFiled('names', ArrayType(StringType()), True),StructFiled('scores', ArrayType(FloatType()), True)])
  1. main执行入口基础配置和执行流程
if __name__=='__main__':conf = SparkConf()sc = SparkContext(conf=conf, appName='gbdt_spark_predict')sc.setLogLevel("WARN")hiveCtx = HiveContext(sc)#hive基础配置hiveCtx.setConf('spark.shuffle.consolidateFiles','true')hiveCtx.setConf('spark.shuffle.memoryFraction','0.4')hiveCtx.setConf('spark.sql.shuffle.partitions','1000')if len(sys.argv) == 1:dt = datetime.datetime.now() + datetime.timedelta(-1)else:dt = datetime.datetime.strptime(sys.argv[1],"%Y%m%d').date()dt_str = dt.strftime('%Y-%m-%d')hiveCtx.sql("use your_datebase_name")#注册函数,在sql时候可以使用hiveCtx.registerFunction('null_chage', null_chage, StringType())#创建目标表create_table(hiveCtx)#主函数get_predict(hiveCtx)

上面主函数给出了一个基本的流程步骤:1)spark, hive context等初始化 2)注册函数可以直接在sql中使用,方便数据处理 3)建立目标hive表 4)执行功能函数

  1. 函数功能模块实现

在第2步骤里,我们主要有三个函数需要编写,一个是可以在sql中调用的基础函数,第二个就是创建表函数,第三个就是功能函数,我们接下来实现这三个的基本功能:

#用在sql中的基本数据操作处理
def null_chage(x):return 'unknow' if x is None else x#创建目标表
def create_table(hiveCtx):create_tbl = """CREATE EXTERNAL TABLE IF NOT EXISTS your_database_name.{table_name} (userid       string       COMMENT 'user id';names        array<string>   COMMENT 'predict label names')scores        array<float>   COMMENT 'predict socre')PARTITIONED BY(dt string, dp string)STORED AS ORCLOCATION 'hdfs://your_database_name.db/{table_name}'TBLPROPERTIES('orc.compress'='SNAPPY','comment'='gbdt predict user score')""".format(table_name=save_table)
# 功能函数
def get_predict(hiveCtx):# get label and idex datasql="""select index, valuefrom {index_table}where dt='active;""".format(index_table=index_table)print(sql)vocab = hiveCtx.sql(sql).rdd.collect()vocab_dict = dict()for x in vocab:vocab_dict.setdefault(x[0],x[1])# broadcastbr_vocab_dict = sc.broadcast(vocab_dict)# get predict datasql="""select null_chage(userid) as userid, featuresfrom {source_table}where dt='active'""".format(source_table=source_table)print(sql)hiveCtx.sql(sql).rdd.mapPartitions(lambda rows: main_func(rows, br_vocab_dict)) \.toDF(schema=schema) \.registerTempTable('final_tbl')# insert tableinsert_sql = """insert overwrite table {save_table} partition (dt='{dt}')select * from final_tbl""".format(save_table=save_table,dt='active')print(insert_sql)hiveCtx.sql(insert_sql)

接下来,我们来看下main_func函数的实现:

def main_func(rows, br_vocab_dict):# load modelmodel = joblib.load('./model.pkl')vocab_dict = br_vobab_dict.valuefor row in rows:userid, features = rowfeatures = np.array(features)predict = model.predict(features)predict_sort = np.argsort(-predict[0])names = [vocab_dict[idx] for idx in predict_sort]scores = [float(predict[0][idx]) for idx in predict_sort]yield userid, names, scores

整个代码的实现我们在这里就写完了,整体实现逻辑是比较清晰易懂的,按照这个流程来,我们可以很高效快速的基于spark分布式的跑一些数据处理和模型预测性的任务。

4 spark任务提交

接下来,就是提交我们的spark任务了,在工作环境目录如下文件信息:

  • 提前准备好的python环境包python39.zip
  • spark config文件 run_spark_arg.sh
  • 主函数代码 main.py
  • gbdt模型文件model.pkl

最后环节就是提交spark任务,我么可以在服务器提交命令如下:

nohup sh run_spark_arg.sh main.py >log.txt 2>&1 &

相关文章:

如何在Spark中使用gbdt模型分布式预测

这目录 1 训练gbdt模型2 第三方包python环境打包3 Spark中使用gbdt模型3.1 spark配置文件3.2 主函数main.py 4 spark任务提交 1 训练gbdt模型 我们可以基于lightgbm快速的训练一个gbdt模型&#xff0c;训练相对比较简单&#xff0c;只要把训练样本处理好&#xff0c;几行代码可…...

Qt-5.14.2 example

官方历程很丰富&#xff0c;modbus、串口、chart图表、3D、视频 共享方便使用 Building and Running an Example You can test that your Qt installation is successful by opening an existing example application project. To run an example application on an Android …...

virtualbox给Ubuntu22创建共享文件夹

1.在windows上的操作&#xff0c;创建共享文件夹Share 2.Ubuntu22上的操作&#xff0c;创建共享文件夹LinuxShare 3.在virtualbox虚拟机设置里&#xff0c;设置共享文件夹 共享文件夹路径&#xff1a;选择Windows系统中你需要共享的文件夹 共享文件夹名称&#xff1a;挂载至wi…...

GPT打字机效果—— fetchEventSouce进行sse流式请求

EventStream基本用法 与 WebSocket 不同的是&#xff0c;服务器发送事件是单向的。数据消息只能从服务端到发送到客户端&#xff08;如用户的浏览器&#xff09;。这使其成为不需要从客户端往服务器发送消息的情况下的最佳选择。 const evtSource new EventSource(“/api/v1/…...

SpringBoot 在线家具商城:设计考量与实现细节聚焦

第4章 系统设计 市面上设计比较好的系统都有一个共同特征&#xff0c;就是主题鲜明突出。通过对页面简洁清晰的布局&#xff0c;让页面的内容&#xff0c;包括文字语言&#xff0c;或者视频图片等元素可以清晰表达出系统的主题。让来访用户无需花费过多精力和时间找寻需要的内容…...

每日速记10道java面试题07

其他资料&#xff1a; 每日速记10道java面试题01-CSDN博客 每日速记10道java面试题02-CSDN博客 每日速记10道java面试题03-CSDN博客 每日速记10道java面试题04-CSDN博客 每日速记10道java面试题05-CSDN博客 每日速记10道java面试题06-CSDN博客 目录 1.线程的生命周期在j…...

前端面试热门题(二)[html\css\js\node\vue)

Vue 性能优化的方法 Vue 性能优化的方法多种多样&#xff0c;以下是一些常用的策略&#xff1a; 使用v-show替换v-if&#xff1a;v-show是通过CSS控制元素的显示与隐藏&#xff0c;而v-if是通过操作DOM来控制元素的显示与隐藏&#xff0c;频繁操作DOM会导致性能下降。因此&am…...

mvc基础及搭建一个静态网站

mvc asp.net core mvc环境 .net8vscode * Asp.Net Core 基础* .net8* 前辈* .net 4.9 非跨平台版本 VC* 跨平台版本* 1.0* 2.0* 2.1* 3.1* 5* 语言* C#* F# * Visual Basic* 框架* web应用* asp应用* WebFrom* mvc应用* 桌面应用* Winform* WPF* Web Api api应用或者叫服务* …...

AOSP的同步问题

repo sync同步时提示出错: error: .repo/manifests/: contains uncommitted changesRepo command failed due to the following UpdateManifestError errors: contains uncommitted changes解决方法&#xff1a; 1、cd 进入.repo/manifests cd .repo/manifests2、执行如下三…...

HarmonyOS4+NEXT星河版入门与项目实战(23)------实现手机游戏摇杆功能

文章目录 1、案例效果2、案例实现1、代码实现2、代码解释4、总结1、案例效果 2、案例实现 1、代码实现 代码如下(示例): import router from @ohos.router import {ResizeDirection } from @ohos.UiTest import curves...

Logistic Regression(逻辑回归)、Maximum Likelihood Estimatio(最大似然估计)

Logistic Regression&#xff08;逻辑回归&#xff09;、Maximum Likelihood Estimatio&#xff08;最大似然估计&#xff09; 逻辑回归&#xff08;Logistic Regression&#xff0c;LR&#xff09;逻辑回归的基本思想逻辑回归模型逻辑回归的目标最大似然估计优化方法 逻辑回归…...

Vue文字转语音实现

在开发流程中&#xff0c;面对语音支持的需求&#xff0c;小规模语音内容或许可以通过预处理后播放来轻松应对&#xff0c;但当涉及大量语音时&#xff0c;这一方法就显得繁琐低效了。为此&#xff0c;智慧的开发者们总能找到便捷的解决方案——利用Web技术实现语音播放&#x…...

Docker快速部署RabbitMq

在外网服务器拉取镜像 docker pull arm64v8/rabbitmq:3.8.9-management或者拉去我的服务器的 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/linux_arm64_rabbitmq:3.8.9-management重新命名 docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/lin…...

glog在vs2022 hello world中使用

准备工作 设置dns为阿里云dns 223.5.5.5&#xff0c;下载cmake&#xff0c;vs2022&#xff0c;git git clone https://github.com/google/glog.git cd glog mkdir build cd build cmake .. 拷贝文件 新建hello world并设置 设置预处理器增加GLOG_USE_GLOG_EXPORT;GLOG_NO_AB…...

[241129] Docker Desktop 4.36 发布:企业级管理功能、WSL 2 增强 | Smile v4.0.0 发布

目录 Docker Desktop 4.36 发布&#xff1a;企业级管理功能、WSL 2 和 ECI 增强Smile v4.0.0 发布&#xff01;Java 机器学习库迎来重大升级 Docker Desktop 4.36 发布&#xff1a;企业级管理功能、WSL 2 和 ECI 增强 Docker Desktop 4.36 带来了强大的更新&#xff0c;简化了…...

CentOS使用chrony服务进行时间同步源设置脚本

CentOS使用chrony服务进行时间同步源设置脚本 #!/bin/bash# Created: 2024-11-26 # Function: Check and Set OS time sync source to 10.0.11.100 # FileName: centos_set_time_source_to_ad.sh # Creator: Anster # Usage: # curl http://webserver-ip/scripts/centos_set…...

Git仓库迁移到远程仓库(源码、分支、提交)

单个迁移仓库 一、迁移仓库 1.准备工作 > 手动在电脑创建一个临时文件夹&#xff0c;CMD进入该目录 > 远程仓库上创建一个同名的空仓库 2.CMD命令&#xff1a;拉取旧Git仓库&#xff08;包含提交、分支、源码&#xff09; $ git clone --bare http://git.domain.cn/…...

【算法刷题指南】优先级队列

&#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系列 &#x1f308;Linux学习专栏&#xff1a; 南桥谈Linux &#x1f308;数据结构学习专栏&#xff1a; 数据结构杂谈 &#x1f308;数据…...

使用pymupdf提取PDF文档中的文字和其颜色

最近我在捣鼓一个PDF文件&#xff0c;想把它里面的文字和文字颜色给提取出来。后来发现有个叫pymupdf的库能搞定这事儿。操作起来挺简单的&#xff0c;pymupdf的示例文档里就有现成的代码可以参考。 how-to-extract-text-with-color 我本地的测试代码如下&#xff1a; impor…...

贪心算法题

0简介 0.1什么是贪心算法 贪心算法是用贪婪(鼠目寸光)的角度&#xff0c;找到解决问题的最优解 贪心策略&#xff1a;(从局部最优 --> 整体最优) 1把解决问题的过程分为若干步&#xff1b; 2解决每一个问题时&#xff0c;都选择当前“看上去”最优的解法&#xff1b; 3“…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

给网站添加live2d看板娘

给网站添加live2d看板娘 参考文献&#xff1a; stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下&#xff0c;文章也主…...