Spring Boot+RabbitMQ+Canal 解决数据一致性
目录大纲
- 一、环境配置
- 1.1 docker-compose.yml 配置
- 1.2 docker-compose 常用命令
- 1.3 镜像服务启动状态
- 二、MySQL binlog 配置
- 2.1 docker-compose command 配置 binlog
- 2.2 创建canal用户,以及查看是否开启binlog
- 三、canal 相关配置文件
- 3.1 canal.properties 完整文件
- 3.2 instance.properties 完整文件
- 3.3 检查配置是否与宿主机一致
- 3.4 开启相关端口防火墙配置
- 四、代码实现
- 4.1 相关pom依赖引入
- 4.2 完整pom.xml
- 4.3 application.yml 配置
- 4.4 完整application.yml配置
- 4.5 RabbitConstants 基础常量配置
- 4.6 CanalMqConfigure MQ队列交换机配置
- 4.7 CanalConsumer 消费者
- 五、运行与测试
一、环境配置
1.1 docker-compose.yml 配置
version: '3.8'services:redis:container_name: redisimage: redis:6.2.7restart: alwaysnetworks:- app_netports:- "6379:6379"volumes:- /usr/local/docker/redis/data:/data- /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf- /usr/local/docker/redis/logs:/logscommand: [ "redis-server","/usr/local/redis/config/redis.conf" ]mysql:container_name: mysqlimage: mysql:8.0.30restart: alwaysnetworks:- app_netports:- "3306:3306"volumes:- /usr/local/docker/mysql/data:/var/lib/mysql- /usr/local/docker/mysql/config:/etc/mysql/conf.denvironment:MYSQL_ROOT_PASSWORD: rootTZ: Asia/Shanghaicommand:--default-authentication-plugin=mysql_native_password--character-set-server=utf8mb4--collation-server=utf8mb4_general_ci--explicit_defaults_for_timestamp=true--lower_case_table_names=1--log-bin=/var/lib/mysql/mysql-bin--server-id=1--binlog-format=ROW--expire_logs_days=7--max_binlog_size=500Mcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: alwaysports:- 11110:11110- 11111:11111- 11112:11112volumes:- /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties- /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties- /usr/local/docker/canal/logs:/home/admin/canal-server/logsnetworks:- app_netdepends_on:- mysql- rabbitmqrabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqrestart: alwaysports:- "5672:5672"- "15672:15672"volumes:- /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/- /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/environment:- RABBITMQ_DEFAULT_USER=guest- RABBITMQ_DEFAULT_PASS=guestnetworks:- app_netnetworks:app_net:driver: bridge
1.2 docker-compose 常用命令
# 后台启动容器编排文件
docker-compose up -d [service]# 停止up命令所启动的容器,并移除网络
docker-compose down# 进入指定容器
docker-compose exec [service]# 列出项目中所有的容器
docker-compose ps [service]# 重启项目中容器
docker-compose restart [service]# 删除项目中所有容器
docker-compose rm -f [service]# 启动项目中容器(或指定容器)
docker-compose start [service]# 暂停项目中容器(或指定容器)
docker-compose stop [service]
1.3 镜像服务启动状态
[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0d4260bc557b canal/canal-server:v1.1.5 "/alidata/bin/main.s…" 38 minutes ago Up 38 minutes 9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp canal
c66b3f1f13a9 mysql:8.0.30 "docker-entrypoint.s…" 38 minutes ago Up 38 minutes 0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp mysql
645e27bd4001 rabbitmq:3-management "docker-entrypoint.s…" 5 hours ago Up 49 minutes 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
f55d42cbbd8e redis:6.2.7 "docker-entrypoint.s…" 3 days ago Up 49 minutes 0.0.0.0:6379->6379/tcp, :::6379->6379/tcp redis
二、MySQL binlog 配置
2.1 docker-compose command 配置 binlog
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
2.2 创建canal用户,以及查看是否开启binlog
mysql> CREATE USER canal IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.05 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| % | canal | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | | | | 0 | 0 | 0 | 0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N | 2025-03-10 11:53:49 | NULL | N | N | N | NULL | NULL | NULL | NULL |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.30 sec)
三、canal 相关配置文件
canal.properties 主要核心配置
canal.serverMode=rabbitMQ:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queue、rabbitmq.routingKey、rabbitmq.exchange : RabbitMQ 的三件套,用于后续创建具体通道监听。
#################################################
######### common argument #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
instance.properties 主要核心配置
canal.instance.master.address 数据库地址
canal.instance.dbUsername 数据库用户名
canal.instance.dbPassword 数据库密码
canal.mq.topic RabbitMQ路由
# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root# mq config
canal.mq.topic=canal-routing-key
3.1 canal.properties 完整文件
#################################################
######### common argument #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
######### destinations #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
######### Pulsar #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=
3.2 instance.properties 完整文件
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# multi stream for polardbx
canal.instance.multi.stream.on=false# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
3.3 检查配置是否与宿主机一致
进入容器内部:docker exec -it canal bash
检查配置文件内容是否与宿主机一致:
cat /home/admin/canal-server/conf/canal.propertiescat /home/admin/canal-server/conf/example/instance.properties
3.4 开启相关端口防火墙配置
canal:11110、11111、11112mysql:3306redis:6379RabbitMQ: 15672、5672

