导入JDBC元数据到Apache Atlas
前言
前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美
本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas
数据库schema与catalog
按照SQL标准的解释,在SQL环境下Catalog和Schema都属于抽象概念,可以把它们理解为一个容器或者数据库对象命名空间中的一个层次,主要用来解决命名冲突问题。从概念上说,一个数据库系统包含多个Catalog,每个Catalog又包含多个Schema,而每个Schema又包含多个数据库对象(表、视图、字段等),反过来讲一个数据库对象必然属于一个Schema,而该Schema又必然属于一个Catalog,这样我们就可以得到该数据库对象的完全限定名称,从而解决命名冲突的问题了;例如数据库对象表的完全限定名称就可以表示为:Catalog名称.Schema名称.表名称。这里还有一点需要注意的是,SQL标准并不要求每个数据库对象的完全限定名称是唯一的。
从实现的角度来看,各种数据库系统对Catalog和Schema的支持和实现方式千差万别,针对具体问题需要参考具体的产品说明书,比较简单而常用的实现方式是使用数据库名作为Catalog名,使用用户名作为Schema名,具体可参见下表:
表1 常用数据库
| 供应商 | Catalog支持 | Schema支持 |
|---|---|---|
| Oracle | 不支持 | Oracle User ID |
| MySQL | 不支持 | 数据库名 |
| MS SQL Server | 数据库名 | 对象属主名,2005版开始有变 |
| DB2 | 指定数据库对象时,Catalog部分省略 | Catalog属主名 |
| Sybase | 数据库名 | 数据库属主名 |
| Informix | 不支持 | 不需要 |
| PointBase | 不支持 | 数据库名 |
原文:https://www.cnblogs.com/ECNB/p/4611309.html
元数据模型层级抽象
不同的关系型数据库,其数据库模式有所区别,对应与下面的层级关系

- Datasource -> Catalog -> Schema -> Table -> Column
- Datasource -> Catalog -> Table -> Column
- Datasource -> Schema -> Table -> Column
元数据转换设计

