当前位置: 首页 > news >正文

數據集成平台:datax將hive數據步到mysql(全部列和指定列)

數據集成平台:datax將hive數據步到mysql(全部列和指定列)

1.py腳本

傳入參數:

target_database:數據庫
target_table:表
target_columns:列
target_positions:hive列的下標(從0開始)

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb# MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xx"# HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table, columns):connection = get_connection()cursor = connection.cursor()if columns == 'all':# 如果传入 '*' 表示要所有列sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION" % (database, table)else:# 传入指定列# 将每个列名加上单引号columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) ORDER BY ORDINAL_POSITION" % (database, table, columns)cursor.execute(sql)fetchall = cursor.fetchall()# print(fetchall)cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table, target_columns):return map(lambda x: x[0], get_mysql_meta(database, table, target_columns))def get_hive_columns(database, table, target_columns, target_positions):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","mediumint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string","bit": "string",}return mappings[mysql_type]meta = get_mysql_meta(database, table, target_columns)if target_columns == 'all':return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)else:positions = list(map(int, target_positions.split(',')))return map(lambda x, i: {"index": positions[i], "type": type_mapping(x[1].lower())}, meta, range(len(meta)))def generate_json(target_database, target_table, target_columns, target_positions):print(get_hive_columns(target_database, target_table, target_columns, target_positions))if target_columns == 'all':target_columns_hive = "[*]"else:target_columns_hive = get_hive_columns(target_database, target_table, target_columns, target_positions)job = {"job": {"setting": {"speed": {"channel": 15},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "hdfsreader","batchSize": "8192","batchByteSize": "33554432","parameter": {"path": "${exportdir}","defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"column": target_columns_hive,"fileType": "orc","encoding": "UTF-8","fieldDelimiter": u"\u0001","nullFormat": "\\N"}},"writer": {"name": "mysqlwriter","batchSize": "8192","batchByteSize": "33554432","parameter": {"writeMode": "replace","username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(target_database, target_table, target_columns),"connection": [{"jdbcUrl":"jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8&useSSL=false","table": [target_table]}]}}}]}
}output_path = "/opt/module/datax/job/export/" + target_databaseif not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:json.dump(job, f)def main(args):target_database = ""target_table = ""target_columns = ""  # 默认为 None,表示没有指定列信息target_positions = ""options, arguments = getopt.getopt(args, 'p:d:t:c:', ['positions=', 'targetdb=', 'targettbl=', 'columns='])for opt_name, opt_value in options:if opt_name in ('-d', '--targetdb'):target_database = opt_valueif opt_name in ('-t', '--targettbl'):target_table = opt_valueif opt_name in ('-c', '--columns'):target_columns = opt_valueif opt_name in ('-p', '--positions'):target_positions = opt_valueprint(target_database, target_table, target_columns, target_positions)generate_json(target_database, target_table, target_columns, target_positions)if __name__ == '__main__':main(sys.argv[1:])

2.sh腳本

#!/bin/bash
python ~/bin/test.py -d db-t table -c all
#kunnr,name1,sort2,addrnumber,country,state -p 0,1,2,3,4,5
#all

相关文章:

數據集成平台:datax將hive數據步到mysql(全部列和指定列)

數據集成平台:datax將hive數據步到mysql(全部列和指定列) 1.py腳本 傳入參數: target_database:數據庫 target_table:表 target_columns:列 target_positions:hive列的下標&#x…...

pikachu靶场-File Inclusion

介绍: File Inclusion(文件包含漏洞)概述 文件包含,是一个功能。在各种开发语言中都提供了内置的文件包含函数,其可以使开发人员在一个代码文件中直接包含(引入)另外一个代码文件。 比如 在PHP中,提供了&…...

[今天跟AI聊聊职场] ~你能接受你的直接领导能力不如你,年纪还比你小很多吗?

知乎问题: 弟弟今年35岁,刚换了一份工作,直接领导小A比他小5岁,各方面经验没有他成熟。难的工作都是弟弟在做,功劳都被直接领导小A抢走了,有时候还要被直接领导小A打压。弟弟感觉升职加薪无望。现在找工作不…...

网络原理TCP之“三次握手“

TCP内核中的建立连接 众所周知,TCP是有连接的. 当我们在客户端敲出socket new Socket(serverIp,severPort)时,就在系统内核就在建立连接 真正建立连接是在系统内核中建立的,我们程序员只是调用相关的api. 在此处,我们把TCP的建立连接称为三次握手. 系统在内核建立连接时如上…...

990-03产品经理与程序员:什么是 IT 与业务协调以及为什么它很重要?

What is IT-business alignment and why is it important? 什么是IT-业务一致性?为什么它很重要? It’s more important than ever that IT and the business operate from the same playbook(剧本). So why do so many organizations struggle to ach…...

Java Web(七)__Tomcat(二)

Tomcat工作模式 Tomcat作为Servlet容器,有以下三种工作模式。 1)独立的Servlet容器,由Java虚拟机进程来运行 Tomcat作为独立的Web服务器来单独运行,Servlet容器组件作为Web服务器中的一部分而存在。这是Tomcat的默认工作模式。…...

