当前位置: 首页 > news >正文

flink mysql数据表同步API CDC

概述:

CDC简介 Change Data Capture

API CDC同步数据代码

package com.yclxiao.flinkcdcdemo.api;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.yclxiao.flinkcdcdemo.util.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;/*** league.oc_settle_profit -> cloud.dws_profit_record_hdj* API方式*/
public class Wfg2userApi {private static final Logger LOG = LoggerFactory.getLogger(Wfg2userApi.class);private static String MYSQL_HOST = "192.168.1.12";private static int MYSQL_PORT = 3306;private static String MYSQL_USER = "root";private static String MYSQL_PASSWD = "123456";private static String SYNC_DB = "zentao";private static List<String> SYNC_TABLES = Arrays.asList("zentao.zt_group");public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(MYSQL_HOST).port(MYSQL_PORT).databaseList(SYNC_DB) // set captured database.tableList(String.join(",", SYNC_TABLES)) // set captured table.username(MYSQL_USER).password(MYSQL_PASSWD).deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(5000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + Wfg2userApi.class.getName());List<String> tableList = getTableList();for (String tbl : tableList) {SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);SingleOutputStreamOperator<String> cleanStream = clean(filterStream);SingleOutputStreamOperator<String> logicStream = logic(cleanStream);logicStream.addSink(new CustomDealDataSink());}env.execute(Wfg2userApi.class.getName());}private static class CustomDealDataSink extends RichSinkFunction<String> {private transient Connection cloudConnection;private transient PreparedStatement cloudPreparedStatement;private String insertSql = "INSERT INTO `zentao_zt_group` (`id`, `project`, `vision`, `name`, `role`, `desc`, `acl`, `developer`) \n" +"      VALUES (?, ?, ?, ?, ?, ?, ?, ?)";private String deleteSql = "delete from zentao_zt_group where id = '%s'";@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 在这里初始化 JDBC 连接cloudConnection = DriverManager.getConnection("jdbc:mysql://" + MYSQL_HOST + ":3306/wfg", "root", "123456");cloudPreparedStatement = cloudConnection.prepareStatement(insertSql);}@Overridepublic void invoke(String value, Context context) throws Exception {JSONObject dataJson = JSON.parseObject(value);Long id = dataJson.getLong("id");Integer project = dataJson.getInteger("project");String vision = dataJson.getString("vision");String name = dataJson.getString("name");String role = dataJson.getString("role");String desc = dataJson.getString("desc");String acl = dataJson.getString("acl");Integer developer = dataJson.getInteger("developer");cloudPreparedStatement.setLong(1, id);cloudPreparedStatement.setInt(2, project);cloudPreparedStatement.setString(3, vision);cloudPreparedStatement.setString(4, name);cloudPreparedStatement.setString(5, role);cloudPreparedStatement.setString(6, desc);cloudPreparedStatement.setString(7, acl);cloudPreparedStatement.setInt(8, developer);cloudPreparedStatement.execute(String.format(deleteSql, id));cloudPreparedStatement.execute();}@Overridepublic void close() throws Exception {super.close();// 在这里关闭 JDBC 连接cloudPreparedStatement.close();cloudConnection.close();}}/*** 处理逻辑:过滤掉部分数据** @param cleanStream* @return*/private static SingleOutputStreamOperator<String> logic(SingleOutputStreamOperator<String> cleanStream) {return cleanStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String data) throws Exception {try {
//                    JSONObject dataJson = JSON.parseObject(data);
//                    String id = dataJson.getString("id");
//                    Integer bizType = dataJson.getInteger("biz_type");
//                    if (StringUtils.isBlank(id) || bizType == null) {
//                        return false;
//                    }// 只处理上岗卡数据
//                    return bizType == 9;return true;} catch (Exception ex) {LOG.warn("filter other format binlog:{}", data);return false;}}});}/*** 清晰数据** @param source* @return*/private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {return source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String row, Collector<String> out) throws Exception {try {LOG.info("============================row:{}", row);JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");//history,insert,updateif (Arrays.asList("r", "c", "u").contains(op)) {out.collect(rowJson.getJSONObject("after").toJSONString());} else {LOG.info("filter other op:{}", op);}} catch (Exception ex) {LOG.warn("filter other format binlog:{}", row);}}});}/*** 过滤数据** @param source* @param table* @return*/private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {return source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String row) throws Exception {try {JSONObject rowJson = JSON.parseObject(row);JSONObject source = rowJson.getJSONObject("source");String tbl = source.getString("table");return table.equals(tbl);} catch (Exception ex) {ex.printStackTrace();return false;}}});}private static List<String> getTableList() {List<String> tables = new ArrayList<>();String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);for (JSONObject jsob : tableList) {String schemaName = jsob.getString("TABLE_SCHEMA");String tblName = jsob.getString("TABLE_NAME");String schemaTbl = schemaName + "." + tblName;if (SYNC_TABLES.contains(schemaTbl)) {tables.add(tblName);}}return tables;}
}

