Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
一、Mysql 的安装和配置
1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码
# 获取镜像
docker pull mysql:8.0.40-debian# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0.40-debian
2.连接 mysql,检查是否正确安装
使用 Navicat 连接
使用 linux 命令行连接,172.21.121.208 是我本地映射的ip地址,这里换成对应 ip 即可
3.修改 mysql 的配置文件
# 进入刚刚安装的 mysql 容器
docker exec -it super-mysql /bin/bash# 查找 mysql 的配置文件 my.cnf
find / -name my.cnf 2>/dev/null#显示:
# /var/lib/dpkg/alternatives/my.cnf
# /etc/alternatives/my.cnf
# /etc/mysql/my.cnf# 修改配置文件 my.cnf
vim /etc/mysql/my.cnf# 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复# 如果没有vim,则需 先安装vim,执行如下命令
# 更新源
apt update# 安装vim
apt install vim
4.授权 canal 链接 mysql 账号具有作为 mysql slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
运行截图如下
使用刚刚新建的账号: canal 密码: canal 连接 mysql,成功了才往下配置
二、Canal 的安装和配置
1.使用 docker 安装 canal
# 获取镜像
docker pull canal/canal-server:latest# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run --name super-canal -p 3399:11111 -d canal/canal-server:latest
2.修改 canal 配置,并且重启 canal
# 进入刚刚安装的 canal 容器
docker exec -it super-canal /bin/bash# 查找 canal 的配置文件 instance.properties
find / -name 'instance.properties' 2>/dev/null
# 显示:
# /home/admin/canal-server/conf/example/instance.properties# 修改配置文件 instance.properties
vi /home/admin/canal-server/conf/example/instance.propertiescanal.instance.master.address=连接mysql的ip:port
canal.instance.dbUsername=连接mysql的账号
canal.instance.dbPassword=连接mysql的密码# 重启 canal 服务
find / -name 'stop.sh' 2>/dev/null # 查找停止 canal 命令
bash /home/admin/canal-server/bin/stop.sh # 停止 canalfind / -name 'startup.sh' 2>/dev/null # 查找重启 canal 命令
bash /home/admin/canal-server/bin/startup.sh # 重启 canal
主要修改连接 msyql 的 ip 和 port,使用的账号和密码,修改配置如下:
三、使用 PHP 测试 Canal 是否监听到了 Mysql 的变化
1.初始化项目
# 创建项目文件夹 canal-elasticsearch,并且进入到项目文件夹# composer 初始化项目
composer init
# 一直按回车键 Enter# 安装 cannal 依赖
composer require xingwenge/canal_php
2.在 src 目录下新建文件 index.php ,写入一下代码
<?php
require __DIR__.'/../vendor/autoload.php';
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);}}sleep(1);}$client->disConnect();
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}
3.命令行中启动脚本 php ./src/index.php
4.在 mysql 中新建表或者新增数据,就会在命令行中打印出来
# 创建数据表
CREATE TABLE `user` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',`age` int unsigned NOT NULL DEFAULT '0',`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
注意:如果新增数据表或者其它情况,导致 canal 突然连接不上 mysql 了,建议停止 canal,且删除当前的 canal 容器,重新使用 docker 安装 canal,这样解决起来比较迅速(坏笑)
四、ElasticSearch 的安装和配置
1.使用 docker 安装 elasticsearch
# 获取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1# 查看镜像是否下载成功
docker images# 创建数据文件夹
mkdir /mnt/d/Work/Code/docker/elasticsearch/data# 运行 elasticsearch 镜像
docker run --name super-elasticsearch -d \-p 3320:9200 -p 3330:9300 \-e "ES_JAVA_OPTS=-Xms4g -Xmx16g" \-e "discovery.type=single-node" \-v /mnt/d/Work/Code/docker/elasticsearch/data:/usr/share/elasticsearch/data \registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1
2.测试 elasticsearch 是否安装成功, 在浏览器地址栏输入:127.0.0.1:3320,出现如下画面表示安装成功了
五、使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
1.在 php 中使用 composer 安装 elasticsearch 依赖包
# 使用 composer 安装 elasticsearch 依赖包
composer require elasticsearch/elasticsearch
2.使用 php 在 elasticsearch 中创建 mapping,在 src 目录下创建文件 creatElasticSearchMapping.php
<?php
require 'vendor/autoload.php';use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\BadRequest400Exception;// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['localhost:3320']) // 设置 Elasticsearch 主机和端口->build();// 索引名称
$indexName = 'canal_user_index';// 索引设置和映射
$params = ['index' => $indexName,'body' => ['settings' => ['number_of_shards' => 1, // 分片数量'number_of_replicas' => 0 // 副本数量],'mappings' => ['properties' => ['name' => ['type' => 'text'],'age' => ['type' => 'integer'],'email' => ['type' => 'keyword'],'created_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],'updated_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],]]]
];try {// 创建索引$response = $client->indices()->create($params);echo "索引 '$indexName' 创建成功:\n";print_r($response);
} catch (BadRequest400Exception $e) {// 如果索引已经存在if (strpos($e->getMessage(), 'index_already_exists_exception') !== false) {echo "索引 '$indexName' 已经存在。\n";} else {echo "创建索引时发生错误: " . $e->getMessage() . "\n";}
} catch (\Exception $e) {echo "创建索引时发生错误: " . $e->getMessage() . "\n";
}
3.检查elasticsearch中的mapping是否创建成功
curl -XGET 'http://localhost:3320/canal_user_index/_mapping'
提示如图
4.在 php 中通过 canal 获取 msyql 的数据变化,且更新到 elasticsearch 中,数据的增删改代码都在下面了,在 src 目录下创建文件 canalToElasticSearch.php
<?php
require __DIR__.'/../vendor/autoload.php';
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
use Elasticsearch\ClientBuilder;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;$clientES = ClientBuilder::create()->setHosts(['localhost:3320'])->setSSLVerification(false) // 禁用 SSL 验证(仅用于开发环境)->setRetries(3) // 设置重试次数->build();// 索引名称
$indexName = 'canal_user_index';try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");echo "script start success!";while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);$rowChange = new RowChange();$rowChange->mergeFromString($entry->getStoreValue());$evenType = $rowChange->getEventType();$header = $entry->getHeader();/** @var RowData $rowData */foreach ($rowChange->getRowDatas() as $rowData) {switch ($evenType) {/** 删除数据 */case EventType::DELETE:if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && $clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->delete(['index' => $indexName, 'type' => '_doc', 'id' => $id]);}}break;/** 新增数据 */case EventType::INSERT:$insertData = [];if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {$insertData = array_merge($insertData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($insertData)) {$params = ['index' => $indexName, 'body' => $insertData];if (!empty($id)) $params['id'] = $id;$response = $clientES->index($params);}}break;default:/** 更新数据 */if ($rowData->getAfterColumns()) {$updateData = [];foreach ($rowData->getAfterColumns() as $column) {$updateData = array_merge($updateData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && !empty($updateData)) {$params = ['index' => $indexName,'id' => $id,'body' => ['doc' => $updateData],];if ($clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->update($params);} else {$updateData['id'] = $id;$params['body'] = $updateData;$response = $clientES->index($params);}}}break;}}}}sleep(1);}
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}
5.执行php脚本,监听数据变化
我这里在 mysql 中添加了一些数据,都同步到 elasticsearch 里面了
msyql 中的截图:
elasticsearch 中的截图
看到这里,辛苦了。
感觉自己今天又又又变得比昨天更强了
参考文档链接:
1.Mysql 数据库 主从数据库 (主从)(主主)-CSDN博客
2.https://github.com/xingwenge/canal-php
相关文章:

Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch
一、Mysql 的安装和配置 1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码 # 获取镜像 docker pull mysql:8.0.40-debian# 查看镜像是否下载成功 docker images# 运行msyql镜像 docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD12…...

数据结构之五:排序
void*类型的实现:排序(void*类型)-CSDN博客 一、插入排序 1、直接插入排序 思想:把待排序的数据逐个插入到一个已经排好序的有序序列中,直到所有的记录插入完为止,得到一个新的有序序列 。 单趟&#x…...

科研绘图系列:R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot boxplot)
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载图1图2图3系统信息参考介绍 R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot & boxplot) 加载R包 library(magrittr) library(dplyr) library(ve…...
基于 webRTC Vue 的局域网 文件传输工具
文件传输工具,匿名加密,只需访问网页,即可连接到其他设备,基于 webRTC 和 Vue.js coturn TURN 服务器 docker pull coturn/coturn docker run -d --networkhost \-v $(pwd)/my.conf:/etc/coturn/turnserver.conf \coturn/coturn…...
LeetCode 718. 最长重复子数组 java题解
https://leetcode.cn/problems/maximum-length-of-repeated-subarray/description/ 动态规划 class Solution {public int findLength(int[] nums1, int[] nums2) {int len1nums1.length,len2nums2.length;int[][] dpnew int[len11][len21];dp[0][0]0;//没有意义,…...

算法知识-15-深搜
一、概念 深度优先搜索(Deep First Search, DFS)是一种用于遍历或搜索树或图的算法。这种策略沿着树的深度遍历树的节点,尽可能深地搜索树的分支。 二、关键步骤 选择起点:根据题目要求,选择一个或多个节点作为搜索…...
区块链dapp 开发详解(VUE3.0)
1、安装metamask 插件。 2、使用封装的工具包: wagmi . 3、 wagmi 操作手册地址:connect | Wagmi 4、注意事项: 因为最初是react 版本,所以在VUE版的官方文档有很多地方在 import 用的是 wagmi,需要改为 wagmi/vue 。 连接成功后打印的内容如下&…...

Plugin [id: ‘flutter‘] was not found in any of the following sources解决方法
文章目录 错误描述解决方法修正方案:继续使用 apply from修正后的 build.gradle说明警告的处理进一步验证 错误描述 Plugin [id: ‘flutter’] was not found in any of the following sources: Gradle Core Plugins (not a core plugin, please see https://docs…...

专升本-高数 1
第 0 章,基础知识 一,重要公式 1、完全平方 (ab)a2abb (a-b)a-2abb 2、平方差公式 (a-b)(ab)a-b 3、立方差公式 a-b(a-b)(aabb) 4、 立方和公式 ab(ab)(a-abb) 二,基本初等函数 1,幂函数 一元二…...

【考前预习】3.计算机网络—数据链路层
往期推荐 【考前预习】2.计算机网络—物理层-CSDN博客 【考前预习】1.计算机网络概述-CSDN博客 浅谈云原生--微服务、CICD、Serverless、服务网格_云原生cicd-CSDN博客 子网掩码、网络地址、广播地址、子网划分及计算_子网广播地址-CSDN博客 浅学React和JSX-CSDN博客 目录 1.数…...

DockeUI 弱口令登录漏洞+未授权信息泄露
0x01 产品描述: DockerUI是一款开源的、强大的、轻量级的Docker管理工具。DockerUI覆盖了 docker cli 命令行 95% 以上的命令功能,通过可视化的界面,即使是不熟悉docker命令的用户也可以非常方便的进行Docker和Docker Swarm集群进行管理和维护。0x02 漏洞描述: DockerUI中存…...

【电子元器件】电感基础知识
本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时,也能帮助其他需要参考的朋友。如有谬误,欢迎大家进行指正。 一、 电感的基本工作原理 1. 电感的基本工作原理如下: (1) 当线圈中有电流通过时&#…...

【SSH+X11】VsCode使用Remote-SSH在远程服务器的docker中打开Rviz
🚀今天来分享一下通过VsCode的Remote-SSH插件在远程服务器的docker中打开Rviz进行可视化的方法。 具体流程如下图所示,在操作开始前,请先重启设备,排除之前运行配置的影响: ⭐️ 我这里是使用主机连接服务器ÿ…...

Vue Web开发(五)
1. axios axios官方文档 异步库axios和mockjs模拟后端数据,axios是一个基于promise的HTTP库,使用npm i axios。在main.js中引入,需要绑定在Vue的prototype属性上,并重命名。 (1)main.js文件引用 imp…...

HarmonyOS:使用Grid构建网格
一、概述 网格布局是由“行”和“列”分割的单元格所组成,通过指定“项目”所在的单元格做出各种各样的布局。网格布局具有较强的页面均分能力,子组件占比控制能力,是一种重要自适应布局,其使用场景有九宫格图片展示、日历、计算器…...

开源Java快速自测工具,可以调用系统内任意一个方法
java快速测试框架,可以调到系统内任意一个方法,告别写单测和controller的困扰。 开源地址:https://gitee.com/missyouch/Easy-JTest 我们在开发时很多时候想要测试下自己的代码,特别是service层或者是更底层的代码,就…...
力扣刷题TOP101: 29.BM36 判断是不是平衡二叉树
目录: 目的 思路 复杂度 记忆秘诀 python代码 目的: 输入一棵节点数为 n 二叉树,判断该二叉树是否是平衡二叉树。 思路 什么是平衡二叉树(AVL 树)? 每个节点的左子树和右子树的高度差不能超过 1。确保…...

【在Linux世界中追寻伟大的One Piece】自旋锁
目录 1 -> 概述 2 -> 原理 3 -> 优缺点及使用场景 3.1 -> 优点 3.2 -> 缺点 3.3 -> 使用场景 4 -> 纯软件自旋锁类似的原理实现 4.1 -> 结论 5 -> 样例代码 1 -> 概述 自旋锁是一种多线程同步机制,用于保护共享资源避免受并…...

前端编辑器JSON HTML等,vue2-ace-editor,vue3-ace-editor
与框架无关 vue2-ace-editor有问题,ace拿不到(brace) 一些组件都是基于ace-builds或者brace包装的 不如直接用下面的,不如直接使用下面的 <template><div ref"editor" class"json-editor"><…...
C++ 中的运算符重载
运算符重载是C中的一种特性,它允许开发者为自定义类型定义或改变标准运算符的行为。通过运算符重载,你可以使得用户定义的类像内置类型一样使用运算符,比如加法、减法、赋值等。 如何在C中进行运算符重载? 重载运算符的语法&#…...

使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...

MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...

C# winform教程(二)----checkbox
一、作用 提供一个用户选择或者不选的状态,这是一个可以多选的控件。 二、属性 其实功能大差不差,除了特殊的几个外,与button基本相同,所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...
嵌入式面试常问问题
以下内容面向嵌入式/系统方向的初学者与面试备考者,全面梳理了以下几大板块,并在每个板块末尾列出常见的面试问答思路,帮助你既能夯实基础,又能应对面试挑战。 一、TCP/IP 协议 1.1 TCP/IP 五层模型概述 链路层(Link Layer) 包括网卡驱动、以太网、Wi‑Fi、PPP 等。负责…...