DataX-Oracle新增writeMode支持update

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN[UPDATE sql] 
WHEN NOT MATCHED THEN [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean update = writeMode.trim().toLowerCase().startsWith("update");boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| update;if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate || update) {//update只在mysql下使用if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();}//update在Oracle下使用else if (dataBaseType == DataBaseType.Oracle) {writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +StringUtils.join(columnHolders, ",") +") VALUES(" + StringUtils.join(valueHolders, ",") +")";}else {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));}} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (update) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}return writeDataSqlTemplate;}
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {String[] sArray = getStrings(merge);StringBuilder sb = new StringBuilder();sb.append("MERGE INTO %s A USING ( SELECT ");boolean first = true;boolean first1 = true;StringBuilder str = new StringBuilder();StringBuilder update = new StringBuilder();for (String columnHolder : columnHolders) {if (Arrays.asList(sArray).contains(columnHolder)) {if (!first) {sb.append(",");str.append(" AND ");} else {first = false;}str.append("TMP.").append(columnHolder);sb.append("?");str.append(" = ");sb.append(" AS ");str.append("A.").append(columnHolder);sb.append(columnHolder);}}for (String columnHolder : columnHolders) {if (!Arrays.asList(sArray).contains(columnHolder)) {if (!first1) {update.append(",");} else {first1 = false;}update.append(columnHolder);update.append(" = ");update.append("?");}}sb.append(" FROM DUAL ) TMP ON (");sb.append(str);sb.append(" ) WHEN MATCHED THEN UPDATE SET ");sb.append(update);sb.append(" WHEN NOT MATCHED THEN ");return sb.toString();}public static String[] getStrings(String merge) {merge = merge.replace("update", "");merge = merge.replace("(", "");merge = merge.replace(")", "");merge = merge.replace(" ", "");return merge.split(",");}
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {this.taskPluginCollector = taskPluginCollector;List<String> columns = new LinkedList<>();if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);this.columns.forEach(column->{if (Arrays.asList(sArray).contains(column)) {columns.add(column);}});this.columns.forEach(column->{if (!Arrays.asList(sArray).contains(column)) {columns.add(column);}});}columns.addAll(this.columns);// 用于写入数据的时候的类型根据目的表字段类型转换this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));// 写数据库的SQL语句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {if (record.getColumnNumber() != this.columnNumber) {// 源头读取字段列数与目的表字段写入列数不相等,直接报错throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",record.getColumnNumber(),this.columnNumber));}writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);}}
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)throws SQLException{PreparedStatement preparedStatement = null;try {connection.setAutoCommit(false);preparedStatement = connection.prepareStatement(this.writeRecordSql);if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);for (Record record : buffer) {List<Column> recordOne = new ArrayList<>();for (int j = 0; j < this.columns.size(); j++) {if (Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {if (!Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {recordOne.add(record.getColumn(j));}for (int j = 0; j < recordOne.size(); j++) {record.setColumn(j, recordOne.get(j));}preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}else {for (Record record : buffer) {preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}preparedStatement.executeBatch();connection.commit();}catch (SQLException e) {LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());connection.rollback();doOneInsert(connection, buffer);}catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);}finally {DBUtil.closeDBResources(preparedStatement, null);}}
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)throws SQLException{for (int i = 0; i < record.getColumnNumber(); i++) {int columnSqltype = this.resultSetMetaData.getMiddle().get(i);String typeName = this.resultSetMetaData.getRight().get(i);preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));}return preparedStatement;}

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "${r_username}","password": "${r_password}","connection": [{	   "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],"jdbcUrl": ["${r_jdbcUrl}"]}]}},"writer": {"name": "oraclewriter","parameter": {"writeMode": "update(f_year,f_code)","username": "${w_username}","password": "${w_password}","column": ["f_year","f_code","f_name","f_order"],"session": [],"preSql": [],"connection": [{"jdbcUrl": "${w_jdbcUrl}","table": ["tableName"]}]}}		   }]}
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2905974.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

QT6实现音频输出方法

一.QT6音频调用及与QT5的区别 1.音频输入 QAudioSource代替QAudioInput类 QAudioSource类提供了一个接口&#xff0c;用于从音频输入设备接收音频数据。 Header: #include <QAudioSource> qmake: QT multimedia 2.音频输出 QAudioSink代替QAudioOutput类 QAudioSi…

硬件10、从网站获取封装

百度搜索IC封装网或者网址https://www.iclib.com/ 搜索想要的器件&#xff0c;直接下载他的原理图库和封装库

Spring Cloud+Spring Alibaba笔记

Spring CloudSpring Alibaba 文章目录 Spring CloudSpring AlibabaNacos服务发现配置中心 OpenFeign超时机制开启httpclient5重试机制开启日志 SeataSentinel流量控制熔断降级热点控制规则持久化集成 OpenFeign集成 Gateway MicrometerZipKinGateway路由断言过滤器 Nacos 服务…

【YOLOv5改进系列(6)】高效涨点----使用DAMO-YOLO中的Efficient RepGFPN模块替换yolov5中的Neck部分

文章目录 &#x1f680;&#x1f680;&#x1f680;前言一、1️⃣ 添加yolov5_GFPN.yaml文件二、2️⃣添加extra_modules.py代码三、3️⃣yolo.py文件添加内容3.1 &#x1f393; 添加CSPStage模块 四、4️⃣实验结果4.1 &#x1f393; 使用yolov5s.pt训练的结果对比4.2 ✨ 使用…

【Pytorch入门】小土堆PyTorch入门教程完整学习笔记(详细笔记并附练习代码 ipynb文件)

小土堆PyTorch入门教程笔记 最近在观看PyTorch深度学习快速入门教程&#xff08;绝对通俗易懂&#xff01;&#xff09;【小土堆】顺便做点笔记&#xff0c;方便回看&#xff0c;同时也希望记录的笔记能够帮助到更多在入门的小伙伴~ 【注】仅记录个人觉得重要的知识&#xff0c…

Java的静态代理与jdk动态代理

代理 我们经常利用代理进行解耦以及控制对实际对象的访问等工作。例如&#xff0c;我们可以通过代理对方法的调用进行更精细的控制&#xff08;例如加上日志、权限控制等&#xff09;&#xff0c;而无需修改实际对象的代码。代理的作用是无侵入式的给代码增加功能。有些事情是…

sql——对于行列转换相关的操作

目录 一、lead、lag 函数 二、wm_concat 函数 三、pivot 函数 四、判断函数 遇到需要进行行列转换的数据处理需求&#xff0c;以 oracle 自带的表作为例子复习一下&#xff1a; 一、lead、lag 函数 需要行列转换的表&#xff1a; select deptno,count(empno) emp_num from…

第十四届蓝桥杯JavaA组省赛真题 - 互质数的个数

解题思路&#xff1a; 快速幂 欧拉函数 快速幂比较常见于数据较大的取模场景&#xff0c;欧拉函数感觉还是有点抽象 注意&#xff1a; 取模的时候就不要简写了&#xff0c;例如&#xff1a;res res * a % mod;不要写成res * a % mod; import java.util.Scanner;public c…

算法之美:B+树原理、应用及Mysql索引底层原理剖析

B树的一种变种形式&#xff0c;B树上的叶子结点存储关键字以及相应记录的地址&#xff0c;同等存储空间下比B-Tree存储更多Key。非叶子节点不对关键字记录的指针进行保存&#xff0c;只进行数据索引 , 树的层级会更少 , 所有叶子节点都在同一层, 叶子节点的关键字从小到大有序排…

直流马达驱动芯片D6289ADA介绍

应用领域 适用于智能断路器&#xff08;家用或工业智能空开&#xff09;、新能源汽车充电枪锁、电动玩具、电磁门锁、自动阀门等的直流电机驱动。 功能介绍 D6289ADA是一款直流马达驱动芯片&#xff0c;它有两个逻辑输入端子用来控制电机前进、后退及制动。该电路具有良好的抗干…

如何解决 IntelliJ IDEA 中属性文件的编码问题

在使用 IntelliJ IDEA 进行开发过程中&#xff0c;我们经常会遇到属性文件&#xff08;.properties 文件&#xff09;的编码问题。如果属性文件的编码设置不正确&#xff0c;就会导致中文等特殊字符显示乱码。这是因为IntelliJ IDEA中默认的配置文件的编码格式是ISO-8859-1。 …

36-递归与迭代

36-1 用递归和迭代解决问题 1、求n的阶乘 公式&#xff1a; n!123...(n-1)n。用递归方式定义&#xff1a;0!1&#xff0c;n!(n-1)!n。 代码1&#xff1a; 我们先回忆一下之前用循环怎么实现的吧 非递归&#xff0c;也可称迭代&#xff1a; int main() {int n 0;scanf(&q…

精酿啤酒:酿造工艺的创新与实验探索

在啤酒酿造领域&#xff0c;创新与实验探索一直是推动品质提升和品类丰富的重要动力。Fendi Club啤酒作为一家注重品质和口感的品牌&#xff0c;在酿造工艺方面不断创新与尝试&#xff0c;为消费者带来更多与众不同的风味体验。 Fendi Club啤酒在原料选择方面不断进行创新与实验…

超级码科技股份携手品品香开数字茶业新范式,实现全产业链数智化闭环

品品香白茶创立于1992年&#xff0c;品牌创立的30多年间&#xff0c;品品香不断创新技术、精耕细作、推陈出新&#xff0c;在不同发展时期始终走在行业前沿&#xff0c;助推着白茶产业高质量发展。 2016年&#xff0c;品品香发挥茶产业龙头示范作用率先进行转型&#xff0c;联…

jmeter中参数加密

加密接口常用的方式有&#xff1a; MD5&#xff0c;SHA&#xff0c;HmacSHA RSA AES&#xff0c;DES&#xff0c;Base64 压测中有些参数需要进行加密&#xff0c;加密方式已接口文档为主。 MD5加密 比如MD5加密的接口文档&#xff1a; 请求URL&#xff1a;http://101.34.221…

免费VPS/云服务器整理汇总

随着互联网的普及和云计算技术的飞速发展&#xff0c;越来越多的人开始尝试使用VPS&#xff08;Virtual Private Server&#xff0c;虚拟专用服务器&#xff09;或者云服务器来部署自己的在线业务。本文将对免费VPS/云服务器进行整理汇总&#xff0c;助力大家轻松开启云计算之旅…

顺应互联网发展大潮流,红河农资招商火爆开启

顺应互联网发展大潮流&#xff0c;红河农资招商火爆开启 进入新世纪&#xff0c;生态农业建设成为了影响和改变农村、农业工作的重要领域。尤其是在互联网的快速发展之下&#xff0c;实现农业结构调整&#xff0c;推动互联网模式的发展&#xff0c;成为了当前生态农业发展的主流…

​python学习之变量类型​

print单纯输中的十种数据类型只需要用print()函数即可&#xff0c;()里面直接写变量名。 下面重点介绍print格式输出&#xff1a; 第一种方法&#xff1a;一个萝卜一个坑&#xff0c;下面的代码中&#xff0c;{0}、{1}、{2}分别表示j,i,j*i&#xff0c;单引号里面是输出格式。…

网络七层模型之表示层:理解网络通信的架构(六)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

Python中模块

基本概念 **模块 module&#xff1a;**一般情况下&#xff0c;是一个以.py为后缀的文件 ①Python内置的模块&#xff08;标准库&#xff09;&#xff1b; ②第三方模块&#xff1b; ③自定义模块。 包 package&#xff1a; 当一个文件夹下有 init .py时&#xff0c;意为该文…