当前位置: 首页 > news >正文

python 使用进程池并发执行 SQL 语句

        这段代码使用了 Python 的 multiprocessing 模块来实现真正的并行处理,绕过 Python 的全局解释器锁(GIL)限制,从而在多核 CPU 上并发执行多个 SQL 语句。

from pyhive import hive
import multiprocessing# 建立连接
conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")# SQL 语句列表
sql_statements = ["INSERT INTO table1 VALUES (1, 'value1')","INSERT INTO table1 VALUES (2, 'value2')","INSERT INTO table1 VALUES (3, 'value3')"
]# 定义执行函数
def execute_sql(sql):with conn.cursor() as cursor:cursor.execute(sql)# 确保多进程代码只在主进程中执行
if __name__ == '__main__':# 使用进程池并发执行with multiprocessing.Pool() as pool:pool.map(execute_sql, sql_statements)# 关闭连接conn.close()

1. 导入模块

from pyhive import hive
import multiprocessing
  • pyhive: 这是用于连接和操作 Hive 数据库的 Python 库。hive.Connection 用于建立与 Hive 数据库的连接。
  • multiprocessing: 这是 Python 的标准库,用于创建和管理进程。通过 multiprocessing,我们可以绕过 Python 的 GIL(全局解释器锁)限制,实现真正的并行处理。

2. 建立数据库连接

conn = hive.Connection(host="localhost", port=10000, username="your_username", password="your_password")
  • 这里我们使用 hive.Connection 建立一个到 Hive 数据库的连接。
  • 参数
    • host: HiveServer2 的主机地址,通常是 localhost 或 HiveServer2 运行的服务器 IP。
    • port: HiveServer2 的端口号,默认是 10000
    • username: 连接 Hive 使用的用户名。
    • password: 连接 Hive 使用的密码。

这个连接对象 conn 将在后续的代码中用于创建游标(cursor),并通过游标执行 SQL 语句。

3. 定义 SQL 语句列表

sql_statements = ["INSERT INTO table1 VALUES (1, 'value1')","INSERT INTO table1 VALUES (2, 'value2')","INSERT INTO table1 VALUES (3, 'value3')"
]
  • 这里定义了一个包含多个 SQL 语句的列表 sql_statements。每个语句都是一个插入操作,将数据插入到 Hive 表 table1 中。
  • 你可以根据实际需求修改这些 SQL 语句。

4. 定义执行函数

def execute_sql(sql):with conn.cursor() as cursor:cursor.execute(sql)
  • execute_sql 函数是用于执行单个 SQL 语句的函数。
  • with conn.cursor() as cursor:为当前数据库连接创建一个游标对象 cursor,这个游标用于执行 SQL 语句。
    • cursor.execute(sql):执行传入的 SQL 语句。
  • 这个函数会被进程池中的每个进程调用,每个进程都会独立执行一个 SQL 语句。

5. 使用进程池并发执行

with multiprocessing.Pool() as pool:pool.map(execute_sql, sql_statements)
  • multiprocessing.Pool():创建一个进程池。进程池可以管理一组工作进程,并将任务分配给这些进程。
    • 默认情况下,Pool() 会根据系统的 CPU 核心数创建相应数量的工作进程。
    • 你可以通过参数指定池中的进程数量,例如 Pool(4) 表示创建 4 个工作进程。
  • pool.map(execute_sql, sql_statements)
    • pool.map 方法会将 execute_sql 函数应用到 sql_statements 列表中的每个元素上。
    • pool.map 方法会自动将 SQL 语句列表分配给进程池中的工作进程,每个进程独立执行一个 SQL 语句。
    • 这个过程是并行的,多个进程可以同时执行不同的 SQL 语句,从而提高执行效率。

6. 关闭数据库连接

conn.close()
  • 在所有 SQL 语句执行完毕后,我们关闭数据库连接,释放资源。

进程池的工作原理

multiprocessing.Pool 提供了一种方便的方式来并行化执行函数。其工作原理如下:

  1. 创建进程池:当你创建一个 Pool 对象时,会启动多个工作进程(数量可以指定,或默认根据 CPU 核心数决定)。
  2. 任务分配:当你调用 pool.map 时,进程池会将任务(在这里是 execute_sql 函数)分配给空闲的工作进程。
  3. 并行执行:每个工作进程独立执行分配给它的任务,互不干扰。
  4. 结果收集pool.map 会收集所有工作进程的执行结果,并按照原始任务列表的顺序返回结果。

