Canal+Kafka实现MySQL与Redis数据同步(一)
Canal+Kafka实现MySQL与Redis数据同步(一)
前言
在很多业务情况下,我们都会在系统中加入redis缓存做查询优化。
如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新redis的代码。
这种数据同步的代码跟业务代码糅合在一起会不太优雅,能不能把这些数据同步的代码抽出来形成一个独立的模块呢,答案是可以的。
架构图
canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。
canal最简单的使用方法,是tcp模式。
实际上canal是支持直接发送到MQ的,目前最新版是支持主流的三种MQ:Kafka、RocketMQ、RabbitMQ。而canal的RabbitMQ模式目前是有一定的bug,所以一般使用Kafka或者RocketMQ。
这里使用Kafka,实现Redis与MySQL的数据同步。架构图如下:
通过架构图,我们很清晰知道要用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。
搭建Kafka
首先在官网下载安装包:
解压,打开/config/server.properties配置文件,修改日志目录:
首先启动ZooKeeper,我用的是3.6.1版本:
接着再启动Kafka,在Kafka的bin目录下打开cmd,输入命令:
kafka-server-start.bat ../../config/server.properties
可以看到ZooKeeper上注册了Kafka相关的配置信息:
然后创建一个队列,用于接收canal传送过来的数据,使用命令:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic
创建的队列名是canaltopic
。
配置Cannal Server
canal官网下载相关安装包:
找到canal.deployer-1.1.4/conf目录下的canal.properties配置文件:
# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example
然后配置instance,找到/conf/example/instance.properties配置文件:
## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)
# canal.instance.mysql.slaveId=0
# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ队列名称
canal.mq.topic=canaltopic
#单队列模式的分区下标
canal.mq.partition=0
配置完成后,就可以启动canal了。
测试
这时可以打开kafka的消费者窗口,测试一下kafka是否收到消息。
使用命令进行监听消费:
kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --from-beginning --topic canaltopic
这里使用的是win10系统的cmd命令行,win10系统默认的编码是GBK,而Canal Server是UTF-8的编码,所以控制台会出现乱码:
在cmd命令行执行前切换到UTF-8编码即可,使用命令行:chcp 65001
然后再执行打开kafka消费端的命令,就不乱码了:
接下来就是启动Redis,把数据同步到Redis就完事了。
封装Redis客户端
环境搭建完成后,我们可以写代码了。
首先引入Kafka和Redis的maven依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
在application.yml文件增加以下配置:
spring: redis:host: 127.0.0.1port: 6379database: 0password: 123456
封装一个操作Redis的工具类:
@Component
public class RedisClient {/*** 获取redis模版*/@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 设置redis的key-value*/public void setString(String key, String value) {setString(key, value, null);}/*** 设置redis的key-value,带过期时间*/public void setString(String key, String value, Long timeOut) {stringRedisTemplate.opsForValue().set(key, value);if (timeOut != null) {stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);}}/*** 获取redis中key对应的值*/public String getString(String key) {return stringRedisTemplate.opsForValue().get(key);}/*** 删除redis中key对应的值*/public Boolean deleteKey(String key) {return stringRedisTemplate.delete(key);}
}
相关文章:

