maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
创建mysql同步用户
CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
开启mysql binlog
a.修改 /etc/my.cnf 配置
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 设置Binary Log记录方式为Row
server_id=1 # 记住id 后续开发会使用
# 指定binlog日志文件的名字为mysql-bin,以及其存储路径
# 如果没有对log-bin指定log文件,默认在 /var/lib/mysql目录下以mysqld-bin.00000X等作为名称。
# 而 mysqld-bin.index则记录了所有的log的文件名称
# 使用时则使用mysqlbinlog /var/lib/mysql|grep "*****"等来追踪database的操作。
log-bin=/var/lib/mysql/mysql-bin
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
b.重启mysql
service mysqld restart
c.查看开启状态
输入 show variables like ‘log_bin’; 查看binlog开启状态。如下图所示。
输入 show variables like ‘binlog_format’; 查看Binary Log记录方式。如下图所示。
mysql> show variables like 'log_%';
+---------------------------------+-------------+
| Variable_name | Value |
+---------------------------------+-------------+
| log_bin | ON |
| log_bin_trust_function_creators | OFF |
| log_error | .\mysql.err |
| log_queries_not_using_indexes | OFF |
| log_slave_updates | OFF |
| log_slow_queries | ON |
| log_warnings | 1 |
+---------------------------------+-------------+
没有开启log_bin的值是OFF,开启之后是ON
mysql> SHOW VARIABLES LIKE 'character%';
+--------------------------+----------------------------+
| Variable_name | Value |
+--------------------------+----------------------------+
| character_set_client | utf8 |
| character_set_connection | utf8 |
| character_set_database | latin1 |
| character_set_filesystem | binary |
| character_set_results | utf8 |
| character_set_server | latin1 |
| character_set_system | utf8 |
| character_sets_dir | /usr/share/mysql/charsets/ |
+--------------------------+----------------------------+
安装maxwell
下载
从 v1.30.0 开始,Maxwell 不再支持 JDK1.8
使用最后一个支持 JDK1.8 版本的 Maxwell v1.29.2 进行部署
wget https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
安装
解压即可 tar -zxvf maxwell.tar.gz
配置
vim config_1.properties
server_id=1
client_id=city_ct_63 #用于启动多个maxwell
replica_server_id=2 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.XX
producer=kafka
kafka.bootstrap.servers=192.168.0.XX:9092
kafka_topic=city_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test.mj_qyxx,include:test.mj_sbxx
vim config_2.properties
server_id=1
client_id=province_ct_63 #用于启动多个maxwell
replica_server_id=1 #用于启动多个maxwell
user=maxwell
password=123456
host=192.168.0.1x
producer=kafka
kafka.bootstrap.servers=192.168.0.xx:9092
kafka_topic=province_mysql_kafka_cdc
jdbc_options=serverTimezone=UTC
filter=exclude:*.*,include:test2.xxx
启动
bin/maxwell --config city_config.properties --daemon
bin/maxwell --config province_config.properties --daemon
验证启动进程
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l
测试数据库全量同步
maxwell-bootstrap --database finance_result --table industry --config $MAXWELL_HOME/config.properties
其中发送过来的数据第一行及最后一行数据是标识 Maxwell 历史数据同步的,不携带任何数据。
maxwell -> kafka:
{"database": "finance_result","table": "industry","type": "bootstrap-start","ts": 1694748250,"data": {}
}{"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 1,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "工程建设","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 2,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 1,"industry_name": "轻工","superior_industry_id": null}
} {"database": "finance_result","table": "industry","type": "bootstrap-insert","ts": 1694748250,"data": {"id": 3,"create_time": "2022-08-19 00:00:00.000000","update_time": "2022-08-19 00:00:00.000000","industry_level": 2,"industry_name": "土木","superior_industry_id": 1}
}
......{"database": "finance_result","table": "industry","type": "bootstrap-complete","ts": 1694748250,"data": {}
}
测试数据库增量同步
参数说明
输出JSON字符串的格式
● data 最新的数据,修改后的数据
● old 旧数据,修改前的数据
● type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
● xid 事务id
● commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
● server_id
● thread_id
● 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
● datetime列会输出为"YYYY-MM-DD hh:mm:ss",如果遇到"0000-00-00 00:00:00"会原样输出
● maxwell支持多种编码,但仅输出utf8编码
● maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
相关文章:
maxwell同步mysql到kafka(一个服务器启动多个)
创建mysql同步用户 CREATE USER maxwell% IDENTIFIED BY 123456; GRANT ALL ON maxwell.* TO maxwell%; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to maxwell%; 开启mysql binlog a.修改 /etc/my.cnf 配置 log-binmysql-bin # 开启binlog binlog-forma…...

