从零搭建:Canal实时数据管道打通MySQL与Elasticsearch
Canal实时同步Mysql Binlog至 Elasticsearch

文章目录
-
Canal实时同步Mysql **Binlog**至**Elasticsearch** - 一. 环境准备
- 1.环境检查
- 检查`Mysql`是否开启`BinLog`
- 开启Mysql Binlog
- Java环境检查
- 2.新建测试库和表
- 3.新建Es索引
- 二.**部署 Canal Server**
- **2.1 解压安装包**
- **2.2 配置 Canal Server**
- **2.3 启动 Canal Server**
- **三. 部署 Canal Adapter(同步到 Elasticsearch)**
- **3.1 配置 Adapter**
- **3.2 配置数据映射**
- **3.3 启动 Adapter**
- **4. 验证同步**
- **4.1 插入测试数据到 MySQL**
- **4.2 查询 Elasticsearch**
一. 环境准备
- 操作系统:Linux(Ubuntu 20.04)
- Java 环境:JDK 8+(建议 OpenJDK 11)
- MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
- Elasticsearch:已部署(版本 7.x 或 8.x)
- Canal 二进制包:从 Canal Release 下载
canal.deployer-1.1.8.tar.gz和canal.adapter-1.1.8.tar.gz
1.环境检查
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
输出如下证明已经打开:

创建 Canal 用户并授权:
#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新权限
FLUSH PRIVILEGES;
如果没打开BinLog可以通过如下方法打开:
修改my.cnf文件,加入如下内容:
log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2
log_bin:启用二进制日志,日志文件会以mysql-bin为前缀,并依次生成日志文件(例如:mysql-bin.000001,mysql-bin.000002等)。
binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format的默认值已经变为ROW。所以,即使你在配置文件中没有明确设置binlog_format,MySQL 会默认使用ROW作为二进制日志格式。在较早的 MySQL 版本中默认值是STATEMENT。
binlog_expire_logs_seconds=172800和expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。
配置好后重启Mysql:
systemctl restart mysqld.service
echo $JAVA_HOME
2.新建测试库和表
CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;CREATE TABLE `test_user` (`id` bigint unsigned NOT NULL AUTO_INCREMENT,`name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',`sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',`tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{"mappings": {"properties": {"id": {"type": "long"},"title": {"type": "text"},"sex": {"type": "text"},"tel": {"type": "text"}}}
}
'
二.部署 Canal Server
2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server
修改配置文件 /opt/canal/server/conf/canal.properties:
# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112# 目标实例名称(默认 example)
canal.destinations = example# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
这里主要修改canal.ip其他保持默认即可。
修改实例配置 /opt/canal/server/conf/example/instance.properties:
#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码
canal.instance.connectionCharset = UTF-8
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log



可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

三. 部署 Canal Adapter(同步到 Elasticsearch)
3.1 配置 Adapter
修改配置文件 /opt/canal/adapter/conf/application.yml:
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts: # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources: # 源数据库配置defaultDScanal是测试数据库url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库username: root #数据库账号password: Pass@1234 #数据库密码canalAdapters: # 适配器列表- instance: example # canal实例名,和上述Server的配置一样groups: # 分组列表- groupId: g1 # 分组id, 如果是MQ模式将用到该值outerAdapters:- name: logger # 日志打印适配器- name: es8 # ES同步适配器根据自己的es版本来hosts: <your IP>:9200 # ES连接地址properties:mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)security.auth: elastic:123456 # 连接es的用户和密码,仅rest模式使用cluster.name: elasticsearch # ES集群名称
如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:
curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

3.2 配置数据映射
创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml:
dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: test_user # es 的索引名称_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: "SELECTtb.id AS _id,tb.name,tb.sex,tb.telFROMtest_user us" # sql映射etlCondition: "where p.id>={}" #etl的条件参数commitBatch: 3000 # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log
会输出很多数据库变更的日志:


4. 验证同步
4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');


4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"
也可以在工具上查看,这边是Eage插件:


至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。


然后我们测试修改Es的数据:


