实现HBase表和RDB表的转化(附Java源码资源)

实现HBase表和RDB表的转化

在这里插入图片描述
在这里插入图片描述

一、引入

转化为HBase表的三大来源:RDB Table、Client API、Files

在这里插入图片描述
如何构造通用性的代码模板实现向HBase表的转换,是一个值得考虑的问题。这篇文章着重讲解RDB表向HBase表的转换。

首先,我们需要分别构造rdb和hbase的对象,根据批处理的思想,我们可以考虑批量将rdb中的数据导出,并且转化为List<Put>的格式,直接导入HBase表中,最后释放资源,伪代码模板如下:

rdb=...
hbase=...
rdb.init();
hbase.init();
while(rdb.hasNextBatch()){List<Put> batch = rdb.nextBatch();hbase.putBatch(batch);
}
hbase.close();
rdb.close();

二、代码讲解

1. 目录结构

在这里插入图片描述

2. 具体实现
  • transfer.properties
    在这里插入图片描述

内含HBase和RDB转换所有配置信息的配置文件,因为该配置文件是在启动时就需要进行配置,因此我们需要按以下图片进行配置导入配置文件:
在这里插入图片描述

  1. Run/Debug Configurations中,新建一个Application
  2. 配置好主类
  3. 配置好配置文件的具体路径
  • RDB 接口
public interface RDB extends Com {// 要提升性能,需要使用批处理boolean hasNextBatch() throws SQLException;// 是否存在下一个批次List<Put> nextBatch() throws SQLException;// 一个put代表往一个hbase表的一行的一个列族的一个列插入一条数据,对Hbase来说,批次就是List<Put>
}
  • RDB 实现类
