Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
一、Mysql 的安装和配置
1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码
# 获取镜像
docker pull mysql:8.0.40-debian# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0.40-debian
2.连接 mysql,检查是否正确安装
使用 Navicat 连接

使用 linux 命令行连接,172.21.121.208 是我本地映射的ip地址,这里换成对应 ip 即可

3.修改 mysql 的配置文件
# 进入刚刚安装的 mysql 容器
docker exec -it super-mysql /bin/bash# 查找 mysql 的配置文件 my.cnf
find / -name my.cnf 2>/dev/null#显示:
# /var/lib/dpkg/alternatives/my.cnf
# /etc/alternatives/my.cnf
# /etc/mysql/my.cnf# 修改配置文件 my.cnf
vim /etc/mysql/my.cnf# 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复# 如果没有vim,则需 先安装vim,执行如下命令
# 更新源
apt update# 安装vim
apt install vim
4.授权 canal 链接 mysql 账号具有作为 mysql slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
运行截图如下

使用刚刚新建的账号: canal 密码: canal 连接 mysql,成功了才往下配置

二、Canal 的安装和配置
1.使用 docker 安装 canal
# 获取镜像
docker pull canal/canal-server:latest# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run --name super-canal -p 3399:11111 -d canal/canal-server:latest
2.修改 canal 配置,并且重启 canal
# 进入刚刚安装的 canal 容器
docker exec -it super-canal /bin/bash# 查找 canal 的配置文件 instance.properties
find / -name 'instance.properties' 2>/dev/null
# 显示:
# /home/admin/canal-server/conf/example/instance.properties# 修改配置文件 instance.properties
vi /home/admin/canal-server/conf/example/instance.propertiescanal.instance.master.address=连接mysql的ip:port
canal.instance.dbUsername=连接mysql的账号
canal.instance.dbPassword=连接mysql的密码# 重启 canal 服务
find / -name 'stop.sh' 2>/dev/null # 查找停止 canal 命令
bash /home/admin/canal-server/bin/stop.sh # 停止 canalfind / -name 'startup.sh' 2>/dev/null # 查找重启 canal 命令
bash /home/admin/canal-server/bin/startup.sh # 重启 canal
主要修改连接 msyql 的 ip 和 port,使用的账号和密码,修改配置如下:

三、使用 PHP 测试 Canal 是否监听到了 Mysql 的变化
1.初始化项目
# 创建项目文件夹 canal-elasticsearch,并且进入到项目文件夹# composer 初始化项目
composer init
# 一直按回车键 Enter# 安装 cannal 依赖
composer require xingwenge/canal_php
2.在 src 目录下新建文件 index.php ,写入一下代码
<?php
require __DIR__.'/../vendor/autoload.php';
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);}}sleep(1);}$client->disConnect();
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}
3.命令行中启动脚本 php ./src/index.php
4.在 mysql 中新建表或者新增数据,就会在命令行中打印出来
# 创建数据表
CREATE TABLE `user` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',`age` int unsigned NOT NULL DEFAULT '0',`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

注意:如果新增数据表或者其它情况,导致 canal 突然连接不上 mysql 了,建议停止 canal,且删除当前的 canal 容器,重新使用 docker 安装 canal,这样解决起来比较迅速(坏笑)
四、ElasticSearch 的安装和配置
1.使用 docker 安装 elasticsearch
# 获取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1# 查看镜像是否下载成功
docker images# 创建数据文件夹
mkdir /mnt/d/Work/Code/docker/elasticsearch/data# 运行 elasticsearch 镜像
docker run --name super-elasticsearch -d \-p 3320:9200 -p 3330:9300 \-e "ES_JAVA_OPTS=-Xms4g -Xmx16g" \-e "discovery.type=single-node" \-v /mnt/d/Work/Code/docker/elasticsearch/data:/usr/share/elasticsearch/data \registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1
2.测试 elasticsearch 是否安装成功, 在浏览器地址栏输入:127.0.0.1:3320,出现如下画面表示安装成功了

