当前位置: 首页 > news >正文

flink cdc oceanbase(binlog模式)

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[{"component": "oceanbase-ce","access_url": "10.86.97.168:2881","user": "root","password": "pwd","connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"},{"component": "obproxy-ce","access_url": "10.86.97.168:2883","user": "root@proxysys","password": "Y6.B4s)pt","connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"},{"component": "ocp-express","access_url": "10.86.97.168:8180","user": "admin","password": "DSxF-{odkdX-bmL6fjrF2{3mLL","connect_url": "http://10.86.97.168:8180"}
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_testMEMORY_SIZE = '2G',MAX_CPU = 1, MIN_CPU = 1,LOG_DISK_SIZE = '6G',MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;SET FOREIGN_KEY_CHECKS=0;-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` varchar(10) NOT NULL ,`uuid` varchar(30) DEFAULT NULL,`report_time` bigint(20) DEFAULT NULL,PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。SHOW MASTER STATUS;SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;/*** 就当成MySQL使用就行。*/
public class OceanBaseCDCLikeMySQLExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.97.168").port(2883)// oceanbase安装时obproxy-ce组件端口.databaseList("flink_test").tableList(String.join(",", SYNC_TABLES)).username("root@flink_test_tenant").password("")// 记得修改为实际密码.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");// 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

相关文章:

flink cdc oceanbase(binlog模式)

接上文&#xff1a;一文说清flink从编码到部署上线 环境&#xff1a;①操作系统&#xff1a;阿里龙蜥 7.9&#xff08;平替CentOS7.9&#xff09;&#xff1b;②CPU&#xff1a;x86&#xff1b;③用户&#xff1a;root。 预研初衷&#xff1a;现在很多项目有国产化的要求&#…...

【WPF】 数据绑定机制之INotifyPropertyChanged

INotifyPropertyChanged 是 WPF 中的一个接口&#xff0c;用于实现 数据绑定 中的 属性更改通知。它的主要作用是&#xff0c;当对象的某个属性值发生更改时&#xff0c;通知绑定到该属性的 UI 控件更新其显示内容。 以下是有关 INotifyPropertyChanged 的详细信息和实现方法&…...

机器学习算法深度解析:以支持向量机(SVM)为例及实战应用

机器学习算法深度解析&#xff1a;以支持向量机&#xff08;SVM&#xff09;为例及实战应用 在当今数据驱动的时代&#xff0c;机器学习作为人工智能的一个核心分支&#xff0c;正以前所未有的速度改变着我们的生活与工作方式。从金融风控到医疗诊断&#xff0c;从自动驾驶到智…...

网络编程基础:连接Java的秘密网络

1 网络编程的重要性 网络编程允许Java应用程序与其他计算机或设备进行通信。这包括从简单的数据传输到复杂的分布式系统和Web服务。 2 Java网络编程的核心类 Java提供了多个类来支持网络编程&#xff1a; InetAddress&#xff1a;表示网络上的IP地址。 URL&#xff1a;表示统…...

无监督学习:自编码器(AutoEncoder)

自编码器&#xff1a;数据的净化之旅 引言 自编码器作为一种强大的特征学习方法&#xff0c;已经经历了从简单到复杂的发展历程。本文综述了多种类型的自编码器及其演进过程&#xff0c;强调了它们在数据降维、图像处理、噪声去除及生成模型等方面的关键作用。随着技术的进步…...

在不到 5 分钟的时间内将威胁情报 PDF 添加为 AI 助手的自定义知识

作者&#xff1a;来自 Elastic jamesspi 安全运营团队通常会维护威胁情报报告的存储库&#xff0c;这些报告包含由报告提供商生成的大量知识。然而&#xff0c;挑战在于&#xff0c;这些报告的内容通常以 PDF 格式存在&#xff0c;使得在处理安全事件或调查时难以检索和引用相关…...

Memcached prepend 命令

Memcached prepend 命令用于向已存在 key(键) 的 value(数据值) 前面追加数据 。 语法&#xff1a; prepend 命令的基本语法格式如下&#xff1a; prepend key flags exptime bytes [noreply] value参数说明如下&#xff1a; key&#xff1a;键值 key-value 结构中的 key&a…...

Win10 VScode配置远程Linux开发环境

Windows VScode配置远程Linux开发环境 记录一下在Windows下VScode配置远程连接Linux环境进行开发的过程。 VScode的远程编程与调试的插件Remote Development&#xff0c;使用这个插件可以在很多情况下代替vim直接远程修改与调试服务器上的代码&#xff0c;搭配上VScode的语言…...

微信小程序校园自助点餐系统实战:从设计到实现

随着移动互联网的发展&#xff0c;越来越多的校园场景开始智能化、自助化。微信小程序凭借其轻量化、便捷性和强大的生态支持&#xff0c;成为了各类校园应用的首选工具之一。今天&#xff0c;我们将通过实际开发一个微信小程序“校园自助点餐系统”来展示如何设计和实现这样一…...

解决sublime编译无法输入问题

在使用sublime编译简单的c语言的时候,发现编译过程中,带有scanf的程序,无法正确的输入。 需要提前配置好gcc 和g++ 一、新增配置 新建编译系统文件:C.sublime-build 具体步骤:菜单中选择Tools——Build System——New Build System——保存文件名C.sublime-build ,填写以…...

const修饰指针总结

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…...

uniapp实现后端数据i18n国际化

1.在main.js配置请求获取到数据再设置到i18n中&#xff0c; 我这里是通过后端接口先获取到一个多个数据的的json链接&#xff0c;通过链接再获取数据&#xff0c;拿到数据后通过遍历的方式设置i18n //接口数据示例&#xff1a;{"vi": "http://localhost:8899/…...

什么是国密设计

国密设计&#xff0c;全称为“国家密码算法设计”&#xff0c;是指中国自主研发的一系列密码学算法和相关的技术标准。这些算法旨在提供安全可靠的加密、解密、签名验证等服务&#xff0c;并且在中国的信息安全领域中扮演着至关重要的角色。以下是关于国密设计的详细解释&#…...

Android IO 问题:java.io.IOException Operation not permitted

问题描述与处理策略 1、问题描述 java.io.IOException: Operation not permittedjava.nio.file.FileSystemException: /storage/emulated/0/test/test.txt: Operation not permittedjava.io.IOException: Operation not permitted&#xff1a;异常为操作不被允许 java.nio.f…...

安装bert_embedding遇到问题

在使用命令&#xff1a; pip install bert-embedding 安装bert_embedding的时候&#xff0c;遇到如下问题&#xff1a; ERROR: Failed cleaning build dir for numpy Successfully built gluonnlp Failed to build numpy ERROR: ERROR: Failed to build installable wheel…...

cka考试-03-k8s版本升级

一、原题 二、解答 [root@master ~]# kubectl get node NAME STATUS ROLES AGE VERSION master Ready control-plane,master 25h v1.22.12 node1 Ready worker 25h v1.22.12 node2 Ready worker …...

【insert 插入数据语法合集】.NET开源ORM框架 SqlSugar 系列

系列文章目录 &#x1f380;&#x1f380;&#x1f380; .NET开源 ORM 框架 SqlSugar 系列 &#x1f380;&#x1f380;&#x1f380; 文章目录 系列文章目录一、前言 &#x1f343;二、插入方式 &#x1f4af;2.1 单条插入实体2.2 批量 插入实体2.3 根据字典插入2.4 根据 Dat…...

Spring Boot 的自动配置,以rabbitmq为例,请详细说明

Spring Boot 的自动配置特性能够大大简化集成外部服务和组件的配置过程。以 RabbitMQ 为例&#xff0c;Spring Boot 通过 spring-boot-starter-amqp 提供了自动配置支持&#xff0c;开发者只需在应用中添加相关依赖并配置必要的属性&#xff0c;Spring Boot 会自动配置所需的连…...

Visual Studio 2022+Qt6.5.3安装教程+环境配置+创建Qt项目+乱码插件+运行很完美(16岁孩子也能看懂)

点击上方"蓝字"关注我们 01、安装VS2022 >>> 一、安装VS2022 1、VS2022下载链接:Visual Studio 2022 IDE - 适用于软件开发人员的编程工具[https://visualstudio.microsoft.com/zh-hans/vs/] 2、选择Community 2022个人免费版,点击下载[https://gitcode.…...

LeetCode - 初级算法 数组(旋转数组)

旋转数组 这篇文章讨论如何通过编程实现数组元素的旋转操作。 免责声明:本文来源于个人知识与公开资料,仅用于学术交流。 描述 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 示例: 输入: nums = [1,2,3,...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力

引言&#xff1a; 在人工智能快速发展的浪潮中&#xff0c;快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型&#xff08;LLM&#xff09;。该模型代表着该领域的重大突破&#xff0c;通过独特方式融合思考与非思考…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

ubuntu系统文件误删(/lib/x86_64-linux-gnu/libc.so.6)修复方案 [成功解决]

报错信息&#xff1a;libc.so.6: cannot open shared object file: No such file or directory&#xff1a; #ls, ln, sudo...命令都不能用 error while loading shared libraries: libc.so.6: cannot open shared object file: No such file or directory重启后报错信息&…...