Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案
文章目录
- 一、 技术背景
- 二、 关键技术
- 1、 Oracle LogMiner
- 2、 Chunjun 的 LogMiner 关键流程
- 3、修复 Chunjun Oracle LogMiner 问题
一、 技术背景
在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。
本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。
二、 关键技术
1、 Oracle LogMiner
LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERT、UPDATE、DELETE 操作。
使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner
2、 Chunjun 的 LogMiner 关键流程
Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source 用于解析 Oracle Redo Log 并转换为 Flink 数据流。
如下整个流程架构:

Flink任务启动后
- 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
- 实时监听
V$LOGMNR_CONTENTS,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作, - Flink 任务处理数据,完成转换、清洗等操作。
- Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris。
| 操作类型 | before(旧数据) | after(新数据) | Flink 处理逻辑 |
|---|---|---|---|
| INSERT | 无 | {新数据} | 直接插入 |
| UPDATE | {旧数据} | {新数据} | 先删除旧数据,再插入新数据 |
| DELETE | {旧数据} | 无 | 删除数据 |
最后如下示例flink sql:
CREATE TABLE source
( ID int, NAME string
) WITH ( 'connector' = 'oraclelogminer-x' ,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL' ,'username' = 'system' ,'password' = 'xxx' ,'cat' = 'insert,delete,update' ,'table' = 'TEST.TEST_USER' ,'timestamp-format.standard' = 'SQL' ); CREATE TABLE sink
( k4 int, k3 string
) WITH (
'connector' = 'doris-x',
'schema'='demo', 'password' = 'xxx', 'table-name' = 'mytable', 'url' = 'jdbc:mysql://xxx:9030', 'username' = 'root', 'sink.parallelism' = '1', 'lookup.error-limit' = '100', 'lookup.cache-type' = 'LRU', 'lookup.parallelism' = '1', 'lookup.cache.ttl' = '60000', 'lookup.cache.max-rows' = '10000', 'writeMode'='UPSERT' ); insert into sink
select ID as k4, NAME as k3
from source;
3、修复 Chunjun Oracle LogMiner 问题
在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:
- 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;
- 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables();
- PavingData和Split 不能同时开启,默认都开启,将PavingData关闭
相关文章:
Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案
文章目录 一、 技术背景二、 关键技术1、 Oracle LogMiner2、 Chunjun 的 LogMiner 关键流程3、修复 Chunjun Oracle LogMiner 问题 一、 技术背景 在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris&am…...
WordPress系统获取webshell的攻略
一.后台修改模板拿WebShell 1.进入Vulhub靶场并执⾏以下命令开启靶场;在浏览器中访问并安装好 #执⾏命令 cd /vulhub/wordpress/pwnscriptum docker-compose up -d 2. 修改其WP的模板,登陆WP后点击 【外 观】 --》 【编辑】 --》 404.php 3.插入一句话木…...
JMeter基本介绍
Apache JMeter 工具详解 一、JMeter 简介 JMeter 是 Apache 基金会开源的 Java 应用程序,主要用于 性能测试、负载测试 和 功能测试。它通过对服务器或网络资源模拟多种负载条件(如并发用户、持续压力),帮助评估系统性能指标&am…...
npm 安装 pnpm 的详细步骤及注意事项
一、安装步骤 1.全局安装 pnpm npm install -g pnpm2.验证安装 pnpm -v输出版本号即表示安装成功。 二、升级 pnpm 若已安装旧版本,可通过以下命令升级: npm install -g pnpmlatest三、配置镜像加速 设置淘宝镜像 pnpm config set registry http…...
蓝桥杯2023年第十四届省赛真题-子矩阵
题目来自DOTCPP: 暴力思路(两个测试点超时): 题目要求我们求出子矩阵的最大值和最小值的乘积,我们可以枚举矩阵中的所有点,以这个点为其子矩阵的左上顶点,然后判断一下能不能构成子矩阵。如果可…...
如何在 Node.js 中使用 .env 文件管理环境变量 ?
Node.js 应用程序通常依赖于环境变量来管理敏感信息或配置设置。.env 文件已经成为一种流行的本地管理这些变量的方法,而无需在代码存储库中公开它们。本文将探讨 .env 文件为什么重要,以及如何在 Node.js 应用程序中有效的使用它。 为什么使用 .env 文…...
Redis BitMap 用户签到
Redis Bitmap Bitmap(位图)是 Redis 提供的一种用于处理二进制位(bit)的特殊数据结构,它基于 String 类型,每个 bit 代表一个布尔值(0 或 1),可以用于存储大规模的二值状…...
未来办公与生活的新范式——智慧园区
在信息化与智能化技术飞速发展的今天,智慧园区作为一种新兴的城市发展形态,正逐步成为推动产业升级、提升城市管理效率、改善居民生活质量的重要力量。智慧园区不仅融合了先进的信息技术,还深刻体现了可持续发展的理念,为园区内的…...
Hugging Face预训练GPT微调ChatGPT(微调入门!新手友好!)
Hugging Face预训练GPT微调ChatGPT(微调入门!新手友好!) 在实战中,⼤多数情况下都不需要从0开始训练模型,⽽是使⽤“⼤⼚”或者其他研究者开源的已经训练好的⼤模型。 在各种⼤模型开源库中,最…...
【CSS3】化神篇
目录 平面转换平移旋转改变旋转原点多重转换缩放倾斜 渐变线性渐变径向渐变 空间转换平移视距旋转立体呈现缩放 动画使现步骤animation 复合属性animation 属性拆分逐帧动画多组动画 平面转换 作用:为元素添加动态效果,一般与过渡配合使用 概念&#x…...
Unity音频混合器如何暴露参数
音频混合器是Unity推荐管理音效混音的工具,那么如何使用代码对它进行管理呢? 首先我在AudioMixer的Master组中创建了BGM和SFX的分组,你也可以直接用Master没有问题。 这里我以BGM为例,如果要在代码中进行使用就需要将参数暴露出去…...
Vue keepalive学习用法
在Vue中,<keep-alive>的include属性用于指定需要缓存的组件,其实现方式如下: 1. 基本用法 • 字符串形式:通过逗号分隔组件名称,匹配到的组件会被缓存。 <keep-alive include"ComponentA,ComponentB&…...
5-1 使用ECharts将MySQL数据库中的数据可视化
方法一:使用Python Flask框架搭建API 对于技术小白来说,使用ECharts将MySQL数据库中的数据可视化需要分步骤完成。以下是详细的实现流程: 一、技术架构 后端服务:使用Python Flask框架搭建API(简单易学ÿ…...
构建下一代AI Agent:自动化开发与行业落地全解析
1. 下一代AI Agent:概念与核心能力 核心能力描述技术支撑应用价值自主性独立规划与执行任务,无需持续人工干预决策树、强化学习、目标导向规划减少人工干预,提高任务执行效率决策能力评估多种方案并选择最优解决方案贝叶斯决策、多目标优化、…...
如何理解分布式光纤传感器?
关键词:OFDR、分布式光纤传感、光纤传感器 分布式光纤传感器是近年来备受关注的前沿技术,其核心在于将光纤本身作为传感介质和信号传输介质,通过解析光信号在光纤中的散射效应,实现对温度、应变、振动等物理量的连续、无盲区、高…...
四、小白学JAVA-石头剪刀布游戏
1、如何从控制台获取用户输入 import java.util.Scanner;public class Main {public static void main(String[] args) {// 石头剪刀布的思路// 1 2 3 石头 剪刀 布Scanner scanner new Scanner(System.in);System.out.println("请出拳:1.石头 2.剪刀 3.布【…...
【一起来学kubernetes】21、Secret使用详解
Secret 的详细介绍 Secret 是 Kubernetes 中用于存储和管理敏感信息(如密码、令牌、密钥等)的资源对象。Secret的设计目的是为了安全地存储和传输敏感信息,如密码、API密钥、证书等。这些信息通常不应该直接硬编码在配置文件或镜像中&#x…...
css重点知识汇总(一)
css重点知识汇总(一) 引入css的不同方式 link 通过src来获取相应的css资源。除了获取css之外还可以获取其他资源,例如js在页面载入是同步下载可以通过js对dom操作来改变css import css3引入的新方法只能引入css资源需要页面完全载入后才…...
PMP-项目运行环境
你好!我是 Lydia-穎穎 ♥感谢你的陪伴与支持 ~~~ 欢迎一起探索未知的知识和未来,现在lets go go go!!! 1. 影响项目的要素 项目存在在不同的环境下,环境对于项目的交付产生不同的影响。需了解环境对于项目的影响,采取相应措施应对…...
shell 脚本搭建apache
#!/bin/bash # Set Apache version to install ## author: yuan# 检查外网连接 echo "检查外网连接..." ping www.baidu.com -c 3 > /dev/null 2>&1 if [ $? -eq 0 ]; thenecho "外网通讯良好!" elseecho "网络连接失败&#x…...
Huawei 鲲鹏(ARM/Aarch64)服务器安装KVM虚拟机(非桌面视图)
提出问题 因需要进行ARM架构适配,需要在Huawei Taishan 200k(CPU: Kunpeng 920 5231K)上,创建几台虚拟机做为开发测试环境。 无奈好久没搞了,看了一下自己多年前写的文章:Huawei 鲲鹏…...
《Python实战进阶》No28: 使用 Paramiko 实现远程服务器管理
No28: 使用 Paramiko 实现远程服务器管理 摘要 在现代开发与运维中,远程服务器管理是必不可少的一环。通过 SSH 协议,我们可以安全地连接到远程服务器并执行各种操作。Python 的 Paramiko 模块是一个强大的工具,能够帮助我们实现自动化任务&…...
备赛蓝桥杯之第十六届模拟赛3期职业院校组
提示:本篇文章仅仅是作者自己目前在备赛蓝桥杯中,自己学习与刷题的学习笔记,写的不好,欢迎大家批评与建议 由于个别题目代码量与题目量偏大,请大家自己去蓝桥杯官网【连接高校和企业 - 蓝桥云课】去寻找原题࿰…...
【Kafka】深入了解Kafka
集群的成员关系 Kafka使用Zookeeper维护集群的成员信息。 每一个broker都有一个唯一的标识,这个标识可以在配置文件中指定,也可以自动生成。当broker在启动时通过创建Zookeeper的临时节点把自己的ID注册到Zookeeper中。broker、控制器和其他一些动态系…...
C++特性——RAII、智能指针
RAII 就像new一个需要delete,fopen之后需要fclose,但这样会有隐形问题(忘记释放)。RAII即用对象把这个过程给包起来,对象构造的时候,new或者fopen,析构的时候delete. 为什么需要智能指针 对于…...
C++异常处理时的异常类型抛出选择
在 C 中选择抛出哪种异常类型,主要取决于错误的性质以及希望传达的语义信息。以下是一些指导原则,帮助在可能发生异常的地方选择合适的异常类型进行抛出: 1. std::exception 适用场景:作为所有标准异常的基类,std::e…...
elsticsearch 通过reindex修改shards
elasticsearch reindex 索引。 背景: 索引test1 reindex到test2 修改sharding数量 程序是通过别名test1_alias访问索引 1、创建目标索引test2 索引需要手动提前创建自动创建可能会有mapping 不一致性的风险。 The destination should be configured as wanted …...
CentOS系类普通挂载磁盘挂载命令
检查磁盘是否有分区 lsblk如果 vdb 下面没有分区(比如 vdb1),你需要先创建分区。 创建分区(如果需要) fdisk /dev/vdb然后在 fdisk 交互界面: 输入 n 创建新分区 选择 p 创建主分区 默认分区号和大小 输…...
Kafka自定义分区机制
文章目录 1.如何自定义分区机制2.示例 1.如何自定义分区机制 若需要使用自定义分区机制,需要完成两件事: 1)在 producer 程序中创建一个类,实现 org.apache.kafka.clients.producer.Partitioner 接口主要分区逻辑在 Partitioner.partition中…...
【HarmonyOS NEXT】关键资产存储开发案例
在 iOS 开发中 Keychain 是一个非常安全的存储系统,用于保存敏感信息,如密码、证书、密钥等。与文件系统不同,Keychain 提供了更高的安全性,因为它对数据进行了加密,并且只有经过授权的应用程序才能访问存储的数据。那…...