Canal+Kafka实现MySQL与Redis数据同步(一)
CanalKafka实现MySQL与Redis数据同步(一) 前言 在很多业务情况下,我们都会在系统中加入redis缓存做查询优化。 如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新redis的代码。 这种数据同步的代码跟业务代码糅合…...
集合的运算
集合的运算 #include <stdio.h> #include <stdlib.h> void print(int size, char arr[]) {if (size 0) {printf("null");}for (int i 0; i < size; i) {printf("%c", arr[i]);}printf("\n"); } int main() {char U[] { a,b,c,…...
在MySQL上实现间隔5分钟汇总取数及相关字符串、时间处理方法实践
1. 实践案例需求描述 查询mysql数据库,按每5分钟分组获取3个小时内的电量数据,参考SQL语句如下。 select sd.RecordTime RecordTime, sd.sddl sddl,sd.pvdl ,cap.capdl capdl from ((SELECT CONCAT(DATE_FORMAT(RecordTime,%Y-%m-%d %H:), LPAD(floor(…...
什么是AIGC
1 定义 "AIGC"代表“人工智能生成内容”(Artificial Intelligence Generated Content),它指的是使用人工智能(AI)技术自动生成的内容,这些内容可以包括文本、图像、音乐、视频或其他多媒体形式。…...

〖大前端 - 基础入门三大核心之JS篇㊳〗- DOM访问元素节点
说明:该文属于 大前端全栈架构白宝书专栏,目前阶段免费,如需要项目实战或者是体系化资源,文末名片加V!作者:不渴望力量的哈士奇(哈哥),十余年工作经验, 从事过全栈研发、产品经理等工作…...

GitHub Universe 2023:AI 技术引领软件开发创新浪潮
GitHub 是全球领先的软件开发和协作平台,数百万开发者和企业在此分享、学习和创建卓越的软件。同时 GitHub 处在 AI 技术前沿,通过其先进的 AI 技术增强开发者体验并赋能未来软件开发的使命。在今天的文章中,我们将一起看看在 GitHub 年度大会…...

数据结构:红黑树的插入实现(C++)
个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C》《Linux》 文章目录 一、红黑树二、红黑树的插入三、代码实现总结 一、红黑树 红黑树的概念: 红黑树是一颗二叉搜索树,但在每个节点上增加一个存储位表示节点的颜色&…...

飞天使-django之数据库简介
文章目录 增删改查解决数据库不能存储中文问题创建表数据类型表的基本操作主键唯一键 unique外键实战 增删改查 四个常用的语句查询 : insert delete update select insert into student(Sno,name) values(95001,"张三") delete from student where name张三 upda…...
Flink之KeyedState
前面的文章中介绍过Operator State,这里介绍一下Keyed State. 在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Ch…...

c语言:模拟实现qsort函数
qsort函数的功能: qsort相较于冒泡排序法,不仅效率更快,而且能够比较不同类型的元素,如:浮点数,结构体等等。这里我们来模拟下qsort是如何实现这一功能的,方便我们对指针数组有一个更深层次的理…...

从0开始学习数据结构 C语言实现 1.前篇及二分查找算法
一、前篇 1、什么是数据结构? 数据结构是带有结构特性的数据元素的集合,它研究的是数据的逻辑结构和数据的物理结构以及它们之间的相互关系 2、时间复杂度与空间复杂度 大O符号是用于描述函数渐进行为的数学符号 常用函数的增长表 阶乘O(n!) > 指数…...

VSCode 使用CMakePreset找不到cl.exe编译器的问题
在用vscode开发c项目的时候,使用预先配置的CMakePresets.json可以把一些特定的cmake选项固定下来,在配置时直接使用 "cmake --config --preset presetname"就可以进行配置,免去在命令行输入过多的配置参数。 但是在vscode中&#…...

【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程
个人主页点击直达:小白不是程序媛 Linux专栏:Linux系统化学习 目录 操作系统进程的状态 运行状态 阻塞状态 进程阻塞的现象 挂起阻塞状态 Linux进程状态 Linux内核源代码怎么说 R(running状态)运行状态 S(sl…...

深信服AC流量管理技术
拓扑图 一.保证通道针对修仙部,访问网站,邮件,DNS,IM,办工 OA,微博论坛网上银行等常见应用保证带宽最低 50%,最高 100% 1. 先新建线路带宽 2.新增流量管理通道(保证关键应用&#x…...
二元关系及关系代数中的象集、除运算
二元关系及关系代数中的象集、除运算 数学上,二元关系用于讨论两个数学对象的联系。诸如算术中的「大于」及「等于」,几何学中的"相似",或集合论中的"为...之元素"或"为...之子集"。二元关系有时会简称关系&a…...

[PHP]关联和操作MySQL数据库然后将数据库部署到ECS
在Mac电脑上使用VS Code进行PHP开发并关联操作MySQL数据库,然后将数据库部署到ECS。 1.安装PHP和MySQL 确保你的Mac上已经安装了PHP和MySQL。你可以使用Homebrew来安装它们: $ brew install php $ brew install mysql 安装mysql完成后记住这一句: …...

23.11.19日总结
经过昨天的中期答辩,其实可以看出来项目进度太慢了,现在是第十周,预计第十四周是终级答辩,在这段时间要把项目写完。 前端要加上一个未登录的拦截器,后端加上全局的异常处理。对于饿了么项目的商品建表,之前…...

系列一、JVM概述
一、概述 1.1、Java发展中的重大事件 1.2、虚拟机 vs Java虚拟机 1.2.1、虚拟机 1.2.2、Java虚拟机 1.2.3、Java虚拟机的作用 Java虚拟机是二进制字节码的运行环境,负责装载字节码到其内部,解释/编译为对应平台上的机器指令指令。每一条Java指令&#…...
milvus数据管理-压缩数据
Milvus 默认支持自动数据压缩。您可以 配置 Milvus 以启用或禁用 压缩 和自动压缩。 如果自动压缩被禁用,您仍然可以手动压缩数据。 1.手动压缩数据 压缩请求是异步处理的,因为它们通常需要花费很长时间。 from pymilvus import Collection collection…...

SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)
最近找个springboot脚手架重新熟悉一下springboot相关框架的东西,结果发现好像项目还不能直接像数据库GUI工具一样填几个SSH参数就可以了,于是就给他再整一下看看如何解决 linux开放3306(可修改)端口直接访问 此方法较为方便&am…...

铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...

无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...

uniapp 小程序 学习(一)
利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 :开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置,将微信开发者工具放入到Hbuilder中, 打开后出现 如下 bug 解…...