当前位置: 首页 > news >正文

FlinkCDC for mysql to Clickhouse

完整依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><!--       <dependency>-->
<!--           <groupId>org.apache.flink</groupId>-->
<!--           <artifactId>flink-jdbc_2.12</artifactId>-->
<!--           <version>1.10.3</version>-->
<!--       </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version><type>test-jar</type></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.4.0</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.12.0</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.6</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency></dependencies>

Flink CDC

package name.lijiaqi.cdc;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;public class MySqlBinlogSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("test").username("flinkcdc").password("dafei1288").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 添加 sourceenv.addSource(sourceFunction)// 添加 sink.addSink(new ClickhouseSink());env.execute("mysql2clickhouse");}// 将cdc数据反序列化public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {Gson jsstr = new Gson();HashMap<String, Object> hs = new HashMap<>();String topic = sourceRecord.topic();String[] split = topic.split("[.]");String database = split[1];String table = split[2];hs.put("database",database);hs.put("table",table);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//获取数据本身Struct struct = (Struct)sourceRecord.value();Struct after = struct.getStruct("after");if (after != null) {Schema schema = after.schema();HashMap<String, Object> afhs = new HashMap<>();for (Field field : schema.fields()) {afhs.put(field.name(), after.get(field.name()));}hs.put("data",afhs);}String type = operation.toString().toLowerCase();if ("create".equals(type)) {type = "insert";}hs.put("type",type);collector.collect(jsstr.toJson(hs));}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}public static class ClickhouseSink extends RichSinkFunction<String>{Connection connection;PreparedStatement pstmt;private Connection getConnection() {Connection conn = null;try {Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String url = "jdbc:clickhouse://localhost:8123/default";conn = DriverManager.getConnection(url,"default","dafei1288");} catch (Exception e) {e.printStackTrace();}return conn;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";pstmt = connection.prepareStatement(sql);}// 每条记录插入时调用一次public void invoke(String value, Context context) throws Exception {//{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}Gson t = new Gson();HashMap<String,Object> hs = t.fromJson(value,HashMap.class);String database = (String)hs.get("database");String table = (String)hs.get("table");String type = (String)hs.get("type");if("test".equals(database) && "test_cdc".equals(table)){if("insert".equals(type)){System.out.println("insert => "+value);LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");String name = (String)data.get("name");String description = (String)data.get("description");Double id = (Double)data.get("id");// 未前面的占位符赋值pstmt.setInt(1, id.intValue());pstmt.setString(2, name);pstmt.setString(3, description);pstmt.executeUpdate();}}}@Overridepublic void close() throws Exception {super.close();if(pstmt != null) {pstmt.close();}if(connection != null) {connection.close();}}}
}

Flink SQL CDC

