当前位置: 首页 > news >正文

SpringBoot集成Flink-CDC

Flink CDC

CDC相关介绍

CDC是什么?

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到MQ以供其他服务进行订阅及消费

CDC分类

CDC主要分为基于查询基于Binlog

基于查询基于Binlog
开源产品Sqoop、DataXCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

基于查询的都是Batch模式(即数据到达一定量后/一定时间才行会执行), 同时也因为这种模式, 那么延迟是必然高的, 而基于Streaming则是可以做到按条的粒度, 每条数据发生变化, 那么就会监听到

Flink CDC

Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的source组件。

目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors

Java中集成Flink CDC

MySQL相关设置

执行初始化SQL数据
-- 创建whitebrocade数据库
DROP DATABASE IF EXISTS whitebrocade;
CREATE DATABASE whitebrocade;
USER whitebrocade;
-- 创建student表
CREATE TABLE `student` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`description` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4-- 插入数据
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (1, '小牛马', '我是小牛马');
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (2, '中牛马', '我是中牛马');
开启Binlog

通常来说默认安装MySQL的cnf都是存在/etc下的

sudo vim /etc/my.cnf
# 添加如下配置信息,开启`test`以及`test_route`数据库的Binlog
# 数据库id
server-id = 1
# 时区, 如果不修改数据库时区, 那么Flink MySQL CDC无法启动
default-time-zone = '+8:00'
# 启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
# binlog类型,maxwell要求为row类型
binlog_format=row
# 启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=whitebrocade
修改数据库时区

永久修改, 那么就修改my.cnf配置(刚刚配置已经修改了, 记得重启即可)

default-time-zone = '+8:00'

临时修改(重启会丢失)

# MySQL 8 执行这个
set persist time_zone='+8:00';# MySQL 5.x版本执行这个
set time_zone='+8:00';
重启MySQL

注意了, 设置后需要重启MySQL!

service mysqld restart

代码(直接处理BaseLogHander或者kafka间接处理)

