Canal笔记:安装与整合Springboot模式Mysql同步Redis
官方文档
https://github.com/alibaba/canal
使用场景
学习一件东西前,要知道为什么使用它。
1、同步mysql数据到redis
常规情况下,产生数据的方法可能有很多地方,那么就需要在多个地方中,都去做mysql数据同步到redis的处理,相对麻烦很多。
可以使用canal,对mysql进行集中,统一的处理。
概述
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relaylog 中事件,将数据变更反映它自己的数据

canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
架构
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1…n个instance)

安装和准备
Centos7安装Canal
1、Mysql配置
开启binlog日志
如果是使用Linux安装的话,则直接找my.cnf,直接修改内容即可
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
docker安装
1、安装my.cnf文件
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2、修改docker-compose.yaml内容
配置挂载卷 前面路径为my.cnf的路径,/etc/mysql/conf.d的路径

3、查询是否成功
show variables like "%server_id%";

show variables like 'log_bin';

获取bin_log当前位置
show master status;

获取后,记录下来,然后不要动数据库了
创建canal数据库用户
这里可以使用
mysql -uroot -p登录进入设置,
或者直接可视化页面
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、Canal下载
下载
方式一:关注公众号 I am Walker 回复canal
方式二:https://github.com/alibaba/canal/releases?page=2

# 创建文件夹
mkdir /opt/env/canal
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal
3、配置文件修改
进入canal/conf/example/instance.properties
主要修改下列相关参数
# 数据库
canal.instance.master.address=127.0.0.1:3306
# bin log日志
canal.instance.master.journal.name=mysql-bin.000001
# bin log写入位置
canal.instance.master.position=157#数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
之后进入 canal/bin 执行 ./startup.sh
查看是否启动

