当前位置: 首页 > news >正文

Flink CDC系列之:Oracle CDC 导入 Elasticsearch

Flink CDC系列之:Oracle CDC 导入 Elasticsearch

  • 一、深入理解Flink Oracle CDC Connector
  • 二、创建docker-compose.yml文件
  • 三、启动容器
  • 四、下载Flink Oracle CDC的jar包
  • 五、启动 Flink 集群,再启动 SQL CLI
  • 六、检查 ElasticSearch 中的结果
  • 七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果

一、深入理解Flink Oracle CDC Connector

  • Flink CDC系列之:Oracle CDC Connector

二、创建docker-compose.yml文件

version: '2.1'
services:oracle:image: yuxialuo/oracle-xe-11g-r2-cdc-demo:v1.0ports:- "1521:1521"elasticsearch:image: elastic/elasticsearch:7.6.0environment:- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeports:- "9200:9200"- "9300:9300"ulimits:memlock:soft: -1hard: -1nofile:soft: 65536hard: 65536kibana:image: elastic/kibana:7.6.0ports:- "5601:5601"volumes:- /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • Oracle: Oracle 11g, 已经预先创建了 products 和 orders表,并插入了一些数据
  • Elasticsearch: orders 表将和 products 表进行join,join的结果写入Elasticsearch中
  • Kibana: 可视化 Elasticsearch 中的数据

三、启动容器

在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。 另外可以通过如下命令停止所有的容器:

docker-compose down

四、下载Flink Oracle CDC的jar包

下载以下 jar 包到 <FLINK_HOME>/lib/:

  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
  • flink-sql-connector-oracle-cdc-2.4.1.jar

五、启动 Flink 集群,再启动 SQL CLI

-- Flink SQL
-- checkpoint every 3000 milliseconds                       
Flink SQL> SET execution.checkpointing.interval = 3s;Flink SQL> CREATE TABLE products (ID INT,NAME STRING,DESCRIPTION STRING,PRIMARY KEY (ID) NOT ENFORCED) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser',  'table-name' = 'products');Flink SQL> CREATE TABLE orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN) WITH ('connector' = 'oracle-cdc','hostname' = 'localhost','port' = '1521','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'XE','schema-name' = 'flinkuser',  'table-name' = 'orders');

创建elasticsearch

