当前位置: 首页 > news >正文

flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

相关文章:

flink cdc oceanbase(binlog模式)

接上文&#xff1a;一文说清flink从编码到部署上线 环境&#xff1a;①操作系统&#xff1a;阿里龙蜥 7.9&#xff08;平替CentOS7.9&#xff09;&#xff1b;②CPU&#xff1a;x86&#xff1b;③用户&#xff1a;root。 预研初衷&#xff1a;现在很多项目有国产化的要求&#…...

【WPF】 数据绑定机制之INotifyPropertyChanged

INotifyPropertyChanged 是 WPF 中的一个接口&#xff0c;用于实现 数据绑定 中的 属性更改通知。它的主要作用是&#xff0c;当对象的某个属性值发生更改时&#xff0c;通知绑定到该属性的 UI 控件更新其显示内容。 以下是有关 INotifyPropertyChanged 的详细信息和实现方法&…...

机器学习算法深度解析:以支持向量机(SVM)为例及实战应用

机器学习算法深度解析&#xff1a;以支持向量机&#xff08;SVM&#xff09;为例及实战应用 在当今数据驱动的时代&#xff0c;机器学习作为人工智能的一个核心分支&#xff0c;正以前所未有的速度改变着我们的生活与工作方式。从金融风控到医疗诊断&#xff0c;从自动驾驶到智…...

网络编程基础:连接Java的秘密网络

1 网络编程的重要性 网络编程允许Java应用程序与其他计算机或设备进行通信。这包括从简单的数据传输到复杂的分布式系统和Web服务。 2 Java网络编程的核心类 Java提供了多个类来支持网络编程&#xff1a; InetAddress&#xff1a;表示网络上的IP地址。 URL&#xff1a;表示统…...

无监督学习:自编码器(AutoEncoder)

自编码器&#xff1a;数据的净化之旅 引言 自编码器作为一种强大的特征学习方法&#xff0c;已经经历了从简单到复杂的发展历程。本文综述了多种类型的自编码器及其演进过程&#xff0c;强调了它们在数据降维、图像处理、噪声去除及生成模型等方面的关键作用。随着技术的进步…...

在不到 5 分钟的时间内将威胁情报 PDF 添加为 AI 助手的自定义知识

作者&#xff1a;来自 Elastic jamesspi 安全运营团队通常会维护威胁情报报告的存储库&#xff0c;这些报告包含由报告提供商生成的大量知识。然而&#xff0c;挑战在于&#xff0c;这些报告的内容通常以 PDF 格式存在&#xff0c;使得在处理安全事件或调查时难以检索和引用相关…...

Memcached prepend 命令

Memcached prepend 命令用于向已存在 key(键) 的 value(数据值) 前面追加数据 。 语法&#xff1a; prepend 命令的基本语法格式如下&#xff1a; prepend key flags exptime bytes [noreply] value参数说明如下&#xff1a; key&#xff1a;键值 key-value 结构中的 key&a…...

Win10 VScode配置远程Linux开发环境

Windows VScode配置远程Linux开发环境 记录一下在Windows下VScode配置远程连接Linux环境进行开发的过程。 VScode的远程编程与调试的插件Remote Development&#xff0c;使用这个插件可以在很多情况下代替vim直接远程修改与调试服务器上的代码&#xff0c;搭配上VScode的语言…...

微信小程序校园自助点餐系统实战:从设计到实现

随着移动互联网的发展&#xff0c;越来越多的校园场景开始智能化、自助化。微信小程序凭借其轻量化、便捷性和强大的生态支持&#xff0c;成为了各类校园应用的首选工具之一。今天&#xff0c;我们将通过实际开发一个微信小程序“校园自助点餐系统”来展示如何设计和实现这样一…...

解决sublime编译无法输入问题

在使用sublime编译简单的c语言的时候,发现编译过程中,带有scanf的程序,无法正确的输入。 需要提前配置好gcc 和g++ 一、新增配置 新建编译系统文件:C.sublime-build 具体步骤:菜单中选择Tools——Build System——New Build System——保存文件名C.sublime-build ,填写以…...