有CanalLauncher则代表ok,或者看日志也ok
场景
springboot整合
简单整合
1、依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
2、配置文件
canal:# 服务地址serverAddress: localhost# 端口serverPort: 11111# 订阅 库 表subscrie: ".*\\..*"# batchSize: 100# 实例instance:- example
subscrie配置
全库全表
connector.subscribe(".*\\..*")
指定库全表
connector.subscribe("test\\..*")
单表
connector.subscribe("test.user")
多规则组合使用
connector.subscribe("test\\..*,test2.user1,test3.user2")
3、properties类
package com.walker.mybatisplus.canal;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;@Data
@Component
@ConfigurationProperties(value = "canal")
public class CanalProperties {private String serverAddress;private Integer serverPort;private String subcribe;private Integer batchSize;private List<String> instance;}
4、监听类编写
package com.walker.mybatisplus.canal;import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Component
public class CanalListener {@Autowiredprivate CanalProperties canalProperties;public static Map<String,Integer> NUM_MAP=new HashMap<>();/*** 可能会有多业务,不同的业务,应该有多个处理类,不要使用if等*/@PostConstructpublic void run() throws InterruptedException, InvalidProtocolBufferException {//创建Canal连接对象CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),canalProperties.getServerPort()),canalProperties.getInstance().get(0),null, null);while(true){//连接conn.connect();
// 监听的数据库和表conn.subscribe(canalProperties.getSubcribe());//回滚操作conn.rollback();//获取信息Message message = conn.getWithoutAck(canalProperties.getBatchSize());long id = message.getId();List<CanalEntry.Entry> entries = message.getEntries();if(id!=-1&&entries.size()>0){//处理数据messageProcess(entries);}else{//防止重复链接数据库Thread.sleep(1000);}//确认消费信息conn.ack(id);//释放连接conn.disconnect();}}private void messageProcess(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {log.info("接收Entry:{}", entry);CanalEntry.Header header = entry.getHeader();//数据库String schemaName = header.getSchemaName();//表名String tableName = header.getTableName();//事件类型CanalEntry.EventType eventType = header.getEventType();//这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义,但是一般可以配置一个库,然后表的可能可以是全部表
// 对库进行判断if(!"walker_share".equals(schemaName)){continue;}//对表进行判断//这里只是一个案例,如果是实际场景,可以使用工厂模式去编写,不然会有很多的ifif("dish".equals(tableName)){//获取修改数据List<CanalEntry.RowData> rowDataList = getRowDataList(entry);//新增if(eventType.getNumber()==CanalEntry.EventType.INSERT_VALUE){log.info("新增事件");if(CollUtil.isEmpty(rowDataList)) continue;//模拟场景:获取新增的数据,并存储到redis中,这里是直接存储到Map中for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {//获取name的类型if("type".equals(column.getName())){//模拟redis 根据类型进行分类String key = column.getValue();NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)+1);log.info("NUM_MAP {}",NUM_MAP);continue;}}}}if(eventType.getNumber()==CanalEntry.EventType.UPDATE_VALUE){log.info("修改事件");}if(eventType.getNumber()==CanalEntry.EventType.DELETE_VALUE){log.info("删除事件");}}}}//获取row数据private List<CanalEntry.RowData> getRowDataList(CanalEntry.Entry entry) {CanalEntry.RowChange rowChange=null;try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);}List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();return rowDatasList;}
}
相关类和配置
CanalConnector

CanalEntry

EntryType

Header

EventType
事件类型,可以根据事件类型去做不一样的操作

RowChange

获取数据
CanalEntry.RowChange rowChange=null;
try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
log.info("rowDatasList:{}",rowDatasList);
RowData

CanalEntry.Column
属性

问题
IOException: caching_sha2_password Auth failed
因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed
参考文档
Java开发 - Canal的基本用法_canal java-CSDN博客
15分钟学会Canal安装与部署-CSDN博客
SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)_canal同步数据到redis-CSDN博客
相关文章:
Canal笔记:安装与整合Springboot模式Mysql同步Redis
官方文档 https://github.com/alibaba/canal 使用场景 学习一件东西前,要知道为什么使用它。 1、同步mysql数据到redis 常规情况下,产生数据的方法可能有很多地方,那么就需要在多个地方中,都去做mysql数据同步到redis的处理&…...
C++的继承语法
在面向对象编程中,继承是一种强大的机制,允许一个类(子类)从另一个类(父类)继承属性和方法。C是一种支持面向对象编程的编程语言,通过其灵活而强大的继承语法,开发者可以构建更加模块…...
C# .NET平台提取PDF表格数据,并转换为txt、CSV和Excel表格文件
处理PDF文件中的内容是比较麻烦的事情,特别是以表格形式呈现的各种数据。为了充分利用这些宝贵的数据资源,我们可以通过程序提取PDF文件中的表格,并将其保存为更易于处理和分析的格式,如txt、csv、xlsx,从而更方便地对…...
spring boot学习第五篇:spring boot与JPA结合
1、准备表,创建表语句如下 CREATE TABLE girl (id int(11) NOT NULL AUTO_INCREMENT,cup_Size varchar(100) COLLATE utf8mb4_bin DEFAULT NULL,age int(11) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB AUTO_INCREMENT4 DEFAULT CHARSETutf8mb4 COLLATEutf8mb4…...
代理IP怎么使用?Mac苹果系统设置http代理IP教程
代理IP是一种通过将请求转发到另一个服务器,以隐藏自己的真实IP地址的服务器。使用代理IP可以保护您的隐私和安全,防止被跟踪或被攻击。在本文中,我们将介绍如何在Mac苹果系统上设置http代理IP教程。 一、了解代理IP 代理IP地址是一种可以用来…...
postgresql_conf中常用配置项
在 PostgreSQL 的 postgresql.conf 配置文件中,有许多常用的配置项,这些配置项可以根据特定需求和性能优化进行调整。以下是一些常用的配置项及其作用: 1. shared_buffers 用于设置 PostgreSQL 实例使用的共享内存缓冲区大小。增加此值可以…...
使用MfgTool烧写前需准备的文件
一. 简介 本文我们就来学习,如何将我们编译的 uboot,zImage(内核镜像),xxx.dtb设备树文件,还有制作的根文件系统,这四个文件烧写到开发板中,最后 开发板能正常启动。 本文这里使用…...
SAP UI5 walkthrough step4 XML Views
SAPUI5 指出多种VIEW类型,包括XML,HTML,JavaScript 推荐使用XML,因为可读性更高 我们提前介绍一下MVC架构。 MVC是一种软件架构模式,它包括三个主要组件:模型(Model)、视图(View)…...
Java 1对1
文章目录 前言 客户端 服务器端 输出线程端 End 前言 TCP(Transmission Control Protocol)是一种面向连接的、可靠的网络传输协议,它提供了端到端的数据传输和可靠性保证。 本程序就是基于tcp协议编写而成的。 利用 TCP 协议进行通信的…...
云服务器Centos中安装Docker
云服务器Centos中安装Docker 1 简介DockerCentosCentos和Ubuntu区别 2 安装3 测试hello-world的镜像测试 1 简介 Docker Docker是一个开源的应用容器引擎,利用操作系统本身已有的机制和特性,可以实现远超传统虚拟机的轻量级虚拟化。它支持将软件编译成…...
人工智能教程(三):更多有用的 Python 库
目录 前言 推荐 JupyterLab 入门 复杂的矩阵运算 其它人工智能和机器学习的 Python 库 前言 在本系列的上一篇人工智能教程(二):人工智能的历史以及再探矩阵中,我们回顾了人工智能的历史,然后详细地讨论了矩阵。在…...
【带头学C++】----- 九、类和对象 ---- 9.10 C++设计模式之单例模式设计
❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️麻烦您点个关注,不迷路❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️❤️ 目 录 9.10 C设计模式之单例模式设计 举例说明: 9.10 C设计模式之单例模式设计 看过我之前的文章的,简单讲解过C/Q…...
Qt之QCache和QContiguousCache
一.QCache QCache在构造的时候指定了缓存中允许的最大成本,也就是如下构造函数中的参数maxCost。默认情况下,QCaches maxCost() 是100。 QCache(int maxCost = 100) ~QCache() void clear() bool contains(const Key &key) const int count() const bool insert(const …...
Django讲课笔记01:初探Django框架
文章目录 一、学习目标二、课程导入(一)课程简介(二)课程目标(三)适用人群(四)教学方式(五)评估方式(六)参考教材 三、新课讲授&#…...
JS中的闭包
闭包 闭包的概念其实很简单,就是函数A内部有一个函数B,函数B可以访问函数A的变量。也就是说闭包是指有权访问另一个函数作用域中变量的函数,利用闭包可以突破作用域链。 闭包的特性: 1、函数内再嵌套函数 2、内部函数可以引用外层的参数和变…...
深度学习在计算机视觉中的应用
深度学习在计算机视觉中的应用 摘要:本文介绍了深度学习在计算机视觉领域的应用,包括目标检测、图像分类、人脸识别等。通过分析深度学习在计算机视觉中的实际应用案例,阐述了深度学习在计算机视觉中的优势和未来发展趋势。 一、引言 计算…...
模板与泛型编程
函数模板 显示实例化 区别定义与声明 T是模板形参 int是模板实参 inpunt是函数形参 3是函数实参 显示实例化 模板必须实例化可见 翻译单元一处定义原则 与内联函数异同 引入原因:函数模板是为了编译器两个阶段的处理 内联函数是为了能在编译期展开 模板实参的类…...
【Fastadmin】一个完整的轮播图功能示例
目录 1.效果展示: 列表 添加及编辑页面同 2.建表: 3.使用crud一键生成并创建控制器 4.html页面 add.html edit.html index.php 5.js页面 6.小知识点 1.效果展示: 列表 添加及编辑页面同 2.建表: 表名:fa_x…...
Ribbon 饥饿加载
Ribbon默认是采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载: 一、懒加载 Ribbon 默认为懒加载即在首次启动Application…...
【AIGC】大语言模型的采样策略--temperature、top-k、top-p等
总结如下: 图片链接 参考 LLM解码-采样策略串讲 LLM大模型解码生成方式总结 LLM探索:GPT类模型的几个常用参数 Top-k, Top-p, Temperature...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