【项目实战】帮美女老师做一个点名小程序(Python tkinter)

前言 博主有一个非常漂亮的老师朋友😍。最近,她急需一个能够实现随机点名的小程序,而博主正好擅长这方面的技术🤏。所以,今天博主决定为她制作一个专门用于点名的小程序💪。 博主在美女老师面前吹完牛皮之…...

Elasticsearch 去重后求和

标题的要求可以用如下 SQL 表示 select sum(column2) from (select distinct(column1),column2 from table)t 要如何用 DSL 实现呢,先准备下索引和数据 PUT test_index {"mappings": {"properties": {"column1": {"type"…...

考研数学——高数:函数与极限(3)

函数的连续性与间断点 函数的连续性 左连续 右连续 区间上的连续性 在xo处连续 函数的间断点 第一类间断点(左右极限都存在) 可去间断点: f(xo-0)= f(xo+0) 跳跃间断点: f(xo-0)≠ f(xo+0) 第二类间断点(震荡间断点、无穷间断点)...

LeetCode49 字母异位词分组

LeetCode49 字母异位词分组 在这篇博客中,我们将探讨 LeetCode 上的一道经典算法问题:字母异位词分组。这个问题要求将给定的字符串数组中的字母异位词组合在一起,并以任意顺序返回结果列表。 问题描述 给定一个字符串数组 strs&#xff0…...

【Python】Windows本地映射远程Linux服务器上的端口(解决jupyter notebook无法启动问题)

创作日志: 学习深度学习不想在本地破电脑上再安装各种软件,我就用实验室的服务器配置环境,启动jupyter notebook时脑子又瓦特了,在自己Windows电脑上打开服务器提供的网址,那肯定打不开啊,以前在其它电脑上…...

C++面试:用户态和内核态的基本概念、区别

目录 一、基本概念 概念: 区别: 二、Windows示例 基础介绍 用户态到内核态的切换过程: 程序实例 三、Linux示例 特权级别: 用户态到内核态的切换过程: 调度和中断处理: 程序实例 总结 在操作系…...

Vue计算属性computed()

1. 计算属性定义 获取计算属性值 <div>{{ 计算属性名称}}</div>创建计算属性 let 定义的属性ref/reactive....let 计算属性名称 computed(() > {//这里写函数式,函数式里面包含定义属性//只有这个包含的定义属性被修改时才出发此函数式//通过计算属性名称co…...

JWT学习笔记

了解 JWT Token 释义及使用 | Authing 文档 JSON Web Token Introduction - jwt.io JSON Web Token (JWT&#xff0c;RFC 7519 (opens new window))&#xff0c;是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准&#xff08;(RFC 7519)。该 token 被设计为紧凑…...

WSL里的Ubuntu 登录密码忘了怎么更改

环境&#xff1a; Win10 专业版 WSL2 如何 Ubuntu22.04 问题描述&#xff1a; WSL里的Ubuntu 登录密码忘了怎么更改 解决方案&#xff1a; 在WSL中的Ubuntu系统中&#xff0c;忘记了密码&#xff0c;可以通过以下步骤重置密码&#xff1a; 1.打开命令提示符或PowerShel…...

【软件测试面试】要你介绍项目-如何说?完美面试攻略...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试面试时&am…...

【Crypto | CTF】RSA打法 集合

天命&#xff1a;我发现题题不一样&#xff0c;已知跟求知的需求都不一样 题目一&#xff1a;已知 p q E &#xff0c;计算T&#xff0c;最后求D 已知两个质数p q 和 公钥E &#xff0c;通过p和q计算出欧拉函数T&#xff0c;最后求私钥D 【密码学 | CTF】BUUCTF RSA-CSDN…...

在springboot中调用openai Api并实现流式响应

之前在《在springboot项目中调用openai API及我遇到的问题》这篇博客中&#xff0c;我实现了在springboot中调用openai接口&#xff0c;但是在这里的返回的信息是一次性全部返回的&#xff0c;如果返回的文字比较多&#xff0c;我们可能需要等很久。 所以需要考虑将请求接口响应…...

C++构造函数重难点解析

一、C构造函数是什么 C的构造函数是一种特殊的成员函数&#xff0c;用于初始化类的对象。它具有与类相同的名称&#xff0c;并且没有返回类型。构造函数在创建对象时自动调用&#xff0c;并且可以执行必要的初始化操作。 二、C构造函数特点 类的构造函数不能被继承&#xff0c…...

QT day3 作业2.22

思维导图&#xff1a; 作业&#xff1a; 完善对话框&#xff0c;点击登录对话框&#xff0c;如果账号和密码匹配&#xff0c;则弹出信息对话框&#xff0c;给出提示”登录成功“&#xff0c;提供一个Ok按钮&#xff0c;用户点击Ok后&#xff0c;关闭登录界面&#xff0c;跳转到…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

Docker 本地安装 mysql 数据库

Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker &#xff1b;并安装。 基础操作不再赘述。 打开 macOS 终端&#xff0c;开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...