为什么使用进程池而不是线程池?

  1. GIL 限制:Python 的全局解释器锁(GIL)限制了多线程的并行执行能力,尤其是在 CPU 密集型任务中,多线程并不能充分利用多核 CPU。
  2. 进程并行multiprocessing 模块通过创建多个进程来绕过 GIL 限制,每个进程都有自己的 Python 解释器和内存空间,因此可以实现真正的并行执行。
  3. 适用场景
    • 线程池:适合 I/O 密集型任务(例如,等待数据库查询结果)。
    • 进程池:适合 CPU 密集型任务(例如,并行计算、数据处理等),或者你需要绕过 GIL 限制时。

注意事项

  1. 数据库连接:在多进程环境中,每个进程都有自己的内存空间,因此每个进程需要独立的数据库连接。在上述代码中,每个进程都通过 conn.cursor() 创建了自己的游标。
  2. 进程开销:创建和销毁进程有一定的开销,因此对于非常短小的任务,进程池可能不会显著提高性能。在这种情况下,可以考虑调整进程池的大小或使用其他优化手段。
  3. 连接池:如果你的程序需要频繁访问数据库,可以考虑使用数据库连接池来复用数据库连接,减少连接建立和关闭的开销。

总结

  • 进程池:通过 multiprocessing.Pool 实现,可以绕过 Python 的 GIL 限制,实现真正的并行处理。
  • 适用场景:适合 CPU 密集型任务或需要并行执行多个独立任务的场景。
  • 代码结构
    • 建立数据库连接。
    • 定义 SQL 语句列表。
    • 定义执行函数 execute_sql
    • 使用进程池并发执行 SQL 语句。
    • 关闭数据库连接。

通过这种方式,你可以充分利用多核 CPU 的优势,并发执行多个 SQL 语句,从而提高程序的执行效率。

解决多进程报错

你遇到的错误是 RuntimeError,这是因为你在使用 multiprocessing 时没有正确地保护代码的入口点。具体来说,在 Windows 系统上(以及其他非 fork 的启动方式),你必须将多进程相关的代码放在 if __name__ == '__main__': 语句块中,以避免子进程在启动时重新导入主模块并执行不必要的代码。

错误原因:

在 Windows 系统中,Python 的 multiprocessing 模块使用 spawn 启动子进程,这意味着子进程会重新导入当前脚本。如果不加以保护,子进程会再次执行主模块中的代码,导致递归创建进程并抛出错误。

解决方案:

你需要将多进程相关的代码放在 if __name__ == '__main__': 语句块中,确保只有主进程会执行这些代码,而子进程不会。

修改后的代码:

 

python

import multiprocessingdata = ["1","2","3"
]# 定义执行函数
def print_str(data):print(data)# 确保多进程代码只在主进程中执行
if __name__ == '__main__':# 使用进程池并发执行with multiprocessing.Pool() as pool:pool.map(print_str, data)

解释:

  • if __name__ == '__main__': 确保了只有在直接运行当前脚本时,才会执行其中的多进程代码。子进程不会执行这个代码块,从而避免了递归创建进程的问题。
  • 在 Windows 系统上,这是使用 multiprocessing 时必须遵循的惯用写法。

其他注意事项:

  • 如果你打算将脚本打包成可执行文件(例如使用 pyinstaller),你还需要调用 multiprocessing.freeze_support(),不过在大多数脚本运行的情况下,这个调用不是必须的。

例如:

 

python

if __name__ == '__main__':multiprocessing.freeze_support()  # 如果需要打包成可执行文件,可以加上这行with multiprocessing.Pool() as pool:pool.map(print_str, data)

参考文档:

你可以参考 Python 官方文档中关于 multiprocessing 的部分,了解更多关于安全导入主模块的信息:

  • multiprocessing — Process-based parallelism — Python 3.13.0 documentation

执行sql 简单示例

import multiprocessingdata = [  ]# 定义执行函数
def print_str(data):print(data)# 确保多进程代码只在主进程中执行
if __name__ == '__main__':data2 = ["1","2","3"]for i in data2:data_str = f"""inset into {i}"""data.append(data_str)# 使用进程池并发执行with multiprocessing.Pool() as pool:pool.map(print_str, data)

相关文章:

python 使用进程池并发执行 SQL 语句

这段代码使用了 Python 的 multiprocessing 模块来实现真正的并行处理,绕过 Python 的全局解释器锁(GIL)限制,从而在多核 CPU 上并发执行多个 SQL 语句。 from pyhive import hive import multiprocessing# 建立连接 conn hive.…...

我也谈AI

“随着人工智能技术的不断发展,我们已经看到了它在各行业带来的巨大变革。在医疗行业中,人工智能技术正在被应用于病例诊断、药物研发等方面,为医学研究和临床治疗提供了新的思路和方法;在企业中,人工智能技术可以通过…...

算法妙妙屋-------1.递归的深邃回响:二叉树的奇妙剪枝