public class RDBImpl implements RDB {private static Logger logger = Logger.getLogger(RDBImpl.class);// JDBC 的基本元素:连接对象(装载[驱动]、[URL]、[账号]、[密码])->执行对象(SQL语句)->结果集private Properties config;/*** 它们需要设置成全局变量的原因是它们需要共享*/private Connection con;private PreparedStatement pst;private ResultSet rst;// 定义每个批次处理的记录数的最大数量private int batchSize;// hbase的行键对应rdb的列的列名private String hbaseRowKeyRdbCol;private Map<String,Map<String,String>> hbaseRdbColMapping;// RDB配置可以灵活地从外部传入(构造方法),从内部读取(config())public RDBImpl(Properties config) {this.config = config;}@Overridepublic Properties config() {return config;}/*** 内部资源初始化*/@Overridepublic void init() throws Exception{con = getConnection();logger.info("RDB 创建 [ 连接 ] 对象成功");pst = getStatement(con);logger.info("RDB 创建 [ 执行 ] 对象成功");rst = getResult(pst);logger.info("RDB 创建 [ 结果集 ] 成功");batchSize = batchSize();hbaseRdbColMapping = hbaseRdbColumnsMapping();}@Overridepublic void close() {closeAll(rst,pst,con);}private String driver(){return checkAndGetConfig("rdb.driver");}private String url(){return checkAndGetConfig("rdb.url");}private String username(){return checkAndGetConfig("rdb.username");}private String password(){return checkAndGetConfig("rdb.password");}private String sql(){return checkAndGetConfig("rdb.sql");}private int batchSize(){return Integer.parseInt(checkAndGetConfig("rdb.batchSize"));}// java.sql下的Connectionprivate Connection getConnection() throws ClassNotFoundException, SQLException {// 装载驱动Class.forName(driver());// 获取并返回连接对象return DriverManager.getConnection(url(),username(),password());}private PreparedStatement getStatement(Connection con) throws SQLException {return con.prepareStatement(sql());}private ResultSet getResult(PreparedStatement statement) throws SQLException {return statement.executeQuery();}/*** hbase 列族和列与rdb中列的映射关系*             hbase列族   hbase列  rdb列* @return Map<String,Map<String,String>>*/private Map<String, Map<String,String>> hbaseRdbColumnsMapping(){String mapping = checkAndGetConfig("rdb.hbase.columns.mapping");Map<String,Map<String,String>> map = new HashMap<>();String[] pss = mapping.split(",");for(String ps : pss){String[] pp = ps.split("->");String[] p = pp[0].split(":");String rdbCol = pp[1],hbaseColFamily,hbaseColName;if(p.length==1){hbaseRowKeyRdbCol = pp[1];}else {hbaseColFamily = p[0];hbaseColName = p[1];if(!map.containsKey(hbaseColFamily)){map.put(hbaseColFamily,new HashMap<>());}map.get(hbaseColFamily).put(hbaseColName,rdbCol);}}return map;}/*** 将RDB的列转化为字节数组(需要确定列的数据类型)* @param rdbColumn* @return* @throws SQLException*/private byte[] toBytesFromRdb(String rdbColumn) throws SQLException {Object obj = rst.getObject(rdbColumn);if(obj instanceof String){return Bytes.toBytes((String)obj);} else if(obj instanceof Float){return Bytes.toBytes(((Float)obj).floatValue());} else if(obj instanceof Double){return Bytes.toBytes(((Double)obj).doubleValue());} else if(obj instanceof BigDecimal){return Bytes.toBytes((BigDecimal)obj);} else if(obj instanceof Short){return Bytes.toBytes(((Short) obj).shortValue());} else if(obj instanceof Integer){return Bytes.toBytes(((Integer)obj).intValue());} else if(obj instanceof Boolean){return Bytes.toBytes((Boolean)((Boolean) obj).booleanValue());} else {throw new SQLException("HBase不支持转化为字节数组的类型:"+obj.getClass().getName());}}/*** 将HBase的列名或列族名转化为字节数组* @param name* @return*/private byte[] toBytes(String name){return Bytes.toBytes(name);}// 最后一个批次的数据最少有一条@Overridepublic boolean hasNextBatch() throws SQLException{return rst.next();}@Overridepublic List<Put> nextBatch() throws SQLException{// 预先分配容量List<Put> list = new ArrayList<>(batchSize);int count = 0;do{/*** 如何将一行解析为多个put(结合配置文件)* 对每条数据,创建一个带行键的put,向put中放入HBase列族名,HBase列名,RDB列名*/Put put = new Put(toBytesFromRdb(hbaseRowKeyRdbCol));for (Map.Entry<String, Map<String, String>> e : hbaseRdbColMapping.entrySet()) {String columnFamily = e.getKey();for (Map.Entry<String, String> s : e.getValue().entrySet()) {String hbaseColumn = s.getKey();String rdbColumn = s.getValue();// 需要将内容转变为字节数组传入方法put.addColumn(toBytes(columnFamily),toBytes(hbaseColumn),toBytesFromRdb(rdbColumn));}}list.add(put);}while(++count<batchSize && rst.next());return list;}}

如何理解一行转化为多个put?
在这里插入图片描述
结果集的实质?
在这里插入图片描述
rst.next() 的两个作用

rst.next();
// 1.判定是否存在下一个有效行
// 2.若存在下一个有效行,则指向该有效行

a. 只通过config作为参数构造rdb
b. 以JDBC为核心,需要连接对象(驱动,URL,账号,密码)=>执行对象(SQL)=>结果集,这些都需要被设计为全局变量(因为需要被共享)
c. 既实现了RDB接口,还实现了RDB的继承接口Com中的init()、close()进行资源的初始化和释放,checkAndGetConfig()根据传入的配置文件获取配置信息并且赋值给全局变量。
d. 重点:我们还需要对RDB和HBase的映射关系进行解析,最终解析出RDB列名,HBase列族名,HBase列名,具体如何解析参考配置文件transfer.properties,并将解析出来的名字构造成一个Put对象,由于构造Put对象只能放字节数组,所以需要转化为字节数组的方法,又因为解析RDB的列名需要考虑列的数据类型,而解析HBase的列族或列名不需要考虑,因此需要有两个转换方法==ToBytesFromRDB()和ToBytes()==分别实现两种情况的字节数组转化。

