当前位置: 首页 > news >正文

FlinkSQL处理Canal-JSON数据

背景信息

Canal是一个CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式,并支持使用JSON或protobuf序列化消息(Canal默认使用protobuf)。支持Canal格式的连接器有消息队列Kafka和对象存储OSS。

Flink支持将Canal的JSON消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下,利用Canal这个特性非常的有用,例如:

  • 将增量数据从数据库同步到其他系统

  • 日志审计

  • 数据库的实时物化视图

  • 数据库表的temporal join变更历史

Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Canal格式的JSON消息,输出到Kafka等存储中。

重要

目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此,Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Canal消息。

将Kafka topic注册成Flink表之后,您可以将Canal消息用作变更日志源。

-- 关于MySQL "products" 表的实时物化视图。
-- 计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;-- 将MySQL "products" 表的所有数据和增量更改同步到Elasticsearch "products" 索引以供将来搜索。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

配置选项

选项

要求

默认

类型

描述

format

必填

(none)

String

指定要使用的格式,使用Canal格式时,参数取值为canal-json。

canal-json.ignore-parse-errors

选填

false

Boolean

参数取值如下:

  • true:当解析异常时,跳过当前字段或行。

  • false(默认值):报出错误,作业启动失败。

canal-json.timestamp-format.standard

选填

SQL

String

指定输入和输出时间戳格式。参数取值如下:

  • SQL:解析yyyy-MM-dd HH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30 12:13:14.123,并以相同格式输出时间戳。

  • ISO-8601:解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的输入时间戳,例如2020-12-30T12:13:14.123,并以相同的格式输出时间戳。

canal-json.map-null-key.mode

选填

FAIL

String

指定处理Map中key值为空的方法。参数取值如下:

  • FAIL:在Map中key值为空的时候抛出异常。

  • DROP:丢弃Map中key值为空的数据项。

  • LITERAL:使用字符串常量来替换Map中的空key值。字符串常量的值由canal-json.map-null-key.literal定义。

canal-json.map-null-key.literal

选填

null

String

当canal-json.map-null-key.mode的值是LITERAL时,指定字符串常量替换Map中的空key值。

canal-json.encode.decimal-as-plain-number

选填

false

Boolean

参数取值如下:

  • true:所有DECIMAL类型的数据保持原状,不使用科学计数法表示,例如0.000000027表示为0.000000027。

  • false:所有DECIMAL类型的数据,使用科学计数法表示,例如0.000000027表示为2.7E-8。

canal-json.database.include

选填

(none)

String

一个可选的正则表达式,通过正则匹配Canal记录中的database元字段,仅读取指定数据库的changelog记录。正则字符串与Java的Pattern兼容。

canal-json.table.include

选填

(none)

String

一个可选的正则表达式,通过正则匹配Canal记录中的table元字段,仅读取指定表的changelog记录。正则字符串与Java的Pattern兼容。

类型映射

目前,Canal使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息,请参阅JSON Format。Canal格式额外兼容了数据传输服务DTS在Kafka集群存储使用的Canal扩展变更类型(INIT)。请参见Kafka集群的数据存储格式。

其他使用说明

可用的元数据

下面的格式元数据可以在DDL语句中声明为只读(VIRTUAL)列。

重要

格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前,只有Kafka连接器能够声明其值格式的元数据字段。

数据类型

说明

database

STRING NULL

原始数据库。对应于Canal记录中的database字段。

table

STRING NULL

原始数据库的表。对应于Canal记录中的table字段。

sql-type

MAP<STRING, INT> NULL

各种sql类型的映射。对应于Canal记录中的sqlType字段。

pk-names

ARRAY<STRING> NULL

主键名称数组。对应于Canal记录中的pkNames字段。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件时的时间戳。对应于Canal记录中的ts字段。

如何在Kafka中访问Canal元数据字段的代码示例如下。

CREATE TABLE KafkaTable (origin_database STRING METADATA FROM 'value.database' VIRTUAL,origin_table STRING METADATA FROM 'value.table' VIRTUAL,origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,user_id BIGINT,item_id BIGINT,behavior STRING
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','value.format' = 'canal-json'
);

常见问题

故障时投递重复的变更事件

