數據集成平台:datax將hive數據步到mysql(全部列和指定列)
數據集成平台:datax將hive數據步到mysql(全部列和指定列)
1.py腳本
傳入參數:
target_database:數據庫
target_table:表
target_columns:列
target_positions:hive列的下標(從0開始)
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb# MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xx"# HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table, columns):connection = get_connection()cursor = connection.cursor()if columns == 'all':# 如果传入 '*' 表示要所有列sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION" % (database, table)else:# 传入指定列# 将每个列名加上单引号columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) ORDER BY ORDINAL_POSITION" % (database, table, columns)cursor.execute(sql)fetchall = cursor.fetchall()# print(fetchall)cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table, target_columns):return map(lambda x: x[0], get_mysql_meta(database, table, target_columns))def get_hive_columns(database, table, target_columns, target_positions):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","mediumint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string","bit": "string",}return mappings[mysql_type]meta = get_mysql_meta(database, table, target_columns)if target_columns == 'all':return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)else:positions = list(map(int, target_positions.split(',')))return map(lambda x, i: {"index": positions[i], "type": type_mapping(x[1].lower())}, meta, range(len(meta)))def generate_json(target_database, target_table, target_columns, target_positions):print(get_hive_columns(target_database, target_table, target_columns, target_positions))if target_columns == 'all':target_columns_hive = "[*]"else:target_columns_hive = get_hive_columns(target_database, target_table, target_columns, target_positions)job = {"job": {"setting": {"speed": {"channel": 15},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "hdfsreader","batchSize": "8192","batchByteSize": "33554432","parameter": {"path": "${exportdir}","defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"column": target_columns_hive,"fileType": "orc","encoding": "UTF-8","fieldDelimiter": u"\u0001","nullFormat": "\\N"}},"writer": {"name": "mysqlwriter","batchSize": "8192","batchByteSize": "33554432","parameter": {"writeMode": "replace","username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(target_database, target_table, target_columns),"connection": [{"jdbcUrl":"jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8&useSSL=false","table": [target_table]}]}}}]}
}output_path = "/opt/module/datax/job/export/" + target_databaseif not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:json.dump(job, f)def main(args):target_database = ""target_table = ""target_columns = "" # 默认为 None,表示没有指定列信息target_positions = ""options, arguments = getopt.getopt(args, 'p:d:t:c:', ['positions=', 'targetdb=', 'targettbl=', 'columns='])for opt_name, opt_value in options:if opt_name in ('-d', '--targetdb'):target_database = opt_valueif opt_name in ('-t', '--targettbl'):target_table = opt_valueif opt_name in ('-c', '--columns'):target_columns = opt_valueif opt_name in ('-p', '--positions'):target_positions = opt_valueprint(target_database, target_table, target_columns, target_positions)generate_json(target_database, target_table, target_columns, target_positions)if __name__ == '__main__':main(sys.argv[1:])
2.sh腳本
#!/bin/bash
python ~/bin/test.py -d db-t table -c all
#kunnr,name1,sort2,addrnumber,country,state -p 0,1,2,3,4,5
#all相关文章:
數據集成平台:datax將hive數據步到mysql(全部列和指定列)
數據集成平台:datax將hive數據步到mysql(全部列和指定列) 1.py腳本 傳入參數: target_database:數據庫 target_table:表 target_columns:列 target_positions:hive列的下標&#x…...
pikachu靶场-File Inclusion
介绍: File Inclusion(文件包含漏洞)概述 文件包含,是一个功能。在各种开发语言中都提供了内置的文件包含函数,其可以使开发人员在一个代码文件中直接包含(引入)另外一个代码文件。 比如 在PHP中,提供了&…...
[今天跟AI聊聊职场] ~你能接受你的直接领导能力不如你,年纪还比你小很多吗?
知乎问题: 弟弟今年35岁,刚换了一份工作,直接领导小A比他小5岁,各方面经验没有他成熟。难的工作都是弟弟在做,功劳都被直接领导小A抢走了,有时候还要被直接领导小A打压。弟弟感觉升职加薪无望。现在找工作不…...
网络原理TCP之“三次握手“
TCP内核中的建立连接 众所周知,TCP是有连接的. 当我们在客户端敲出socket new Socket(serverIp,severPort)时,就在系统内核就在建立连接 真正建立连接是在系统内核中建立的,我们程序员只是调用相关的api. 在此处,我们把TCP的建立连接称为三次握手. 系统在内核建立连接时如上…...
990-03产品经理与程序员:什么是 IT 与业务协调以及为什么它很重要?
What is IT-business alignment and why is it important? 什么是IT-业务一致性?为什么它很重要? It’s more important than ever that IT and the business operate from the same playbook(剧本). So why do so many organizations struggle to ach…...
Java Web(七)__Tomcat(二)
Tomcat工作模式 Tomcat作为Servlet容器,有以下三种工作模式。 1)独立的Servlet容器,由Java虚拟机进程来运行 Tomcat作为独立的Web服务器来单独运行,Servlet容器组件作为Web服务器中的一部分而存在。这是Tomcat的默认工作模式。…...
【项目实战】帮美女老师做一个点名小程序(Python tkinter)
前言 博主有一个非常漂亮的老师朋友😍。最近,她急需一个能够实现随机点名的小程序,而博主正好擅长这方面的技术🤏。所以,今天博主决定为她制作一个专门用于点名的小程序💪。 博主在美女老师面前吹完牛皮之…...
Elasticsearch 去重后求和
标题的要求可以用如下 SQL 表示 select sum(column2) from (select distinct(column1),column2 from table)t 要如何用 DSL 实现呢,先准备下索引和数据 PUT test_index {"mappings": {"properties": {"column1": {"type"…...
考研数学——高数:函数与极限(3)
函数的连续性与间断点 函数的连续性 左连续 右连续 区间上的连续性 在xo处连续 函数的间断点 第一类间断点(左右极限都存在) 可去间断点: f(xo-0)= f(xo+0) 跳跃间断点: f(xo-0)≠ f(xo+0) 第二类间断点(震荡间断点、无穷间断点)...
LeetCode49 字母异位词分组
LeetCode49 字母异位词分组 在这篇博客中,我们将探讨 LeetCode 上的一道经典算法问题:字母异位词分组。这个问题要求将给定的字符串数组中的字母异位词组合在一起,并以任意顺序返回结果列表。 问题描述 给定一个字符串数组 strs࿰…...
【Python】Windows本地映射远程Linux服务器上的端口(解决jupyter notebook无法启动问题)
创作日志: 学习深度学习不想在本地破电脑上再安装各种软件,我就用实验室的服务器配置环境,启动jupyter notebook时脑子又瓦特了,在自己Windows电脑上打开服务器提供的网址,那肯定打不开啊,以前在其它电脑上…...
C++面试:用户态和内核态的基本概念、区别
目录 一、基本概念 概念: 区别: 二、Windows示例 基础介绍 用户态到内核态的切换过程: 程序实例 三、Linux示例 特权级别: 用户态到内核态的切换过程: 调度和中断处理: 程序实例 总结 在操作系…...
Vue计算属性computed()
1. 计算属性定义 获取计算属性值 <div>{{ 计算属性名称}}</div>创建计算属性 let 定义的属性ref/reactive....let 计算属性名称 computed(() > {//这里写函数式,函数式里面包含定义属性//只有这个包含的定义属性被修改时才出发此函数式//通过计算属性名称co…...
JWT学习笔记
了解 JWT Token 释义及使用 | Authing 文档 JSON Web Token Introduction - jwt.io JSON Web Token (JWT,RFC 7519 (opens new window)),是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准((RFC 7519)。该 token 被设计为紧凑…...
WSL里的Ubuntu 登录密码忘了怎么更改
环境: Win10 专业版 WSL2 如何 Ubuntu22.04 问题描述: WSL里的Ubuntu 登录密码忘了怎么更改 解决方案: 在WSL中的Ubuntu系统中,忘记了密码,可以通过以下步骤重置密码: 1.打开命令提示符或PowerShel…...
【软件测试面试】要你介绍项目-如何说?完美面试攻略...
目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、测试面试时&am…...
【Crypto | CTF】RSA打法 集合
天命:我发现题题不一样,已知跟求知的需求都不一样 题目一:已知 p q E ,计算T,最后求D 已知两个质数p q 和 公钥E ,通过p和q计算出欧拉函数T,最后求私钥D 【密码学 | CTF】BUUCTF RSA-CSDN…...
在springboot中调用openai Api并实现流式响应
之前在《在springboot项目中调用openai API及我遇到的问题》这篇博客中,我实现了在springboot中调用openai接口,但是在这里的返回的信息是一次性全部返回的,如果返回的文字比较多,我们可能需要等很久。 所以需要考虑将请求接口响应…...
C++构造函数重难点解析
一、C构造函数是什么 C的构造函数是一种特殊的成员函数,用于初始化类的对象。它具有与类相同的名称,并且没有返回类型。构造函数在创建对象时自动调用,并且可以执行必要的初始化操作。 二、C构造函数特点 类的构造函数不能被继承,…...
QT day3 作业2.22
思维导图: 作业: 完善对话框,点击登录对话框,如果账号和密码匹配,则弹出信息对话框,给出提示”登录成功“,提供一个Ok按钮,用户点击Ok后,关闭登录界面,跳转到…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
OpenLayers 分屏对比(地图联动)
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能,和卷帘图层不一样的是,分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...
