当前位置: 首页 > news >正文

实现canal监控binlog日志再通过消息队列异步处理

一、简单步骤

在这里插入图片描述

实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下:

  1. 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。

  2. 连接到Canal:使用Canal客户端连接到Canal服务器,订阅需要监控的数据库和表。

  3. 监听Binlog事件:通过Canal客户端,监听Canal服务器发送的Binlog事件。可以通过设置监听器来处理不同类型的Binlog事件。

  4. 将事件发送到消息队列:一旦监听到Binlog事件,可以将事件转化为消息,并发送到消息队列进行异步处理。可以选择使用Kafka、RabbitMQ、ActiveMQ等消息队列。

  5. 消息处理:在消息队列中,可以编写消费者程序来处理接收到的消息。根据消息的类型和内容,可以执行相应的操作,例如更新数据库,生成报告等。

  6. 确保处理的幂等性:在处理消息时,需要注意幂等性。即,同一条消息可以被处理多次,但最终结果应该是一致的。

  7. 监控和错误处理:在整个过程中,需要监控消息队列的状态,并进行错误处理。例如,如果消息队列出现故障,可能需要重新连接或存储未处理的消息。

二、Binlog使用

Binlog是MySQL数据库的二进制日志,用于记录数据库的更改操作。它可以用于数据恢复、主从复制、数据备份和恢复等场景。以下是有关Binlog的一些常见用法:

  1. 数据恢复:通过分析Binlog,可以还原数据库到某个特定的时间点或事务提交后的状态。这在意外删除或误操作后恢复数据非常有用。

  2. 主从复制:通过将Binlog从主数据库复制到从数据库,可以实现主数据库的实时同步。这可以用于横向扩展读操作、数据备份和故障恢复。

  3. 数据备份和恢复:可以使用Binlog进行增量备份和恢复,只记录变更的部分,而不需要完全备份整个数据库。

  4. 数据审计和追踪:分析Binlog可以了解数据库的操作历史,包括谁在何时执行了哪些操作。这对于安全审计和数据追踪非常有用。

  5. 数据更改追踪和同步:通过解析Binlog,可以捕获和同步数据库的更改,以便在其他系统或数据仓库中进行进一步处理和分析。

要使用Binlog,首先需要在MySQL服务器上启用Binlog功能。可以在MySQL配置文件中设置以下参数:

log-bin=mysql-bin
binlog-format=ROW

其中,log-bin指定Binlog日志文件的名称前缀,binlog-format指定Binlog的格式为行格式。

启用Binlog后,MySQL将开始记录所有的更新操作到Binlog中。可以使用MySQL提供的工具(如mysqlbinlog)来解析和分析Binlog文件。还可以使用第三方工具(如Canal)来实时监控和解析Binlog,以便进行异步处理或其他操作。

需要注意的是,Binlog日志会占用磁盘空间,因此需要定期进行清理和归档,以避免过多的日志文件导致磁盘空间不足。

三、canal使用

Canal是一款开源的基于Java的MySQL数据库增量订阅&消费组件,它可以帮助我们实时监控MySQL的Binlog日志,并将数据推送到消息队列中进行异步处理。以下是使用Canal的一般步骤:

  1. 下载和安装Canal:首先需要从Canal的官方网站或Github上下载Canal的安装包,然后解压到指定的目录中。

  2. 配置Canal:Canal需要配置MySQL的相关信息,如主机地址、端口、用户名、密码等,以便连接到MySQL数据库。配置文件位于Canal安装目录下的conf文件夹中,主要包括canal.propertiesinstance.properties两个文件。其中,canal.properties是全局配置文件,而instance.properties是实例级别的配置文件,可以配置多个实例。

  3. 启动Canal Server:通过运行Canal Server的启动脚本(如startup.shstartup.bat),启动Canal Server进程。

  4. 创建Canal实例:使用Canal提供的命令行工具或API,创建一个Canal实例,指定要订阅和监控的数据库和表。

  5. 消费Binlog事件:通过连接到Canal Server,订阅和消费Binlog事件。Canal可以将Binlog事件推送到Kafka、RocketMQ等消息队列中,供后续的异步处理。

四、Rabbit结合canal更新Redis缓存示例