在正常的操作环境下,Canal能够以exactly-once的语义投递每条变更事件,Flink能够正常消费Canal产生的变更事件。在非正常情况下(例如有故障发生),Canal只能保证at-least-once的投递语义。此时,Canal可能会投递重复的变更事件到Kafka中,当Flink从Kafka中消费的时候就会得到重复的事件,可能导致Flink query的运行得到错误的结果或者非预期的异常。因此,在这种情况下,建议将作业参数table.exec.source.cdc-events-duplicate设置成true,并在该source上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子,使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。​

 

参考:Canal格式的使用方法和类型映射_实时计算 Flink版(Flink)-阿里云帮助中心 

相关文章:

FlinkSQL处理Canal-JSON数据

背景信息 Canal是一个CDC&#xff08;ChangeLog Data Capture&#xff0c;变更日志数据捕获&#xff09;工具&#xff0c;可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式&#xff0c;并支持使用JSON或protobuf序列化消息&#xff08;Canal默认使用…...

玩转贝启科技BQ3588C开源鸿蒙系统开发板 —— DevEco Studio下载与安装

一、下载DevEco Studio IDE开发工具 1. 登录鸿蒙官网 网址为&#xff1a; ​​​​​​​华为HarmonyOS智能终端操作系统官网 | 应用设备分布式开发者生态 页面如下&#xff1a; 2. 搜索“DevEco Studio IDE” 点击右上角的“请输入关键词”&#xff0c;在其中搜索“DevEc…...

大模型上下文长度的超强扩展:从LongLora到LongQLora

前言 本文一开始是《七月论文审稿GPT第2版&#xff1a;从Meta Nougat、GPT4审稿到Mistral、LongLora Llama》中4.3节的内容&#xff0c;但考虑到 一方面&#xff0c;LongLora的实用性较高二方面&#xff0c;为了把LongLora和LongQLora更好的写清楚&#xff0c;而不至于受篇幅…...

pdf格式转换为txt格式

pdf文档转换为txt文档 首先在python3虚拟环境中安装PyPDF2 Python 3.6.8 (default, Jun 20 2023, 11:53:23) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux Type "help", "copyright", "credits" or "license" for more infor…...

scss使用for循环遍历,动态赋值类名并配置不同颜色

需求&#xff1a;后端要传入不同的等级&#xff0c;前端通过等级展示不同的字体颜色&#xff0c;通过scss遍历更有利于动态修改颜色或者增删等级 1.通过 for $i from 1 through 4 定义循环&#xff0c;索引值为i 2.nth($colors, $i) 取出对应的颜色 $colors: #ff0000, #00ff…...

GaussDB数据库使用COPY命令导数

目录 一、前言 二、GaussDB数据库使用COPY命令导数语法 1、语法COPY FROM 2、语法COPY TO 3、特别说明及参数示意 三、GaussDB数据库使用COPY命令导数示例 1、操作步骤 2、准备工作&#xff08;示例&#xff09; 3、把一个表的数据拷贝到一个文件&#xff08;示例&…...

SunFMEA软件免费试用:FMEA的目标和限制是什么?

免费试用FMEA软件-免费版-SunFMEA FMEA&#xff0c;即故障模式与影响分析&#xff0c;是一种预防性的质量工具&#xff0c;旨在识别、评估和优先处理潜在的故障模式及其对系统性能的影响。其目标是提高产品和过程的可靠性和安全性&#xff0c;降低产品故障的风险&#xff0c;并…...

【Redis交响乐】Redis中的数据类型/内部编码/单线程模型

文章目录 一. Redis中的数据类型和内部编码二. Redis的单线程模型面试题: redis是单线程模型,为什么效率之高,速度之快呢? 在上一篇博客中我们讲述了Redis中的通用命令,本篇博客中我们将围绕每个数据结构来介绍相关命令. 一. Redis中的数据类型和内部编码 type命令实际返回的…...

APK 瘦身

APK 瘦身的主要原因是考虑应用的下载转化率和留存率&#xff0c;应用太大了&#xff0c;用户可能就不下载了。再者&#xff0c;因为手机空间问题&#xff0c;用户有可能会卸载一些占用空间比较大的应用&#xff0c;所以&#xff0c;应用的大小也会影响留存率。 1 APK 的结构 …...

GitHub上的15000个Go模块存储库易受劫持攻击

内容概要&#xff1a; 目前研究发现&#xff0c;GitHub上超过15000个Go模块存储库容易受到一种名为“重新劫持”的攻击。 由于GitHub用户名的更改会造成9000多个存储库容易被重新劫持&#xff0c;同时因为帐户删除&#xff0c;会对6000多个存储库造成重新劫持的危机。目前统计…...

