实现canal监控binlog日志再通过消息队列异步处理
一、简单步骤

实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下:
-
配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。
-
连接到Canal:使用Canal客户端连接到Canal服务器,订阅需要监控的数据库和表。
-
监听Binlog事件:通过Canal客户端,监听Canal服务器发送的Binlog事件。可以通过设置监听器来处理不同类型的Binlog事件。
-
将事件发送到消息队列:一旦监听到Binlog事件,可以将事件转化为消息,并发送到消息队列进行异步处理。可以选择使用Kafka、RabbitMQ、ActiveMQ等消息队列。
-
消息处理:在消息队列中,可以编写消费者程序来处理接收到的消息。根据消息的类型和内容,可以执行相应的操作,例如更新数据库,生成报告等。
-
确保处理的幂等性:在处理消息时,需要注意幂等性。即,同一条消息可以被处理多次,但最终结果应该是一致的。
-
监控和错误处理:在整个过程中,需要监控消息队列的状态,并进行错误处理。例如,如果消息队列出现故障,可能需要重新连接或存储未处理的消息。
二、Binlog使用
Binlog是MySQL数据库的二进制日志,用于记录数据库的更改操作。它可以用于数据恢复、主从复制、数据备份和恢复等场景。以下是有关Binlog的一些常见用法:
-
数据恢复:通过分析Binlog,可以还原数据库到某个特定的时间点或事务提交后的状态。这在意外删除或误操作后恢复数据非常有用。
-
主从复制:通过将Binlog从主数据库复制到从数据库,可以实现主数据库的实时同步。这可以用于横向扩展读操作、数据备份和故障恢复。
-
数据备份和恢复:可以使用Binlog进行增量备份和恢复,只记录变更的部分,而不需要完全备份整个数据库。
-
数据审计和追踪:分析Binlog可以了解数据库的操作历史,包括谁在何时执行了哪些操作。这对于安全审计和数据追踪非常有用。
-
数据更改追踪和同步:通过解析Binlog,可以捕获和同步数据库的更改,以便在其他系统或数据仓库中进行进一步处理和分析。
要使用Binlog,首先需要在MySQL服务器上启用Binlog功能。可以在MySQL配置文件中设置以下参数:
log-bin=mysql-bin
binlog-format=ROW
其中,log-bin指定Binlog日志文件的名称前缀,binlog-format指定Binlog的格式为行格式。
启用Binlog后,MySQL将开始记录所有的更新操作到Binlog中。可以使用MySQL提供的工具(如mysqlbinlog)来解析和分析Binlog文件。还可以使用第三方工具(如Canal)来实时监控和解析Binlog,以便进行异步处理或其他操作。
需要注意的是,Binlog日志会占用磁盘空间,因此需要定期进行清理和归档,以避免过多的日志文件导致磁盘空间不足。
三、canal使用
Canal是一款开源的基于Java的MySQL数据库增量订阅&消费组件,它可以帮助我们实时监控MySQL的Binlog日志,并将数据推送到消息队列中进行异步处理。以下是使用Canal的一般步骤:
-
下载和安装Canal:首先需要从Canal的官方网站或Github上下载Canal的安装包,然后解压到指定的目录中。
-
配置Canal:Canal需要配置MySQL的相关信息,如主机地址、端口、用户名、密码等,以便连接到MySQL数据库。配置文件位于Canal安装目录下的
conf文件夹中,主要包括canal.properties和instance.properties两个文件。其中,canal.properties是全局配置文件,而instance.properties是实例级别的配置文件,可以配置多个实例。 -
启动Canal Server:通过运行Canal Server的启动脚本(如
startup.sh或startup.bat),启动Canal Server进程。 -
创建Canal实例:使用Canal提供的命令行工具或API,创建一个Canal实例,指定要订阅和监控的数据库和表。
-
消费Binlog事件:通过连接到Canal Server,订阅和消费Binlog事件。Canal可以将Binlog事件推送到Kafka、RocketMQ等消息队列中,供后续的异步处理。
四、Rabbit结合canal更新Redis缓存示例
通过canal 监控更新 Redis 缓存或者触发RabbitMQ发送消息示例:
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.sql.Connection;
import java.util.List;public class RabbitCanalRedisExample {private static final String RABBITMQ_QUEUE_NAME = "canal_redis_queue";private static final String RABBITMQ_HOST = "localhost";private static final int RABBITMQ_PORT = 5672;private static final String CANAL_HOST = "localhost";private static final int CANAL_PORT = 11111;private static final String REDIS_HOST = "localhost";private static final int REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 连接 RabbitMQConnectionFactory factory = new ConnectionFactory();factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(RABBITMQ_QUEUE_NAME, false, false, false, null);// 连接 CanalCanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_HOST, CANAL_PORT), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 连接 RedisJedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);while (true) {// 从 Canal 获取消息Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 确认消息已处理connector.ack(batchId);continue;}// 遍历 Canal 消息for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {continue;}try {// 解析 Canal 消息CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {// 处理新增数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {// 处理删除数据String tableName = entry.getHeader().getTableName();String key = rowData.getBeforeColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 删除 Redis 缓存jedis.del(tableName + "_" + key);// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {// 处理更新数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());}}} catch (IOException e) {e.printStackTrace();}}}// 确认消息已处理connector.ack(batchId);}
}
这个示例代码实现了以下功能:
- 使用 Canal 连接到 MySQL 数据库并订阅所有表。
- 使用 RabbitMQ 队列接收并发送消息。
- 使用 Redis 缓存数据。
- 根据 Canal 消息的类型进行相应操作:新增数据时,在 Redis 中保存数据并发送消息到 RabbitMQ;删除数据时,从 Redis 中删除数据并发送消息到 RabbitMQ;更新数据时,更新 Redis 中的数据并发送消息到 RabbitMQ。
区分场景,也可以先发送消息,通过消息触发缓存更新,好处有以下几点:
-
异步操作:将更新缓存的操作写入消息队列后,在实际更新缓存的过程中不会阻塞应用程序的执行。应用程序可以继续处理其他的业务逻辑,而不必等待缓存更新完成。这样可以提高应用程序的响应速度和吞吐量。
-
并发控制:通过消息队列可以实现缓存更新的并发控制。当多个请求同时更新缓存时,可以通过消息队列的先进先出原则,确保每个更新操作按照顺序进行,避免了并发操作导致的数据不一致问题。
-
可靠性:消息队列可以提供消息的持久化机制,即使在消息队列出现故障或重启的情况下,更新缓存的消息不会丢失。这样可以提高缓存更新的可靠性和数据完整性。
-
扩展性:通过消息队列,可以将缓存更新的任务分发到多个消费者进行处理,实现任务的并行处理,提高系统的扩展性和负载能力。
总的来说,使用消息队列更新缓存可以实现异步操作、并发控制、可靠性和扩展性,提高系统的性能和可靠性。
相关文章:
实现canal监控binlog日志再通过消息队列异步处理
一、简单步骤 实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下: 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。 连接到Canal:使用Canal客户…...
Linux DNS 协议概述
1. DNS 概述 互联网中,一台计算机与其他计算机通信时,通过 IP 地址唯一的标志自己。此时的 IP 地址就类似于我们日常生活中的电话号码。但是,这种纯数字的标识是比较难记忆的,而且数量也比较庞大。例如,每个 IPv4 地址…...
linux打包qt程序
参考这篇文章:Linux下Qt程序打包_linuxdeployqt下载-CSDN博客 本篇文章的系统环境是 : 虚拟机ubuntu18.04 用下面这个qmake路径 进行编译 在 ~/.bashrc 文件末尾,qmake目录配置到文件末尾 将上图中bin目录下的linuxdeployqt程序拷贝到/usr/bin下一份 &…...
软考中级-软件设计师通过心路经验分享
执念,第四次终于通过了 没买书,下班后每天2小时,四个2个月终于过了 学习经验: 1.下班后学习真的靠毅力,和上学的时候考证不是一个状态,大家要及时调整,否则过程很痛苦 2.失败三次的经验…...
safe area helper插件
概述 显示不同机型的必能显示的区域 实现步骤 引入safearea,引入其中的safearea的csharp 为cancas加入gameobject gameobject中加入safearea脚本 将UI作为这个gameobject的子物体,就可以完成显示...
李宏毅机器学习-批次 (batch)和动量(momentum)
一.batch(批次) 在计算微分时,不是对所有的数据算出来的Loss值做微分,而是将所有的数据分成一个一个的batch。一个batch是一个B,在更新参数时,拿B的资料计算Loss,计算gradient,再更新…...
C# 网络编程--关于UDP 通信(二)
UDP (User Datagram Protocol) 是一种无连接的传输层协议,主要用于支持数据报文的传输。它的主要特点包括简单、高效、不保证可靠性和顺序。 1.UDP协议基本概念 1.udp基于IP的简单的协议,不可靠的协议 2.优点:简单、 轻量化、 传输速度高、…...
【k8s集群应用】Kubernetes部署安装-二进制部署实例
文章目录 Kubernetes 部署方式常见的K8S安装部署方式Kubeadm与二进制部署的区别 Kubernetes部署安装环境配置Kubernetes集群初始化配置(实验环境)一、操作系统初始化配置二、部署Docker引擎 etcd 集群搭建配置 etcd 集群 Kubernetes Master 组件部署准备…...
js常见代码输出问题之promise,await,变量提升以及闭包(包括例子以及详细解析)
这里写目录标题 异步事件循环宏任务微任务1. 执行顺序2. 分类 Promise代码输出1. promise.then执行时机2. 宏任务微任务的多轮次3. .then .catch会返回新的promise4. 返回任意一个非 promise 的值都会被包裹成 promise 对象5. .then .catch 的值不能是promise本身6. 值透传7. .…...
遗传算法与深度学习实战(27)——进化卷积神经网络
遗传算法与深度学习实战(27)——进化卷积神经网络 0. 前言1. 自定义交叉算子2. 自定义突变操作符3. 进化卷积神经网络小结系列链接 0. 前言 DEAP toolbox 中提供的标准遗传操作符对于自定义的网络架构基因序列来说是不够的。这是因为任何标准的交叉算子…...
【Vue3】前端使用 FFmpeg.wasm 完成用户视频录制,并对视频进行压缩处理
强烈推荐这篇博客!非常全面的一篇文章,本文是对该博客的简要概括和补充,在不同技术栈中提供一种可行思路,可先阅读该篇文章再阅读本篇: FFmpeg——在Vue项目中使用FFmpeg(安装、配置、使用、SharedArrayBu…...
基础算法——前缀和
由于比赛基本都是采用Dev-C所以,算法篇基本都是采用Dev-C来解释(版本5.11,c11) 首先介绍一下前缀和算法 给定一个数组,有q次询问,每次询问: 两个整数l,r,求出数组 l 到 r的结果 遇…...
spring实例化对象的几种方式(使用XML配置文件)
前言 Spring框架作为一个轻量级的控制反转(IoC)容器,为开发者提供了多种对象实例化的策略。通过这些策略,开发者可以更加灵活地控制对象的生命周期和依赖关系。无论是通过XML配置、注解配置还是Java配置,Spring都能…...
【二叉树】力扣 129.求根节点到叶子节点数字之和
一、题目 二、思路 每找到一个非空节点,之前路径上的所有节点的数量级都要增加1个单位。例如,当前节点为3,之前的节点路径为1 -> 2,presum 1 * 10 2 12,现在路径变为了 1 -> 2 -> 3,sum pres…...
深度学习物体检测之YOLOV5源码解读
V5比前面版本偏工程化,项目化,更贴合实战 一.V5版本项目配置 (1)整体项目概述 首先github直接查找yolov5,下载下来即可。在训练时,数据是怎么处理的?网络模型架构是怎么设计的(如各层的设计)?yolov5要求是大于python3.8与大于等…...
音频数据采样入门详解 - 给Python初学者的简单解释
音频数据采样入门详解 - 给Python初学者的简单解释 声音是如何变成数字的?什么是采样率?为什么要懂这个?Python小例子总结 大家好!今天我们来聊一个有趣的话题:音频数据是如何在计算机中处理的。让我用最简单的方式来解…...
Unity类银河战士恶魔城学习总结(P179 Enemy Archer 弓箭手)
教程源地址:https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了敌人弓箭手的制作 Enemy_Archer.cs 核心功能 状态机管理敌人的行为 定义了多个状态对象(如 idleState、moveState、attackState 等),通过状态机管理敌人的…...
SpringCloud集成sleuth和zipkin实现微服务链路追踪
文章目录 前言技术积累spring cloud sleuth介绍zipkin介绍Zipkin与Sleuth的协作 SpringCloud多模块搭建Zipkin Server部署docker pull 镜像启动zipkin server SpringCloud 接入 Sleuth 与 Zipkinpom引入依赖 (springboot2.6)appilication.yml配置修改增加测试链路代码 调用微服…...
Python随机抽取Excel数据并在处理后整合为一个文件
本文介绍基于Python语言,针对一个文件夹下大量的Excel表格文件,基于其中每一个文件,随机从其中选取一部分数据,并将全部文件中随机获取的数据合并为一个新的Excel表格文件的方法。 首先,我们来明确一下本文的具体需求。…...
Linux+Docker onlyoffice 启用 HTTPS 端口支持
文章目录 一、需求二、配置2.1 创建容器2.2 进入容器2.3 生成私钥和证书 2.4 测试访问 一、需求 上篇文章介绍了如何搭建一个 onlyoffice 在线预览服务,但是我们实际场景调用该服务的网站是协议是 https 的 ,但是 onlyoffice 服务还没做配置,…...
OpenClaw+Qwen3-14B镜像实战:5分钟搭建飞书智能助手
OpenClawQwen3-14B镜像实战:5分钟搭建飞书智能助手 1. 为什么选择这个组合? 上周三晚上11点,我正在为第二天的部门会议整理材料时,突然冒出一个想法:能不能让AI自动处理这些重复性工作?经过一番折腾&…...
SD卡速度模式全解析:从High Speed到UHS-III的选型指南
SD卡速度模式全解析:从High Speed到UHS-III的选型指南 在4K视频拍摄、高速连拍相机和工业级数据采集设备中,SD卡的性能往往成为系统瓶颈。我曾为一个医疗影像项目选型时,因误用Class 10的High Speed卡导致DVR设备频繁丢帧,最终发现…...
搞电机控制的兄弟应该都懂,无感算法里磁链观测器+PLL锁相环的组合有多香。今天直接上干货,聊聊非线性磁链观测器的实现套路和实操中那些让你少掉几根头发的技巧
永磁同步电机非线性磁链无感算法、Flux观测器锁相环PLL仿真模型 flux:计算电机磁链,目的为了使得估计的磁链收敛于实际磁链; pll:通过估计磁链计算经过pi调节后使得估计角度跟踪实际角度 模型描述及资料: (…...
网安新手必刷的五个渗透测试靶场!黑客技术实战靶场零基础入门到精通教程!DVWA、Pikachu、SQLi-Labs、Upload-Labs、XSS-Labs靶场教程
前言 因为最近有任务需要搭建一些适合新手使用的靶场,所以收集了一下互联网常见的一些友好的新手渗透测试靶场。 分别是DVWA、Pikachu、SQLi-Labs、Upload-Labs、XSS-Labs。 DVWA靶场 DVWA靶场是一个专门用于漏洞测试和练习的Web应用程序,旨在为安全专业…...
Epigenase m6A 甲基化酶活性/抑制比色法检测试剂盒:快速、灵敏、高通量适配
一、产品概述Epigenase m6A 甲基化酶活性/抑制比色法检测试剂盒,由Cytoskeleton推出,艾美捷代理,它是一套完整的优化缓冲液与试剂组合,专用于定量检测总 m6A 甲基化酶(甲基转移酶)的活性或抑制效果。该试剂…...
DAY4--SQL限制返回行数查询
SQL基础入门:电商用户数据限制返回行数查询实操 这一章能解决什么电商工作问题? 这一章要学的LIMIT,是我认为电商数据分析新人最应该刻进肌肉记忆的语法。因为它直接关系到两件事:你的工作效率,以及你的职场安全。 我先…...
表格设计:结构与美感并重
1. 表格的结构如果把表格比作一座建筑,那么它的每个结构部件都承担着特定功能。下面是一个完整的表格示例,展示了所有标准结构组件:表格结构图解:标题与副标题:表格的"名字"和"简介",告…...
Linux系统线程数量限制与优化指南
1. 进程与线程基础概念回顾在深入探讨进程能创建多少线程之前,我们需要先明确几个基本概念。进程是操作系统资源分配的基本单位,而线程则是CPU调度的基本单位。每个进程至少包含一个主线程,这个主线程可以创建其他子线程。线程与进程最大的区…...
Linux端口占用排查:工具与实战技巧
1. 网络端口占用排查的必要性遇到"Address already in use"错误提示时,每个Linux系统管理员都会心头一紧。这种端口冲突问题不仅影响服务启动,还可能导致关键业务中断。我刚入行时就曾因为Nginx和Apache争抢80端口,导致公司官网瘫痪…...
偏振无关 宽带消色差 长波红外超透镜模型 粒子群优化算法 复现论文:2022年博士论文
偏振无关 宽带消色差 长波红外超透镜模型 粒子群优化算法 复现论文:2022年博士论文:消色差超透镜设计原理及其应用研究 论文介绍:采用各向同性的多种不同形状的超表面单元,利用庞大的数据库和粒子群优化算法,设计长波红…...
