当前位置: 首页 > news >正文

FlinkCDC for mysql to Clickhouse

完整依赖

<dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.0</version></dependency><!--       <dependency>-->
<!--           <groupId>org.apache.flink</groupId>-->
<!--           <artifactId>flink-jdbc_2.12</artifactId>-->
<!--           <version>1.10.3</version>-->
<!--       </dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.13.0</version><type>test-jar</type></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.4.0</version></dependency><dependency><groupId>com.aliyun</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.12.0</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.6</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency></dependencies>

Flink CDC

package name.lijiaqi.cdc;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;public class MySqlBinlogSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("test").username("flinkcdc").password("dafei1288").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 添加 sourceenv.addSource(sourceFunction)// 添加 sink.addSink(new ClickhouseSink());env.execute("mysql2clickhouse");}// 将cdc数据反序列化public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {Gson jsstr = new Gson();HashMap<String, Object> hs = new HashMap<>();String topic = sourceRecord.topic();String[] split = topic.split("[.]");String database = split[1];String table = split[2];hs.put("database",database);hs.put("table",table);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//获取数据本身Struct struct = (Struct)sourceRecord.value();Struct after = struct.getStruct("after");if (after != null) {Schema schema = after.schema();HashMap<String, Object> afhs = new HashMap<>();for (Field field : schema.fields()) {afhs.put(field.name(), after.get(field.name()));}hs.put("data",afhs);}String type = operation.toString().toLowerCase();if ("create".equals(type)) {type = "insert";}hs.put("type",type);collector.collect(jsstr.toJson(hs));}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}public static class ClickhouseSink extends RichSinkFunction<String>{Connection connection;PreparedStatement pstmt;private Connection getConnection() {Connection conn = null;try {Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String url = "jdbc:clickhouse://localhost:8123/default";conn = DriverManager.getConnection(url,"default","dafei1288");} catch (Exception e) {e.printStackTrace();}return conn;}@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = getConnection();String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";pstmt = connection.prepareStatement(sql);}// 每条记录插入时调用一次public void invoke(String value, Context context) throws Exception {//{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}Gson t = new Gson();HashMap<String,Object> hs = t.fromJson(value,HashMap.class);String database = (String)hs.get("database");String table = (String)hs.get("table");String type = (String)hs.get("type");if("test".equals(database) && "test_cdc".equals(table)){if("insert".equals(type)){System.out.println("insert => "+value);LinkedTreeMap<String,Object> data = (LinkedTreeMap<String,Object>)hs.get("data");String name = (String)data.get("name");String description = (String)data.get("description");Double id = (Double)data.get("id");// 未前面的占位符赋值pstmt.setInt(1, id.intValue());pstmt.setString(2, name);pstmt.setString(3, description);pstmt.executeUpdate();}}}@Overridepublic void close() throws Exception {super.close();if(pstmt != null) {pstmt.close();}if(connection != null) {connection.close();}}}
}

Flink SQL CDC