避免3ds Max效果图渲染一片黑的4个正确解决方法

在进行3ds Max效果图渲染时&#xff0c;有时候会遇到渲染一片黑的情况&#xff0c;这给我们的工作带来了很大的困扰。为了解决这个问题&#xff0c;下面我将介绍4个正确的解决方法。 1.相机位置 首先需要考虑场景内的相机位置是否有问题。如果相机放在了模型的内部或者墙体的外…...

UI演示双视图立体匹配与重建

相关文章&#xff1a; PyQt5和Qt designer的详细安装教程&#xff1a;https://blog.csdn.net/qq_43811536/article/details/135185233?spm1001.2014.3001.5501Qt designer界面和所有组件功能的详细介绍&#xff1a;https://blog.csdn.net/qq_43811536/article/details/1351868…...

添加一个编辑的小功能(PHP的Laravel)

一个编辑的按钮可以弹出会话框修改断更天数 前台 加一个编辑按钮的样式&#xff0c;他的名字是固定好的 之前有人封装过直接用就好&#xff0c;但是一定放在class里面&#xff0c;不要放在id里面 看见不认识的方法一定要去看里面封装的是什么 之前就是没有看&#xff0c;所以…...

YOLOv8改进 | 主干篇 | ConvNeXtV2全卷积掩码自编码器网络

一、本文介绍 本文给大家带来的改进机制是ConvNeXtV2网络,ConvNeXt V2是一种新型的卷积神经网络架构,它融合了自监督学习技术和架构改进,特别是加入了全卷积掩码自编码器框架和全局响应归一化(GRN)层。我将其替换YOLOv8的特征提取网络,用于提取更有用的特征。经过我的实…...

elasticsearch7.17.9两节点集群改为单节点

需求 将数据从node-23-1节点中迁移到node-83-1节点。但是现在node-83-1并没有加入到集群中&#xff0c;因此首先将node-83-1加入到node-23-1的集群 解决方案 使用ES版本为7.17.9&#xff0c;最开始设置集群为一个节点&#xff0c;node-23-1的配置如下 cluster.name: my-app…...

二叉树的层序遍历,力扣

目录 题目地址&#xff1a; 题目&#xff1a; 我们直接看题解吧&#xff1a; 解题方法&#xff1a; 方法分析&#xff1a; 解题分析&#xff1a; 解题思路&#xff1a; 代码实现&#xff1a; 代码补充说明&#xff1a; 题目地址&#xff1a; 102. 二叉树的层序遍历 - 力扣&…...

构建Dockerfile报错/bin/sh: 1: cd: can‘t cd to /xxx/yyy问题记录

目录 关键的命令行 排查分析 原因 附&#xff1a;Dockerfile构建时打印命令输出的办法 关键的命令行 WORKDIR /app COPY record . RUN cd record && xxx 执行到RUN时报了错&#xff1a; /bin/sh: 1: cd: cant cd to /app/record 并且宿主机当前目录也准备好了re…...

Vue常用的修饰符详解(有哪些,怎么用)

文章目录 一、修饰符是什么二、修饰符的作用1.表单修饰符lazytrimnumber 2.事件修饰符stoppreventselfoncecapturepassivenative 3.鼠标按钮修饰符4.键盘修饰符5.v-bind修饰符asyncpropscamel 三、应用场景参考文献 一、修饰符是什么 在程序世界里&#xff0c;修饰符是用于限定…...

Linux C/C++ 获取CPUID

实现方式&#xff1a; INTEL CC 格式 AT^T CC 格式 GCC/C库 __cpuid 宏 大致讲义&#xff1a; AT^T 格式汇编很反人类&#xff0c;GCC可以改编译器选项为INTEL内嵌汇编&#xff0c;但一般在GCC还是按照默认的AT^T汇编来拽写把&#xff0c;不想用也可以让AI工具把INTEL内嵌…...

2023年“中银杯”安徽省网络安全B模块(部分解析)

前言 以下是2023年中银杯安徽省网络安全B模块题目&#xff0c;镜像可以私聊我 B模块安全事件响应/网络安全数据取证/应用安全&#xff08;400 分&#xff09; B-1&#xff1a;CMS网站渗透测试 任务环境说明&#xff1a; √服务器场景&#xff1a;Server2206&#xff08;关…...

