Canal搭建 idea设置及采集数据到kafka
Canal GitHub:https://github.com/alibaba/canal#readme
实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。
canal工作原理
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即canal)
canal解析binary log对像(原始为bye流)
官网配置:https://github.com/alibaba/canal/wiki/QuickStart
1.在mysql中开启binlog日志功能
mysql上配置
linux>vi /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=testdb //指定数据库
2.重启mysql服务
linux>systemctl restart mysqld
3.查看binlog是否生效:
linux>ls /var/lib/mysql
4.解压canal压缩包
linux>tar -zxvf canal-* -C canal
5.数据库设置
登陆mysql
mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
mysql>flush privileges;
6.配置文件
1.修改canal.properties配置文件
linux>vi conf/canal.properties
canal.instance.parser.parallelThreadSize = 1
2.修改instance.properties配置文件
linux>vi conf/example/instance.properties
canal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
7.启动服务并查看进程
linux>bin/startup.shlinux>jpsxxxx CanalLauncher
查看日志
linux>cat /opt/install/canal/logs/canal/canal.log
idea客户端
pom.xml
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class CanalClientDemo {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.58.203)",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {//testdb库中的所有表connector.connect();connector.subscribe("testdb.*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
注意:canal只能在java8中运行,如果canal进程CanalLauncher起不来,检查本地java环境
CanalClientDemo运行提示拒绝连接,检查脚本中的连接地址是不是运行canal的主机 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(“192.168.58.203(canal))”,
xxxx");
Canal kafka github 配置官网:github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
使用canal将数据同步到kafka上
#重新解压一个canal到nodefour上进行配置
1.修改canal.properties配置文件
linux>vi canal.propertiescanal.serverMode = kafka
canal.instance.parser.parallel = false
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
2.修改instance.properties配置文件
linux>vi instance.propertiescanal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
canal.mq.topic=example
3.可以创建topic,也可以不创建
在kafka上启动一个消费者
bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
启动 canal
linux>bin/startup.sh
结果:
相关文章:
Canal搭建 idea设置及采集数据到kafka
Canal GitHub:https://github.com/alibaba/canal#readme 实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。 canal工作原理 canal模拟MySQL slave的交互协议,伪装自己为MySQL slav…...
CentOS7搭建伪分布式Hadoop(全过程2023)
##具体操作目录## 1.配置静态ip2.关闭防火墙3.修改主机名为 *master* ,并重启虚拟机vi /etc/hostname 4.修改主机名与ip映射5.设置SSH免密登录6.安装配置java环境----------------------正式Hadoop配置1.移动安装包到合适位置2.解压安装包并重命名3.配置环境变量4.修…...
Linux中文件描述符fd和文件指针filp的理解
简单归纳:fd只是一个整数,在open时产生。起到一个索引的作用,进程通过PCB中的文件描述符表找到该fd所指向的文件指针filp。 文件描述符的操作(如: open)返回的是一个文件描述符,内核会在每个进程空间中维护一个文件描述符表, 所有打开的文件…...
CSS color中常用英文色值
常用颜色英文 red green blue magenta yellow chocolate black aquamarine lime fuchsia brass azure brown bronze deeppink aliceblue gray copper coral feldspar orange orchid pink plum quartz purple aliceblue antiquewith blanchedalmond…...
Springboot idea 中 maven配置问题,找不到依赖:Could not find artifact xxxx
现象:当我们从代码仓拉取新项目时,从该项目的开发同事拿到其maven的settings文件,作为项目的maven配置,为了是能找到工程中所依赖的包,能从远程仓下载下来。 然后本地仓的包,也从同事那边拷贝一份过来&…...
编译原理笔记(一)引论
文章目录 1.什么是编译程序2.编译过程和编译程序的结构2.1.编译过程概述2.2.编译程序的结构2.3.编译阶段的组合 3.解释程序和一些软件工具3.1.解释程序3.2.处理源程序的软件工具 4.PL/0语言编译系统 学习总结:这一部分是编译原理的绪论部分内容,对编译程…...
C++ 类和对象下 [补充]
文章目录 友元内部类内部类是外部类的天生友元 匿名对象匿名对象的特性 拷贝对象时的一些编译器优化函数返回值临时空间的存储位置返回值临时空间具有常性 标题相同和不同类型 需要 临时空间 友元 友元函数 重载operator<< 输出自定义类型 比如日期类的这个重载ÿ…...
[CTF/网络安全] 攻防世界 PHP2 解题详析
[CTF/网络安全] 攻防世界 PHP2 解题详析 index.php.phps扩展名姿势 翻译:你能给这个网站进行身份验证吗? index.php index.php是一个常见的文件名,通常用于Web服务器中的网站根目录下。它是默认的主页文件名,在访问一个网站时&am…...
图神经网络:(节点分类)在Cora数据集上动手实现图神经网络
文章说明: 1)参考资料:PYG官方文档。超链。 2)博主水平不高,如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook。超链。提取码8888。 文章目录 代码实操1:GCN的复杂实现代码实操2:GCN的简单实现…...
RabbitMQ应用问题——消息补偿机制以及代码示例
RabbitMQ应用问题——消息补偿机制以及代码示例 RabbitMQ应用问题 消息可靠性的保障 消息补偿机制 详细说明 这里使用了简单的代码进行演示,订单的消费者没有写,在订单的消费同时,发送一条增加积分消息到积分队列。 详细流程途中都有注明…...
量化特征贡献度函数:feature_importances_函数/LGBMClassifier/XGBClassifier
feature_importances_是scikit-learn机器学习库中许多模型对象的属性,在训练模型之后调用该属性可以输出各个特征在模型中的重要性。 示例代码: from sklearn.ensemble import RandomForestRegressor from sklearn.datasets import make_regression# 生…...
总结JVM重要知识点
一.类加载和创建对象的过程 1.类加载 1.编译 : 将源码文件(.java)编译成JVM可以解释的.class文件 . 语法分析>语义分析>注解处理 , 生成class文件 2.加载 : 装载 : 字节码本来存储在硬盘上 , 需要运行时 , 有类加载系统负责将类的信息加载到内存中(方法区) , 使用的是类…...
奇技淫巧第8期
学无止境。 下面是对去年11月至今年5月的零散知识点总结。 春节期间好好放松了一两个月,来校后又懒散的度过了一两个月,直到论文评审意见下来,才开启冲刺模式狂干了一两个月。总的来说,这半年来摸的时间比较多。好,不废…...
这个 归并排序详解过程 我能吹一辈子!!!
文章目录 归并排序概念归并排序算法思路归并排序递归实现归并排序非递归实现 归并排序概念 1945年,约翰冯诺依曼(John von Neumann)发明了归并排序,这是典型的分治算法的应用。 归并排序(Merge sort)是建立…...
docker版jxTMS使用指南:自动生成代码
本文讲解4.0版jxTMS的自动生成代码功能, 整个系列的文章请查看:docker版jxTMS使用指南:4.0版升级内容 docker版本的使用,请参考:docker版jxTMS使用指南 任何一个管理系统都需要对管理对象进行管理,包括最…...
聚观早报 | 小冰启动GPT克隆人计划;ofo创始人在美创业改做咖啡
今日要闻:小冰启动“GPT克隆人计划”;ofo创始人在美创业改做咖啡;OpenAI正准备新的开源AI模型;青年失业率首破20%创新高;微软收购动视暴雪获批 小冰启动“GPT克隆人计划” 5 月 16 日,小冰公司…...
面试造航母,入职拧螺丝,工资离了个大谱...
有粉丝跟我吐槽说:金三银四去面试软件测试岗,真的是面试造航母,入职拧螺丝,工资还低 这种现象很正常,因为找一个测试员,当然希望他能做的业务越多越好,最好像机器猫一样,啥事儿都能…...
Python+selenium自动化元素定位防踩坑
在自动化UI测试过程中常常会在元素定位阶段就踩坑,碰到困扰已久的问题。 以下是个人整理元素定位报错原因和解决方法。 踩坑一:StaleElementReferenceException selenium.common.exceptions.StaleElementReferenceException: Message: stale element re…...
【计算机组成原理】实验一
文章目录 实验一 数据传送实验1. 实验目的2. 实验仪器3. 原理概述4. 实验内容步骤4.1 手动实验环境的建立4.2 手控传送实验 5. 实验结论及问题讨论 实验一 数据传送实验 1. 实验目的 2. 实验仪器 3. 原理概述 4. 实验内容步骤 4.1 手动实验环境的建立 1)初始待令状态 上电或…...
前端022_广告模块_修改功能
广告模块_修改功能 1、需求分析2、Mock添加查询数据3、Mock修改数据4、Api调用回显数据5、提交修改后的数据6、效果1、需求分析 需求分析 当点击 编辑 按钮后,弹出编辑窗口,并查询出分类相关信息进行渲染。修改后点击 确定 提交修改后的数据。 2、Mock添加查询数据 请求URL…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
android RelativeLayout布局
<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...