实用软件分享---简单菜谱 0.3版本 几千种美食(安卓)
专栏介绍:本专栏主要分享一些实用的软件(Po Jie版); 声明1:软件不保证时效性;只能保证在写本文时,该软件是可用的;不保证后续时间该软件能一直正常运行;不保证没有bug;如果软件不可用了,我知道后会第一时间在题目上注明(已失效)。介意者请勿订阅。 声明2:本专栏的…...
网络学习(14)|RESTful API设计:构建优雅的Web服务
文章目录 设计原则最佳实践命名与URI设计状态码与响应格式HTTP状态码详解响应格式选择 在当今的互联网世界中,RESTful API已成为构建可扩展、可维护和高性能Web服务的标准。本文将深入探讨RESTful API的设计原则、资源命名与URI设计的最佳实践,以及请求与…...
【开源】APIJSON 框架
简述 APIJSON是一个关于API和JSON的综合技术或框架,一种专为API设计的JSON网络传输协议,以及基于这套协议实现的ORM库。 1. 定义与特点: APIJSON是一种基于接口的JSON传输结构协议,它允许客户端定义任何JSON结构来向服务端发起…...

R语言探索与分析18-基于时间序列的汇率预测
一、研究背景与意义 汇率是指两个国家之间的货币兑换比率,而且在国家与国家的经济交流有着举足轻重的作用。随着经济全球化的不断深入,在整个全球经济体中,汇率还是一个评估国家与国家之间的经济状况和发展水平的一个风向标。汇率的变动会对…...

30岁迷茫?AI赛道,人生新起点
前言 30岁,对于许多人来说,是一个人生的分水岭。在这个年纪,有些人可能已经在某个领域取得了不小的成就,而有些人则可能开始对未来的职业方向感到迷茫。如果你正处于这个阶段,那么你可能会问自己:30岁转行…...

开门预警系统技术规范(简化版)
开门预警系统技术规范(简化版) 1 系统概述2 预警区域3 预警目标4 功能需求5 功能条件6 显示需求7 指标需求1 系统概述 开门预警系统(DOW),在自车停止开门过程中,安装在车辆的传感器(如安装在车辆后保险杆两个角雷达)检测从自车后方接近的目标车(汽车、摩托车等)的相对…...

Django与MySQL:配置数据库的详细步骤
文章目录 Django-MySQL 配置配置完执行数据迁移,如果报错: Error loading MySQLdb module, Django-MySQL 配置 # settings.pyDATABASES {# 默认配置sqlite3数据库# default: {# ENGINE: django.db.backends.sqlite3,# NAME: BASE_DIR / db.sqli…...
GPT-4o short description
GPT-4o,作为OpenAI最新推出的人工智能模型,无疑在人工智能领域掀起了新的波澜。 一、版本间的对比分析 与前一版本GPT-4相比,GPT-4o在多个方面进行了显著的改进和优化。首先,在参数规模上,GPT-4o达到了2000亿个参数&…...
MATLAB 矩阵
创建矩阵直接输入:使用 zeros, ones, eye 函数:使用 rand, randi 函数:使用 diag 函数: 矩阵操作矩阵加法和减法:矩阵乘法:矩阵转置:矩阵求逆:矩阵分解:矩阵大小…...
LED灯的功率以及好的品牌推荐
LED灯的功率选择主要根据使用场景、照明需求以及灯具类型来决定。常见的LED灯功率范围在0.5W到100W之间,不同的功率范围适用于不同的场景。 对于小型照明设备,如小夜灯或手电筒,通常选择0.5W到3W的LED灯,足以满足基本的照明需求。…...