  • HBase接口
public interface HBase extends Com {// RDBImpl的nextBatch()返回的就是List<Put>,直接放入HBase表即可。void putBatch(List<Put> batch) throws IOException;
}
  • HBase实现类
public class HBaseImpl implements HBase {private static Logger loggerHBase = Logger.getLogger(HBaseImpl.class);private Properties config;private Connection con;private Table hbaseTable;public HBaseImpl(Properties config) {this.config = config;}@Overridepublic Properties config() {return config;}@Overridepublic void init() throws Exception {con = getCon();loggerHBase.info("HBase 创建 [ 连接 ] 成功");hbaseTable = checkAndGetTable(con);loggerHBase.info("HBase 创建 [ 数据表 ] 成功");}@Overridepublic void close() {closeAll(hbaseTable,con);}private String tableName(){return checkAndGetConfig("hbase.table.name");}private String zkUrl(){return checkAndGetConfig("hbase.zk");}private Connection getCon() throws IOException {// hadoop.conf的configurationConfiguration config = HBaseConfiguration.create();config.set("hbase.zookeeper.quorum",zkUrl());return ConnectionFactory.createConnection(config);}private Table checkAndGetTable(Connection con) throws IOException {/*** Admin : HBase DDL*/Admin admin = con.getAdmin();TableName tableName = TableName.valueOf(tableName());// 通过tableName判定表是否存在if(!admin.tableExists(tableName)){throw new IOException("HBase表不存在异常:"+tableName);}/*** Table : HBase DML & DQL*/// 传入的参数可以是TableName tableName,ExecutorService pool(表操作可以并发)return con.getTable(tableName);}@Overridepublic void putBatch(List<Put> batch) throws IOException{hbaseTable.put(batch);}
}

HBase的实现类和RDB的实现类也非常类似:
先重写HBase接口中的方法和Com接口中的方法,发现往里放数据需要构造一个Table对象,而Table对象的构建需要一个连接对象和TableName,因此在构造了两个方法tableName()获取配置信息中的TableName(注意:此时的TableName是字符串类型),zkUrl()获取zk.url作为配置构造连接对象。

  • Com接口
public interface Com {Logger logger = Logger.getLogger(Com.class);// 获取配置对象Properties config();// 初始化资源void init() throws Exception;// 释放资源void close();default String checkAndGetConfig(String key){if(!config().containsKey(key)){// 因为该方法可能被用于HBase和RDBthrow new RuntimeException("配置项缺失异常:"+key);}String item = config().getProperty(key);logger.info(String.format("获取配置项 %s : %s",key,item));return item;}default void closeAll(AutoCloseable...acs){for (AutoCloseable ac : acs) {if (Objects.nonNull(ac)) {try {ac.close();logger.info(String.format("释放 %s 成功",ac.getClass().getName()));} catch (Exception e) {logger.error("释放资源异常:"+e);}}}}
}

在Com接口中,设计了一些普通方法config()实现配置的导出,init()、close()资源的初始化和关闭;同样还设计了一些无需实现的默认方法便于实现init()和close()方法。这些方法适用于RDB和HBase的实现类。

