当前位置: 首页 > news >正文

从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

Canal实时同步Mysql BinlogElasticsearch


文章目录

  • Canal实时同步Mysql **Binlog**至**Elasticsearch**
      • 一. 环境准备
        • 1.环境检查
          • 检查`Mysql`是否开启`BinLog`
          • 开启Mysql Binlog
          • Java环境检查
        • 2.新建测试库和表
        • 3.新建Es索引
      • 二.**部署 Canal Server**
        • **2.1 解压安装包**
        • **2.2 配置 Canal Server**
        • **2.3 启动 Canal Server**
      • **三. 部署 Canal Adapter(同步到 Elasticsearch)**
        • **3.1 配置 Adapter**
        • **3.2 配置数据映射**
        • **3.3 启动 Adapter**
      • **4. 验证同步**
        • **4.1 插入测试数据到 MySQL**
        • **4.2 查询 Elasticsearch**

一. 环境准备

  • 操作系统:Linux(Ubuntu 20.04)
  • Java 环境:JDK 8+(建议 OpenJDK 11)
  • MySQL:已启用 Binlog(ROW 模式),并创建 Canal 用户
  • Elasticsearch:已部署(版本 7.x 或 8.x)
  • Canal 二进制包:从 Canal Release 下载 canal.deployer-1.1.8.tar.gzcanal.adapter-1.1.8.tar.gz
1.环境检查
  • 检查Mysql是否开启BinLog
#root账号执行
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';

输出如下证明已经打开:

image-20250211103029832

创建 Canal 用户并授权:

#创建用户
CREATE USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'Password@123';
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';# 刷新权限
FLUSH PRIVILEGES;

如果没打开BinLog可以通过如下方法打开:

  • 开启Mysql Binlog

修改my.cnf文件,加入如下内容:

log_bin=mysql-bin
binlog_format=ROW
binlog_expire_logs_seconds=172800
expire_logs_days=2

log_bin:启用二进制日志,日志文件会以 mysql-bin 为前缀,并依次生成日志文件(例如:mysql-bin.000001mysql-bin.000002 等)。

binlog_format:设置使用的二进制日志格式,在 MySQL 8.0 版本中,binlog_format 的默认值已经变为 ROW。所以,即使你在配置文件中没有明确设置 binlog_format,MySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT

binlog_expire_logs_seconds=172800expire_logs_days=2:这些配置设置了二进制日志的过期时间(默认情况下,MySQL 会保留二进制日志,直到它们过期或达到日志文件数的限制)。在这种情况下,日志会在 2 天后过期。

配置好后重启Mysql:

systemctl restart mysqld.service
  • Java环境检查