Flink SQL> CREATE TABLE enriched_orders (ORDER_ID INT,ORDER_DATE TIMESTAMP_LTZ(3),CUSTOMER_NAME STRING,PRICE DECIMAL(10, 5),PRODUCT_ID INT,ORDER_STATUS BOOLEAN,PRODUCT_NAME STRING,PRODUCT_DESCRIPTION STRING,PRIMARY KEY (ORDER_ID) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://localhost:9200','index' = 'enriched_orders_1'

关联处理后,插入数据

Flink SQL> INSERT INTO enriched_ordersSELECT o.*, p.NAME, p.DESCRIPTIONFROM orders AS oLEFT JOIN products AS p ON o.PRODUCT_ID = p.ID;

六、检查 ElasticSearch 中的结果

检查最终的结果是否写入ElasticSearch中, 可以在Kibana看到ElasticSearch中的数据

七、在 Oracle 制造一些变更,观察 ElasticSearch 中的结果

进入Oracle容器中并通过如下的SQL语句对Oracle数据库进行一些修改, 然后就可以看到每执行一条SQL语句,Elasticsearch中的数据都会实时更新。

docker-compose exec sqlplus flinkuser/flinkpw

插入更新数据

INSERT INTO flinkuser.orders VALUES (10004, to_date('2020-07-30 15:22:00', 'yyyy-mm-dd hh24:mi:ss'), 'Jark', 29.71, 104, 0);UPDATE flinkuser.orders SET ORDER_STATUS = 1 WHERE ORDER_ID = 10004;DELETE FROM flinkuser.orders WHERE ORDER_ID = 10004;

相关文章:

Flink CDC系列之:Oracle CDC 导入 Elasticsearch

Flink CDC系列之&#xff1a;Oracle CDC 导入 Elasticsearch 一、深入理解Flink Oracle CDC Connector二、创建docker-compose.yml文件三、启动容器四、下载Flink Oracle CDC的jar包五、启动 Flink 集群&#xff0c;再启动 SQL CLI六、检查 ElasticSearch 中的结果七、在 Oracl…...

Linux忘记root密码解决方法

当我们忘记root密码进不去服务器怎么办&#xff1f;不要担心&#xff0c;可以进入到linux的救援模式修改root密码。 下面直接上干货&#xff0c;流程如下&#xff1a; 1.重启电脑&#xff0c;按上下键滑动&#xff0c;保证不进入开机流程&#xff0c;然后按e键 2.出现此页面…...

AR/VR眼镜转接器方案,实现同时传输视频快充方案

简介 虚拟现实头戴显示器设备&#xff0c;简称VR头显VR眼镜&#xff0c;是利用仿真技术与计算机图形学人机接口技术多媒体技术传感技术网络技术等多种技术集合的产品&#xff0c;是借助计算机及最新传感器技术创造的一种崭新的人机交互手段。VR头显VR眼镜是一个跨时代的产品。不…...

ASP.NET Core中路由规则匹配

RESTful约束&#xff0c;如果在一个控制器里面有多个Get、Post...的操作 1、在一个控制器里面可以定义多个API方法 2、通过路由规则来区分 /// <summary> /// 获取用户信息 /// </summary> /// <param name"user"></param> /// <returns…...

IDEA:Error running,Command line is too long. 解决方法

报错如下&#xff1a; Error running SendSmsUtil. Command line is too long. Shorten the command line via JAR manifest or via a classpath file and rerun.原因是启动命令过长。 解决方法&#xff1a; 1、打开Edit Configurations 2、点击Modify options设置&#x…...

什么是反射机制?为什么反射慢?

目录 面试回答 知识扩展 反射常见的使用方式 反射和 Class 的关系 面试回答 反射指的是程序在运行时能够获取自身的信息。在 java 中&#xff0c;只要给定类的名字&#xff0c;那么就可以通过反射机制来获得类的所有属性和方法。 Java 的反射可以&#xff1a; 在运行时判断…...

list元素

列表元素 列表元素分为有序列表和无序列表 有序列表 ol – order list – 有序列表 li – list item – 列表元素 <ol type"1"><li>有序列表1</li><li>有序列表2</li><li>有序列表3</li> </ol>属性 type type属…...

OkHttp 源码浅析一

演进之路:原生Android框架不好用 ---- HttpUrlConnect 和 Apache HTTPClient 第一版 底层使用HTTPURLConnect 第二版 Square构建 从Android4.4开始 基本使用: val okhttp OkHttpClient()val request Request.Builder().url("http://www.baidu.com").buil…...

【解决问题】远程仓库GitHub/GitLab添加了SSH Key之后依然无法clone的解决办法

GitHub/GitLab添加了SSH Key之后依然无法clone的解决办法 问题现象解决办法 问题现象 在Git远程仓库添加了自己的ssh key到账户下&#xff0c;git clone时&#xff0c;依然报错clone失败&#xff0c;请检查是否没有权限进行clone操作。 解决办法 在git的安装目录下&#xff…...

回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测(多指标,多图)

回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09; 目录 回归预测 | MATLAB实现SA-SVM模拟退火算法优化支持向量机多输入单输出回归预测&#xff08;多指标&#xff0c;多图&#xff09;效果一览基本…...

Spring事务和事务传播机制(1)

前言&#x1f36d; ❤️❤️❤️SSM专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 在Spring框架中&#xff0c;事务管理是一种用于维护数据库操作的一致性和…...

如何快速在vscode中实现不同python文件的对比查看

总体而言&#xff1a;两种方式。一种是直接点击vscode右上角的图标&#xff08;见下图&#xff09;。 另一种方式就是使用快捷键啦“**Ctrl**”&#xff0c;用的时候选中想要对比的python文件&#xff0c;然后快捷键就可以达到下图效果了&#xff1a; 建议大家直接使用第二种…...

网络安全---Ring3下动态链接库.so函数劫持

一、动态链接库劫持原理 1.1、原理 Unix操作系统中&#xff0c;程序运行时会按照一定的规则顺序去查找依赖的动态链接库&#xff0c;当查找到指定的so文件时&#xff0c;动态链接器(/lib/ld-linux.so.X)会将程序所依赖的共享对象进行装载和初始化&#xff0c;而为什么可以使用…...

leetcode283. 移动零

难度&#xff1a;简单题 题目 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 思路&#xff1a; 一开始想&#xff0c;从前往后遍历&am…...

GuLi商城-前端基础Vue-生命周期和钩子函数

下图展示了实例的生命周期。你不需要立马弄明白所有的东西&#xff0c;不过随着你的不断学习和使用&#xff0c;它 的参考价值会越来越高。 VUE 的生命周期指的是组件在创建、运行和销毁过程中所经历的一系列事件&#xff0c;通过这些事件可以 让开发者在不同阶段进行相应的…...

输入输出+暴力模拟入门:魔法之树、染色の树、矩阵、字母加密、玫瑰鸭

秋招实习刷题网站推荐&#xff1a;codefun2000.com&#xff0c;还有题解博客&#xff1a;blog.codefun2000.com/。以下内容都是来自塔子哥的~ 输入输出 2023.04.15-春招-第三题-魔法之树 //#include<bits/stdc.h> #include<vector> #include<iostream>usin…...

​Kubernetes的演变:从etcd到分布式SQL的过渡

DevRel领域专家Denis Magda表示&#xff0c;他偶然发现了一篇解释如何用PostgreSQL无缝替换etcd的文章。该文章指出&#xff0c;Kine项目作为外部etcd端点&#xff0c;可以将Kubernetes etcd请求转换为底层关系数据库的SQL查询。 受到这种方法的启发&#xff0c;Magda决定进一步…...

29、简单通过git把项目远程提交到gitee

简单通过git把项目远程提交到gitee 1、在gitee上创建一个仓库 2、在要提交的项目文件夹打开git 输入 git init 初始化git 然后设置下用户名和邮箱 git config --global user.name “username” git config --global user.email “yourEmail” 因为我是要把文件简单提交到…...

元宇宙之应用(04)沉浸式游戏

在数字科技迅猛发展的今天&#xff0c;元宇宙的概念正逐渐从科幻走向现实&#xff0c;重新定义了人们与虚拟世界的交互方式。在这一概念的引领下&#xff0c;"沉浸式游戏" 蓬勃发展&#xff0c;为游戏体验带来了前所未有的深度和广度。那么&#xff0c;为什么沉浸式游…...

浙大数据结构第八周之08-图7 公路村村通

题目详情&#xff1a; 现有村落间道路的统计数据表中&#xff0c;列出了有可能建设成标准公路的若干条道路的成本&#xff0c;求使每个村落都有公路连通所需要的最低成本。 输入格式: 输入数据包括城镇数目正整数N&#xff08;≤1000&#xff09;和候选道路数目M&#xff08…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型&#xff1a;架构设计与关键步骤 在当今数字化转型的浪潮中&#xff0c;大语言模型&#xff08;LLM&#xff09;已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中&#xff0c;不仅可以优化用户体验&#xff0c;还能为业务决策提供…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

LeetCode - 394. 字符串解码

题目 394. 字符串解码 - 力扣&#xff08;LeetCode&#xff09; 思路 使用两个栈&#xff1a;一个存储重复次数&#xff0c;一个存储字符串 遍历输入字符串&#xff1a; 数字处理&#xff1a;遇到数字时&#xff0c;累积计算重复次数左括号处理&#xff1a;保存当前状态&a…...

关于iview组件中使用 table , 绑定序号分页后序号从1开始的解决方案

问题描述&#xff1a;iview使用table 中type: "index",分页之后 &#xff0c;索引还是从1开始&#xff0c;试过绑定后台返回数据的id, 这种方法可行&#xff0c;就是后台返回数据的每个页面id都不完全是按照从1开始的升序&#xff0c;因此百度了下&#xff0c;找到了…...

Swagger和OpenApi的前世今生

Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章&#xff0c;二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑&#xff1a; &#x1f504; 一、起源与初创期&#xff1a;Swagger的诞生&#xff08;2010-2014&#xff09; 核心…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息&#xff0c;对客户进行统一管理&#xff0c;可以把所有客户信息录入系统&#xff0c;进行维护和统计功能。可通过文件的方式保存相关录入数据&#xff0c;对…...

通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器

拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件&#xff1a; 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...

6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙

Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙 一、前言:离区块链还有多远? 区块链听起来可能遥不可及,似乎是只有密码学专家和资深工程师才能涉足的领域。但事实上,构建一个区块链的核心并不复杂,尤其当你已经掌握了一门系统编程语言,比如 Go。 要真正理解区…...