实现HBase表和RDB表的转化(附Java源码资源)
实现HBase表和RDB表的转化


一、引入
转化为HBase表的三大来源:RDB Table、Client API、Files

如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。
首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:
rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){List<Put> batch = rdb.nextBatch();hbase.putBatch(batch);
}
hbase.close();
rdb.close();
二、代码讲解
1. 目录结构

2. 具体实现
- transfer.properties

内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:

- 在Run/Debug Configurations中,新建一个Application
- 配置好主类
- 配置好配置文件的具体路径
- RDB 接口
public interface RDB extends Com {// 要提升性能,需要使用批处理boolean hasNextBatch() throws SQLException;// 是否存在下一个批次List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
- RDB 实现类
public class RDBImpl implements RDB {private static Logger logger = Logger.getLogger(RDBImpl.class);// JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集private Properties config;/*** 它们需要设置成全局变量的原因是它们需要共享*/private Connection con;private PreparedStatement pst;private ResultSet rst;// 定义每个批次处理的记录数的最大数量private int batchSize;// hbase的行键对应rdb的列的列名private String hbaseRowKeyRdbCol;private Map<String,Map<String,String>> hbaseRdbColMapping;// RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())public RDBImpl(Properties config) {this.config = config;}@Overridepublic Properties config() {return config;}/*** 内部资源初始化*/@Overridepublic void init() throws Exception{con = getConnection();logger.info("RDB 创建 [ 连接 ] 对象成功");pst = getStatement(con);logger.info("RDB 创建 [ 执行 ] 对象成功");rst = getResult(pst);logger.info("RDB 创建 [ 结果集 ] 成功");batchSize = batchSize();hbaseRdbColMapping = hbaseRdbColumnsMapping();}@Overridepublic void close() {closeAll(rst,pst,con);}private String driver(){return checkAndGetConfig("rdb.driver");}private String url(){return checkAndGetConfig("rdb.url");}private String username(){return checkAndGetConfig("rdb.username");}private String password(){return checkAndGetConfig("rdb.password");}private String sql(){return checkAndGetConfig("rdb.sql");}private int batchSize(){return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));}// java.sql下的Connectionprivate Connection getConnection() throws ClassNotFoundException, SQLException {// 装载驱动Class.forName(driver());// 获取并返回连接对象return DriverManager.getConnection(url(),username(),password());}private PreparedStatement getStatement(Connection con) throws SQLException {return con.prepareStatement(sql());}private ResultSet getResult(PreparedStatement statement) throws SQLException {return statement.executeQuery();}/*** hbase 列族和列与rdb中列的映射关系* hbase列族 hbase列 rdb列* @return Map<String,Map<String,String>>*/private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");Map<String,Map<String,String>> map = new HashMap<>();String[] pss = mapping.split(",");for(String ps : pss){String[] pp = ps.split("->");String[] p = pp[0].split(":");String rdbCol = pp[1],hbaseColFamily,hbaseColName;if(p.length==1){hbaseRowKeyRdbCol = pp[1];}else {hbaseColFamily = p[0];hbaseColName = p[1];if(!map.containsKey(hbaseColFamily)){map.put(hbaseColFamily,new HashMap<>());}map.get(hbaseColFamily).put(hbaseColName,rdbCol);}}return map;}/*** 将RDB的列转化为字节数组(需要确定列的数据类型)* @param rdbColumn* @return* @throws SQLException*/private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {Object obj = rst.getObject(rdbColumn);if(obj instanceof String){return Bytes.toBytes((String)obj);} else if(obj instanceof Float){return Bytes.toBytes(((Float)obj).floatValue());} else if(obj instanceof Double){return Bytes.toBytes(((Double)obj).doubleValue());} else if(obj instanceof BigDecimal){return Bytes.toBytes((BigDecimal)obj);} else if(obj instanceof Short){return Bytes.toBytes(((Short) obj).shortValue());} else if(obj instanceof Integer){return Bytes.toBytes(((Integer)obj).intValue());} else if(obj instanceof Boolean){return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());} else {throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());}}/*** 将HBase的列名或列族名转化为字节数组* @param name* @return*/private byte[] toBytes(String name){return Bytes.toBytes(name);}// 最后一个批次的数据最少有一条@Overridepublic boolean hasNextBatch() throws SQLException{return rst.next();}@Overridepublic List<Put> nextBatch() throws SQLException{// 预先分配容量List<Put> list = new ArrayList<>(batchSize);int count = 0;do{/*** 如何将一行解析为多个put(结合配置文件)* 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名*/Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {String columnFamily = e.getKey();for (Map.Entry<String, String> s : e.getValue().entrySet()) {String hbaseColumn = s.getKey();String rdbColumn = s.getValue();// 需要将内容转变为字节数组传入方法put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));}}list.add(put);}while(++count<batchSize && rst.next());return list;}}
如何理解一行转化为多个put?

结果集的实质?

rst.next() 的两个作用
rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行
a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。
- HBase接口
public interface HBase extends Com {// RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。void putBatch(List<Put> batch) throws IOException;
}
- HBase实现类
public class HBaseImpl implements HBase {private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);private Properties config;private Connection con;private Table hbaseTable;public HBaseImpl(Properties config) {this.config = config;}@Overridepublic Properties config() {return config;}@Overridepublic void init() throws Exception {con = getCon();loggerHBase.info("HBase 创建 [ 连接 ] 成功");hbaseTable = checkAndGetTable(con);loggerHBase.info("HBase 创建 [ 数据表 ] 成功");}@Overridepublic void close() {closeAll(hbaseTable,con);}private String tableName(){return checkAndGetConfig("hbase.table.name");}private String zkUrl(){return checkAndGetConfig("hbase.zk");}private Connection getCon() throws IOException {// hadoop.conf的configurationConfiguration config = HBaseConfiguration.create();config.set("hbase.zookeeper.quorum",zkUrl());return ConnectionFactory.createConnection(config);}private Table checkAndGetTable(Connection con) throws IOException {/*** Admin : HBase DDL*/Admin admin = con.getAdmin();TableName tableName = TableName.valueOf(tableName());// 通过tableName判定表是否存在if(!admin.tableExists(tableName)){throw new IOException("HBase表不存在异常:"+tableName);}/*** Table : HBase DML & DQL*/// 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)return con.getTable(tableName);}@Overridepublic void putBatch(List<Put> batch) throws IOException{hbaseTable.put(batch);}
}
HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。
- Com接口
public interface Com {Logger logger = Logger.getLogger(Com.class);// 获取配置对象Properties config();// 初始化资源void init() throws Exception;// 释放资源void close();default String checkAndGetConfig(String key){if(!config().containsKey(key)){// 因为该方法可能被用于HBase和RDBthrow new RuntimeException("配置项缺失异常:"+key);}String item = config().getProperty(key);logger.info(String.format("获取配置项 %s : %s",key,item));return item;}default void closeAll(AutoCloseable...acs){for (AutoCloseable ac : acs) {if (Objects.nonNull(ac)) {try {ac.close();logger.info(String.format("释放 %s 成功",ac.getClass().getName()));} catch (Exception e) {logger.error("释放资源异常:"+e);}}}}
}
在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。
- RDBToHBase接口
public interface RDBToHBase {// 创建一个RDB对象void setRDB(RDB rdb);// 创建一个HBase对象void setHBase(HBase hbase);// 进行数据的传输void startTransfer();
}
- RDBToHBase实现类
public class RDBToHBaseImpl implements RDBToHBase {// 日志显示private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);private RDB rdb;private HBase hbase;@Overridepublic void setRDB(RDB rdb) {this.rdb = rdb;}@Overridepublic void setHBase(HBase hbase) {this.hbase = hbase;}@Overridepublic void startTransfer() {try {rdb.init();loggerRH.info("RDB 初始化成功");hbase.init();loggerRH.info("HBase 初始化成功");loggerRH.info("数据从 RDB 迁移至 HBase 开始...");int count = 0;while (rdb.hasNextBatch()) {final List<Put> batch = rdb.nextBatch();hbase.putBatch(batch);loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));}loggerRH.info("数据从 RDB 迁移至 HBase 结束...");} catch (Exception e){loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);} finally{hbase.close();rdb.close();}}
}
- AppRDBToHBase 实现类
public class AppRDBToHBase
{private static Logger logger = Logger.getLogger(AppRDBToHBase.class);private static void start(String[] args){try {if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {throw new NullPointerException("配置文件路径空指针异常");}final String PATH = args[0];final File file = new File(PATH);if (!file.exists() || file.length() == 0 || !file.canRead()) {throw new IOException("配置文件不存在、不可读、空白");}Properties config = new Properties();// final String path = args[0];config.load(new FileReader(file));RDB rdb = new RDBImpl(config);HBase hBase = new HBaseImpl(config);RDBToHBase rdbToHBase = new RDBToHBaseImpl();rdbToHBase.setRDB(rdb);rdbToHBase.setHBase(hBase);rdbToHBase.startTransfer();}catch(Exception e){logger.error("配置异常",e);}}public static void main( String[] args ) {start(args);}
}
对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输
其他:日志文件系统Log.4j的应用
- 准备:需要在Resources模块下配置log4j.properties文件
- 注意:
- 日志文件信息的输出方式有三种
logger.error()、logger.info()、logger.warn(),除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。 - log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
- 对于不同类或接口下的logger,需要注意进行名字的区分。
- 日志文件信息的输出方式有三种
相关文章:
实现HBase表和RDB表的转化(附Java源码资源)
实现HBase表和RDB表的转化 一、引入 转化为HBase表的三大来源:RDB Table、Client API、Files 如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。 首先,我们需要分别构造rdb和hba…...
10:00面试,10:06就出来了,问的问题有点变态。。。
从小厂出来,没想到在另一家公司又寄了。 到这家公司开始上班,加班是每天必不可少的,看在钱给的比较多的份上,就不太计较了。没想到8月一纸通知,所有人不准加班,加班费不仅没有了,薪资还要降40%…...
【Python】: Django Web开发实战(详细教程)
Python Django全面介绍 Django是一个非常强大的Python Web开发框架,它以"快速开发"和"干净、实用的设计"为设计宗旨。本文将从Django的基本概念开始,逐渐引导大家理解如何使用Django构建复杂的web应用程序。 Django基本概念与原理…...
突破编程_C++_C++11新特性(tuple)
1 std::tuple 简介 1.1 std::tuple 概述 std::tuple 是一个固定大小的不同类型值的集合,可以看作 std::pair 的泛化,即 std::pair 是 std::tuple 的一个特例,其长度受限为 2。与 C# 中的 tuple 类似,但 std::tuple 的功能更为强…...
xss.pwnfunction(DOM型XSS)靶场
环境进入该网站 Challenges (pwnfunction.com) 第一关:Ma Spaghet! 源码: <!-- Challenge --> <h2 id"spaghet"></h2> <script>spaghet.innerHTML (new URL(location).searchParams.get(somebody) || "Somebo…...
安装 docker 和 jenkins
安装 docker #安装 软件包 docker yum install -y yum-utils device-mapper-persistent-data lvm2#设置 yum 源 yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo yum-config-manager --add-repo http://mirrors.aliyun.com/docker-c…...
jni入门学习 CMakeLists脚本
在 Android Studio 中使用 CMake 可以编译 C/C 代码,这为开发者提供了在 Android 应用中嵌入本地代码的能力。下面是关于在 Android Studio 中使用 CMake 编译的详细说明: 1. 创建 CMakeLists.txt 文件: 首先,你需要在项目的根目…...
如何在没有向量数据库的情况下使用知识图谱实现RAG
引言 传统上,为大型语言模型(LLMs)提供长期记忆通常涉及到使用检索增强生成(RAG)解决方案,其中向量数据库作为长期记忆的存储机制。然而,我们是否能在没有向量数据库的情况下达到相同效果呢&am…...
6.如何判断数据库搜索是否走索引?
判断是否使用索引搜索 索引在数据库中是一个不可或缺的存在,想让你的查询结果快准狠,还是需要索引的来帮忙,那么在mongo中如何判断搜索是不是走索引呢?通常使用执行计划(解释计划、Explain Plan)来查看查询…...
Java并发编程的性能优化方案中,哪些方法比较常用
在Java并发编程的性能优化方案中,以下是一些常用的方法: 线程池的使用: 线程池可以复用线程,避免频繁地创建和销毁线程,从而提高系统性能。常用的线程池有FixedThreadPool、CachedThreadPool等。根据任务特性选择合适…...
AcWing 2867. 回文日期(每日一题)
原题链接:2867. 回文日期 - AcWing题库 2020 年春节期间,有一个特殊的日期引起了大家的注意:2020 年 2 月 2 日。 因为如果将这个日期按 “yyyymmdd” 的格式写成一个 8 位数是 20200202,恰好是一个回文数。 我们称这样的日期是…...
学习笔记-华为IPD转型2020:3,IPD的实施
3. IPD的实施 1999 年开始的 IPD 转型是计划中的多个转型项目中的第一个(Liu,2015)。华为为此次转型成立了一个专门的团队,从大约20人开始,他们是华为第一产业的高层领导。董事会主席孙雅芳是这个团队的负责人。该团…...
2024腾龙杯web签到题-初识jwt(签到:这是一个登录页面)
什么是 jwt? 它是 JSON Web Token 的缩写,是一个开放标准,定义了一种紧凑的、自包含的方式,用于作为JSON对象在各方之间安全地传输信息,该信息可以被验证和信任,因为它是数字签名的。它就是一种认证机制,…...
Monaco Editor系列(一)启动项目与入门示例解析
前言:作为一名程序员,我们工作中的每一天都在与代码编辑器打交道,相信各位前端程序员对 VS Code 一定都不陌生,VS Code 可以为我们提供代码高亮、代码对比等等功能,让我们在开发的时候,不需要对着暗淡无光的…...
DNA存储技术原理是什么?
随着大数据和人工智能的发展,全球每天产生的数据量剧增,对存储设备的需求也随之增长,数据存储问题日益凸显。传统的硬盘驱动器(HDD)、磁带等冷存和深度归档存储占据数据中心存储的60-70%,由于它们的访问频率…...
多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测
多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测 目录 多维时序 | Matlab实现VMD-CNN-GRU变分模态分解结合卷积神经网络门控循环单元多变量时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现VMD-CN…...
基于springboot+vue的毕业论文管理系统
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 主要内容:毕业设计(Javaweb项目|小程序|Pyt…...
JavaWeb后端——分层解耦 IOC DI
分层/三层架构概述 三层架构:Controller、Service、Dao 解耦/IOC&DI概述 分层解耦 容器称为:IOC容器/Spring容器 IOC 容器中创建,管理的对象,称为:bean 对象 IOC&DI入门 实现 IOC&DI 需要的注解&#…...
短视频矩阵系统技术交付
短视频矩阵系统技术交付,短视频矩阵剪辑矩阵分发系统现在在来开发这个市场单个项目来说,目前基本上已经沉淀3年了,那么我们来就技术短视频矩阵剪辑系统开发来聊聊 短视频矩阵系统经过315大会以后,很多违规的技术开发肯定有筛选到了…...
Halcon 凹坑检测案例
* 使用元组的方法 ImageFile:[] ImageFile[0]:D:/Halcon/产品上的凹坑检测/1.bmp ImageFile[1]:D:/Halcon/产品上的凹坑检测/2.bmp for Index : 0 to |ImageFile|-1 by 1read_image (Image, ImageFile[Index])* 二值化threshold (Image, Region, 100, 255)* 连通性connection (…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
DeepSeek 赋能智慧能源:微电网优化调度的智能革新路径
目录 一、智慧能源微电网优化调度概述1.1 智慧能源微电网概念1.2 优化调度的重要性1.3 目前面临的挑战 二、DeepSeek 技术探秘2.1 DeepSeek 技术原理2.2 DeepSeek 独特优势2.3 DeepSeek 在 AI 领域地位 三、DeepSeek 在微电网优化调度中的应用剖析3.1 数据处理与分析3.2 预测与…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
