当前位置: 首页 > news >正文

Flink CDC MySQL同步MySQL错误记录

1、启动 Flink SQL

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

在这里插入图片描述

2、新建源表

问题1:Encountered “(”
处理方法:去掉int(11),改为int

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int(11) DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题2:Encountered “AUTO_INCREMENT”
处理方法:删除AUTO_INCREMENT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
Was expecting one of:"CONSTRAINT" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题3:Encountered “DEFAULT”
处理方法:删去DEFAULT

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int DEFAULT NULL COMMENT 'dept id',
>   `username` varchar(14) DEFAULT NULL,
>   `add_time` datetime DEFAULT NULL,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
Was expecting one of:"CONSTRAINT" ..."NOT" ..."NULL" ..."PRIMARY" ..."UNIQUE" ..."COMMENT" ..."METADATA" ...")" ..."," ..."MULTISET" ..."ARRAY" ...Flink SQL> 

问题4:Unknown identifier ‘datetime’
处理方法:改用 TIMESTAMP(3)

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` datetime ,
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'Flink SQL> 

创建成功:

Flink SQL> CREATE TABLE `t_user` (
>   `uid` int NOT NULL COMMENT 'user id',
>   `did` int COMMENT 'dept id',
>   `username` varchar(14) ,
>   `add_time` TIMESTAMP(3),
>   PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
> );
[INFO] Execute statement succeed.Flink SQL> 

3、创建目标表

Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );

4、将源表加载到目标表

错误1:Connector ‘mysql-cdc’ can only be used as a source. It cannot be used as a sink.

Flink SQL> insert into t_user select * from ods_t_user;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.Flink SQL> 

原因:方向搞反了,插入表应该是目标表

Flink SQL> insert into ods_t_user select * from t_user;
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block dataFlink SQL> 

错误2:unexpected block data
解决办法:
(1)更新jar包如下

[appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
total 223320
-rw-r--r-- 1 appuser appuser    196491 May 19 18:56 flink-cep-1.17.1.jar
-rw-r--r-- 1 appuser appuser    542620 May 19 18:59 flink-connector-files-1.17.1.jar
-rw-r--r-- 1 appuser appuser    266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
-rw-r--r-- 1 appuser appuser    345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
-rw-r--r-- 1 appuser appuser    102472 May 19 19:02 flink-csv-1.17.1.jar
-rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
-rw-r--r-- 1 appuser appuser   8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
-rw-r--r-- 1 appuser appuser    180248 May 19 19:02 flink-json-1.17.1.jar
-rw-r--r-- 1 appuser appuser  21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
-rw-r--r-- 1 appuser appuser  15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
-rw-r--r-- 1 appuser appuser  38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
-rw-r--r-- 1 appuser appuser   3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
-rw-r--r-- 1 appuser appuser    208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser    301872 May 17 18:07 log4j-api-2.17.1.jar
-rw-r--r-- 1 appuser appuser   1790452 May 17 18:07 log4j-core-2.17.1.jar
-rw-r--r-- 1 appuser appuser     24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
-rw-r--r-- 1 appuser appuser   2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
[appuser@whtpjfscpt01 flink-1.17.1]$

(2)重启flink

[appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host whtpjfscpt01.
Starting taskexecutor daemon on host whtpjfscpt01.
[appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh

(3)重新执行

Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
> ) WITH (
>       'connector' = 'mysql-cdc',
>       'hostname' = '192.25.34.2',
>       'port' = '3306',
>       'username' = '*******',
>       'password' = '*******',
>       'database-name' = 'test',
>       'table-name' = 't_user'
>  );
[INFO] Execute statement succeed.Flink SQL> CREATE TABLE `ods_t_user` (
>     `uid` int NOT NULL COMMENT 'user id',
>     `did` int COMMENT 'dept id',
>     `username` varchar(14) ,
>     `add_time` TIMESTAMP(3),
>     PRIMARY KEY (`uid`) NOT ENFORCED
>  ) WITH (
>      'connector' = 'jdbc',
>      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
>      'driver' = 'com.mysql.cj.jdbc.Driver',
>      'username' = '*******',
>      'password' = '*******',
>      'table-name' = 'ods_t_user'
> );
[INFO] Execute statement succeed.Flink SQL>

(4)成功执行

Flink SQL> insert into ods_t_user select * from t_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c2e69d061f3777c031b0acb4ec03d13a

在这里插入图片描述

错误3:无目标表
在这里插入图片描述

 CREATE TABLE demo.ods_t_user (`uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',`did` int(11) DEFAULT NULL COMMENT 'dept id',`username` varchar(14) DEFAULT NULL,`add_time` datetime DEFAULT NULL,PRIMARY KEY (`uid`) 
) 

