当前位置: 首页 > news >正文

Spring Boot+RabbitMQ+Canal 解决数据一致性

目录大纲

    • 一、环境配置
      • 1.1 docker-compose.yml 配置
      • 1.2 docker-compose 常用命令
      • 1.3 镜像服务启动状态
    • 二、MySQL binlog 配置
      • 2.1 docker-compose command 配置 binlog
      • 2.2 创建canal用户,以及查看是否开启binlog
    • 三、canal 相关配置文件
      • 3.1 canal.properties 完整文件
      • 3.2 instance.properties 完整文件
      • 3.3 检查配置是否与宿主机一致
      • 3.4 开启相关端口防火墙配置
    • 四、代码实现
      • 4.1 相关pom依赖引入
      • 4.2 完整pom.xml
      • 4.3 application.yml 配置
      • 4.4 完整application.yml配置
      • 4.5 RabbitConstants 基础常量配置
      • 4.6 CanalMqConfigure MQ队列交换机配置
      • 4.7 CanalConsumer 消费者
    • 五、运行与测试

一、环境配置

1.1 docker-compose.yml 配置

version: '3.8'services:redis:container_name: redisimage: redis:6.2.7restart: alwaysnetworks:- app_netports:- "6379:6379"volumes:- /usr/local/docker/redis/data:/data- /usr/local/docker/redis/config/redis.conf:/usr/local/redis/config/redis.conf- /usr/local/docker/redis/logs:/logscommand: [ "redis-server","/usr/local/redis/config/redis.conf" ]mysql:container_name: mysqlimage: mysql:8.0.30restart: alwaysnetworks:- app_netports:- "3306:3306"volumes:- /usr/local/docker/mysql/data:/var/lib/mysql- /usr/local/docker/mysql/config:/etc/mysql/conf.denvironment:MYSQL_ROOT_PASSWORD: rootTZ: Asia/Shanghaicommand:--default-authentication-plugin=mysql_native_password--character-set-server=utf8mb4--collation-server=utf8mb4_general_ci--explicit_defaults_for_timestamp=true--lower_case_table_names=1--log-bin=/var/lib/mysql/mysql-bin--server-id=1--binlog-format=ROW--expire_logs_days=7--max_binlog_size=500Mcanal:image: canal/canal-server:v1.1.5container_name: canalrestart: alwaysports:- 11110:11110- 11111:11111- 11112:11112volumes:- /usr/local/docker/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties- /usr/local/docker/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties- /usr/local/docker/canal/logs:/home/admin/canal-server/logsnetworks:- app_netdepends_on:- mysql- rabbitmqrabbitmq:image: rabbitmq:3-managementcontainer_name: rabbitmqrestart: alwaysports:- "5672:5672"- "15672:15672"volumes:- /usr/local/docker/rabbitmq/data/:/var/lib/rabbitmq/- /usr/local/docker/rabbitmq/log/:/var/log/rabbitmq/environment:- RABBITMQ_DEFAULT_USER=guest- RABBITMQ_DEFAULT_PASS=guestnetworks:- app_netnetworks:app_net:driver: bridge

1.2 docker-compose 常用命令

# 后台启动容器编排文件
docker-compose up -d [service]# 停止up命令所启动的容器,并移除网络
docker-compose down# 进入指定容器
docker-compose exec [service]# 列出项目中所有的容器
docker-compose ps [service]# 重启项目中容器
docker-compose restart [service]# 删除项目中所有容器
docker-compose rm -f [service]# 启动项目中容器(或指定容器)
docker-compose start [service]# 暂停项目中容器(或指定容器)
docker-compose stop [service]

1.3 镜像服务启动状态

[root@lavm-13jmyj9ugf docker]# docker ps -a
CONTAINER ID   IMAGE                           COMMAND                  CREATED          STATUS          PORTS                                                                                                                                                 NAMES
0d4260bc557b   canal/canal-server:v1.1.5       "/alidata/bin/main.s…"   38 minutes ago   Up 38 minutes   9100/tcp, 0.0.0.0:11110-11112->11110-11112/tcp, :::11110-11112->11110-11112/tcp                                                                       canal
c66b3f1f13a9   mysql:8.0.30                    "docker-entrypoint.s…"   38 minutes ago   Up 38 minutes   0.0.0.0:3306->3306/tcp, :::3306->3306/tcp, 33060/tcp                                                                                                  mysql
645e27bd4001   rabbitmq:3-management           "docker-entrypoint.s…"   5 hours ago      Up 49 minutes   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp   rabbitmq
f55d42cbbd8e   redis:6.2.7                     "docker-entrypoint.s…"   3 days ago       Up 49 minutes   0.0.0.0:6379->6379/tcp, :::6379->6379/tcp                                                                                                             redis

