当前位置: 首页 > news >正文

python pandas.DataFrame 直接写入Clickhouse

import pandas as pd
import sqlalchemy
from clickhouse_sqlalchemy import Table, engines
from sqlalchemy import create_engine, MetaData, Column
import urllib.parsehost = '1.1.1.1'
user = 'default'
password = 'default'
db = 'test'
port = 8123 # http连接端口
engine = create_engine('clickhouse://{user}:{password}@{host}:{port}/{db}'.format(user = user,host = host,password = urllib.parse.quote_plus(password),db = db,port = port),pool_size = 30,max_overflow = 0,pool_pre_ping=True , pool_recycle= 3600)
port = 9000 # Tcp/Ip连接端口
engine1 = create_engine('clickhouse+native://{user}:{password}@{host}:{port}/{db}'.format(user = user,host = host,password = urllib.parse.quote_plus(password),db = db,port = port),pool_size = 30,max_overflow = 0,pool_pre_ping=True , pool_recycle=3600)# https://github.com/xzkostyan/clickhouse-sqlalchemy/issues/129
# 参考文档https://github.com/xzkostyan/clickhouse-sqlalchemy
# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simpleclass ClickhouseDf(object):def __init__(self, **kwargs):self.engines_dict = {"MergeTree": engines.MergeTree,"AggregatingMergeTree": engines.AggregatingMergeTree,"GraphiteMergeTree": engines.GraphiteMergeTree,"CollapsingMergeTree": engines.CollapsingMergeTree,"VersionedCollapsingMergeTree": engines.VersionedCollapsingMergeTree,"SummingMergeTree": engines.SummingMergeTree,"ReplacingMergeTree": engines.ReplacingMergeTree,"Distributed": engines.Distributed,"ReplicatedMergeTree": engines.ReplicatedMergeTree,"ReplicatedAggregatingMergeTree": engines.ReplicatedAggregatingMergeTree,"ReplicatedCollapsingMergeTree": engines.ReplicatedCollapsingMergeTree,"ReplicatedVersionedCollapsingMergeTree": engines.ReplicatedVersionedCollapsingMergeTree,"ReplicatedReplacingMergeTree": engines.ReplicatedReplacingMergeTree,"ReplicatedSummingMergeTree": engines.ReplicatedSummingMergeTree,"View": engines.View,"MaterializedView": engines.MaterializedView,"Buffer": engines.Buffer,"TinyLog": engines.TinyLog,"Log": engines.Log,"Memory": engines.Memory,"Null": engines.Null,"File": engines.File}self.table_engine = kwargs.get("table_engine", "MergeTree")  # 默认引擎选择if self.table_engine not in self.engines_dict.keys():raise ValueError("No engine for this table")def _createORMTable(self, df, name, con, schema, **kwargs):col_dtype_dict = {"object": sqlalchemy.Text,"int64": sqlalchemy.Integer,"int32": sqlalchemy.Integer,"int16": sqlalchemy.Integer,"int8": sqlalchemy.Integer,"int": sqlalchemy.Integer,"float64": sqlalchemy.Float,"float32": sqlalchemy.Float,"float16": sqlalchemy.Float,"float8": sqlalchemy.Float,"float": sqlalchemy.Float,}primary_key = kwargs.get("primary_key", [])df_col = df.columns.tolist()metadata = MetaData(bind=con, schema=schema)_table_check_col = []for col in df_col:col_dtype = str(df.dtypes[col])if col_dtype not in col_dtype_dict.keys():if col in primary_key:_table_check_col.append(Column(col, col_dtype_dict["object"], primary_key=True))else:_table_check_col.append(Column(col, col_dtype_dict["object"]))else:if col in primary_key:_table_check_col.append(Column(col, col_dtype_dict[col_dtype], primary_key=True))else:_table_check_col.append(Column(col, col_dtype_dict[col_dtype]))_table_check = Table(name, metadata,*_table_check_col,self.engines_dict[self.table_engine](primary_key=primary_key))return _table_checkdef _checkTable(self, name, con, schema):sql_str = f"EXISTS {schema}.{name}"if con.execute(sql_str).fetchall() == [(0,)]:return 0else:return 1def to_sql(self, df, name: str, con, schema=None, if_exists="fail",**kwargs):'''将DataFrame格式数据插入Clickhouse中{'fail', 'replace', 'append'}, default 'fail''''if self.table_engine in ["MergeTree"]:  # 表格必须有主键的引擎列表-暂时只用这种,其他未测试self.primary_key = kwargs.get("primary_key", [df.columns.tolist()[0]])else:self.primary_key = kwargs.get("primary_key", [])orm_table = self._createORMTable(df, name, con, schema, primary_key=self.primary_key)tanle_exeit = self._checkTable(name, con, schema)# 创建表if if_exists == "fail":if tanle_exeit:raise ValueError(f"table already exists :{name} ")else:orm_table.create()if if_exists == "replace":if tanle_exeit:orm_table.drop()orm_table.create()else:orm_table.create()if if_exists == "append":if not tanle_exeit:orm_table.create()# http连接下会自动将None填充为空字符串以入库,tcp/ip模式下则不会,会导致引擎报错,需要手动填充。df_dict = df.to_dict(orient="records")con.execute(orm_table.insert(), df_dict)# df.to_sql(name, con, schema, index=False, if_exists="append")if __name__ == '__main__':# 使用方法cdf = ClickhouseDf()df = pd.DataFrame({'column1': [1, 2, 3],'column2': ['A', 'B', 'C']})db = 'default'password = ''user = 'default'port = 9090host = '192.168.76.136'engine = create_engine('clickhouse+native://{user}:{password}@{host}:{port}/{db}'.format(user=user,host=host,password=urllib.parse.quote_plus(password),db=db,port=port),pool_size=30, max_overflow=0,pool_pre_ping=True, pool_recycle=3600)with engine.connect() as conn:cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")list = engine.connect().execute("SELECT * FROM table_name").fetchall()print(list)

