当前位置: 首页 > news >正文

实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}//  手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092

 

相关文章:

实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录 1. 准备工作 2. 将需要缓存的数据存储 Redis 3. 监听 canal 存储在 Kafka Topic 中数据 1. 准备工作 1. 开启并配置MySQL的 BinLog&#xff08;MySQL 8.0 默认开启&#xff09; 修改配置&#xff1a;C:\ProgramData\MySQL\MySQL Server 8.0\my.ini log-bin"HELO…...

WINUI——Microsoft.UI.Xaml.Markup.XamlParseException:“无法找到与此错误代码关联的文本。

开发环境 VS2022 .net core6 问题现象 在Canvas内的子控件要绑定Canvas的兄弟控件的一个属性&#xff0c;在运行时出现了下述报错。 可能原因 在 WinUI&#xff08;特别是用于 UWP 或 Windows App SDK 的版本&#xff09;中&#xff0c;如果你尝试在 XAML 中将 Canvas 内的…...

C语言 | Leetcode C语言题解之第283题移动零

题目&#xff1a; 题解&#xff1a; void swap(int *a, int *b) {int t *a;*a *b, *b t; }void moveZeroes(int *nums, int numsSize) {int left 0, right 0;while (right < numsSize) {if (nums[right]) {swap(nums left, nums right);left;}right;} }...

WPF项目实战视频《二》(主要为prism框架)

14.prism框架知识&#xff08;1&#xff09; 使用在多个平台的MVVM框架 新建WPF项目prismDemo 项目中&#xff1a;工具-NuGet包管理&#xff1a;安装Prism.DryIoc框架 在git中能看Prism的结构和源代码&#xff1a;git链接地址 例如&#xff1a;Prism/src/Wpf/Prism.DryIoc.Wpf…...

【微信小程序实战教程】之微信小程序 WXS 语法详解

WXS语法 WXS是微信小程序的一套脚本语言&#xff0c;其特性包括&#xff1a;模块、变量、注释、运算符、语句、数据类型、基础类库等。在本章我们主要介绍WXS语言的特性与基本用法&#xff0c;以及 WXS 与 JavaScript 之间的不同之处。 1 WXS介绍 在微信小程序中&#xff0c…...

Android中Service学习记录

目录 一 概述二 生命周期2.1 启动服务startService()2.2 绑定服务bindService()2.3 先启动后绑定2.4 先绑定后启动 三 使用3.1 本地服务&#xff08;启动式&#xff09;3.2 可通信的服务&#xff08;绑定式&#xff09;3.3 前台服务3.4 IntentService 总结参考 一 概述 Servic…...

Elasticsearch:Java ECS 日志记录 - log4j2

ECS 记录器是你最喜欢的日志库的格式化程序/编码器插件。它们可让你轻松将日志格式化为与 ECS 兼容的 JSON。ECS 兼容的 JSON 日志记录可以帮我们简化很多分析&#xff0c;可视化及解析的工作。在今天的文章里&#xff0c;我来详述如何在 Java 应用里生成 ECS 相兼容的日志。 …...

MongoDB自学笔记(四)

一、前文回顾 上一篇文章中我们学习了MongoDB中的更新方法&#xff0c;也学了一部分操作符。今天我们将学习最后一个操作“删除”。 二、删除 原始数据如下&#xff1a; 1、deleteOne 语法&#xff1a;db.collection.deleteOne(< query >,< options >) 具体参…...

时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法(CPO)优化VMD二次分解

时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法&#xff08;CPO&#xff09;优化VMD二次分解 目录 时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法&#xff08;CPO&#xff09;优化VMD二次分解效果一览基本介绍程序设计参考资料 效果一览…...

新版海螺影视主题模板M3.1全解密版本多功能苹果CMSv10后台自适应主题

苹果CMS2022新版海螺影视主题M3.1版本&#xff0c;这个主题我挺喜欢的&#xff0c;之前也有朋友给我提供过原版主题&#xff0c;一直想要破解但是后来找了几个SG11解密的大哥都表示解密需要大几百大洋&#xff0c;所以一直被搁置了。这个版本是完全解密的&#xff0c;无需SG11加…...

汽车免拆诊断案例 | 2014 款上汽名爵 GT 车发动机无法起动