提供元数据
借鉴Apache DolphinScheduler中获取Connection的方式,不多赘述。
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);logger.info("Get connection from datasource {}", datasourceUniqueId);DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());if (null == dataSourceChannel) {throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));}return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);});return dataSourceClient.getConnection();}
转换元数据
- 元数据模型
创建数据库的元数据模型
private AtlasEntityDef createJdbcDatabaseDef() {AtlasEntityDef typeDef = createClassTypeDef(DatabaseProperties.JDBC_TYPE_DATABASE,Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);return typeDef;
}
创建数据库模式的元数据模型
private AtlasEntityDef createJdbcSchemaDef() {AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(SchemaProperties.JDBC_TYPE_SCHEMA,Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "tables");}});return typeDef;
}
创建数据库表的元数据模型
private AtlasEntityDef createJdbcTableDef() {AtlasEntityDef typeDef = createClassTypeDef(TableProperties.JDBC_TYPE_TABLE,Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);typeDef.setOptions(new HashMap<>() {{put("schemaElementsAttribute", "columns");}});return typeDef;
}
创建数据库列的元数据模型
private AtlasEntityDef createJdbcColumnDef() {AtlasEntityDef typeDef = createClassTypeDef(ColumnProperties.JDBC_TYPE_COLUMN,Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);HashMap<String, String> options = new HashMap<>() {{put("schemaAttributes", "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");}};typeDef.setOptions(options);return typeDef;
}
创建实体之间的关系模型
private List<AtlasRelationshipDef> createAtlasRelationshipDef() {String version = "1.0";// 数据库和模式的关系AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_DATABASE_TABLES,BaseProperties.RELATIONSHIP_DATABASE_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 模式和数据表的关系// 注意 schema 已经被使用, 需要更换否则会冲突, 例如改为 Jschema(jdbc_schema)AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_SCHEMA_TABLES,BaseProperties.RELATIONSHIP_SCHEMA_TABLES,version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);// 表和数据列的关系AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(BaseProperties.RELATIONSHIP_TABLE_COLUMNS,BaseProperties.RELATIONSHIP_TABLE_COLUMNS,version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
}
-
提取元数据
不再赘述
-
转换元数据
使用工厂模式,提供不同类型的元数据转换方式
public interface JdbcTransferFactory {JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);boolean supportType(String type);String getName();
}
List ignorePatterns 用来过滤不想导入的数据库元数据,例如mysql的information_schema
public interface JdbcTransfer {void transfer();JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
}
举例:JdbcMysqlTransfer 和 MysqlTransferFactory
@AutoService(JdbcTransferFactory.class)
public class MysqlTransferFactory implements JdbcTransferFactory {public static final String MYSQL = "mysql";@Overridepublic JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {return new JdbcMysqlTransfer(metaData, client);}@Overridepublic boolean supportType(String type) {return MYSQL.equalsIgnoreCase(type);}@Overridepublic String getName() {return MYSQL;}
}
public class JdbcMysqlTransfer implements JdbcTransfer {private final Jdbc jdbc;private final AtlasService atlasService;private List<Pattern> ignorePatterns;public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {this.jdbc = new Jdbc(new JdbcMetadata(metaData));this.atlasService = new AtlasService(client);this.ignorePatterns = Collections.emptyList();}@Overridepublic JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {this.ignorePatterns = ignorePatterns;return this;}private boolean tableIsNotIgnored(String tableName) {return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());}@Overridepublic void transfer() {// 1.数据库实体转换DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);// 2.表实体转换String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream().filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName())).map(new TableTransfer(atlasService, databaseEntity)).toList();// 3.列转换for (AtlasEntity tableEntity : tableEntities) {String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);jdbc.getColumns(catalog, catalog, tableName).parallelStream().forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));}}}
- 元数据存入Atlas
public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {private final AtlasService atlasService;public DatabaseTransfer(AtlasService atlasService) {this.atlasService = atlasService;}@Overridepublic AtlasEntity apply(Jdbc jdbc) {String userName = jdbc.getUserName();String driverName = jdbc.getDriverName();String productName = jdbc.getDatabaseProductName();String productVersion = jdbc.getDatabaseProductVersion();String url = jdbc.getUrl();String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);// 特殊处理 Oracleif (productName.equalsIgnoreCase("oracle")){catalogName = userName.toUpperCase();urlWithNoParams = urlWithNoParams + "/" + catalogName;}DatabaseProperties properties = new DatabaseProperties();properties.setQualifiedName(urlWithNoParams);properties.setDisplayName(catalogName);properties.setOwner(userName);properties.setUrl(url);properties.setDriverName(driverName);properties.setProductName(productName);properties.setProductVersion(productVersion);// 1.创建Atlas EntityAtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());// 2.判断是否存在实体, 存在则填充GUIDMap<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));// 3,存储或者更新到Atlas中if (entityHeader.isPresent()){atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));}else {AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));atlasEntity.setGuid(header.getGuid());}return atlasEntity;}
}
效果展示
- 元数据类型定义


- 测试导入元数据
由于mysql没有采用schema,因此jdbc_schema为空

如图所示,可以清晰的了解mysql数据库中demo数据库的数据表内容

数据表元数据,qualifiedName使用数据库连接url.表名

如同所示,数据表内各个列的元数据;可以清晰的了解该数据表的各个字段信息

相关文章:
导入JDBC元数据到Apache Atlas
前言 前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美 本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas 数据库schema与catalog 按照SQL标准的解释,…...
大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现
大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现 技术栈:大数据爬虫/机器学习学习算法/数据分析与挖掘/大数据可视化/Django框架/Mysql数据库 本项目基于 Django框架开发的房屋可视化分析推荐系统。这个系统结合了大数据爬虫、机器学…...
[网鼎杯 2020 朱雀组]phpweb1
提示 call_user_func()函数先通过php内置函数来进行代码审计绕过system(##不止一种方法) 拿到题目养成一个好的习惯先抓个包 从抓到的包以及它首页的报错来看,这里死活会post传输两个参数func以及p func传输函数,而p则是传输参数的…...
深度学习之注意力机制
注意力机制与外部记忆 注意力机制与记忆增强网络是相辅相成的,神经网络去从内存中或者外部记忆中选出与当前输入相关的内容时需要注意力机制,而在注意力机制的很多应用场景中,我们的外部信息也可以看作是一个外部的记忆 这是一个阅读理解任务…...
WordPress:解决xmlrpc.php被扫描爆破的风险
使用WordPress的朋友都知道,一些【垃圾渣渣】会利用xmlrpc.php文件来进行攻击,绕过WP后台错误登录次数限制进行爆破。虽然密码复杂的极难爆破,但及其占用服务器资源。 方法一、利用宝塔防火墙(收费版) 一般可以直接使…...
Fiddler抓包模拟器(雷电模拟器)
Fiddler设置 List item 打开fiddler,的options 点击OK,重启fiddler 模拟器 更改网络设置 IP可以在电脑上终端上查看 然后在模拟器浏览器中输入IP:端口 安装证书...
RepidJson将内容写入文件
使用 RapidJSON 将内容写入文件的步骤如下: 创建一个 rapidjson::Document 对象,将需要写入文件的内容存储到其中。创建一个 rapidjson::StringBuffer 对象来保存 JSON 字符串。将 rapidjson::Document 对象转换为 JSON 字符串,并将其放入 r…...
Endnote使用教程
原由 最近要进行开题报告,要求不低于60文献的阅读与引用,单独插入引入我觉得是非常繁琐的事情,所以就借助Endnote这个工具,减少我们的工作量。 使用方法 第一步:先新建一个数据库,这样子可以在这个数据库…...
java中用Thead创建线程和用Runnable创建线程的区别是什么?
在 Java 中,创建线程的两种主要方式是通过继承 Thread 类和通过实现 Runnable 接口。下面是它们之间的主要区别: 1. 继承 Thread 类: class MyThread extends Thread {public void run() {// 线程执行的代码} }// 创建并启动线程 MyThread …...
0013Java程序设计-基于Vue的上课签到系统的设计与实现
文章目录 **摘 要**目录系统设计4.2学生签到4.3 签到信息列表4.4 用户信息管理5.1系统登录5.1.1 登录5.1.2 清除用户登记记录5.1.3 登录拦截 5.2用户管理5.2.2 用户添加5.2.3 用户编辑5.2.4 用户删除5.2.5 用户分页 5.3签到信息5.3.1签到信息列表 5.4学生签到5.4.1学生签到 开发…...
2.修改列名与列的数据类型
修改字段名与字段数据类型 1.修改字段名 有时,在我们建好一张表后会突然发现,哎呀!字段名貌似写错了!怎么办?要删了表再重新建一个新表吗?还是要删了这个字段再新建一个新的字段? 都不用&…...
[Firefly-Linux] RK3568 Ubuntu固件分区详解
RK为了方便开发与产品定制,自己定义了一套固件的分区,这些分区信息存放在parameter.txt文件中,Firefly参考这个文件定义了自己的Ubuntu分区,文件为parameter-ubuntu.txt,存放于Linux_SDK的device/rockchip/rk356x目录下…...
SpringBoot项目访问resources下的静态资源
1.新建一个配置文件夹,放配置类 2.编辑 WebMvcConfig.java package com.southwind.configuration;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import or…...
Qt之面试经验
1.恒生芸擎网络 技术没怎么问,一面问对方工作日常会涉及的一些东西(自动发布),二面公司流程,三面其他(没发offer) 2.光珀智能科技 涉及AI算法落地,问了点基础问题,比如…...
数据库基础概念与范式反范式总结
文章目录 一、基本概念1、属性2、元组3、关系4、超键5、候选键6、主键7、主属性8、外键9、函数依赖完全依赖 二、数据库范式1、第一范式(1NF)2、第二范式(2NF)3、第三范式(3NF)4、巴斯-科德范式(…...
tanstack/react-query使用手册
1. useQuery useQuery的使用一、data是后端成功返回的数据, 第一次的值为undefined 二、isLoading是指数据是否正在加载的状态,通常用于判断请求是否还在进行中。当isLoading为true时,表示数据正在加载中,当isLoading为false时&a…...
camera2对摄像头编码h264
MediaCodec编码摄像头数据 前置:保存的一些成员变量 // 摄像头开启的 handler private Handler cameraHandler; // Camera session 会话 handler private Handler sessionHandler; //这里是个Context都行 private AppCompatActivity mActivity; // 这个摄像头所有需…...
Apache solr XXE 漏洞(CVE-2017-12629)
任务一: 复现环境中的漏洞 任务二: 利用XXE漏洞发送HTTP请求,在VPS服务器端接受请求,或收到DNS记录 任务三: 利用XXE漏洞读取本地的/etc/passwd文件 1.搭建环境 2.开始看wp的时候没有看懂为什么是core,然…...
HTML代码混淆技术:原理、应用和实现方法详解
HTML代码混淆是一种常用的反爬虫技术,它可以有效地防止爬虫对网站数据的抓取。本文将详细介绍HTML代码混淆技术的原理、应用以及实现方法,帮助大家更好地了解和运用这一技术。 一、HTML代码混淆的原理 HTML代码混淆是指将HTML源码通过特定的算法进行加…...
quickapp_快应用_系统接口应用
系统接口 在项目中使用到的接口都需要在配置文件manifest.json中声明,不然会报如下警告 [WARN] 请在 manifest.json 文件里声明项目代码中用到的接口: system.storage, service.account, system.package, system.webview[1]检查某app是否在手机上安装 官方文档&a…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)
目录 1.TCP的连接管理机制(1)三次握手①握手过程②对握手过程的理解 (2)四次挥手(3)握手和挥手的触发(4)状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用
文章目录 问题现象问题原因解决办法 问题现象 macOS启动台(Launchpad)多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显,都是Google家的办公全家桶。这些应用并不是通过独立安装的…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
是否存在路径(FIFOBB算法)
题目描述 一个具有 n 个顶点e条边的无向图,该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序,确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数,分别表示n 和 e 的值(1…...
