当前位置: 首页 > news >正文

Canal搭建 idea设置及采集数据到kafka

Canal GitHub:https://github.com/alibaba/canal#readme
实时采集工具canal:利用mysql主从复制的原理,slave定期读取master的binarylog对binarylog进行解析。
canal工作原理
canal模拟MySQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
MySQL master收到dump请求,开始推送binary log给slave(即canal)
canal解析binary log对像(原始为bye流)

官网配置:https://github.com/alibaba/canal/wiki/QuickStart
1.在mysql中开启binlog日志功能
mysql上配置

linux>vi /etc/my.cnf 
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=testdb	//指定数据库

2.重启mysql服务

linux>systemctl restart mysqld

3.查看binlog是否生效:

linux>ls /var/lib/mysql

4.解压canal压缩包

linux>tar -zxvf canal-* -C  canal

5.数据库设置
登陆mysql

mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
mysql>flush privileges;

6.配置文件
1.修改canal.properties配置文件

linux>vi conf/canal.properties
canal.instance.parser.parallelThreadSize = 1

2.修改instance.properties配置文件

linux>vi  conf/example/instance.properties
canal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306

7.启动服务并查看进程

linux>bin/startup.shlinux>jpsxxxx CanalLauncher

查看日志

linux>cat /opt/install/canal/logs/canal/canal.log

idea客户端
pom.xml

 <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class CanalClientDemo {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.58.203)",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {//testdb库中的所有表connector.connect();connector.subscribe("testdb.*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

注意:canal只能在java8中运行,如果canal进程CanalLauncher起不来,检查本地java环境
CanalClientDemo运行提示拒绝连接,检查脚本中的连接地址是不是运行canal的主机 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(“192.168.58.203(canal))”,
xxxx");

Canal kafka github 配置官网:github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

使用canal将数据同步到kafka上
#重新解压一个canal到nodefour上进行配置
1.修改canal.properties配置文件

linux>vi canal.propertiescanal.serverMode = kafka
canal.instance.parser.parallel = false
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

2.修改instance.properties配置文件

linux>vi instance.propertiescanal.instance.mysql.slaveId=21
canal.instance.master.address=192.168.58.203:3306
canal.mq.topic=example

3.可以创建topic,也可以不创建

在kafka上启动一个消费者

bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

启动 canal

linux>bin/startup.sh

结果:在这里插入图片描述

相关文章:

Canal搭建 idea设置及采集数据到kafka

Canal GitHub&#xff1a;https://github.com/alibaba/canal#readme 实时采集工具canal&#xff1a;利用mysql主从复制的原理&#xff0c;slave定期读取master的binarylog对binarylog进行解析。 canal工作原理 canal模拟MySQL slave的交互协议&#xff0c;伪装自己为MySQL slav…...

CentOS7搭建伪分布式Hadoop(全过程2023)

##具体操作目录## 1.配置静态ip2.关闭防火墙3.修改主机名为 *master* &#xff0c;并重启虚拟机vi /etc/hostname 4.修改主机名与ip映射5.设置SSH免密登录6.安装配置java环境----------------------正式Hadoop配置1.移动安装包到合适位置2.解压安装包并重命名3.配置环境变量4.修…...

Linux中文件描述符fd和文件指针filp的理解

简单归纳&#xff1a;fd只是一个整数&#xff0c;在open时产生。起到一个索引的作用&#xff0c;进程通过PCB中的文件描述符表找到该fd所指向的文件指针filp。 文件描述符的操作(如: open)返回的是一个文件描述符,内核会在每个进程空间中维护一个文件描述符表, 所有打开的文件…...

CSS color中常用英文色值

常用颜色英文 red green blue magenta yellow chocolate black aquamarine lime fuchsia brass azure brown bronze deeppink aliceblue gray copper coral feldspar orange orchid pink plum quartz purple aliceblue antiquewith blanchedalmond…...

Springboot idea 中 maven配置问题,找不到依赖:Could not find artifact xxxx

现象&#xff1a;当我们从代码仓拉取新项目时&#xff0c;从该项目的开发同事拿到其maven的settings文件&#xff0c;作为项目的maven配置&#xff0c;为了是能找到工程中所依赖的包&#xff0c;能从远程仓下载下来。 然后本地仓的包&#xff0c;也从同事那边拷贝一份过来&…...

编译原理笔记(一)引论

文章目录 1.什么是编译程序2.编译过程和编译程序的结构2.1.编译过程概述2.2.编译程序的结构2.3.编译阶段的组合 3.解释程序和一些软件工具3.1.解释程序3.2.处理源程序的软件工具 4.PL/0语言编译系统 学习总结&#xff1a;这一部分是编译原理的绪论部分内容&#xff0c;对编译程…...

C++ 类和对象下 [补充]

文章目录 友元内部类内部类是外部类的天生友元 匿名对象匿名对象的特性 拷贝对象时的一些编译器优化函数返回值临时空间的存储位置返回值临时空间具有常性 标题相同和不同类型 需要 临时空间 友元 友元函数 重载operator<< 输出自定义类型 比如日期类的这个重载&#xff…...

[CTF/网络安全] 攻防世界 PHP2 解题详析

[CTF/网络安全] 攻防世界 PHP2 解题详析 index.php.phps扩展名姿势 翻译&#xff1a;你能给这个网站进行身份验证吗&#xff1f; index.php index.php是一个常见的文件名&#xff0c;通常用于Web服务器中的网站根目录下。它是默认的主页文件名&#xff0c;在访问一个网站时&am…...

