Flink CDC MySQL同步MySQL错误记录
1、启动 Flink SQL
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

2、新建源表
问题1:Encountered “(”
处理方法:去掉int(11),改为int
Flink SQL> CREATE TABLE `t_user` (
> `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
> `did` int(11) DEFAULT NULL COMMENT 'dept id',
> `username` varchar(14) DEFAULT NULL,
> `add_time` datetime DEFAULT NULL,
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL>
问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT
Flink SQL> CREATE TABLE `t_user` (
> `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
> `did` int DEFAULT NULL COMMENT 'dept id',
> `username` varchar(14) DEFAULT NULL,
> `add_time` datetime DEFAULT NULL,
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
Was expecting one of:"CONSTRAINT" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL>
问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT
Flink SQL> CREATE TABLE `t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int DEFAULT NULL COMMENT 'dept id',
> `username` varchar(14) DEFAULT NULL,
> `add_time` datetime DEFAULT NULL,
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL>
问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)
Flink SQL> CREATE TABLE `t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int COMMENT 'dept id',
> `username` varchar(14) ,
> `add_time` datetime ,
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'Flink SQL>
创建成功:
Flink SQL> CREATE TABLE `t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int COMMENT 'dept id',
> `username` varchar(14) ,
> `add_time` TIMESTAMP(3),
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[INFO] Execute statement succeed.Flink SQL>
3、创建目标表
Flink SQL> CREATE TABLE `ods_t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int COMMENT 'dept id',
> `username` varchar(14) ,
> `add_time` TIMESTAMP(3),
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = '*******',
> 'password' = '*******',
> 'table-name' = 'ods_t_user'
> );
4、将源表加载到目标表
错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.
Flink SQL> insert into t_user select * from ods_t_user;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.Flink SQL>
原因:方向搞反了,插入表应该是目标表
Flink SQL> insert into ods_t_user select * from t_user;
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block dataFlink SQL>
错误2:unexpected block data
解决办法:
(1)更新jar包如下
[appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
total 223320
-rw-r--r-- 1 appuser appuser 196491 May 19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 appuser appuser 542620 May 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 appuser appuser 266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
-rw-r--r-- 1 appuser appuser 345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
-rw-r--r-- 1 appuser appuser 102472 May 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 appuser appuser 8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1 appuser appuser 180248 May 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 appuser appuser 21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 appuser appuser 15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 appuser appuser 38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 appuser appuser 3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 appuser appuser 208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser 301872 May 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser 1790452 May 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 appuser appuser 24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-r--r-- 1 appuser appuser 2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
[appuser@whtpjfscpt01 flink-1.17.1]$
(2)重启flink
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host whtpjfscpt01.
Starting taskexecutor daemon on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh
(3)重新执行
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int COMMENT 'dept id',
> `username` varchar(14) ,
> `add_time` TIMESTAMP(3),
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc',
> 'hostname' = '192.25.34.2',
> 'port' = '3306',
> 'username' = '*******',
> 'password' = '*******',
> 'database-name' = 'test',
> 'table-name' = 't_user'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `ods_t_user` (
> `uid` int NOT NULL COMMENT 'user id',
> `did` int COMMENT 'dept id',
> `username` varchar(14) ,
> `add_time` TIMESTAMP(3),
> PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
> 'driver' = 'com.mysql.cj.jdbc.Driver',
> 'username' = '*******',
> 'password' = '*******',
> 'table-name' = 'ods_t_user'
> );
[INFO] Execute statement succeed.Flink SQL>
(4)成功执行
Flink SQL> insert into ods_t_user select * from t_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c2e69d061f3777c031b0acb4ec03d13a

错误3:无目标表

CREATE TABLE demo.ods_t_user (`uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',`did` int(11) DEFAULT NULL COMMENT 'dept id',`username` varchar(14) DEFAULT NULL,`add_time` datetime DEFAULT NULL,PRIMARY KEY (`uid`)
)

源表添加新纪录
INSERT INTO test.t_user(did,username)values('3','test');
目标表自动同步数据

相关文章:
Flink CDC MySQL同步MySQL错误记录
1、启动 Flink SQL [appuserwhtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh2、新建源表 问题1:Encountered “(” 处理方法:去掉int(11),改为int Flink SQL> CREATE TABLE t_user ( > uid int(11) NOT NULL AUTO_INCREMENT COMME…...
深入了解 Linux 中的 AWK 命令:文本处理的瑞士军刀
简介 在Linux和Unix操作系统中,文本处理是一个常见的任务。AWK命令是一个强大的文本处理工具,专门进行文本截取和分析,它允许你在文本文件中查找、过滤、处理和格式化数据。本文将深入介绍Linux中的AWK命令,让你了解其基本用法和…...
【RuoYi项目分析】网关的AuthFilter完成“认证”,注意是认证而不是权限
文章目录 1. 功能介绍2. AuthFilter的配置3. AuthFilter实现分析4. 资料参考 过滤器的功能是检验经过网关的每一个请求,检查 token 中的信息是否有效。 注意是“认证检查”,而不是“权限” 1. 功能介绍 1、在用户完成登录后,程序会把用户相关…...
excel将文件夹下面的表格文件指定名称的sheet批量导出到指定文件中,并按照文件名保存在新文件的不同sheet中
excel将文件夹下面的表格文件指定名称的sheet批量导出到指定文件中,并按照文件名保存在新文件的不同sheet中 import pandas as pd import ositems os.listdir("./") sheetname"" for item in items:if item.__contains__(xls):dfpd.read_exc…...
IIS管理器无法打开。启动后,在任务栏中有,但是窗口不见了
找到IIS管理器启动程序的所在位置 并在cmd命令行中调用 inetmgr.exe /reset 进行重启 先查看IIS管理器属性,找到其位置 管理员模式打开cmd命令行,并切换到上面的文件夹下运行Inetmgr.exe /reset 运行完成后可以重新看到IIS窗口 原因:由于某…...
使用VBA实现快速模糊查询数据
实例需求:基础数据保存在Database工作表中,如下图所示。 基础数据有37个字段,上图仅展示部分字段内容,下图中黄色字段为需要提取的数据字段。 在Search工作表B1单元格输入查询关键字Title和Genre字段中搜索关键字,包…...
spring boot flowable多人前加签
1、前加签插件 package com.xxx.flowable.cmd;import com.xxx.auth.security.user.SecurityUser; import com.xxx.commons.ApplicationContextHolder; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.co…...
结构体运算符重载
1.降序 struct Point{int x, y;//重载比较符bool operator < (const Point &a) const{return x > a.x;//当前元素大时,是降序} };2.升序 struct Point{int x, y;//重载比较符 // bool operator < (const Point &a) const{ // return x…...
幽默直观的文档作者注释
注释是程序中非常重要的一部分,在不同的编程语言中,注释的风格和语言描述会有所不同。以下是一些常用的注释风格和语言描述: 直观注释:这种注释使用简洁、明了的语言,帮助读者快速地理解代码。例如,代码中…...
前端开发网站推荐
每个人都会遇见那么一个人,永远无法忘却,也永远不能拥有。 以下是一些可以用来查找和比较前端框架的推荐网站: JavaScript框架比较: 这些网站提供了对不同JavaScript框架和库的详细比较和评估。 JavaScripting: 提供了大量的JavaS…...
【C语言】通讯录管理系统(保姆级教程+内含源码)
C系列文章目录 目录 C系列文章目录 前言 一,模块化编程 二,系统框架构建 1.成员信息的创建 2.菜单实现 3.系统功能声明 三、系统功能实现 1.初始化通讯录 2.增加联系人 3.显示所有联系人 4.根据姓名查找位置 5.删除指定联系人 6.查找指定联…...
python自动解析301、302重定向链接
嗨喽~大家好呀,这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 使用模块requests 方式代码如下: import requests url_string"http://******" r requests.head(url_string, streamTrue) print r.h…...
【未解决问题】opencv 交叉编译 ffmpeg选项始终为NO
opencv 打不开视频的原因 在交叉编译时候,发现在 pc 端能用 opencv 打开的视频,但是在 rv1126 上打不开。在网上查了很久,原因可能是 ffmpeg 造成的。 解决opencv源代码编译找不到ffmpeg-CSDN博客 交叉编译 ffmpeg 尝试了一天还是第二个博客…...
Python实用技术二:数据分析和可视化(2)
目录 一,多维数组库numpy 1,操作函数: 2,numpy数组元素增删 1)添加数组元素 2)numpy删除数组元素 3)在numpy数组中查找元素 4)numpy数组的数学运算 3,numpy数…...
24Hibench
1. Hibench 官网 HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCou…...
VC++父进程交互式操作子进程标准输入输出
父进程接管子进程的标准输入输出和错误,实现对子进程的交互操作。比如子进程是一个类似mysql这种可以交互的命令,执行操作后输出结果,父进程根据结果分析决定执行下一步的命令,从而替代人工的输入。 通过父进程创建子进程,使用管道重定向子进程的输入输出错误可以实现 在 …...
一步一招,教你如何制作出成功的优惠促销微传单
在当今的数字化时代,几乎所有的事情都可以在互联网上完成,包括制作宣传单。有很多在线工具可以帮助我们轻松制作出精美的商场促销宣传单。下面就以乔拓云为例,详细介绍如何简单几步制作出让人眼前一亮的商场促销宣传单。 1. 注册并登录乔拓云…...
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...
Git使用【上】
欢迎来到Cefler的博客😁 🕌博客主页:那个传说中的man的主页 🏠个人专栏:题目解析 🌎推荐文章:题目大解析3 前言 先前有些git命令我在我的其它文章里面已经写过,若要查看可参考【Linu…...
flink的序列化基准测试
背景: flink提供了在本地环境使用jmh测试不同序列化方法的性能差异,本文就是基于这个https://github.com/apache/flink-benchmarks这个性能测试,总结几个结论,以便后面使用时避免掉坑 基准测试 我们本次运行的是SerializationF…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据
微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列,以便知晓哪些列包含有价值的数据,…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...
Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