二、MySQL binlog 配置

2.1 docker-compose command 配置 binlog

--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M

2.2 创建canal用户,以及查看是否开启binlog

mysql> CREATE USER canal IDENTIFIED BY 'canal';  
Query OK, 0 rows affected (0.05 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.05 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.05 sec)mysql> select * from mysql.user where User = 'canal';
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User  | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin                | authentication_string                     | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| %    | canal | Y           | N           | N           | N           | N           | N         | N           | N             | N            | N         | N          | N               | N          | N          | N            | N          | N                     | N                | N            | Y               | Y                | N                | N              | N                   | N                  | N                | N          | N            | N                      |          |            |             |              |             0 |           0 |               0 |                    0 | mysql_native_password | *E3619321C1A937C46A0D8BD1DAC39F93B27D4458 | N                |   2025-03-10 11:53:49 | NULL              | N              | N                | N              | NULL                   | NULL                | NULL                     | NULL            |
+------+-------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------+-------------+--------------+---------------+-------------+-----------------+----------------------+-----------------------+-------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
1 row in set (0.08 sec)mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.30 sec)

三、canal 相关配置文件

canal.properties 主要核心配置

canal.serverMode=rabbitMQ:选择 RabbitMQ 作为通知服务模型。
rabbitmq.host=rabbitmq:基于 Docker 同一网络下,可以使用容器名称代替 host。
rabbitmq.queuerabbitmq.routingKeyrabbitmq.exchange : RabbitMQ 的三件套,用于后续创建具体通道监听。

#################################################
#########               common argument         #############
#################################################
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 支持的服务模型,tcp直连或mq,此处我选择RabbitMQ
canal.serverMode=rabbitMQ##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=

instance.properties 主要核心配置
canal.instance.master.address 数据库地址
canal.instance.dbUsername 数据库用户名
canal.instance.dbPassword 数据库密码
canal.mq.topic RabbitMQ路由

# 数据地址,此处mysql,是因为canal和mysql是同一network下,可以使用容器名称代替具体ip
canal.instance.master.address=mysql:3306# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root# mq config
canal.mq.topic=canal-routing-key

3.1 canal.properties 完整文件

