当前位置: 首页 > news >正文

Spring Boot+RabbitMQ+Canal 解决数据一致性

目录大纲

    • 一、环境配置
      • 1.1 docker-compose.yml 配置
      • 1.2 docker-compose 常用命令
      • 1.3 镜像服务启动状态
    • 二、MySQL binlog 配置
      • 2.1 docker-compose command 配置 binlog
      • 2.2 创建canal用户,以及查看是否开启binlog
    • 三、canal 相关配置文件
      • 3.1 canal.properties 完整文件
      • 3.2 instance.properties 完整文件
      • 3.3 检查配置是否与宿主机一致
      • 3.4 开启相关端口防火墙配置
    • 四、代码实现
      • 4.1 相关pom依赖引入
      • 4.2 完整pom.xml
      • 4.3 application.yml 配置
      • 4.4 完整application.yml配置
      • 4.5 RabbitConstants 基础常量配置
      • 4.6 CanalMqConfigure MQ队列交换机配置
      • 4.7 CanalConsumer 消费者
    • 五、运行与测试

一、环境配置

1.1 docker-compose.yml 配置

version: '3.8'services:redis:container_name: redisimage: redis:6.2.7restart: alwaysnetworks:- app_netports:- "6379:6379"volumes:- /usr/local/docker/redis/data:/data- /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf- /usr/local/docker/redis/logs:/logscommand: [ "redis-server","/usr/local/redis/config/redis.conf" ]mysql:container_name: mysqlimage: mysql:8.0.30restart: alwaysnetworks:- app_netports:- "3306:3306"volumes:- /usr/local/docker/mysql/data:/var/lib/mysql- /usr/local/docker/mysql/config:/etc/mysql/conf.denvironment:MYSQL_ROOT_PASSWORD: rootTZ: Asia/Shanghaicommand:--default-authentication-plugin=mysql_native_password--character-set-server=utf8mb4--collation-server=utf8mb4_general_ci--explicit_defaults_for_timestamp=true--lower_case_table_names=1--log-bin=/var/lib/mysql/mysql-bin--server-id=1--binlog-format=ROW--expire_logs_days=7--max_binlog_size=500Mcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: alwaysports:- 11110:11110- 11111:11111- 11112:11112volumes:- /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties- /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties- /usr/local/docker/canal/logs:/home/admin/canal-server/logsnetworks:- app_netdepends_on:- mysql- rabbitmqrabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqrestart: alwaysports:- "5672:5672"- "15672:15672"volumes:- /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/- /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/environment:- RABBITMQ_DEFAULT_USER=guest- RABBITMQ_DEFAULT_PASS=guestnetworks:- app_netnetworks:app_net:driver: bridge

1.2 docker-compose 常用命令

# 后台启动容器编排文件
docker-compose up -d [service]# 停止up命令所启动的容器,并移除网络
docker-compose down# 进入指定容器
docker-compose exec [service]# 列出项目中所有的容器
docker-compose ps [service]# 重启项目中容器
docker-compose restart [service]# 删除项目中所有容器
docker-compose rm -f [service]# 启动项目中容器(或指定容器)
docker-compose start [service]# 暂停项目中容器(或指定容器)
docker-compose stop [service]

1.3 镜像服务启动状态