四、代码实现
4.1 相关pom依赖引入
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository -->
</parent><!-- Spring Boot MQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- canal -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
4.2 完整pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.neo</groupId><artifactId>code-repository</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository --></parent><name>code-repository</name><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hutool.version>5.8.20</hutool.version><mysql.version>8.0.30</mysql.version><mybatis-plus.version>3.5.3.1</mybatis-plus.version><redis.version>3.1.0</redis.version><druid.version>1.2.16</druid.version><fastjson.version>1.2.83</fastjson.version><sa-token.version>1.37.0</sa-token.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-spring-boot-starter</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-redis-jackson</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
4.3 application.yml 配置
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: true
4.4 完整application.yml配置
server:port: 8088servlet:context-path: /apispring:datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdruid:initial-size: 5min-idle: 5max-active: 20max-wait: 60000validation-query: SELECT 1test-while-idle: truestat-view-servlet:enabled: trueurl-pattern: /druid/*login-username: adminlogin-password: admin123web-stat-filter:enabled: trueurl-pattern: /*exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"filter:stat:enabled: truelog-slow-sql: trueslow-sql-millis: 1000wall:enabled: trueconfig:drop-table-allow: falseredis:host: localhostport: 6379password: 123456database: 0timeout: 5000lettuce:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0mail:host: smtp.aliyun.comusername: password: port: 25properties:mail:smtp:auth: truestarttls:enable: truerequired: truerabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemybatis-plus:mapper-locations: classpath*:mapper/*_Mapper.xmlglobal-config:db-config:logic-delete-field: delFlaglogic-delete-value: 1logic-not-delete-value: 0configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
4.5 RabbitConstants 基础常量配置
public interface RabbitConstants {interface Canal {String QUEUE = "canal-queue";String EXCHANGE = "canal-exchange";String ROUTING = "canal-routing-key";}interface EventType {String INSERT = "INSERT";String UPDATE = "UPDATE";String DELETE = "DELETE";}}
4.6 CanalMqConfigure MQ队列交换机配置
@Configuration
public class CanalMqConfigure {@Beanpublic Queue queue() {return new Queue(RabbitConstants.Canal.QUEUE, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);}@Beanpublic Binding bindingCanal() {return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);}
}
4.7 CanalConsumer 消费者
package com.neo.core.canal;import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {@RabbitHandlerpublic void execute(Map<String, Object> msg) {log.info("canal消息监听事件触发,消息内容:{}", msg);boolean isDdl = (boolean) msg.get("isDdl");if (isDdl) {return;}String database = (String) msg.get("database");String table = (String) msg.get("table");String type = (String) msg.get("type");List<?> data = (List<?>) msg.get("data");log.info("database:{}.table:{}", database, table);if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {System.out.println("INSERT");} else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {System.out.println("UPDATE");} else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {System.out.println("DELETE");} else {// 其他事件}}
}
五、运行与测试
当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO CanalConsumer - database:db_v1.table:sys_user
UPDATE
以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。
相关文章:
Spring Boot+RabbitMQ+Canal 解决数据一致性
目录大纲 一、环境配置1.1 docker-compose.yml 配置1.2 docker-compose 常用命令1.3 镜像服务启动状态 二、MySQL binlog 配置2.1 docker-compose command 配置 binlog2.2 创建canal用户,以及查看是否开启binlog 三、canal 相关配置文件3.1 canal.properties 完整文…...
Java高频面试之集合-08
hello啊,各位观众姥爷们!!!本baby今天来报道了!哈哈哈哈哈嗝🐶 面试官:详细说说CopyOnWriteArrayList CopyOnWriteArrayList 详解 CopyOnWriteArrayList 是 Java 并发包(java.util…...
C#实现高性能异步文件下载器(支持进度显示/断点续传)
一、应用场景分析 异步文件下载器用处很大,当我们需要实现以下功能时可以用的上: 大文件下载(如4K视频/安装包) 避免UI线程阻塞,保证界面流畅响应多任务并行下载 支持同时下载多个文件,提升带宽利用率后台…...
【数据分析】转录组基因表达的KEGG通路富集分析教程
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍差异分析(limma)KEGG富集分析(enrichKEGG)可视化加载R包数据下载导入数据基因差异分析火山图KEGG通路富集分析可视化通路结果另一个案例总结系统信息参考介绍 KEGG富集分析,可…...
【由技及道】API契约的量子纠缠术:响应封装的十一维通信协议(全局的返回结果封装)【人工智障AI2077的开发日志012】
摘要:在API通信的量子混沌中,30种返回格式如同平行宇宙的物理定律相互碰撞。本文构建的十一维通信协议,通过时空锚点(ApiResult)、量子过滤器(ResponseWrapper)和湮灭防护罩(Jackson…...
STM32 ——系统架构
3个被动单元 SRAM 存储程序运行时用到的变量 Flash(内部闪存存储器) 存储下载的程序 程序执行时用到的常量 桥接1和桥接2 AHB到APB的桥(AHBtoAPBx) 桥1 通过APB2总线连接到APB2上的外设。 高速外设,最高72MHz。 桥2 通过…...
算法 之 树形dp 树的中心、重心
文章目录 重心实践题目小红的陡峭值 在树的算法中,求解树的中心和重心是一类十分重要的算法 求解树的重心 树的重心的定义:重心是树中的一个节点,如果将这个点删除后,剩余各个连通块中点数的最大值最小,那么这个节点…...
如何利用 Excel 表格实现精准文件批量重命名教程
在处理大量文件时,有时需要根据特定规则对文件名进行调整。如果您的文件名和新名称之间存在一对多的关系,并且这种关系可以通过 Excel 表格来管理,那么使用“简鹿文件批量重命名”软件中的“匹配对应名称命名”功能将是一个高效的选择。接下来…...
ACE协议学习1
在多核系统或复杂SoC(System on Chip)中,不同处理器核心或IP(Intellectual Property)模块之间需要保持数据的一致性。常用的是ACE协议or CHI。 先对ACE协议进行学习 ACE协议(Advanced Microcontroller Bu…...
【实战ES】实战 Elasticsearch:快速上手与深度实践-5.1.1热点分片识别与均衡策略
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 5.1.1 Filebeat Logstash ES Kibana 全链路配置实1. 架构设计与组件选型1.1 技术栈对比分析1.2 硬件配置推荐 2. Filebeat 高级配置2.1 多输入源配置2.2 性能优化参数 3.…...
Kubernetes 的正式安装
1.基础的网络结构说明 软件路由器 ikuai 当然同一个仅主机模式 相当于在 同一个我们所谓的广播域内 所以相当于它们的几张网卡 是被连接起来的 为了防止出现问题 我们可以把第二块网卡临时关闭一下 2.准备路由器 ikuai 爱快 iKuai-商业场景网络解决方案提供商 (ikuai8.com)…...
初阶数据结构(C语言实现)——4.2队列
目录 2.队列2.1队列的概念及结构2.2队列的实现2.2.1 初始化队列2.2.2 销毁队列2.2.3 队尾入队列2.2.4 队头出队列2.2.5获取队列头部元素2.2.6 获取队列队尾元素2.2.7获取队列中有效元素个数2.2.8 检测队列是否为空,如果为空返回非零结果,如果非空返回0 3…...
Mysql主从复制和Mysql高可用以及负载均衡配置
需要先配置MySQL主从复制,然后再在主MySQL服务器上配置MySQL Router。以下是详细说明和步骤: 1. 为什么需要先配置MySQL主从复制? MySQL主从复制是MySQL高可用性和负载均衡的基础,通过将数据从主服务器实时同步到从服务器&#…...
c#中使用时间戳转换器
在C#中,时间戳转换器通常用于将时间戳(通常是一个表示自某一特定时间点(如1970年1月1日UTC)以来的毫秒数的长整型值)转换为DateTime对象,或者将DateTime对象转换回时间戳。以下是几种实现这一功能的方法: 1. 使用DateTime的构造函数 将时间戳转换为DateTime long tim…...
杂项知识笔记搜集
1.pygame pygame可以画出来图形界面,pygame Python仓库 PyGame游戏编程_游戏程序设计csdn-CSDN博客 2.V4L2库 V4L2是Linux上的Camera采集器的框架 Video for Linux ,是从Linux2.1版本开始支持的。HDMI视频采集卡采集到的视频通过USB3.0输出࿰…...
rust语言match模式匹配涉及转移所有权Error Case
struct S{data:String, }//注意:因为String默认是移动语义,从而决定结构体S也是移动语义,可采用(1)或(2)两种方法解决编译错误;关键思路:放弃获取结构体S的字段data的所有权,改为借用。fn process(s_ref:&a…...
golang从入门到做牛马:第十一篇-Go语言变量作用域:变量的“生活圈”
在Go语言中,变量的作用域决定了它在程序中的可见性和生命周期。理解变量的作用域对于编写清晰、高效的代码至关重要。Go语言中的变量可以在三个主要地方声明:函数内、函数外和函数定义中。接下来,让我们深入探讨局部变量、全局变量和形式参数的作用域。 局部变量:函数内的“…...
【Linux】37.网络版本计算器
文章目录 1. Log.hpp-日志记录器2. Daemon.hpp-守护进程工具3. Protocol.hpp-通信协议解析器4. ServerCal.hpp-计算器服务处理器5. Socket.hpp-Socket通信封装类6. TcpServer.hpp-TCP服务器框架7. ClientCal.cc-计算器客户端8. ServerCal.cc-计算器服务器9. 代码时序1. 服务器启…...
linux安装Mariadb10.5并修改端口
首先配置yum源 进入下方的文件进行配置 vim /etc/yum.repos.d/MariaDB.repo填写下方内容 [mariadb]name MariaDBbaseurl https:///mirrors.aliyun.com/mariadb/yum/10.5/centos8-amd64/gpgkeyhttps:///mirrors.aliyun.com/mariadb/yum/RPM-GPG-KEY-MariaDBmodule_hotfixes…...
从Windows到ARM Linux:Qt程序的交叉编译与移植指南
引言 在嵌入式开发中,我们经常需要将桌面端开发的Qt程序部署到ARM架构的Linux设备。本文详细介绍如何将Windows平台开发的Qt程序,通过Linux虚拟机交叉编译为ARM架构可执行文件的完整过程 环境准备 需要特别注意的是,对于CentOS 7 默认支持…...
IDEA运行Tomcat出现乱码问题解决汇总
最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
今日科技热点速览
🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...
STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...
