当前位置: 首页 > news >正文

实现canal监控binlog日志再通过消息队列异步处理

一、简单步骤

在这里插入图片描述

实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下:

  1. 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。

  2. 连接到Canal:使用Canal客户端连接到Canal服务器,订阅需要监控的数据库和表。

  3. 监听Binlog事件:通过Canal客户端,监听Canal服务器发送的Binlog事件。可以通过设置监听器来处理不同类型的Binlog事件。

  4. 将事件发送到消息队列:一旦监听到Binlog事件,可以将事件转化为消息,并发送到消息队列进行异步处理。可以选择使用Kafka、RabbitMQ、ActiveMQ等消息队列。

  5. 消息处理:在消息队列中,可以编写消费者程序来处理接收到的消息。根据消息的类型和内容,可以执行相应的操作,例如更新数据库,生成报告等。

  6. 确保处理的幂等性:在处理消息时,需要注意幂等性。即,同一条消息可以被处理多次,但最终结果应该是一致的。

  7. 监控和错误处理:在整个过程中,需要监控消息队列的状态,并进行错误处理。例如,如果消息队列出现故障,可能需要重新连接或存储未处理的消息。

二、Binlog使用

Binlog是MySQL数据库的二进制日志,用于记录数据库的更改操作。它可以用于数据恢复、主从复制、数据备份和恢复等场景。以下是有关Binlog的一些常见用法:

  1. 数据恢复:通过分析Binlog,可以还原数据库到某个特定的时间点或事务提交后的状态。这在意外删除或误操作后恢复数据非常有用。

  2. 主从复制:通过将Binlog从主数据库复制到从数据库,可以实现主数据库的实时同步。这可以用于横向扩展读操作、数据备份和故障恢复。

  3. 数据备份和恢复:可以使用Binlog进行增量备份和恢复,只记录变更的部分,而不需要完全备份整个数据库。

  4. 数据审计和追踪:分析Binlog可以了解数据库的操作历史,包括谁在何时执行了哪些操作。这对于安全审计和数据追踪非常有用。

  5. 数据更改追踪和同步:通过解析Binlog,可以捕获和同步数据库的更改,以便在其他系统或数据仓库中进行进一步处理和分析。

要使用Binlog,首先需要在MySQL服务器上启用Binlog功能。可以在MySQL配置文件中设置以下参数:

log-bin=mysql-bin
binlog-format=ROW

其中,log-bin指定Binlog日志文件的名称前缀,binlog-format指定Binlog的格式为行格式。

启用Binlog后,MySQL将开始记录所有的更新操作到Binlog中。可以使用MySQL提供的工具(如mysqlbinlog)来解析和分析Binlog文件。还可以使用第三方工具(如Canal)来实时监控和解析Binlog,以便进行异步处理或其他操作。

需要注意的是,Binlog日志会占用磁盘空间,因此需要定期进行清理和归档,以避免过多的日志文件导致磁盘空间不足。

三、canal使用

Canal是一款开源的基于Java的MySQL数据库增量订阅&消费组件,它可以帮助我们实时监控MySQL的Binlog日志,并将数据推送到消息队列中进行异步处理。以下是使用Canal的一般步骤:

  1. 下载和安装Canal:首先需要从Canal的官方网站或Github上下载Canal的安装包,然后解压到指定的目录中。

  2. 配置Canal:Canal需要配置MySQL的相关信息,如主机地址、端口、用户名、密码等,以便连接到MySQL数据库。配置文件位于Canal安装目录下的conf文件夹中,主要包括canal.propertiesinstance.properties两个文件。其中,canal.properties是全局配置文件,而instance.properties是实例级别的配置文件,可以配置多个实例。

  3. 启动Canal Server:通过运行Canal Server的启动脚本(如startup.shstartup.bat),启动Canal Server进程。

  4. 创建Canal实例:使用Canal提供的命令行工具或API,创建一个Canal实例,指定要订阅和监控的数据库和表。

  5. 消费Binlog事件:通过连接到Canal Server,订阅和消费Binlog事件。Canal可以将Binlog事件推送到Kafka、RocketMQ等消息队列中,供后续的异步处理。