图神经网络:(节点分类)在Cora数据集上动手实现图神经网络

文章说明&#xff1a; 1)参考资料&#xff1a;PYG官方文档。超链。 2)博主水平不高&#xff0c;如有错误还望批评指正。 3)我在百度网盘上传了这篇文章的jupyter notebook。超链。提取码8888。 文章目录 代码实操1&#xff1a;GCN的复杂实现代码实操2&#xff1a;GCN的简单实现…...

RabbitMQ应用问题——消息补偿机制以及代码示例

RabbitMQ应用问题——消息补偿机制以及代码示例 RabbitMQ应用问题 消息可靠性的保障 消息补偿机制 详细说明 这里使用了简单的代码进行演示&#xff0c;订单的消费者没有写&#xff0c;在订单的消费同时&#xff0c;发送一条增加积分消息到积分队列。 详细流程途中都有注明…...

量化特征贡献度函数:feature_importances_函数/LGBMClassifier/XGBClassifier

feature_importances_是scikit-learn机器学习库中许多模型对象的属性&#xff0c;在训练模型之后调用该属性可以输出各个特征在模型中的重要性。 示例代码&#xff1a; from sklearn.ensemble import RandomForestRegressor from sklearn.datasets import make_regression# 生…...

总结JVM重要知识点

一.类加载和创建对象的过程 1.类加载 1.编译 : 将源码文件(.java)编译成JVM可以解释的.class文件 . 语法分析>语义分析>注解处理 , 生成class文件 2.加载 : 装载 : 字节码本来存储在硬盘上 , 需要运行时 , 有类加载系统负责将类的信息加载到内存中(方法区) , 使用的是类…...

奇技淫巧第8期

学无止境。 下面是对去年11月至今年5月的零散知识点总结。 春节期间好好放松了一两个月&#xff0c;来校后又懒散的度过了一两个月&#xff0c;直到论文评审意见下来&#xff0c;才开启冲刺模式狂干了一两个月。总的来说&#xff0c;这半年来摸的时间比较多。好&#xff0c;不废…...

这个 归并排序详解过程 我能吹一辈子!!!

文章目录 归并排序概念归并排序算法思路归并排序递归实现归并排序非递归实现 归并排序概念 1945年&#xff0c;约翰冯诺依曼&#xff08;John von Neumann&#xff09;发明了归并排序&#xff0c;这是典型的分治算法的应用。 归并排序&#xff08;Merge sort&#xff09;是建立…...

docker版jxTMS使用指南:自动生成代码

本文讲解4.0版jxTMS的自动生成代码功能&#xff0c; 整个系列的文章请查看&#xff1a;docker版jxTMS使用指南&#xff1a;4.0版升级内容 docker版本的使用&#xff0c;请参考&#xff1a;docker版jxTMS使用指南 任何一个管理系统都需要对管理对象进行管理&#xff0c;包括最…...

聚观早报 | 小冰启动GPT克隆人计划;ofo创始人在美创业改做咖啡

今日要闻&#xff1a;小冰启动“GPT克隆人计划”&#xff1b;ofo创始人在美创业改做咖啡&#xff1b;OpenAI正准备新的开源AI模型&#xff1b;青年失业率首破20&#xff05;创新高&#xff1b;微软收购动视暴雪获批 小冰启动“GPT克隆人计划” 5 月 16 日&#xff0c;小冰公司…...

面试造航母,入职拧螺丝,工资离了个大谱...

有粉丝跟我吐槽说&#xff1a;金三银四去面试软件测试岗&#xff0c;真的是面试造航母&#xff0c;入职拧螺丝&#xff0c;工资还低 这种现象很正常&#xff0c;因为找一个测试员&#xff0c;当然希望他能做的业务越多越好&#xff0c;最好像机器猫一样&#xff0c;啥事儿都能…...

Python+selenium自动化元素定位防踩坑

在自动化UI测试过程中常常会在元素定位阶段就踩坑&#xff0c;碰到困扰已久的问题。 以下是个人整理元素定位报错原因和解决方法。 踩坑一&#xff1a;StaleElementReferenceException selenium.common.exceptions.StaleElementReferenceException: Message: stale element re…...

【计算机组成原理】实验一

文章目录 实验一 数据传送实验1. 实验目的2. 实验仪器3. 原理概述4. 实验内容步骤4.1 手动实验环境的建立4.2 手控传送实验 5. 实验结论及问题讨论 实验一 数据传送实验 1. 实验目的 2. 实验仪器 3. 原理概述 4. 实验内容步骤 4.1 手动实验环境的建立 1)初始待令状态 上电或…...

前端022_广告模块_修改功能

广告模块_修改功能 1、需求分析2、Mock添加查询数据3、Mock修改数据4、Api调用回显数据5、提交修改后的数据6、效果1、需求分析 需求分析 当点击 编辑 按钮后,弹出编辑窗口,并查询出分类相关信息进行渲染。修改后点击 确定 提交修改后的数据。 2、Mock添加查询数据 请求URL…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢

随着互联网技术的飞速发展&#xff0c;消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁&#xff0c;不仅优化了客户体验&#xff0c;还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用&#xff0c;并…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java&#xff08;供 Kotlin 使用&#xff09; 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

html-<abbr> 缩写或首字母缩略词

定义与作用 <abbr> 标签用于表示缩写或首字母缩略词&#xff0c;它可以帮助用户更好地理解缩写的含义&#xff0c;尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时&#xff0c;会显示一个提示框。 示例&#x…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...