Debedium如何忽略Oracle的purge命令
报错
截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。
所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似
purge table "$BIN…
的语句,就会导致Debezium罢工,日志里显示:
Mining session stopped due to error.io.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:32) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.antlr.AntlrDdlParser.parse(AntlrDdlParser.java:76) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parse(OracleDdlParser.java:69) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.OracleSchemaChangeEventEmitter.emitSchemaChangeEvent(OracleSchemaChangeEventEmitter.java:104) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:388) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleSchemaChange(AbstractLogMinerEventProcessor.java:1016) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:514) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:439) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:288) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:243) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:324) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:203) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143) ~[debezium-core-3.0.2.Final.jar:3.0.2.Final]at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[na:na]at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.antlr.v4.runtime.InputMismatchException: nullat org.antlr.v4.runtime.DefaultErrorStrategy.sync(DefaultErrorStrategy.java:270) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2143) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]... 21 common frames omitted
而这个错误的处理方式如下:
io.debezium.pipeline.ErrorHandler : Producer failureio.debezium.text.ParsingException: DDL statement couldn't be parsed. Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}at io.debezium.antlr.ParsingErrorListener.syntaxError(ParsingErrorListener.java:43) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at org.antlr.v4.runtime.ProxyErrorListener.syntaxError(ProxyErrorListener.java:41) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.Parser.notifyErrorListeners(Parser.java:543) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportInputMismatch(DefaultErrorStrategy.java:327) ~[antlr4-runtime-4.10.1.jar:4.10.1]at org.antlr.v4.runtime.DefaultErrorStrategy.reportError(DefaultErrorStrategy.java:139) ~[antlr4-runtime-4.10.1.jar:4.10.1]at io.debezium.ddl.parser.oracle.generated.PlSqlParser.sql_script(PlSqlParser.java:2211) ~[debezium-ddl-parser-3.0.2.Final.jar:3.0.2.Final]at io.debezium.connector.oracle.antlr.OracleDdlParser.parseTree(OracleDdlParser.java:74) ~[debezium-connector-oracle-3.0.2.Final.jar:3.0.2.Final]
……
其实就是整个流水线都停止,无法工作。
这个报错一大堆,还显示了一个
Please open a Jira issue with the statement 'purge table "BIN$rvZfTNVaRv+3dgTgQzBLuw==$0";'
mismatched input 'table' expecting {<EOF>, '/', ';'}
好像只能去项目主页去提issue然后坐等修复了。
忽略
但是,如果我们确保这个指令我们不需要解析的话,可以不可以直接忽略,而不是停止呢?
其实可以。即把选项schema.history.internal.skip.unparseable.ddl,默认值为false,开启就为true就可以了。
properties.setProperty("schema.history.internal.skip.unparseable.ddl", "true");
官方手册里面,认为这个值需要我们关注,确定可以忽略再使用。
A Boolean value that specifies whether the connector should ignore malformed or unknown database statements or stop processing so a human can fix the issue. The safe default is false. Skipping should be used only with care as it can lead to data loss or mangling when the binlog is being processed.
源码
虽然忽略了问题,但是我们也很好奇这个过程是如何发生的,所以追一下源码吧!
经过一番查找,发现在debezium-oracle-connector包的源码文件OracleSchemaChangeEventEmitter.class里,有解析schema的方法emitSchemaChangeEvent,方法定义为:
public void emitSchemaChangeEvent(SchemaChangeEventEmitter.Receiver receiver) throws InterruptedException { Table tableBefore = this.schema.tableFor(this.tableId); OracleDdlParser parser = this.schema.getDdlParser(); DdlChanges ddlChanges = parser.getDdlChanges(); try { ddlChanges.reset(); parser.setCurrentDatabase(this.sourceDatabaseName); parser.setCurrentSchema(this.objectOwner); parser.parse(this.ddlText, this.schema.getTables()); } catch (MultipleParsingExceptions | ParsingException e) { if (!this.schema.skipUnparseableDdlStatements()) { throw e; } LOGGER.warn("Ignoring unparsable DDL statement '{}':", this.ddlText, e); this.streamingMetrics.incrementWarningCount(); this.streamingMetrics.incrementSchemaChangeParseErrorCount(); }
可以看到,如果this.schema.skipUnparseableDdlStatements()为真,就只会打印一条警告日志,继续执行,不会抛出异常。
而这个skipUnparseableDdlStatements的定义,在debezium-core的HistorizedRelationalDatabaseSchema.class文件里:
public boolean skipUnparseableDdlStatements() { return this.historizedConnectorConfig.skipUnparseableDdlStatements();
}
可以看到,只是返回了historizedConnectorConfig的同名方法。而historizedConnectorConfig则是一个HistorizedRelationalDatabaseConnectorConfig。
跟进这个文件去,可以看到这个方法只是一个变量的返回:
public boolean skipUnparseableDdlStatements() { return this.skipUnparseableDDL;
}
而变量skipUnparseableDDL,则在构造函数里进行了设定:
protected HistorizedRelationalDatabaseConnectorConfig(Class<? extends SourceConnector> connectorClass, Configuration config, Tables.TableFilter systemTablesFilter, Selectors.TableIdToStringMapper tableIdMapper, boolean useCatalogBeforeSchema, int defaultSnapshotFetchSize, ColumnFilterMode columnFilterMode, boolean multiPartitionMode) { super(config, systemTablesFilter, tableIdMapper, defaultSnapshotFetchSize, columnFilterMode, useCatalogBeforeSchema); this.useCatalogBeforeSchema = useCatalogBeforeSchema; this.connectorClass = connectorClass; this.multiPartitionMode = multiPartitionMode; this.ddlFilter = this.createDdlFilter(config); this.skipUnparseableDDL = config.getBoolean(SKIP_UNPARSEABLE_DDL_STATEMENTS); this.storeOnlyCapturedTablesDdl = config.getBoolean(STORE_ONLY_CAPTURED_TABLES_DDL); this.storeOnlyCapturedDatabasesDdl = config.getBoolean(STORE_ONLY_CAPTURED_DATABASES_DDL);
}
来自与config的SKIP_UNPARSEABLE_DDL_STATEMENTS参数的boolean值。
而SKIP_UNPARSEABLE_DDL_STATEMENTS的定义在这个类里面是一个静态初始化过程:
static { SCHEMA_HISTORY = Field.create("schema.history.internal").withDisplayName("Database schema history class").withType(Type.CLASS).withWidth(Width.LONG).withImportance(Importance.LOW).withInvisibleRecommender().withDescription("The name of the SchemaHistory class that should be used to store and recover database schema changes. The configuration properties for the history are prefixed with the 'schema.history.internal.' string.").withDefault("io.debezium.storage.kafka.history.KafkaSchemaHistory"); SKIP_UNPARSEABLE_DDL_STATEMENTS = SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS; STORE_ONLY_CAPTURED_TABLES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL; STORE_ONLY_CAPTURED_DATABASES_DDL = SchemaHistory.STORE_ONLY_CAPTURED_DATABASES_DDL; CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit().history(new Field[]{SCHEMA_HISTORY, SKIP_UNPARSEABLE_DDL_STATEMENTS, STORE_ONLY_CAPTURED_TABLES_DDL, STORE_ONLY_CAPTURED_DATABASES_DDL}).create();
}
其实就是SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,其中SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS的定义也在这个类的构造函数里,是:
public interface SchemaHistory { String CONFIGURATION_FIELD_PREFIX_STRING = "schema.history.internal."; Field NAME = Field.create("schema.history.internal.name").withDisplayName("Logical name for the database schema history").withType(Type.STRING).withWidth(Width.MEDIUM).withImportance(Importance.LOW).withDescription("The name used for the database schema history, perhaps differently by each implementation.").withValidation(new Field.Validator[]{Field::isOptional}); Field SKIP_UNPARSEABLE_DDL_STATEMENTS = Field.create("schema.history.internal.skip.unparseable.ddl").withDisplayName("Skip DDL statements that cannot be parsed").withType(Type.BOOLEAN).withWidth(Width.SHORT).withImportance(Importance.LOW).withDescription("Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse.By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes.").withDefault(false);
……
这一趟追下来,不得不说,这些参数真是包装(隐藏)得博大精深!
相关文章:
Debedium如何忽略Oracle的purge命令
报错 截至目前3.0版本,Debezium的Oracle Connector并不支持purge table这个指令。 所以,在使用Debezium解析Oracle变更的时候,如果在源端执行了类似 purge table "$BIN… 的语句,就会导致Debezium罢工,日志里显…...

PlantUML 语言
PlantUML 是一种开源工具,用于通过简单的文本描述生成 UML 图。它支持多种 UML 图类型,如类图、序列图、用例图、活动图、组件图、状态图等。PlantUML 语言非常简洁,采用类似编程语言的语法,允许用户使用文本定义模型,…...
linux的 .so和.ko文件分别是什么?主要区别是什么?
前言: .so和.ko文件的主要区别在于它们的应用层次和功能不同。 应用层次 .so文件:这是用户层的动态链接库(Shared Object),主要用于用户态的程序中。 它用于动态链接,多个程序可以共享同一个库文件&…...

XX服务器上的npm不知道咋突然坏了
收到同事的V,说是:182上的npm不知道咋突然坏了,查到这里了,不敢动了。 咱一定要抓重点:突然坏了。这里的突然肯定不是瞬间(大概率是上次可用,这次不可用,中间间隔了多长时间&#x…...

数据结构(优先级队列 :Priority Queue)
前言: 在计算机科学中,队列是一种非常常见的数据结构,它遵循先进先出(FIFO)的原则,也就是说,先进入队列的元素会先被处理。然而,在许多实际应用中,我们不仅仅需要按顺序…...
nginx.conf 请求时间部分参数说明新手教程
下面来说下nginx.conf 的部分参数,配置如下: http {include mime.types;default_type application/octet-stream;client_max_body_size 1000M;#log_format main $remote_addr - $remote_user [$time_local] "$request" # …...

【Linux-ubuntu通过USB传输程序点亮LED灯】
Linux-ubuntu通过USB传输程序点亮LED灯 一,初始化GPIO配置1.使能时钟2.其他寄存器配置 二,程序编译三,USB传输程序 一,初始化GPIO配置 1.使能时钟 使能就是一个控制信号,用于决定时钟信号是否能够有效的传递或者被使用,就像一个…...
《开源时间序列数据:探索与应用》
《开源时间序列数据:探索与应用》 一、开源时间序列数据概述二、热门的开源时间序列数据库1. InfluxDB2. TimescaleDB3. Prometheus4. OpenTSDB5. Graphite6. Druid 三、开源时间序列数据的应用场景1. 物联网领域2. 金融领域3. 运维监控领域4. 能源领域 四、开源时间…...
三相异步电动机跳闸的原因是什么?
三相异步电动机是现代工业生产和日常生活中广泛应用的一种电动机,因其结构简单、维护方便和功率范围广泛而受到广泛青睐。然而,在实际使用过程中,电动机的跳闸现象时有发生,这不仅影响了设备的正常运行,甚至可能导致经…...

连续思维链Coconut ,打开LLM推理新范式
语言与推理之间有着什么样内涵上的联系与本质上的差别? 系统二的长链复杂分步推理与系统一分别在训练时与推理时的正/反向传播链路、模型神经网络内部的潜在机制(虽然是黑盒)以及网络链路对应的模型训练过程中“压缩”的数据(认知)流形所映射出的隐含碎片化泛化分布…...

阿里云数据库MongoDB版助力极致游戏高效开发
客户简介 成立于2010年的厦门极致互动网络技术股份有限公司(以下简称“公司”或“极致游戏”),是一家集网络游戏产品研发与运营为一体的重点软件企业,公司专注于面向全球用户的网络游戏研发与运营。在整个产业链中,公…...
ESP32-S3模组上跑通ES8388(29)
接前一篇文章:ESP32-S3模组上跑通ES8388(28) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析到了es8388_init函数中的第11段也是最后一段代码,没有解析完,本回继续解析。为了便于理解和回顾,再次贴出该片段,在components\audio_hal\driver\es8388\es8388.c中,如下…...

使用ElasticSearch实现全文检索
文章目录 全文检索任务描述技术难点任务目标实现过程1. java读取Json文件,并导入MySQL数据库中2. 利用Logstah完成MySQL到ES的数据同步3. 开始编写功能接口3.1 全文检索接口3.2 查询详情 4. 前端调用 全文检索 任务描述 在获取到数据之后如何在ES中进行数据建模&a…...
通过k-means对相似度较高的语句进行分类
本文介绍了如何使用K-Means算法对相似度较高的语句进行分类,并附上java案例代码 import java.util.ArrayList; import java.util.List; import java.util.Random;public class KMeansTextClustering {public static void main(String[] args) {// 初始化语句数据集…...

国信华源科技赋能长江蓄滞洪区水闸管护项目验收成果报道
“碧水悠悠绕古城,闸启长江万象新。”近日,由北京国信华源科技有限公司倾力打造的万里长江蓄滞洪区水闸管护项目,圆满通过验收,为这片鱼米之乡的防洪安全注入了新的科技活力。 长江之畔,水闸挺立,犹如干堤上…...

HTML:表格重点
用表格就用table caption为该表上部信息,用来说明表的作用 thead为表头主要信息,效果加粗 tbody为表格中的主体内容 tr是 table row 表格的行 td是table data th是table heading表格标题 ,一般表格第一行的数据都是table heading...

wine的使用方法
wine版本 所有分支,新的主要版本: wine-x.0 All branches, release candidates:各分支、候选版本: wine-x.0-rcn Stable branch updates: 稳定分支更新: wine-x.0.z Development branch updates: wine-x.y wine *.exe “更改目…...

Linux服务器离线安装unzip包
Linux服务器离线安装unzip包 1. 安装unzip包的目的 解压Docker部署包和服务部署包。 2. 查看当前环境是否已经安装unzip rpm -qa | grep --color unzip3. 下载对应的离线包 地址:http://www.rpmfind.net/linux/rpm2html/search.php?query&submitSearch 例…...

Excel拆分脚本
Excel拆分 工作表按行拆分为工作薄 工作表按行拆分为工作薄 打开要拆分的Excel文件,使用快捷键(AltF11)打开脚本界面,选择要拆分的sheet,打开Module,在Module中输入脚本代码,然后运行脚本 Su…...

Mybatis---事务
目录 引入 一、事务存在的意义 1.事务是什么? 2.Mybatis关于事务的管理 程序员自己控制处理的提交和回滚 引入 一、事务存在的意义 1.事务是什么? 多个操作同时进行,那么同时成功,那么同时失败。这就是事务。 事务有四个特性…...
在 Ubuntu 服务器上 下载 Clash 文件使用代理
文件Clash.Verge_1.3.8_x64_portable.zip 在 Ubuntu 服务器上不能使用这个Clash 文件**,我们需要的是 Clash.Meta 而不是 Clash Verge GUI 客户端 也就是 Clash Verge GUI 客户端的 Windows 版本,是给 Windows 桌面环境用的图形界面,不适用…...

显示docker桌面,vnc远程连接docker
目录 相关概念: 实现步骤: 1.启动docker容器 2.安装x11 3.Docker 容器中安装一个完整的图形桌面(XFCE)和 VNC 远程桌面服务器(TightVNC) 4.配置vncservice 5.本地安装VNC Viewer连接VNC Viewer下载地…...
【HTML-13】HTML表格合并技术详解:打造专业数据展示
表格是HTML中展示结构化数据的重要元素,而表格合并则是提升表格表现力的关键技术。本文将全面介绍HTML中的表格合并方法,帮助您创建更专业、更灵活的数据展示界面。 1. 表格合并基础概念 在HTML中,表格合并主要通过两个属性实现:…...
压测服务器和线上环境的区别
在进行服务器压测时,测试环境与线上环境的差异会直接影响测试结果的可靠性。以下是两者的关键区别及注意事项: 1. 压测服务器的常见类型 本地开发机:低配虚拟机(如4核8GB),仅用于功能验证…...

【C++ Qt】容器类(GroupBox、TabWidget)内附思维导图 通俗易懂
每日激励:“不设限和自我肯定的心态:I can do all things。 — Stephen Curry” ✍️绪论: 本章主要介绍了 Qt 中 QGroupBox 与 QTabWidget 控件。QGroupBox 是带标题的分组框,能容纳其他控件,有标题、对齐方式、是否…...

20250529-C#知识:运算符重载
C#知识:运算符重载 运算符重载能够让我们像值类型数据那样使用运算符对类或结构体进行运算,并且能够自定义运算逻辑。 1、运算符重载及完整代码示例 作用是让自定义的类或者结构体能够使用运算符运算符重载一定是public static的可以把运算符看成一个函…...
题目 3314: 蓝桥杯2025年第十六届省赛真题-魔法科考试
题目 3314: 蓝桥杯2025年第十六届省赛真题-魔法科考试 时间限制: 3s 内存限制: 512MB 提交: 245 解决: 49 题目描述 小明正在参加魔法科的期末考试,考生需要根据给定的口诀组合出有效的 魔法。其中,老师给定了 n 个上半部分口诀 a1, a2, . . . , an 和 m…...

在 Ubuntu 上安装 NVM (Node Version Manager) 的步骤
NVM (Node Version Manager) 是一个用于管理多个 Node.js 版本的工具,它允许您在同一台设备上安装、切换和管理不同版本的 Node.js。以下是在 Ubuntu 上安装 NVM 的详细步骤: 安装前准备 可先在windows上安装ubuntu 参考链接:https://blog.…...

MCP入门实战(极简案例)
MCP简介 MCP(Model Context Protocol,模型上下文协议)2024年11月底由 Antbropic 推出的一种开放标准,旨在统一大型语言模型(LLM)与外部数据源和工具之间的通信协议。 Function Calling是AI模型调用函数的机制,MCP是一个标准协议,使AI模型与API无缝交互,而Al Agent是一个…...

Cursor从入门到精通实战指南(一):开始使用Cursor
一、简介与核心优势 Cursor是一款基于VSCode开发的AI编程工具,集成了GPT-4、Claude 3.5等先进大语言模型,支持代码补全、生成、重构、调试等功能。其核心优势包括: 高效协作:通过自然语言对话实现代码开发,支持跨文件…...