Debedium如何忽略Oracle的purge命令
报错
截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。
所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似
purge table "$BIN…
的语句,就会导致Debezium罢工,日志里显示:
Mining session stopped due to error.io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:76) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:69) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:104) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:388) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleSchemaChange(AbstractLogMinerEventProcessor.java:1016) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:514) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:439) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:288) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:243) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:324) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:203) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[na:na]at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.antlr.v4.runtime.InputMismatchException: nullat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2143) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]... 21 common frames omitted
而这个错误的处理方式如下:
io.debezium.pipeline.ErrorHandler : Producer failureio.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
……
其实就是整个流水线都停止,无法工作。
这个报错一大堆,还显示了一个
Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}
好像只能去项目主页去提issue然后坐等修复了。
忽略
但是,如果我们确保这个指令我们不需要解析的话,可以不可以直接忽略,而不是停止呢?
其实可以。即把选项schema.history.internal.skip.unparseable.ddl,默认值为false,开启就为true就可以了。
properties.setProperty("schema.history.internal.skip.unparseable.ddl", "true");
官方手册里面,认为这个值需要我们关注,确定可以忽略再使用。
A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Skipping should be used only with care as it can lead to data loss or mangling when the binlog is being processed.
源码
虽然忽略了问题,但是我们也很好奇这个过程是如何发生的,所以追一下源码吧!
经过一番查找,发现在debezium-oracle-connector包的源码文件OracleSchemaChangeEventEmitter.class里,有解析schema的方法emitSchemaChangeEvent,方法定义为:
public void emitSchemaChangeEvent(SchemaChangeEventEmitter.Receiver receiver) throws InterruptedException { Table tableBefore = this.schema.tableFor(this.tableId); OracleDdlParser parser = this.schema.getDdlParser(); DdlChanges ddlChanges = parser.getDdlChanges(); try { ddlChanges.reset(); parser.setCurrentDatabase(this.sourceDatabaseName); parser.setCurrentSchema(this.objectOwner); parser.parse(this.ddlText, this.schema.getTables()); } catch (MultipleParsingExceptions | ParsingException e) { if (!this.schema.skipUnparseableDdlStatements()) { throw e; } LOGGER.warn("Ignoring unparsable DDL statement '{}':", this.ddlText, e); this.streamingMetrics.incrementWarningCount(); this.streamingMetrics.incrementSchemaChangeParseErrorCount(); }
可以看到,如果this.schema.skipUnparseableDdlStatements()为真,就只会打印一条警告日志,继续执行,不会抛出异常。
而这个skipUnparseableDdlStatements的定义,在debezium-core的HistorizedRelationalDatabaseSchema.class文件里:
public boolean skipUnparseableDdlStatements() { return this.historizedConnectorConfig.skipUnparseableDdlStatements();
}
可以看到,只是返回了historizedConnectorConfig的同名方法。而historizedConnectorConfig则是一个HistorizedRelationalDatabaseConnectorConfig。
跟进这个文件去,可以看到这个方法只是一个变量的返回:
public boolean skipUnparseableDdlStatements() { return this.skipUnparseableDDL;
}
而变量skipUnparseableDDL,则在构造函数里进行了设定:
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, Tables.TableFilter systemTablesFilter, Selectors.TableIdToStringMapper tableIdMapper, boolean useCatalogBeforeSchema, int defaultSnapshotFetchSize, ColumnFilterMode columnFilterMode, boolean multiPartitionMode) { super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema); this.useCatalogBeforeSchema = useCatalogBeforeSchema; this.connectorClass = connectorClass; this.multiPartitionMode = multiPartitionMode; this.ddlFilter = this.createDdlFilter(config); this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); this.storeOnlyCapturedTablesDdl = config.getBoolean(STORE_ONLY_CAPTURED_TABLES_DDL); this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
来自与config的SKIP_UNPARSEABLE_DDL_STATEMENTS参数的boolean值。
而SKIP_UNPARSEABLE_DDL_STATEMENTS的定义在这个类里面是一个静态初始化过程:
static { SCHEMA_HISTORY = Field.create("schema.history.internal").withDisplayName("Database schema history class").withType(Type.CLASS).withWidth(Width.LONG).withImportance(Importance.LOW).withInvisibleRecommender().withDescription("The name of the SchemaHistory class that should be used to store and recover database schema changes. The configuration properties for the history are prefixed with the 'schema.history.internal.' string.").withDefault("io.debezium.storage.kafka.history.KafkaSchemaHistory"); SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; STORE_ONLY_CAPTURED_DATABASES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL; CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit().history(new Field[]{SCHEMA_HISTORY, SKIP_UNPARSEABLE_DDL_STATEMENTS, STORE_ONLY_CAPTURED_TABLES_DDL, STORE_ONLY_CAPTURED_DATABASES_DDL}).create();
}
其实就是SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,其中SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS的定义也在这个类的构造函数里,是:
public interface SchemaHistory { String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal."; Field NAME = Field.create("schema.history.internal.name").withDisplayName("Logical name for the database schema history").withType(Type.STRING).withWidth(Width.MEDIUM).withImportance(Importance.LOW).withDescription("The name used for the database schema history, perhaps differently by each implementation.").withValidation(new Field.Validator[]{Field::isOptional}); Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create("schema.history.internal.skip.unparseable.ddl").withDisplayName("Skip DDL statements that cannot be parsed").withType(Type.BOOLEAN).withWidth(Width.SHORT).withImportance(Importance.LOW).withDescription("Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse.By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.").withDefault(false);
……
这一趟追下来,不得不说,这些参数真是包装(隐藏)得博大精深!
相关文章:
Debedium如何忽略Oracle的purge命令
报错 截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。 所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似 purge table "$BIN… 的语句,就会导致Debezium罢工,日志里显…...

PlantUML 语言
PlantUML 是一种开源工具,用于通过简单的文本描述生成 UML 图。它支持多种 UML 图类型,如类图、序列图、用例图、活动图、组件图、状态图等。PlantUML 语言非常简洁,采用类似编程语言的语法,允许用户使用文本定义模型,…...
linux的 .so和.ko文件分别是什么?主要区别是什么?
前言: .so和.ko文件的主要区别在于它们的应用层次和功能不同。 应用层次 .so文件:这是用户层的动态链接库(Shared Object),主要用于用户态的程序中。 它用于动态链接,多个程序可以共享同一个库文件&…...

XX服务器上的npm不知道咋突然坏了
收到同事的V,说是:182上的npm不知道咋突然坏了,查到这里了,不敢动了。 咱一定要抓重点:突然坏了。这里的突然肯定不是瞬间(大概率是上次可用,这次不可用,中间间隔了多长时间&#x…...

数据结构(优先级队列 :Priority Queue)
前言: 在计算机科学中,队列是一种非常常见的数据结构,它遵循先进先出(FIFO)的原则,也就是说,先进入队列的元素会先被处理。然而,在许多实际应用中,我们不仅仅需要按顺序…...
nginx.conf 请求时间部分参数说明新手教程
下面来说下nginx.conf 的部分参数,配置如下: http {include mime.types;default_type application/octet-stream;client_max_body_size 1000M;#log_format main $remote_addr - $remote_user [$time_local] "$request" # …...

【Linux-ubuntu通过USB传输程序点亮LED灯】
Linux-ubuntu通过USB传输程序点亮LED灯 一,初始化GPIO配置1.使能时钟2.其他寄存器配置 二,程序编译三,USB传输程序 一,初始化GPIO配置 1.使能时钟 使能就是一个控制信号,用于决定时钟信号是否能够有效的传递或者被使用,就像一个…...
《开源时间序列数据:探索与应用》
《开源时间序列数据:探索与应用》 一、开源时间序列数据概述二、热门的开源时间序列数据库1. InfluxDB2. TimescaleDB3. Prometheus4. OpenTSDB5. Graphite6. Druid 三、开源时间序列数据的应用场景1. 物联网领域2. 金融领域3. 运维监控领域4. 能源领域 四、开源时间…...
三相异步电动机跳闸的原因是什么?
三相异步电动机是现代工业生产和日常生活中广泛应用的一种电动机,因其结构简单、维护方便和功率范围广泛而受到广泛青睐。然而,在实际使用过程中,电动机的跳闸现象时有发生,这不仅影响了设备的正常运行,甚至可能导致经…...

连续思维链Coconut ,打开LLM推理新范式
语言与推理之间有着什么样内涵上的联系与本质上的差别? 系统二的长链复杂分步推理与系统一分别在训练时与推理时的正/反向传播链路、模型神经网络内部的潜在机制(虽然是黑盒)以及网络链路对应的模型训练过程中“压缩”的数据(认知)流形所映射出的隐含碎片化泛化分布…...

阿里云数据库MongoDB版助力极致游戏高效开发
客户简介 成立于2010年的厦门极致互动网络技术股份有限公司(以下简称“公司”或“极致游戏”),是一家集网络游戏产品研发与运营为一体的重点软件企业,公司专注于面向全球用户的网络游戏研发与运营。在整个产业链中,公…...
ESP32-S3模组上跑通ES8388(29)
接前一篇文章:ESP32-S3模组上跑通ES8388(28) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析到了es8388_init函数中的第11段也是最后一段代码,没有解析完,本回继续解析。为了便于理解和回顾,再次贴出该片段,在components\audio_hal\driver\es8388\es8388.c中,如下…...

使用ElasticSearch实现全文检索
文章目录 全文检索任务描述技术难点任务目标实现过程1. java读取Json文件,并导入MySQL数据库中2. 利用Logstah完成MySQL到ES的数据同步3. 开始编写功能接口3.1 全文检索接口3.2 查询详情 4. 前端调用 全文检索 任务描述 在获取到数据之后如何在ES中进行数据建模&a…...
通过k-means对相似度较高的语句进行分类
本文介绍了如何使用K-Means算法对相似度较高的语句进行分类,并附上java案例代码 import java.util.ArrayList; import java.util.List; import java.util.Random;public class KMeansTextClustering {public static void main(String[] args) {// 初始化语句数据集…...

国信华源科技赋能长江蓄滞洪区水闸管护项目验收成果报道
“碧水悠悠绕古城,闸启长江万象新。”近日,由北京国信华源科技有限公司倾力打造的万里长江蓄滞洪区水闸管护项目,圆满通过验收,为这片鱼米之乡的防洪安全注入了新的科技活力。 长江之畔,水闸挺立,犹如干堤上…...

HTML:表格重点
用表格就用table caption为该表上部信息,用来说明表的作用 thead为表头主要信息,效果加粗 tbody为表格中的主体内容 tr是 table row 表格的行 td是table data th是table heading表格标题 ,一般表格第一行的数据都是table heading...

wine的使用方法
wine版本 所有分支,新的主要版本: wine-x.0 All branches, release candidates:各分支、候选版本: wine-x.0-rcn Stable branch updates: 稳定分支更新: wine-x.0.z Development branch updates: wine-x.y wine *.exe “更改目…...

Linux服务器离线安装unzip包
Linux服务器离线安装unzip包 1. 安装unzip包的目的 解压Docker部署包和服务部署包。 2. 查看当前环境是否已经安装unzip rpm -qa | grep --color unzip3. 下载对应的离线包 地址:http://www.rpmfind.net/linux/rpm2html/search.php?query&submitSearch 例…...

Excel拆分脚本
Excel拆分 工作表按行拆分为工作薄 工作表按行拆分为工作薄 打开要拆分的Excel文件,使用快捷键(AltF11)打开脚本界面,选择要拆分的sheet,打开Module,在Module中输入脚本代码,然后运行脚本 Su…...

Mybatis---事务
目录 引入 一、事务存在的意义 1.事务是什么? 2.Mybatis关于事务的管理 程序员自己控制处理的提交和回滚 引入 一、事务存在的意义 1.事务是什么? 多个操作同时进行,那么同时成功,那么同时失败。这就是事务。 事务有四个特性…...

边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...

无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...

CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

Yolov8 目标检测蒸馏学习记录
yolov8系列模型蒸馏基本流程,代码下载:这里本人提交了一个demo:djdll/Yolov8_Distillation: Yolov8轻量化_蒸馏代码实现 在轻量化模型设计中,**知识蒸馏(Knowledge Distillation)**被广泛应用,作为提升模型…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

Qemu arm操作系统开发环境
使用qemu虚拟arm硬件比较合适。 步骤如下: 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载,下载地址:https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...

GraphQL 实战篇:Apollo Client 配置与缓存
GraphQL 实战篇:Apollo Client 配置与缓存 上一篇:GraphQL 入门篇:基础查询语法 依旧和上一篇的笔记一样,主实操,没啥过多的细节讲解,代码具体在: https://github.com/GoldenaArcher/graphql…...

2025年- H71-Lc179--39.组合总和(回溯,组合)--Java版
1.题目描述 2.思路 当前的元素可以重复使用。 (1)确定回溯算法函数的参数和返回值(一般是void类型) (2)因为是用递归实现的,所以我们要确定终止条件 (3)单层搜索逻辑 二…...