Debedium如何忽略Oracle的purge命令
报错
截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。
所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似
purge table "$BIN…
的语句,就会导致Debezium罢工,日志里显示:
Mining session stopped due to error.io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:76) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:69) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:104) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:388) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleSchemaChange(AbstractLogMinerEventProcessor.java:1016) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:514) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:439) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:288) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:243) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:324) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:203) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[na:na]at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.antlr.v4.runtime.InputMismatchException: nullat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2143) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]... 21 common frames omitted
而这个错误的处理方式如下:
io.debezium.pipeline.ErrorHandler : Producer failureio.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
……
其实就是整个流水线都停止,无法工作。
这个报错一大堆,还显示了一个
Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}
好像只能去项目主页去提issue然后坐等修复了。
忽略
但是,如果我们确保这个指令我们不需要解析的话,可以不可以直接忽略,而不是停止呢?
其实可以。即把选项schema.history.internal.skip.unparseable.ddl,默认值为false,开启就为true就可以了。
properties.setProperty("schema.history.internal.skip.unparseable.ddl", "true");
官方手册里面,认为这个值需要我们关注,确定可以忽略再使用。
A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Skipping should be used only with care as it can lead to data loss or mangling when the binlog is being processed.
源码
虽然忽略了问题,但是我们也很好奇这个过程是如何发生的,所以追一下源码吧!
经过一番查找,发现在debezium-oracle-connector包的源码文件OracleSchemaChangeEventEmitter.class里,有解析schema的方法emitSchemaChangeEvent,方法定义为:
public void emitSchemaChangeEvent(SchemaChangeEventEmitter.Receiver receiver) throws InterruptedException { Table tableBefore = this.schema.tableFor(this.tableId); OracleDdlParser parser = this.schema.getDdlParser(); DdlChanges ddlChanges = parser.getDdlChanges(); try { ddlChanges.reset(); parser.setCurrentDatabase(this.sourceDatabaseName); parser.setCurrentSchema(this.objectOwner); parser.parse(this.ddlText, this.schema.getTables()); } catch (MultipleParsingExceptions | ParsingException e) { if (!this.schema.skipUnparseableDdlStatements()) { throw e; } LOGGER.warn("Ignoring unparsable DDL statement '{}':", this.ddlText, e); this.streamingMetrics.incrementWarningCount(); this.streamingMetrics.incrementSchemaChangeParseErrorCount(); }
可以看到,如果this.schema.skipUnparseableDdlStatements()为真,就只会打印一条警告日志,继续执行,不会抛出异常。
而这个skipUnparseableDdlStatements的定义,在debezium-core的HistorizedRelationalDatabaseSchema.class文件里:
public boolean skipUnparseableDdlStatements() { return this.historizedConnectorConfig.skipUnparseableDdlStatements();
}
可以看到,只是返回了historizedConnectorConfig的同名方法。而historizedConnectorConfig则是一个HistorizedRelationalDatabaseConnectorConfig。
跟进这个文件去,可以看到这个方法只是一个变量的返回:
public boolean skipUnparseableDdlStatements() { return this.skipUnparseableDDL;
}
而变量skipUnparseableDDL,则在构造函数里进行了设定:
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, Tables.TableFilter systemTablesFilter, Selectors.TableIdToStringMapper tableIdMapper, boolean useCatalogBeforeSchema, int defaultSnapshotFetchSize, ColumnFilterMode columnFilterMode, boolean multiPartitionMode) { super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema); this.useCatalogBeforeSchema = useCatalogBeforeSchema; this.connectorClass = connectorClass; this.multiPartitionMode = multiPartitionMode; this.ddlFilter = this.createDdlFilter(config); this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); this.storeOnlyCapturedTablesDdl = config.getBoolean(STORE_ONLY_CAPTURED_TABLES_DDL); this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
来自与config的SKIP_UNPARSEABLE_DDL_STATEMENTS参数的boolean值。
而SKIP_UNPARSEABLE_DDL_STATEMENTS的定义在这个类里面是一个静态初始化过程:
static { SCHEMA_HISTORY = Field.create("schema.history.internal").withDisplayName("Database schema history class").withType(Type.CLASS).withWidth(Width.LONG).withImportance(Importance.LOW).withInvisibleRecommender().withDescription("The name of the SchemaHistory class that should be used to store and recover database schema changes. The configuration properties for the history are prefixed with the 'schema.history.internal.' string.").withDefault("io.debezium.storage.kafka.history.KafkaSchemaHistory"); SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; STORE_ONLY_CAPTURED_DATABASES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL; CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit().history(new Field[]{SCHEMA_HISTORY, SKIP_UNPARSEABLE_DDL_STATEMENTS, STORE_ONLY_CAPTURED_TABLES_DDL, STORE_ONLY_CAPTURED_DATABASES_DDL}).create();
}
其实就是SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,其中SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS的定义也在这个类的构造函数里,是:
public interface SchemaHistory { String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal."; Field NAME = Field.create("schema.history.internal.name").withDisplayName("Logical name for the database schema history").withType(Type.STRING).withWidth(Width.MEDIUM).withImportance(Importance.LOW).withDescription("The name used for the database schema history, perhaps differently by each implementation.").withValidation(new Field.Validator[]{Field::isOptional}); Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create("schema.history.internal.skip.unparseable.ddl").withDisplayName("Skip DDL statements that cannot be parsed").withType(Type.BOOLEAN).withWidth(Width.SHORT).withImportance(Importance.LOW).withDescription("Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse.By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.").withDefault(false);
……
这一趟追下来,不得不说,这些参数真是包装(隐藏)得博大精深!
相关文章:
Debedium如何忽略Oracle的purge命令
报错 截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。 所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似 purge table "$BIN… 的语句,就会导致Debezium罢工,日志里显…...

PlantUML 语言
PlantUML 是一种开源工具,用于通过简单的文本描述生成 UML 图。它支持多种 UML 图类型,如类图、序列图、用例图、活动图、组件图、状态图等。PlantUML 语言非常简洁,采用类似编程语言的语法,允许用户使用文本定义模型,…...
linux的 .so和.ko文件分别是什么?主要区别是什么?
前言: .so和.ko文件的主要区别在于它们的应用层次和功能不同。 应用层次 .so文件:这是用户层的动态链接库(Shared Object),主要用于用户态的程序中。 它用于动态链接,多个程序可以共享同一个库文件&…...

XX服务器上的npm不知道咋突然坏了
收到同事的V,说是:182上的npm不知道咋突然坏了,查到这里了,不敢动了。 咱一定要抓重点:突然坏了。这里的突然肯定不是瞬间(大概率是上次可用,这次不可用,中间间隔了多长时间&#x…...

数据结构(优先级队列 :Priority Queue)
前言: 在计算机科学中,队列是一种非常常见的数据结构,它遵循先进先出(FIFO)的原则,也就是说,先进入队列的元素会先被处理。然而,在许多实际应用中,我们不仅仅需要按顺序…...
nginx.conf 请求时间部分参数说明新手教程
下面来说下nginx.conf 的部分参数,配置如下: http {include mime.types;default_type application/octet-stream;client_max_body_size 1000M;#log_format main $remote_addr - $remote_user [$time_local] "$request" # …...

【Linux-ubuntu通过USB传输程序点亮LED灯】
Linux-ubuntu通过USB传输程序点亮LED灯 一,初始化GPIO配置1.使能时钟2.其他寄存器配置 二,程序编译三,USB传输程序 一,初始化GPIO配置 1.使能时钟 使能就是一个控制信号,用于决定时钟信号是否能够有效的传递或者被使用,就像一个…...
《开源时间序列数据:探索与应用》
《开源时间序列数据:探索与应用》 一、开源时间序列数据概述二、热门的开源时间序列数据库1. InfluxDB2. TimescaleDB3. Prometheus4. OpenTSDB5. Graphite6. Druid 三、开源时间序列数据的应用场景1. 物联网领域2. 金融领域3. 运维监控领域4. 能源领域 四、开源时间…...
三相异步电动机跳闸的原因是什么?
三相异步电动机是现代工业生产和日常生活中广泛应用的一种电动机,因其结构简单、维护方便和功率范围广泛而受到广泛青睐。然而,在实际使用过程中,电动机的跳闸现象时有发生,这不仅影响了设备的正常运行,甚至可能导致经…...

连续思维链Coconut ,打开LLM推理新范式
语言与推理之间有着什么样内涵上的联系与本质上的差别? 系统二的长链复杂分步推理与系统一分别在训练时与推理时的正/反向传播链路、模型神经网络内部的潜在机制(虽然是黑盒)以及网络链路对应的模型训练过程中“压缩”的数据(认知)流形所映射出的隐含碎片化泛化分布…...

阿里云数据库MongoDB版助力极致游戏高效开发
客户简介 成立于2010年的厦门极致互动网络技术股份有限公司(以下简称“公司”或“极致游戏”),是一家集网络游戏产品研发与运营为一体的重点软件企业,公司专注于面向全球用户的网络游戏研发与运营。在整个产业链中,公…...
ESP32-S3模组上跑通ES8388(29)
接前一篇文章:ESP32-S3模组上跑通ES8388(28) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析到了es8388_init函数中的第11段也是最后一段代码,没有解析完,本回继续解析。为了便于理解和回顾,再次贴出该片段,在components\audio_hal\driver\es8388\es8388.c中,如下…...

使用ElasticSearch实现全文检索
文章目录 全文检索任务描述技术难点任务目标实现过程1. java读取Json文件,并导入MySQL数据库中2. 利用Logstah完成MySQL到ES的数据同步3. 开始编写功能接口3.1 全文检索接口3.2 查询详情 4. 前端调用 全文检索 任务描述 在获取到数据之后如何在ES中进行数据建模&a…...
通过k-means对相似度较高的语句进行分类
本文介绍了如何使用K-Means算法对相似度较高的语句进行分类,并附上java案例代码 import java.util.ArrayList; import java.util.List; import java.util.Random;public class KMeansTextClustering {public static void main(String[] args) {// 初始化语句数据集…...

国信华源科技赋能长江蓄滞洪区水闸管护项目验收成果报道
“碧水悠悠绕古城,闸启长江万象新。”近日,由北京国信华源科技有限公司倾力打造的万里长江蓄滞洪区水闸管护项目,圆满通过验收,为这片鱼米之乡的防洪安全注入了新的科技活力。 长江之畔,水闸挺立,犹如干堤上…...

HTML:表格重点
用表格就用table caption为该表上部信息,用来说明表的作用 thead为表头主要信息,效果加粗 tbody为表格中的主体内容 tr是 table row 表格的行 td是table data th是table heading表格标题 ,一般表格第一行的数据都是table heading...

wine的使用方法
wine版本 所有分支,新的主要版本: wine-x.0 All branches, release candidates:各分支、候选版本: wine-x.0-rcn Stable branch updates: 稳定分支更新: wine-x.0.z Development branch updates: wine-x.y wine *.exe “更改目…...

Linux服务器离线安装unzip包
Linux服务器离线安装unzip包 1. 安装unzip包的目的 解压Docker部署包和服务部署包。 2. 查看当前环境是否已经安装unzip rpm -qa | grep --color unzip3. 下载对应的离线包 地址:http://www.rpmfind.net/linux/rpm2html/search.php?query&submitSearch 例…...

Excel拆分脚本
Excel拆分 工作表按行拆分为工作薄 工作表按行拆分为工作薄 打开要拆分的Excel文件,使用快捷键(AltF11)打开脚本界面,选择要拆分的sheet,打开Module,在Module中输入脚本代码,然后运行脚本 Su…...

Mybatis---事务
目录 引入 一、事务存在的意义 1.事务是什么? 2.Mybatis关于事务的管理 程序员自己控制处理的提交和回滚 引入 一、事务存在的意义 1.事务是什么? 多个操作同时进行,那么同时成功,那么同时失败。这就是事务。 事务有四个特性…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...

全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

学校招生小程序源码介绍
基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码,专为学校招生场景量身打造,功能实用且操作便捷。 从技术架构来看,ThinkPHP提供稳定可靠的后台服务,FastAdmin加速开发流程,UniApp则保障小程序在多端有良好的兼…...

cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...

用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...

解析“道作为序位生成器”的核心原理
解析“道作为序位生成器”的核心原理 以下完整展开道函数的零点调控机制,重点解析"道作为序位生成器"的核心原理与实现框架: 一、道函数的零点调控机制 1. 道作为序位生成器 道在认知坐标系$(x_{\text{物}}, y_{\text{意}}, z_{\text{文}}…...