当前位置: 首页 > news >正文

flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

相关文章:

flink cdc oceanbase(binlog模式)

接上文&#xff1a;一文说清flink从编码到部署上线 环境&#xff1a;①操作系统&#xff1a;阿里龙蜥 7.9&#xff08;平替CentOS7.9&#xff09;&#xff1b;②CPU&#xff1a;x86&#xff1b;③用户&#xff1a;root。 预研初衷&#xff1a;现在很多项目有国产化的要求&#…...

【WPF】 数据绑定机制之INotifyPropertyChanged

INotifyPropertyChanged 是 WPF 中的一个接口&#xff0c;用于实现 数据绑定 中的 属性更改通知。它的主要作用是&#xff0c;当对象的某个属性值发生更改时&#xff0c;通知绑定到该属性的 UI 控件更新其显示内容。 以下是有关 INotifyPropertyChanged 的详细信息和实现方法&…...

机器学习算法深度解析:以支持向量机(SVM)为例及实战应用

机器学习算法深度解析&#xff1a;以支持向量机&#xff08;SVM&#xff09;为例及实战应用 在当今数据驱动的时代&#xff0c;机器学习作为人工智能的一个核心分支&#xff0c;正以前所未有的速度改变着我们的生活与工作方式。从金融风控到医疗诊断&#xff0c;从自动驾驶到智…...

网络编程基础:连接Java的秘密网络

1 网络编程的重要性 网络编程允许Java应用程序与其他计算机或设备进行通信。这包括从简单的数据传输到复杂的分布式系统和Web服务。 2 Java网络编程的核心类 Java提供了多个类来支持网络编程&#xff1a; InetAddress&#xff1a;表示网络上的IP地址。 URL&#xff1a;表示统…...

无监督学习:自编码器(AutoEncoder)

自编码器&#xff1a;数据的净化之旅 引言 自编码器作为一种强大的特征学习方法&#xff0c;已经经历了从简单到复杂的发展历程。本文综述了多种类型的自编码器及其演进过程&#xff0c;强调了它们在数据降维、图像处理、噪声去除及生成模型等方面的关键作用。随着技术的进步…...

在不到 5 分钟的时间内将威胁情报 PDF 添加为 AI 助手的自定义知识

作者&#xff1a;来自 Elastic jamesspi 安全运营团队通常会维护威胁情报报告的存储库&#xff0c;这些报告包含由报告提供商生成的大量知识。然而&#xff0c;挑战在于&#xff0c;这些报告的内容通常以 PDF 格式存在&#xff0c;使得在处理安全事件或调查时难以检索和引用相关…...

Memcached prepend 命令

Memcached prepend 命令用于向已存在 key(键) 的 value(数据值) 前面追加数据 。 语法&#xff1a; prepend 命令的基本语法格式如下&#xff1a; prepend key flags exptime bytes [noreply] value参数说明如下&#xff1a; key&#xff1a;键值 key-value 结构中的 key&a…...

Win10 VScode配置远程Linux开发环境

Windows VScode配置远程Linux开发环境 记录一下在Windows下VScode配置远程连接Linux环境进行开发的过程。 VScode的远程编程与调试的插件Remote Development&#xff0c;使用这个插件可以在很多情况下代替vim直接远程修改与调试服务器上的代码&#xff0c;搭配上VScode的语言…...

微信小程序校园自助点餐系统实战:从设计到实现

随着移动互联网的发展&#xff0c;越来越多的校园场景开始智能化、自助化。微信小程序凭借其轻量化、便捷性和强大的生态支持&#xff0c;成为了各类校园应用的首选工具之一。今天&#xff0c;我们将通过实际开发一个微信小程序“校园自助点餐系统”来展示如何设计和实现这样一…...

解决sublime编译无法输入问题

在使用sublime编译简单的c语言的时候,发现编译过程中,带有scanf的程序,无法正确的输入。 需要提前配置好gcc 和g++ 一、新增配置 新建编译系统文件:C.sublime-build 具体步骤:菜单中选择Tools——Build System——New Build System——保存文件名C.sublime-build ,填写以…...