1) 运行需要安装包

# pip install sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install clickhouse-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
 

2)cdf.to_sql(df, "table_name", conn, schema='default', if_exists="replace")

这里的 schema 一定要写,判断表是否存在 是用 

if con.execute('EXISTS default.table_name') == [(0,)]: 来判断表是否存在的

参考链接: SQLAlchemy_clickhouse_sqlalchemy-CSDN博客

https://github.com/xzkostyan/clickhouse-sqlalchemy

相关文章:

python pandas.DataFrame 直接写入Clickhouse

import pandas as pd import sqlalchemy from clickhouse_sqlalchemy import Table, engines from sqlalchemy import create_engine, MetaData, Column import urllib.parsehost 1.1.1.1 user default password default db test port 8123 # http连接端口 engine create…...

德语中第二虚拟式在主动态的形式,柯桥哪里可以学德语

德语中第二虚拟式在主动态的形式 1. 对于大多数的动词,一般使用这样的一般现在时时态: wrde 动词原形 例句:Wenn es nicht so viel kosten wrde, wrde ich mir ein Haus am Meer kaufen. 如果不花这么多钱,我会在海边买一栋房…...

[Python进阶] 消息框、弹窗:tkinter库

6.16 消息框、弹窗:tkinter 6.16.1 前言 应用程序中的提示信息处理程序是非常重要的部分,用户要知道他输入的资料到底正不正确,或者是应用程序有一些提示信息要告诉用户,都必须通过提示信息处理程序来显示适当的信息&#xff0c…...

(免费领源码)java#Springboot#mysql装修选购网站99192-计算机毕业设计项目选题推荐

摘 要 随着科学技术,计算机迅速的发展。在如今的社会中,市场上涌现出越来越多的新型的产品,人们有了不同种类的选择拥有产品的方式,而电子商务就是随着人们的需求和网络的发展涌动出的产物,电子商务网站是建立在企业与…...

生活废品回收系统 JAVA语言设计和实现

目录 一、系统介绍 二、系统下载 三、系统截图 一、系统介绍 基于VueSpringBootMySQL的生活废品回收系统包含资源类型模块、资源品类模块、回收机构模块、回收机构模块、资源销售单模块、资源交易单模块、资源交易单模块,还包含系统自带的用户管理、部门管理、角…...

redhat/centos 配置本地yum源

- 详细步骤(首先需要将iso文件上传到服务器): 1. mkdir /media/cdrom #新建镜像文件挂载目录2. cd /usr/local/src #进入系统镜像文件存放目录3. ls #列出目录文件,可以看到刚刚上传的系统镜像文件4. mount -t iso9660 -o loop /usr/local/src/rhel-s…...

FLStudio2024汉化破解版在哪可以下载?

水果音乐制作软件FLStudio是一款功能强大的音乐创作软件,全名:Fruity Loops Studio。水果音乐制作软件FLStudio内含教程、软件、素材,是一个完整的软件音乐制作环境或数字音频工作站... FL Studio21简称FL 21,全称 Fruity Loops Studio 21,因此国人习惯叫…...

Java 音频处理,音频流转音频文件,获取音频播放时长

1.背景 最近对接了一款智能手表,手环,可以应用与老人与儿童监控,环卫工人监控,农场畜牧业监控,宠物监控等,其中用到了音频传输,通过平台下发语音包,发送远程命令录制当前设备音频并…...

Spring Boot发送邮件

在现代的互联网应用中,发送电子邮件是一项常见的功能需求。Spring Boot提供了简单且强大的邮件发送功能,使得在应用中集成邮件发送变得非常容易。本文将介绍如何在Spring Boot中发送电子邮件,并提供一个完整的示例。 1. 准备工作 在开始之前…...

智慧矿山:AI算法助力!刮板机监测,生产效率和安全性提升!

工作面刮板机在煤矿等采矿场景中起着重要作用。为了提高其生产效率和安全性,研究人员开发了一种基于 AI 算法的刮板机监测技术。 在传统的刮板机监测中,通常需要人工观察和判断刮板机的状态。这种方法存在许多问题,如主观性、耗时和易出错等。…...

Qt跨平台(统信UOS)各种坑解决办法

