当前位置: 首页 > news >正文

maxwell同步mysql到kafka(一个服务器启动多个)

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

创建mysql同步用户

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; 

开启mysql binlog

a.修改 /etc/my.cnf 配置

log-bin=mysql-bin  # 开启binlog
binlog-format=ROW  # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin   
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock

b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。

mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name            | Value                      |
+--------------------------+----------------------------+
| character_set_client     | utf8                       |
| character_set_connection | utf8                       |
| character_set_database   | latin1                     |
| character_set_filesystem | binary                     |
| character_set_results    | utf8                       |
| character_set_server     | latin1                     |
| character_set_system     | utf8                       |
| character_sets_dir       | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+

安装maxwell

下载

从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

安装

解压即可 tar -zxvf maxwell.tar.gz

配置

vim config_1.properties

server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx

vim config_2.properties

server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx

启动

bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon

验证启动进程

ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l

测试数据库全量同步

maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties

其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。

maxwell -> kafka: 
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}

测试数据库增量同步

参数说明

输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

相关文章:

maxwell同步mysql到kafka(一个服务器启动多个)

创建mysql同步用户 CREATE USER maxwell% IDENTIFIED BY 123456; GRANT ALL ON maxwell.* TO maxwell%; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to maxwell%; 开启mysql binlog a.修改 /etc/my.cnf 配置 log-binmysql-bin # 开启binlog binlog-forma…...

实用软件分享---简单菜谱 0.3版本 几千种美食(安卓)

专栏介绍:本专栏主要分享一些实用的软件(Po Jie版); 声明1:软件不保证时效性;只能保证在写本文时,该软件是可用的;不保证后续时间该软件能一直正常运行;不保证没有bug;如果软件不可用了,我知道后会第一时间在题目上注明(已失效)。介意者请勿订阅。 声明2:本专栏的…...

网络学习(14)|RESTful API设计:构建优雅的Web服务

文章目录 设计原则最佳实践命名与URI设计状态码与响应格式HTTP状态码详解响应格式选择 在当今的互联网世界中,RESTful API已成为构建可扩展、可维护和高性能Web服务的标准。本文将深入探讨RESTful API的设计原则、资源命名与URI设计的最佳实践,以及请求与…...

【开源】APIJSON 框架

简述 APIJSON是一个关于API和JSON的综合技术或框架,一种专为API设计的JSON网络传输协议,以及基于这套协议实现的ORM库。 1. 定义与特点: APIJSON是一种基于接口的JSON传输结构协议,它允许客户端定义任何JSON结构来向服务端发起…...

R语言探索与分析18-基于时间序列的汇率预测

一、研究背景与意义 汇率是指两个国家之间的货币兑换比率,而且在国家与国家的经济交流有着举足轻重的作用。随着经济全球化的不断深入,在整个全球经济体中,汇率还是一个评估国家与国家之间的经济状况和发展水平的一个风向标。汇率的变动会对…...

30岁迷茫?AI赛道,人生新起点

前言 30岁,对于许多人来说,是一个人生的分水岭。在这个年纪,有些人可能已经在某个领域取得了不小的成就,而有些人则可能开始对未来的职业方向感到迷茫。如果你正处于这个阶段,那么你可能会问自己:30岁转行…...

开门预警系统技术规范(简化版)

开门预警系统技术规范(简化版) 1 系统概述2 预警区域3 预警目标4 功能需求5 功能条件6 显示需求7 指标需求1 系统概述 开门预警系统(DOW),在自车停止开门过程中,安装在车辆的传感器(如安装在车辆后保险杆两个角雷达)检测从自车后方接近的目标车(汽车、摩托车等)的相对…...

Django与MySQL:配置数据库的详细步骤

文章目录 Django-MySQL 配置配置完执行数据迁移,如果报错: Error loading MySQLdb module, Django-MySQL 配置 # settings.pyDATABASES {# 默认配置sqlite3数据库# default: {# ENGINE: django.db.backends.sqlite3,# NAME: BASE_DIR / db.sqli…...

GPT-4o short description

GPT-4o,作为OpenAI最新推出的人工智能模型,无疑在人工智能领域掀起了新的波澜。 一、版本间的对比分析 与前一版本GPT-4相比,GPT-4o在多个方面进行了显著的改进和优化。首先,在参数规模上,GPT-4o达到了2000亿个参数&…...

MATLAB 矩阵

创建矩阵直接输入:使用 zeros, ones, eye 函数:使用 rand, randi 函数:使用 diag 函数: 矩阵操作矩阵加法和减法:矩阵乘法:矩阵转置:矩阵求逆:矩阵分解:矩阵大小&#xf…...

