实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题
目录
1. 准备工作
2. 将需要缓存的数据存储 Redis
3. 监听 canal 存储在 Kafka Topic 中数据
1. 准备工作
1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini
log-bin="HELONG-bin"
binlog_format=ROW # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud # 监控 AI Cloud 项目
如果要同步多个项目:
binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3
2. 重启MySQL服务

3. 赋值数据同步权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4. 安装并配置 Canal
下载地址:https://github.com/alibaba/canal/releases
① 修改canal.properties
canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092
canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),
可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。
② 修改instance.properties
canal.instance.mysql.slaveId=100 # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka
slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据
(主节点的 slaveId = 1)
address 配置连接本地的 MySQL
topic 配置数据发送到 Kafka 的某个主题下
5. 拷贝 Jar 包到 lib
将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。
6. 删除 bin 目录下 startup.bat 里的参数
如果启动时报错:
Unrecognized VM option 'PermSize=128m'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
删除 -XX:PermSize=128m 参数即可。
7. 启动 canal
打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车
2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:
/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}
3. 监听 Kafka Topic 中数据并删除 Redis 缓存
首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的 topic 并针对 data 中的数据进行一个提取,得到一个 cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。
【代码示例】
/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}// 手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";
【补充】
kafka ui 下载地址:https://github.com/provectus/kafka-ui/tags
修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092
相关文章:
实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题
目录 1. 准备工作 2. 将需要缓存的数据存储 Redis 3. 监听 canal 存储在 Kafka Topic 中数据 1. 准备工作 1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启) 修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini log-bin"HELO…...
WINUI——Microsoft.UI.Xaml.Markup.XamlParseException:“无法找到与此错误代码关联的文本。
开发环境 VS2022 .net core6 问题现象 在Canvas内的子控件要绑定Canvas的兄弟控件的一个属性,在运行时出现了下述报错。 可能原因 在 WinUI(特别是用于 UWP 或 Windows App SDK 的版本)中,如果你尝试在 XAML 中将 Canvas 内的…...
C语言 | Leetcode C语言题解之第283题移动零
题目: 题解: void swap(int *a, int *b) {int t *a;*a *b, *b t; }void moveZeroes(int *nums, int numsSize) {int left 0, right 0;while (right < numsSize) {if (nums[right]) {swap(nums left, nums right);left;}right;} }...
WPF项目实战视频《二》(主要为prism框架)
14.prism框架知识(1) 使用在多个平台的MVVM框架 新建WPF项目prismDemo 项目中:工具-NuGet包管理:安装Prism.DryIoc框架 在git中能看Prism的结构和源代码:git链接地址 例如:Prism/src/Wpf/Prism.DryIoc.Wpf…...
【微信小程序实战教程】之微信小程序 WXS 语法详解
WXS语法 WXS是微信小程序的一套脚本语言,其特性包括:模块、变量、注释、运算符、语句、数据类型、基础类库等。在本章我们主要介绍WXS语言的特性与基本用法,以及 WXS 与 JavaScript 之间的不同之处。 1 WXS介绍 在微信小程序中,…...
Android中Service学习记录
目录 一 概述二 生命周期2.1 启动服务startService()2.2 绑定服务bindService()2.3 先启动后绑定2.4 先绑定后启动 三 使用3.1 本地服务(启动式)3.2 可通信的服务(绑定式)3.3 前台服务3.4 IntentService 总结参考 一 概述 Servic…...
Elasticsearch:Java ECS 日志记录 - log4j2
ECS 记录器是你最喜欢的日志库的格式化程序/编码器插件。它们可让你轻松将日志格式化为与 ECS 兼容的 JSON。ECS 兼容的 JSON 日志记录可以帮我们简化很多分析,可视化及解析的工作。在今天的文章里,我来详述如何在 Java 应用里生成 ECS 相兼容的日志。 …...
MongoDB自学笔记(四)
一、前文回顾 上一篇文章中我们学习了MongoDB中的更新方法,也学了一部分操作符。今天我们将学习最后一个操作“删除”。 二、删除 原始数据如下: 1、deleteOne 语法:db.collection.deleteOne(< query >,< options >) 具体参…...
时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法(CPO)优化VMD二次分解
时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法(CPO)优化VMD二次分解 目录 时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法(CPO)优化VMD二次分解效果一览基本介绍程序设计参考资料 效果一览…...
新版海螺影视主题模板M3.1全解密版本多功能苹果CMSv10后台自适应主题
苹果CMS2022新版海螺影视主题M3.1版本,这个主题我挺喜欢的,之前也有朋友给我提供过原版主题,一直想要破解但是后来找了几个SG11解密的大哥都表示解密需要大几百大洋,所以一直被搁置了。这个版本是完全解密的,无需SG11加…...
汽车免拆诊断案例 | 2014 款上汽名爵 GT 车发动机无法起动
故障现象 一辆2014款上汽名爵GT车,搭载15S4G发动机,累计行驶里程约为18.4万km。该车因左前部发生碰撞事故进厂维修,更换损坏的部件后起动发动机,起动机运转有力,但无着机迹象。用故障检测仪检测,发现无法与…...
vue3前端开发-小兔鲜项目-登录功能的业务接口调用
vue3前端开发-小兔鲜项目-登录功能的业务接口调用!这次,正式调用远程服务器的登录接口了。大家要必须使用测试账号密码,才能验证我们的代码。 测试账号密码是:账号(xiaotuxian001);密码是(1234…...
【Linux】vim编辑器使用详解
目录 一、vim编辑器简介二、 vim编辑器使用指南1.基本操作1.进入与退出2.模式切换 2.命令模式1.移动光标2.选择文本(可视模式)3.删除文字4.复制粘贴5.替换6.撤销7.注释8.多文件窗口切换 3.底行模式1.列出每行的行号2.跳转到某行3.查找字符4.保存文件5.在…...
手机怎么设置不同的ip地址
在数字化日益深入的今天,智能手机已成为我们生活、工作和学习中不可或缺的设备。然而,随着网络应用的广泛和深入,我们有时需要为手机设置不同的IP地址来满足特定需求。比如,避免网络限制、提高网络安全、或者进行网络测试等。本文…...
SpringBoot读取配置的6种方式
在SpringBoot应用开发中,配置文件是不可或缺的一部分。它们帮助我们管理应用的运行时参数,使得应用的部署和维护变得更加灵活。SpringBoot提供了多种方式来读取配置文件,每种方式都有其适用场景和优缺点。本文将介绍六种常用的SpringBoot读取…...
1.1 openCv -- 介绍
OpenCV(开放源代码计算机视觉库:http://opencv.org)是一个开源库,包含了数百种计算机视觉算法。本文件描述了所谓的OpenCV 2.x API,这是一个本质上基于C++的API,与基于C的OpenCV 1.x API(C API已被弃用,并且自从OpenCV 2.4版本起不再使用“C”编译器进行测试)相对。 …...
探索PostgreSQL的GUI工具:提升数据库管理效率
在当今快速发展的技术世界中,数据库管理是任何软件开发项目的核心部分。PostgreSQL,作为一款功能强大的开源关系型数据库管理系统,因其稳定性、可靠性和高度的可扩展性而广受开发者和数据库管理员的青睐。然而,尽管PostgreSQL自带…...
【从零开始实现stm32无刷电机FOC】【实践】【5/7 stm32 adc外设的高级用法】
目录 采样时刻触发采样同步采样 点击查看本文开源的完整FOC工程 本节介绍的adc外设高级用法用于电机电流控制。 从前面几节可知,电机力矩来自于转子的q轴受磁力,而磁场强度与电流成正比,也就是说电机力矩与q轴电流成正相关,控制了…...
springcloud接入seata管理分布式事务
下载安装包 链接: seata 配置seata-server 文件上传Linux解压 压缩包我放在/usr/local/seata中 tar -zxvf seata-server-2.0.0.tar.gz修改配置文件 设置nacos为注册和配置中心 进入文件夹 cd /usr/local/seata/seata/conf修改application.yml文件 ...... ...... cons…...
Android APP 音视频(02)MediaProjection录屏与MediaCodec编码
说明: 此MediaProjection 录屏和编码实操主要针对Android12.0系统。通过MediaProjection获取屏幕数据,将数据通过mediacodec编码输出H264码流(使用ffmpeg播放),存储到sd卡上。 1 MediaProjection录屏与编码简介 这里…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
消息队列系统设计与实践全解析
文章目录 🚀 消息队列系统设计与实践全解析🔍 一、消息队列选型1.1 业务场景匹配矩阵1.2 吞吐量/延迟/可靠性权衡💡 权衡决策框架 1.3 运维复杂度评估🔧 运维成本降低策略 🏗️ 二、典型架构设计2.1 分布式事务最终一致…...
Ubuntu 可执行程序自启动方法
使用 autostart(适用于桌面环境) 适用于 GNOME/KDE 桌面环境(如 Ubuntu 图形界面) 1. 创建 .desktop 文件 sudo vi ~/.config/autostart/my_laser.desktop[Desktop Entry] TypeApplication NameMy Laser Program Execbash -c &…...
如何让非 TCP/IP 协议驱动屏蔽 IPv4/IPv6 和 ARP 报文?
——从硬件过滤到协议栈隔离的完整指南 引言 在现代网络开发中,许多场景需要定制化网络协议(如工业控制、高性能计算),此时需确保驱动仅处理特定协议,避免被标准协议(如 IPv4/IPv6/ARP)干扰。本文基于 Linux 内核驱动的实现,探讨如何通过硬件过滤、驱动层拦截和协议栈…...
OCC笔记:TDF_Label中有多个相同类型属性
注:OCCT版本:7.9.1 TDF_Label中有多个相同类型的属性的方案 OCAF imposes the restriction that only one attribute type may be allocated to one label. It is necessary to take into account the design of the application data tree. For exampl…...