#################################################
#########               common argument         #############
#################################################
canal.ip=
canal.register.ip=
canal.port=11111
canal.metrics.pull.port=11112
canal.admin.port=11110
canal.admin.user=admin
canal.admin.passwd=
canal.zkServers=
canal.zookeeper.flush.period=1000
canal.withoutNetty=false
canal.serverMode=rabbitMQ
canal.file.data.dir=${canal.conf.dir}
canal.file.flush.period=1000
canal.instance.memory.buffer.size=16384
canal.instance.memory.buffer.memunit=1024 
canal.instance.memory.batch.mode=MEMSIZE
canal.instance.memory.rawEntry=true
canal.instance.detecting.enable=false
canal.instance.detecting.sql=select 1
canal.instance.detecting.interval.time=3
canal.instance.detecting.retry.threshold=3
canal.instance.detecting.heartbeatHaEnable=false
canal.instance.transaction.size=1024
canal.instance.fallbackIntervalInSeconds=60
canal.instance.network.receiveBufferSize=16384
canal.instance.network.sendBufferSize=16384
canal.instance.network.soTimeout=30
canal.instance.filter.druid.ddl=true
canal.instance.filter.query.dcl=false
canal.instance.filter.query.dml=false
canal.instance.filter.query.ddl=false
canal.instance.filter.table.error=false
canal.instance.filter.rows=false
canal.instance.filter.transaction.entry=false
canal.instance.filter.dml.insert=false
canal.instance.filter.dml.update=false
canal.instance.filter.dml.delete=false
canal.instance.binlog.format=ROW,STATEMENT,MIXED 
canal.instance.binlog.image=FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation=false
canal.instance.parser.parallel=true
canal.instance.parser.parallelThreadSize = 16
canal.instance.parser.parallelBufferSize=256
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
canal.instance.tsdb.snapshot.interval=24
canal.instance.tsdb.snapshot.expire=360
#################################################
#########               destinations            #############
#################################################
canal.destinations=example
canal.conf.dir=../conf
canal.auto.scan=true
canal.auto.scan.interval=5
canal.auto.reset.latest.pos.mode=false
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode=spring
canal.instance.global.lazy=false
canal.instance.global.manager.address=${canal.admin.manager}
canal.instance.global.spring.xml=classpath:spring/file-instance.xml
##################################################
#########             MQ Properties      #############
##################################################
canal.aliyun.accessKey=
canal.aliyun.secretKey=
canal.aliyun.uid=
canal.mq.flatMessage=true
canal.mq.canalBatchSize=50
canal.mq.canalGetTimeout=100
canal.mq.accessChannel=local
canal.mq.database.hash=true
canal.mq.send.thread.size=30
canal.mq.build.thread.size=8
##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers=127.0.0.1:9092
kafka.acks=all
kafka.compression.type=none
kafka.batch.size=16384
kafka.linger.ms=1
kafka.max.request.size=1048576
kafka.buffer.memory=33554432
kafka.max.in.flight.requests.per.connection=1
kafka.retries=0
kafka.kerberos.enable=false
kafka.kerberos.krb5.file=../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file=../conf/kerberos/jaas.conf
##################################################
#########                   RocketMQ         #############
##################################################
rocketmq.producer.group=test
rocketmq.enable.message.trace=false
rocketmq.customized.trace.topic=
rocketmq.namespace=
rocketmq.namesrv.addr=127.0.0.1:9876
rocketmq.retry.times.when.send.failed=0
rocketmq.vip.channel.enabled=false
rocketmq.tag=
##################################################
#########                   RabbitMQ         #############
##################################################
rabbitmq.host=rabbitmq
rabbitmq.virtual.host=/
rabbitmq.exchange=canal-exchange
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.queue=canal-queue
rabbitmq.routingKey=canal-routing-key
rabbitmq.deliveryMode=
##################################################
#########                     Pulsar         #############
##################################################
pulsarmq.serverUrl=
pulsarmq.roleToken=
pulsarmq.topicTenantPrefix=

3.2 instance.properties 完整文件

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# position info
canal.instance.master.address=mysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# multi stream for polardbx
canal.instance.multi.stream.on=false# ssl
#canal.instance.master.sslMode=DISABLED
#canal.instance.master.tlsVersions=
#canal.instance.master.trustCertificateKeyStoreType=
#canal.instance.master.trustCertificateKeyStoreUrl=
#canal.instance.master.trustCertificateKeyStorePassword=
#canal.instance.master.clientCertificateKeyStoreType=
#canal.instance.master.clientCertificateKeyStoreUrl=
#canal.instance.master.clientCertificateKeyStorePassword=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=admin123!@#
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

3.3 检查配置是否与宿主机一致

进入容器内部:docker exec -it canal bash

检查配置文件内容是否与宿主机一致:

  • cat /home/admin/canal-server/conf/canal.properties
  • cat /home/admin/canal-server/conf/example/instance.properties

3.4 开启相关端口防火墙配置

  • canal:11110、11111、11112
  • mysql:3306
  • redis:6379
  • RabbitMQ: 15672、5672
    在这里插入图片描述

四、代码实现

4.1 相关pom依赖引入

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository -->
</parent><!-- Spring Boot MQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency><!-- canal -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>