const修饰指针总结

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…...

uniapp实现后端数据i18n国际化

1.在main.js配置请求获取到数据再设置到i18n中&#xff0c; 我这里是通过后端接口先获取到一个多个数据的的json链接&#xff0c;通过链接再获取数据&#xff0c;拿到数据后通过遍历的方式设置i18n //接口数据示例&#xff1a;{"vi": "http://localhost:8899/…...

什么是国密设计

国密设计&#xff0c;全称为“国家密码算法设计”&#xff0c;是指中国自主研发的一系列密码学算法和相关的技术标准。这些算法旨在提供安全可靠的加密、解密、签名验证等服务&#xff0c;并且在中国的信息安全领域中扮演着至关重要的角色。以下是关于国密设计的详细解释&#…...

Android IO 问题:java.io.IOException Operation not permitted

问题描述与处理策略 1、问题描述 java.io.IOException: Operation not permittedjava.nio.file.FileSystemException: /storage/emulated/0/test/test.txt: Operation not permittedjava.io.IOException: Operation not permitted&#xff1a;异常为操作不被允许 java.nio.f…...

安装bert_embedding遇到问题

在使用命令&#xff1a; pip install bert-embedding 安装bert_embedding的时候&#xff0c;遇到如下问题&#xff1a; ERROR: Failed cleaning build dir for numpy Successfully built gluonnlp Failed to build numpy ERROR: ERROR: Failed to build installable wheel…...

cka考试-03-k8s版本升级

一、原题 二、解答 [root@master ~]# kubectl get node NAME STATUS ROLES AGE VERSION master Ready control-plane,master 25h v1.22.12 node1 Ready worker 25h v1.22.12 node2 Ready worker …...

【insert 插入数据语法合集】.NET开源ORM框架 SqlSugar 系列

系列文章目录 &#x1f380;&#x1f380;&#x1f380; .NET开源 ORM 框架 SqlSugar 系列 &#x1f380;&#x1f380;&#x1f380; 文章目录 系列文章目录一、前言 &#x1f343;二、插入方式 &#x1f4af;2.1 单条插入实体2.2 批量 插入实体2.3 根据字典插入2.4 根据 Dat…...

Spring Boot 的自动配置,以rabbitmq为例,请详细说明

Spring Boot 的自动配置特性能够大大简化集成外部服务和组件的配置过程。以 RabbitMQ 为例&#xff0c;Spring Boot 通过 spring-boot-starter-amqp 提供了自动配置支持&#xff0c;开发者只需在应用中添加相关依赖并配置必要的属性&#xff0c;Spring Boot 会自动配置所需的连…...

Visual Studio 2022+Qt6.5.3安装教程+环境配置+创建Qt项目+乱码插件+运行很完美(16岁孩子也能看懂)