通过canal 监控更新 Redis 缓存或者触发RabbitMQ发送消息示例:

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.sql.Connection;
import java.util.List;public class RabbitCanalRedisExample {private static final String RABBITMQ_QUEUE_NAME = "canal_redis_queue";private static final String RABBITMQ_HOST = "localhost";private static final int RABBITMQ_PORT = 5672;private static final String CANAL_HOST = "localhost";private static final int CANAL_PORT = 11111;private static final String REDIS_HOST = "localhost";private static final int REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 连接 RabbitMQConnectionFactory factory = new ConnectionFactory();factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(RABBITMQ_QUEUE_NAME, false, false, false, null);// 连接 CanalCanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_HOST, CANAL_PORT), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 连接 RedisJedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);while (true) {// 从 Canal 获取消息Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 确认消息已处理connector.ack(batchId);continue;}// 遍历 Canal 消息for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {continue;}try {// 解析 Canal 消息CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {// 处理新增数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {// 处理删除数据String tableName = entry.getHeader().getTableName();String key = rowData.getBeforeColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 删除 Redis 缓存jedis.del(tableName + "_" + key);// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {// 处理更新数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());}}} catch (IOException e) {e.printStackTrace();}}}// 确认消息已处理connector.ack(batchId);}
}

这个示例代码实现了以下功能:

  1. 使用 Canal 连接到 MySQL 数据库并订阅所有表。
  2. 使用 RabbitMQ 队列接收并发送消息。
  3. 使用 Redis 缓存数据。
  4. 根据 Canal 消息的类型进行相应操作:新增数据时,在 Redis 中保存数据并发送消息到 RabbitMQ;删除数据时,从 Redis 中删除数据并发送消息到 RabbitMQ;更新数据时,更新 Redis 中的数据并发送消息到 RabbitMQ。

区分场景,也可以先发送消息,通过消息触发缓存更新,好处有以下几点:

  1. 异步操作:将更新缓存的操作写入消息队列后,在实际更新缓存的过程中不会阻塞应用程序的执行。应用程序可以继续处理其他的业务逻辑,而不必等待缓存更新完成。这样可以提高应用程序的响应速度和吞吐量。

  2. 并发控制:通过消息队列可以实现缓存更新的并发控制。当多个请求同时更新缓存时,可以通过消息队列的先进先出原则,确保每个更新操作按照顺序进行,避免了并发操作导致的数据不一致问题。

  3. 可靠性:消息队列可以提供消息的持久化机制,即使在消息队列出现故障或重启的情况下,更新缓存的消息不会丢失。这样可以提高缓存更新的可靠性和数据完整性。

  4. 扩展性:通过消息队列,可以将缓存更新的任务分发到多个消费者进行处理,实现任务的并行处理,提高系统的扩展性和负载能力。

总的来说,使用消息队列更新缓存可以实现异步操作、并发控制、可靠性和扩展性,提高系统的性能和可靠性。

相关文章:

实现canal监控binlog日志再通过消息队列异步处理

一、简单步骤 实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下: 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。 连接到Canal:使用Canal客户…...

Linux DNS 协议概述

1. DNS 概述 互联网中,一台计算机与其他计算机通信时,通过 IP 地址唯一的标志自己。此时的 IP 地址就类似于我们日常生活中的电话号码。但是,这种纯数字的标识是比较难记忆的,而且数量也比较庞大。例如,每个 IPv4 地址…...

linux打包qt程序

参考这篇文章:Linux下Qt程序打包_linuxdeployqt下载-CSDN博客 本篇文章的系统环境是 : 虚拟机ubuntu18.04 用下面这个qmake路径 进行编译 在 ~/.bashrc 文件末尾,qmake目录配置到文件末尾 将上图中bin目录下的linuxdeployqt程序拷贝到/usr/bin下一份 &…...

软考中级-软件设计师通过心路经验分享

执念,第四次终于通过了 没买书,下班后每天2小时,四个2个月终于过了 学习经验: 1.下班后学习真的靠毅力,和上学的时候考证不是一个状态,大家要及时调整,否则过程很痛苦 2.失败三次的经验&#xf…...

safe area helper插件

概述 显示不同机型的必能显示的区域 实现步骤 引入safearea,引入其中的safearea的csharp 为cancas加入gameobject gameobject中加入safearea脚本 将UI作为这个gameobject的子物体,就可以完成显示...

李宏毅机器学习-批次 (batch)和动量(momentum)

一.batch(批次) 在计算微分时,不是对所有的数据算出来的Loss值做微分,而是将所有的数据分成一个一个的batch。一个batch是一个B,在更新参数时,拿B的资料计算Loss,计算gradient,再更新…...

C# 网络编程--关于UDP 通信(二)

UDP (User Datagram Protocol) 是一种无连接的传输层协议,主要用于支持数据报文的传输。它的主要特点包括简单、高效、不保证可靠性和顺序。 1.UDP协议基本概念 1.udp基于IP的简单的协议,不可靠的协议 2.优点:简单、 轻量化、 传输速度高、…...

【k8s集群应用】Kubernetes部署安装-二进制部署实例

