數據集成平台:datax將hive數據步到mysql(全部列和指定列)
數據集成平台:datax將hive數據步到mysql(全部列和指定列)
1.py腳本
傳入參數:
target_database:數據庫
target_table:表
target_columns:列
target_positions:hive列的下標(從0開始)
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb# MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "xx"# HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table, columns):connection = get_connection()cursor = connection.cursor()if columns == 'all':# 如果传入 '*' 表示要所有列sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION" % (database, table)else:# 传入指定列# 将每个列名加上单引号columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) ORDER BY ORDINAL_POSITION" % (database, table, columns)cursor.execute(sql)fetchall = cursor.fetchall()# print(fetchall)cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table, target_columns):return map(lambda x: x[0], get_mysql_meta(database, table, target_columns))def get_hive_columns(database, table, target_columns, target_positions):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","mediumint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string","bit": "string",}return mappings[mysql_type]meta = get_mysql_meta(database, table, target_columns)if target_columns == 'all':return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)else:positions = list(map(int, target_positions.split(',')))return map(lambda x, i: {"index": positions[i], "type": type_mapping(x[1].lower())}, meta, range(len(meta)))def generate_json(target_database, target_table, target_columns, target_positions):print(get_hive_columns(target_database, target_table, target_columns, target_positions))if target_columns == 'all':target_columns_hive = "[*]"else:target_columns_hive = get_hive_columns(target_database, target_table, target_columns, target_positions)job = {"job": {"setting": {"speed": {"channel": 15},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "hdfsreader","batchSize": "8192","batchByteSize": "33554432","parameter": {"path": "${exportdir}","defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"column": target_columns_hive,"fileType": "orc","encoding": "UTF-8","fieldDelimiter": u"\u0001","nullFormat": "\\N"}},"writer": {"name": "mysqlwriter","batchSize": "8192","batchByteSize": "33554432","parameter": {"writeMode": "replace","username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(target_database, target_table, target_columns),"connection": [{"jdbcUrl":"jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8&useSSL=false","table": [target_table]}]}}}]}
}output_path = "/opt/module/datax/job/export/" + target_databaseif not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:json.dump(job, f)def main(args):target_database = ""target_table = ""target_columns = "" # 默认为 None,表示没有指定列信息target_positions = ""options, arguments = getopt.getopt(args, 'p:d:t:c:', ['positions=', 'targetdb=', 'targettbl=', 'columns='])for opt_name, opt_value in options:if opt_name in ('-d', '--targetdb'):target_database = opt_valueif opt_name in ('-t', '--targettbl'):target_table = opt_valueif opt_name in ('-c', '--columns'):target_columns = opt_valueif opt_name in ('-p', '--positions'):target_positions = opt_valueprint(target_database, target_table, target_columns, target_positions)generate_json(target_database, target_table, target_columns, target_positions)if __name__ == '__main__':main(sys.argv[1:])
2.sh腳本
#!/bin/bash
python ~/bin/test.py -d db-t table -c all
#kunnr,name1,sort2,addrnumber,country,state -p 0,1,2,3,4,5
#all相关文章:
數據集成平台:datax將hive數據步到mysql(全部列和指定列)
數據集成平台:datax將hive數據步到mysql(全部列和指定列) 1.py腳本 傳入參數: target_database:數據庫 target_table:表 target_columns:列 target_positions:hive列的下標&#x…...
pikachu靶场-File Inclusion
介绍: File Inclusion(文件包含漏洞)概述 文件包含,是一个功能。在各种开发语言中都提供了内置的文件包含函数,其可以使开发人员在一个代码文件中直接包含(引入)另外一个代码文件。 比如 在PHP中,提供了&…...
[今天跟AI聊聊职场] ~你能接受你的直接领导能力不如你,年纪还比你小很多吗?
知乎问题: 弟弟今年35岁,刚换了一份工作,直接领导小A比他小5岁,各方面经验没有他成熟。难的工作都是弟弟在做,功劳都被直接领导小A抢走了,有时候还要被直接领导小A打压。弟弟感觉升职加薪无望。现在找工作不…...
网络原理TCP之“三次握手“
TCP内核中的建立连接 众所周知,TCP是有连接的. 当我们在客户端敲出socket new Socket(serverIp,severPort)时,就在系统内核就在建立连接 真正建立连接是在系统内核中建立的,我们程序员只是调用相关的api. 在此处,我们把TCP的建立连接称为三次握手. 系统在内核建立连接时如上…...
990-03产品经理与程序员:什么是 IT 与业务协调以及为什么它很重要?
What is IT-business alignment and why is it important? 什么是IT-业务一致性?为什么它很重要? It’s more important than ever that IT and the business operate from the same playbook(剧本). So why do so many organizations struggle to ach…...
Java Web(七)__Tomcat(二)
Tomcat工作模式 Tomcat作为Servlet容器,有以下三种工作模式。 1)独立的Servlet容器,由Java虚拟机进程来运行 Tomcat作为独立的Web服务器来单独运行,Servlet容器组件作为Web服务器中的一部分而存在。这是Tomcat的默认工作模式。…...
【项目实战】帮美女老师做一个点名小程序(Python tkinter)
前言 博主有一个非常漂亮的老师朋友😍。最近,她急需一个能够实现随机点名的小程序,而博主正好擅长这方面的技术🤏。所以,今天博主决定为她制作一个专门用于点名的小程序💪。 博主在美女老师面前吹完牛皮之…...
Elasticsearch 去重后求和
标题的要求可以用如下 SQL 表示 select sum(column2) from (select distinct(column1),column2 from table)t 要如何用 DSL 实现呢,先准备下索引和数据 PUT test_index {"mappings": {"properties": {"column1": {"type"…...
考研数学——高数:函数与极限(3)
函数的连续性与间断点 函数的连续性 左连续 右连续 区间上的连续性 在xo处连续 函数的间断点 第一类间断点(左右极限都存在) 可去间断点: f(xo-0)= f(xo+0) 跳跃间断点: f(xo-0)≠ f(xo+0) 第二类间断点(震荡间断点、无穷间断点)...
LeetCode49 字母异位词分组
LeetCode49 字母异位词分组 在这篇博客中,我们将探讨 LeetCode 上的一道经典算法问题:字母异位词分组。这个问题要求将给定的字符串数组中的字母异位词组合在一起,并以任意顺序返回结果列表。 问题描述 给定一个字符串数组 strs࿰…...
【Python】Windows本地映射远程Linux服务器上的端口(解决jupyter notebook无法启动问题)
创作日志: 学习深度学习不想在本地破电脑上再安装各种软件,我就用实验室的服务器配置环境,启动jupyter notebook时脑子又瓦特了,在自己Windows电脑上打开服务器提供的网址,那肯定打不开啊,以前在其它电脑上…...
C++面试:用户态和内核态的基本概念、区别
目录 一、基本概念 概念: 区别: 二、Windows示例 基础介绍 用户态到内核态的切换过程: 程序实例 三、Linux示例 特权级别: 用户态到内核态的切换过程: 调度和中断处理: 程序实例 总结 在操作系…...
Vue计算属性computed()
1. 计算属性定义 获取计算属性值 <div>{{ 计算属性名称}}</div>创建计算属性 let 定义的属性ref/reactive....let 计算属性名称 computed(() > {//这里写函数式,函数式里面包含定义属性//只有这个包含的定义属性被修改时才出发此函数式//通过计算属性名称co…...
JWT学习笔记
了解 JWT Token 释义及使用 | Authing 文档 JSON Web Token Introduction - jwt.io JSON Web Token (JWT,RFC 7519 (opens new window)),是为了在网络应用环境间传递声明而执行的一种基于 JSON 的开放标准((RFC 7519)。该 token 被设计为紧凑…...
WSL里的Ubuntu 登录密码忘了怎么更改
环境: Win10 专业版 WSL2 如何 Ubuntu22.04 问题描述: WSL里的Ubuntu 登录密码忘了怎么更改 解决方案: 在WSL中的Ubuntu系统中,忘记了密码,可以通过以下步骤重置密码: 1.打开命令提示符或PowerShel…...
【软件测试面试】要你介绍项目-如何说?完美面试攻略...
目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、测试面试时&am…...
【Crypto | CTF】RSA打法 集合
天命:我发现题题不一样,已知跟求知的需求都不一样 题目一:已知 p q E ,计算T,最后求D 已知两个质数p q 和 公钥E ,通过p和q计算出欧拉函数T,最后求私钥D 【密码学 | CTF】BUUCTF RSA-CSDN…...
在springboot中调用openai Api并实现流式响应
之前在《在springboot项目中调用openai API及我遇到的问题》这篇博客中,我实现了在springboot中调用openai接口,但是在这里的返回的信息是一次性全部返回的,如果返回的文字比较多,我们可能需要等很久。 所以需要考虑将请求接口响应…...
C++构造函数重难点解析
一、C构造函数是什么 C的构造函数是一种特殊的成员函数,用于初始化类的对象。它具有与类相同的名称,并且没有返回类型。构造函数在创建对象时自动调用,并且可以执行必要的初始化操作。 二、C构造函数特点 类的构造函数不能被继承,…...
QT day3 作业2.22
思维导图: 作业: 完善对话框,点击登录对话框,如果账号和密码匹配,则弹出信息对话框,给出提示”登录成功“,提供一个Ok按钮,用户点击Ok后,关闭登录界面,跳转到…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
UE5 学习系列(三)创建和移动物体
这篇博客是该系列的第三篇,是在之前两篇博客的基础上展开,主要介绍如何在操作界面中创建和拖动物体,这篇博客跟随的视频链接如下: B 站视频:s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...
【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