Hperledger Fabric入门课程3 ——软硬件环境

购买专栏前请认真阅读:《Fabric项目学习笔记》专栏介绍 1. 硬件环境 不论是在当前系统上运行、云服务器还是虚拟机,建议内存4G或以上,硬盘空间建议50G以上。 2. 操作系统 Fabric 的操作一般在Linux 或 MacOS上,Mac暂时不支持Apple Silicon芯片即m1以后的芯片。 如果读者…...

如何实现Airbyte动态服务发现:从基础到实践的完整指南

如何实现Airbyte动态服务发现&#xff1a;从基础到实践的完整指南 【免费下载链接】airbyte Open-source data movement for ELT pipelines and AI agents — from APIs, databases & files to warehouses, lakes, and AI applications. Both self-hosted and Cloud. 项目…...

Shoelace自动加载器:终极懒加载Web组件完整指南 [特殊字符]

Shoelace自动加载器&#xff1a;终极懒加载Web组件完整指南 &#x1f680; 【免费下载链接】shoelace Shoelace is now Web Awesome. Come see what’s new! 项目地址: https://gitcode.com/gh_mirrors/sh/shoelace Shoelace自动加载器是Shoelace Web组件库中一个革命性…...

构建AI智能体技能超市:标准化工作流与多平台适配实践

1. 项目概述&#xff1a;一个面向AI智能体的“技能超市”如果你和我一样&#xff0c;每天都在和Codex、Claude、Cursor这些AI助手打交道&#xff0c;那你肯定也遇到过这样的场景&#xff1a;想让AI帮你生成一份规范的Git提交信息、自动更新文档索引&#xff0c;或者为一个新项目…...

基于PanoSim5.0虚拟仿真平台的自主代客泊车AVP系统开发教程

1. PanoSim5.0与AVP系统开发入门指南 第一次接触PanoSim5.0时&#xff0c;我和大多数开发者一样被它丰富的功能模块震撼到了。这个国产仿真平台不仅支持高精度的车辆动力学建模&#xff0c;还能实现逼真的传感器仿真和环境渲染。对于自主代客泊车(AVP)这种需要反复测试的场景来…...

《蔚蓝档案》主题鼠标指针:从设计到安装的完整指南

1. 项目概述&#xff1a;为你的桌面注入《蔚蓝档案》的活力如果你和我一样&#xff0c;既是《蔚蓝档案》的玩家&#xff0c;又是个喜欢折腾桌面美化的爱好者&#xff0c;那么看到一套高质量的游戏主题鼠标指针&#xff0c;那种“必须拥有”的心情我完全理解。今天要聊的这个项目…...

为什么92%的AI团队误用DeepSeek Serverless?——基于37家客户架构审计报告的5大认知断层与重构路径

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;为什么92%的AI团队误用DeepSeek Serverless&#xff1f; DeepSeek Serverless 本为轻量推理与函数即服务&#xff08;FaaS&#xff09;场景设计&#xff0c;但大量团队将其当作通用模型托管平台使用&am…...

移动SoC设计演进:从骁龙600/400系列看芯片战略与体验竞争

1. 从一场发布会看移动芯片的十年演进2015年2月&#xff0c;巴塞罗那世界移动通信大会前夕&#xff0c;高通的一则新闻稿在业内激起了不小的涟漪。他们宣布了全新的骁龙600和400系列移动平台&#xff0c;其中最引人注目的&#xff0c;是首次将当时ARM最新的64位Cortex-A72核心引…...

网安信息收集

声明&#xff1a;任何个人和组织不得从事非法侵入他人网络、干扰他人网络正常功能、窃取网络数据等危害网络安全 的活动&#xff1b;不得提供专门用于从事侵入网络、干扰网络正常功能及防护措施、窃取网络数据等危害网络安全活动的程序、工具&#xff1b;明知他人从事危害网络安…...

YOLO26改进 | MSHC多尺度异构卷积:用方形核与条带核捕获复杂空间纹理,以清晰动机打造超强创新!

# YOLO26改进最新创新改进系列 | MSHC多尺度异构卷积&#xff1a;用方形核与条带核捕获复杂空间纹理&#xff0c;以清晰动机打造超强创新&#xff01; 购买相关资料后畅享一对一答疑&#xff01; 畅享超多免费持续更新且可大幅度提升文章档次的纯干货工具&#xff01; 这篇采用…...