LED灯的功率以及好的品牌推荐

LED灯的功率选择主要根据使用场景、照明需求以及灯具类型来决定。常见的LED灯功率范围在0.5W到100W之间,不同的功率范围适用于不同的场景。 对于小型照明设备,如小夜灯或手电筒,通常选择0.5W到3W的LED灯,足以满足基本的照明需求。…...

Linux “ 软件管理 “

软件管理 widows 安装 方法一: 双击exe安装包,就可以安装。 用exe安装的软件会破记录到注册表中。 注册会记录安装位置,软件名称。 方法二: 用绿色方式进行安装。 不用写到注册表中,因此无法在开始菜单里面查看和卸…...

【uni-app】申请高德地图key,封装map.js,实现H5、iOS、Android通过getlocation获取地图定位信息

文章目录 map组件基础使用封装map.js,实现定位1、使用第三方地图:高德,申请对应平台key1、申请H5 key2、申请微信小程序 key3、申请android key查看证书详情,可以看到SHA1查看/设置Android包名 4、申请ios key 2、封装map1、lib/m…...

使用rufus做Kali Linux时持久分区大小如何设置

持久分区大小是什么意思: 持久分区大小指的是在U盘安装引导Kali Linux时,为保存Kali修改后的设置(如中文界面显示等)而预留的空间大小。这个空间相当于电脑中的D盘,用于保存修改后的设置。 而剩下的空间则用于安装Kali…...

Java高阶数据结构-----并查集(详解)

目录 🧐一.并查集的基本概念&实例: 🤪二.并查集代码: 😂三:并查集的一些习题: A.省份数量 B.等式方程的可满足性 🧐一.并查集的基本概念&实例: 并查集概念&…...

GitLab教程(三):多人合作场景下如何pull代码和处理冲突

文章目录 1.拉取别人同步的代码到本地的流程2.push冲突发生场景情景模拟简单的解决方法 在这一章中,为了模拟多人合作的场景,我需要一个人分饰两角。 执行git clone xx远端仓库地址 xx文件夹命令,在clone代码时指定本地仓库的文件夹名&#…...

模版偏特化之std::enable_if

1 SFINAE。 2 条件特化。可用作额外的函数参数&#xff08;不可应用于运算符重载&#xff09;、返回类型&#xff08;不可应用于构造函数与析构函数&#xff09;&#xff0c;或类模板或函数模板形参。 函数参数&#xff1a; #include <iostream> #include <type_tra…...

好用的Web数据库管理工具推荐(ChatGPT的推荐)

在现代数据管理和开发中&#xff0c;Web数据库管理工具变得越来越重要。这些工具不仅提供了直观的用户界面&#xff0c;还支持跨平台操作&#xff0c;方便用户在任何地方进行数据库管理。 目录 1. SQLynx 2. phpMyAdmin 3. Adminer 4. DBeaver 5 结论 以下是几款推荐的Web…...

encoding Token和embedding 傻傻分不清楚?

encoding 编码 “encoding” 是一个在计算机科学和人工智能领域广泛使用的术语&#xff0c;它可以指代多种不同的过程和方法。核心就是编码&#xff1a;用某些数字来表示特定的信息。当然你或许会说字符集(Unicode)更理解这种概念&#xff0c;编码更强调这种动态的过程。而字符…...

一个公用的数据状态修改组件

灵感来自于一项重复的工作&#xff0c;下图中&#xff0c;这类禁用启用、审核通过不通过、设计成是什么状态否什么状态的场景很多。每一个都需要单独提供接口。重复工作还蛮大的。于是&#xff0c;基于该组件类捕获组件跳转写了这款通用接口。省时省力。 代码如下&#xff1a;…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例

一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

基于PHP的连锁酒店管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发&#xff0c;数据库mysql&#xff0c;前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...

水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关

在水泥厂的生产流程中&#xff0c;工业自动化网关起着至关重要的作用&#xff0c;尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关&#xff0c;为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多&#xff0c;其中不少设备采用Devicenet协议。Devicen…...

拟合问题处理

在机器学习中&#xff0c;核心任务通常围绕模型训练和性能提升展开&#xff0c;但你提到的 “优化训练数据解决过拟合” 和 “提升泛化性能解决欠拟合” 需要结合更准确的概念进行梳理。以下是对机器学习核心任务的系统复习和修正&#xff1a; 一、机器学习的核心任务框架 机…...