当前位置: 首页 > news >正文

Canal搭建 idea设置及采集数据到kafka

Canal GitHub:https://github.com/alibaba/canal#readme
实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。
canal工作原理
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即canal)
canal解析binary log对像(原始为bye流)

官网配置:https://github.com/alibaba/canal/wiki/QuickStart
1.在mysql中开启binlog日志功能
mysql上配置

linux>vi /etc/my.cnf 
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=testdb	//指定数据库

2.重启mysql服务

linux>systemctl restart mysqld

3.查看binlog是否生效:

linux>ls /var/lib/mysql

4.解压canal压缩包

linux>tar -zxvf canal-* -C  canal

5.数据库设置
登陆mysql

mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
mysql>flush privileges;

6.配置文件
1.修改canal.properties配置文件

linux>vi conf/canal.properties
canal.instance.parser.parallelThreadSize = 1

2.修改instance.properties配置文件

linux>vi  conf/example/instance.properties
canal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306

7.启动服务并查看进程

linux>bin/startup.shlinux>jpsxxxx CanalLauncher

查看日志

linux>cat /opt/install/canal/logs/canal/canal.log

idea客户端
pom.xml

 <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class CanalClientDemo {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.58.203)",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {//testdb库中的所有表connector.connect();connector.subscribe("testdb.*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

注意:canal只能在java8中运行,如果canal进程CanalLauncher起不来,检查本地java环境
CanalClientDemo运行提示拒绝连接,检查脚本中的连接地址是不是运行canal的主机 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(“192.168.58.203(canal))”,
xxxx");

Canal kafka github 配置官网:github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

使用canal将数据同步到kafka上
#重新解压一个canal到nodefour上进行配置
1.修改canal.properties配置文件

linux>vi canal.propertiescanal.serverMode = kafka
canal.instance.parser.parallel = false
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

2.修改instance.properties配置文件

linux>vi instance.propertiescanal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
canal.mq.topic=example

3.可以创建topic,也可以不创建

在kafka上启动一个消费者

bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

启动 canal

linux>bin/startup.sh

结果:在这里插入图片描述

相关文章:

Canal搭建 idea设置及采集数据到kafka

Canal GitHub&#xff1a;https://github.com/alibaba/canal#readme 实时采集工具canal&#xff1a;利用mysql主从复制的原理&#xff0c;slave定期读取master的binarylog对binarylog进行解析。 canal工作原理 canal模拟MySQL slave的交互协议&#xff0c;伪装自己为MySQL slav…...

CentOS7搭建伪分布式Hadoop(全过程2023)

##具体操作目录## 1.配置静态ip2.关闭防火墙3.修改主机名为 *master* &#xff0c;并重启虚拟机vi /etc/hostname 4.修改主机名与ip映射5.设置SSH免密登录6.安装配置java环境----------------------正式Hadoop配置1.移动安装包到合适位置2.解压安装包并重命名3.配置环境变量4.修…...

Linux中文件描述符fd和文件指针filp的理解

简单归纳&#xff1a;fd只是一个整数&#xff0c;在open时产生。起到一个索引的作用&#xff0c;进程通过PCB中的文件描述符表找到该fd所指向的文件指针filp。 文件描述符的操作(如: open)返回的是一个文件描述符,内核会在每个进程空间中维护一个文件描述符表, 所有打开的文件…...

CSS color中常用英文色值

常用颜色英文 red green blue magenta yellow chocolate black aquamarine lime fuchsia brass azure brown bronze deeppink aliceblue gray copper coral feldspar orange orchid pink plum quartz purple aliceblue antiquewith blanchedalmond…...

Springboot idea 中 maven配置问题,找不到依赖:Could not find artifact xxxx

现象&#xff1a;当我们从代码仓拉取新项目时&#xff0c;从该项目的开发同事拿到其maven的settings文件&#xff0c;作为项目的maven配置&#xff0c;为了是能找到工程中所依赖的包&#xff0c;能从远程仓下载下来。 然后本地仓的包&#xff0c;也从同事那边拷贝一份过来&…...

编译原理笔记(一)引论

文章目录 1.什么是编译程序2.编译过程和编译程序的结构2.1.编译过程概述2.2.编译程序的结构2.3.编译阶段的组合 3.解释程序和一些软件工具3.1.解释程序3.2.处理源程序的软件工具 4.PL/0语言编译系统 学习总结&#xff1a;这一部分是编译原理的绪论部分内容&#xff0c;对编译程…...

C++ 类和对象下 [补充]

文章目录 友元内部类内部类是外部类的天生友元 匿名对象匿名对象的特性 拷贝对象时的一些编译器优化函数返回值临时空间的存储位置返回值临时空间具有常性 标题相同和不同类型 需要 临时空间 友元 友元函数 重载operator<< 输出自定义类型 比如日期类的这个重载&#xff…...