[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID   IMAGE                           COMMAND                  CREATED          STATUS          PORTS                                                                                                                                                 NAMES
0d4260bc557b   canal/canal-server:v1.1.5       "/alidata/bin/main.s…"   38 minutes ago   Up 38 minutes   9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp                                                                       canal
c66b3f1f13a9   mysql:8.0.30                    "docker-entrypoint.s…"   38 minutes ago   Up 38 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql
645e27bd4001   rabbitmq:3-management           "docker-entrypoint.s…"   5 hours ago      Up 49 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
f55d42cbbd8e   redis:6.2.7                     "docker-entrypoint.s…"   3 days ago       Up 49 minutes   0.0.0.0:6379->6379/tcp, :::6379->6379/tcp                                                                                                             redis

二、MySQL binlog 配置

2.1 docker-compose command 配置 binlog

--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M

2.2 创建canal用户,以及查看是否开启binlog

mysql> CREATE USER canal IDENTIFIED BY 'canal';  
Query OK, 0 rows affected (0.05 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User  | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin                | authentication_string                     | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| %    | canal | Y           | N           | N           | N           | N           | N         | N           | N             | N            | N         | N          | N               | N          | N          | N            | N          | N                     | N                | N            | Y               | Y                | N                | N              | N                   | N                  | N                | N          | N            | N                      |          |            |             |              |             0 |           0 |               0 |                    0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N                |   2025-03-10 11:53:49 | NULL              | N              | N                | N              | NULL                   | NULL                | NULL                     | NULL            |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.30 sec)

三、canal 相关配置文件

canal.properties 主要核心配置

canal.serverMode=rabbitMQ:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queuerabbitmq.routingKeyrabbitmq.exchange : RabbitMQ 的三件套,用于后续创建具体通道监听。

#################################################
#########               common argument         #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=

instance.properties 主要核心配置
canal.instance.master.address 数据库地址
canal.instance.dbUsername 数据库用户名
canal.instance.dbPassword 数据库密码
canal.mq.topic RabbitMQ路由

# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root# mq config
canal.mq.topic=canal-routing-key

3.1 canal.properties 完整文件

#################################################
#########               common argument         #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024 
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED 
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
#########               destinations            #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
#########             MQ Properties      #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=

3.2 instance.properties 完整文件

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# multi stream for polardbx
canal.instance.multi.stream.on=false# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3.3 检查配置是否与宿主机一致

进入容器内部:docker exec -it canal bash

检查配置文件内容是否与宿主机一致:

  • cat /home/admin/canal-server/conf/canal.properties
  • cat /home/admin/canal-server/conf/example/instance.properties

3.4 开启相关端口防火墙配置

  • canal:11110、11111、11112
  • mysql:3306
  • redis:6379
  • RabbitMQ: 15672、5672
    在这里插入图片描述

四、代码实现

4.1 相关pom依赖引入

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository -->
</parent><!-- Spring Boot MQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- canal -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>

4.2 完整pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.neo</groupId><artifactId>code-repository</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository --></parent><name>code-repository</name><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hutool.version>5.8.20</hutool.version><mysql.version>8.0.30</mysql.version><mybatis-plus.version>3.5.3.1</mybatis-plus.version><redis.version>3.1.0</redis.version><druid.version>1.2.16</druid.version><fastjson.version>1.2.83</fastjson.version><sa-token.version>1.37.0</sa-token.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-spring-boot-starter</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-redis-jackson</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

4.3 application.yml 配置

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: true

4.4 完整application.yml配置

server:port: 8088servlet:context-path: /apispring:datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdruid:initial-size: 5min-idle: 5max-active: 20max-wait: 60000validation-query: SELECT 1test-while-idle: truestat-view-servlet:enabled: trueurl-pattern: /druid/*login-username: adminlogin-password: admin123web-stat-filter:enabled: trueurl-pattern: /*exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"filter:stat:enabled: truelog-slow-sql: trueslow-sql-millis: 1000wall:enabled: trueconfig:drop-table-allow: falseredis:host: localhostport: 6379password: 123456database: 0timeout: 5000lettuce:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0mail:host: smtp.aliyun.comusername: password: port: 25properties:mail:smtp:auth: truestarttls:enable: truerequired: truerabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemybatis-plus:mapper-locations: classpath*:mapper/*_Mapper.xmlglobal-config:db-config:logic-delete-field: delFlaglogic-delete-value: 1logic-not-delete-value: 0configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4.5 RabbitConstants 基础常量配置

public interface RabbitConstants {interface Canal {String QUEUE = "canal-queue";String EXCHANGE = "canal-exchange";String ROUTING = "canal-routing-key";}interface EventType {String INSERT = "INSERT";String UPDATE = "UPDATE";String DELETE = "DELETE";}}

4.6 CanalMqConfigure MQ队列交换机配置

@Configuration
public class CanalMqConfigure {@Beanpublic Queue queue() {return new Queue(RabbitConstants.Canal.QUEUE, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);}@Beanpublic Binding bindingCanal() {return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);}
}

4.7 CanalConsumer 消费者

package com.neo.core.canal;import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {@RabbitHandlerpublic void execute(Map<String, Object> msg) {log.info("canal消息监听事件触发,消息内容:{}", msg);boolean isDdl = (boolean) msg.get("isDdl");if (isDdl) {return;}String database = (String) msg.get("database");String table = (String) msg.get("table");String type = (String) msg.get("type");List<?> data = (List<?>) msg.get("data");log.info("database:{}.table:{}", database, table);if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {System.out.println("INSERT");} else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {System.out.println("UPDATE");} else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {System.out.println("DELETE");} else {// 其他事件}}
}

五、运行与测试

当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。

25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
UPDATE

以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。

相关文章:

Spring Boot+RabbitMQ+Canal 解决数据一致性

目录大纲 一、环境配置1.1 docker-compose.yml 配置1.2 docker-compose 常用命令1.3 镜像服务启动状态 二、MySQL binlog 配置2.1 docker-compose command 配置 binlog2.2 创建canal用户&#xff0c;以及查看是否开启binlog 三、canal 相关配置文件3.1 canal.properties 完整文…...

Java高频面试之集合-08

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本baby今天来报道了&#xff01;哈哈哈哈哈嗝&#x1f436; 面试官&#xff1a;详细说说CopyOnWriteArrayList CopyOnWriteArrayList 详解 CopyOnWriteArrayList 是 Java 并发包&#xff08;java.util…...

C#实现高性能异步文件下载器(支持进度显示/断点续传)

一、应用场景分析 异步文件下载器用处很大&#xff0c;当我们需要实现以下功能时可以用的上&#xff1a; 大文件下载&#xff08;如4K视频/安装包&#xff09; 避免UI线程阻塞&#xff0c;保证界面流畅响应多任务并行下载 支持同时下载多个文件&#xff0c;提升带宽利用率后台…...

【数据分析】转录组基因表达的KEGG通路富集分析教程

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍差异分析(limma)KEGG富集分析(enrichKEGG)可视化加载R包数据下载导入数据基因差异分析火山图KEGG通路富集分析可视化通路结果另一个案例总结系统信息参考介绍 KEGG富集分析,可…...

【由技及道】API契约的量子纠缠术:响应封装的十一维通信协议(全局的返回结果封装)【人工智障AI2077的开发日志012】

摘要&#xff1a;在API通信的量子混沌中&#xff0c;30种返回格式如同平行宇宙的物理定律相互碰撞。本文构建的十一维通信协议&#xff0c;通过时空锚点&#xff08;ApiResult&#xff09;、量子过滤器&#xff08;ResponseWrapper&#xff09;和湮灭防护罩&#xff08;Jackson…...

STM32 ——系统架构

3个被动单元 SRAM 存储程序运行时用到的变量 Flash&#xff08;内部闪存存储器&#xff09; 存储下载的程序 程序执行时用到的常量 桥接1和桥接2 AHB到APB的桥&#xff08;AHBtoAPBx) 桥1 通过APB2总线连接到APB2上的外设。 高速外设&#xff0c;最高72MHz。 桥2 通过…...

算法 之 树形dp 树的中心、重心

文章目录 重心实践题目小红的陡峭值 在树的算法中&#xff0c;求解树的中心和重心是一类十分重要的算法 求解树的重心 树的重心的定义&#xff1a;重心是树中的一个节点&#xff0c;如果将这个点删除后&#xff0c;剩余各个连通块中点数的最大值最小&#xff0c;那么这个节点…...

如何利用 Excel 表格实现精准文件批量重命名教程

在处理大量文件时&#xff0c;有时需要根据特定规则对文件名进行调整。如果您的文件名和新名称之间存在一对多的关系&#xff0c;并且这种关系可以通过 Excel 表格来管理&#xff0c;那么使用“简鹿文件批量重命名”软件中的“匹配对应名称命名”功能将是一个高效的选择。接下来…...

ACE协议学习1

在多核系统或复杂SoC&#xff08;System on Chip&#xff09;中&#xff0c;不同处理器核心或IP&#xff08;Intellectual Property&#xff09;模块之间需要保持数据的一致性。常用的是ACE协议or CHI。 先对ACE协议进行学习 ACE协议&#xff08;Advanced Microcontroller Bu…...

【实战ES】实战 Elasticsearch:快速上手与深度实践-5.1.1热点分片识别与均衡策略

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 5.1.1 Filebeat Logstash ES Kibana 全链路配置实1. 架构设计与组件选型1.1 技术栈对比分析1.2 硬件配置推荐 2. Filebeat 高级配置2.1 多输入源配置2.2 性能优化参数 3.…...

Kubernetes 的正式安装

1.基础的网络结构说明 软件路由器 ikuai 当然同一个仅主机模式 相当于在 同一个我们所谓的广播域内 所以相当于它们的几张网卡 是被连接起来的 为了防止出现问题 我们可以把第二块网卡临时关闭一下 2.准备路由器 ikuai 爱快 iKuai-商业场景网络解决方案提供商 (ikuai8.com)…...

初阶数据结构(C语言实现)——4.2队列

目录 2.队列2.1队列的概念及结构2.2队列的实现2.2.1 初始化队列2.2.2 销毁队列2.2.3 队尾入队列2.2.4 队头出队列2.2.5获取队列头部元素2.2.6 获取队列队尾元素2.2.7获取队列中有效元素个数2.2.8 检测队列是否为空&#xff0c;如果为空返回非零结果&#xff0c;如果非空返回0 3…...

Mysql主从复制和Mysql高可用以及负载均衡配置

需要先配置MySQL主从复制&#xff0c;然后再在主MySQL服务器上配置MySQL Router。以下是详细说明和步骤&#xff1a; 1. 为什么需要先配置MySQL主从复制&#xff1f; MySQL主从复制是MySQL高可用性和负载均衡的基础&#xff0c;通过将数据从主服务器实时同步到从服务器&#…...

c#中使用时间戳转换器

在C#中,时间戳转换器通常用于将时间戳(通常是一个表示自某一特定时间点(如1970年1月1日UTC)以来的毫秒数的长整型值)转换为DateTime对象,或者将DateTime对象转换回时间戳。以下是几种实现这一功能的方法: 1. 使用DateTime的构造函数 将时间戳转换为DateTime long tim…...

杂项知识笔记搜集

1.pygame pygame可以画出来图形界面&#xff0c;pygame Python仓库 PyGame游戏编程_游戏程序设计csdn-CSDN博客 2.V4L2库 V4L2是Linux上的Camera采集器的框架 Video for Linux &#xff0c;是从Linux2.1版本开始支持的。HDMI视频采集卡采集到的视频通过USB3.0输出&#xff0…...

rust语言match模式匹配涉及转移所有权Error Case

struct S{data:String, }//注意&#xff1a;因为String默认是移动语义&#xff0c;从而决定结构体S也是移动语义&#xff0c;可采用(1)或(2)两种方法解决编译错误&#xff1b;关键思路&#xff1a;放弃获取结构体S的字段data的所有权&#xff0c;改为借用。fn process(s_ref:&a…...

golang从入门到做牛马:第十一篇-Go语言变量作用域:变量的“生活圈”

在Go语言中,变量的作用域决定了它在程序中的可见性和生命周期。理解变量的作用域对于编写清晰、高效的代码至关重要。Go语言中的变量可以在三个主要地方声明:函数内、函数外和函数定义中。接下来,让我们深入探讨局部变量、全局变量和形式参数的作用域。 局部变量:函数内的“…...

【Linux】37.网络版本计算器

文章目录 1. Log.hpp-日志记录器2. Daemon.hpp-守护进程工具3. Protocol.hpp-通信协议解析器4. ServerCal.hpp-计算器服务处理器5. Socket.hpp-Socket通信封装类6. TcpServer.hpp-TCP服务器框架7. ClientCal.cc-计算器客户端8. ServerCal.cc-计算器服务器9. 代码时序1. 服务器启…...

linux安装Mariadb10.5并修改端口

首先配置yum源 进入下方的文件进行配置 vim /etc/yum.repos.d/MariaDB.repo填写下方内容 [mariadb]name MariaDBbaseurl https:///mirrors.aliyun.com/mariadb/yum/10.5/centos8-amd64/gpgkeyhttps:///mirrors.aliyun.com/mariadb/yum/RPM-GPG-KEY-MariaDBmodule_hotfixes…...

从Windows到ARM Linux:Qt程序的交叉编译与移植指南

引言 在嵌入式开发中&#xff0c;我们经常需要将桌面端开发的Qt程序部署到ARM架构的Linux设备。本文详细介绍如何将Windows平台开发的Qt程序&#xff0c;通过Linux虚拟机交叉编译为ARM架构可执行文件的完整过程 环境准备 需要特别注意的是&#xff0c;对于CentOS 7 默认支持…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

UE5 学习系列(三)创建和移动物体

这篇博客是该系列的第三篇&#xff0c;是在之前两篇博客的基础上展开&#xff0c;主要介绍如何在操作界面中创建和拖动物体&#xff0c;这篇博客跟随的视频链接如下&#xff1a; B 站视频&#xff1a;s03-创建和移动物体 如果你不打算开之前的博客并且对UE5 比较熟的话按照以…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

Python 包管理器 uv 介绍

Python 包管理器 uv 全面介绍 uv 是由 Astral&#xff08;热门工具 Ruff 的开发者&#xff09;推出的下一代高性能 Python 包管理器和构建工具&#xff0c;用 Rust 编写。它旨在解决传统工具&#xff08;如 pip、virtualenv、pip-tools&#xff09;的性能瓶颈&#xff0c;同时…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...

论文阅读:Matting by Generation

今天介绍一篇关于 matting 抠图的文章&#xff0c;抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法&#xff0c;已经有很多的工作和这个任务相关。这两年 diffusion 模型很火&#xff0c;大家又开始用 diffusion 模型做各种 CV 任务了&am…...

《Offer来了:Java面试核心知识点精讲》大纲

文章目录 一、《Offer来了:Java面试核心知识点精讲》的典型大纲框架Java基础并发编程JVM原理数据库与缓存分布式架构系统设计二、《Offer来了:Java面试核心知识点精讲(原理篇)》技术文章大纲核心主题:Java基础原理与面试高频考点Java虚拟机(JVM)原理Java并发编程原理Jav…...