当前位置: 首页 > news >正文

Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

一、Mysql 的安装和配置

1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码

# 获取镜像
docker pull mysql:8.0.40-debian# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0.40-debian

2.连接 mysql,检查是否正确安装

        使用 Navicat 连接

使用 linux 命令行连接,172.21.121.208 是我本地映射的ip地址,这里换成对应 ip 即可

3.修改 mysql 的配置文件

# 进入刚刚安装的 mysql 容器
docker exec -it super-mysql /bin/bash# 查找 mysql 的配置文件 my.cnf
find / -name my.cnf 2>/dev/null#显示:
# /var/lib/dpkg/alternatives/my.cnf
# /etc/alternatives/my.cnf
# /etc/mysql/my.cnf# 修改配置文件 my.cnf 
vim /etc/mysql/my.cnf# 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复# 如果没有vim,则需 先安装vim,执行如下命令
# 更新源
apt update# 安装vim
apt install vim

4.授权 canal 链接 mysql 账号具有作为 mysql slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

运行截图如下

使用刚刚新建的账号: canal 密码: canal 连接 mysql,成功了才往下配置

二、Canal 的安装和配置

1.使用 docker 安装 canal

# 获取镜像
docker pull canal/canal-server:latest# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run --name super-canal -p 3399:11111 -d canal/canal-server:latest

2.修改 canal 配置,并且重启 canal

# 进入刚刚安装的 canal 容器
docker exec -it super-canal /bin/bash# 查找 canal 的配置文件 instance.properties
find / -name 'instance.properties' 2>/dev/null 
# 显示:
# /home/admin/canal-server/conf/example/instance.properties# 修改配置文件 instance.properties
vi /home/admin/canal-server/conf/example/instance.propertiescanal.instance.master.address=连接mysql的ip:port
canal.instance.dbUsername=连接mysql的账号
canal.instance.dbPassword=连接mysql的密码# 重启 canal 服务
find / -name 'stop.sh' 2>/dev/null # 查找停止 canal 命令
bash /home/admin/canal-server/bin/stop.sh # 停止 canalfind / -name 'startup.sh' 2>/dev/null # 查找重启 canal 命令
bash /home/admin/canal-server/bin/startup.sh # 重启 canal

主要修改连接 msyql 的 ip 和 port,使用的账号和密码,修改配置如下:

三、使用 PHP 测试 Canal 是否监听到了 Mysql 的变化

1.初始化项目

# 创建项目文件夹 canal-elasticsearch,并且进入到项目文件夹# composer 初始化项目
composer init 
# 一直按回车键 Enter# 安装 cannal 依赖
composer require xingwenge/canal_php

2.在 src 目录下新建文件 index.php ,写入一下代码

<?php
require __DIR__.'/../vendor/autoload.php';
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);}}sleep(1);}$client->disConnect();
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

3.命令行中启动脚本 php ./src/index.php

4.在 mysql 中新建表或者新增数据,就会在命令行中打印出来

# 创建数据表
CREATE TABLE `user` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',`age` int unsigned NOT NULL DEFAULT '0',`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

注意:如果新增数据表或者其它情况,导致 canal 突然连接不上 mysql 了,建议停止 canal,且删除当前的 canal 容器,重新使用 docker 安装 canal,这样解决起来比较迅速(坏笑)

四、ElasticSearch 的安装和配置

1.使用 docker 安装 elasticsearch

# 获取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1# 查看镜像是否下载成功
docker images# 创建数据文件夹
mkdir /mnt/d/Work/Code/docker/elasticsearch/data# 运行 elasticsearch 镜像
docker run --name super-elasticsearch -d \-p 3320:9200 -p 3330:9300 \-e "ES_JAVA_OPTS=-Xms4g -Xmx16g" \-e "discovery.type=single-node" \-v /mnt/d/Work/Code/docker/elasticsearch/data:/usr/share/elasticsearch/data \registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1