文章目录 Kubernetes 部署方式常见的K8S安装部署方式Kubeadm与二进制部署的区别 Kubernetes部署安装环境配置Kubernetes集群初始化配置(实验环境)一、操作系统初始化配置二、部署Docker引擎 etcd 集群搭建配置 etcd 集群 Kubernetes Master 组件部署准备…...

js常见代码输出问题之promise,await,变量提升以及闭包(包括例子以及详细解析)

这里写目录标题 异步事件循环宏任务微任务1. 执行顺序2. 分类 Promise代码输出1. promise.then执行时机2. 宏任务微任务的多轮次3. .then .catch会返回新的promise4. 返回任意一个非 promise 的值都会被包裹成 promise 对象5. .then .catch 的值不能是promise本身6. 值透传7. .…...

遗传算法与深度学习实战(27)——进化卷积神经网络

遗传算法与深度学习实战(27)——进化卷积神经网络 0. 前言1. 自定义交叉算子2. 自定义突变操作符3. 进化卷积神经网络小结系列链接 0. 前言 DEAP toolbox 中提供的标准遗传操作符对于自定义的网络架构基因序列来说是不够的。这是因为任何标准的交叉算子…...

【Vue3】前端使用 FFmpeg.wasm 完成用户视频录制,并对视频进行压缩处理

强烈推荐这篇博客!非常全面的一篇文章,本文是对该博客的简要概括和补充,在不同技术栈中提供一种可行思路,可先阅读该篇文章再阅读本篇: FFmpeg——在Vue项目中使用FFmpeg(安装、配置、使用、SharedArrayBu…...

基础算法——前缀和

由于比赛基本都是采用Dev-C所以,算法篇基本都是采用Dev-C来解释(版本5.11,c11) 首先介绍一下前缀和算法 给定一个数组,有q次询问,每次询问: 两个整数l,r,求出数组 l 到 r的结果 遇…...

spring实例化对象的几种方式(使用XML配置文件)

前言 Spring框架作为一个轻量级的控制反转(IoC)容器,为开发者提供了多种对象实例化的策略。通过这些策略,开发者可以更加灵活地控制对象的生命周期和依赖关系。无论是通过XML配置、注解配置还是Java配置,Spring都能…...

【二叉树】力扣 129.求根节点到叶子节点数字之和

一、题目 二、思路 每找到一个非空节点,之前路径上的所有节点的数量级都要增加1个单位。例如,当前节点为3,之前的节点路径为1 -> 2,presum 1 * 10 2 12,现在路径变为了 1 -> 2 -> 3,sum pres…...

深度学习物体检测之YOLOV5源码解读

V5比前面版本偏工程化,项目化,更贴合实战 一.V5版本项目配置 (1)整体项目概述 首先github直接查找yolov5,下载下来即可。在训练时,数据是怎么处理的?网络模型架构是怎么设计的(如各层的设计)?yolov5要求是大于python3.8与大于等…...

音频数据采样入门详解 - 给Python初学者的简单解释

音频数据采样入门详解 - 给Python初学者的简单解释 声音是如何变成数字的?什么是采样率?为什么要懂这个?Python小例子总结 大家好!今天我们来聊一个有趣的话题:音频数据是如何在计算机中处理的。让我用最简单的方式来解…...

Unity类银河战士恶魔城学习总结(P179 Enemy Archer 弓箭手)

教程源地址:https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了敌人弓箭手的制作 Enemy_Archer.cs 核心功能 状态机管理敌人的行为 定义了多个状态对象(如 idleState、moveState、attackState 等),通过状态机管理敌人的…...

SpringCloud集成sleuth和zipkin实现微服务链路追踪

文章目录 前言技术积累spring cloud sleuth介绍zipkin介绍Zipkin与Sleuth的协作 SpringCloud多模块搭建Zipkin Server部署docker pull 镜像启动zipkin server SpringCloud 接入 Sleuth 与 Zipkinpom引入依赖 (springboot2.6)appilication.yml配置修改增加测试链路代码 调用微服…...

Python随机抽取Excel数据并在处理后整合为一个文件

本文介绍基于Python语言,针对一个文件夹下大量的Excel表格文件,基于其中每一个文件,随机从其中选取一部分数据,并将全部文件中随机获取的数据合并为一个新的Excel表格文件的方法。 首先,我们来明确一下本文的具体需求。…...

Linux+Docker onlyoffice 启用 HTTPS 端口支持

文章目录 一、需求二、配置2.1 创建容器2.2 进入容器2.3 生成私钥和证书 2.4 测试访问 一、需求 上篇文章介绍了如何搭建一个 onlyoffice 在线预览服务,但是我们实际场景调用该服务的网站是协议是 https 的 ,但是 onlyoffice 服务还没做配置&#xff0c…...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes&#xff0…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)

上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...