点击上方"蓝字"关注我们 01、安装VS2022 >>> 一、安装VS2022 1、VS2022下载链接:Visual Studio 2022 IDE - 适用于软件开发人员的编程工具[https://visualstudio.microsoft.com/zh-hans/vs/] 2、选择Community 2022个人免费版,点击下载[https://gitcode.…...

LeetCode - 初级算法 数组(旋转数组)

旋转数组 这篇文章讨论如何通过编程实现数组元素的旋转操作。 免责声明:本文来源于个人知识与公开资料,仅用于学术交流。 描述 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 示例: 输入: nums = [1,2,3,...

C语言接口开发:Shadow Sound Hunter模型高效调用

C语言接口开发&#xff1a;Shadow & Sound Hunter模型高效调用 1. 引言 在实际的AI模型部署中&#xff0c;我们经常遇到这样的场景&#xff1a;需要将先进的AI模型集成到现有的C/C项目中&#xff0c;或者为嵌入式设备开发高效推理接口。Shadow & Sound Hunter作为功能…...

生成式AI推理服务扩缩容失效案例分析与解决方案(GPU利用率低于12%却持续扩容的底层逻辑)

第一章&#xff1a;生成式AI推理服务扩缩容失效案例分析与解决方案&#xff08;GPU利用率低于12%却持续扩容的底层逻辑&#xff09; 2026奇点智能技术大会(https://ml-summit.org) 在真实生产环境中&#xff0c;某大模型推理服务集群频繁触发水平自动扩缩容&#xff08;HPA&a…...

水性浸涂漆工艺规范:从调配到干燥,讲透五金浸涂所有细节

在水性工业漆的实际应用中&#xff0c;浸涂工艺因其效率高、适合大批量小五金件&#xff08;如螺栓、垫圈、弹簧、小型电机壳、刹车钳、千斤顶零部件等&#xff09;而备受青睐。但很多工厂在浸漆时常常遇到气泡、流挂、膜厚不均等问题。本文以敦普水性工业漆的水性浸涂漆为例&a…...

Linux小白看过来:手把手教你用命令行在Ubuntu 16.04搞定MATLAB 2021b

Linux命令行实战&#xff1a;Ubuntu 16.04安装MATLAB 2021b全指南 第一次在Linux系统上安装专业软件&#xff1f;别担心&#xff0c;命令行操作其实比图形界面更高效。本文将带你用终端命令完成MATLAB 2021b的完整安装过程&#xff0c;每个步骤都会解释背后的原理&#xff0c;让…...

《英雄无敌:上古纪元》评测:经典回合制策略游戏的回归之作

开发任何一款新的《魔法门之英雄无敌》都是一场巨大的冒险。这个系列对许多玩家来说早已不只是回合制策略的经典&#xff0c;更是近乎无法超越的这种游戏的标杆。正因如此&#xff0c;每一部新作都会受到粉丝们的严苛审视&#xff1a;它不仅要是一款好游戏&#xff0c;还必须证…...

7大录制模式+双音轨独立控制:QuickRecorder让macOS录屏变得如此简单

7大录制模式双音轨独立控制&#xff1a;QuickRecorder让macOS录屏变得如此简单 【免费下载链接】QuickRecorder A lightweight screen recorder based on ScreenCapture Kit for macOS / 基于 ScreenCapture Kit 的轻量化多功能 macOS 录屏工具 项目地址: https://gitcode.co…...

终极FanControl中文配置指南:3步实现Windows智能风扇控制

终极FanControl中文配置指南&#xff1a;3步实现Windows智能风扇控制 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending…...

别再只会用find了!C++字符串替换的3个实战场景与避坑指南(含中文字符处理)

别再只会用find了&#xff01;C字符串替换的3个实战场景与避坑指南&#xff08;含中文字符处理&#xff09; 在C开发中&#xff0c;字符串处理看似基础却暗藏玄机。许多开发者习惯性地使用find和replace组合拳&#xff0c;直到在真实项目中遭遇中文字符乱码、性能瓶颈或跨平台兼…...

【AGI安全治理白皮书级指南】:20年AI伦理专家亲授7大风险红线与实时拦截框架

第一章&#xff1a;AGI安全治理的范式跃迁 2026奇点智能技术大会(https://ml-summit.org) 传统AI治理框架建立在“可控性假设”之上——即系统行为可被训练目标、监督信号与边界约束所充分引导。而通用人工智能&#xff08;AGI&#xff09;的涌现能力、目标内化机制与跨域自主…...

从零到一:手把手教你用OpenVINS跑通INDEMIND双目VIO(附避坑指南)

从零到一&#xff1a;手把手教你用OpenVINS跑通INDEMIND双目VIO&#xff08;附避坑指南&#xff09; 最近在机器人定位领域&#xff0c;基于视觉惯性里程计&#xff08;VIO&#xff09;的方案越来越受到关注。作为一个在多个实际项目中部署过VIO系统的开发者&#xff0c;我发现…...