在这里插入图片描述
源表添加新纪录

INSERT INTO test.t_user(did,username)values('3','test'); 

目标表自动同步数据
在这里插入图片描述

相关文章:

Flink CDC MySQL同步MySQL错误记录

1、启动 Flink SQL [appuserwhtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh2、新建源表 问题1:Encountered “(” 处理方法:去掉int(11),改为int Flink SQL> CREATE TABLE t_user ( > uid int(11) NOT NULL AUTO_INCREMENT COMME…...

深入了解 Linux 中的 AWK 命令:文本处理的瑞士军刀

简介 在Linux和Unix操作系统中,文本处理是一个常见的任务。AWK命令是一个强大的文本处理工具,专门进行文本截取和分析,它允许你在文本文件中查找、过滤、处理和格式化数据。本文将深入介绍Linux中的AWK命令,让你了解其基本用法和…...

【RuoYi项目分析】网关的AuthFilter完成“认证”,注意是认证而不是权限

文章目录 1. 功能介绍2. AuthFilter的配置3. AuthFilter实现分析4. 资料参考 过滤器的功能是检验经过网关的每一个请求,检查 token 中的信息是否有效。 注意是“认证检查”,而不是“权限” 1. 功能介绍 1、在用户完成登录后,程序会把用户相关…...

excel将文件夹下面的表格文件指定名称的sheet批量导出到指定文件中,并按照文件名保存在新文件的不同sheet中

excel将文件夹下面的表格文件指定名称的sheet批量导出到指定文件中,并按照文件名保存在新文件的不同sheet中 import pandas as pd import ositems os.listdir("./") sheetname"" for item in items:if item.__contains__(xls):dfpd.read_exc…...

IIS管理器无法打开。启动后,在任务栏中有,但是窗口不见了

找到IIS管理器启动程序的所在位置 并在cmd命令行中调用 inetmgr.exe /reset 进行重启 先查看IIS管理器属性,找到其位置 管理员模式打开cmd命令行,并切换到上面的文件夹下运行Inetmgr.exe /reset 运行完成后可以重新看到IIS窗口 原因:由于某…...

使用VBA实现快速模糊查询数据

实例需求:基础数据保存在Database工作表中,如下图所示。 基础数据有37个字段,上图仅展示部分字段内容,下图中黄色字段为需要提取的数据字段。 在Search工作表B1单元格输入查询关键字Title和Genre字段中搜索关键字,包…...

spring boot flowable多人前加签

1、前加签插件 package com.xxx.flowable.cmd;import com.xxx.auth.security.user.SecurityUser; import com.xxx.commons.ApplicationContextHolder; import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.co…...

结构体运算符重载

1.降序 struct Point{int x, y;//重载比较符bool operator < (const Point &a) const{return x > a.x;//当前元素大时&#xff0c;是降序} };2.升序 struct Point{int x, y;//重载比较符 // bool operator < (const Point &a) const{ // return x…...

幽默直观的文档作者注释

注释是程序中非常重要的一部分&#xff0c;在不同的编程语言中&#xff0c;注释的风格和语言描述会有所不同。以下是一些常用的注释风格和语言描述&#xff1a; 直观注释&#xff1a;这种注释使用简洁、明了的语言&#xff0c;帮助读者快速地理解代码。例如&#xff0c;代码中…...

前端开发网站推荐

每个人都会遇见那么一个人&#xff0c;永远无法忘却&#xff0c;也永远不能拥有。 以下是一些可以用来查找和比较前端框架的推荐网站&#xff1a; JavaScript框架比较&#xff1a; 这些网站提供了对不同JavaScript框架和库的详细比较和评估。 JavaScripting: 提供了大量的JavaS…...

【C语言】通讯录管理系统(保姆级教程+内含源码)

C系列文章目录 目录 C系列文章目录 前言 一&#xff0c;模块化编程 二&#xff0c;系统框架构建 1.成员信息的创建 2.菜单实现 3.系统功能声明 三、系统功能实现 1.初始化通讯录 2.增加联系人 3.显示所有联系人 4.根据姓名查找位置 5.删除指定联系人 6.查找指定联…...

python自动解析301、302重定向链接

嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 使用模块requests 方式代码如下&#xff1a; import requests url_string"http://******" r requests.head(url_string, streamTrue) print r.h…...

【未解决问题】opencv 交叉编译 ffmpeg选项始终为NO

opencv 打不开视频的原因 在交叉编译时候&#xff0c;发现在 pc 端能用 opencv 打开的视频&#xff0c;但是在 rv1126 上打不开。在网上查了很久&#xff0c;原因可能是 ffmpeg 造成的。 解决opencv源代码编译找不到ffmpeg-CSDN博客 交叉编译 ffmpeg 尝试了一天还是第二个博客…...

Python实用技术二:数据分析和可视化(2)

目录 一&#xff0c;多维数组库numpy 1&#xff0c;操作函数&#xff1a;​ 2&#xff0c;numpy数组元素增删 1&#xff09;添加数组元素 2&#xff09;numpy删除数组元素 3&#xff09;在numpy数组中查找元素 4&#xff09;numpy数组的数学运算 3&#xff0c;numpy数…...

24Hibench

1. Hibench 官网 ​ HiBench is a big data benchmark suite that helps evaluate different big data frameworks in terms of speed, throughput and system resource utilizations. It contains a set of Hadoop, Spark and streaming workloads, including Sort, WordCou…...

VC++父进程交互式操作子进程标准输入输出

父进程接管子进程的标准输入输出和错误,实现对子进程的交互操作。比如子进程是一个类似mysql这种可以交互的命令,执行操作后输出结果,父进程根据结果分析决定执行下一步的命令,从而替代人工的输入。 通过父进程创建子进程,使用管道重定向子进程的输入输出错误可以实现 在 …...

一步一招,教你如何制作出成功的优惠促销微传单

在当今的数字化时代&#xff0c;几乎所有的事情都可以在互联网上完成&#xff0c;包括制作宣传单。有很多在线工具可以帮助我们轻松制作出精美的商场促销宣传单。下面就以乔拓云为例&#xff0c;详细介绍如何简单几步制作出让人眼前一亮的商场促销宣传单。 1. 注册并登录乔拓云…...

27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...

Git使用【上】

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;那个传说中的man的主页 &#x1f3e0;个人专栏&#xff1a;题目解析 &#x1f30e;推荐文章&#xff1a;题目大解析3 前言 先前有些git命令我在我的其它文章里面已经写过&#xff0c;若要查看可参考【Linu…...

flink的序列化基准测试

背景&#xff1a; flink提供了在本地环境使用jmh测试不同序列化方法的性能差异&#xff0c;本文就是基于这个https://github.com/apache/flink-benchmarks这个性能测试&#xff0c;总结几个结论&#xff0c;以便后面使用时避免掉坑 基准测试 我们本次运行的是SerializationF…...

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

从面试角度回答Android中ContentProvider启动原理

Android中ContentProvider原理的面试角度解析&#xff0c;分为​​已启动​​和​​未启动​​两种场景&#xff1a; 一、ContentProvider已启动的情况 1. ​​核心流程​​ ​​触发条件​​&#xff1a;当其他组件&#xff08;如Activity、Service&#xff09;通过ContentR…...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...

【PX4飞控】mavros gps相关话题分析,经纬度海拔获取方法,卫星数锁定状态获取方法

使用 ROS1-Noetic 和 mavros v1.20.1&#xff0c; 携带经纬度海拔的话题主要有三个&#xff1a; /mavros/global_position/raw/fix/mavros/gpsstatus/gps1/raw/mavros/global_position/global 查看 mavros 源码&#xff0c;来分析他们的发布过程。发现前两个话题都对应了同一…...

python读取SQLite表个并生成pdf文件

代码用于创建含50列的SQLite数据库并插入500行随机浮点数据&#xff0c;随后读取数据&#xff0c;通过ReportLab生成横向PDF表格&#xff0c;包含格式化&#xff08;两位小数&#xff09;及表头、网格线等美观样式。 # 导入所需库 import sqlite3 # 用于操作…...

云原生时代的系统设计:架构转型的战略支点

&#x1f4dd;个人主页&#x1f339;&#xff1a;一ge科研小菜鸡-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、云原生的崛起&#xff1a;技术趋势与现实需求的交汇 随着企业业务的互联网化、全球化、智能化持续加深&#xff0c;传统的 I…...