4.2 完整pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.neo</groupId><artifactId>code-repository</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent from repository --></parent><name>code-repository</name><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hutool.version>5.8.20</hutool.version><mysql.version>8.0.30</mysql.version><mybatis-plus.version>3.5.3.1</mybatis-plus.version><redis.version>3.1.0</redis.version><druid.version>1.2.16</druid.version><fastjson.version>1.2.83</fastjson.version><sa-token.version>1.37.0</sa-token.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis-plus.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>${hutool.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-spring-boot-starter</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-redis-jackson</artifactId><version>${sa-token.version}</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

4.3 application.yml 配置

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: true

4.4 完整application.yml配置

server:port: 8088servlet:context-path: /apispring:datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/db_v1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootdruid:initial-size: 5min-idle: 5max-active: 20max-wait: 60000validation-query: SELECT 1test-while-idle: truestat-view-servlet:enabled: trueurl-pattern: /druid/*login-username: adminlogin-password: admin123web-stat-filter:enabled: trueurl-pattern: /*exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*"filter:stat:enabled: truelog-slow-sql: trueslow-sql-millis: 1000wall:enabled: trueconfig:drop-table-allow: falseredis:host: localhostport: 6379password: 123456database: 0timeout: 5000lettuce:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0mail:host: smtp.aliyun.comusername: password: port: 25properties:mail:smtp:auth: truestarttls:enable: truerequired: truerabbitmq:host: localhostport: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlatedpublisher-returns: truemybatis-plus:mapper-locations: classpath*:mapper/*_Mapper.xmlglobal-config:db-config:logic-delete-field: delFlaglogic-delete-value: 1logic-not-delete-value: 0configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

4.5 RabbitConstants 基础常量配置

public interface RabbitConstants {interface Canal {String QUEUE = "canal-queue";String EXCHANGE = "canal-exchange";String ROUTING = "canal-routing-key";}interface EventType {String INSERT = "INSERT";String UPDATE = "UPDATE";String DELETE = "DELETE";}}

4.6 CanalMqConfigure MQ队列交换机配置

@Configuration
public class CanalMqConfigure {@Beanpublic Queue queue() {return new Queue(RabbitConstants.Canal.QUEUE, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(RabbitConstants.Canal.EXCHANGE, true, false);}@Beanpublic Binding bindingCanal() {return BindingBuilder.bind(queue()).to(directExchange()).with(RabbitConstants.Canal.ROUTING);}
}

4.7 CanalConsumer 消费者

package com.neo.core.canal;import com.neo.core.constant.RabbitConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Map;@Slf4j
@Component
@RabbitListener(queues = RabbitConstants.Canal.QUEUE)
public class CanalConsumer {@RabbitHandlerpublic void execute(Map<String, Object> msg) {log.info("canal消息监听事件触发,消息内容:{}", msg);boolean isDdl = (boolean) msg.get("isDdl");if (isDdl) {return;}String database = (String) msg.get("database");String table = (String) msg.get("table");String type = (String) msg.get("type");List<?> data = (List<?>) msg.get("data");log.info("database:{}.table:{}", database, table);if (RabbitConstants.EventType.INSERT.equalsIgnoreCase(type)) {System.out.println("INSERT");} else if (RabbitConstants.EventType.UPDATE.equalsIgnoreCase(type)) {System.out.println("UPDATE");} else if (RabbitConstants.EventType.DELETE.equalsIgnoreCase(type)) {System.out.println("DELETE");} else {// 其他事件}}
}

五、运行与测试

当MySQL数据出现变动后,会触发canal-queue的监听事件,后续可根据具体业务逻辑实现业务处理。

25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test1, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596660000, id=5, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596660932, type=DELETE}
25-03-10.16:51:28.431 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
DELETE
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596663000, id=6, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=null, pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596664038, type=INSERT}
25-03-10.16:51:31.523 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
INSERT
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - canal消息监听事件触发,消息内容:{data=[{id=2, account=test1, password=9c79685ab5ca920a187d94688d6f1845, salt=7c2b6b0caf0b4174816e2a4bf5f05cba, email=test@qq.com, nick_name=test, enabled=1, create_by=0, create_time=2025-03-08 22:22:00, update_by=0, update_time=2025-03-08 22:22:01, del_flag=0}], database=db_v1, es=1741596668000, id=7, isDdl=false, mysqlType={id=bigint, account=varchar(128), password=varchar(128), salt=varchar(128), email=varchar(64), nick_name=varchar(32), enabled=tinyint(1), create_by=bigint, create_time=datetime, update_by=bigint, update_time=datetime, del_flag=tinyint(1)}, old=[{account=test}], pkNames=[id], sql=, sqlType={id=-5, account=12, password=12, salt=12, email=12, nick_name=12, enabled=-6, create_by=-5, create_time=93, update_by=-5, update_time=93, del_flag=-6}, table=sys_user, ts=1741596668545, type=UPDATE}
25-03-10.16:51:36.030 [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  CanalConsumer          - database:db_v1.table:sys_user
UPDATE

以上是RabbitMQ+Canal数据一致性的完整解决方案,包括环境配置、代码实现以及运行测试等环节,确保了数据在不同系统间的一致性和可靠性。

相关文章:

Spring Boot+RabbitMQ+Canal 解决数据一致性

目录大纲 一、环境配置1.1 docker-compose.yml 配置1.2 docker-compose 常用命令1.3 镜像服务启动状态 二、MySQL binlog 配置2.1 docker-compose command 配置 binlog2.2 创建canal用户&#xff0c;以及查看是否开启binlog 三、canal 相关配置文件3.1 canal.properties 完整文…...

Java高频面试之集合-08

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本baby今天来报道了&#xff01;哈哈哈哈哈嗝&#x1f436; 面试官&#xff1a;详细说说CopyOnWriteArrayList CopyOnWriteArrayList 详解 CopyOnWriteArrayList 是 Java 并发包&#xff08;java.util…...

C#实现高性能异步文件下载器(支持进度显示/断点续传)

一、应用场景分析 异步文件下载器用处很大&#xff0c;当我们需要实现以下功能时可以用的上&#xff1a; 大文件下载&#xff08;如4K视频/安装包&#xff09; 避免UI线程阻塞&#xff0c;保证界面流畅响应多任务并行下载 支持同时下载多个文件&#xff0c;提升带宽利用率后台…...

【数据分析】转录组基因表达的KEGG通路富集分析教程

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍差异分析(limma)KEGG富集分析(enrichKEGG)可视化加载R包数据下载导入数据基因差异分析火山图KEGG通路富集分析可视化通路结果另一个案例总结系统信息参考介绍 KEGG富集分析,可…...

【由技及道】API契约的量子纠缠术:响应封装的十一维通信协议(全局的返回结果封装)【人工智障AI2077的开发日志012】

摘要&#xff1a;在API通信的量子混沌中&#xff0c;30种返回格式如同平行宇宙的物理定律相互碰撞。本文构建的十一维通信协议&#xff0c;通过时空锚点&#xff08;ApiResult&#xff09;、量子过滤器&#xff08;ResponseWrapper&#xff09;和湮灭防护罩&#xff08;Jackson…...

STM32 ——系统架构

3个被动单元 SRAM 存储程序运行时用到的变量 Flash&#xff08;内部闪存存储器&#xff09; 存储下载的程序 程序执行时用到的常量 桥接1和桥接2 AHB到APB的桥&#xff08;AHBtoAPBx) 桥1 通过APB2总线连接到APB2上的外设。 高速外设&#xff0c;最高72MHz。 桥2 通过…...

算法 之 树形dp 树的中心、重心

文章目录 重心实践题目小红的陡峭值 在树的算法中&#xff0c;求解树的中心和重心是一类十分重要的算法 求解树的重心 树的重心的定义&#xff1a;重心是树中的一个节点&#xff0c;如果将这个点删除后&#xff0c;剩余各个连通块中点数的最大值最小&#xff0c;那么这个节点…...

如何利用 Excel 表格实现精准文件批量重命名教程

在处理大量文件时&#xff0c;有时需要根据特定规则对文件名进行调整。如果您的文件名和新名称之间存在一对多的关系&#xff0c;并且这种关系可以通过 Excel 表格来管理&#xff0c;那么使用“简鹿文件批量重命名”软件中的“匹配对应名称命名”功能将是一个高效的选择。接下来…...

ACE协议学习1

在多核系统或复杂SoC&#xff08;System on Chip&#xff09;中&#xff0c;不同处理器核心或IP&#xff08;Intellectual Property&#xff09;模块之间需要保持数据的一致性。常用的是ACE协议or CHI。 先对ACE协议进行学习 ACE协议&#xff08;Advanced Microcontroller Bu…...

【实战ES】实战 Elasticsearch:快速上手与深度实践-5.1.1热点分片识别与均衡策略

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 5.1.1 Filebeat Logstash ES Kibana 全链路配置实1. 架构设计与组件选型1.1 技术栈对比分析1.2 硬件配置推荐 2. Filebeat 高级配置2.1 多输入源配置2.2 性能优化参数 3.…...

Kubernetes 的正式安装

1.基础的网络结构说明 软件路由器 ikuai 当然同一个仅主机模式 相当于在 同一个我们所谓的广播域内 所以相当于它们的几张网卡 是被连接起来的 为了防止出现问题 我们可以把第二块网卡临时关闭一下 2.准备路由器 ikuai 爱快 iKuai-商业场景网络解决方案提供商 (ikuai8.com)…...

初阶数据结构(C语言实现)——4.2队列

目录 2.队列2.1队列的概念及结构2.2队列的实现2.2.1 初始化队列2.2.2 销毁队列2.2.3 队尾入队列2.2.4 队头出队列2.2.5获取队列头部元素2.2.6 获取队列队尾元素2.2.7获取队列中有效元素个数2.2.8 检测队列是否为空&#xff0c;如果为空返回非零结果&#xff0c;如果非空返回0 3…...

Mysql主从复制和Mysql高可用以及负载均衡配置

需要先配置MySQL主从复制&#xff0c;然后再在主MySQL服务器上配置MySQL Router。以下是详细说明和步骤&#xff1a; 1. 为什么需要先配置MySQL主从复制&#xff1f; MySQL主从复制是MySQL高可用性和负载均衡的基础&#xff0c;通过将数据从主服务器实时同步到从服务器&#…...

c#中使用时间戳转换器

在C#中,时间戳转换器通常用于将时间戳(通常是一个表示自某一特定时间点(如1970年1月1日UTC)以来的毫秒数的长整型值)转换为DateTime对象,或者将DateTime对象转换回时间戳。以下是几种实现这一功能的方法: 1. 使用DateTime的构造函数 将时间戳转换为DateTime long tim…...

杂项知识笔记搜集

1.pygame pygame可以画出来图形界面&#xff0c;pygame Python仓库 PyGame游戏编程_游戏程序设计csdn-CSDN博客 2.V4L2库 V4L2是Linux上的Camera采集器的框架 Video for Linux &#xff0c;是从Linux2.1版本开始支持的。HDMI视频采集卡采集到的视频通过USB3.0输出&#xff0…...

rust语言match模式匹配涉及转移所有权Error Case

struct S{data:String, }//注意&#xff1a;因为String默认是移动语义&#xff0c;从而决定结构体S也是移动语义&#xff0c;可采用(1)或(2)两种方法解决编译错误&#xff1b;关键思路&#xff1a;放弃获取结构体S的字段data的所有权&#xff0c;改为借用。fn process(s_ref:&a…...

golang从入门到做牛马:第十一篇-Go语言变量作用域:变量的“生活圈”

在Go语言中,变量的作用域决定了它在程序中的可见性和生命周期。理解变量的作用域对于编写清晰、高效的代码至关重要。Go语言中的变量可以在三个主要地方声明:函数内、函数外和函数定义中。接下来,让我们深入探讨局部变量、全局变量和形式参数的作用域。 局部变量:函数内的“…...

【Linux】37.网络版本计算器

文章目录 1. Log.hpp-日志记录器2. Daemon.hpp-守护进程工具3. Protocol.hpp-通信协议解析器4. ServerCal.hpp-计算器服务处理器5. Socket.hpp-Socket通信封装类6. TcpServer.hpp-TCP服务器框架7. ClientCal.cc-计算器客户端8. ServerCal.cc-计算器服务器9. 代码时序1. 服务器启…...

linux安装Mariadb10.5并修改端口

首先配置yum源 进入下方的文件进行配置 vim /etc/yum.repos.d/MariaDB.repo填写下方内容 [mariadb]name MariaDBbaseurl https:///mirrors.aliyun.com/mariadb/yum/10.5/centos8-amd64/gpgkeyhttps:///mirrors.aliyun.com/mariadb/yum/RPM-GPG-KEY-MariaDBmodule_hotfixes…...

从Windows到ARM Linux:Qt程序的交叉编译与移植指南

引言 在嵌入式开发中&#xff0c;我们经常需要将桌面端开发的Qt程序部署到ARM架构的Linux设备。本文详细介绍如何将Windows平台开发的Qt程序&#xff0c;通过Linux虚拟机交叉编译为ARM架构可执行文件的完整过程 环境准备 需要特别注意的是&#xff0c;对于CentOS 7 默认支持…...

【紧急预警】Mojo nightly build已悄然移除PyModule::import() API!立即备份旧版+迁移至PyO3 0.21+手动GC管理方案(附自动化迁移脚本)

第一章&#xff1a;【紧急预警】Mojo nightly build已悄然移除PyModule::import() API&#xff01;立即备份旧版迁移至PyO3 0.21手动GC管理方案&#xff08;附自动化迁移脚本&#xff09;Mojo nightly build v2024.06.12 起&#xff0c;PyModule::import() 已被彻底移除&#x…...

OpenClaw+GLM-4.7-Flash:个人网络安全监控助手

OpenClawGLM-4.7-Flash&#xff1a;个人网络安全监控助手 1. 为什么需要个人网络安全监控 去年我的开发机遭遇了一次恶意脚本攻击&#xff0c;导致本地Git仓库被篡改。事后排查发现&#xff0c;攻击者通过一个陈旧的SSH密钥漏洞入侵&#xff0c;而系统日志里其实早有异常登录…...

安全第一:OpenClaw+GLM-4.7-Flash的本地化数据处理方案

安全第一&#xff1a;OpenClawGLM-4.7-Flash的本地化数据处理方案 1. 为什么我们需要本地化AI解决方案 上个月我帮一位律师朋友处理合同审查任务时&#xff0c;遇到了一个棘手问题——他需要分析上百份涉及商业机密的文件&#xff0c;但担心使用云端AI服务会导致数据泄露。这…...

科哥CAM++镜像入门指南:快速搭建中文语音识别系统

CAM镜像入门指南&#xff1a;快速搭建中文语音识别系统 1. 系统概述 CAM说话人识别系统是一个基于深度学习的声纹识别工具&#xff0c;由科哥封装为易用的Docker镜像。它能快速判断两段语音是否来自同一说话人&#xff0c;并提取语音特征向量&#xff0c;适用于身份验证、语音…...

别再被‘小样本’难倒了!用Python的PyMC3库实战层次贝叶斯模型

用PyMC3解锁小样本分析&#xff1a;层次贝叶斯建模实战指南 当你的数据集像便利店冰柜里的酸奶——每个品类只有零星几瓶时&#xff0c;传统统计方法往往会束手无策。想象你正分析20个城市的新开门店周销售额&#xff0c;每个城市却只有3-5条数据记录。这时&#xff0c;层次贝叶…...

如何高效迁移至WeFriends:微信好友关系管理工具全新升级指南

如何高效迁移至WeFriends&#xff1a;微信好友关系管理工具全新升级指南 【免费下载链接】WechatRealFriends 微信好友关系一键检测&#xff0c;基于微信ipad协议&#xff0c;看看有没有朋友偷偷删掉或者拉黑你 项目地址: https://gitcode.com/gh_mirrors/we/WechatRealFrien…...

HTML2Canvas终极指南:快速将网页内容转为精美图片的完整方案

HTML2Canvas终极指南&#xff1a;快速将网页内容转为精美图片的完整方案 【免费下载链接】html2canvas Screenshots with JavaScript 项目地址: https://gitcode.com/gh_mirrors/ht/html2canvas HTML2Canvas是一款强大的JavaScript库&#xff0c;能够直接在浏览器中把网…...

6_Harness驾驭工程可靠性层:混沌工程与服务可靠性管理

6_Harness驾驭工程可靠性层&#xff1a;混沌工程与服务可靠性管理 关键字&#xff1a; Chaos Engineering、混沌工程、SRM、服务可靠性管理、SLI、SLO、错误预算、韧性评分、故障模拟、事件响应、事后分析、韧性验证、自动故障注入、最小爆炸半径、Datadog、New Relic、Prometh…...

OpenClaw会议纪要大师:Qwen3-32B实时转录飞书语音会议

OpenClaw会议纪要大师&#xff1a;Qwen3-32B实时转录飞书语音会议 1. 为什么需要自动化会议纪要 每次开完会最头疼的就是整理会议纪要。作为团队的技术负责人&#xff0c;我每周要参加至少8场跨部门会议&#xff0c;传统的手动记录方式让我苦不堪言——要么记录不全重点&…...

CTFHub | 解密MySQL、Redis、MongoDB流量中的隐藏Flag

1. 数据库流量分析入门&#xff1a;为什么需要Wireshark&#xff1f; 当你参加CTF比赛时&#xff0c;经常会遇到需要从数据库流量中寻找Flag的题目。这类题目通常会给你一个抓包文件&#xff08;pcap格式&#xff09;&#xff0c;里面记录了MySQL、Redis或MongoDB等数据库的网络…...