echo $JAVA_HOME
image-20250211111637904
2.新建测试库和表
 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;CREATE TABLE `test_user` (`id` bigint unsigned NOT NULL AUTO_INCREMENT,`name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '姓名',`sex` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '性别',`tel` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '电话',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
3.新建Es索引
curl -X PUT "http://<your es IP>:9200/test_user" -H 'Content-Type: application/json' -u <es账号>:<es 密码> -d'
{"mappings": {"properties": {"id": {"type": "long"},"title": {"type": "text"},"sex": {"type": "text"},"tel": {"type": "text"}}}
}
'

二.部署 Canal Server

2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter
2.2 配置 Canal Server

修改配置文件 /opt/canal/server/conf/canal.properties

# tcp bind ip
canal.ip =127.0.0.1
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112# 目标实例名称(默认 example)
canal.destinations = example# 持久化模式(默认内存,可选 H2/MySQL)
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

这里主要修改canal.ip其他保持默认即可。

修改实例配置 /opt/canal/server/conf/example/instance.properties

#被同步的mysql地址,填写自己的IP地址
canal.instance.master.address=127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Password@123
#数据库连接编码 
canal.instance.connectionCharset = UTF-8 
#Binlog 过滤规则(监控所有库表)
canal.instance.filter.regex=.*\\..*
#指定了 Canal 消费者(比如 MQ 客户端)读取和写入消息的目标主题,保持默认即可
canal.mq.topic=example
2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log

image-20250211153418697

image-20250211153400835

image-20250211153538656

可以看到日志没有明显报错,且进程已经启动,则表示Canal Server已经启动成功。

image-20250211153842261

三. 部署 Canal Adapter(同步到 Elasticsearch)

3.1 配置 Adapter

修改配置文件 /opt/canal/adapter/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式,可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts:    # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources: # 源数据库配置defaultDScanal是测试数据库url: jdbc:mysql://<yourIP>:3306/canal?useUnicode=true&useSSL=true #数据库连接,canal是测试用的数据库username: root #数据库账号password: Pass@1234 #数据库密码canalAdapters: # 适配器列表- instance: example # canal实例名,和上述Server的配置一样groups: # 分组列表- groupId: g1 # 分组id, 如果是MQ模式将用到该值outerAdapters:- name: logger # 日志打印适配器- name: es8 # ES同步适配器根据自己的es版本来hosts: <your IP>:9200 # ES连接地址properties:mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)security.auth: elastic:123456 #  连接es的用户和密码,仅rest模式使用cluster.name: elasticsearch # ES集群名称

如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字:

curl -u elastic:<esPass> -X GET "http://<es IP>:9200/_cluster/health?pretty"

image-20250211170653195

3.2 配置数据映射

创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: test_user # es 的索引名称_id: _id  # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: "SELECTtb.id AS _id,tb.name,tb.sex,tb.telFROMtest_user us"        # sql映射etlCondition: "where p.id>={}"   #etl的条件参数commitBatch: 3000   # 提交批大小
3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log

会输出很多数据库变更的日志:

image-20250211171145018

image-20250211171208031

4. 验证同步

4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES ('Paco', '男', '123456789');

image-20250211171534121

image-20250211171547780

4.2 查询 Elasticsearch
curl -u elastic:<esPass> -X GET "http://<esIP>:9200/test_user/_search?pretty"

也可以在工具上查看,这边是Eage插件:

image-20250211171753695

image-20250211171808091

至此,即可验证可同步成功。我们可以修改数据测试,看是否能同步。

image-20250211171849802

image-20250211172502715

然后我们测试修改Es的数据:

image-20250211172542248

image-20250211172555087

可以发现数据库并没有变,至此Canal单向实时同步Mysql BinlogElasticsearch就配置完成了。

相关文章:

从零搭建:Canal实时数据管道打通MySQL与Elasticsearch

Canal实时同步Mysql Binlog至 Elasticsearch 文章目录 Canal实时同步Mysql **Binlog**至**Elasticsearch** 一. 环境准备1.环境检查检查Mysql是否开启BinLog开启Mysql BinlogJava环境检查 2.新建测试库和表3.新建Es索引 二.**部署 Canal Server****2.1 解压安装包****2.2 配置 …...

Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现一次触发控制三个光源开关分别采集三张图像(C#)

Baumer工业相机堡盟工业相机如何通过BGAPI SDK实现一次触发控制三个光源开关分别采集三张图像&#xff08;C#&#xff09; Baumer工业相机Baumer工业相机定序器功能的技术背景Baumer工业相机通过BGAPI SDK使用定序器功能预期的相机动作定序器的工作原理 Baumer工业相机通过BGAP…...

网络安全用centos干嘛 网络安全需要学linux吗

网络安全为啥要学Linux系统&#xff0c;据不完全统计&#xff0c;Linux系统在数据中心操作系统上的份额高达70%。它一般运行于服务器和超级计算机上。 所以我们日常访问的网站后台和app后端都是部署在Linux服务器上的&#xff0c;如果你不会Linux系统操作&#xff0c;那么很多…...

【React】react-redux+redux-toolkit实现状态管理

安装 npm install reduxjs/toolkit react-reduxRedux Toolkit 是官方推荐编写Redux的逻辑方式&#xff0c;用于简化书写方式React-redux 用来链接Redux和React组件之间的中间件 使用 定义数据 创建要管理的数据模块 store/module/counter.ts import { createSlice, Payloa…...

如何通过AI轻松制作PPT?让PPT一键生成变得简单又高效

如何通过AI轻松制作PPT&#xff1f;让PPT一键生成变得简单又高效&#xff01;在这个信息化飞速发展的时代&#xff0c;PPT已经成为我们日常工作、学习和生活中不可或缺的一部分。无论是公司会议、学术报告&#xff0c;还是个人展示&#xff0c;PPT的作用都不容忽视。很多人对于…...

Springer |第七届2025年区块链、人工智能和可信系统国际会议

Springer |第七届2025年区块链、人工智能和可信系统国际会议 International Conference on Blockchain, Artificial Intelligence, and Trustworthy Systems 【重要日期】 论文提交截止日期&#xff1a;2025年03月01日&#xff08;第2轮&#xff09; 会议报名截止日期&…...

新一代SCADA: 宏集Panorama Suite 2025 正式发布,提供更灵活、符合人体工学且安全的应用体验

宏集科技宣布正式推出全新Panorama Suite 2025 SCADA软件&#xff01;全新版本标志着 Panorama Suite的一个重要里程碑&#xff0c;代表了从 Panorama Suite 2022 开始并跨越三个版本&#xff08;2022、2023、2025&#xff09;的开发过程的顶峰。 此次重大发布集中在六个核心主…...

AI在电竞比分网中的主要应用场景

AI在电竞体育比分网的数据应用非常广泛&#xff0c;能够显著提升数据分析、预测、用户体验和商业价值。以下是AI在电竞比分网中的主要应用场景&#xff1a; 1. 实时数据采集与分析 比赛数据实时更新&#xff1a;AI通过自动化系统实时采集比赛数据&#xff08;如击杀数、经济差、…...

前端快速生成接口方法

大家好&#xff0c;我是苏麟&#xff0c;今天聊一下OpenApi。 官网 &#xff1a; umijs/openapi - npm 安装命令 npm i --save-dev umijs/openapi 在根目录&#xff08;项目目录下&#xff09;创建文件 openapi.config.js import { generateService } from umijs/openapi// 自…...

【Pico】使用Pico进行无线串流搜索不到电脑

使用Pico进行无线串流搜索不到电脑 官串方式&#xff1a;使用Pico互联连接电脑。 故障排查 以下来自官方文档 请按照以下步骡排除故障&#xff1a; 确认电脑和一体机连接了相同的路由器WiFi网络(相同网段) IP地址通常为192.168.XX&#xff0c;若两设备的IP地址前三段相同&…...

机柜机箱制冷风扇在使用过程中突然停止运转的原因

在机柜机箱的正常运行中&#xff0c;制冷风扇起着关键的散热作用&#xff0c;可一旦它在使用时突然停止运转&#xff0c;将会对机柜机箱内设备的稳定运行构成严重威胁。而导致这一现象出现的原因较为复杂&#xff0c;主要涵盖以下几个方面。 从电源供应角度来看&#xff0c;这是…...

Python函数返回值250214

import requests from xml.etree import ElementTree as ETdef xml_to_list(city):data_list []url "...".format(city) # 具体url地址就不写了res requests.get(url url)root ET.XML(res.text)for node in root:data_list.append(node.text)return dat_listres…...

call、apply、bind 详解

在 JavaScript 中&#xff0c;call、apply 和 bind 是 Function 对象的三个重要方法&#xff0c;它们都与函数的上下文&#xff08;this 值&#xff09;和参数传递有关。 一、call 方法 1. 语法 function.call(thisArg, arg1, arg2, ...) 2. 示例代码 const person {name…...

详解电子邮箱工作原理|SMTP、POP3、IMAP、SPF、MIME

写在前面 电子邮件&#xff08;Email&#xff09;是一种通过互联网进行异步通信的技术&#xff0c;工作原理涉及多个协议、服务器和客户端协同工作。 接下来我们来介绍一下电子邮箱的工作原理 1. 电子邮件的核心组成部分 邮件客户端&#xff1a;用户直接交互的软件&#xf…...

QT笔记——QPlainTextEdit

文章目录 1、概要2、文本设计2.1、设置文本2.1、字体样式&#xff08;大小、下划线、加粗、斜体&#xff09; 1、概要 QPlainTextEdit 是 Qt 框架中用于处理纯文本编辑的控件&#xff0c;具有轻量级和高效的特点&#xff0c;以下是它常见的应用场景&#xff1a; 文本编辑器&am…...

Qt使用pri和pro文件进行模块化编程

假如我想要做一个功能&#xff0c;这个功能用代码模块化实现出来&#xff0c;方便将来移植&#xff0c;比如音视频播放器的界面&#xff0c;将来想要在其他工程使用时&#xff0c;只需要将widget提升为音视频播放界面即可。 当我们其他工程需要这个功能时&#xff0c;我们在调用…...

Linux-文件基本操作

1.基本概念 文件: 一组相关数据的集合 文件名: 01.sh //文件名 2.linux下的文件类型 b block 块设备文件 eg: 硬盘 c character 字符设备文件 eg: 鼠标&#xff0c;键盘 d directory 目录文件 eg: 文件夹 - regular 常规文件…...

自己部署 DeepSeek 助力 Vue 开发:打造丝滑的时间线(Timeline )

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享一篇文章&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 目录 自己…...

初窥强大,AI识别技术实现图像转文字(OCR技术)

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ &#x1f434;作者&#xff1a;秋无之地 &#x1f434;简介&#xff1a;CSDN爬虫、后端、大数据、人工智能领域创作者。目前从事python全栈、爬虫和人工智能等相关工作&#xff0c;主要擅长领域有&#xff1a;python…...

【Apache Paimon】-- 作为一名小白,如何系统地学习 Apache paimon?

目录 一、整体规划 1. 了解基本概念与背景 2. 学习资料的选择 3. 学习路径与规划 4. 学习建议 5. 注意事项 6. 参考学习资料 二、详细计划 阶段 1&#xff1a;了解基础&#xff08;1-2 周&#xff09; 阶段 2&#xff1a;深入掌握核心功能&#xff08;3-4 周&#xf…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?

在大数据处理领域&#xff0c;Hive 作为 Hadoop 生态中重要的数据仓库工具&#xff0c;其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式&#xff0c;很多开发者常常陷入选择困境。本文将从底…...

C++使用 new 来创建动态数组

问题&#xff1a; 不能使用变量定义数组大小 原因&#xff1a; 这是因为数组在内存中是连续存储的&#xff0c;编译器需要在编译阶段就确定数组的大小&#xff0c;以便正确地分配内存空间。如果允许使用变量来定义数组的大小&#xff0c;那么编译器就无法在编译时确定数组的大…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

怎么让Comfyui导出的图像不包含工作流信息,

为了数据安全&#xff0c;让Comfyui导出的图像不包含工作流信息&#xff0c;导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo&#xff08;推荐&#xff09;​​ 在 save_images 方法中&#xff0c;​​删除或注释掉所有与 metadata …...