const修饰指针总结

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…...

uniapp实现后端数据i18n国际化

1.在main.js配置请求获取到数据再设置到i18n中&#xff0c; 我这里是通过后端接口先获取到一个多个数据的的json链接&#xff0c;通过链接再获取数据&#xff0c;拿到数据后通过遍历的方式设置i18n //接口数据示例&#xff1a;{"vi": "http://localhost:8899/…...

什么是国密设计

国密设计&#xff0c;全称为“国家密码算法设计”&#xff0c;是指中国自主研发的一系列密码学算法和相关的技术标准。这些算法旨在提供安全可靠的加密、解密、签名验证等服务&#xff0c;并且在中国的信息安全领域中扮演着至关重要的角色。以下是关于国密设计的详细解释&#…...

Android IO 问题:java.io.IOException Operation not permitted

问题描述与处理策略 1、问题描述 java.io.IOException: Operation not permittedjava.nio.file.FileSystemException: /storage/emulated/0/test/test.txt: Operation not permittedjava.io.IOException: Operation not permitted&#xff1a;异常为操作不被允许 java.nio.f…...

安装bert_embedding遇到问题

在使用命令&#xff1a; pip install bert-embedding 安装bert_embedding的时候&#xff0c;遇到如下问题&#xff1a; ERROR: Failed cleaning build dir for numpy Successfully built gluonnlp Failed to build numpy ERROR: ERROR: Failed to build installable wheel…...

cka考试-03-k8s版本升级

一、原题 二、解答 [root@master ~]# kubectl get node NAME STATUS ROLES AGE VERSION master Ready control-plane,master 25h v1.22.12 node1 Ready worker 25h v1.22.12 node2 Ready worker …...

【insert 插入数据语法合集】.NET开源ORM框架 SqlSugar 系列

系列文章目录 &#x1f380;&#x1f380;&#x1f380; .NET开源 ORM 框架 SqlSugar 系列 &#x1f380;&#x1f380;&#x1f380; 文章目录 系列文章目录一、前言 &#x1f343;二、插入方式 &#x1f4af;2.1 单条插入实体2.2 批量 插入实体2.3 根据字典插入2.4 根据 Dat…...

Spring Boot 的自动配置,以rabbitmq为例,请详细说明

Spring Boot 的自动配置特性能够大大简化集成外部服务和组件的配置过程。以 RabbitMQ 为例&#xff0c;Spring Boot 通过 spring-boot-starter-amqp 提供了自动配置支持&#xff0c;开发者只需在应用中添加相关依赖并配置必要的属性&#xff0c;Spring Boot 会自动配置所需的连…...

Visual Studio 2022+Qt6.5.3安装教程+环境配置+创建Qt项目+乱码插件+运行很完美(16岁孩子也能看懂)

点击上方"蓝字"关注我们 01、安装VS2022 >>> 一、安装VS2022 1、VS2022下载链接:Visual Studio 2022 IDE - 适用于软件开发人员的编程工具[https://visualstudio.microsoft.com/zh-hans/vs/] 2、选择Community 2022个人免费版,点击下载[https://gitcode.…...

LeetCode - 初级算法 数组(旋转数组)

旋转数组 这篇文章讨论如何通过编程实现数组元素的旋转操作。 免责声明:本文来源于个人知识与公开资料,仅用于学术交流。 描述 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 示例: 输入: nums = [1,2,3,...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

腾讯云V3签名

想要接入腾讯云的Api&#xff0c;必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口&#xff0c;但总是卡在签名这一步&#xff0c;最后放弃选择SDK&#xff0c;这次终于自己代码实现。 可能腾讯云翻新了接口文档&#xff0c;现在阅读起来&#xff0c;清晰了很多&…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...

【JavaSE】多线程基础学习笔记

多线程基础 -线程相关概念 程序&#xff08;Program&#xff09; 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序&#xff0c;比如我们使用QQ&#xff0c;就启动了一个进程&#xff0c;操作系统就会为该进程分配内存…...