Linux “ 软件管理 “
软件管理 widows 安装 方法一: 双击exe安装包,就可以安装。 用exe安装的软件会破记录到注册表中。 注册会记录安装位置,软件名称。 方法二: 用绿色方式进行安装。 不用写到注册表中,因此无法在开始菜单里面查看和卸…...

【uni-app】申请高德地图key,封装map.js,实现H5、iOS、Android通过getlocation获取地图定位信息
文章目录 map组件基础使用封装map.js,实现定位1、使用第三方地图:高德,申请对应平台key1、申请H5 key2、申请微信小程序 key3、申请android key查看证书详情,可以看到SHA1查看/设置Android包名 4、申请ios key 2、封装map1、lib/m…...
使用rufus做Kali Linux时持久分区大小如何设置
持久分区大小是什么意思: 持久分区大小指的是在U盘安装引导Kali Linux时,为保存Kali修改后的设置(如中文界面显示等)而预留的空间大小。这个空间相当于电脑中的D盘,用于保存修改后的设置。 而剩下的空间则用于安装Kali…...

Java高阶数据结构-----并查集(详解)
目录 🧐一.并查集的基本概念&实例: 🤪二.并查集代码: 😂三:并查集的一些习题: A.省份数量 B.等式方程的可满足性 🧐一.并查集的基本概念&实例: 并查集概念&…...

GitLab教程(三):多人合作场景下如何pull代码和处理冲突
文章目录 1.拉取别人同步的代码到本地的流程2.push冲突发生场景情景模拟简单的解决方法 在这一章中,为了模拟多人合作的场景,我需要一个人分饰两角。 执行git clone xx远端仓库地址 xx文件夹命令,在clone代码时指定本地仓库的文件夹名&#…...
模版偏特化之std::enable_if
1 SFINAE。 2 条件特化。可用作额外的函数参数(不可应用于运算符重载)、返回类型(不可应用于构造函数与析构函数),或类模板或函数模板形参。 函数参数: #include <iostream> #include <type_tra…...

好用的Web数据库管理工具推荐(ChatGPT的推荐)
在现代数据管理和开发中,Web数据库管理工具变得越来越重要。这些工具不仅提供了直观的用户界面,还支持跨平台操作,方便用户在任何地方进行数据库管理。 目录 1. SQLynx 2. phpMyAdmin 3. Adminer 4. DBeaver 5 结论 以下是几款推荐的Web…...

encoding Token和embedding 傻傻分不清楚?
encoding 编码 “encoding” 是一个在计算机科学和人工智能领域广泛使用的术语,它可以指代多种不同的过程和方法。核心就是编码:用某些数字来表示特定的信息。当然你或许会说字符集(Unicode)更理解这种概念,编码更强调这种动态的过程。而字符…...

一个公用的数据状态修改组件
灵感来自于一项重复的工作,下图中,这类禁用启用、审核通过不通过、设计成是什么状态否什么状态的场景很多。每一个都需要单独提供接口。重复工作还蛮大的。于是,基于该组件类捕获组件跳转写了这款通用接口。省时省力。 代码如下:…...

华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:
在 HarmonyOS 应用开发中,手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力,既支持点击、长按、拖拽等基础单一手势的精细控制,也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档,…...

Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...

让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...

九天毕昇深度学习平台 | 如何安装库?
pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...

uni-app学习笔记三十五--扩展组件的安装和使用
由于内置组件不能满足日常开发需要,uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件,需要安装才能使用。 一、安装扩展插件 安装方法: 1.访问uniapp官方文档组件部分:组件使用的入门教程 | uni-app官网 点击左侧…...

车载诊断架构 --- ZEVonUDS(J1979-3)简介第一篇
我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 做到欲望极简,了解自己的真实欲望,不受外在潮流的影响,不盲从,不跟风。把自己的精力全部用在自己。一是去掉多余,凡事找规律,基础是诚信;二是…...