记录Qt跨平台的坑,方便日后翻阅。 一、环境安装 本人用的是qt 5.14.2.直接在官网下载即可。地址:Index of /archive/qt/5.14/5.14.2 下载linux版本。 下载之后 添加可执行权限。 chmod 777 qt-opensource-linux-x64-5.14.2.run 然后执行。 出现坑1…...

ORB-SLAM3算法1之Ubuntu18.04+ROS-melodic安装ORB-SLAM3及各种问题解决

文章目录 0 引言1 安装依赖1.1 opencv安装1.2 Eigen3安装1.3 Pangolin安装1.4 其他2 编译安装ORB-SLAM32.1 build.sh2.2 build_ros.sh0 引言 ORB-SLAM3,在之前ORB-SLAM和ORB-SLAM2的基础上,新增了IMU多传感器融合SLAM,这是第一个能够使用针孔和鱼眼镜头模型通过单目、立体和…...

git学习笔记之用命令行解决冲突

背景 一般来说,当使用git检测到源分支和目标分支发生冲突时,我们习惯用IDE在本地进行冲突的解决,再合并、push。 但如果冲突文件不多,我们大可以直接用命令行去解决冲突。 方法 第一种方法: 找到所有的>>>…...

C语言中的内联汇编是什么?如何使用内联汇编进行底层编程?

C语言中的内联汇编是一种高级编程技术,允许开发者在C代码中嵌入汇编代码,以实现对特定处理器指令的直接控制和优化。内联汇编通常用于底层编程,例如操作系统开发、嵌入式系统编程和性能关键的应用程序。本文将详细介绍内联汇编的概念、语法和…...

react笔记基础部分(组件生命周期路由)

注意点&#xff1a; class是一个关键字&#xff0c; 类。 所以react 写class, 用classname &#xff0c;会自动编译替换class 点击方法&#xff1a; <button onClick {this.sendData}>给父元素传值</button>常用的插件&#xff1a; 需要引入才能使用的&#xf…...

Sentinel授权规则和规则持久化

大家好我是苏麟 , 今天说说Sentinel规则持久化. 授权规则 授权规则可以对请求方来源做判断和控制。 授权规则 基本规则 授权规则可以对调用方的来源做控制&#xff0c;有白名单和黑名单两种方式。 白名单&#xff1a;来源&#xff08;origin&#xff09;在白名单内的调用…...

JVM(三) 垃圾回收

一、自动垃圾回收 1.1 C/C++的内存管理 在C/C++这类没有自动垃圾回收机制的语言中,一个对象如果不再使用,需要手动释放,否则就会出现内存泄漏。我们称这种释放对象的过程为垃圾回收,而需要程序员编写代码进行回收的方式为手动回收。 内存泄漏指的是不再使用的对象在系统中…...

vue3中使用svg并封装成组件

打包svg地图 安装插件 yarn add vite-plugin-svg-icons -D # or npm i vite-plugin-svg-icons -D # or pnpm install vite-plugin-svg-icons -D使用插件 vite.config.ts import { VantResolver } from unplugin-vue-components/resolvers import { createSvgIconsPlugin } from…...

实验六:DHCP、DNS、Apache、FTP服务器的安装和配置

1. (其它) 掌握Linux下DHCP、DNS、Apache、FTP服务器的安装和配置&#xff0c;在Linux服务器上部署JavaWeb应用 完成单元八的实训内容。 1、安装 JDK 2、安装 MySQL 3、部署JavaWeb应用 安装jdk 教程连接&#xff1a;linux安装jdk8详细步骤-CSDN博客 Jdk来源&#xff1a;linu…...

Python实验项目4 :面对对象程序设计

1&#xff1a;运行下面的程序&#xff0c;回答问题。 &#xff08;1&#xff09;说明程序的执行过程&#xff1b; &#xff08;2&#xff09;程序运行结果是什么&#xff1f; # &#xff08;1&#xff09;说明程序的执行过程&#xff1b; # &#xff08;2&#xff09;程序运行…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

LLM基础1_语言模型如何处理文本

基于GitHub项目&#xff1a;https://github.com/datawhalechina/llms-from-scratch-cn 工具介绍 tiktoken&#xff1a;OpenAI开发的专业"分词器" torch&#xff1a;Facebook开发的强力计算引擎&#xff0c;相当于超级计算器 理解词嵌入&#xff1a;给词语画"…...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

ArcGIS Pro制作水平横向图例+多级标注

今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作&#xff1a;ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等&#xff08;ArcGIS出图图例8大技巧&#xff09;&#xff0c;那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...

如何更改默认 Crontab 编辑器 ?

在 Linux 领域中&#xff0c;crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用&#xff0c;用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益&#xff0c;允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

前端中slice和splic的区别

1. slice slice 用于从数组中提取一部分元素&#xff0c;返回一个新的数组。 特点&#xff1a; 不修改原数组&#xff1a;slice 不会改变原数组&#xff0c;而是返回一个新的数组。提取数组的部分&#xff1a;slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...

抽象类和接口(全)

一、抽象类 1.概念&#xff1a;如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象&#xff0c;这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法&#xff0c;包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中&#xff0c;⼀个类如果被 abs…...