Flink CDC系列之:Oracle CDC 导入 Elasticsearch
Flink CDC系列之:Oracle CDC 导入 Elasticsearch
- 一、深入理解Flink Oracle CDC Connector
- 二、创建docker-compose.yml文件
- 三、启动容器
- 四、下载Flink Oracle CDC的jar包
- 五、启动 Flink 集群,再启动 SQL CLI
- 六、检查 ElasticSearch 中的结果
- 七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果
一、深入理解Flink Oracle CDC Connector
- Flink CDC系列之:Oracle CDC Connector
二、创建docker-compose.yml文件
version: '2.1'
services:oracle:image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0ports:- "1521:1521"elasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"volumes:- /var/run/docker.sock:/var/run/docker.sock
该 Docker Compose 中包含的容器有:
- Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据
- Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中
- Kibana: 可视化 Elasticsearch 中的数据
三、启动容器
在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:
docker-compose up -d
该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。 另外可以通过如下命令停止所有的容器:
docker-compose down
四、下载Flink Oracle CDC的jar包
下载以下 jar 包到 <FLINK_HOME>/lib/:
- flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
- flink-sql-connector-oracle-cdc-2.4.1.jar
五、启动 Flink 集群,再启动 SQL CLI
-- Flink SQL
-- checkpoint every 3000 milliseconds
Flink SQL> SET execution.checkpointing.interval = 3s;Flink SQL> CREATE TABLE products (ID INT,NAME STRING,DESCRIPTION STRING,PRIMARY KEY (ID) NOT ENFORCED) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser', 'table-name' = 'products');Flink SQL> CREATE TABLE orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser', 'table-name' = 'orders');
创建elasticsearch
Flink SQL> CREATE TABLE enriched_orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN,PRODUCT_NAME STRING,PRODUCT_DESCRIPTION STRING,PRIMARY KEY (ORDER_ID) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders_1'
关联处理后,插入数据
Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.NAME, p.DESCRIPTIONFROM orders AS oLEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;
六、检查 ElasticSearch 中的结果
检查最终的结果是否写入ElasticSearch中, 可以在Kibana看到ElasticSearch中的数据
七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果
进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。
docker-compose exec sqlplus flinkuser/flinkpw
插入更新数据
INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;
相关文章:
Flink CDC系列之:Oracle CDC 导入 Elasticsearch
Flink CDC系列之:Oracle CDC 导入 Elasticsearch 一、深入理解Flink Oracle CDC Connector二、创建docker-compose.yml文件三、启动容器四、下载Flink Oracle CDC的jar包五、启动 Flink 集群,再启动 SQL CLI六、检查 ElasticSearch 中的结果七、在 Oracl…...
Linux忘记root密码解决方法
当我们忘记root密码进不去服务器怎么办?不要担心,可以进入到linux的救援模式修改root密码。 下面直接上干货,流程如下: 1.重启电脑,按上下键滑动,保证不进入开机流程,然后按e键 2.出现此页面…...
AR/VR眼镜转接器方案,实现同时传输视频快充方案
简介 虚拟现实头戴显示器设备,简称VR头显VR眼镜,是利用仿真技术与计算机图形学人机接口技术多媒体技术传感技术网络技术等多种技术集合的产品,是借助计算机及最新传感器技术创造的一种崭新的人机交互手段。VR头显VR眼镜是一个跨时代的产品。不…...
ASP.NET Core中路由规则匹配
RESTful约束,如果在一个控制器里面有多个Get、Post...的操作 1、在一个控制器里面可以定义多个API方法 2、通过路由规则来区分 /// <summary> /// 获取用户信息 /// </summary> /// <param name"user"></param> /// <returns…...
IDEA:Error running,Command line is too long. 解决方法
报错如下: Error running SendSmsUtil. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.原因是启动命令过长。 解决方法: 1、打开Edit Configurations 2、点击Modify options设置&#x…...
什么是反射机制?为什么反射慢?
目录 面试回答 知识扩展 反射常见的使用方式 反射和 Class 的关系 面试回答 反射指的是程序在运行时能够获取自身的信息。在 java 中,只要给定类的名字,那么就可以通过反射机制来获得类的所有属性和方法。 Java 的反射可以: 在运行时判断…...
list元素
列表元素 列表元素分为有序列表和无序列表 有序列表 ol – order list – 有序列表 li – list item – 列表元素 <ol type"1"><li>有序列表1</li><li>有序列表2</li><li>有序列表3</li> </ol>属性 type type属…...
OkHttp 源码浅析一
演进之路:原生Android框架不好用 ---- HttpUrlConnect 和 Apache HTTPClient 第一版 底层使用HTTPURLConnect 第二版 Square构建 从Android4.4开始 基本使用: val okhttp OkHttpClient()val request Request.Builder().url("http://www.baidu.com").buil…...
【解决问题】远程仓库GitHub/GitLab添加了SSH Key之后依然无法clone的解决办法
GitHub/GitLab添加了SSH Key之后依然无法clone的解决办法 问题现象解决办法 问题现象 在Git远程仓库添加了自己的ssh key到账户下,git clone时,依然报错clone失败,请检查是否没有权限进行clone操作。 解决办法 在git的安装目录下ÿ…...
回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测(多指标,多图)
回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测(多指标,多图) 目录 回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测(多指标,多图)效果一览基本…...
Spring事务和事务传播机制(1)
前言🍭 ❤️❤️❤️SSM专栏更新中,各位大佬觉得写得不错,支持一下,感谢了!❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 在Spring框架中,事务管理是一种用于维护数据库操作的一致性和…...
如何快速在vscode中实现不同python文件的对比查看
总体而言:两种方式。一种是直接点击vscode右上角的图标(见下图)。 另一种方式就是使用快捷键啦“**Ctrl**”,用的时候选中想要对比的python文件,然后快捷键就可以达到下图效果了: 建议大家直接使用第二种…...
网络安全---Ring3下动态链接库.so函数劫持
一、动态链接库劫持原理 1.1、原理 Unix操作系统中,程序运行时会按照一定的规则顺序去查找依赖的动态链接库,当查找到指定的so文件时,动态链接器(/lib/ld-linux.so.X)会将程序所依赖的共享对象进行装载和初始化,而为什么可以使用…...
leetcode283. 移动零
难度:简单题 题目 给定一个数组 nums,编写一个函数将所有 0 移动到数组的末尾,同时保持非零元素的相对顺序。 请注意 ,必须在不复制数组的情况下原地对数组进行操作。 思路: 一开始想,从前往后遍历&am…...
GuLi商城-前端基础Vue-生命周期和钩子函数
下图展示了实例的生命周期。你不需要立马弄明白所有的东西,不过随着你的不断学习和使用,它 的参考价值会越来越高。 VUE 的生命周期指的是组件在创建、运行和销毁过程中所经历的一系列事件,通过这些事件可以 让开发者在不同阶段进行相应的…...
输入输出+暴力模拟入门:魔法之树、染色の树、矩阵、字母加密、玫瑰鸭
秋招实习刷题网站推荐:codefun2000.com,还有题解博客:blog.codefun2000.com/。以下内容都是来自塔子哥的~ 输入输出 2023.04.15-春招-第三题-魔法之树 //#include<bits/stdc.h> #include<vector> #include<iostream>usin…...
Kubernetes的演变:从etcd到分布式SQL的过渡
DevRel领域专家Denis Magda表示,他偶然发现了一篇解释如何用PostgreSQL无缝替换etcd的文章。该文章指出,Kine项目作为外部etcd端点,可以将Kubernetes etcd请求转换为底层关系数据库的SQL查询。 受到这种方法的启发,Magda决定进一步…...
29、简单通过git把项目远程提交到gitee
简单通过git把项目远程提交到gitee 1、在gitee上创建一个仓库 2、在要提交的项目文件夹打开git 输入 git init 初始化git 然后设置下用户名和邮箱 git config --global user.name “username” git config --global user.email “yourEmail” 因为我是要把文件简单提交到…...
元宇宙之应用(04)沉浸式游戏
在数字科技迅猛发展的今天,元宇宙的概念正逐渐从科幻走向现实,重新定义了人们与虚拟世界的交互方式。在这一概念的引领下,"沉浸式游戏" 蓬勃发展,为游戏体验带来了前所未有的深度和广度。那么,为什么沉浸式游…...
浙大数据结构第八周之08-图7 公路村村通
题目详情: 现有村落间道路的统计数据表中,列出了有可能建设成标准公路的若干条道路的成本,求使每个村落都有公路连通所需要的最低成本。 输入格式: 输入数据包括城镇数目正整数N(≤1000)和候选道路数目M(…...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决
Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中,新增了一个本地验证码接口 /code,使用函数式路由(RouterFunction)和 Hutool 的 Circle…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)
考察一般的三次多项式,以r为参数: p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]; 此多项式的根为: 尽管看起来这个多项式是特殊的,其实一般的三次多项式都是可以通过线性变换化为这个形式…...
Ubuntu Cursor升级成v1.0
0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开,快捷键也不好用,当看到 Cursor 升级后,还是蛮高兴的 1. 下载 Cursor 下载地址:https://www.cursor.com/cn/downloads 点击下载 Linux (x64) ,…...
【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...
微服务通信安全:深入解析mTLS的原理与实践
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、引言:微服务时代的通信安全挑战 随着云原生和微服务架构的普及,服务间的通信安全成为系统设计的核心议题。传统的单体架构中&…...
实战设计模式之模板方法模式
概述 模板方法模式定义了一个操作中的算法骨架,并将某些步骤延迟到子类中实现。模板方法使得子类可以在不改变算法结构的前提下,重新定义算法中的某些步骤。简单来说,就是在一个方法中定义了要执行的步骤顺序或算法框架,但允许子类…...
一些实用的chrome扩展0x01
简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序,无论是测试应用程序、搜寻漏洞还是收集情报,它们都能提升工作流程。 FoxyProxy 代理管理工具,此扩展简化了使用代理(如 Burp…...
