Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase
续第一篇:Java代码将Mysql表数据导入HBase表
源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase
- 一、整体介绍
- 二、依赖
- 三、测试结果
- 四、源码
一、整体介绍

-
HBase特质连接HBase, 创建HBase执行对象
- 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
Configuration conf = HBaseConfiguration.create()
conf.set(String, String) - 创建连接:多个连接(池化)
Connection con = ConnectionFactory.createConnection() - 创建数据表:表名: String
Table table = con.getTable(TableName)
def build(): HBase // 初始化配置信息 def initPool(): HBase // 初始化连接池 def finish(): Executor // 完成 返回执行对象 - 初始化配置信息:多条(hbase.zookeeper.quorum=>ip:2181)
-
Executor特质对HBase进行操作的方法: 包含如下函数
def exists(tableName: String): Boolean // 验证数据表是否存在 def create(tableName: String, columnFamilies: Seq[String]): Boolean // 创建数据表 def drop(tableName: String): Boolean // 删除数据表 def put(tableName: String, data: util.List[Put]): Boolean // 批量插入数据 -
Jdbc封装Jdbc封装
- 初始化连接
driver : com.mysql.cj.jdbc.Driver
参数:url, username, password
创建连接 - 初始化执行器
sql, parameters
创建执行器【初始化参数】 - 执行操作并返回【结果】
DML: 返回影响数据库表行数
DQL: 返回查询的数据集合
EX: 出现异常结果
- 初始化连接
-
MyHBase用于实现HBase和Executor特质 -
测试数据格式
mysql表
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0;DROP TABLE IF EXISTS `test_table_for_hbase`; CREATE TABLE `test_table_for_hbase` (`test_id` int NULL DEFAULT NULL,`test_name` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_age` int NULL DEFAULT NULL,`test_gender` varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`test_phone` varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;INSERT INTO `test_table_for_hbase` VALUES (1, 'testName1', 26, 'male', '18011111112'); INSERT INTO `test_table_for_hbase` VALUES (2, 'testName2', 25, 'female', '18011111113'); INSERT INTO `test_table_for_hbase` VALUES (3, 'testName3', 27, 'male', '18011111114'); INSERT INTO `test_table_for_hbase` VALUES (4, 'testName4', 35, 'male', '18011111115'); -- .... 省略以下数据部分hbase表
# 创建表 库名:表名, 列族1, 列族2 create "hbase_test:tranfer_from_mysql","baseInfo","scoreInfo" truncate 'hbase_test:tranfer_from_mysql' # 清空hbase_test命名空间下的tranfer_from_mysql表 scan 'hbase_test:tranfer_from_mysql' # 查看表
二、依赖
<dependencies><!-- HBase 驱动 --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.3.5</version></dependency><!-- Hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.1.3</version></dependency><!-- mysql --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version></dependency><!-- zookeeper --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency>
</dependencies>
三、测试结果
终端有个日志的小警告(无伤大雅hh),输出为 true

查看hbase表,发现数据正常导入

四、源码
scala代码较简单这里直接上源码了,去除了部分注释,更多请去仓库下载
Executor
package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean
}
HBase
package hbase
import org.apache.hadoop.hbase.client.Connection
trait HBase {protected var statusCode: Int = -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out = {available = falsethis}def in = available = true}def initPool(): HBasedef finish(): Executor
}
MyHBase
package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase{private lazy val config: Configuration = HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()override def build(): HBase = {if(statusCode == -1){conf.foreach(t => config.set(t._1, t._2))statusCode = 0this}else{throw new HBaseException("build() function must be invoked first")}}override def initPool(): HBase = {if(statusCode == 0){val POOL_SIZE = if (pooled) {if (poolSize <= 0) 3 else poolSize} else 1for (i <- 1 to POOL_SIZE) {pool.append(PoolCon(available = true, ConnectionFactory.createConnection(config)))}statusCode = 1this}else{throw new HBaseException("initPool() function must be invoked only after build()")}}override def finish(): Executor = {if (statusCode == 1) {statusCode = 2new Executor {override def exists(tableName: String): Boolean = {var pc: PoolCon = nulltry{pc = getConval exists = pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception => e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean = {if (exists(tableName)) {return false}var pc: PoolCon = nulltry {pc = getConval builder: TableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean = {if(!exists(tableName)){return false}var pc: PoolCon = nulltry {pc = getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception => e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException("finish() function must be invoked only after initPool()")}}private def getCon = {val left: ArrayBuffer[PoolCon] = pool.filter(_.available)if (left.isEmpty) {throw new HBaseException("no available connection")}left.apply(0).out}private def close(con: PoolCon) = {if (null != con) {con.in}}
}object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}
Jdbc
package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
object Jdbc {object Result extends Enumeration {val EX = Value(0) val DML = Value(1) val DQL = Value(2) }// 3种结果(异常,DML,DQL)封装case class ResThree(rst: Result.Value) {def to[T <: ResThree]: T = this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex = new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update = affectedRows}object Dml {def apply(affectedRows: Int): Dml = new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet => T) = {val list: util.List[T] = new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql = new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {def con() = {// 1.1 显式加载 JDBC 驱动程序(只需要一次)Class.forName("com.mysql.cj.jdbc.Driver")// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) = {// 2.1 创建执行对象val pst = con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null != params && params.nonEmpty) {params.zipWithIndex.foreach(t => pst.setObject(t._2 + 1, t._1))}pst}try {val connect = con()val prepared = pst(connect)sql match {case sql if sql.matches("^(insert|INSERT|delete|DELETE|update|UPDATE) .*")=> Dml(prepared.executeUpdate())case sql if sql.matches("^(select|SELECT) .*")=> Dql(prepared.executeQuery())case _ => Ex(new SQLException(s"illegal sql command : $sql"))}} catch {case e: Exception => Ex(e)}}}
Test
import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.utilobject Test {def main(args: Array[String]): Unit = {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(user = "root",url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",password = "123456")// 执行SQL查询,并将结果封装在ResThree对象中val toEntity: ResThree = jdbcOpr("select * from test_table_for_hbase where test_id between ? and ?",Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst == Result.EX) {// 如果异常,执行异常结果处理toEntity.to[Ex]println("出现异常结果处理")} else {// 如果没有异常,将查询结果转换为HBase的Put对象列表val puts: util.List[Put] = toEntity.to[Dql].generate(rst => {// 创建一个Put对象,表示HBase中的一行val put = new Put(Bytes.toBytes(rst.getInt("test_id")), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名,test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_name"),Bytes.toBytes(rst.getString("test_name")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_age"),Bytes.toBytes(rst.getString("test_age")) // 注意:这里假设test_age是字符串类型,但通常应为整数类型)put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_gender"),Bytes.toBytes(rst.getString("test_gender")))put.addColumn(Bytes.toBytes("baseInfo"), Bytes.toBytes("test_phone"),Bytes.toBytes(rst.getString("test_phone")))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() > 0) {// 初始化HBase连接池并执行Put操作val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1).build().initPool().finish()// 执行Put操作,并返回是否成功val bool = exe.put("hbase_test:tranfer_from_mysql", puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println("查无数据")}}}
}
相关文章:
Scala 练习一 将Mysql表数据导入HBase
Scala 练习一 将Mysql表数据导入HBase 续第一篇:Java代码将Mysql表数据导入HBase表 源码仓库地址:https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码 一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化…...
前端工程化:基于Vue.js 3.0的设计与实践
这里写目录标题 《前端工程化:基于Vue.js 3.0的设计与实践》书籍引言本书概述主要内容作者简介为什么选择这本书?结语 《前端工程化:基于Vue.js 3.0的设计与实践》书籍 够买连接—>https://item.jd.com/13952512.html 引言 在前端技术日…...
Linux☞进程控制
在终端执行命令时,Linux会建立进程,程序执行完,进程会被终止;Linux是一个多任务的OS,允许多个进程并发运行; Linxu中启动进程的两种途径: ①手动启动(前台进程(命令gedit)...后台进程(命令‘&’)) ②…...
mybatis离谱bug乱转类型
字符串传入的参数被转成了int: Param("online") String online<if test"online 0">and (heart_time is null or heart_time <![CDATA[ < ]]> UNIX_TIMESTAMP(SUBDATE(now(),INTERVAL 8 MINUTE)) )</if><if test"…...
视频监控管理平台LntonCVS视频汇聚平台充电桩视频监控应用方案
随着新能源汽车的广泛使用,公众对充电设施的安全性和可靠性日益重视。为了提高充电桩的安全管理和站点运营效率,LntonCVS公司推出了一套全面的新能源汽车充电桩视频监控与管理解决方案。 该方案通过安装高分辨率摄像头,对充电桩及其周边区域进…...
VS环境Python:深度探索与实用指南
VS环境Python:深度探索与实用指南 在编程领域,VS环境(Visual Studio环境)与Python的结合,为开发者们提供了一种强大而灵活的开发体验。这种结合不仅提升了开发效率,还增强了代码的可读性和可维护性。然而&…...
SpringBoot整合SpringSecurit(二)通过token进行访问
在文章:SpringBoot整合SpringSecurit(一)实现ajax的登录、退出、权限校验-CSDN博客 里面,使用的session的方式进行保存用户信息的,这一篇文章就是使用token的方式。 在其上进行的改造,可以先看SpringBoot…...
【算法训练 day50 打家劫舍、打家劫舍Ⅱ、打家劫舍Ⅲ】
目录 一、打家劫舍-LeetCode 198思路 二、打家劫舍Ⅱ-LeetCode 213思路 三.打家劫舍Ⅲ-LeeCode 337思路 一、打家劫舍-LeetCode 198 Leecode链接: leetcode 198 思路 dp数组含义为:当前数组范围下能偷到的最多的钱。递推公式为:dp[j] max(dp[j-2]nums[j],dp[j-1…...
YOLOv8改进 | 卷积模块 | 在主干网络中添加/替换蛇形卷积Dynamic Snake Convolution
💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 蛇形动态卷积是一种新型的卷积操作,旨在提高对细长和弯曲的管状结构的特征提取能力。它通过自适应地调整卷积核的权重࿰…...
深入解析力扣183题:从不订购的客户(LEFT JOIN与子查询方法详解)
关注微信公众号 数据分析螺丝钉 免费领取价值万元的python/java/商业分析/数据结构与算法学习资料 在本篇文章中,我们将详细解读力扣第183题“从不订购的客户”。通过学习本篇文章,读者将掌握如何使用SQL语句来解决这一问题,并了解相关的复杂…...
牛客NC32 求平方根【简单 二分 Java/Go/C++】
题目 题目链接: https://www.nowcoder.com/practice/09fbfb16140b40499951f55113f2166c 思路 Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值即可*** para…...
王道408数据结构CH3_栈、队列
概述 3.栈、队列和数组 3.1 栈 3.1.1 基本操作 3.1.2 顺序栈 #define Maxsize 50typedef struct{ElemType data[Maxsize];int top; }SqStack;3.1.3 链式栈 typedef struct LinkNode{ElemType data;struct LinkNode *next; }*LiStack;3.2 队列 3.2.1 基本操作 3.2.2 顺序存储…...
Angular 由一个bug说起之六:字体预加载
浏览器在加载一个页面时,会解析网页中的html和css,并开始加载字体文件。字体文件可以通过css中的font-face规则指定,并使用url()函数指定字体文件的路径。 比如下面这样: css font-face {font-family: MyFont;src: url(path/to/font.woff2…...
并查集进阶版
过关代码如下 #define _CRT_SECURE_NO_WARNINGS #include<bits/stdc.h> #include<unordered_set> using namespace std;int n, m; vector<int> edg[400005]; int a[400005], be[400005]; // a的作用就是存放要摧毁 int k; int fa[400005]; int daan[400005]…...
贪心(不相交的开区间、区间选点、带前导的拼接最小数问题)
目录 1.简单贪心 2.区间贪心 不相交的开区间 1.如何删除? 2.如何比较大小 区间选点问题 3.拼接最小数 1.简单贪心 比如:给你一堆数,你来构成最大的几位数 2.区间贪心 不相交的开区间 思路: 首先,如果有两个…...
[力扣题解] 617. 合并二叉树
题目:617. 合并二叉树 思路 递归法遍历,随便一种遍历方式都可以,以前序遍历为例; 代码 class Solution { public:TreeNode* mergeTrees(TreeNode* root1, TreeNode* root2) {if(root1 NULL){return root2;}if(root2 NULL){r…...
kafka-消费者组(SpringBoot整合Kafka)
文章目录 1、消费者组1.1、使用 efak 创建 主题 my_topic1 并建立6个分区并给每个分区建立3个副本1.2、创建生产者发送消息1.3、application.yml配置1.4、创建消费者监听器1.5、创建SpringBoot启动类1.6、屏蔽 kafka debug 日志 logback.xml1.7、引入spring-kafka依赖1.8、消费…...
Redisson知识
使用Redission获取锁 RLock lock redisson.getLock("my-lock"); 一、Redisson使用不指定锁过期时间的方式加锁: lock.lock(); 特点: 1.使用Redisson加的锁,具有自动续期机制,如果业务运行时间较长,运行…...
0103__【C/C++ 单线程性能分析工具 Gprof】 GNU的C/C++ 性能分析工具 Gprof 使用全面指南
【C/C 单线程性能分析工具 Gprof】 GNU的C/C 性能分析工具 Gprof 使用全面指南-CSDN博客...
如何把几个pdf文件合成在一个pdf文件
PDF合并,作为一种常见的文件处理方式,无论是在学术研究、工作汇报还是日常生活中,都有着广泛的应用。本文将详细介绍PDF合并的多种方法,帮助读者轻松掌握这一技能。 打开 “轻云处理pdf官网” 的网站,然后上传pdf。 pd…...
自动驾驶人机交接:DMS与安全验证如何破解控制权转移困局
1. 自动驾驶人机交接的核心困境与行业分野最近几年,自动驾驶(AV)和高级驾驶辅助系统(ADAS)无疑是汽车科技领域最炙手可热的话题。无论是传统车企的“新四化”转型,还是科技公司的颠覆性入局,大家…...
上古卷轴5天际整合包下载最新全热门MOD整合(画质+人物+功能+场景全美化)下载分享
一、整合包基础概况 新手向懒人专属整合资源,适配电脑Windows系统。整合包集成多款热门优质MOD,无需玩家单独下载模组,整合包整体兼容性强,适配主流家用电脑,官方提前做好模组适配优化,规避多数模组冲突问…...
5G技术授权商业化的七大挑战与市场可行性深度解析
1. 项目概述:一次关于5G技术授权商业可行性的深度探讨最近在整理行业资料时,翻到一篇2019年EE Times上的旧文,标题挺抓人眼球,叫《授权华为5G技术可能是个坏主意的30个理由》。文章的核心是讨论当时华为创始人提出的一项设想&…...
告别60帧束缚:《原神》帧率解锁终极指南,轻松实现120帧流畅体验
告别60帧束缚:《原神》帧率解锁终极指南,轻松实现120帧流畅体验 【免费下载链接】genshin-fps-unlock unlocks the 60 fps cap 项目地址: https://gitcode.com/gh_mirrors/ge/genshin-fps-unlock 还在为《原神》60帧的限制而烦恼吗?想…...
告别并行接口:手把手教你用Stm32F4的SPI高效读取AD7606八通道数据
告别并行接口:手把手教你用Stm32F4的SPI高效读取AD7606八通道数据 在嵌入式系统设计中,AD7606作为一款高性能八通道16位ADC芯片,常被用于电力监测、工业控制等需要多通道高精度采样的场景。传统方案往往依赖其并行接口实现数据读取ÿ…...
从VGG到ResNet:手把手教你用PyTorch复现DeepLabV2的ASPP模块(附代码)
从VGG到ResNet:手把手教你用PyTorch复现DeepLabV2的ASPP模块(附代码) 在计算机视觉领域,语义分割一直是极具挑战性的任务之一。不同于简单的图像分类,语义分割需要在像素级别上对图像进行理解和标注,这要求…...
从ABL项目看激光武器发展:技术挑战、工程突破与未来转型
1. 项目背景与核心争议十几年前,当美国国防部(DoD)最终决定为YAL-1机载激光试验台(ABL)项目画上句号时,在军事与航空航天工程圈子里引发的讨论,远比一份简单的项目终止公告要复杂得多。这个项目…...
STM32F103软件模拟IIC驱动0.96寸OLED:从零搭建与界面交互优化
1. 硬件准备与接线指南 拿到STM32F103核心板和0.96寸OLED模块时,我第一反应是翻看引脚定义。这块4针OLED通常采用IIC接口,接线其实特别简单,只需要4根线:VCC、GND、SCL、SDA。但要注意供电电压,我刚开始用5V供电结果屏…...
5G接入与移动性管理(AMF):构建未来通信的基石
5G接入与移动性管理(AMF):构建未来通信的基石 在5G网络架构中,接入与移动性管理功能(AMF,Access and Mobility Management Function)扮演着至关重要的角色。作为核心网的关键组件之一࿰…...
ComfyUI-Manager终极指南:快速优化AI工作流性能的完整方案
ComfyUI-Manager终极指南:快速优化AI工作流性能的完整方案 【免费下载链接】ComfyUI-Manager ComfyUI-Manager is an extension designed to enhance the usability of ComfyUI. It offers management functions to install, remove, disable, and enable various c…...