故障现象 一辆2014款上汽名爵GT车&#xff0c;搭载15S4G发动机&#xff0c;累计行驶里程约为18.4万km。该车因左前部发生碰撞事故进厂维修&#xff0c;更换损坏的部件后起动发动机&#xff0c;起动机运转有力&#xff0c;但无着机迹象。用故障检测仪检测&#xff0c;发现无法与…...

vue3前端开发-小兔鲜项目-登录功能的业务接口调用

vue3前端开发-小兔鲜项目-登录功能的业务接口调用!这次&#xff0c;正式调用远程服务器的登录接口了。大家要必须使用测试账号密码&#xff0c;才能验证我们的代码。 测试账号密码是&#xff1a;账号&#xff08;xiaotuxian001&#xff09;&#xff1b;密码是&#xff08;1234…...

【Linux】vim编辑器使用详解

目录 一、vim编辑器简介二、 vim编辑器使用指南1.基本操作1.进入与退出2.模式切换 2.命令模式1.移动光标2.选择文本&#xff08;可视模式&#xff09;3.删除文字4.复制粘贴5.替换6.撤销7.注释8.多文件窗口切换 3.底行模式1.列出每行的行号2.跳转到某行3.查找字符4.保存文件5.在…...

手机怎么设置不同的ip地址

在数字化日益深入的今天&#xff0c;智能手机已成为我们生活、工作和学习中不可或缺的设备。然而&#xff0c;随着网络应用的广泛和深入&#xff0c;我们有时需要为手机设置不同的IP地址来满足特定需求。比如&#xff0c;避免网络限制、提高网络安全、或者进行网络测试等。本文…...

SpringBoot读取配置的6种方式

在SpringBoot应用开发中&#xff0c;配置文件是不可或缺的一部分。它们帮助我们管理应用的运行时参数&#xff0c;使得应用的部署和维护变得更加灵活。SpringBoot提供了多种方式来读取配置文件&#xff0c;每种方式都有其适用场景和优缺点。本文将介绍六种常用的SpringBoot读取…...

1.1 openCv -- 介绍

OpenCV(开放源代码计算机视觉库:http://opencv.org)是一个开源库,包含了数百种计算机视觉算法。本文件描述了所谓的OpenCV 2.x API,这是一个本质上基于C++的API,与基于C的OpenCV 1.x API(C API已被弃用,并且自从OpenCV 2.4版本起不再使用“C”编译器进行测试)相对。 …...

探索PostgreSQL的GUI工具:提升数据库管理效率

在当今快速发展的技术世界中&#xff0c;数据库管理是任何软件开发项目的核心部分。PostgreSQL&#xff0c;作为一款功能强大的开源关系型数据库管理系统&#xff0c;因其稳定性、可靠性和高度的可扩展性而广受开发者和数据库管理员的青睐。然而&#xff0c;尽管PostgreSQL自带…...

【从零开始实现stm32无刷电机FOC】【实践】【5/7 stm32 adc外设的高级用法】

目录 采样时刻触发采样同步采样 点击查看本文开源的完整FOC工程 本节介绍的adc外设高级用法用于电机电流控制。 从前面几节可知&#xff0c;电机力矩来自于转子的q轴受磁力&#xff0c;而磁场强度与电流成正比&#xff0c;也就是说电机力矩与q轴电流成正相关&#xff0c;控制了…...

springcloud接入seata管理分布式事务

下载安装包 链接: seata 配置seata-server 文件上传Linux解压 压缩包我放在/usr/local/seata中 tar -zxvf seata-server-2.0.0.tar.gz修改配置文件 设置nacos为注册和配置中心 进入文件夹 cd /usr/local/seata/seata/conf修改application.yml文件 ...... ...... cons…...

Android APP 音视频(02)MediaProjection录屏与MediaCodec编码

说明&#xff1a; 此MediaProjection 录屏和编码实操主要针对Android12.0系统。通过MediaProjection获取屏幕数据&#xff0c;将数据通过mediacodec编码输出H264码流&#xff08;使用ffmpeg播放&#xff09;&#xff0c;存储到sd卡上。 1 MediaProjection录屏与编码简介 这里…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

C# 表达式和运算符(求值顺序)

求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如&#xff0c;已知表达式3*52&#xff0c;依照子表达式的求值顺序&#xff0c;有两种可能的结果&#xff0c;如图9-3所示。 如果乘法先执行&#xff0c;结果是17。如果5…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...