[CTF/网络安全] 攻防世界 PHP2 解题详析

[CTF/网络安全] 攻防世界 PHP2 解题详析 index.php.phps扩展名姿势 翻译&#xff1a;你能给这个网站进行身份验证吗&#xff1f; index.php index.php是一个常见的文件名&#xff0c;通常用于Web服务器中的网站根目录下。它是默认的主页文件名&#xff0c;在访问一个网站时&am…...

图神经网络:(节点分类)在Cora数据集上动手实现图神经网络

文章说明&#xff1a; 1)参考资料&#xff1a;PYG官方文档。超链。 2)博主水平不高&#xff0c;如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook。超链。提取码8888。 文章目录 代码实操1&#xff1a;GCN的复杂实现代码实操2&#xff1a;GCN的简单实现…...

RabbitMQ应用问题——消息补偿机制以及代码示例

RabbitMQ应用问题——消息补偿机制以及代码示例 RabbitMQ应用问题 消息可靠性的保障 消息补偿机制 详细说明 这里使用了简单的代码进行演示&#xff0c;订单的消费者没有写&#xff0c;在订单的消费同时&#xff0c;发送一条增加积分消息到积分队列。 详细流程途中都有注明…...

量化特征贡献度函数:feature_importances_函数/LGBMClassifier/XGBClassifier

feature_importances_是scikit-learn机器学习库中许多模型对象的属性&#xff0c;在训练模型之后调用该属性可以输出各个特征在模型中的重要性。 示例代码&#xff1a; from sklearn.ensemble import RandomForestRegressor from sklearn.datasets import make_regression# 生…...

总结JVM重要知识点

一.类加载和创建对象的过程 1.类加载 1.编译 : 将源码文件(.java)编译成JVM可以解释的.class文件 . 语法分析>语义分析>注解处理 , 生成class文件 2.加载 : 装载 : 字节码本来存储在硬盘上 , 需要运行时 , 有类加载系统负责将类的信息加载到内存中(方法区) , 使用的是类…...

奇技淫巧第8期

学无止境。 下面是对去年11月至今年5月的零散知识点总结。 春节期间好好放松了一两个月&#xff0c;来校后又懒散的度过了一两个月&#xff0c;直到论文评审意见下来&#xff0c;才开启冲刺模式狂干了一两个月。总的来说&#xff0c;这半年来摸的时间比较多。好&#xff0c;不废…...

这个 归并排序详解过程 我能吹一辈子!!!

文章目录 归并排序概念归并排序算法思路归并排序递归实现归并排序非递归实现 归并排序概念 1945年&#xff0c;约翰冯诺依曼&#xff08;John von Neumann&#xff09;发明了归并排序&#xff0c;这是典型的分治算法的应用。 归并排序&#xff08;Merge sort&#xff09;是建立…...

docker版jxTMS使用指南:自动生成代码

本文讲解4.0版jxTMS的自动生成代码功能&#xff0c; 整个系列的文章请查看&#xff1a;docker版jxTMS使用指南&#xff1a;4.0版升级内容 docker版本的使用&#xff0c;请参考&#xff1a;docker版jxTMS使用指南 任何一个管理系统都需要对管理对象进行管理&#xff0c;包括最…...

聚观早报 | 小冰启动GPT克隆人计划;ofo创始人在美创业改做咖啡

今日要闻&#xff1a;小冰启动“GPT克隆人计划”&#xff1b;ofo创始人在美创业改做咖啡&#xff1b;OpenAI正准备新的开源AI模型&#xff1b;青年失业率首破20&#xff05;创新高&#xff1b;微软收购动视暴雪获批 小冰启动“GPT克隆人计划” 5 月 16 日&#xff0c;小冰公司…...

面试造航母,入职拧螺丝,工资离了个大谱...

有粉丝跟我吐槽说&#xff1a;金三银四去面试软件测试岗&#xff0c;真的是面试造航母&#xff0c;入职拧螺丝&#xff0c;工资还低 这种现象很正常&#xff0c;因为找一个测试员&#xff0c;当然希望他能做的业务越多越好&#xff0c;最好像机器猫一样&#xff0c;啥事儿都能…...

Python+selenium自动化元素定位防踩坑

在自动化UI测试过程中常常会在元素定位阶段就踩坑&#xff0c;碰到困扰已久的问题。 以下是个人整理元素定位报错原因和解决方法。 踩坑一&#xff1a;StaleElementReferenceException selenium.common.exceptions.StaleElementReferenceException: Message: stale element re…...

【计算机组成原理】实验一

文章目录 实验一 数据传送实验1. 实验目的2. 实验仪器3. 原理概述4. 实验内容步骤4.1 手动实验环境的建立4.2 手控传送实验 5. 实验结论及问题讨论 实验一 数据传送实验 1. 实验目的 2. 实验仪器 3. 原理概述 4. 实验内容步骤 4.1 手动实验环境的建立 1)初始待令状态 上电或…...