相关文章:

flink mysql数据表同步API CDC

概述&#xff1a; CDC简介 Change Data Capture API CDC同步数据代码 package com.yclxiao.flinkcdcdemo.api;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.verv…...

AI大模型探索之路-训练篇21:Llama2微调实战-LoRA技术微调步骤详解

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…...

如何使用client-go构建pod web shell

代码示例及原理 原理是利用websocket协议实现对pod的exec登录&#xff0c;利用client-go构造与远程apiserver的长连接&#xff0c;将对pod容器的输入和pod容器的输出重定向到我们的io方法中&#xff0c;从而实现浏览器端的虚拟终端的效果消息体结构如下 type Connection stru…...

AI工具摸索-关于写作(1)

虽然人工智能工具非常多,但是如果想要成为生产力,能达标的工具仍然非常少,除了最常用的chatgpt,其他的工具真的能达标吗,这篇文章主要就是对比市面上的一些工具&#xff0c; 但我这个人非常执拗,我认为作为生产力工具的功能必然是可以真正帮助我们的,而不是说作为一个写作工具结…...

昂科烧录器支持O2Micro凹凸科技的电池组管理IC OZ7708

芯片烧录行业领导者-昂科技术近日发布最新的烧录软件更新及新增支持的芯片型号列表&#xff0c;其中O2Micro凹凸科技的电池组管理IC OZ7708已经被昂科的通用烧录平台AP8000所支持。 OZ7708是一款高度集成、低成本的电池组管理IC&#xff0c;适用于5~8s Li-Ion/Polymer电池组&a…...

Spring Cloud Gateway详解

文章目录 Gateway搭建路由&#xff08;route&#xff09;断言&#xff08;Predicate &#xff09;自定义断言 过滤器&#xff08;filter&#xff09;自定义全局过滤器 引言 在传统的单体项目中&#xff0c;前端和后端的交互相对简单&#xff0c;只需通过一个调用地址即可实现。…...

信息系统项目管理师0103:初步可行性研究(7项目立项管理—7.2项目可行性研究—7.2.2初步可行性研究)

点击查看专栏目录 文章目录 7.2.2初步可行性研究1.初步可行性研究定义2.辅助研究的目的和作用3.初步可行性研究的作用4.初步可行性研究的主要内容记忆要点总结7.2.2初步可行性研究 1.初步可行性研究定义 初步可行性研究一般是在对市场或者客户情况进行调查后,对项目进行的初步…...

Linux 系统中,nl命令用于计算文件中的行号

在 Linux 系统中&#xff0c;nl命令用于计算文件中的行号。它可以将输出的文件内容自动加上行号&#xff0c;并且可以通过不同的选项来设置行号的显示方式&#xff0c;包括行号的位数、是否自动补齐 0 等。其命令格式为&#xff1a;nl(选项)…(文件)…。以下是一些常见的选项&a…...

知从科技战略客户经理张志强受邀出席2024 AutoSec中国汽车网络安全与数据安全峰会

4月11-12日&#xff0c;AutoSec8周年年会暨中国汽车网络安全及数据安全合规峰会在上海成功举办。此次峰会吸引了来自全球各地的头部汽车网络安全企业、OEM厂商、安全专家和学者等齐聚盛会&#xff0c;零距离共话智能网联汽车产业的新发展、新趋势。 知从科技董事长成云霞亲自带…...

2024.5.12 Pandas 基础语法day02

#describe()作用是计算出各个列的描述行统计量如平均数&#xff0c;方差&#xff0c;最大值&#xff0c;最小值&#xff0c;四分位数&#xff0c;返回类型是 #pandas.core.frame.DataFrame import pandas as pd df pd.read_csv("Nowcoder.csv") print(df.describe()…...