  • RDBToHBase接口
public interface RDBToHBase {// 创建一个RDB对象void setRDB(RDB rdb);// 创建一个HBase对象void setHBase(HBase hbase);// 进行数据的传输void startTransfer();
}
  • RDBToHBase实现类
public class RDBToHBaseImpl implements RDBToHBase {// 日志显示private static Logger loggerRH = Logger.getLogger(RDBToHBaseImpl.class);private RDB rdb;private HBase hbase;@Overridepublic void setRDB(RDB rdb) {this.rdb = rdb;}@Overridepublic void setHBase(HBase hbase) {this.hbase = hbase;}@Overridepublic void startTransfer() {try {rdb.init();loggerRH.info("RDB 初始化成功");hbase.init();loggerRH.info("HBase 初始化成功");loggerRH.info("数据从 RDB 迁移至 HBase 开始...");int count = 0;while (rdb.hasNextBatch()) {final List<Put> batch = rdb.nextBatch();hbase.putBatch(batch);loggerRH.info(String.format("第 %d 批:%d 条数据插入成功",++count,batch.size()));}loggerRH.info("数据从 RDB 迁移至 HBase 结束...");} catch (Exception e){loggerRH.error("将 RDB 数据批量迁移至 HBase 异常",e);} finally{hbase.close();rdb.close();}}
}
  • AppRDBToHBase 实现类
public class AppRDBToHBase
{private static Logger logger = Logger.getLogger(AppRDBToHBase.class);private static void start(String[] args){try {if (Objects.isNull(args) || args.length == 0 || Objects.isNull(args[0])) {throw new NullPointerException("配置文件路径空指针异常");}final String PATH = args[0];final File file = new File(PATH);if (!file.exists() || file.length() == 0 || !file.canRead()) {throw new IOException("配置文件不存在、不可读、空白");}Properties config = new Properties();// final String path = args[0];config.load(new FileReader(file));RDB rdb = new RDBImpl(config);HBase hBase = new HBaseImpl(config);RDBToHBase rdbToHBase = new RDBToHBaseImpl();rdbToHBase.setRDB(rdb);rdbToHBase.setHBase(hBase);rdbToHBase.startTransfer();}catch(Exception e){logger.error("配置异常",e);}}public static void main( String[] args ) {start(args);}
}

对于传入的配置文件路径,既要检查路径本身,也要检查路径代表的文件本身。
通过流的方式将文件进行配置,并且利用该配置构造RDB和HBase并进行数据的传输

其他:日志文件系统Log.4j的应用
  • 准备:需要在Resources模块下配置log4j.properties文件
  • 注意:
    • 日志文件信息的输出方式有三种logger.error()、logger.info()、logger.warn() ,除了对错误信息进行输出之外,也要习惯于补充正常信息的输出,以增强代码的可读性。
    • log.4j除了在控制台打印日志信息之外,还能在磁盘下的日志文件中打印日志信息,因此在导入log4j.properties文件之后需要修改日志文件的路径。
    • 对于不同类或接口下的logger,需要注意进行名字的区分。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2869303.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

瑞_Redis_短信登录(二)

文章目录 项目介绍1.1 项目准备1.2 基于Session实现登录流程1.2.1 发送短信验证码1.2.2 短信验证码登录、注册1.2.3 校验登录状态 1.3 实现发送短信验证码功能1.3.1 页面流程1.3.2 代码实现 1.41.51.6 &#x1f64a; 前言&#xff1a;本文章为瑞_系列专栏之《Redis》的实战篇的…

Redis和Mysql的数据一致性问题

在高并发的场景下&#xff0c;大量的请求直接访问Mysql很容易造成性能问题。所以我们都会用Redis来做数据的缓存&#xff0c;削减对数据库的请求的频率。 但是&#xff0c;Mysql和Redis是两种不同的数据库&#xff0c;如何保证不同数据库之间数据的一致性就非常关键了。 1、导…

Linux自动化任务管理以及常见定时命令示例

Linux以其强大的稳定性和灵活性成为了许多IT专业人士的首选。其中&#xff0c;自动化任务管理是Linux系统管理不可或缺的一部分&#xff0c;它能帮助系统管理员有效地管理系统任务&#xff0c;提高工作效率。定时任务&#xff0c;作为自动化任务管理的重要组成部分&#xff0c;…

Go——运算符,变量和常量,基本类型

一.运算符 Go语言内置的运算符有&#xff1a; 算术运算符 关系运算符 逻辑运算符 位运算符 赋值运算符 1.1 算术运算符 注意&#xff1a;(自增)和--(自减)在go语言中是单独的语句&#xff0c;并不是运算符。 1.2 关系运算符 1.3 逻辑运算符 1.4 位运算符 位运算符对整数在内存…

html5使用Websocket

html5使用Websocket 前言1、html5中的websocket2、创建一个 WebSocket 对象3、监听 WebSocket 连接事件4、监听 WebSocket 收到消息事件5、监听 WebSocket 关闭事件6、 监听 WebSocket 出错事件7、发送消息8、整体代码 前言 在即时通讯的交互方式中websocket是一个很使用的方式…

【八】【算法分析与设计】双指针(2)

11. 盛最多水的容器 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能…

react可视化编辑器 第三章 限制移动范围

代码 import React, {useState,DragEvent,useRef,useEffect,MouseEvent, } from react; // import { throttle } from lodash;interface Demo {id: number;x: number;y: number; }const App: React.FC () > {const [demos, setDemos] useState<Demo[]>([]);// let …

JMeter 面试题及答案整理,最新面试题

JMeter中如何进行性能测试的规划和设计&#xff1f; 进行JMeter性能测试的规划和设计主要遵循以下几个步骤&#xff1a; 1、确定测试目标&#xff1a; 明确性能测试的目的和目标&#xff0c;比如确定要测试的系统性能指标&#xff08;如响应时间、吞吐量、并发用户数等&#…

Linux第80步_使用“信号量”实现“互斥访问”共享资源

1、创建MySemaphoreLED目录 输入“cd /home/zgq/linux/Linux_Drivers/回车” 切换到“/home/zgq/linux/Linux_Drivers/”目录 输入“mkdir MySemaphoreLED回车”&#xff0c;创建“MySemaphoreLED”目录 输入“ls回车”查看“/home/zgq/linux/Linux_Drivers/”目录下的文件…

嵌入式硬件设计(一)|利用 NodeMCU-ESP8266 开发板和继电器结合APP“点灯•blinker”制作Wi-Fi智能开关(附有关硬件详细资料)

概述 本文主要讲述利用 NodeMCU-ESP8266 开发板和继电器通过手机 APP “ 点灯 • Blinker ” 制作一款能够由手机控制的WiFi 智能开关&#xff0c;从而实现智能物联。NodeMCU 是基于 Lua 的开源固件&#xff0c;ESP8266-NodeMCU是一个开源硬件开发板&#xff0c;支持WiFi功能&a…

redis瘦身版

高可用&#xff1a; 主从 哨兵&#xff1a;sentinel&#xff1a; 集群监控 消息通知 故障转移 配置中心 redis cluster &#xff1a;livu livechat中使用了 人家有槽slot 16384个呢 请求发送任意节点 该节点会将请求发送到正确节点上-相亲相爱 1.哈希的方式&#xff0c;将数据…

数字万用表 (Digital Multimeter)

数字万用表 [Digital Multimeter] 1. Product parameters2. 交流频率测量3. 面板介绍4. 背光屏References 1. Product parameters 2. 交流频率测量 在交流 750V 档处按 HOLD 键切换到市电频率 3. 面板介绍 4. 背光屏 ​ References [1] Yongqiang Cheng, https://yongqiang…

Leet code 91 解码方法

解题思路&#xff1a;动态规划 创建一个数组dp记录到达每个位置时候次数 解码时候要么在该位置单独解码 要么就是和前一个位置共同解码 第一步考虑 下标0位置能否单独解码 如果可以单独解码dp[0] 在0位置有一种解码方式 假如在下标1位置 dp[1]的结果是多少呢 然后再考虑…

Swift 面试题及答案整理,最新面试题

Swift 中如何实现单例模式&#xff1f; 在Swift中&#xff0c;单例模式的实现通常采用静态属性和私有初始化方法来确保一个类仅有一个实例。具体做法是&#xff1a;定义一个静态属性来存储这个单例实例&#xff0c;然后将类的初始化方法设为私有&#xff0c;以阻止外部通过构造…

maven工程,未被idea识别为maven工程怎么办?

示例&#xff1a;以下工程的pom文件图标不是一个蓝色的m&#xff0c;所以未被识别为maven工程。 解决办法&#xff1a;打开pom.xml文件—>右键—>add as maven project 问题解决&#xff1a;

第二门课:改善深层神经网络<超参数调试、正则化及优化>-超参数调试、Batch正则化和程序框架

文章目录 1 调试处理2 为超参数选择合适的范围3 超参数调试的实践4 归一化网络的激活函数5 将Batch Norm拟合进神经网络6 Batch Norm为什么会奏效&#xff1f;7 测试时的Batch Norm8 SoftMax回归9 训练一个SoftMax分类器10 深度学习框架11 TensorFlow 1 调试处理 需要调试的参…

Lua中文语言编程源码-第一节,更改llex.c词法分析器模块, 使Lua支持中文关键词。

源码已经更新在CSDN的码库里&#xff1a; git clone https://gitcode.com/funsion/CLua.git 在src文件夹下的llex.c&#xff0c;是Lua的词法分析器模块。 增加中文保留字标识符列表&#xff0c;保留英文保留字标识符列表。 搜索“ORDER RESERVED”&#xff0c;将原始代码 …

CSS学习(2)-盒子模型

1. CSS 长度单位 px &#xff1a;像素。em &#xff1a;相对元素 font-size 的倍数。rem &#xff1a;相对根字体大小&#xff0c;html标签就是根。% &#xff1a;相对父元素计算。 注意&#xff1a; CSS 中设置长度&#xff0c;必须加单位&#xff0c;否则样式无效&#xff…

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Row)

沿水平方向布局容器。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 可以包含子组件。 接口 Row(value?:{space?: number | string }) 从API version 9开始&#xff0c;该接口支持在…

HTML5CSS3提高导读

HTML5CSS3提高导读 2024/2/20 HTML5 的新增特性主要是针对于以前的不足&#xff0c;增加了一些新的标签、新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是 IE9 以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量使用这 …