package name.lijiaqi.cdc;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class MysqlToMysqlMain {public static void main(String[] args) throws Exception {EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 数据源表String sourceDDL ="CREATE TABLE mysql_binlog (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'username' = 'flinkcdc',\n" +" 'password' = 'dafei1288',\n" +" 'database-name' = 'test',\n" +" 'table-name' = 'test_cdc'\n" +")";String url = "jdbc:mysql://127.0.0.1:3306/test";String userName = "root";String password = "dafei1288";String mysqlSinkTable = "test_cdc_sink";// 输出目标表String sinkDDL ="CREATE TABLE test_cdc_sink (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING,\n" +" PRIMARY KEY (id) NOT ENFORCED \n " +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'driver' = 'com.mysql.jdbc.Driver',\n" +" 'url' = '" + url + "',\n" +" 'username' = '" + userName + "',\n" +" 'password' = '" + password + "',\n" +" 'table-name' = '" + mysqlSinkTable + "'\n" +")";// 简单的聚合处理String transformSQL ="insert into test_cdc_sink select * from mysql_binlog";tableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);TableResult result = tableEnv.executeSql(transformSQL);// 等待flink-cdc完成快照result.print();env.execute("sync-flink-cdc");}}

相关文章:

FlinkCDC for mysql to Clickhouse

完整依赖 <dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.13.0</version>…...

沃通SSL证书服务多省区一体化政务服务平台

近年来&#xff0c;我国政务服务数字化水平不断提升&#xff0c;数字政府建设取得积极成效。依托全国一体化政务服务平台&#xff0c;政务服务效能不断提升&#xff0c;“一网通办”能力显著增强&#xff0c;为创新政府治理、优化营商环境提供了有力支撑。沃通SSL证书具备保护数…...

Linux程序地址

目录 一、定义 二、问题引出 三、虚拟地址和物理地址 &#xff08;一&#xff09;问题解释 &#xff08;二&#xff09;什么是进程地址空间 &#xff08;三&#xff09;为什么要有进程地址空间 一、定义 #include <stdio.h> #include <stdlib.h>//geten…...

华为OD k 对元素最小值(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

解决Unity打包时,Android SDK 报错问题

报错内容应该包括类似如下信息&#xff1a; CommandInvokationFailure: Failed to update Android SDK package list. java.lang.UnsupportedClassVersionError: com/android/prefs/AndroidLocationsProvider has been compiled by a more recent version of the Java Runtim…...

基于nodejs+vue 校园通勤车系统

但是管理好校园通勤车可视化又面临很多麻烦需要解决, 信息化已经成为主流,开发一个校园通勤车可视化系统小程序一方面的可能会更合乎时宜,困扰管理层的许多问题当中,校园通勤车 管理也是不敢忽视的一块。另一方面来说也可以提高在校园通勤车可视化管理方面的效率给相关管理人员…...

【JavaEE】Java多线程编程案例 -- 多线程篇(3)

Java多线程编程案例 1. 单例模式1.1 代码的简单实现1.2 懒汉模式的线程安全代码 2. 阻塞队列2.1 阻塞队列的概念2.2 使用库中的BlockingDeque2.3 模拟实现阻塞队列2.4 生产者消费者模型 3. 定时器3.1 概念3.2 使用库的定时器 - Timer类3.3 模拟实现定时器 4. 线程池4.1 概念4.2…...

axios发送常见请求方式以及拦截器的封装

一&#xff0c;常见请求 //1.get--传递paramsaxios.get("/test",{params:{}})//2.post--传递paramsaxios.post("/test",{},{params:{}}) //3.post--传递bodyaxios.post("/test",{name:""}) 二&#xff0c;封装请求拦截器 import ax…...

Apollo中的身份验证与授权:保护你的数据

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…...

斜率优化dp

f i min ⁡ ( a j − j i ) f_i\min(a_j - j \times i) fi​min(aj​−ji) 考虑变成点对 ( j , a j ) (j,a_j) (j,aj​)&#xff0c;则 f i Y j − X j i f_iY_j-X_ji fi​Yj​−Xj​i 令 i k , f i b ik, f_ib ik,fi​b&#xff0c;得 b Y j − X j k bY_j-X_jk b…...

华为OD 打印任务排序(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

最新Ai写作创作系统源码+Ai绘画系统源码+搭建部署教程+支持GPT4.0+支持Prompt预设应用+思维导图生成

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统AI绘画系统&#xff0c;支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署…...

FPGA【紫光语法】

寄存器数据类型&#xff1a; reg 默认为 1 bit wide&#xff0c;如果超过 1 bit&#xff0c;则需要 range declaration 设置 reg 的位宽integer 默认位宽为 32 bit&#xff0c;不允许有 range declarationtime 默认位宽为 64 bit&#xff0c;不允许有 range declarat…...

运维监控Zabbix部署

目录 运维监控Zabbix部署 1. 简介 2. 安装 ​编辑 2.1 安装前准备 - Mysql 2.2 安装Zabbix Server 和 Zabbix Agent 2.2.1 安装Zabbix yum库 2.2.2 安装Zabbix Server、前端、Agent 2.2.3 初始化Mysql数据库 2.2.4 为Zabbix Server配置数据库 2.2.5 配置Zab…...

vue与react,angular的区别

Vue.js 作为一个优秀的前端框架&#xff0c;方便前端开发者快速开发应用的前端&#xff0c;在实际项目中使用得比较普遍。 当然 Vue.js 也不是实际项目中唯一的前端框架&#xff0c;比较优秀的前端框架还有 React、AngularJS 和 Angular等。接下来就介绍一下 Vue.js 同这3个框架…...

水质分析仪MQTT应用案例

水质分析仪MQTT应用案例 一、公司介绍 某仪器股份有限公司&#xff0c;集研发&#xff0c;生产&#xff0c;销售于一体的水质分析仪器公司。产品主要包括PH/ORP分析仪&#xff0c;电导度分析仪&#xff0c;溶氧分析仪&#xff0c;离子浓度分析仪&#xff0c;浊度分析仪及重金…...

网络代理技术的护航与网络安全

在数字化时代&#xff0c;网络代理技术日益重要&#xff0c;不仅可维护网络安全&#xff0c;还能促进数据获取。本文深入探讨Socks5代理、IP代理以及它们在网络安全、爬虫、HTTP协议中的应用&#xff0c;助您深刻了解这些技术。 1. Socks5代理&#xff1a;网络安全与多协议支持…...

大模型LLM相关面试题整理-PEFT

Prefix/Prompt-Tuning&#xff1a;在模型的输入或隐层添加 个额外可训练的前缀 tokens&#xff08;这些前缀是连续的伪 tokens&#xff0c;不对应真实的 tokens&#xff09;&#xff0c;只训练这些前缀参数&#xff1b; Adapter-Tuning&#xff1a;将较小的神经网络层或模块插入…...

65_Pandas显示设置(小数位数、有效数字、最大行/列数等)

65_Pandas显示设置&#xff08;小数位数、有效数字、最大行/列数等&#xff09; 本文介绍了使用 print() 函数显示 pandas.DataFrame、pandas.Series 等时如何更改设置&#xff08;小数点后位数、有效数字、最大行/列数等&#xff09;。 有关如何检查、更改和重置设置值的详细…...

一个失败架构升级案例

架构师的核心能力-抽象能力 在做架构升级的时候&#xff0c; 升级开始&#xff1a; 升级过程&#xff1a; 结束&#xff1a; 虽然升级完了能很好的满足未来的需求&#xff0c;但是在升级的过程中一个需求可能要同时在新老链路里同时实现&#xff0c;风险和工作量加倍。 架构…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

《Qt C++ 与 OpenCV:解锁视频播放程序设计的奥秘》

引言:探索视频播放程序设计之旅 在当今数字化时代,多媒体应用已渗透到我们生活的方方面面,从日常的视频娱乐到专业的视频监控、视频会议系统,视频播放程序作为多媒体应用的核心组成部分,扮演着至关重要的角色。无论是在个人电脑、移动设备还是智能电视等平台上,用户都期望…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档&#xff1a;Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后&#xff0c;会在本地和远程创建数据库&#xff1a; npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库&#xff1a; 现在&#xff0c;您的Cloudfla…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...

elementUI点击浏览table所选行数据查看文档

项目场景&#xff1a; table按照要求特定的数据变成按钮可以点击 解决方案&#xff1a; <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...

MySQL 主从同步异常处理

阅读原文&#xff1a;https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主&#xff0c;遇到的这个错误&#xff1a; Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一&#xff0c;通常表示&#xff…...

相关类相关的可视化图像总结

目录 一、散点图 二、气泡图 三、相关图 四、热力图 五、二维密度图 六、多模态二维密度图 七、雷达图 八、桑基图 九、总结 一、散点图 特点 通过点的位置展示两个连续变量之间的关系&#xff0c;可直观判断线性相关、非线性相关或无相关关系&#xff0c;点的分布密…...

【iOS】 Block再学习

iOS Block再学习 文章目录 iOS Block再学习前言Block的三种类型__ NSGlobalBlock____ NSMallocBlock____ NSStackBlock__小结 Block底层分析Block的结构捕获自由变量捕获全局(静态)变量捕获静态变量__block修饰符forwarding指针 Block的copy时机block作为函数返回值将block赋给…...

大数据驱动企业决策智能化的路径与实践

&#x1f4dd;个人主页&#x1f339;&#xff1a;慌ZHANG-CSDN博客 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; 一、引言&#xff1a;数据驱动的企业竞争力重构 在这个瞬息万变的商业时代&#xff0c;“快者胜”的竞争逻辑愈发明显。企业如何在复杂环…...