centos7部署Canal与Canal集成使用
1、简介
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x 、5.5.x 、5.6.x 、 5.7.x 、8.0.x
2、工作原理
2.1、MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2.2、canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
3、部署
3.1、准备
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.2、启动
下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
解压缩
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 2 jianghang jianghang 136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
配置修改
vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
启动
sh bin/startup.sh
查看 server 日志
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看 instance 的日志
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
关闭
sh bin/stop.sh
4、安装canal-admin
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
4.1、准备
canal-admin的限定依赖:
MySQL,用于存储配置和节点等相关数据
canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
4.2、部署
下载 canal-admin, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
解压缩
mkdir /tmp/canal-admin
tar zxvf canal.admin-$version.tar.gz -C /tmp/canal-admin
解压完成后,进入 /tmp/canal 目录,可以看到如下结构
drwxr-xr-x 6 agapple staff 204B 8 31 15:37 bin
drwxr-xr-x 8 agapple staff 272B 8 31 15:37 conf
drwxr-xr-x 90 agapple staff 3.0K 8 31 15:37 lib
drwxr-xr-x 2 agapple staff 68B 8 31 15:26 logs
配置修改
vi conf/application.yml
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: canalpassword: canaldriver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=falsehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: admin
初始化元数据库
mysql -h127.1 -uroot -p
导入初始化SQL
> source conf/canal_manager.sql
a. 初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化 b. canal_manager.sql默认会在conf目录下,也可以通过链接下载 canal_manager.sql
启动
sh bin/startup.sh
查看 admin 日志
vi logs/admin.log
2019-08-31 15:43:38.162 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat initialized with port(s): 8089 (http)
2019-08-31 15:43:38.180 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8089"]
2019-08-31 15:43:38.191 [main] INFO org.apache.catalina.core.StandardService - Starting service [Tomcat]
2019-08-31 15:43:38.194 [main] INFO org.apache.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/8.5.29
....
2019-08-31 15:43:39.789 [main] INFO o.s.w.s.m.m.annotation.ExceptionHandlerExceptionResolver - Detected @ExceptionHandler methods in customExceptionHandler
2019-08-31 15:43:39.825 [main] INFO o.s.b.a.web.servlet.WelcomePageHandlerMapping - Adding welcome page: class path resource [public/index.html]
此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认密码:admin/123456
关闭
sh bin/stop.sh
canal-server端配置
使用canal_local.properties的配置覆盖canal.properties
# register ip
canal.register.ip =# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
启动admin-server即可。
或在启动命令中使用参数:sh bin/startup.sh local
指定配置
5、springboot项目集成使用
依赖配置:
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
5.1、创建mvn标准工程:
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
maven3.0.5以上版本舍弃了create,使用generate生成项目
mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
5.2、修改pom.xml,添加依赖
5.3、ClientSample代码
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}
}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}
}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}
}}
5.4、运行Client
首先启动Canal Server,
启动Canal Client后,可以从控制台从看到类似消息:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
此时代表当前数据库无变更数据
- 触发数据库变更
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (-> `ID` int(11) NOT NULL AUTO_INCREMENT,-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,-> PRIMARY KEY (`ID`)-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
可以从控制台中看到:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true
相关文章:

centos7部署Canal与Canal集成使用
1、简介 canal [kə’nl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigge…...

C语言--分段函数--switch语句
如何用switch语句写分段函数呢?⭐️ 首先介绍一下switch语句的语法规则⭐️ switch(整形表达式) {case 常量表达式1; //标签必须唯一语句块1;break;case 常量表达式2; //if(a0),而case中时系统自动加语句块2;break;c…...
动态规划31(Leetcode188买卖股票的最佳时机4)
代码: 我的状态方程: buy[i][j]max{buy[i−1][j],sell[i−1][j-1]−price[i]} 题解里的: buy[i][j]max{buy[i−1][j],sell[i−1][j]−price[i]} ..没理解题解的 但我的通过了 class Solution {public int maxProfit(int k, int[] pric…...
npm包管理相关命令
前置条件,准备npm账号,并登录,npm login 或者 npm adduser (这一行同样需要输入账号密码登录,之后就不用登录了) 验证是否登录:npm whoami 还可以查看用户简介:npm profile get …...

2023年Q3乳品行业数据分析(乳品市场未来发展趋势)
随着人们生活水平的不断提高以及对健康生活的追求不断增强,牛奶作为优质蛋白和钙的补充品,市场需求逐年增加。 今年Q3,牛奶乳品市场仍呈增长趋势。根据鲸参谋电商数据分析平台的相关数据显示,2023年7月-9月,牛奶乳品市…...
软考 系统架构设计师系列知识点之边缘计算(2)
接前一篇文章:软考 系统架构设计师系列知识点之边缘计算(1) 所属章节: 第11章. 未来信息综合技术 第4节. 边缘计算概述 3. 边缘计算的特点 边缘计算是在靠近物或数据源头的网络边缘侧,融合网络、计算、存储、应用核心…...

Maven中的继承与聚合
一,继承 前面我们将项目拆分成各个小模块,但是每个小模块中有很多相同的依赖于是我们创建一个父工程将模块中相同的依赖定义在父工程中,然后子工程继承父工程Maven作用:简化依赖配置,统一依赖管理,可以实现多重继承像J…...

第三章 UI开发的点点滴滴
一、常用控件的使用方法 1.TextView android:gravity"center" 可选值:top、bottom、left、right、center等,可以用"|"来同时指定多个值,center表示文字在垂直和水平方向都居中 android:textSize 指定文字的大小&#…...

637. 二叉树的层平均值
描述 : 给定一个非空二叉树的根节点 root , 以数组的形式返回每一层节点的平均值。与实际答案相差 10-5 以内的答案可以被接受。 题目 : 637. 二叉树的层平均值 分析 : 这个题和前面的几个一样,只不过是每层都先将元素保存下来,最后求平均就行了: 解…...

【Java笔试强训】Day9(CM72 另类加法、HJ91 走方格的方案数)
CM72 另类加法 链接:另类加法 题目: 给定两个int A和B。编写一个函数返回AB的值,但不得使用或其他算数运算符。 题目分析: 代码实现: package Day9;public class Day9_1 {public int addAB(int A, int B) {// wr…...

django REST框架- Django-ninja
Django 是我学习的最早的web框架,大概在2014年,当时选他原因也很简单就是网上资料比较丰富,自然是遇到问题更容易找答案,直到 2018年真正开始拿django做项目,才对他有了更全面的了解。他是一个入门有门槛,学…...

数据结构与算法C语言版学习笔记(3)-线性表的链式结构:链表
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言:回顾顺序表的优缺点:为什么要引入链式结构的线性表? 一、什么是链表?二、链表的分类①为什么要设置头节点&…...
Web学习笔记-Vue3(环境配置、概念、整体布局设计)
笔记内容转载自 AcWing 的 Web 应用课讲义,课程链接:AcWing Web 应用课。 CONTENTS 1. 环境配置2. 基本概念3. 导航栏4. 页面创建5. 用户动态页面实现 Vue 官网:Vue.js。 Vue.js 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML…...

【React-Native开发3D应用】React Native加载GLB格式3D模型并打包至Android手机端
【React-Native开发3D应用】React Native加载GLB格式3D模型并打包至Android手机端 【加载3D模型】**React Native上如何加载glb格式的模型**第零步,选择相关模型第一步,导入相关模型加载库第二步,自定义GLB模型加载钩子第三步,借助…...
python的列表
定义 列表 是python中的一种数据类型,可以存放多个数据,列表中的数据可以是任意类型的。 格式 格式: my_list [] my_list list() 定义一个空的列表,有如上两种方式 遍历 for 循环 while循环 列表添加操作 列表添加操作有…...

[100天算法】-最短无序连续子数组(day 66)
题目描述 给定一个整数数组,你需要寻找一个连续的子数组,如果对这个子数组进行升序排序,那么整个数组都会变为升序排序。你找到的子数组应是最短的,请输出它的长度。示例 1:输入: [2, 6, 4, 8, 10, 9, 15] 输出: 5 解释: 你只需要…...

001. 变量、环境变量
1、在终端中显示输出 shell脚本通常以shebang起始:#!/bin/bash/ shebang是一个文本行,其中#!位于解释器路径之前。/bin/bash是Bash的解释器命令路径。bash将以#符号开头的行视为注释。脚本中只有第一行可以使用shebang来定义解释该脚本所使…...

软考软件设计师刷题笔记整理
软件设计师 HTML代码中,创建指向邮箱地址的链接正确的是ARP攻击造成网络无法跨网段通信的原因是在软件开发过程中进行风险分析关于哈夫曼树的叙述关于风险管理的叙述ISO/IEC9126软件质量模型关于结构化开发方法的叙述分布式数据库中的分片透明、复制透明、位置透明和…...

Canal
canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 1.canal 工作原理 canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议MySQL master 收到…...
SpringBoot实现mysql与clickhouse多数据源
一、我们来实现一个mysql与clickhouse多数据源配置 二、数据源配置 # 指定服务名称 spring:application:name: demobigdatadatasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/db?createDatabaseIfNotExisttrue&useUnicodetrue&…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...