五、使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
1.在 php 中使用 composer 安装 elasticsearch 依赖包
# 使用 composer 安装 elasticsearch 依赖包
composer require elasticsearch/elasticsearch
2.使用 php 在 elasticsearch 中创建 mapping,在 src 目录下创建文件 creatElasticSearchMapping.php
<?php
require 'vendor/autoload.php';use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\BadRequest400Exception;// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['localhost:3320']) // 设置 Elasticsearch 主机和端口->build();// 索引名称
$indexName = 'canal_user_index';// 索引设置和映射
$params = ['index' => $indexName,'body' => ['settings' => ['number_of_shards' => 1, // 分片数量'number_of_replicas' => 0 // 副本数量],'mappings' => ['properties' => ['name' => ['type' => 'text'],'age' => ['type' => 'integer'],'email' => ['type' => 'keyword'],'created_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],'updated_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],]]]
];try {// 创建索引$response = $client->indices()->create($params);echo "索引 '$indexName' 创建成功:\n";print_r($response);
} catch (BadRequest400Exception $e) {// 如果索引已经存在if (strpos($e->getMessage(), 'index_already_exists_exception') !== false) {echo "索引 '$indexName' 已经存在。\n";} else {echo "创建索引时发生错误: " . $e->getMessage() . "\n";}
} catch (\Exception $e) {echo "创建索引时发生错误: " . $e->getMessage() . "\n";
}
3.检查elasticsearch中的mapping是否创建成功
curl -XGET 'http://localhost:3320/canal_user_index/_mapping'
提示如图

4.在 php 中通过 canal 获取 msyql 的数据变化,且更新到 elasticsearch 中,数据的增删改代码都在下面了,在 src 目录下创建文件 canalToElasticSearch.php
<?php
require __DIR__.'/../vendor/autoload.php';
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
use Elasticsearch\ClientBuilder;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;$clientES = ClientBuilder::create()->setHosts(['localhost:3320'])->setSSLVerification(false) // 禁用 SSL 验证(仅用于开发环境)->setRetries(3) // 设置重试次数->build();// 索引名称
$indexName = 'canal_user_index';try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");echo "script start success!";while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);$rowChange = new RowChange();$rowChange->mergeFromString($entry->getStoreValue());$evenType = $rowChange->getEventType();$header = $entry->getHeader();/** @var RowData $rowData */foreach ($rowChange->getRowDatas() as $rowData) {switch ($evenType) {/** 删除数据 */case EventType::DELETE:if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && $clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->delete(['index' => $indexName, 'type' => '_doc', 'id' => $id]);}}break;/** 新增数据 */case EventType::INSERT:$insertData = [];if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {$insertData = array_merge($insertData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($insertData)) {$params = ['index' => $indexName, 'body' => $insertData];if (!empty($id)) $params['id'] = $id;$response = $clientES->index($params);}}break;default:/** 更新数据 */if ($rowData->getAfterColumns()) {$updateData = [];foreach ($rowData->getAfterColumns() as $column) {$updateData = array_merge($updateData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && !empty($updateData)) {$params = ['index' => $indexName,'id' => $id,'body' => ['doc' => $updateData],];if ($clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->update($params);} else {$updateData['id'] = $id;$params['body'] = $updateData;$response = $clientES->index($params);}}}break;}}}}sleep(1);}
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}
5.执行php脚本,监听数据变化

我这里在 mysql 中添加了一些数据,都同步到 elasticsearch 里面了
msyql 中的截图:

elasticsearch 中的截图

看到这里,辛苦了。
感觉自己今天又又又变得比昨天更强了
参考文档链接:
1.Mysql 数据库 主从数据库 (主从)(主主)-CSDN博客
2.https://github.com/xingwenge/canal-php
相关文章:
Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
一、Mysql 的安装和配置 1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码 # 获取镜像 docker pull mysql:8.0.40-debian# 查看镜像是否下载成功 docker images# 运行msyql镜像 docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD12…...
数据结构之五:排序
void*类型的实现:排序(void*类型)-CSDN博客 一、插入排序 1、直接插入排序 思想:把待排序的数据逐个插入到一个已经排好序的有序序列中,直到所有的记录插入完为止,得到一个新的有序序列 。 单趟&#x…...
科研绘图系列:R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot boxplot)
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载图1图2图3系统信息参考介绍 R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot & boxplot) 加载R包 library(magrittr) library(dplyr) library(ve…...
基于 webRTC Vue 的局域网 文件传输工具
文件传输工具,匿名加密,只需访问网页,即可连接到其他设备,基于 webRTC 和 Vue.js coturn TURN 服务器 docker pull coturn/coturn docker run -d --networkhost \-v $(pwd)/my.conf:/etc/coturn/turnserver.conf \coturn/coturn…...
LeetCode 718. 最长重复子数组 java题解
https://leetcode.cn/problems/maximum-length-of-repeated-subarray/description/ 动态规划 class Solution {public int findLength(int[] nums1, int[] nums2) {int len1nums1.length,len2nums2.length;int[][] dpnew int[len11][len21];dp[0][0]0;//没有意义,…...
算法知识-15-深搜
一、概念 深度优先搜索(Deep First Search, DFS)是一种用于遍历或搜索树或图的算法。这种策略沿着树的深度遍历树的节点,尽可能深地搜索树的分支。 二、关键步骤 选择起点:根据题目要求,选择一个或多个节点作为搜索…...
区块链dapp 开发详解(VUE3.0)
1、安装metamask 插件。 2、使用封装的工具包: wagmi . 3、 wagmi 操作手册地址:connect | Wagmi 4、注意事项: 因为最初是react 版本,所以在VUE版的官方文档有很多地方在 import 用的是 wagmi,需要改为 wagmi/vue 。 连接成功后打印的内容如下&…...
Plugin [id: ‘flutter‘] was not found in any of the following sources解决方法
文章目录 错误描述解决方法修正方案:继续使用 apply from修正后的 build.gradle说明警告的处理进一步验证 错误描述 Plugin [id: ‘flutter’] was not found in any of the following sources: Gradle Core Plugins (not a core plugin, please see https://docs…...
专升本-高数 1
第 0 章,基础知识 一,重要公式 1、完全平方 (ab)a2abb (a-b)a-2abb 2、平方差公式 (a-b)(ab)a-b 3、立方差公式 a-b(a-b)(aabb) 4、 立方和公式 ab(ab)(a-abb) 二,基本初等函数 1,幂函数 一元二…...
【考前预习】3.计算机网络—数据链路层
往期推荐 【考前预习】2.计算机网络—物理层-CSDN博客 【考前预习】1.计算机网络概述-CSDN博客 浅谈云原生--微服务、CICD、Serverless、服务网格_云原生cicd-CSDN博客 子网掩码、网络地址、广播地址、子网划分及计算_子网广播地址-CSDN博客 浅学React和JSX-CSDN博客 目录 1.数…...
DockeUI 弱口令登录漏洞+未授权信息泄露
0x01 产品描述: DockerUI是一款开源的、强大的、轻量级的Docker管理工具。DockerUI覆盖了 docker cli 命令行 95% 以上的命令功能,通过可视化的界面,即使是不熟悉docker命令的用户也可以非常方便的进行Docker和Docker Swarm集群进行管理和维护。0x02 漏洞描述: DockerUI中存…...
【电子元器件】电感基础知识
本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时,也能帮助其他需要参考的朋友。如有谬误,欢迎大家进行指正。 一、 电感的基本工作原理 1. 电感的基本工作原理如下: (1) 当线圈中有电流通过时&#…...
【SSH+X11】VsCode使用Remote-SSH在远程服务器的docker中打开Rviz
🚀今天来分享一下通过VsCode的Remote-SSH插件在远程服务器的docker中打开Rviz进行可视化的方法。 具体流程如下图所示,在操作开始前,请先重启设备,排除之前运行配置的影响: ⭐️ 我这里是使用主机连接服务器ÿ…...
Vue Web开发(五)
1. axios axios官方文档 异步库axios和mockjs模拟后端数据,axios是一个基于promise的HTTP库,使用npm i axios。在main.js中引入,需要绑定在Vue的prototype属性上,并重命名。 (1)main.js文件引用 imp…...
HarmonyOS:使用Grid构建网格
一、概述 网格布局是由“行”和“列”分割的单元格所组成,通过指定“项目”所在的单元格做出各种各样的布局。网格布局具有较强的页面均分能力,子组件占比控制能力,是一种重要自适应布局,其使用场景有九宫格图片展示、日历、计算器…...
开源Java快速自测工具,可以调用系统内任意一个方法
java快速测试框架,可以调到系统内任意一个方法,告别写单测和controller的困扰。 开源地址:https://gitee.com/missyouch/Easy-JTest 我们在开发时很多时候想要测试下自己的代码,特别是service层或者是更底层的代码,就…...
力扣刷题TOP101: 29.BM36 判断是不是平衡二叉树
目录: 目的 思路 复杂度 记忆秘诀 python代码 目的: 输入一棵节点数为 n 二叉树,判断该二叉树是否是平衡二叉树。 思路 什么是平衡二叉树(AVL 树)? 每个节点的左子树和右子树的高度差不能超过 1。确保…...
【在Linux世界中追寻伟大的One Piece】自旋锁
目录 1 -> 概述 2 -> 原理 3 -> 优缺点及使用场景 3.1 -> 优点 3.2 -> 缺点 3.3 -> 使用场景 4 -> 纯软件自旋锁类似的原理实现 4.1 -> 结论 5 -> 样例代码 1 -> 概述 自旋锁是一种多线程同步机制,用于保护共享资源避免受并…...
前端编辑器JSON HTML等,vue2-ace-editor,vue3-ace-editor
与框架无关 vue2-ace-editor有问题,ace拿不到(brace) 一些组件都是基于ace-builds或者brace包装的 不如直接用下面的,不如直接使用下面的 <template><div ref"editor" class"json-editor"><…...
C++ 中的运算符重载
运算符重载是C中的一种特性,它允许开发者为自定义类型定义或改变标准运算符的行为。通过运算符重载,你可以使得用户定义的类像内置类型一样使用运算符,比如加法、减法、赋值等。 如何在C中进行运算符重载? 重载运算符的语法&#…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...
倒装芯片凸点成型工艺
UBM(Under Bump Metallization)与Bump(焊球)形成工艺流程。我们可以将整张流程图分为三大阶段来理解: 🔧 一、UBM(Under Bump Metallization)工艺流程(黄色区域ÿ…...
持续交付的进化:从DevOps到AI驱动的IT新动能
文章目录 一、持续交付的本质:从手动到自动的交付飞跃关键特性案例:电商平台的高效部署 二、持续交付的演进:从CI到AI驱动的未来发展历程 中国…...