pom依赖
<properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.6.13</spring-boot.version><!-- 这里的依赖版本不要删除, 比如说es, easy-es的, 下边的案例会使用到 --><es.vsersion>7.12.0</es.vsersion><easy-es.vsersion>2.0.0</easy-es.vsersion><flink.version>1.19.0</flink.version><kafka-clients.version>3.8.0</kafka-clients.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- hutool --><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.32</version></dependency><!-- Flink CDC依赖 start--><!-- Flink核心依赖, 提供了Flink的核心API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><!--  Flink流处理Java API依赖对于引入Scala还是Java, 参考下面这篇博客: https://developer.aliyun.com/ask/526584--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink客户端工具依赖, 包含命令行界面和实用函数 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Flink连接器基础包, 包含连接器公共功能 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- Flink Kafka连接器, 用于和Apache Kafka集成, 注意kafka软件和这个依赖的版本问题, 可能会抱错, 报错参考以下博客方式进行解决版本集成问题: 参考博客 https://blog.csdn.net/qq_34526237/article/details/130968153https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/configuration/overview/https://blog.csdn.net/weixin_55787608/article/details/141436268https://www.cnblogs.com/qq1035807396/p/16227816.htmlhttps://blog.csdn.net/g5guj/article/details/137229597https://blog.csdn.net/x950913/article/details/108249507--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version><exclusions><!-- 排除掉kafka client, 用自己指定的kafka client, 可能会因为kafka太新, 导致的版本不兼容 --><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><!-- kafka client --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version></dependency><!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API桥接器, 连接DataStream API和Table API --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><!-- Flink JSON格式化数据依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- 开启Web UI支持, 端口为8081, 默认为不开启--><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>1.19.1</version></dependency>--><!-- MySQL CDC依赖org.apache.flink的适用MySQL 8.0具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/--><dependency><!--MySQL 8.0适用--><!--<groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.1.0</version>--><!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用 --><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version><!-- <version>3.0.1</version> --></dependency><!-- gson工具类 --><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.11.0</version></dependency><!-- ognl表达式 --><dependency><groupId>ognl</groupId><artifactId>ognl</artifactId><version>3.1.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.31</version></dependency>
</dependencies>
yaml
# 应用服务 WEB 访问端口
server:port: 9999# Flink CDC相关配置
flink-cdc:cdcConfig:parallelism: 1enableCheckpointing: 5000mysqlConfig:sourceName: mysql-sourcejobName: mysql-stream-cdchostname: 192.168.132.101port: 3306username: rootpassword: 12345678databaseList: whitebrocadetableList: whitebrocade.studentincludeSchemaChanges: falsekafkaConfig:sourceName: kafka-sourcejobName: kafka-stream-cdcbootstrapServers: localhost:9092groupId: test_grouptopics: test_topic
FlinkCDCConfig
/*** @author whiteBrocade* @version 1.0* @description: Flink CDC配置*/
@Data
@Configuration
@ConfigurationProperties("flink-cdc")
public class FlinkCDCConfig {private CdcConfig cdcConfig;private MysqlConfig mysqlConfig;private KafkaConfig kafkaConfig;@Datapublic static class CdcConfig {/*** 并行度*/private Integer parallelism;/*** 检查点间隔, 单位毫秒*/private Integer enableCheckpointing;}@Datapublic static class MysqlConfig {/*** MySQL数据源名称*/private String sourceName;/*** JOB名称*/private String jobName;/*** 数据库地址*/private String hostname;/*** 数据库端口*/private Integer port;/*** 数据库用户名*/private String username;/*** 数据库密码*/private String password;/*** 数据库名*/private String[] databaseList;/*** 表名*/private String[] tableList;/*** 是否包含schema变更*/private Boolean includeSchemaChanges;}@Datapublic static class KafkaConfig {/*** Kafka数据源名称*/private String sourceName;/*** JOB名称*/private String jobName;/*** kafka地址*/private String bootstrapServers;/*** 消费组id*/private String groupId;/*** kafka主题*/private String topics;}
}
相关枚举
OperatorTypeEnum
/*** @author whiteBrocade* @version 1.0* @description 操作类型枚举*/
@Getter
@AllArgsConstructor
public enum OperatorTypeEnum {/*** 新增*/INSERT(1),/*** 修改*/UPDATE(2),/*** 删除*/DELETE(3),;/*** 类型*/private final int type;/*** 根据type获取枚举** @param type 类型* @return OperatorTypeEnum*/public static OperatorTypeEnum getEnumByType(int type) {for (OperatorTypeEnum operatorTypeEnum : OperatorTypeEnum.values()) {if (operatorTypeEnum.getType() == type) {return operatorTypeEnum;}}throw new RuntimeException(StrUtil.format("未找到type={}的OperatorTypeEnum", type));}
}
StrategyEnum
/*** @author whiteBrocade* @version 1.0* @description MySql处理策略枚举* todo 后续在这里新增相关枚举即可*/
@Getter
@AllArgsConstructor
public enum MySqlStrategyEnum {/*** Student处理策略*/STUDENT(Student.class.getSimpleName(), Student.class, Introspector.decapitalize(StudentLogHandler.class.getSimpleName())),;/*** 表名*/private final String tableName;/*** class对象*/private final Class<?> varClass;/*** MySql处理器名*/private final String mySqlHandlerName;/*** 策略选择器, 根据传入的 DataChangeInfo 对象中的 tableName 属性, 从一系列预定义的策略 (StrategyEnum) 中选择一个合适的处理策略, 并封装进 StrategyHandleSelector 对象中返回** @param mySqlDataChangeInfo 数据变更对象* @return StrategyHandlerSelector*/public static MySqlStrategyHandleSelector getSelector(MySqlDataChangeInfo mySqlDataChangeInfo) {Assert.notNull(mySqlDataChangeInfo, "MySqlDataChangeInfo不能为null");String tableName = mySqlDataChangeInfo.getTableName();MySqlStrategyHandleSelector selector = new MySqlStrategyHandleSelector();// 遍历所有的策略枚举(StrategyEnum), 寻找与当前表名相匹配的策略for (MySqlStrategyEnum mySqlStrategyEnum : values()) {// 如果找到匹配的策略, 创建并配置 StrategyHandleSelectorif (mySqlStrategyEnum.getTableName().equalsIgnoreCase(tableName)) {selector.setMySqlHandlerName(mySqlStrategyEnum.mySqlHandlerName);selector.setOperatorTime(mySqlDataChangeInfo.getOperatorTime());selector.setOperatorType(mySqlDataChangeInfo.getOperatorType());JSONObject jsonObject = JSONUtil.parseObj(mySqlDataChangeInfo.getData());selector.setData(BeanUtil.copyProperties(jsonObject, mySqlStrategyEnum.varClass));return selector;}}throw new RuntimeException(StrUtil.format("没有找到的表名={}绑定的StrategyHandleSelector", tableName));}
}
model
Student
/*** @author whiteBrocade* @version 1.0* @description 学生类, 用于演示*/
@Data
public class Student {/*** id*/private Long id;/*** 姓名*/private String name;/*** 描述*/private String description;
}
MySqlDataChangeInfo
/*** @author whiteBrocade* @version 1.0* @description MySQL数据变更对象*/
@Data
public class MySqlDataChangeInfo implements Serializable {/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;/*** 操作的数据*/private String data;/*** 变更类型 1->新增 2->修改 3->删除*/private Integer operatorType;/*** binlog文件名*/private String fileName;/*** binlog当前读取点位*/private Integer filePos;/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private Long operatorTime;
}
MySqlStrategyHandleSelector
/*** @author whiteBrocade* @version 1.0* @description 策略处理选择器*/
@Data
public class MySqlStrategyHandleSelector {/*** MySql策略处理器名称, 当mySql的binLog变化时候如何处理, 就会调用对应的处理器进行处理*/private String mySqlHandlerName;/*** 数据源*/private Object data;/*** 操作时间*/private Long operatorTime;/*** 操作类型*/private Integer operatorType;
}
自定义Sink
LogSink
/*** @author whiteBrocade* @description: 日志算子*/
@Slf4j
@Service
public class LogSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {@Overridepublic void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) throws Exception {log.info("MySQL数据变化对象: {}", JSONUtil.toJsonStr(mySqlDataChangeInfo));}
}
CustomMySqlSink
/*** @author whiteBrocade* @version 1.0* @description 自定义Sink算子, 这个是根据ognl表达式区分ddl语句类型, 搭配*/
@Slf4j
@Component
public class CustomMySqlSink extends RichSinkFunction<String> {@Overridepublic void invoke(String json, Context context) throws Exception {// op字段:  该字段也有4种取值,分别是C(create)、U(Update)、D(Delete)、Read// 对于U操作,其数据部分同时包含了Before和After。log.info("监听到数据: {}", json);String op = JSONUtil.getValue(json, "op", String.class);// 语句的idInteger id = null;// 如果是update语句if ("u".equals(op)) {id = JSONUtil.getValue(json, "after.id", Integer.class);log.info("执行update语句");// 执行update语句}// 如果是delete语句if ("d".equals(op)) {id = JSONUtil.getValue(json, "before.id", Integer.class);log.info("执行delete语句");// 执行删除语句}// 如果是新增if ("c".equals(op)) {log.info("执行insert语句");}}// 前置操作@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);}// 后置操作@Overridepublic void close() throws Exception {super.close();}
}
MySqlDataChangeSink
/*** @author whiteBrocade* @version 1.0* @description Mysql变更Sink算子*/
@Slf4j
@Component
@AllArgsConstructor
public class MySqlDataChangeSink extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {/*** BaseLogHandler相关的缓存* Spring自动将相关BaseLogHandler的Bean注入注入到本地缓存Map中*/private final Map<String, BaseLogHandler> strategyHandlerMap;/*** 数据处理逻辑*/@Override@SneakyThrowspublic void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) {log.info("收到变更原始数据:{}", mySqlDataChangeInfo);// 选择策略MySqlStrategyHandleSelector selector = MySqlStrategyEnum.getSelector(mySqlDataChangeInfo);Assert.notNull("MySqlStrategyHandleSelector不能为空");BaseLogHandler<Object> handler = strategyHandlerMap.get(selector.getMySqlHandlerName());Integer operatorType = selector.getOperatorType();OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);switch (operatorTypeEnum) {case INSERT:// insert操作handler.handleInsertLog(selector.getData(), selector.getOperatorTime());break;case UPDATE:// update操作handler.handleUpdateLog(selector.getData(), selector.getOperatorTime());break;case DELETE:// delete操作handler.handleDeleteLog(selector.getData(), selector.getOperatorTime());break;default:throw new RuntimeException("不支持的操作类型");}}/*** 写入逻辑*/@Override@SneakyThrowspublic void writeWatermark(Watermark watermark) {log.info("触发了写入逻辑writeWatermark");super.writeWatermark(watermark);}/*** 开始*/@Override@SneakyThrowspublic void open(OpenContext openContext) {log.info("触发了开始逻辑open");super.open(openContext);}/*** 结束*/@Override@SneakyThrowspublic void finish() {log.info("触发了结束逻辑finish");super.finish();}
}
MySqlChangeInfoKafkaProducerSink
/*** @author whiteBrocade* @version 1.0* @description Kafka队列中MySQL消息变更Sink*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MySqlChangeInfoKafkaProducerSink {/*** Flink相关配置*/private final FlinkCDCConfig flinkCDCConfig;/*** 自定义kafKA序列化处理器*/private final KafkaSerializer kafkaSerializer;/*** 获取kafka生产者算子*/public KafkaSink<MySqlDataChangeInfo> getKafkaProducerSink() {FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();kafkaSerializer.setTopic(kafkaConfig.getTopics());// 创建KafkaSink算子KafkaSink<MySqlDataChangeInfo> kafkaProducerSink = KafkaSink.<MySqlDataChangeInfo>builder()// 设置集群地址.setBootstrapServers(kafkaConfig.getBootstrapServers())// 设置事务前缀.setTransactionalIdPrefix("Kafka_Transactional_" + kafkaConfig.getTopics() + IdUtil.getSnowflakeNextIdStr()).setRecordSerializer(kafkaSerializer)// 设置传递保证// At Most Once (至多一次): 系统保证消息要么被成功传递一次,要么根本不被传递。这种保证意味着消息可能会丢失,但不会被传递多次// At Least Once (至少一次): 系统保证消息至少会被传递一次,但可能会导致消息的重复传递。这种保证确保了消息的不丢失,但应用可能会多次消费, 需要自己实现幂等// Exactly Once (精确一次): 系统保证消息会被确切地传递一次,而没有任何重复。这是最高级别的传递保证,确保消息不会丢失且不会多次消费.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 设置kafka各种参数// .setKafkaProducerConfig(properties)/*sinkProducer的超时时间默认为1个小时,但是kafka broker的超时时间默认是15分钟, kafka broker不允许producer的超时时间比他大所以有两种解决办法:1.生产者的超时时间调小2.将broker的超时时间调大这里选择方案一, 将生产者时间调小, 将kafka producer的超时时间调至和broker一致即可参考博客 https://blog.csdn.net/LangLang1111111/article/details/121395831https://blog.csdn.net/weixin_64261178/article/details/140298696*/.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000)).build();return kafkaProducerSink;}
MySqlChangeInfoKafkaConsumerSink
/*** @author whiteBrocade* @description: 自定义 MySqlChangeInfo kafka消费者sink*/
@Slf4j
@Service
public class MySqlChangeInfoKafkaConsumerSink  extends RichSinkFunction<MySqlDataChangeInfo> implements Serializable {/*** 数据处理逻辑*/@Override@SneakyThrowspublic void invoke(MySqlDataChangeInfo mySqlDataChangeInfo, Context context) {log.info("正在消费kafka数据:{}", JSONUtil.toJsonStr(mySqlDataChangeInfo));}
}
序列化器和反序列化器
KafkaDeserializer
/*** @author whiteBrocade* @description: 自定义kafka反序列化器*/
@Slf4j
@Service
public class KafkaDeserializer implements KafkaRecordDeserializationSchema<MySqlDataChangeInfo> {@Overridepublic void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<MySqlDataChangeInfo> collector) {String valueJsonStr = new String(record.value(), StandardCharsets.UTF_8);// log.info("反序列化前kafka数据: {}", valueJsonStr);MySqlDataChangeInfo mySqlDataChangeInfo = JSONUtil.toBean(valueJsonStr, MySqlDataChangeInfo.class);collector.collect(mySqlDataChangeInfo);}@Overridepublic TypeInformation<MySqlDataChangeInfo> getProducedType() {return TypeInformation.of(MySqlDataChangeInfo.class);}
}
KafkaSerializer
/*** @author whiteBrocade* @version 1.0* @description: kafka消息 自定义序列化器*/
@Slf4j
@Setter
@Service
public class KafkaSerializer implements KafkaRecordSerializationSchema<MySqlDataChangeInfo> {/*** 主体名称*/private String topic;/*** 序列化*/@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(MySqlDataChangeInfo mySqlDataChangeInfo, KafkaSinkContext context, Long timestamp) {Assert.notNull(topic, "必须指定发送的topic");String jsonStr = JSONUtil.toJsonStr(mySqlDataChangeInfo);log.info("投递kafka到topic={}的数据key: {}, value:", topic, jsonStr);return new ProducerRecord<>(topic, jsonStr.getBytes());}@Overridepublic void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) throws Exception {KafkaRecordSerializationSchema.super.open(context, sinkContext);}
}
MySqlDeserializer
/*** @author whiteBrocade* @version 1.0* @description L自定义MySQ反序列化器*/
@Slf4j
@Service
public class MySqlDeserializer implements DebeziumDeserializationSchema<MySqlDataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/*** 反序列化数据, 转为变更JSON对象** @param sourceRecord SourceRecord* @param collector Collector<DataChangeInfo>*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<MySqlDataChangeInfo> collector) {try {// 根据主题的格式,获取数据库名(database)和表名(tableName)String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);MySqlDataChangeInfo mySqlDataChangeInfo = new MySqlDataChangeInfo();// 获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? OperatorTypeEnum.INSERT.getType() : UPDATE.equals(type) ?OperatorTypeEnum.UPDATE.getType() : OperatorTypeEnum.DELETE.getType();// 一般情况是无需关心其之前之后数据的, 直接获取最新的日志数据即可, 但这里为了演示, 都进行输出// 获取变更前和变更后的数据,并将其设置到DataChangeInfo对象中mySqlDataChangeInfo.setBeforeData(this.getJsonObject(struct, BEFORE).toJSONString());mySqlDataChangeInfo.setAfterData(this.getJsonObject(struct, AFTER).toJSONString());if (eventType == OperatorTypeEnum.DELETE.getType()) {mySqlDataChangeInfo.setData(this.getJsonObject(struct, BEFORE).toJSONString());} else {mySqlDataChangeInfo.setData(this.getJsonObject(struct, AFTER).toJSONString());}mySqlDataChangeInfo.setOperatorType(eventType);mySqlDataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));mySqlDataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));mySqlDataChangeInfo.setDatabase(database);mySqlDataChangeInfo.setTableName(tableName);mySqlDataChangeInfo.setOperatorTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));// 输出数据collector.collect(mySqlDataChangeInfo);} catch (Exception e) {log.error("反序列binlog失败", e);}}/*** 从源数据获取出变更之前或之后的数据** @param value Struct* @param fieldElement 字段* @return JSONObject*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<MySqlDataChangeInfo> getProducedType() {return TypeInformation.of(MySqlDataChangeInfo.class);}
}
LogHandler
BaseLogHandler
/*** @author whiteBrocade* @version 1.0* @description 日志处理器* todo 新建一个类实现该BaseLogHandler类, 添加相应的处理逻辑即可, 可参考StudentLogHandler实现*/
public interface BaseLogHandler<T> extends Serializable {/*** 日志处理** @param data 数据转换后模型* @param operatorTime 操作时间*/void handleInsertLog(T data, Long operatorTime);/*** 日志处理** @param data 数据转换后模型* @param operatorTime 操作时间*/void handleUpdateLog(T data, Long operatorTime);/*** 日志处理** @param data 数据转换后模型* @param operatorTime 操作时间*/void handleDeleteLog(T data, Long operatorTime);
}
StudentLogHandler
/*** @author whiteBrocade* @version 1.0* @description Student对应处理器*/
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {@Overridepublic void handleInsertLog(Student student, Long operatorTime) {log.info("处理Student表的新增日志: {}", student);}@Overridepublic void handleUpdateLog(Student student, Long operatorTime) {log.info("处理Student表的修改日志: {}", student);}@Overridepublic void handleDeleteLog(Student student, Long operatorTime) {log.info("处理Student表的删除日志: {}", student);}
}
JOB
MySqlDataChangeJob
/*** @author whiteBrocade* @version 1.0* @description MySQL数据变更 JOb*/
@Slf4j
@Component
@AllArgsConstructor
public class MySqlDataChangeJob {/*** Flink CDC相关配置*/private final FlinkCDCConfig flinkCDCConfig;/*** 自定义Sink算子* customSink: 通过ognl解析ddl语句类型* dataChangeSink: 通过struct解析ddl语句类型* kafkaSink: 将MySQL变化投递到Kafka* 通常两个选择一个就行*/private final CustomMySqlSink customMySqlSink;private final MySqlDataChangeSink mySqlDataChangeSink;private final MySqlChangeInfoKafkaProducerSink mysqlChangeInfoKafkaProducerSink;private final LogSink logSink;/*** 自定义MySQL反序列化处理器*/private final MySqlDeserializer mySqlDeserializer;/*** 启动Job*/@SneakyThrowspublic void startJob() {log.info("---------------- MySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();// DataStream API执行模式包括:// 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式// 批执行模式(Batch):专门用于批处理的执行模式// 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行// 执行模式选择,可以通过命令行方式配置:StreamExecutionEnvironment mySqlEnv = this.buildStreamExecutionEnvironment();// 这里选择自动模式mySqlEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// todo 下列的两个MySqlSource选择一个// 自定义的反序列化器MySqlSource<MySqlDataChangeInfo> mySqlSource = this.buildBaseMySqlSource(MySqlDataChangeInfo.class).deserializer(mySqlDeserializer).build();// Flink CDC自带的反序列化器// MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)//     .deserializer(new JsonDebeziumDeserializationSchema())//     .build();// 从MySQL源中读取数据DataStreamSource<MySqlDataChangeInfo> mySqlDataStreamSource = mySqlEnv.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),mysqlConfig.getSourceName())// 设置该数据源的并行度.setParallelism(cdcConfig.getParallelism());// 添加一个日志sink, 用于观察mySqlDataStreamSource.addSink(logSink);// 添加sink算子mySqlDataStreamSource// todo 根据上述的选择,选择对应的Sink算子// .addSink(customMySqlSink)// .addSink(mySqlDataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink.sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink()); // 将MySQL的数据变化投递到Kafka中// 启动服务// execute和executeAsync启动方式对比: https://blog.csdn.net/llg___/article/details/133798713mySqlEnv.executeAsync(mysqlConfig.getJobName());log.info("---------------- MySqlDataChangeJob 启动完毕 ----------------");}/*** 构建流式执行环境** @return StreamExecutionEnvironment*/private StreamExecutionEnvironment buildStreamExecutionEnvironment() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/*** 构建基本的MySqlSourceBuilder** @param clazz 返回的数据类型Class对象* @param <T>   源数据中存储的类型* @return MySqlSourceBuilder*/private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();return MySqlSource.<T>builder().hostname(mysqlConfig.getHostname()).port(mysqlConfig.getPort()).username(mysqlConfig.getUsername()).password(mysqlConfig.getPassword()).databaseList(mysqlConfig.getDatabaseList()).tableList(mysqlConfig.getTableList())/* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)* latest: 只进行增量导入(不读取历史变化)* timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)*/.startupOptions(StartupOptions.latest()).includeSchemaChanges(mysqlConfig.getIncludeSchemaChanges()) // 包括schema的改变.serverTimeZone("GMT+8"); // 时区}
}
KafkaMySqlDataChangeJob
/*** @author whiteBrocade* @version 1.0* @description kafka接受 MySQL数据变更 JOb*/
@Slf4j
@Component
@AllArgsConstructor
public class KafkaMySqlDataChangeJob {/*** Flink CDC相关配置*/private final FlinkCDCConfig flinkCDCConfig;/*** 自定义kafKA序列化处理器*/private final KafkaSerializer kafkaSerializer;/*** 自定义Kafka反序列化处理器*/private final KafkaDeserializer kafkaDeserializer;/*** 自定义 MySqlChangeInfo kafka消费者sink*/private final MySqlChangeInfoKafkaConsumerSink mySqlChangeInfoKafkaConsumerSink;@SneakyThrowspublic void startJob() {log.info("---------------- KafkaMySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();StreamExecutionEnvironment kafkaEnv = this.buildStreamExecutionEnvironment();// 创建kafka数据源KafkaSource<MySqlDataChangeInfo> kafkaSource = this.buildBaseKafkaSource(MySqlDataChangeInfo.class)// 1. 自定义反序列化器.setDeserializer(kafkaDeserializer)// 2. 使用Kafka 提供的解析器处理// .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))// 3. 只设置kafka的value反序列化// .setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<MySqlDataChangeInfo> kafkaDataStreamSource = kafkaEnv.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),kafkaConfig.getSourceName());// 添加消费组算子进行数据处理kafkaDataStreamSource.addSink(mySqlChangeInfoKafkaConsumerSink);// 启动服务// 启动报错java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames 参考博客 https://www.cnblogs.com/yeyuzhuanjia/p/18254652kafkaEnv.executeAsync(kafkaConfig.getJobName());log.info("---------------- KafkaMySqlDataChangeJob 启动完毕 ----------------");}/*** 构建流式执行环境** @return StreamExecutionEnvironment*/private StreamExecutionEnvironment buildStreamExecutionEnvironment() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/*** 构建基本的kafka数据源* 参考 https://cloud.tencent.com/developer/article/2393696* https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/*/private <T> KafkaSourceBuilder<T> buildBaseKafkaSource(Class<T> Clazz) {FlinkCDCConfig.KafkaConfig kafkaConfig = flinkCDCConfig.getKafkaConfig();return KafkaSource.<T>builder()// 设置kafka地址.setBootstrapServers(kafkaConfig.getBootstrapServers())// 设置消费组id.setGroupId(kafkaConfig.getGroupId())// 设置主题,支持多种主题组合.setTopics(kafkaConfig.getTopics())// 消费模式, 支持多种消费模式/* OffsetsInitializer#committedOffsets: 从消费组提交的位点开始消费,不指定位点重置策略,这种策略会报异常,没有设置快照或设置自动提交* OffsetsInitializer#committedOffsets(OffsetResetStrategy.EARLIEST): 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点* OffsetsInitializer#timestamp(1657256176000L): 从时间戳大于等于指定时间戳(毫秒)的数据开始消费* OffsetsInitializer#earliest(): 从最早位点开始消费* OffsetsInitializer#latest(): 从最末尾位点开始消费,即从注册时刻开始消费*/.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))// 动态检查新分区, 10 秒检查一次新分区.setProperty("partition.discovery.interval.ms", "10000");}
}
Runner
/*** @author whiteBrocade* @description: 数据同步 Runner类*/
@Slf4j
@Component
@AllArgsConstructor
public class DataSyncRunner implements ApplicationRunner {private final MySqlDataChangeJob mySqlDataChangeJob;private final KafkaMySqlDataChangeJob kafkaMySqlDataChangeJob;@Override@SneakyThrowspublic void run(ApplicationArguments args) {mySqlDataChangeJob.startJob();kafkaMySqlDataChangeJob.startJob();}
}
工具类
JSONUtil
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import ognl.Ognl;
import ognl.OgnlContext;import java.util.Map;/*** @author whiteBrocade* @version 1.0* @description: JSON工具类*/
public class JSONUtil {/*** 将指定JSON转为Map对象, Key类型为String,对应JSON的key* Value分情况:* 1. Value是字符串, 自动转为字符串, 例如:{"a","b"}* 2. Value是其他JSON对象, 自动转为Map,例如::{"a":{"b":"2"}}* 3. Value是数组, 自动转为list<Map>,例如::{"a":[:{"b":"2"},"c":"3"]}** @param json 输入的的JSON对象* @return 动态Map集合*/public static Map<String, Object> transferToMap(String json) {Gson gson = new Gson();Map<String, Object> map = gson.fromJson(json, new TypeToken<Map<String, Object>>() {}.getType());return map;}/*** 获取指定JSON的指定路径的值** @param json  原始JSON数据* @param path  OGNL原则表达式* @param clazz Value对应的目标类* @return clazz对应的数据*/public static <T> T getValue(String json, String path, Class<T> clazz) {try {Map<String, Object> map = JSONUtil.transferToMap(json);OgnlContext ognlContext = new OgnlContext();ognlContext.setRoot(map);T value = (T) Ognl.getValue(path, ognlContext, ognlContext.getRoot(), clazz);return value;} catch (Exception e) {throw new RuntimeException(e);}}
}

代码(投递到ActiveMQ)

新增ActiveMQ依赖
<!-- 新增 ActiveMQ, 接受Flink-CDC的日志 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
yaml文件新增内容
# 引入ActiveMQ为了解耦日志同步, 以及持久化, 这里和kafka一致, 其实Flink也有RabbitMQ相关的连接器
spring:activemq:# activemq urlbroker-url: tcp://localhost:61616# 用户名&密码user: adminpassword: admin# 是否使用基于内存的ActiveMQ, 实际生产中使用基于独立安装的ActiveMQin-memory: truepool:# 如果此处设置为true,需要添加activemq-pool的依赖包,否则会⾃动配置失败,⽆法注⼊JmsMessagingTemplateenabled: false# 我们需要在配置⽂件 application.yml 中添加⼀个配置# 发布/订阅消息的消息和点对点不同,订阅消息支持多个消费者一起消费。其次,SpringBoot中默认的点对点消息,所以在使用Topic时会不起作用。jms:# 该配置是 false 的话,则为点对点消息,也是 Spring Boot 默认的# 这样是可以解决问题,但是如果这样配置的话,上⾯提到的点对点消息⼜不能正常消费了。所以⼆者不可兼得,这并⾮⼀个好的解决办法# ⽐较好的解决办法是,我们定义⼀个⼯⼚,@JmsListener 注解默认只接收 queue 消息,如果要接收 topic 消息,需要设置⼀下containerFactorypub-sub-domain: true
配置类
/*** @author whiteBrocade* @version 1.0* @description ActiveMqConfig配置*/
@Configuration
public class ActiveMqConfig {/*** 用于接受student表的消费信息*/public static final String TOPIC_NAME = "activemq:topic:student";public static final String QUEUE_NAME = "activemq:queue:student";@Beanpublic Topic topic() {return new ActiveMQTopic(TOPIC_NAME);}@Beanpublic Queue queue() {return new ActiveMQQueue(QUEUE_NAME);}/*** 接收topic消息,需要设置containerFactory*/@Beanpublic JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 相当于在application.yml中配置:spring.jms.pub-sub-domain=truefactory.setPubSubDomain(true);return factory;}
}
生产者
/*** @author whiteBrocade* @version 1.0* @description CustomProducer*/
@Service
@RequiredArgsConstructor
public class CustomProducer {private final JmsMessagingTemplate jmsMessagingTemplate;@SneakyThrowspublic void sendQueueMessage(Queue queue, String msg) {String queueName = queue.getQueueName();jmsMessagingTemplate.convertAndSend(queueName, msg);}@SneakyThrowspublic void sendTopicMessage(Topic topic, String msg) {String topicName = topic.getTopicName();jmsMessagingTemplate.convertAndSend(topicName, msg);}
}
消费者
/*** @author whiteBrocade* @version 1.0* @description CustomQueueConsumer*/
@Slf4j
@Service
@RequiredArgsConstructor
public class CustomQueueConsumer {@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)public void receiveQueueMsg(String msg) {log.info("消费者1111收到Queue消息: {}", msg);StudentMqDTO mqDTO = JSONUtil.toBean(msg, StudentMqDTO.class);Student student = mqDTO.getStudent();Integer operatorType = mqDTO.getOperatorType();OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);switch (operatorTypeEnum) {case INSERT:log.info("新增Student");break;case UPDATE: log.info("修改Student");break;case DELETE:log.info("删除Student");break;}}@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")public void receiveTopicMsg(String msg) {log.info("消费者1111收到Topic消息: {}", msg);}
}
/*** @author whiteBrocade* @version 1.0* @description Custom2QueueConsumer*/
@Slf4j
@Service
public class Custom2QueueConsumer {@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")public void receiveTopicMsg(String msg) {log.info("消费者2222收到Topic消息: {}", msg);}
}
model
DTO
/*** @author whiteBrocade* @description: Student MQ DTO*/
@Data
@Builder
public class StudentMqDTO implements Serializable {private static final long serialVersionUID = 4308564438724519731L;/*** 学生数据*/private Student student;/*** 数据在mysql中操作类型, 见OperatorTypeEnum的Type*/private Integer operatorType;
}
修改StudentLogHandler, 增加MQ投递逻辑
/*** @author whiteBrocade* @version 1.0* @description Student对应处理器*/
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {private final Queue queue;@Overridepublic void handleInsertLog(Student student, Long operatorTime) {log.info("处理Student表的新增日志: {}", student);this.sendMq(student, OperatorTypeEnum.INSERT);}@Overridepublic void handleUpdateLog(Student student, Long operatorTime) {log.info("处理Student表的修改日志: {}", student);this.sendMq(student, OperatorTypeEnum.UPDATE);}@Overridepublic void handleDeleteLog(Student student, Long operatorTime) {log.info("处理Student表的删除日志: {}", student);this.sendMq(student, OperatorTypeEnum.DELETE);}/*** 发送MQ** @param student          Student* @param operatorTypeEnum 操作类型枚举*/private void sendMq(Student student, OperatorTypeEnum operatorTypeEnum) {StudentMqDTO mqDTO = StudentMqDTO.builder().student(student).operatorType(operatorTypeEnum.getType()).build();String jsonStr = JSONUtil.toJsonStr(mqDTO);CustomProducer customProducer = SpringUtil.getBean(CustomProducer.class);// 发送到MQcustomProducer.sendQueueMessage(queue, jsonStr);}
}
Controller
/*** @author whiteBrocade* @version 1.0* @description ActiveMqController, 用于测试发送ActiveMQ逻辑*/
@Slf4j
@RestController
@RequestMapping("/activemq")
@RequiredArgsConstructor
public class ActiveMqController {private final CustomProducer customProducer;private final Queue queue;private final Topic topic;@PostMapping("/send/queue")public String sendQueueMessage() {log.info("开始发送点对点的消息-------------");Student student = new Student();student.setId(IdUtil.getSnowflakeNextId());student.setName("小牛马");student.setDescription("我是小牛马");StudentMqDTO mqDTO = StudentMqDTO.builder().student(student).operatorType(1).build();String jsonStr = JSONUtil.toJsonStr(mqDTO);customProducer.sendQueueMessage(queue, jsonStr);return "success";}@PostMapping("/send/topic")public String sendTopicMessage() {log.info("===开始发送订阅消息===");Student student = new Student();student.setId(IdUtil.getSnowflakeNextId());student.setName("小牛马");student.setDescription("我是小牛马");StudentMqDTO mqDTO = StudentMqDTO.builder().student(student).operatorType(1).build();String jsonStr = JSONUtil.toJsonStr(mqDTO);customProducer.sendTopicMessage(topic, jsonStr);return "success";}
}
修改MySqlDataChangeJob, 将算子切换成mySqlDataChangeSink
/*** @author whiteBrocade* @version 1.0* @description MySQL数据变更 JOb*/
@Slf4j
@Component
@AllArgsConstructor
public class MySqlDataChangeJob {/*** Flink CDC相关配置*/private final FlinkCDCConfig flinkCDCConfig;/*** 自定义Sink算子* customSink: 通过ognl解析ddl语句类型* dataChangeSink: 通过struct解析ddl语句类型* kafkaSink: 将MySQL变化投递到Kafka* 通常两个选择一个就行*/private final CustomMySqlSink customMySqlSink;private final MySqlDataChangeSink mySqlDataChangeSink;private final MySqlChangeInfoKafkaProducerSink mysqlChangeInfoKafkaProducerSink;private final LogSink logSink;/*** 自定义MySQL反序列化处理器*/private final MySqlDeserializer mySqlDeserializer;/*** 启动Job*/@SneakyThrowspublic void startJob() {log.info("---------------- MySqlDataChangeJob 开始启动 ----------------");FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();// DataStream API执行模式包括:// 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式// 批执行模式(Batch):专门用于批处理的执行模式// 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行// 执行模式选择,可以通过命令行方式配置:StreamExecutionEnvironment mySqlEnv = this.buildStreamExecutionEnvironment();// 这里选择自动模式mySqlEnv.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// todo 下列的两个MySqlSource选择一个// 自定义的反序列化器MySqlSource<MySqlDataChangeInfo> mySqlSource = this.buildBaseMySqlSource(MySqlDataChangeInfo.class).deserializer(mySqlDeserializer).build();// Flink CDC自带的反序列化器// MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)//     .deserializer(new JsonDebeziumDeserializationSchema())//     .build();// 从MySQL源中读取数据DataStreamSource<MySqlDataChangeInfo> mySqlDataStreamSource = mySqlEnv.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),mysqlConfig.getSourceName())// 设置该数据源的并行度.setParallelism(cdcConfig.getParallelism());// 添加一个日志sink, 用于观察mySqlDataStreamSource.addSink(logSink);// 添加sink算子mySqlDataStreamSource// todo 根据上述的选择,选择对应的Sink算子// .addSink(customMySqlSink).addSink(mySqlDataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink// .sinkTo(mysqlChangeInfoKafkaProducerSink.getKafkaProducerSink()); // 将MySQL的数据变化投递到Kafka中// 启动服务// execute和executeAsync启动方式对比: https://blog.csdn.net/llg___/article/details/133798713mySqlEnv.executeAsync(mysqlConfig.getJobName());log.info("---------------- MySqlDataChangeJob 启动完毕 ----------------");}/*** 构建流式执行环境** @return StreamExecutionEnvironment*/private StreamExecutionEnvironment buildStreamExecutionEnvironment() {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FlinkCDCConfig.CdcConfig cdcConfig = flinkCDCConfig.getCdcConfig();// 设置整个Flink程序的默认并行度env.setParallelism(cdcConfig.getParallelism());// 设置checkpoint 间隔env.enableCheckpointing(cdcConfig.getEnableCheckpointing());// 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);return env;}/*** 构建基本的MySqlSourceBuilder** @param clazz 返回的数据类型Class对象* @param <T>   源数据中存储的类型* @return MySqlSourceBuilder*/private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {FlinkCDCConfig.MysqlConfig mysqlConfig = flinkCDCConfig.getMysqlConfig();return MySqlSource.<T>builder().hostname(mysqlConfig.getHostname()).port(mysqlConfig.getPort()).username(mysqlConfig.getUsername()).password(mysqlConfig.getPassword()).databaseList(mysqlConfig.getDatabaseList()).tableList(mysqlConfig.getTableList())/* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)* latest: 只进行增量导入(不读取历史变化)* timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)*/.startupOptions(StartupOptions.latest()).includeSchemaChanges(mysqlConfig.getIncludeSchemaChanges()) // 包括schema的改变.serverTimeZone("GMT+8"); // 时区}
}

代码(MySQL通过MQ同步到ES)

  • 换成这里的MQ替换成Kafka也是同理

  • 官方地址Easy-Es,它主要就是简化了ES相关的API, 使用起来像MP一样舒服, 这里不在过多介绍, 跑通下边这个案例要看博主另外一篇博客easy-es使用

同步方案有两种

  • Flink-CDC监听MySQL直接写入ES
  • Flink-CDC监听MySQL写入ActiveMQ, MQ写入到ES(这里实现MQ的)

引入MQ保证同步的一个持久性, 即是宕机了, 那么重启恢复后也是可以继续使用的

新增ES和Eesy-ES依赖
<!-- es依赖 -->
<!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId></exclusion><exclusion><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>${es.vsersion}</version>
</dependency>
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>${es.vsersion}</version>
</dependency><!-- easy-es -->
<dependency><groupId>org.dromara.easy-es</groupId><artifactId>easy-es-boot-starter</artifactId><version>${easy-es.vsersion}</version>
</dependency>
修改消费者CustomQueueConsumer
/*** @author whiteBrocade* @version 1.0* @description CustomQueueConsumer*/
@Slf4j
@Service
@RequiredArgsConstructor
public class CustomQueueConsumer {private final StudentEsMapper studentEsMapper;@JmsListener(destination = ActiveMqConfig.QUEUE_NAME)public void receiveQueueMsg(String msg) {log.info("消费者1111收到Queue消息: {}", msg);StudentMqDTO mqDTO = JSONUtil.toBean(msg, StudentMqDTO.class);Student student = mqDTO.getStudent();Integer operatorType = mqDTO.getOperatorType();OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);switch (operatorTypeEnum) {case INSERT:// 同步新增到Es中StudentEsEntity studentEsEntity = new StudentEsEntity();BeanUtil.copyProperties(student, studentEsEntity);studentEsEntity.setMysqlId(student.getId());studentEsMapper.insert(studentEsEntity);break;case UPDATE:case DELETE:// 修改mysql, 再删除ESLambdaEsQueryWrapper<StudentEsEntity> wrapper = new LambdaEsQueryWrapper<>();wrapper.eq(StudentEsEntity::getMysqlId, student.getId());studentEsMapper.delete(wrapper);break;}}@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")public void receiveTopicMsg(String msg) {log.info("消费者1111收到Topic消息: {}", msg);}
}
/*** @author whiteBrocade* @version 1.0* @description Custom2QueueConsumer*/
@Slf4j
@Service
public class Custom2QueueConsumer {@JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")public void receiveTopicMsg(String msg) {log.info("消费者2222收到Topic消息: {}", msg);}
}

相关文章:

SpringBoot集成Flink-CDC

Flink CDC CDC相关介绍 CDC是什么? CDC是Change Data Capture(变更数据获取)的简称。核心思想是&#xff0c;监测并捕获数据库的变动&#xff08;包括数据或数据表的插入、更新以及删除等&#xff09;&#xff0c;将这些变更按发生的顺序完整记录下来&#xff0c;写入到MQ以…...

SQL报错注入检测方法与攻击方法

报错注入 即是注入检测方法&#xff0c;又是注入读取数据的方法 攻击者在判断一个参数是否存在SQL注入漏洞时&#xff0c;会拼接单引号&#xff0c;反斜杠字符&#xff0c;如果显示语法报错&#xff0c;证明这个位置具有SQL注入漏洞&#xff0c;也可以通过整数溢出来判断&…...

Linux内核编程(十九)SPI子系统的应用与驱动编写

本文目录 一、 SPI驱动框架图二、编写SPI驱动device框架三、编写SPI驱动driver框架四、实验一编写mcp2515驱动1. 注册字符设备或杂项设备框架2. SPI写数据3. SPI读寄存器数据 4. MCP2515相关配置 对于SPI基础知识这里不做过多讲解&#xff0c;详情查看&#xff1a;SPI基础知识实…...

MVC 文件夹结构详解

MVC 文件夹结构详解 MVC(Model-View-Controller)是一种广泛应用于软件开发中的设计模式,它通过将应用程序分为三个核心组件——模型(Model)、视图(View)和控制器(Controller)——来组织代码,提高代码的可维护性和可扩展性。在MVC模式中,每个组件都有其特定的职责,…...

远程操作Linux服务器 _Xshell、Xftp以及Linux常见操作命令

工具推荐 Xshell和Xftp是两款由NetSarang公司开发的、广受欢迎的软件工具&#xff0c;它们分别专注于终端模拟和文件传输&#xff0c;为用户提供了便捷的操作和强大的功能。以下是对这两款软件的详细解析&#xff1a; 一、Xshell 定义与功能 Xshell是一个强大的安全终端模拟软…...

单链表的实现(数据结构)

一. 单链表的实现 我们在上一篇中简单的认识了链表的组成和结构&#xff0c;并打印出链表&#xff0c;那么今天就来具体实现一下单链表对于数据增加、删减、插入等。 接下来就是我们在链表中对于数据的增、删、插的实现&#xff0c;对于我们的链表来说在任何地方增加数据都需…...

印刷质量检测笔记

一、印刷质量检测的背景与挑战 印刷品的质量检测&#xff0c;特别是针对高精度要求的印刷产品&#xff0c;如包装材料、标签、书籍封面等&#xff0c;一直是制造业中的一个关键环节。印刷品可能存在的质量问题多种多样&#xff0c;包括但不限于颜色偏差、文字模糊、漏印、多印…...

16、论文阅读:Mamba YOLO:用于目标检测的基于 SSM 的 YOLO

Mamba YOLO: SSMs-Based YOLO For Object Detection 总结前言感受野为什么Transformer 的结构被引入&#xff0c;显著扩展了模型的感受野&#xff1f;状态空间模型SSM 介绍相关工作实时目标检测端到端目标检测器视觉状态空间模型 方法预处理整体架构ODSS BlockLocalSpatial Blo…...

python项目实战---使用图形化界面下载音乐

音乐下载 设计思路&#xff1a; 设计界面编写爬虫代码绑定爬虫打包exe文件 这个是最终的设计成果&#xff0c;所有的下载歌曲都在“下载mp3”文件夹里面 完整代码 逻辑代码 import os.path import reimport requests from PyQt5.QtWidgets import QApplication,QWidget,QM…...

无人机干扰与抗干扰,无人机与反制设备的矛与盾

无人机干扰与抗干扰&#xff0c;以及无人机与反制设备之间的关系&#xff0c;可以形象地比喻为矛与盾的较量。以下是对这两方面的详细探讨&#xff1a; 一、无人机干扰与抗干扰 1. 无人机干扰技术 无人机干扰技术是指通过各种手段对无人机系统进行干扰&#xff0c;使其失去正…...

JAVA基础:单元测试;注解;枚举;网络编程 (学习笔记)

单元测试 操作步骤&#xff1a; a.导包import org.junit; b.三个注解 Test Before After c.点击Test 运行就可以了 用在不需要控制台输入的情境下&#xff1a;javaweb&#xff0c;框架项目&#xff0c;微服务项目 供开发人员自己做测试。 package com.page…...

Meta 上周宣布正式开源小型语言模型 MobileLLM 系列

在 7 月发布之后&#xff0c;Meta 上周宣布正式开源能够在智能手机上运行的小型语言模型 MobileLLM 系列。 Meta 在四个月前发布了这两个参数量小于 10 亿的语言模型 MobileLLM 125M 及 MobileLLM 350M。如今&#xff0c;Meta 又开发出了更大参数量的模型版本&#xff0c;包括…...

安全篇(1)判断安全固件

判断安全固件的方法 一、通过串口开机打印 改方法适用Android与Tina 1.开机打印为SBOOT为安全 [289]HELLO! SBOOT is starting! 2.开机打印boot0为非安全 [88]BOOT0 commit : 1cbb5ea8b3 二、通过读数据 1.getprop | grep verifiedbootstate 这条命令的输出表示设备的…...

ArcGIS005:ArcMap常用操作101-150例动图演示

摘要&#xff1a;本文涵盖了GIS软件操作的多方面内容&#xff0c;包括地图文档的新建、打开、保存及版本兼容性处理&#xff1b;错误与警告的查阅及帮助文档的使用技巧&#xff1b;地图打印比例尺的调整与地图信息的完善&#xff1b;图层操作的撤销与恢复&#xff0c;界面元素的…...

如何用ChatGPT结合Python处理遥感数据

在科技飞速发展的时代&#xff0c;遥感数据的精准分析已经成为推动各行业智能决策的关键工具。从无人机监测农田到卫星数据支持气候研究&#xff0c;空天地遥感数据正以前所未有的方式为科研和商业带来深刻变革。然而&#xff0c;对于许多专业人士而言&#xff0c;如何高效地处…...

matlab 质心重合法实现点云配准

目录 一、算法原理1、原理概述2、参考文献二、代码实现三、结果展示1、初始位置2、配准结果本文由CSDN点云侠原创,原文链接,首发于:2024年11月5日。 一、算法原理 1、原理概述 质心重合法是将源点云 P P P...

ubuntu双屏只显示一个屏幕另一个黑屏

简洁的结论&#xff1a; 系统环境 ubuntu22.04 nvidia-535解决方案 删除/etc/X11/xorg.conf 文件 记录一下折腾大半天的问题。 ubuntu系统是22.04,之前使用的时候更新驱动导致桌面崩溃&#xff0c;重新安装桌面安装不上&#xff0c;请IT帮忙&#xff0c;IT一番操作过后也表示…...

小菜家教平台:基于SpringBoot+Vue打造一站式学习管理系统

前言 现在已经学习了很多与Java相关的知识&#xff0c;但是迟迟没有进行一个完整的实践&#xff08;之前这个项目开发到一半&#xff0c;很多东西没学搁置了&#xff0c;同时原先的项目中也有很多的问题&#xff09;&#xff0c;所以现在准备从零开始做一个基于SpringBootVue的…...

网络自动化03:简单解释send_config_set方法并举例

目录 拓扑图设备信息 netmiko涉及方法send_config_set()方法的简单示例代码输出结果代码解释导入模块配置信息config_device_interface_description 函数主程序块总结 send_config_set方法参数&#xff1a;1. enter_config_mode2. config_commands3. enter_config_mode4. error…...

跳表原理笔记

课程地址 跳表是一种基于随机化的有序数据结构&#xff0c;它提出是为了赋予有序单链表以 O(logn) 的快速查找和插入的能力 创建 首先在头部创建一个 sentinel 节点&#xff0c;然后在 L1 层采用“抛硬币”的方式来决定 L0 层的指针是否增长到 L1 层 例如上图中&#xff0c;L…...

计算机毕业设计Hadoop+PySpark深度学习游戏推荐系统 游戏可视化 游戏数据分析 游戏爬虫 Scrapy 机器学习 人工智能 大数据毕设

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

AI开发-三方库-torch-torchvision

1 需求 数据集&#xff1a;torchvision.datasets torchvision.datasets.MNIST数据变换&#xff1a;torchvision.transforms torchvision.transforms.Composetorchvision.transforms.ToTensortorchvision.transforms.Normalize模型&#xff1a;torchvision.models可视化工具&…...

解析 MySQL 数据库容量统计、存储限制与优化技巧

管理 MySQL 数据库时&#xff0c;了解数据库中的数据量和存储占用情况是非常重要的&#xff0c;尤其是在面对大规模数据时。无论是为了优化数据库性能&#xff0c;还是为了进行容量规划&#xff0c;准确地统计数据库的容量可以帮助我们做出更好的决策。mysql的客户端工具是Navi…...

智能工厂的软件设计 思维进阶与数学程序

本文要点 讨论 “智能工厂的软件设计”中的“数学程序”。 这里 “数学程序” 是指能“格物致知”来理解“相续”一词。 完整的表述是&#xff1a; 思想素养提升的 思维进阶法&#xff08;三种 数学程序 &#xff1a; 格物致知 &#xff09;之思维导图&#xff1a; 二叉树及其…...

技术速递|GitHub Copilot upgrade assistant for Java 技术预览发布!

作者&#xff1a;Nick Zhu - Senior Program Manager 排版&#xff1a;Alan Wang 随着人工智能和大型语言模型&#xff08;LLMs&#xff09;的不断发展&#xff0c;Agent&#xff08;“智能代理”&#xff09;和智能代理化工作流程正在迅速成为AI领域的下一个前沿。这些自主系统…...

淘宝有哪些API是用来获取商品列表的?(商品id列表)

淘宝商品详情接口item_get是通过商品id或者商品链接来获取商品详情数据的&#xff0c;但是不少客户是没有商品id的&#xff0c;这时需要通过接口来拿到商品id。 可以获取商品id的API有&#xff1a; item_search 通过关键字搜索商品列表 item_search_shop 获取店铺所有商品列…...

D59【python 接口自动化学习】- python基础之异常

day59 捕获异常常见问题 学习日期&#xff1a;20241105 学习目标&#xff1a;异常 -- 75 避坑指南&#xff1a;编写捕获异常程序时经常出现的问题 学习笔记&#xff1a; 捕获位置设置不当 设置范围不当 捕获处理设置不当 嵌套try-except语法错误 总结 位置&#xff0c;范围…...

解决 Spring 异步处理中的 JDK 动态代理问题及相关错误分析

解决 Spring 异步处理中的 JDK 动态代理问题及相关错误分析 遇到的问题&#xff1a; 在使用 Spring 的 Async 注解开启异步处理时&#xff0c;遇到以下错误&#xff1a; The bean ServiceImplChannel could not be injected as a com.wn.order.pay.recharge.controller.Serv…...

从xss到任意文件读取

xss一直是一种非常常见且具有威胁性的攻击方式。然而&#xff0c;除了可能导致用户受到恶意脚本的攻击外&#xff0c;xss在特定条件下还会造成ssrf和文件读取&#xff0c;本文主要讲述在一次漏洞挖掘过程中从xss到文件读取的过程&#xff0c;以及其造成的成因。 0x01 前言 xss一…...

nuiapp vue3 uni-ui uni.uploadFile 图片上传

<div style"position: relative;margin-top: 0.8em;"> <div style"position: absolute;left: 1.5em;top: 2em;">施工图片</div> <div style"position: absolute; left: 7em;top: 0em;right: 0em;bottom…...