实现canal监控binlog日志再通过消息队列异步处理
一、简单步骤

实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下:
-
配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。
-
连接到Canal:使用Canal客户端连接到Canal服务器,订阅需要监控的数据库和表。
-
监听Binlog事件:通过Canal客户端,监听Canal服务器发送的Binlog事件。可以通过设置监听器来处理不同类型的Binlog事件。
-
将事件发送到消息队列:一旦监听到Binlog事件,可以将事件转化为消息,并发送到消息队列进行异步处理。可以选择使用Kafka、RabbitMQ、ActiveMQ等消息队列。
-
消息处理:在消息队列中,可以编写消费者程序来处理接收到的消息。根据消息的类型和内容,可以执行相应的操作,例如更新数据库,生成报告等。
-
确保处理的幂等性:在处理消息时,需要注意幂等性。即,同一条消息可以被处理多次,但最终结果应该是一致的。
-
监控和错误处理:在整个过程中,需要监控消息队列的状态,并进行错误处理。例如,如果消息队列出现故障,可能需要重新连接或存储未处理的消息。
二、Binlog使用
Binlog是MySQL数据库的二进制日志,用于记录数据库的更改操作。它可以用于数据恢复、主从复制、数据备份和恢复等场景。以下是有关Binlog的一些常见用法:
-
数据恢复:通过分析Binlog,可以还原数据库到某个特定的时间点或事务提交后的状态。这在意外删除或误操作后恢复数据非常有用。
-
主从复制:通过将Binlog从主数据库复制到从数据库,可以实现主数据库的实时同步。这可以用于横向扩展读操作、数据备份和故障恢复。
-
数据备份和恢复:可以使用Binlog进行增量备份和恢复,只记录变更的部分,而不需要完全备份整个数据库。
-
数据审计和追踪:分析Binlog可以了解数据库的操作历史,包括谁在何时执行了哪些操作。这对于安全审计和数据追踪非常有用。
-
数据更改追踪和同步:通过解析Binlog,可以捕获和同步数据库的更改,以便在其他系统或数据仓库中进行进一步处理和分析。
要使用Binlog,首先需要在MySQL服务器上启用Binlog功能。可以在MySQL配置文件中设置以下参数:
log-bin=mysql-bin
binlog-format=ROW
其中,log-bin指定Binlog日志文件的名称前缀,binlog-format指定Binlog的格式为行格式。
启用Binlog后,MySQL将开始记录所有的更新操作到Binlog中。可以使用MySQL提供的工具(如mysqlbinlog)来解析和分析Binlog文件。还可以使用第三方工具(如Canal)来实时监控和解析Binlog,以便进行异步处理或其他操作。
需要注意的是,Binlog日志会占用磁盘空间,因此需要定期进行清理和归档,以避免过多的日志文件导致磁盘空间不足。
三、canal使用
Canal是一款开源的基于Java的MySQL数据库增量订阅&消费组件,它可以帮助我们实时监控MySQL的Binlog日志,并将数据推送到消息队列中进行异步处理。以下是使用Canal的一般步骤:
-
下载和安装Canal:首先需要从Canal的官方网站或Github上下载Canal的安装包,然后解压到指定的目录中。
-
配置Canal:Canal需要配置MySQL的相关信息,如主机地址、端口、用户名、密码等,以便连接到MySQL数据库。配置文件位于Canal安装目录下的
conf文件夹中,主要包括canal.properties和instance.properties两个文件。其中,canal.properties是全局配置文件,而instance.properties是实例级别的配置文件,可以配置多个实例。 -
启动Canal Server:通过运行Canal Server的启动脚本(如
startup.sh或startup.bat),启动Canal Server进程。 -
创建Canal实例:使用Canal提供的命令行工具或API,创建一个Canal实例,指定要订阅和监控的数据库和表。
-
消费Binlog事件:通过连接到Canal Server,订阅和消费Binlog事件。Canal可以将Binlog事件推送到Kafka、RocketMQ等消息队列中,供后续的异步处理。
四、Rabbit结合canal更新Redis缓存示例
通过canal 监控更新 Redis 缓存或者触发RabbitMQ发送消息示例:
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.sql.Connection;
import java.util.List;public class RabbitCanalRedisExample {private static final String RABBITMQ_QUEUE_NAME = "canal_redis_queue";private static final String RABBITMQ_HOST = "localhost";private static final int RABBITMQ_PORT = 5672;private static final String CANAL_HOST = "localhost";private static final int CANAL_PORT = 11111;private static final String REDIS_HOST = "localhost";private static final int REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 连接 RabbitMQConnectionFactory factory = new ConnectionFactory();factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(RABBITMQ_QUEUE_NAME, false, false, false, null);// 连接 CanalCanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_HOST, CANAL_PORT), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 连接 RedisJedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);while (true) {// 从 Canal 获取消息Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 确认消息已处理connector.ack(batchId);continue;}// 遍历 Canal 消息for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {continue;}try {// 解析 Canal 消息CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {// 处理新增数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {// 处理删除数据String tableName = entry.getHeader().getTableName();String key = rowData.getBeforeColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 删除 Redis 缓存jedis.del(tableName + "_" + key);// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {// 处理更新数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());}}} catch (IOException e) {e.printStackTrace();}}}// 确认消息已处理connector.ack(batchId);}
}
这个示例代码实现了以下功能:
- 使用 Canal 连接到 MySQL 数据库并订阅所有表。
- 使用 RabbitMQ 队列接收并发送消息。
- 使用 Redis 缓存数据。
- 根据 Canal 消息的类型进行相应操作:新增数据时,在 Redis 中保存数据并发送消息到 RabbitMQ;删除数据时,从 Redis 中删除数据并发送消息到 RabbitMQ;更新数据时,更新 Redis 中的数据并发送消息到 RabbitMQ。
区分场景,也可以先发送消息,通过消息触发缓存更新,好处有以下几点:
-
异步操作:将更新缓存的操作写入消息队列后,在实际更新缓存的过程中不会阻塞应用程序的执行。应用程序可以继续处理其他的业务逻辑,而不必等待缓存更新完成。这样可以提高应用程序的响应速度和吞吐量。
-
并发控制:通过消息队列可以实现缓存更新的并发控制。当多个请求同时更新缓存时,可以通过消息队列的先进先出原则,确保每个更新操作按照顺序进行,避免了并发操作导致的数据不一致问题。
-
可靠性:消息队列可以提供消息的持久化机制,即使在消息队列出现故障或重启的情况下,更新缓存的消息不会丢失。这样可以提高缓存更新的可靠性和数据完整性。
-
扩展性:通过消息队列,可以将缓存更新的任务分发到多个消费者进行处理,实现任务的并行处理,提高系统的扩展性和负载能力。
总的来说,使用消息队列更新缓存可以实现异步操作、并发控制、可靠性和扩展性,提高系统的性能和可靠性。
相关文章:
实现canal监控binlog日志再通过消息队列异步处理
一、简单步骤 实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下: 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。 连接到Canal:使用Canal客户…...
Linux DNS 协议概述
1. DNS 概述 互联网中,一台计算机与其他计算机通信时,通过 IP 地址唯一的标志自己。此时的 IP 地址就类似于我们日常生活中的电话号码。但是,这种纯数字的标识是比较难记忆的,而且数量也比较庞大。例如,每个 IPv4 地址…...
linux打包qt程序
参考这篇文章:Linux下Qt程序打包_linuxdeployqt下载-CSDN博客 本篇文章的系统环境是 : 虚拟机ubuntu18.04 用下面这个qmake路径 进行编译 在 ~/.bashrc 文件末尾,qmake目录配置到文件末尾 将上图中bin目录下的linuxdeployqt程序拷贝到/usr/bin下一份 &…...
软考中级-软件设计师通过心路经验分享
执念,第四次终于通过了 没买书,下班后每天2小时,四个2个月终于过了 学习经验: 1.下班后学习真的靠毅力,和上学的时候考证不是一个状态,大家要及时调整,否则过程很痛苦 2.失败三次的经验…...
safe area helper插件
概述 显示不同机型的必能显示的区域 实现步骤 引入safearea,引入其中的safearea的csharp 为cancas加入gameobject gameobject中加入safearea脚本 将UI作为这个gameobject的子物体,就可以完成显示...
李宏毅机器学习-批次 (batch)和动量(momentum)
一.batch(批次) 在计算微分时,不是对所有的数据算出来的Loss值做微分,而是将所有的数据分成一个一个的batch。一个batch是一个B,在更新参数时,拿B的资料计算Loss,计算gradient,再更新…...
C# 网络编程--关于UDP 通信(二)
UDP (User Datagram Protocol) 是一种无连接的传输层协议,主要用于支持数据报文的传输。它的主要特点包括简单、高效、不保证可靠性和顺序。 1.UDP协议基本概念 1.udp基于IP的简单的协议,不可靠的协议 2.优点:简单、 轻量化、 传输速度高、…...
【k8s集群应用】Kubernetes部署安装-二进制部署实例
文章目录 Kubernetes 部署方式常见的K8S安装部署方式Kubeadm与二进制部署的区别 Kubernetes部署安装环境配置Kubernetes集群初始化配置(实验环境)一、操作系统初始化配置二、部署Docker引擎 etcd 集群搭建配置 etcd 集群 Kubernetes Master 组件部署准备…...
js常见代码输出问题之promise,await,变量提升以及闭包(包括例子以及详细解析)
这里写目录标题 异步事件循环宏任务微任务1. 执行顺序2. 分类 Promise代码输出1. promise.then执行时机2. 宏任务微任务的多轮次3. .then .catch会返回新的promise4. 返回任意一个非 promise 的值都会被包裹成 promise 对象5. .then .catch 的值不能是promise本身6. 值透传7. .…...
遗传算法与深度学习实战(27)——进化卷积神经网络
遗传算法与深度学习实战(27)——进化卷积神经网络 0. 前言1. 自定义交叉算子2. 自定义突变操作符3. 进化卷积神经网络小结系列链接 0. 前言 DEAP toolbox 中提供的标准遗传操作符对于自定义的网络架构基因序列来说是不够的。这是因为任何标准的交叉算子…...
【Vue3】前端使用 FFmpeg.wasm 完成用户视频录制,并对视频进行压缩处理
强烈推荐这篇博客!非常全面的一篇文章,本文是对该博客的简要概括和补充,在不同技术栈中提供一种可行思路,可先阅读该篇文章再阅读本篇: FFmpeg——在Vue项目中使用FFmpeg(安装、配置、使用、SharedArrayBu…...
基础算法——前缀和
由于比赛基本都是采用Dev-C所以,算法篇基本都是采用Dev-C来解释(版本5.11,c11) 首先介绍一下前缀和算法 给定一个数组,有q次询问,每次询问: 两个整数l,r,求出数组 l 到 r的结果 遇…...
spring实例化对象的几种方式(使用XML配置文件)
前言 Spring框架作为一个轻量级的控制反转(IoC)容器,为开发者提供了多种对象实例化的策略。通过这些策略,开发者可以更加灵活地控制对象的生命周期和依赖关系。无论是通过XML配置、注解配置还是Java配置,Spring都能…...
【二叉树】力扣 129.求根节点到叶子节点数字之和
一、题目 二、思路 每找到一个非空节点,之前路径上的所有节点的数量级都要增加1个单位。例如,当前节点为3,之前的节点路径为1 -> 2,presum 1 * 10 2 12,现在路径变为了 1 -> 2 -> 3,sum pres…...
深度学习物体检测之YOLOV5源码解读
V5比前面版本偏工程化,项目化,更贴合实战 一.V5版本项目配置 (1)整体项目概述 首先github直接查找yolov5,下载下来即可。在训练时,数据是怎么处理的?网络模型架构是怎么设计的(如各层的设计)?yolov5要求是大于python3.8与大于等…...
音频数据采样入门详解 - 给Python初学者的简单解释
音频数据采样入门详解 - 给Python初学者的简单解释 声音是如何变成数字的?什么是采样率?为什么要懂这个?Python小例子总结 大家好!今天我们来聊一个有趣的话题:音频数据是如何在计算机中处理的。让我用最简单的方式来解…...
Unity类银河战士恶魔城学习总结(P179 Enemy Archer 弓箭手)
教程源地址:https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了敌人弓箭手的制作 Enemy_Archer.cs 核心功能 状态机管理敌人的行为 定义了多个状态对象(如 idleState、moveState、attackState 等),通过状态机管理敌人的…...
SpringCloud集成sleuth和zipkin实现微服务链路追踪
文章目录 前言技术积累spring cloud sleuth介绍zipkin介绍Zipkin与Sleuth的协作 SpringCloud多模块搭建Zipkin Server部署docker pull 镜像启动zipkin server SpringCloud 接入 Sleuth 与 Zipkinpom引入依赖 (springboot2.6)appilication.yml配置修改增加测试链路代码 调用微服…...
Python随机抽取Excel数据并在处理后整合为一个文件
本文介绍基于Python语言,针对一个文件夹下大量的Excel表格文件,基于其中每一个文件,随机从其中选取一部分数据,并将全部文件中随机获取的数据合并为一个新的Excel表格文件的方法。 首先,我们来明确一下本文的具体需求。…...
Linux+Docker onlyoffice 启用 HTTPS 端口支持
文章目录 一、需求二、配置2.1 创建容器2.2 进入容器2.3 生成私钥和证书 2.4 测试访问 一、需求 上篇文章介绍了如何搭建一个 onlyoffice 在线预览服务,但是我们实际场景调用该服务的网站是协议是 https 的 ,但是 onlyoffice 服务还没做配置,…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
算法岗面试经验分享-大模型篇
文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer (1)资源 论文&a…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