四、Rabbit结合canal更新Redis缓存示例

通过canal 监控更新 Redis 缓存或者触发RabbitMQ发送消息示例:

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.rabbitmq.client.*;
import redis.clients.jedis.Jedis;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.sql.Connection;
import java.util.List;public class RabbitCanalRedisExample {private static final String RABBITMQ_QUEUE_NAME = "canal_redis_queue";private static final String RABBITMQ_HOST = "localhost";private static final int RABBITMQ_PORT = 5672;private static final String CANAL_HOST = "localhost";private static final int CANAL_PORT = 11111;private static final String REDIS_HOST = "localhost";private static final int REDIS_PORT = 6379;public static void main(String[] args) throws Exception {// 连接 RabbitMQConnectionFactory factory = new ConnectionFactory();factory.setHost(RABBITMQ_HOST);factory.setPort(RABBITMQ_PORT);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(RABBITMQ_QUEUE_NAME, false, false, false, null);// 连接 CanalCanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_HOST, CANAL_PORT), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();// 连接 RedisJedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);while (true) {// 从 Canal 获取消息Message message = connector.getWithoutAck(100);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}// 确认消息已处理connector.ack(batchId);continue;}// 遍历 Canal 消息for (CanalEntry.Entry entry : message.getEntries()) {if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {continue;}try {// 解析 Canal 消息CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {// 处理新增数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {// 处理删除数据String tableName = entry.getHeader().getTableName();String key = rowData.getBeforeColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 删除 Redis 缓存jedis.del(tableName + "_" + key);// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {// 处理更新数据String tableName = entry.getHeader().getTableName();String key = rowData.getAfterColumnsList().stream().filter(c -> c.getIsKey()).findFirst().get().getValue();// 更新 Redis 缓存jedis.set(tableName + "_" + key, JSON.toJSONString(rowData.getAfterColumnsList()));// 发送消息到 RabbitMQchannel.basicPublish("", RABBITMQ_QUEUE_NAME, null, key.getBytes());}}} catch (IOException e) {e.printStackTrace();}}}// 确认消息已处理connector.ack(batchId);}
}

这个示例代码实现了以下功能:

  1. 使用 Canal 连接到 MySQL 数据库并订阅所有表。
  2. 使用 RabbitMQ 队列接收并发送消息。
  3. 使用 Redis 缓存数据。
  4. 根据 Canal 消息的类型进行相应操作:新增数据时,在 Redis 中保存数据并发送消息到 RabbitMQ;删除数据时,从 Redis 中删除数据并发送消息到 RabbitMQ;更新数据时,更新 Redis 中的数据并发送消息到 RabbitMQ。

区分场景,也可以先发送消息,通过消息触发缓存更新,好处有以下几点:

  1. 异步操作:将更新缓存的操作写入消息队列后,在实际更新缓存的过程中不会阻塞应用程序的执行。应用程序可以继续处理其他的业务逻辑,而不必等待缓存更新完成。这样可以提高应用程序的响应速度和吞吐量。

  2. 并发控制:通过消息队列可以实现缓存更新的并发控制。当多个请求同时更新缓存时,可以通过消息队列的先进先出原则,确保每个更新操作按照顺序进行,避免了并发操作导致的数据不一致问题。

  3. 可靠性:消息队列可以提供消息的持久化机制,即使在消息队列出现故障或重启的情况下,更新缓存的消息不会丢失。这样可以提高缓存更新的可靠性和数据完整性。

  4. 扩展性:通过消息队列,可以将缓存更新的任务分发到多个消费者进行处理,实现任务的并行处理,提高系统的扩展性和负载能力。

总的来说,使用消息队列更新缓存可以实现异步操作、并发控制、可靠性和扩展性,提高系统的性能和可靠性。

相关文章:

实现canal监控binlog日志再通过消息队列异步处理

一、简单步骤 实现Canal监控Binlog日志,并通过消息队列进行异步处理,步骤如下: 配置Canal:首先,需要配置Canal进行Binlog日志监控。可以通过Canal的官方文档了解如何配置Canal。 连接到Canal:使用Canal客户…...

Linux DNS 协议概述

1. DNS 概述 互联网中,一台计算机与其他计算机通信时,通过 IP 地址唯一的标志自己。此时的 IP 地址就类似于我们日常生活中的电话号码。但是,这种纯数字的标识是比较难记忆的,而且数量也比较庞大。例如,每个 IPv4 地址…...

linux打包qt程序

参考这篇文章:Linux下Qt程序打包_linuxdeployqt下载-CSDN博客 本篇文章的系统环境是 : 虚拟机ubuntu18.04 用下面这个qmake路径 进行编译 在 ~/.bashrc 文件末尾,qmake目录配置到文件末尾 将上图中bin目录下的linuxdeployqt程序拷贝到/usr/bin下一份 &…...

软考中级-软件设计师通过心路经验分享

执念,第四次终于通过了 没买书,下班后每天2小时,四个2个月终于过了 学习经验: 1.下班后学习真的靠毅力,和上学的时候考证不是一个状态,大家要及时调整,否则过程很痛苦 2.失败三次的经验&#xf…...

safe area helper插件

概述 显示不同机型的必能显示的区域 实现步骤 引入safearea,引入其中的safearea的csharp 为cancas加入gameobject gameobject中加入safearea脚本 将UI作为这个gameobject的子物体,就可以完成显示...

李宏毅机器学习-批次 (batch)和动量(momentum)

一.batch(批次) 在计算微分时,不是对所有的数据算出来的Loss值做微分,而是将所有的数据分成一个一个的batch。一个batch是一个B,在更新参数时,拿B的资料计算Loss,计算gradient,再更新…...

C# 网络编程--关于UDP 通信(二)

UDP (User Datagram Protocol) 是一种无连接的传输层协议,主要用于支持数据报文的传输。它的主要特点包括简单、高效、不保证可靠性和顺序。 1.UDP协议基本概念 1.udp基于IP的简单的协议,不可靠的协议 2.优点:简单、 轻量化、 传输速度高、…...

【k8s集群应用】Kubernetes部署安装-二进制部署实例

文章目录 Kubernetes 部署方式常见的K8S安装部署方式Kubeadm与二进制部署的区别 Kubernetes部署安装环境配置Kubernetes集群初始化配置(实验环境)一、操作系统初始化配置二、部署Docker引擎 etcd 集群搭建配置 etcd 集群 Kubernetes Master 组件部署准备…...

js常见代码输出问题之promise,await,变量提升以及闭包(包括例子以及详细解析)

这里写目录标题 异步事件循环宏任务微任务1. 执行顺序2. 分类 Promise代码输出1. promise.then执行时机2. 宏任务微任务的多轮次3. .then .catch会返回新的promise4. 返回任意一个非 promise 的值都会被包裹成 promise 对象5. .then .catch 的值不能是promise本身6. 值透传7. .…...

遗传算法与深度学习实战(27)——进化卷积神经网络

遗传算法与深度学习实战(27)——进化卷积神经网络 0. 前言1. 自定义交叉算子2. 自定义突变操作符3. 进化卷积神经网络小结系列链接 0. 前言 DEAP toolbox 中提供的标准遗传操作符对于自定义的网络架构基因序列来说是不够的。这是因为任何标准的交叉算子…...

【Vue3】前端使用 FFmpeg.wasm 完成用户视频录制,并对视频进行压缩处理

强烈推荐这篇博客!非常全面的一篇文章,本文是对该博客的简要概括和补充,在不同技术栈中提供一种可行思路,可先阅读该篇文章再阅读本篇: FFmpeg——在Vue项目中使用FFmpeg(安装、配置、使用、SharedArrayBu…...

基础算法——前缀和

由于比赛基本都是采用Dev-C所以,算法篇基本都是采用Dev-C来解释(版本5.11,c11) 首先介绍一下前缀和算法 给定一个数组,有q次询问,每次询问: 两个整数l,r,求出数组 l 到 r的结果 遇…...

spring实例化对象的几种方式(使用XML配置文件)

前言 Spring框架作为一个轻量级的控制反转(IoC)容器,为开发者提供了多种对象实例化的策略。通过这些策略,开发者可以更加灵活地控制对象的生命周期和依赖关系。无论是通过XML配置、注解配置还是Java配置,Spring都能…...

【二叉树】力扣 129.求根节点到叶子节点数字之和

一、题目 二、思路 每找到一个非空节点,之前路径上的所有节点的数量级都要增加1个单位。例如,当前节点为3,之前的节点路径为1 -> 2,presum 1 * 10 2 12,现在路径变为了 1 -> 2 -> 3,sum pres…...

深度学习物体检测之YOLOV5源码解读

V5比前面版本偏工程化,项目化,更贴合实战 一.V5版本项目配置 (1)整体项目概述 首先github直接查找yolov5,下载下来即可。在训练时,数据是怎么处理的?网络模型架构是怎么设计的(如各层的设计)?yolov5要求是大于python3.8与大于等…...

音频数据采样入门详解 - 给Python初学者的简单解释

音频数据采样入门详解 - 给Python初学者的简单解释 声音是如何变成数字的?什么是采样率?为什么要懂这个?Python小例子总结 大家好!今天我们来聊一个有趣的话题:音频数据是如何在计算机中处理的。让我用最简单的方式来解…...

Unity类银河战士恶魔城学习总结(P179 Enemy Archer 弓箭手)

教程源地址:https://www.udemy.com/course/2d-rpg-alexdev/ 本章节实现了敌人弓箭手的制作 Enemy_Archer.cs 核心功能 状态机管理敌人的行为 定义了多个状态对象(如 idleState、moveState、attackState 等),通过状态机管理敌人的…...

SpringCloud集成sleuth和zipkin实现微服务链路追踪

文章目录 前言技术积累spring cloud sleuth介绍zipkin介绍Zipkin与Sleuth的协作 SpringCloud多模块搭建Zipkin Server部署docker pull 镜像启动zipkin server SpringCloud 接入 Sleuth 与 Zipkinpom引入依赖 (springboot2.6)appilication.yml配置修改增加测试链路代码 调用微服…...

Python随机抽取Excel数据并在处理后整合为一个文件

本文介绍基于Python语言,针对一个文件夹下大量的Excel表格文件,基于其中每一个文件,随机从其中选取一部分数据,并将全部文件中随机获取的数据合并为一个新的Excel表格文件的方法。 首先,我们来明确一下本文的具体需求。…...

Linux+Docker onlyoffice 启用 HTTPS 端口支持

文章目录 一、需求二、配置2.1 创建容器2.2 进入容器2.3 生成私钥和证书 2.4 测试访问 一、需求 上篇文章介绍了如何搭建一个 onlyoffice 在线预览服务,但是我们实际场景调用该服务的网站是协议是 https 的 ,但是 onlyoffice 服务还没做配置&#xff0c…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...

K8S认证|CKS题库+答案| 11. AppArmor

目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...

React Native 导航系统实战(React Navigation)

导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

uniapp中使用aixos 报错

问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息&#xff0…...

32单片机——基本定时器

STM32F103有众多的定时器,其中包括2个基本定时器(TIM6和TIM7)、4个通用定时器(TIM2~TIM5)、2个高级控制定时器(TIM1和TIM8),这些定时器彼此完全独立,不共享任何资源 1、定…...

TJCTF 2025

还以为是天津的。这个比较容易,虽然绕了点弯,可还是把CP AK了,不过我会的别人也会,还是没啥名次。记录一下吧。 Crypto bacon-bits with open(flag.txt) as f: flag f.read().strip() with open(text.txt) as t: text t.read…...