2.测试 elasticsearch 是否安装成功, 在浏览器地址栏输入:127.0.0.1:3320,出现如下画面表示安装成功了

五、使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

1.在 php 中使用 composer 安装 elasticsearch 依赖包

# 使用 composer 安装 elasticsearch 依赖包
composer require elasticsearch/elasticsearch

2.使用 php 在 elasticsearch 中创建 mapping,在 src 目录下创建文件 creatElasticSearchMapping.php

<?php
require 'vendor/autoload.php';use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\BadRequest400Exception;// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['localhost:3320']) // 设置 Elasticsearch 主机和端口->build();// 索引名称
$indexName = 'canal_user_index';// 索引设置和映射
$params = ['index' => $indexName,'body'  => ['settings' => ['number_of_shards' => 1, // 分片数量'number_of_replicas' => 0 // 副本数量],'mappings' => ['properties' => ['name' => ['type' => 'text'],'age' => ['type' => 'integer'],'email' => ['type' => 'keyword'],'created_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],'updated_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],]]]
];try {// 创建索引$response = $client->indices()->create($params);echo "索引 '$indexName' 创建成功:\n";print_r($response);
} catch (BadRequest400Exception $e) {// 如果索引已经存在if (strpos($e->getMessage(), 'index_already_exists_exception') !== false) {echo "索引 '$indexName' 已经存在。\n";} else {echo "创建索引时发生错误: " . $e->getMessage() . "\n";}
} catch (\Exception $e) {echo "创建索引时发生错误: " . $e->getMessage() . "\n";
}

3.检查elasticsearch中的mapping是否创建成功

curl -XGET 'http://localhost:3320/canal_user_index/_mapping'

提示如图

4.在 php 中通过 canal 获取 msyql 的数据变化,且更新到 elasticsearch 中,数据的增删改代码都在下面了,在 src 目录下创建文件 canalToElasticSearch.php

<?php
require __DIR__.'/../vendor/autoload.php';
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
use Elasticsearch\ClientBuilder;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;$clientES = ClientBuilder::create()->setHosts(['localhost:3320'])->setSSLVerification(false) // 禁用 SSL 验证(仅用于开发环境)->setRetries(3) // 设置重试次数->build();// 索引名称
$indexName = 'canal_user_index';try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");echo "script start success!";while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);$rowChange = new RowChange();$rowChange->mergeFromString($entry->getStoreValue());$evenType = $rowChange->getEventType();$header = $entry->getHeader();/** @var RowData $rowData */foreach ($rowChange->getRowDatas() as $rowData) {switch ($evenType) {/** 删除数据 */case EventType::DELETE:if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && $clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->delete(['index' => $indexName, 'type' => '_doc', 'id' => $id]);}}break;/** 新增数据 */case EventType::INSERT:$insertData = [];if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {$insertData = array_merge($insertData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($insertData)) {$params = ['index' => $indexName, 'body' => $insertData];if (!empty($id)) $params['id'] = $id;$response = $clientES->index($params);}}break;default:/** 更新数据 */if ($rowData->getAfterColumns()) {$updateData = [];foreach ($rowData->getAfterColumns() as $column) {$updateData = array_merge($updateData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && !empty($updateData)) {$params = ['index' => $indexName,'id' => $id,'body' => ['doc' => $updateData],];if ($clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->update($params);} else {$updateData['id'] = $id;$params['body'] = $updateData;$response = $clientES->index($params);}}}break;}}}}sleep(1);}
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

5.执行php脚本,监听数据变化

我这里在 mysql 中添加了一些数据,都同步到 elasticsearch 里面了

msyql 中的截图:

elasticsearch 中的截图

看到这里,辛苦了。

感觉自己今天又又又变得比昨天更强了

参考文档链接:

1.Mysql 数据库 主从数据库 (主从)(主主)-CSDN博客

2.https://github.com/xingwenge/canal-php

相关文章:

Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

一、Mysql 的安装和配置 1.使用 docker 安装 mysql&#xff0c;并且映射端口和 root 账号的密码 # 获取镜像 docker pull mysql:8.0.40-debian# 查看镜像是否下载成功 docker images# 运行msyql镜像 docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD12…...

数据结构之五:排序

void*类型的实现&#xff1a;排序&#xff08;void*类型&#xff09;-CSDN博客 一、插入排序 1、直接插入排序 思想&#xff1a;把待排序的数据逐个插入到一个已经排好序的有序序列中&#xff0c;直到所有的记录插入完为止&#xff0c;得到一个新的有序序列 。 单趟&#x…...

科研绘图系列:R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot boxplot)

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍加载R包数据下载图1图2图3系统信息参考介绍 R语言绘制热图和散点图以及箱线图(pheatmap, scatterplot & boxplot) 加载R包 library(magrittr) library(dplyr) library(ve…...

基于 webRTC Vue 的局域网 文件传输工具

文件传输工具&#xff0c;匿名加密&#xff0c;只需访问网页&#xff0c;即可连接到其他设备&#xff0c;基于 webRTC 和 Vue.js coturn TURN 服务器 docker pull coturn/coturn docker run -d --networkhost \-v $(pwd)/my.conf:/etc/coturn/turnserver.conf \coturn/coturn…...

LeetCode 718. 最长重复子数组 java题解

https://leetcode.cn/problems/maximum-length-of-repeated-subarray/description/ 动态规划 class Solution {public int findLength(int[] nums1, int[] nums2) {int len1nums1.length,len2nums2.length;int[][] dpnew int[len11][len21];dp[0][0]0;//没有意义&#xff0c;…...

算法知识-15-深搜

一、概念 深度优先搜索&#xff08;Deep First Search, DFS&#xff09;是一种用于遍历或搜索树或图的算法。这种策略沿着树的深度遍历树的节点&#xff0c;尽可能深地搜索树的分支。 二、关键步骤 选择起点&#xff1a;根据题目要求&#xff0c;选择一个或多个节点作为搜索…...

区块链dapp 开发详解(VUE3.0)

1、安装metamask 插件。 2、使用封装的工具包: wagmi . 3、 wagmi 操作手册地址:connect | Wagmi 4、注意事项&#xff1a; 因为最初是react 版本&#xff0c;所以在VUE版的官方文档有很多地方在 import 用的是 wagmi,需要改为 wagmi/vue 。 连接成功后打印的内容如下&…...

Plugin [id: ‘flutter‘] was not found in any of the following sources解决方法

文章目录 错误描述解决方法修正方案&#xff1a;继续使用 apply from修正后的 build.gradle说明警告的处理进一步验证 错误描述 Plugin [id: ‘flutter’] was not found in any of the following sources: Gradle Core Plugins (not a core plugin, please see https://docs…...

专升本-高数 1

第 0 章&#xff0c;基础知识 一&#xff0c;重要公式 1、完全平方 (ab)a2abb (a-b)a-2abb 2、平方差公式 &#xff08;a-b&#xff09;(ab)a-b 3、立方差公式 a-b(a-b)(aabb) 4、 立方和公式 ab(ab)(a-abb) 二&#xff0c;基本初等函数 1&#xff0c;幂函数 一元二…...

【考前预习】3.计算机网络—数据链路层

往期推荐 【考前预习】2.计算机网络—物理层-CSDN博客 【考前预习】1.计算机网络概述-CSDN博客 浅谈云原生--微服务、CICD、Serverless、服务网格_云原生cicd-CSDN博客 子网掩码、网络地址、广播地址、子网划分及计算_子网广播地址-CSDN博客 浅学React和JSX-CSDN博客 目录 1.数…...

DockeUI 弱口令登录漏洞+未授权信息泄露

0x01 产品描述: DockerUI是一款开源的、强大的、轻量级的Docker管理工具。DockerUI覆盖了 docker cli 命令行 95% 以上的命令功能,通过可视化的界面,即使是不熟悉docker命令的用户也可以非常方便的进行Docker和Docker Swarm集群进行管理和维护。0x02 漏洞描述: DockerUI中存…...

【电子元器件】电感基础知识

本文章是笔者整理的备忘笔记。希望在帮助自己温习避免遗忘的同时&#xff0c;也能帮助其他需要参考的朋友。如有谬误&#xff0c;欢迎大家进行指正。 一、 电感的基本工作原理 1. 电感的基本工作原理如下&#xff1a; &#xff08;1&#xff09; 当线圈中有电流通过时&#…...

【SSH+X11】VsCode使用Remote-SSH在远程服务器的docker中打开Rviz

&#x1f680;今天来分享一下通过VsCode的Remote-SSH插件在远程服务器的docker中打开Rviz进行可视化的方法。 具体流程如下图所示&#xff0c;在操作开始前&#xff0c;请先重启设备&#xff0c;排除之前运行配置的影响&#xff1a; ⭐️ 我这里是使用主机连接服务器&#xff…...

Vue Web开发(五)

1. axios axios官方文档 异步库axios和mockjs模拟后端数据&#xff0c;axios是一个基于promise的HTTP库&#xff0c;使用npm i axios。在main.js中引入&#xff0c;需要绑定在Vue的prototype属性上&#xff0c;并重命名。   &#xff08;1&#xff09;main.js文件引用 imp…...

HarmonyOS:使用Grid构建网格

一、概述 网格布局是由“行”和“列”分割的单元格所组成&#xff0c;通过指定“项目”所在的单元格做出各种各样的布局。网格布局具有较强的页面均分能力&#xff0c;子组件占比控制能力&#xff0c;是一种重要自适应布局&#xff0c;其使用场景有九宫格图片展示、日历、计算器…...

开源Java快速自测工具,可以调用系统内任意一个方法

java快速测试框架&#xff0c;可以调到系统内任意一个方法&#xff0c;告别写单测和controller的困扰。 开源地址&#xff1a;https://gitee.com/missyouch/Easy-JTest 我们在开发时很多时候想要测试下自己的代码&#xff0c;特别是service层或者是更底层的代码&#xff0c;就…...

力扣刷题TOP101: 29.BM36 判断是不是平衡二叉树

目录&#xff1a; 目的 思路 复杂度 记忆秘诀 python代码 目的&#xff1a; 输入一棵节点数为 n 二叉树&#xff0c;判断该二叉树是否是平衡二叉树。 思路 什么是平衡二叉树&#xff08;AVL 树&#xff09;&#xff1f; 每个节点的左子树和右子树的高度差不能超过 1。确保…...

【在Linux世界中追寻伟大的One Piece】自旋锁

目录 1 -> 概述 2 -> 原理 3 -> 优缺点及使用场景 3.1 -> 优点 3.2 -> 缺点 3.3 -> 使用场景 4 -> 纯软件自旋锁类似的原理实现 4.1 -> 结论 5 -> 样例代码 1 -> 概述 自旋锁是一种多线程同步机制&#xff0c;用于保护共享资源避免受并…...

前端编辑器JSON HTML等,vue2-ace-editor,vue3-ace-editor

与框架无关 vue2-ace-editor有问题&#xff0c;ace拿不到&#xff08;brace&#xff09; 一些组件都是基于ace-builds或者brace包装的 不如直接用下面的&#xff0c;不如直接使用下面的 <template><div ref"editor" class"json-editor"><…...

C++ 中的运算符重载

运算符重载是C中的一种特性&#xff0c;它允许开发者为自定义类型定义或改变标准运算符的行为。通过运算符重载&#xff0c;你可以使得用户定义的类像内置类型一样使用运算符&#xff0c;比如加法、减法、赋值等。 如何在C中进行运算符重载&#xff1f; 重载运算符的语法&#…...

渗透测试工具 -- SQLmap安装教程及使用

随着网络安全问题日益严峻&#xff0c;渗透测试成为了保护信息安全的重要手段。而在渗透测试的众多工具中&#xff0c;SQLmap凭借其强大的自动化SQL注入检测和利用能力&#xff0c;成为了网络安全专家必备的利器。那么&#xff0c;你知道如何高效地使用SQLmap进行漏洞扫描吗&am…...

使用 Database Tools 实现高效数据查询的十大 IntelliJ IDEA 快捷键

得益于 IntelliJ IDEA Ultimate 的 Database Tools&#xff08;数据库工具&#xff09;中的专用 SQL 查询控制台&#xff0c;您无需离开 IDE 即可轻松修改连接到您的 Java 应用程序的任何数据库中的数据&#xff0c;以及从这些数据库中提取数据。 查询控制台具有 SQL 语句特定的…...

SpringBoot 整合 RabbitMQ 实现流量消峰

RabbitMQ 即一个消息队列&#xff0c;主要是用来实现应用程序的异步和解耦&#xff0c;同时也能起到消息缓冲&#xff0c;消息分发的作用。 消息中间件在互联网公司的使用中越来越多&#xff0c;刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache&#xff0c;当然了今天的主角还…...

大数据挖掘建模平台案例分享

大数据挖掘建模平台是由泰迪自主研发&#xff0c;面向企业级用户的大数据挖掘建模平台。平台采用可视化操作方式&#xff0c;通过丰富内置算法&#xff0c;帮助用户快速、一站式地进行数据分析及挖掘建模&#xff0c;可应用于处理海量数据、高复杂性的数据挖掘任务&#xff0c;…...

MySQL数据表的管理

1.创建表 语法&#xff1a; create table 表名( 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】, 字段名 字段里保存数据的类型【(数据的长度) 约束】 ...... ); 注意&#xff1a;数据类型和约束&#xff0c;接下来用…...

SpringBoot【十三(实战篇)】集成在线接口文档Swagger2

一、前言&#x1f525; 环境说明&#xff1a;Windows10 Idea2021.3.2 Jdk1.8 SpringBoot 2.3.1.RELEASE 二、如何生成Swagger文档 上一期我们已经能正常访问swagger在线文档&#xff0c;但是文档空空如也&#xff0c;对不对&#xff0c;接下来我就教大家怎么把相关的接口都给…...

【C++初阶】第8课—标准模板库STL(string_2)

文章目录 1. string类对象遍历操作1.1 标准库中的成员函数begin( )和end( )1.2 标准库中的成员函数rbegin( )和rend( )1.3 C11引入的4个标准库中的成员函数 2. string类对象的访问2.1 operator[ ]运算符重载访问字符串字符2.2 公有成员函数at访问字符2.3 公有成员函数back()和f…...

【arm】程序跑飞,SWD端口不可用修复(N32G435CBL7)

项目场景&#xff1a; 国民N32G43X系列&#xff0c;烧录了一个测试程序&#xff0c;在DEBUG中不知什么原因挂掉&#xff0c;然后就无法连接SWD或JLINK。 问题描述 在SWD配置中不可见芯片型号&#xff0c;无法connect&#xff0c;无法烧录。但基本判断是芯片没有损坏。怀疑是程…...

https证书生成、linux 生成https证书、nginx 配置https证书

1. 检查 Certbot 是否已安装 which certbot 2. 安装 Certbot 2.1启用 EPEL 仓库&#xff08;如果尚未启用&#xff09;&#xff1a; sudo yum install epel-release 2.2 安装 Certbot 和 Nginx 插件&#xff1a; sudo yum install certbot python3-certbot-nginx 2.3验证安…...

Halcon随机贴图生成缺陷图片脚本

halcon随机贴图生成缺陷图片&#xff0c;用于深度学习训练: read_image (Image, C:/Users/61082/Desktop/bentiiamge/omron/S06-1211/ok/ok_images/D246B_CPFNNUBA8LT0SX_AAA_S2412001793_C1216_1733895885320066.jpg) get_image_size (Image, Width, Height) gen_rectangle1 …...