Canal搭建 idea设置及采集数据到kafka
Canal GitHub:https://github.com/alibaba/canal#readme
实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。
canal工作原理
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即canal)
canal解析binary log对像(原始为bye流)
官网配置:https://github.com/alibaba/canal/wiki/QuickStart
1.在mysql中开启binlog日志功能
mysql上配置
linux>vi /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=testdb //指定数据库
2.重启mysql服务
linux>systemctl restart mysqld
3.查看binlog是否生效:
linux>ls /var/lib/mysql
4.解压canal压缩包
linux>tar -zxvf canal-* -C canal
5.数据库设置
登陆mysql
mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
mysql>flush privileges;
6.配置文件
1.修改canal.properties配置文件
linux>vi conf/canal.properties
canal.instance.parser.parallelThreadSize = 1
2.修改instance.properties配置文件
linux>vi conf/example/instance.properties
canal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
7.启动服务并查看进程
linux>bin/startup.shlinux>jpsxxxx CanalLauncher
查看日志
linux>cat /opt/install/canal/logs/canal/canal.log
idea客户端
pom.xml
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class CanalClientDemo {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.58.203)",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {//testdb库中的所有表connector.connect();connector.subscribe("testdb.*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
注意:canal只能在java8中运行,如果canal进程CanalLauncher起不来,检查本地java环境
CanalClientDemo运行提示拒绝连接,检查脚本中的连接地址是不是运行canal的主机 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(“192.168.58.203(canal))”,
xxxx");
Canal kafka github 配置官网:github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
使用canal将数据同步到kafka上
#重新解压一个canal到nodefour上进行配置
1.修改canal.properties配置文件
linux>vi canal.propertiescanal.serverMode = kafka
canal.instance.parser.parallel = false
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
2.修改instance.properties配置文件
linux>vi instance.propertiescanal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
canal.mq.topic=example
3.可以创建topic,也可以不创建
在kafka上启动一个消费者
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
启动 canal
linux>bin/startup.sh
结果:
相关文章:
Canal搭建 idea设置及采集数据到kafka
Canal GitHub:https://github.com/alibaba/canal#readme 实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。 canal工作原理 canal模拟MySQL slave的交互协议,伪装自己为MySQL slav…...
CentOS7搭建伪分布式Hadoop(全过程2023)
##具体操作目录## 1.配置静态ip2.关闭防火墙3.修改主机名为 *master* ,并重启虚拟机vi /etc/hostname 4.修改主机名与ip映射5.设置SSH免密登录6.安装配置java环境----------------------正式Hadoop配置1.移动安装包到合适位置2.解压安装包并重命名3.配置环境变量4.修…...
Linux中文件描述符fd和文件指针filp的理解
简单归纳:fd只是一个整数,在open时产生。起到一个索引的作用,进程通过PCB中的文件描述符表找到该fd所指向的文件指针filp。 文件描述符的操作(如: open)返回的是一个文件描述符,内核会在每个进程空间中维护一个文件描述符表, 所有打开的文件…...
CSS color中常用英文色值
常用颜色英文 red green blue magenta yellow chocolate black aquamarine lime fuchsia brass azure brown bronze deeppink aliceblue gray copper coral feldspar orange orchid pink plum quartz purple aliceblue antiquewith blanchedalmond…...
Springboot idea 中 maven配置问题,找不到依赖:Could not find artifact xxxx
现象:当我们从代码仓拉取新项目时,从该项目的开发同事拿到其maven的settings文件,作为项目的maven配置,为了是能找到工程中所依赖的包,能从远程仓下载下来。 然后本地仓的包,也从同事那边拷贝一份过来&…...
编译原理笔记(一)引论
文章目录 1.什么是编译程序2.编译过程和编译程序的结构2.1.编译过程概述2.2.编译程序的结构2.3.编译阶段的组合 3.解释程序和一些软件工具3.1.解释程序3.2.处理源程序的软件工具 4.PL/0语言编译系统 学习总结:这一部分是编译原理的绪论部分内容,对编译程…...
C++ 类和对象下 [补充]
文章目录 友元内部类内部类是外部类的天生友元 匿名对象匿名对象的特性 拷贝对象时的一些编译器优化函数返回值临时空间的存储位置返回值临时空间具有常性 标题相同和不同类型 需要 临时空间 友元 友元函数 重载operator<< 输出自定义类型 比如日期类的这个重载ÿ…...
[CTF/网络安全] 攻防世界 PHP2 解题详析
[CTF/网络安全] 攻防世界 PHP2 解题详析 index.php.phps扩展名姿势 翻译:你能给这个网站进行身份验证吗? index.php index.php是一个常见的文件名,通常用于Web服务器中的网站根目录下。它是默认的主页文件名,在访问一个网站时&am…...
图神经网络:(节点分类)在Cora数据集上动手实现图神经网络
文章说明: 1)参考资料:PYG官方文档。超链。 2)博主水平不高,如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook。超链。提取码8888。 文章目录 代码实操1:GCN的复杂实现代码实操2:GCN的简单实现…...
RabbitMQ应用问题——消息补偿机制以及代码示例
RabbitMQ应用问题——消息补偿机制以及代码示例 RabbitMQ应用问题 消息可靠性的保障 消息补偿机制 详细说明 这里使用了简单的代码进行演示,订单的消费者没有写,在订单的消费同时,发送一条增加积分消息到积分队列。 详细流程途中都有注明…...
量化特征贡献度函数:feature_importances_函数/LGBMClassifier/XGBClassifier
feature_importances_是scikit-learn机器学习库中许多模型对象的属性,在训练模型之后调用该属性可以输出各个特征在模型中的重要性。 示例代码: from sklearn.ensemble import RandomForestRegressor from sklearn.datasets import make_regression# 生…...
总结JVM重要知识点
一.类加载和创建对象的过程 1.类加载 1.编译 : 将源码文件(.java)编译成JVM可以解释的.class文件 . 语法分析>语义分析>注解处理 , 生成class文件 2.加载 : 装载 : 字节码本来存储在硬盘上 , 需要运行时 , 有类加载系统负责将类的信息加载到内存中(方法区) , 使用的是类…...
奇技淫巧第8期
学无止境。 下面是对去年11月至今年5月的零散知识点总结。 春节期间好好放松了一两个月,来校后又懒散的度过了一两个月,直到论文评审意见下来,才开启冲刺模式狂干了一两个月。总的来说,这半年来摸的时间比较多。好,不废…...
这个 归并排序详解过程 我能吹一辈子!!!
文章目录 归并排序概念归并排序算法思路归并排序递归实现归并排序非递归实现 归并排序概念 1945年,约翰冯诺依曼(John von Neumann)发明了归并排序,这是典型的分治算法的应用。 归并排序(Merge sort)是建立…...
docker版jxTMS使用指南:自动生成代码
本文讲解4.0版jxTMS的自动生成代码功能, 整个系列的文章请查看:docker版jxTMS使用指南:4.0版升级内容 docker版本的使用,请参考:docker版jxTMS使用指南 任何一个管理系统都需要对管理对象进行管理,包括最…...
聚观早报 | 小冰启动GPT克隆人计划;ofo创始人在美创业改做咖啡
今日要闻:小冰启动“GPT克隆人计划”;ofo创始人在美创业改做咖啡;OpenAI正准备新的开源AI模型;青年失业率首破20%创新高;微软收购动视暴雪获批 小冰启动“GPT克隆人计划” 5 月 16 日,小冰公司…...
面试造航母,入职拧螺丝,工资离了个大谱...
有粉丝跟我吐槽说:金三银四去面试软件测试岗,真的是面试造航母,入职拧螺丝,工资还低 这种现象很正常,因为找一个测试员,当然希望他能做的业务越多越好,最好像机器猫一样,啥事儿都能…...
Python+selenium自动化元素定位防踩坑
在自动化UI测试过程中常常会在元素定位阶段就踩坑,碰到困扰已久的问题。 以下是个人整理元素定位报错原因和解决方法。 踩坑一:StaleElementReferenceException selenium.common.exceptions.StaleElementReferenceException: Message: stale element re…...
【计算机组成原理】实验一
文章目录 实验一 数据传送实验1. 实验目的2. 实验仪器3. 原理概述4. 实验内容步骤4.1 手动实验环境的建立4.2 手控传送实验 5. 实验结论及问题讨论 实验一 数据传送实验 1. 实验目的 2. 实验仪器 3. 原理概述 4. 实验内容步骤 4.1 手动实验环境的建立 1)初始待令状态 上电或…...
前端022_广告模块_修改功能
广告模块_修改功能 1、需求分析2、Mock添加查询数据3、Mock修改数据4、Api调用回显数据5、提交修改后的数据6、效果1、需求分析 需求分析 当点击 编辑 按钮后,弹出编辑窗口,并查询出分类相关信息进行渲染。修改后点击 确定 提交修改后的数据。 2、Mock添加查询数据 请求URL…...
Compose Specification快速入门:5个步骤部署你的第一个应用
Compose Specification快速入门:5个步骤部署你的第一个应用 【免费下载链接】compose-spec The Compose specification 项目地址: https://gitcode.com/gh_mirrors/co/compose-spec Compose Specification是一个强大的工具,它允许开发者使用YAML文…...
HarmonyOS PC 命令行工具构建框架
欢迎大家一起共建 HarmonyOS PC 生态! 🚀 欢迎加入开源鸿蒙PC社区:https://harmonypc.csdn.net/ 目录 概述环境准备 Windows 环境(WSL)Linux 环境(Ubuntu 22.04)macOS 环境 快速开始详细步骤…...
ESP32-CAM无线图像传输系统:从硬件搭建到远程拍照控制
1. ESP32-CAM无线图像传输系统入门指南 第一次接触ESP32-CAM时,我被这个小巧的模块惊艳到了——它集成了摄像头和WiFi功能,价格却不到百元。这个火柴盒大小的设备,完全可以实现远程监控、智能门铃等物联网应用。很多朋友问我怎么快速上手&…...
OpenClaw多模型对比:千问3.5-9B与本地LLaMA混搭方案
OpenClaw多模型对比:千问3.5-9B与本地LLaMA混搭方案 1. 为什么需要多模型混搭 去年冬天的一个深夜,我正用OpenClaw自动处理一批数据清洗任务。当脚本运行到第三个文件时,突然收到短信提醒——当月API调用费用已超预算。查看日志才发现&…...
基于 Vue + TS + Ant Design Vue 实现精细化菜单按钮权限授权组件腥
7.1 初识三维模型 7.1.1 三维模型的数据载体 随着计算机图形技术的发展,我们或多或少都会见过或者听说过三维模型。笔者始终记得小时候第一次在电视上看到三维动画《变形金刚:超能勇士》的震撼感受;而现在我们已经可以在手机上玩三维游戏《…...
企业什么时候应采用 GraphRAG,什么时候普通 RAG 已足够?
企业在建设知识问答、智能搜索或 AI 助手时,常见的问题并不只是模型能力不足,而是没有区分不同类型的知识处理需求。并非所有场景都需要 GraphRAG,也并非普通 RAG 可以覆盖全部企业问题。二者适用的前提、处理的对象以及能够解决的问题&#…...
Unity UI 圆角渲染架构解析:从传统方案到现代Shader技术的演进
Unity UI 圆角渲染架构解析:从传统方案到现代Shader技术的演进 【免费下载链接】Unity-UI-Rounded-Corners These components and shaders allow you to add rounded corners to UI elements! 项目地址: https://gitcode.com/gh_mirrors/un/Unity-UI-Rounded-Corn…...
FLUX.2-klein-base-9b-nvfp4系统资源监控与优化:保障稳定运行
FLUX.2-klein-base-9b-nvfp4系统资源监控与优化:保障稳定运行 最近在星图GPU平台上部署了FLUX.2-klein-base-9b-nvfp4模型,跑起来效果确实不错。但用了一段时间后,我发现一个问题:模型服务偶尔会变慢,甚至卡住不动。一…...
AI原生研发ROI不达标?你可能漏算了这6个合规性折损因子(GDPR/《生成式AI服务管理暂行办法》双轨折价模型)
第一章:AI原生软件研发ROI计算方法详解 2026奇点智能技术大会(https://ml-summit.org) AI原生软件的研发投入产出比(ROI)不能沿用传统软件工程的静态人力-工时模型,而需构建融合模型训练成本、推理服务开销、数据飞轮收益与业务转…...
Cursor Pro免费激活终极指南:突破API限制的完整技术解决方案
Cursor Pro免费激活终极指南:突破API限制的完整技术解决方案 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached yo…...