Stable Diffusion是什么?

目录 一、Stable Diffusion是什么&#xff1f; 二、Stable Diffusion的基本原理 三、Stable Diffusion有哪些运用领域&#xff1f; 一、Stable Diffusion是什么&#xff1f; Stable Diffusion是一个先进的人工智能图像生成模型&#xff0c;它能够根据文本描述创造出高质量的图…...

Netty源码分析二NioEventLoop 剖析

剖析方向 NioEventLoop是一个重量级的类&#xff0c;其中涉及到的方法都有很复杂的继承关系&#xff0c;调用链&#xff0c;要想把源码全部过一遍工作量实在是太大了&#xff0c;于是小编就基于下面的这些常见的问题来对NioEventLoop的源码来进行剖析 1.Seletor何时创建 1.1Se…...

chatGLM或chatgpt:什么是tokens以及如何计算tokens长度?

token是什么? 简单的来说tokens就是大语言模型输入的向量数据,它是从原始的文本转化而来。 比如 输入:here is a text demo tokens为:[64790, 64792, 985, 323, 260, 2254, 16948] 解码:将tokens转化为文本 [‘[gMASK]’, ‘sop’, ‘▁here’, ‘▁is’, ‘▁a’, ‘▁…...

springcloudalibaba版本发布说明

版本发布说明 | https://sca.aliyun.com 2.2.x 分支 适配 Spring Boot 为 2.4&#xff0c;Spring Cloud Hoxton 版本及以下的 Spring Cloud Alibaba 版本按从新到旧排列如下表&#xff08;最新版本用*标记&#xff09;&#xff1a; Spring Cloud Alibaba VersionSpring Cloud…...

Obsidian/Typora设置图床

在obsidian中默认图片是保存在本地的&#xff0c;但是在要导出文档上传到网上时&#xff0c;由于图片保存在本地&#xff0c;会出现无法加载图片的问题。 这里引用的一段话&#xff1a; 这里使用picgo-core和gitee实现图床功能&#xff0c; 参考1&#xff1a; Ubuntu下PicGO配…...

【RAG论文】RAG中半结构化数据的解析和向量化方法

论文简介 论文题目&#xff1a; 《A Method for Parsing and Vectorization of Semi-structured Data used in Retrieval Augmented Generation》 论文链接&#xff1a; https://arxiv.org/abs/2405.03989 代码: https://github.com/linancn/TianGong-AI-Unstructure/tree/m…...

git提交代码异常报错error:bad signature 0x00000000

报错信息 error:bad signature 0x00000000 异常原因 git 提交过程中异常关机或重启&#xff0c;造成当前项目工程中的.git/index 文件损坏&#xff0c;无法提交 解决步骤 删除.git/index文件 rm -f .git/index 重启git git reset...

【FFmpeg】调用ffmpeg库进行RTMP推流和拉流

【FFmpeg】调用ffmpeg库实现RTMP推流 1.FFmpeg编译2.RTMP服务器搭建3.调用FFmpeg库实现RTMP推流和拉流3.1 基本框架3.2 实现代码3.3 测试3.3.1 推流3.3.2 拉流 参考&#xff1a;雷霄骅博士, 调用ffmpeg库进行RTMP推流 示例工程 【FFmpeg】调用FFmpeg库实现264软编 【FFmpeg】…...

Multisim 14 常见电子仪器的使用和Multisim的使用

multisim multisim&#xff0c;即电子电路仿真设计软件。Multisim是美国国家仪器&#xff08;NI&#xff09;有限公司推出的以Windows为基础的仿真工具&#xff0c;适用于板级的模拟/数字电路板的设计工作。它包含了电路原理图的图形输入、电路硬件描述语言输入方式&#xff0…...

【2024高校网络安全管理运维赛】巨细记录!

2024高校网络安全管理运维赛 文章目录 2024高校网络安全管理运维赛MISC签到考点&#xff1a;动态图片分帧提取 easyshell考点&#xff1a;流量分析 冰蝎3.0 Webphpsql考点&#xff1a;sql万能钥匙 fileit考点&#xff1a;xml注入 外带 Cryptosecretbit考点&#xff1a;代码阅读…...

2026免费降AI率工具Top10:一键去机味 首选这款稳过检测

现在写论文用AI辅助早已是常态&#xff0c;但随之而来的AIGC检测卡得越来越严&#xff0c;熬了好几天改出来的稿子要是被判定AI率超标&#xff0c;打回重写都是轻的&#xff0c;耽误答辩进度才最让人头疼。 所以降AI、降低AI率已经成了毕业生的必备技能&#xff0c;只是市面上…...

多模态场景:头巾误判为厨师帽 — 问题分析与调优指南

多模态场景&#xff1a;头巾误判为厨师帽 — 问题分析与调优指南适用对象&#xff1a;使用 Qwen-VL 等多模态大模型做「厨师帽 / 头饰」相关识别时的面试问答、方案设计与落地调优参考。1. 问题本质&#xff1a;为什么会把头巾当成厨师帽 这通常不是「模型坏了」&#xff0c;而…...

AI-AGENT概念解析 - LLM部署文件

**问题&#xff1a;那一个下载到本地的大模型中&#xff0c;包括哪些文件&#xff0c;各有什么功能和作用&#xff0c;不同的大模型&#xff0c;包括的文件应该是不一样的。 大家会很自然地问到&#xff1a;下载到本地的大模型文件夹里到底有哪些文件&#xff1f;不同模型的文件…...

Nuki:多芯片组合,覆盖全场景需求

当下“以家庭为中心”的生活趋势&#xff0c;推动了智能家居需求激增&#xff0c;智能门禁作为家庭安全与便捷的核心&#xff0c;却因传统门锁适配性差、智能锁安装繁琐等问题发展受限&#xff0c;设备制造商亟需能简化无线开发、提升能效且满足安全认证的解决方案&#xff0c;…...

IP-Adapter-FaceID在社交媒体中的应用:内容创作与分享

IP-Adapter-FaceID在社交媒体中的应用&#xff1a;内容创作与分享 【免费下载链接】IP-Adapter-FaceID 项目地址: https://ai.gitcode.com/hf_mirrors/h94/IP-Adapter-FaceID IP-Adapter-FaceID是一款基于Stable Diffusion的AI人脸生成工具&#xff0c;它通过面部识别模…...

01_第一篇:到底什么是嵌入式芯片?与通用CPU_GPU_DSP的核心区别

嵌入式芯片入门&#xff1a;到底什么是嵌入式芯片&#xff1f;与通用CPU/GPU/DSP的核心区别 引言&#xff1a;智能时代的核心基石&#xff0c;嵌入式芯片的无处不在 在万物互联的智能时代&#xff0c;我们的生活早已被无数“隐形大脑”环绕&#xff1a;清晨唤醒你的智能手环、出…...

新手零基础入门:通过快马生成burpsuite超详细安装图解教程

作为一名网络安全新手&#xff0c;第一次接触BurpSuite时确实容易被各种专业术语和复杂的安装步骤吓到。今天我就用最直白的方式&#xff0c;手把手带你完成BurpSuite的安装&#xff0c;让你轻松迈出Web安全测试的第一步。 什么是BurpSuite&#xff1f;为什么需要它&#xff1…...

快速部署Python3.10环境:Miniconda镜像实战教学

快速部署Python3.10环境&#xff1a;Miniconda镜像实战教学 1. 为什么选择Miniconda搭建Python环境&#xff1f; 在Python开发中&#xff0c;最让人头疼的问题之一就是环境管理。不同项目可能需要不同版本的Python和依赖库&#xff0c;直接安装会导致版本冲突。Miniconda提供…...

Phi-4-mini-reasoning推理模型5分钟快速上手:数学题逻辑题一键解答

Phi-4-mini-reasoning推理模型5分钟快速上手&#xff1a;数学题逻辑题一键解答 1. 为什么选择Phi-4-mini-reasoning&#xff1f; 如果你经常需要解决数学题、逻辑题或者需要一步步分析的问题&#xff0c;Phi-4-mini-reasoning就是为你量身定制的AI助手。这个模型不像那些通用…...

Unity UGUI实战:手把手教你打造一个可拖拽、可弯曲的UI连线组件(附完整源码)

Unity UGUI实战&#xff1a;打造可拖拽、可弯曲的智能连线系统 在游戏开发中&#xff0c;可视化连接系统是构建技能树、流程图、科技树等复杂UI结构的核心组件。传统实现往往局限于静态线条或简单的直线连接&#xff0c;缺乏交互性和动态美感。本文将带你从零构建一个支持实时拖…...