前端022_广告模块_修改功能

广告模块_修改功能 1、需求分析2、Mock添加查询数据3、Mock修改数据4、Api调用回显数据5、提交修改后的数据6、效果1、需求分析 需求分析 当点击 编辑 按钮后,弹出编辑窗口,并查询出分类相关信息进行渲染。修改后点击 确定 提交修改后的数据。 2、Mock添加查询数据 请求URL…...

STM32单片机学习(28) —— STM32的SPI外设

文章目录概述SPI通信的移位机制&#xff08;以bit为单位&#xff09;SPI外设框图第一部分&#xff1a;数据通路SPI通信的数据帧格式SPI外设移位机制&#xff08;以字节为单位&#xff09;第二部分&#xff1a;主机时钟生成器SPI通信时钟频率与传输速率第三部分&#xff1a;主从…...

iPaaS 应用场景深度解析:从系统孤岛到数据自由流动的六大实战路径

写在前面 一个企业的数字化程度越高&#xff0c;系统就越多。系统越多&#xff0c;集成问题就越严重。 这不是假设&#xff0c;而是我们在服务客户过程中反复验证的结论——企业数字化转型的瓶颈&#xff0c;往往不在于"造新系统"&#xff0c;而在于"连老系统&q…...

手把手教你为WCH CH582移植CherryUSB主机栈(基于RT-Thread,含中断优化)

基于RT-Thread的WCH CH582 USB主机协议栈深度移植指南在嵌入式开发领域&#xff0c;USB主机功能的实现往往意味着设备能够直接连接各类USB外设&#xff0c;从简单的键盘鼠标到复杂的存储设备。对于使用WCH CH582这类RISC-V内核MCU的开发者而言&#xff0c;原厂SDK提供的USB主机…...

终极艾尔登法环帧率解锁指南:轻松突破60FPS限制

终极艾尔登法环帧率解锁指南&#xff1a;轻松突破60FPS限制 【免费下载链接】EldenRingFpsUnlockAndMore A small utility to remove frame rate limit, change FOV, add widescreen support and more for Elden Ring 项目地址: https://gitcode.com/gh_mirrors/el/EldenRing…...

如何让Rhino 3D模型在Blender中保持完整数据:import_3dm插件深度解析

如何让Rhino 3D模型在Blender中保持完整数据&#xff1a;import_3dm插件深度解析 【免费下载链接】import_3dm Blender importer script for Rhinoceros 3D files 项目地址: https://gitcode.com/gh_mirrors/im/import_3dm 当建筑师需要在Blender中渲染Rhino设计的建筑模…...

Claude端到端测试设计:从零搭建可审计、可回放、可量化的AI服务测试流水线(含开源Schema校验工具)

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;Claude端到端测试设计 端到端测试是验证Claude模型在真实用户交互链路中行为一致性的关键手段。它覆盖从原始提示输入、上下文管理、流式响应生成&#xff0c;到输出解析与业务校验的全路径&#xff0c;确保模…...

同步带装配工艺要点与损伤防控策略

一、引言在工业精密传动系统中&#xff0c;盖茨同步带凭借高精度、高效率、无滑差的优势&#xff0c;成为自动化设备、精密机床、输送产线的核心传动部件。多数企业在运维中&#xff0c;普遍将同步带异常磨损、断齿、断带等故障归咎于工况恶劣或产品质量问题&#xff0c;却忽略…...

PvZ Toolkit终极指南:三步掌握植物大战僵尸最强修改器

PvZ Toolkit终极指南&#xff1a;三步掌握植物大战僵尸最强修改器 【免费下载链接】pvztoolkit 植物大战僵尸 PC 版综合修改器 项目地址: https://gitcode.com/gh_mirrors/pv/pvztoolkit PvZ Toolkit是一款专为植物大战僵尸PC版设计的综合修改器工具&#xff0c;能够让你…...

接口测试用例设计:超详细防御体系与分层校验实践

1. 为什么“超详细”三个字在接口测试用例里不是修饰词&#xff0c;而是生死线我带过三支不同行业的测试团队——金融支付、SaaS中台、IoT设备管理平台。每次新人入职第一周&#xff0c;我都会收走他们写的前5条接口测试用例&#xff0c;逐行标红批注。不是因为格式不对&#x…...

别再乱建索引了!用Explain的key_len字段,一眼看穿你的MySQL联合索引到底生效了几个字段

解密MySQL联合索引&#xff1a;用key_len精准判断索引生效范围 在数据库性能优化领域&#xff0c;联合索引的使用一直是个既基础又容易踩坑的话题。很多开发者虽然知道"最左匹配原则"这个名词&#xff0c;但在实际业务场景中&#xff0c;面对复杂的查询条件组合时&a…...