大佬们好呀,这一次讲解的是二叉树的深度搜索,大佬们请阅 1.前言 ⼆叉树中的深搜(介绍) 深度优先遍历(DFS,全称为DepthFirstTraversal),是我们树或者图这样的数据结构中常⽤的⼀种…...

编写第一个 Appium 测试脚本:从安装到运行!

前言 最近接到一个测试项目,简单描述一下,需求就是:一端发送指令,另一端接受指令并处理指令。大概看了看有上百条指令,点点点岂不是废了,而且后期迭代,每次都需要点点点,想想就头大…...

mysql查表相关练习

作业要求: 单表练习: 1 . 查询出部门编号为 D2019060011 的所有员工 2 . 所有财务总监的姓名、编号和部门编号。 3 . 找出奖金高于工资的员工。 4 . 找出奖金高于工资 40% 的员工。 5 找出部门编号为 D2019090011 中所有财务总监,和…...

airtest+poco多脚本、多设备批处理运行测试用例自动生成测试报告

一:主要内容 框架功能、框架架构及测试报告效果 airtest安装、环境搭建 框架搭建、框架运行说明 框架源码 二:框架功能及测试报告效果 1. 框架功能: 该框架笔者用来作为公司的项目的前端自动化,支持pc和app,本文…...

Prometheus套装部署到K8S+Dashboard部署详解

1、添加helm源并更新 helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo update2、创建namespace kubectl create namespace monitoring 3、安装Prometheus监控套装 helm install prometheus prometheus-community/prome…...

python使用pymysql

为了封装这个数据库操作为一个通用方法,我们可以创建一个函数,该函数接受数据库连接参数(如主机名、用户名、密码、数据库名)、SQL语句以及必要的参数(用于参数化查询)。下面是一个简单的封装示例&#xff…...

Vue3 + TypeScript 组件和文件命名规范及 setup 导入顺序规范

前言 在 Vue3 项目中,合理的文件命名规范和导入顺序不仅有助于提高代码的可读性,还能增强团队协作的效率。特别是在使用 TypeScript 和 Composition API 的项目中,清晰的组件和文件结构尤为重要。本文将详细介绍 Vue3 TypeScript 项目中的组…...

netty之处理连接源码分析

写在前面 在这篇文章看了netty服务是如何启动的,服务启动成功后,也就相当于是迎宾工作都已经准备好了,那么当客人来了怎么招待客人呢?也就是本文要看的处理连接的工作。 1:正文 先启动源码example模块的echoserver&a…...

Dockerfile文件编写

1、打nginx原始包 登录后复制 ROM nginxENV LANG zh_CN.UTF-8 ENV LC_ALL zh_CN.UTF-8 ENV TZ Asia/Singapore# 设置时区,同样保持在一层 RUN ln -sf /usr/share/zoneinfo/${TZ} /etc/localtime && \echo "${TZ}" > /etc/timezoneRUN apt-get …...

Oracle SQL 使用 ROWNUM 分页查询速度太慢的问题及解决方案!

在使用 Oracle 数据库进行数据查询时,分页查询是一种常见的需求。传统上,开发者常常使用 ROWNUM 来实现分页功能。 然而,当数据量较大时,使用 ROWNUM 进行分页查询可能会导致性能问题。本文将深入探讨这一问题的原因,并提供多种解决方案,以提高分页查询的性能。 一、RO…...

Django3 + Vue.js 前后端分离书籍添加项目Web开发实战

文章目录 Django3后端项目创建切换数据库创建Django实战项目App新建Vue.js前端项目 Django3后端项目创建 创建Django项目,采用Pycharm或者命令行创建皆可。此处,以命令行方式作为演示,项目名为django_vue。 django-admin startproject djang…...

楼梯区域分割系统:Web效果惊艳

