从零搭建:Canal实时数据管道打通MySQL与Elasticsearch
Canal实时同步Mysql Binlog至 Elasticsearch

文章目录
-
Canal实时同步Mysql **Binlog**至**Elasticsearch** - 一. 环境准备
- 1.环境检查
- 检查`Mysql`是否开启`BinLog`
- 开启Mysql Binlog
- Java环境检查
- 2.新建测试库和表
- 3.新建Es索引
- 二.**部署 Canal Server**
- **2.1 解压安装包**
- **2.2 配置 Canal Server**
- **2.3 启动 Canal Server**
- **三. 部署 Canal Adapter(同步到 Elasticsearch)**
- **3.1 配置 Adapter**
- **3.2 配置数据映射**
- **3.3 启动 Adapter**
- **4. 验证同步**
- **4.1 插入测试数据到 MySQL**
- **4.2 查询 Elasticsearch**
一. 环境准备
- 操作系统:Linux(Ubuntu 20.04)
- Java 环境:JDK 8+(建议 OpenJDK 11)
- MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
- Elasticsearch:已部署(版本 7.x 或 8.x)
- Canal 二进制包:从 Canal Release 下载
canal.deployer-1.1.8.tar.gz和canal.adapter-1.1.8.tar.gz
1.环境检查
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
输出如下证明已经打开:

创建 Canal 用户并授权:
#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新权限
FLUSH PRIVILEGES;
如果没打开BinLog可以通过如下方法打开:
修改my.cnf文件,加入如下内容:
log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2
log_bin:启用二进制日志,日志文件会以mysql-bin为前缀,并依次生成日志文件(例如:mysql-bin.000001,mysql-bin.000002等)。
binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format的默认值已经变为ROW。所以,即使你在配置文件中没有明确设置binlog_format,MySQL 会默认使用ROW作为二进制日志格式。在较早的 MySQL 版本中默认值是STATEMENT。
binlog_expire_logs_seconds=172800和expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。
配置好后重启Mysql:
systemctl restart mysqld.service
echo $JAVA_HOME
2.新建测试库和表
CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;CREATE TABLE `test_user` (`id` bigint unsigned NOT NULL AUTO_INCREMENT,`name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',`sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',`tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{"mappings": {"properties": {"id": {"type": "long"},"title": {"type": "text"},"sex": {"type": "text"},"tel": {"type": "text"}}}
}
'
二.部署 Canal Server
2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server
修改配置文件 /opt/canal/server/conf/canal.properties:
# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112# 目标实例名称(默认 example)
canal.destinations = example# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
这里主要修改canal.ip其他保持默认即可。
修改实例配置 /opt/canal/server/conf/example/instance.properties:
#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码
canal.instance.connectionCharset = UTF-8
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log



可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

三. 部署 Canal Adapter(同步到 Elasticsearch)
3.1 配置 Adapter
修改配置文件 /opt/canal/adapter/conf/application.yml:
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts: # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources: # 源数据库配置defaultDScanal是测试数据库url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库username: root #数据库账号password: Pass@1234 #数据库密码canalAdapters: # 适配器列表- instance: example # canal实例名,和上述Server的配置一样groups: # 分组列表- groupId: g1 # 分组id, 如果是MQ模式将用到该值outerAdapters:- name: logger # 日志打印适配器- name: es8 # ES同步适配器根据自己的es版本来hosts: <your IP>:9200 # ES连接地址properties:mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)security.auth: elastic:123456 # 连接es的用户和密码,仅rest模式使用cluster.name: elasticsearch # ES集群名称
如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:
curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

3.2 配置数据映射
创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml:
dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: test_user # es 的索引名称_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: "SELECTtb.id AS _id,tb.name,tb.sex,tb.telFROMtest_user us" # sql映射etlCondition: "where p.id>={}" #etl的条件参数commitBatch: 3000 # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log
会输出很多数据库变更的日志:


4. 验证同步
4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');


4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"
也可以在工具上查看,这边是Eage插件:


至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。


然后我们测试修改Es的数据:


可以发现数据库并没有变,至此Canal单向实时同步Mysql Binlog至Elasticsearch就配置完成了。
相关文章:
从零搭建:Canal实时数据管道打通MySQL与Elasticsearch
Canal实时同步Mysql Binlog至 Elasticsearch 文章目录 Canal实时同步Mysql **Binlog**至**Elasticsearch** 一. 环境准备1.环境检查检查Mysql是否开启BinLog开启Mysql BinlogJava环境检查 2.新建测试库和表3.新建Es索引 二.**部署 Canal Server****2.1 解压安装包****2.2 配置 …...
Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现一次触发控制三个光源开关分别采集三张图像(C#)
Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现一次触发控制三个光源开关分别采集三张图像(C#) Baumer工业相机Baumer工业相机定序器功能的技术背景Baumer工业相机通过BGAPI SDK使用定序器功能预期的相机动作定序器的工作原理 Baumer工业相机通过BGAP…...
网络安全用centos干嘛 网络安全需要学linux吗
网络安全为啥要学Linux系统,据不完全统计,Linux系统在数据中心操作系统上的份额高达70%。它一般运行于服务器和超级计算机上。 所以我们日常访问的网站后台和app后端都是部署在Linux服务器上的,如果你不会Linux系统操作,那么很多…...
【React】react-redux+redux-toolkit实现状态管理
安装 npm install reduxjs/toolkit react-reduxRedux Toolkit 是官方推荐编写Redux的逻辑方式,用于简化书写方式React-redux 用来链接Redux和React组件之间的中间件 使用 定义数据 创建要管理的数据模块 store/module/counter.ts import { createSlice, Payloa…...
如何通过AI轻松制作PPT?让PPT一键生成变得简单又高效
如何通过AI轻松制作PPT?让PPT一键生成变得简单又高效!在这个信息化飞速发展的时代,PPT已经成为我们日常工作、学习和生活中不可或缺的一部分。无论是公司会议、学术报告,还是个人展示,PPT的作用都不容忽视。很多人对于…...
Springer |第七届2025年区块链、人工智能和可信系统国际会议
Springer |第七届2025年区块链、人工智能和可信系统国际会议 International Conference on Blockchain, Artificial Intelligence, and Trustworthy Systems 【重要日期】 论文提交截止日期:2025年03月01日(第2轮) 会议报名截止日期&…...
新一代SCADA: 宏集Panorama Suite 2025 正式发布,提供更灵活、符合人体工学且安全的应用体验
宏集科技宣布正式推出全新Panorama Suite 2025 SCADA软件!全新版本标志着 Panorama Suite的一个重要里程碑,代表了从 Panorama Suite 2022 开始并跨越三个版本(2022、2023、2025)的开发过程的顶峰。 此次重大发布集中在六个核心主…...
AI在电竞比分网中的主要应用场景
AI在电竞体育比分网的数据应用非常广泛,能够显著提升数据分析、预测、用户体验和商业价值。以下是AI在电竞比分网中的主要应用场景: 1. 实时数据采集与分析 比赛数据实时更新:AI通过自动化系统实时采集比赛数据(如击杀数、经济差、…...
前端快速生成接口方法
大家好,我是苏麟,今天聊一下OpenApi。 官网 : umijs/openapi - npm 安装命令 npm i --save-dev umijs/openapi 在根目录(项目目录下)创建文件 openapi.config.js import { generateService } from umijs/openapi// 自…...
【Pico】使用Pico进行无线串流搜索不到电脑
使用Pico进行无线串流搜索不到电脑 官串方式:使用Pico互联连接电脑。 故障排查 以下来自官方文档 请按照以下步骡排除故障: 确认电脑和一体机连接了相同的路由器WiFi网络(相同网段) IP地址通常为192.168.XX,若两设备的IP地址前三段相同&…...
机柜机箱制冷风扇在使用过程中突然停止运转的原因
在机柜机箱的正常运行中,制冷风扇起着关键的散热作用,可一旦它在使用时突然停止运转,将会对机柜机箱内设备的稳定运行构成严重威胁。而导致这一现象出现的原因较为复杂,主要涵盖以下几个方面。 从电源供应角度来看,这是…...
Python函数返回值250214
import requests from xml.etree import ElementTree as ETdef xml_to_list(city):data_list []url "...".format(city) # 具体url地址就不写了res requests.get(url url)root ET.XML(res.text)for node in root:data_list.append(node.text)return dat_listres…...
call、apply、bind 详解
在 JavaScript 中,call、apply 和 bind 是 Function 对象的三个重要方法,它们都与函数的上下文(this 值)和参数传递有关。 一、call 方法 1. 语法 function.call(thisArg, arg1, arg2, ...) 2. 示例代码 const person {name…...
详解电子邮箱工作原理|SMTP、POP3、IMAP、SPF、MIME
写在前面 电子邮件(Email)是一种通过互联网进行异步通信的技术,工作原理涉及多个协议、服务器和客户端协同工作。 接下来我们来介绍一下电子邮箱的工作原理 1. 电子邮件的核心组成部分 邮件客户端:用户直接交互的软件…...
QT笔记——QPlainTextEdit
文章目录 1、概要2、文本设计2.1、设置文本2.1、字体样式(大小、下划线、加粗、斜体) 1、概要 QPlainTextEdit 是 Qt 框架中用于处理纯文本编辑的控件,具有轻量级和高效的特点,以下是它常见的应用场景: 文本编辑器&am…...
Qt使用pri和pro文件进行模块化编程
假如我想要做一个功能,这个功能用代码模块化实现出来,方便将来移植,比如音视频播放器的界面,将来想要在其他工程使用时,只需要将widget提升为音视频播放界面即可。 当我们其他工程需要这个功能时,我们在调用…...
Linux-文件基本操作
1.基本概念 文件: 一组相关数据的集合 文件名: 01.sh //文件名 2.linux下的文件类型 b block 块设备文件 eg: 硬盘 c character 字符设备文件 eg: 鼠标,键盘 d directory 目录文件 eg: 文件夹 - regular 常规文件…...
自己部署 DeepSeek 助力 Vue 开发:打造丝滑的时间线(Timeline )
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 自己…...
初窥强大,AI识别技术实现图像转文字(OCR技术)
⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据、人工智能领域创作者。目前从事python全栈、爬虫和人工智能等相关工作,主要擅长领域有:python…...
【Apache Paimon】-- 作为一名小白,如何系统地学习 Apache paimon?
目录 一、整体规划 1. 了解基本概念与背景 2. 学习资料的选择 3. 学习路径与规划 4. 学习建议 5. 注意事项 6. 参考学习资料 二、详细计划 阶段 1:了解基础(1-2 周) 阶段 2:深入掌握核心功能(3-4 周…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
C#学习第29天:表达式树(Expression Trees)
目录 什么是表达式树? 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持: 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
