flink cdc oceanbase(binlog模式)
接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。
预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。
flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。
1.OceanBase部署
1.1 obd 部署
官方文档:oceanbase部署
注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。
部署完,记得保存相关账号信息(供参考):
[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]
这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:

1.2 常用命令
启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest
2.obbinlog
2.1 部署
官方文档:obbinlog部署
部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”
解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。

解决完上面这个错误,其它地方就比较顺利了。
查看是否安装成功:
netstat -anp | grep 2983
2.2 创建租户
由于“不可以用root@sys创建binlog任务”,所以要创建租户。
1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为 flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;
2.3 创建数据库
账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。
CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25 uid20241225 1735090201740
2024-12-26 uid20241226 1735090201741
2.4 创建binlog
进入binlog server服务:
cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983
创建binlog:
CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;
2.5 配置ODP
账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';
2.6 验证结果
obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;
2.7 常见问题
问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”
查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。
日志路径:/home/ds/oblogproxy/log/logproxy.log
解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。
或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:

3. fink CDC
3.1 核心代码
package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}
3.2 flink web

3.3 控制台日志

3.4 完整代码
完成代码见:flink-cdc-mysql
4.扩展
本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。
相关文章:
flink cdc oceanbase(binlog模式)
接上文:一文说清flink从编码到部署上线 环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。 预研初衷:现在很多项目有国产化的要求&#…...
【WPF】 数据绑定机制之INotifyPropertyChanged
INotifyPropertyChanged 是 WPF 中的一个接口,用于实现 数据绑定 中的 属性更改通知。它的主要作用是,当对象的某个属性值发生更改时,通知绑定到该属性的 UI 控件更新其显示内容。 以下是有关 INotifyPropertyChanged 的详细信息和实现方法&…...
机器学习算法深度解析:以支持向量机(SVM)为例及实战应用
机器学习算法深度解析:以支持向量机(SVM)为例及实战应用 在当今数据驱动的时代,机器学习作为人工智能的一个核心分支,正以前所未有的速度改变着我们的生活与工作方式。从金融风控到医疗诊断,从自动驾驶到智…...
网络编程基础:连接Java的秘密网络
1 网络编程的重要性 网络编程允许Java应用程序与其他计算机或设备进行通信。这包括从简单的数据传输到复杂的分布式系统和Web服务。 2 Java网络编程的核心类 Java提供了多个类来支持网络编程: InetAddress:表示网络上的IP地址。 URL:表示统…...
无监督学习:自编码器(AutoEncoder)
自编码器:数据的净化之旅 引言 自编码器作为一种强大的特征学习方法,已经经历了从简单到复杂的发展历程。本文综述了多种类型的自编码器及其演进过程,强调了它们在数据降维、图像处理、噪声去除及生成模型等方面的关键作用。随着技术的进步…...
在不到 5 分钟的时间内将威胁情报 PDF 添加为 AI 助手的自定义知识
作者:来自 Elastic jamesspi 安全运营团队通常会维护威胁情报报告的存储库,这些报告包含由报告提供商生成的大量知识。然而,挑战在于,这些报告的内容通常以 PDF 格式存在,使得在处理安全事件或调查时难以检索和引用相关…...
Memcached prepend 命令
Memcached prepend 命令用于向已存在 key(键) 的 value(数据值) 前面追加数据 。 语法: prepend 命令的基本语法格式如下: prepend key flags exptime bytes [noreply] value参数说明如下: key:键值 key-value 结构中的 key&a…...
Win10 VScode配置远程Linux开发环境
Windows VScode配置远程Linux开发环境 记录一下在Windows下VScode配置远程连接Linux环境进行开发的过程。 VScode的远程编程与调试的插件Remote Development,使用这个插件可以在很多情况下代替vim直接远程修改与调试服务器上的代码,搭配上VScode的语言…...
微信小程序校园自助点餐系统实战:从设计到实现
随着移动互联网的发展,越来越多的校园场景开始智能化、自助化。微信小程序凭借其轻量化、便捷性和强大的生态支持,成为了各类校园应用的首选工具之一。今天,我们将通过实际开发一个微信小程序“校园自助点餐系统”来展示如何设计和实现这样一…...
解决sublime编译无法输入问题
在使用sublime编译简单的c语言的时候,发现编译过程中,带有scanf的程序,无法正确的输入。 需要提前配置好gcc 和g++ 一、新增配置 新建编译系统文件:C.sublime-build 具体步骤:菜单中选择Tools——Build System——New Build System——保存文件名C.sublime-build ,填写以…...
const修饰指针总结
作者简介: 一个平凡而乐于分享的小比特,中南民族大学通信工程专业研究生在读,研究方向无线联邦学习 擅长领域:驱动开发,嵌入式软件开发,BSP开发 作者主页:一个平凡而乐于分享的小比特的个人主页…...
uniapp实现后端数据i18n国际化
1.在main.js配置请求获取到数据再设置到i18n中, 我这里是通过后端接口先获取到一个多个数据的的json链接,通过链接再获取数据,拿到数据后通过遍历的方式设置i18n //接口数据示例:{"vi": "http://localhost:8899/…...
什么是国密设计
国密设计,全称为“国家密码算法设计”,是指中国自主研发的一系列密码学算法和相关的技术标准。这些算法旨在提供安全可靠的加密、解密、签名验证等服务,并且在中国的信息安全领域中扮演着至关重要的角色。以下是关于国密设计的详细解释&#…...
Android IO 问题:java.io.IOException Operation not permitted
问题描述与处理策略 1、问题描述 java.io.IOException: Operation not permittedjava.nio.file.FileSystemException: /storage/emulated/0/test/test.txt: Operation not permittedjava.io.IOException: Operation not permitted:异常为操作不被允许 java.nio.f…...
安装bert_embedding遇到问题
在使用命令: pip install bert-embedding 安装bert_embedding的时候,遇到如下问题: ERROR: Failed cleaning build dir for numpy Successfully built gluonnlp Failed to build numpy ERROR: ERROR: Failed to build installable wheel…...
cka考试-03-k8s版本升级
一、原题 二、解答 [root@master ~]# kubectl get node NAME STATUS ROLES AGE VERSION master Ready control-plane,master 25h v1.22.12 node1 Ready worker 25h v1.22.12 node2 Ready worker …...
【insert 插入数据语法合集】.NET开源ORM框架 SqlSugar 系列
系列文章目录 🎀🎀🎀 .NET开源 ORM 框架 SqlSugar 系列 🎀🎀🎀 文章目录 系列文章目录一、前言 🍃二、插入方式 💯2.1 单条插入实体2.2 批量 插入实体2.3 根据字典插入2.4 根据 Dat…...
Spring Boot 的自动配置,以rabbitmq为例,请详细说明
Spring Boot 的自动配置特性能够大大简化集成外部服务和组件的配置过程。以 RabbitMQ 为例,Spring Boot 通过 spring-boot-starter-amqp 提供了自动配置支持,开发者只需在应用中添加相关依赖并配置必要的属性,Spring Boot 会自动配置所需的连…...
Visual Studio 2022+Qt6.5.3安装教程+环境配置+创建Qt项目+乱码插件+运行很完美(16岁孩子也能看懂)
点击上方"蓝字"关注我们 01、安装VS2022 >>> 一、安装VS2022 1、VS2022下载链接:Visual Studio 2022 IDE - 适用于软件开发人员的编程工具[https://visualstudio.microsoft.com/zh-hans/vs/] 2、选择Community 2022个人免费版,点击下载[https://gitcode.…...
LeetCode - 初级算法 数组(旋转数组)
旋转数组 这篇文章讨论如何通过编程实现数组元素的旋转操作。 免责声明:本文来源于个人知识与公开资料,仅用于学术交流。 描述 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 示例: 输入: nums = [1,2,3,...
C语言接口开发:Shadow Sound Hunter模型高效调用
C语言接口开发:Shadow & Sound Hunter模型高效调用 1. 引言 在实际的AI模型部署中,我们经常遇到这样的场景:需要将先进的AI模型集成到现有的C/C项目中,或者为嵌入式设备开发高效推理接口。Shadow & Sound Hunter作为功能…...
生成式AI推理服务扩缩容失效案例分析与解决方案(GPU利用率低于12%却持续扩容的底层逻辑)
第一章:生成式AI推理服务扩缩容失效案例分析与解决方案(GPU利用率低于12%却持续扩容的底层逻辑) 2026奇点智能技术大会(https://ml-summit.org) 在真实生产环境中,某大模型推理服务集群频繁触发水平自动扩缩容(HPA&a…...
水性浸涂漆工艺规范:从调配到干燥,讲透五金浸涂所有细节
在水性工业漆的实际应用中,浸涂工艺因其效率高、适合大批量小五金件(如螺栓、垫圈、弹簧、小型电机壳、刹车钳、千斤顶零部件等)而备受青睐。但很多工厂在浸漆时常常遇到气泡、流挂、膜厚不均等问题。本文以敦普水性工业漆的水性浸涂漆为例&a…...
Linux小白看过来:手把手教你用命令行在Ubuntu 16.04搞定MATLAB 2021b
Linux命令行实战:Ubuntu 16.04安装MATLAB 2021b全指南 第一次在Linux系统上安装专业软件?别担心,命令行操作其实比图形界面更高效。本文将带你用终端命令完成MATLAB 2021b的完整安装过程,每个步骤都会解释背后的原理,让…...
《英雄无敌:上古纪元》评测:经典回合制策略游戏的回归之作
开发任何一款新的《魔法门之英雄无敌》都是一场巨大的冒险。这个系列对许多玩家来说早已不只是回合制策略的经典,更是近乎无法超越的这种游戏的标杆。正因如此,每一部新作都会受到粉丝们的严苛审视:它不仅要是一款好游戏,还必须证…...
7大录制模式+双音轨独立控制:QuickRecorder让macOS录屏变得如此简单
7大录制模式双音轨独立控制:QuickRecorder让macOS录屏变得如此简单 【免费下载链接】QuickRecorder A lightweight screen recorder based on ScreenCapture Kit for macOS / 基于 ScreenCapture Kit 的轻量化多功能 macOS 录屏工具 项目地址: https://gitcode.co…...
终极FanControl中文配置指南:3步实现Windows智能风扇控制
终极FanControl中文配置指南:3步实现Windows智能风扇控制 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending…...
别再只会用find了!C++字符串替换的3个实战场景与避坑指南(含中文字符处理)
别再只会用find了!C字符串替换的3个实战场景与避坑指南(含中文字符处理) 在C开发中,字符串处理看似基础却暗藏玄机。许多开发者习惯性地使用find和replace组合拳,直到在真实项目中遭遇中文字符乱码、性能瓶颈或跨平台兼…...
【AGI安全治理白皮书级指南】:20年AI伦理专家亲授7大风险红线与实时拦截框架
第一章:AGI安全治理的范式跃迁 2026奇点智能技术大会(https://ml-summit.org) 传统AI治理框架建立在“可控性假设”之上——即系统行为可被训练目标、监督信号与边界约束所充分引导。而通用人工智能(AGI)的涌现能力、目标内化机制与跨域自主…...
从零到一:手把手教你用OpenVINS跑通INDEMIND双目VIO(附避坑指南)
从零到一:手把手教你用OpenVINS跑通INDEMIND双目VIO(附避坑指南) 最近在机器人定位领域,基于视觉惯性里程计(VIO)的方案越来越受到关注。作为一个在多个实际项目中部署过VIO系统的开发者,我发现…...