可以发现数据库并没有变,至此Canal单向实时同步Mysql Binlog至Elasticsearch就配置完成了。
相关文章:
从零搭建:Canal实时数据管道打通MySQL与Elasticsearch
Canal实时同步Mysql Binlog至 Elasticsearch 文章目录 Canal实时同步Mysql **Binlog**至**Elasticsearch** 一. 环境准备1.环境检查检查Mysql是否开启BinLog开启Mysql BinlogJava环境检查 2.新建测试库和表3.新建Es索引 二.**部署 Canal Server****2.1 解压安装包****2.2 配置 …...
PyArmor:一个超级厉害的 Python 库!
在 Python 的世界里,如何保护我们的代码不被轻易盗用或者破解,一直是开发者们关注的问题。尤其是在发布软件时,如何有效防止源代码泄漏或者被逆向工程分析,成为了一个重要课题。 PyArmor 作为一款强大的 Python 加密工具ÿ…...
《战神:诸神黄昏》游戏闪退后提示弹窗“d3dx9_43.dll缺失”“找不到d3dx11_43.d”该怎么处理?
宝子们,是不是在玩《战神:诸神黄昏》的时候,突然弹出一个提示:“找不到d3dx9_43.dll”或者“d3dx11_43.dll缺失”?这可真是让人着急上火!别慌,今天就给大家唠唠这个文件为啥会丢,还有…...
Ollama本地部署DeepSeek(Mac)
准备工作 DeepSeek对比 DeepSeek-r1 DeepSeek-R1的多个版本:加上2个原装671B的,总计8个参数版本 DeepSeek-R1 671B DeepSeek-R1-Zero 671B DeepSeek-R1-Distill-Llama-70B DeepSeek-R1-Distill-Qwen-32B DeepSeek-R1-Distill-Qwen-14B DeepSeek-R1-Di…...
mysql8 从C++源码角度看sql生成抽象语法树
在 MySQL 8 的 C 源码中,SQL 语句的解析过程涉及多个步骤,包括词法分析、语法分析和抽象语法树(AST)的生成。以下是详细的解析过程和相关组件的描述: 1. 词法分析器(Lexer) MySQL 使用一个称为…...
【Linux】修改语言编码
查询环境变量 locale#下载简体中文语言包 locale-gen zh_CN.UTF-8#查看当前环境的所有语言包 locale -a#查看配置文件中的编码 cat /etc/default/locale source /etc/default/locale修改为美式英语 LANG"en_US.UTF-8"修改为中文简体 LANG"zh_CN.UTF-8"…...
arm linux下的中断处理过程。
本文基于ast2600 soc来阐述,内核版本为5.10 1.中断gic初始化 start_kernel() -> init_IRQ() -> irqchip_init() of_irq_init()主要是构建of_intc_desc. 489-514: 从__irqchip_of_table中找到dts node中匹配的of_table(匹配matches->compatible)…...
Docker的深入浅出
目录 Docker引擎 Docker镜像 (镜像由多个层组成,每层叠加之后,从外部看来就如一个独立的对象。镜像内部是一个精简的操作系统(OS),同时还包含应用运行所必须的文件和依赖包) Docker容器 应用容器化--Docker化 最佳…...
内存映射工作原理和适用场景
Linux 内存映射(Memory Mapping)是一种将文件或其他资源直接映射到进程虚拟内存地址空间的机制,允许进程像访问内存一样访问文件或设备。这种机制通过 mmap() 系统调用实现,常用于高效文件操作、进程间共享内存等场景。 1. 内存映…...
【Nginx + Keepalived 实现高可用的负载均衡架构】
使用 Nginx Keepalived 可以实现高可用的负载均衡架构,确保在某个 Nginx 节点故障时,自动将流量转移到备用节点。以下是详细的实现步骤: 1. 架构概述 Nginx:作为负载均衡器,将流量分发到后端服务器。Keepalived&…...
自动驾驶超声波雷达:市场潜力爆发,引领未来出行新趋势
在自动驾驶技术的飞速发展中,自动驾驶超声波雷达作为一项关键技术,正逐渐崭露头角,其重要性及市场增长潜力不容忽视。本文将深入探讨自动驾驶超声波雷达的重要性、市场增长趋势、显著优势、全球市场规模与驱动因素、主要市场参与者以及不同地…...
Apache服务器的基础配置(认证考试笔记)
Apache服务器的基本配置 配置Apache服务器,有如下需求: 不能修改Apache默认配置文件建立虚拟主机www.test.com,端口80将URLwww.test.com/data的请求引至目录/web/database,将URL www.test.com/img的请求导至目录/web/imagesweb/…...
41.兼职网站管理系统(基于springbootvue的Java项目)
目录 1.系统的受众说明 2.相关技术 2.1 B/S架构 2.2 Java技术介绍 2.3 mysql数据库介绍 2.4 Spring Boot框架 3.系统分析 3.1 需求分析 3.2 系统可行性分析 3.2.1技术可行性:技术背景 3.2.2经济可行性 3.2.3操作可行性: 3.3 项目设计目…...
Linux ARM64 将内核虚拟地址转化为物理地址
文章目录 前言一、通用方案1.1 kern_addr_valid1.2 __pa 二、ARM64架构2.1 AT S1E1R2.2 is_kernel_addr_vaild2.3 va2pa_helper 三、demo演示参考资料 前言 本文介绍一种通用的将内核虚拟地址转化为物理地址的方案以及一种适用于ARM64 将内核虚拟地址转化为物理地址的方案&…...
spring学习(使用spring加载properties文件信息)(spring自定义标签引入)
目录 一、博客引言。 二、基本配置准备。 (1)初步分析。 (2)初始spring配置文件。 三、spring自定义标签的引入。 (1)基本了解。 (2)引入新的命名空间:xmlns:context。 &…...
Flutter项目试水
1基本介绍 本文章在构建您的第一个 Flutter 应用指导下进行实践 可作为项目实践的辅助参考资料 Flutter 是 Google 的界面工具包,用于通过单一代码库针对移动设备、Web 和桌面设备构建应用。在此 Codelab 中,您将构建以下 Flutter 应用。 该应用可以…...
Linux(Ubuntu)安装pyenv和pyenv-virtualenv
Ubuntu安装pyenv和pyenv-virtualenv 安装 pyenv1. 下载 pyenv2. 配置环境变量3. 重启 Shell4. 安装依赖5.检测是否安装成功 安装 pyenv-virtualenv1. 安装 pyenv-virtualenv2. 配置环境变量3. 重启 Shell pyenv 的使用1. 查看可安装的 Python 版本2. 安装指定版本的 Python3. 查…...
调用DeepSeek官方的API接口
效果 前端样式体验链接:https://livequeen.top/deepseekshow 准备工作 1、注册deepseek官网账号 地址:DeepSeek 点击进入右上角【API开放平台】,并进行账号注册。 2、注册完成后,依次点击【API keys】-【生成API key】&#x…...
MFC线程安全案例
作者:小蜗牛向前冲 名言:我可以接受失败,但我不能接受放弃 如果觉的博主的文章还不错的话,还请点赞,收藏,关注👀支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、项目解析 二…...
【Elasticsearch】bucket_sort
Elasticsearch 的bucket_sort聚合是一种管道聚合,用于对父多桶聚合(如terms、date_histogram、histogram等)的桶进行排序。以下是关于bucket_sort的详细说明: 1.基本功能 bucket_sort聚合可以对父聚合返回的桶进行排序ÿ…...
计算机毕业设计——Springboot点餐平台网站
📘 博主小档案: 花花,一名来自世界500强的资深程序猿,毕业于国内知名985高校。 🔧 技术专长: 花花在深度学习任务中展现出卓越的能力,包括但不限于java、python等技术。近年来,花花更…...
MATLAB中count函数用法
目录 语法 说明 示例 对出现次数计数 使用模式对数字和字母进行计数 多个子字符串的所有出现次数 忽略大小写 对字符向量中的子字符串进行计数 count函数的功能是计算字符串中模式的出现次数。 语法 A count(str,pat) A count(str,pat,IgnoreCase,true) 说明 A c…...
Win11下搭建Kafka环境
目录 一、环境准备 二、安装JDK 1、下载JDK 2、配置环境变量 3、验证 三、安装zookeeper 1、下载Zookeeper安装包 2、配置环境变量 3、修改配置文件zoo.cfg 4、启动Zookeeper服务 4.1 启动Zookeeper客户端验证 4.2 启动客户端 四、安装Kafka 1、下载Kafka安装包…...
51c自动驾驶~合集49
我自己的原文哦~ https://blog.51cto.com/whaosoft/13164876 #Ultra-AV 轨迹预测新基准!清华开源:统一自动驾驶纵向轨迹数据集 自动驾驶车辆在交通运输领域展现出巨大潜力,而理解其纵向驾驶行为是实现安全高效自动驾驶的关键。现有的开…...
nexus部署及配置https访问
1. 使用docker-compose部署nexus docker-compose-nexus.yml version: "3" services:nexus:container_name: my-nexusimage: sonatype/nexus3:3.67.1hostname: my-nexusnetwork_mode: hostports:- 8081:8081deploy:resources:limits:cpus: 4memory: 8192Mreservations…...
ffmpeg -hwaccels
1. ffmpeg -hwaccels -loglevel quiet 显示ffmpeg支持的硬件设备 2. 输出 Hardware acceleration methods: vdpau cuda vaapi qsv drm opencl 3. 说明 输出中的cuda表示ffmpeg支持Nvidia 硬件设备。编译ffmpeg增加相关硬件设备的配置,输出会显示相应的信…...
Python——批量图片转PDF(GUI版本)
目录 专栏导读1、背景介绍2、库的安装3、核心代码4、完整代码总结专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手 🏳️🌈 博客主页:请点击——> 一晌小贪欢的博客主页求关注 👍 该系列文章专栏:请点击——>Python办公自动化专…...
LabVIEW无人机飞行状态监测系统
近年来,无人机在农业植保、电力巡检、应急救灾等多个领域得到了广泛应用。然而,传统的目视操控方式仍然存在以下三大问题: 飞行姿态的感知主要依赖操作者的经验; 飞行中突发的姿态异常难以及时发现; 飞行数据缺乏系统…...
算法16(力扣451)——根据字符出现频率排序
1、问题 给定一个字符串 s ,根据字符出现的 频率 对其进行 降序排序 。一个字符出现的频率 是它出现在字符串中的次数, 返回 已排序的字符串。如果有多个答案,返回其中任何一个。 2、示例 (1) 输入: s "tree&q…...
Response 和 Request 介绍
怀旧网个人博客网站地址:怀旧网,博客详情:Response 和 Request 介绍 1、HttpServletResponse 1、简单分类 2、文件下载 通过Response下载文件数据 放一个文件到resources目录 编写下载文件Servlet文件 public class FileDownServlet exten…...