package name.lijiaqi.cdc;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class MysqlToMysqlMain {public static void main(String[] args) throws Exception {EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);// 数据源表String sourceDDL ="CREATE TABLE mysql_binlog (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING\n" +") WITH (\n" +" 'connector' = 'mysql-cdc',\n" +" 'hostname' = 'localhost',\n" +" 'port' = '3306',\n" +" 'username' = 'flinkcdc',\n" +" 'password' = 'dafei1288',\n" +" 'database-name' = 'test',\n" +" 'table-name' = 'test_cdc'\n" +")";String url = "jdbc:mysql://127.0.0.1:3306/test";String userName = "root";String password = "dafei1288";String mysqlSinkTable = "test_cdc_sink";// 输出目标表String sinkDDL ="CREATE TABLE test_cdc_sink (\n" +" id INT NOT NULL,\n" +" name STRING,\n" +" description STRING,\n" +" PRIMARY KEY (id) NOT ENFORCED \n " +") WITH (\n" +" 'connector' = 'jdbc',\n" +" 'driver' = 'com.mysql.jdbc.Driver',\n" +" 'url' = '" + url + "',\n" +" 'username' = '" + userName + "',\n" +" 'password' = '" + password + "',\n" +" 'table-name' = '" + mysqlSinkTable + "'\n" +")";// 简单的聚合处理String transformSQL ="insert into test_cdc_sink select * from mysql_binlog";tableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);TableResult result = tableEnv.executeSql(transformSQL);// 等待flink-cdc完成快照result.print();env.execute("sync-flink-cdc");}}

相关文章:

FlinkCDC for mysql to Clickhouse

完整依赖 <dependencies><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.13.0</version>…...

沃通SSL证书服务多省区一体化政务服务平台

近年来&#xff0c;我国政务服务数字化水平不断提升&#xff0c;数字政府建设取得积极成效。依托全国一体化政务服务平台&#xff0c;政务服务效能不断提升&#xff0c;“一网通办”能力显著增强&#xff0c;为创新政府治理、优化营商环境提供了有力支撑。沃通SSL证书具备保护数…...

Linux程序地址

目录 一、定义 二、问题引出 三、虚拟地址和物理地址 &#xff08;一&#xff09;问题解释 &#xff08;二&#xff09;什么是进程地址空间 &#xff08;三&#xff09;为什么要有进程地址空间 一、定义 #include <stdio.h> #include <stdlib.h>//geten…...

华为OD k 对元素最小值(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

解决Unity打包时,Android SDK 报错问题

报错内容应该包括类似如下信息&#xff1a; CommandInvokationFailure: Failed to update Android SDK package list. java.lang.UnsupportedClassVersionError: com/android/prefs/AndroidLocationsProvider has been compiled by a more recent version of the Java Runtim…...

基于nodejs+vue 校园通勤车系统

但是管理好校园通勤车可视化又面临很多麻烦需要解决, 信息化已经成为主流,开发一个校园通勤车可视化系统小程序一方面的可能会更合乎时宜,困扰管理层的许多问题当中,校园通勤车 管理也是不敢忽视的一块。另一方面来说也可以提高在校园通勤车可视化管理方面的效率给相关管理人员…...

【JavaEE】Java多线程编程案例 -- 多线程篇(3)

Java多线程编程案例 1. 单例模式1.1 代码的简单实现1.2 懒汉模式的线程安全代码 2. 阻塞队列2.1 阻塞队列的概念2.2 使用库中的BlockingDeque2.3 模拟实现阻塞队列2.4 生产者消费者模型 3. 定时器3.1 概念3.2 使用库的定时器 - Timer类3.3 模拟实现定时器 4. 线程池4.1 概念4.2…...

axios发送常见请求方式以及拦截器的封装

一&#xff0c;常见请求 //1.get--传递paramsaxios.get("/test",{params:{}})//2.post--传递paramsaxios.post("/test",{},{params:{}}) //3.post--传递bodyaxios.post("/test",{name:""}) 二&#xff0c;封装请求拦截器 import ax…...

Apollo中的身份验证与授权:保护你的数据

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 「推荐专栏」&#xff1a; ★java一站式服务 ★ ★ React从入门到精通★ ★前端炫酷代码分享 ★ ★ 从0到英雄&#xff0c;vue成神之路★ ★ uniapp-从构建到提升★ ★ 从0到英雄&#xff…...

斜率优化dp

f i min ⁡ ( a j − j i ) f_i\min(a_j - j \times i) fi​min(aj​−ji) 考虑变成点对 ( j , a j ) (j,a_j) (j,aj​)&#xff0c;则 f i Y j − X j i f_iY_j-X_ji fi​Yj​−Xj​i 令 i k , f i b ik, f_ib ik,fi​b&#xff0c;得 b Y j − X j k bY_j-X_jk b…...

华为OD 打印任务排序(100分)【java】A卷+B卷

华为OD统一考试A卷+B卷 新题库说明 你收到的链接上面会标注A卷还是B卷。目前大部分收到的都是B卷。 B卷对应20022部分考题以及新出的题目,A卷对应的是新出的题目。 我将持续更新最新题目 获取更多免费题目可前往夸克网盘下载,请点击以下链接进入: 我用夸克网盘分享了「华为O…...

最新Ai写作创作系统源码+Ai绘画系统源码+搭建部署教程+支持GPT4.0+支持Prompt预设应用+思维导图生成

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统AI绘画系统&#xff0c;支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署…...

FPGA【紫光语法】

寄存器数据类型&#xff1a; reg 默认为 1 bit wide&#xff0c;如果超过 1 bit&#xff0c;则需要 range declaration 设置 reg 的位宽integer 默认位宽为 32 bit&#xff0c;不允许有 range declarationtime 默认位宽为 64 bit&#xff0c;不允许有 range declarat…...

运维监控Zabbix部署

目录 运维监控Zabbix部署 1. 简介 2. 安装 ​编辑 2.1 安装前准备 - Mysql 2.2 安装Zabbix Server 和 Zabbix Agent 2.2.1 安装Zabbix yum库 2.2.2 安装Zabbix Server、前端、Agent 2.2.3 初始化Mysql数据库 2.2.4 为Zabbix Server配置数据库 2.2.5 配置Zab…...

vue与react,angular的区别

Vue.js 作为一个优秀的前端框架&#xff0c;方便前端开发者快速开发应用的前端&#xff0c;在实际项目中使用得比较普遍。 当然 Vue.js 也不是实际项目中唯一的前端框架&#xff0c;比较优秀的前端框架还有 React、AngularJS 和 Angular等。接下来就介绍一下 Vue.js 同这3个框架…...

水质分析仪MQTT应用案例

水质分析仪MQTT应用案例 一、公司介绍 某仪器股份有限公司&#xff0c;集研发&#xff0c;生产&#xff0c;销售于一体的水质分析仪器公司。产品主要包括PH/ORP分析仪&#xff0c;电导度分析仪&#xff0c;溶氧分析仪&#xff0c;离子浓度分析仪&#xff0c;浊度分析仪及重金…...

网络代理技术的护航与网络安全

在数字化时代&#xff0c;网络代理技术日益重要&#xff0c;不仅可维护网络安全&#xff0c;还能促进数据获取。本文深入探讨Socks5代理、IP代理以及它们在网络安全、爬虫、HTTP协议中的应用&#xff0c;助您深刻了解这些技术。 1. Socks5代理&#xff1a;网络安全与多协议支持…...

大模型LLM相关面试题整理-PEFT

Prefix/Prompt-Tuning&#xff1a;在模型的输入或隐层添加 个额外可训练的前缀 tokens&#xff08;这些前缀是连续的伪 tokens&#xff0c;不对应真实的 tokens&#xff09;&#xff0c;只训练这些前缀参数&#xff1b; Adapter-Tuning&#xff1a;将较小的神经网络层或模块插入…...

65_Pandas显示设置(小数位数、有效数字、最大行/列数等)

65_Pandas显示设置&#xff08;小数位数、有效数字、最大行/列数等&#xff09; 本文介绍了使用 print() 函数显示 pandas.DataFrame、pandas.Series 等时如何更改设置&#xff08;小数点后位数、有效数字、最大行/列数等&#xff09;。 有关如何检查、更改和重置设置值的详细…...

一个失败架构升级案例

架构师的核心能力-抽象能力 在做架构升级的时候&#xff0c; 升级开始&#xff1a; 升级过程&#xff1a; 结束&#xff1a; 虽然升级完了能很好的满足未来的需求&#xff0c;但是在升级的过程中一个需求可能要同时在新老链路里同时实现&#xff0c;风险和工作量加倍。 架构…...

Xshell远程连接Kali(默认 | 私钥)Note版

前言:xshell远程连接&#xff0c;私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【开发技术】.Net使用FFmpeg视频特定帧上绘制内容

目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法&#xff0c;当前调用一个医疗行业的AI识别算法后返回…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

学习一下用鸿蒙​​DevEco Studio HarmonyOS5实现百度地图

在鸿蒙&#xff08;HarmonyOS5&#xff09;中集成百度地图&#xff0c;可以通过以下步骤和技术方案实现。结合鸿蒙的分布式能力和百度地图的API&#xff0c;可以构建跨设备的定位、导航和地图展示功能。 ​​1. 鸿蒙环境准备​​ ​​开发工具​​&#xff1a;下载安装 ​​De…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)

cd /home 进入home盘 安装虚拟环境&#xff1a; 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境&#xff1a; virtualenv myenv 3、激活虚拟环境&#xff08;激活环境可以在当前环境下安装包&#xff09; source myenv/bin/activate 此时&#xff0c;终端…...