楼梯区域分割系统源码&数据集分享 [yolov8-seg-FocalModulation&yolov8-seg-GFPN等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI Global Al l…...

Day10加一

给定一个由 整数 组成的 非空 数组所表示的非负整数,在该数的基础上加一。 最高位数字存放在数组的首位, 数组中每个元素只存储单个数字。 你可以假设除了整数 0 之外,这个整数不会以零开头。 class Solution {public int[] plusOne(int[] di…...

UTF-8简介

UTF-8 UTF-8(8-bit Unicode Transformation Format)是一种针对Unicode的可变长度字符编码,也是一种前缀码。它可以用一至四个字节对Unicode字符集中的所有有效编码点进行编码,属于Unicode标准的一部分,最初由肯汤普逊…...

基于Openwrt系统架构,实现应用与驱动的实例。

一、在openwrt系统架构,编写helloworld的应用程序。 第一步先创建目录,项目代码要放在 openwrt根目下的 package 目录中,这里源码写在了 hellworld 的 src 目录下,因为外层还有需要编写的文件。 mkdir -p ~/openwrt/package/hel…...

SQL进阶技巧:如何利用三次指数平滑模型预测商品零售额?

目录 0 问题背景 1 数据准备 2 问题解决 2.1 模型构建 (1)符号规定 (2)基本假设...

HTB:Cicada[WriteUP]

目录 连接至HTB服务器并启动靶机 使用nmap对靶机进行开放端口扫描 使用nmap对靶机开放端口进行脚本、服务信息扫描 首先尝试空密码连接靶机SMB服务 由于不知道账户名,这里我们使用crackmapexec对smb服务进行用户爆破 通过该账户连接至靶机SMB服务器提取敏感信…...

小张求职记四

学校食堂的装修富丽堂皇,像个金碧辉煌的宫殿,可实际上却充斥着廉价的塑料制品和刺鼻的消毒水味。这金玉其外败絮其中的景象,与食堂承包商的“精明算计”如出一辙。 小张和小丽应约来到了一个档口下,“红烧肉”,之前就是…...

适用于 c++ 的 wxWidgets框架源码编译SDK-windows篇

本文章记录了下载wxWidgets源码在windows 11上使用visual Studio 2022编译的全过程,讲的不详细请给我留言,让我知道错误并改进。 本教程是入门级。有更深入的交流可以留言给我。 如今互联网流行现在大家都忘记了这块桌面的开发,我认为桌面应用还是有用武之地,是WEB无法替代…...

flink 内存配置(二):设置TaskManager内存

TaskManager在Flink中运行用户代码。根据需要配置内存使用,可以极大地减少Flink的资源占用,提高作业的稳定性。 注意下面的讲解适用于TaskManager 1.10之后的版本。与JobManager进程的内存模型相比,TaskManager内存组件具有类似但更复杂的结构…...

【C++ 算法进阶】算法提升八

复杂计算 (括号问题相关递归套路 重要) 题目 给定一个字符串str str表示一个公式 公式里面可能有整数 - * / 符号以及左右括号 返回最终计算的结果 题目分析 本题的难点主要在于可能会有很多的括号 而我们直接模拟现实中的算法的话code会难写 要考虑…...

阿里云实时数据仓库HologresFlink

1. 实时数仓Hologres特点 专注实时场景:数据实时写入、实时更新,写入即可见,与Flink原生集成,支持高吞吐、低延时、有模型的实时数仓开发,满足业务洞察实时性需求。 亚秒级交互式分析:支持海量数据亚秒级交…...

生成式语言模型的文本生成评价指标(从传统的基于统计到现在的基于语义)

文本生成评价指标 以 BLEU 为代表的基于统计的文本评价指标基于 BERT 等预训练模型的文本评价指标 1.以 BLEU 为代表的基于统计的文本评价指标 1.BLEU(Bilingual Evaluation Understudy, 双语评估辅助工具) 所有评价指标的鼻祖,核心思想是比较 候选译文 和 参考…...

【网安案例学习】暴力破解攻击(Brute Force Attack)

### 案例与影响 暴力破解攻击在历史上曾导致多次重大安全事件,特别是在用户数据泄露和账户被盗的案例中。随着计算能力的提升和密码管理技术的进步,暴力破解的威胁虽然有所减弱,但仍需警惕,特别是在面对高价值目标时。 【故事一…...

时间序列预测(十八)——实现配置管理和扩展命令行参数解析器

如图,这是一个main,py文件,在此代码中,最开始定义了许多模型参数,为了使项目更加灵活和可扩展,便于根据不同的需求调整参数和配置,可以根据实际需要扩展参数和配置项。 下面是如何实现配置管理和扩展命令行…...

Vue问题汇总解决

作者:fyupeng 技术专栏:☞ https://github.com/fyupeng 项目地址:☞ https://github.com/fyupeng/distributed-blog-system-api 留给读者 我们经常在使用Vue开发遇到一些棘手的问题,解决后通常要进行总结,避免下次重复…...

Spark学习

Spark简介 1.Spark是什么 首先spark是一个计算引擎,而不是存储工具,计算引擎有很多: 第一代:MapReduce廉价机器实现分布式大数据处理 第二代:Tez基于MR优化了DAG,性能比MR快一些 第三代:Spark…...

一些小细节代码笔记汇总

Python cv2抓取摄像头图片保存到本地 import cv2 import datetime, ossavePath "E:/Image/"if not os.path.exists(savePath):os.makedirs(savePath)cap cv2.VideoCapture(0) capture Falseif not cap.isOpened():print("无法打开摄像头")exit()while…...