当前位置: 首页 > news >正文

Doris(六)--通过 Canal 同步数据到 Doris 中

pre 开启 mysql Binlog

网上有众多方法,自行百度。

查询是否成功,在 mysql 客户端输入

show BINARY LOGS;

出现如下提示,即表示 big log 正常开启。 

1,下载 canal 服务端

传送门

注意:下载 canal.deployer-xxx 版本即可。admin 是 deployer 的管理端。

2,上传到服务器的指定位置并解压

tar xzvf canal.deployer-1.1.6.tar.gz

注意,这个 deployer 解压之后直接是零散文件夹,建议先创建一个文件夹后,在这个文件夹里面进行解压

 3,配置实例

进入 conf 文件夹后,创建实例文件夹

cd conf/
mkdir test

从 example 文件夹中,拷贝instance.properties到当前文件夹

cp ../example/instance.properties .

 4,编辑实例文件

4.1 源数据库位置

//源数据位置
canal.instance.master.address=127.0.0.1:3306
//源数据 binlog 名字
canal.instance.master.journal.name=
//源数据 biglog 偏移量
canal.instance.master.position=

4.2 连接源数据库的用户名和密码

//连接源数据库用户名
canal.instance.dbUsername=canal
//连接源数据库密码
canal.instance.dbPassword=canal

4.3 编辑完,保存退出

5,编辑 canal 的配置文件

cd ..
vim canal.properties

5.1 加入新加的实例,已逗号分割

canal.destinations = example

6,部署客户端

这里客户端可以根据 canal 的 api 文档自行开发。

这里贴一些关键代码

{protected final static Logger logger = LoggerFactory.getLogger(CanalClientApplication.class);private static String ADDRESS = ConfigUtils.getConfigValue("application.properties", "canal.address");private static int PORT = Integer.parseInt(ConfigUtils.getConfigValue("application.properties", "canal.port"));private static String DESTINATION = ConfigUtils.getConfigValue("application.properties", "canal.destination");private static String USERNAME = ConfigUtils.getConfigValue("application.properties", "canal.username");private static String PASSWORD = ConfigUtils.getConfigValue("application.properties", "canal.password");private static String SUBSCRIBER = ConfigUtils.getConfigValue("application.properties", "canal.subscriber");public static void main(String args[]) {SpringApplication.run(CanalClientApplication.class,args);System.out.println("数据同步服务启动成功");// 创建链接logger.info("Trying to connect to " + ADDRESS + ":" + PORT);CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ADDRESS,PORT), DESTINATION, USERNAME, PASSWORD);int batchSize = 1000;try {logger.info("...");connector.connect();logger.info("connected");connector.subscribe(SUBSCRIBER);connector.rollback();logger.info("CanalClient Application started successfully!");while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();logger.info("当前 message 信息为:{}",message);if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {DataProcessor.process(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}} catch (Exception e) {e.printStackTrace();logger.error("Canal Client exit with error.", e);System.exit(-2);} finally {connector.disconnect();}}}
{protected final static Logger logger = LoggerFactory.getLogger(DataProcessor.class);private static String DATABASE = ConfigUtils.getConfigValue("application.properties", "canal.database");private static String TABLE = ConfigUtils.getConfigValue("application.properties", "canal.table");private static String OPERATOR = ConfigUtils.getConfigValue("application.properties", "canal.operator");private static String CANAL_OUTPUT = ConfigUtils.getConfigValue("application.properties", "canal.output");private static DorisUtil dorisUtil;private static MySQLUtil mySQLUtil;public static void process(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();if (eventType == CanalEntry.EventType.TRUNCATE && OPERATOR.contains("TRUNCATE")) {if (StringUtils.isEmpty(DATABASE) ||(entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {if (StringUtils.isEmpty(TABLE) ||(entry.getHeader().getTableName() != null && isContain(TABLE.split(","), entry.getHeader().getTableName()))) {logger.info("TRUNCATE TABLE " + entry.getHeader().getTableName());if (CANAL_OUTPUT.contains("mysql")) {mySQLUtil = MySQLUtil.getInstance();try {mySQLUtil.mySQLTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} catch (SQLException e) {e.printStackTrace();logger.error("MySQL执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());}}if (CANAL_OUTPUT.contains("doris")) {dorisUtil = DorisUtil.getInstance();try {dorisUtil.dorisTruncate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());} catch (SQLException e) {e.printStackTrace();logger.error("Doris执行同步truncate出错,dataBase:" + entry.getHeader().getSchemaName() + ",table:" + entry.getHeader().getTableName());}}}}}for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {// 过滤database, table, operatorif (StringUtils.isEmpty(DATABASE) ||(entry.getHeader().getSchemaName()!=null && isContain(DATABASE.split(","),entry.getHeader().getSchemaName()))) {if (StringUtils.isEmpty(TABLE) ||(entry.getHeader().getTableName()!=null && isContain(TABLE.split(","),entry.getHeader().getTableName()))) {if (CANAL_OUTPUT.contains("mysql")) {mySQLUtil = MySQLUtil.getInstance();try {if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {mySQLUtil.mySQLDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {mySQLUtil.mySQLInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());} else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {mySQLUtil.mySQLUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());} else {// nothing to do}} catch (SQLException e) {logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);}}if (CANAL_OUTPUT.contains("doris")) {dorisUtil = DorisUtil.getInstance();try {if (eventType == CanalEntry.EventType.DELETE && OPERATOR.contains("DELETE")) {dorisUtil.dorisDelete(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT && OPERATOR.contains("INSERT")) {dorisUtil.dorisInsert(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getAfterColumnsList());} else if (eventType == CanalEntry.EventType.UPDATE && OPERATOR.contains("UPDATE")) {dorisUtil.dorisUpdate(entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());} else {// nothing to do}} catch (SQLException e) {logger.error("MySQL执行同步" + eventType + "出错,dataBase:"+entry.getHeader().getSchemaName()+",table:"+entry.getHeader().getTableName(), e);}}}}}}}public static boolean isContain(String[] list, String value) {if (list == null || value == null) return false;for (String lv : list) {if (value.trim().equals(lv.trim())) {return true;}}return false;}private static void printColumn(String database, String table, List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {logger.info(database + "-" + table + "-" + column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}}

7,启动 canal 服务端

在 canal 根目录下,执行如下命令

./bin/startup.sh 

8,启动 canal 客户端

因为我用的 jar,所以,启动 jar 包就行了。

9,待完成事项

1,doris 官方文档上有通过 binLog 同步数据到 doris 中的方法,这部分待实现。

2,当前客户端写法单一。一旦canal 服务端重启,应用自动停机。待优化。

相关文章:

Doris(六)--通过 Canal 同步数据到 Doris 中

pre 开启 mysql Binlog 网上有众多方法&#xff0c;自行百度。 查询是否成功&#xff0c;在 mysql 客户端输入 show BINARY LOGS; 出现如下提示&#xff0c;即表示 big log 正常开启。 1&#xff0c;下载 canal 服务端 传送门 注意&#xff1a;下载 canal.deployer-xxx …...

快手Java一面,全是基础

现在已经到了面试招聘比较火热的时候&#xff0c;准备面试的过程中&#xff0c;一定要多看面经&#xff0c;多自测&#xff01; 今天分享的是一位贵州大学的同学分享的快手一面面经。 快手一面主要会问一些基础问题&#xff0c;也就是比较简单且容易准备的常规八股&#xff0…...

未来芯片设计领域的药明康德——青芯如何在N个项目间游走平衡

总部位于上海张江的青芯半导体&#xff08;CyanSemi&#xff09;&#xff0c;ASIC定制设计是其核心业务之一。 青芯在单纯的设计服务维度之上&#xff0c;打造了从设计到生产的一套完整ASIC定制业务&#xff0c;不仅做芯片设计&#xff0c;还提供封装、测试服务&#xff0c;也…...

【跟小嘉学 Rust 编程】十九、高级特性

系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 【跟小嘉学 Rust 编程】五、使用结构体关联结构化数据 【跟小嘉学…...

pandas由入门到精通-数据清洗-缺失值处理

pandas-02-数据清洗&预处理 A.缺失值处理1. Pandas缺失值判断2. 缺失值过滤2.1 Series.dropna()2.2 DataFrame.dropna()3. 缺失值填充3.1 值填充3.2 向前/向后填充文中用S代指Series,用Df代指DataFrame 数据清洗是处理大型复杂情况数据必不可少的步骤,这里总结一些数据清…...

Redis 教程 - 主从复制

Redis 教程 - 主从复制 Redis 支持主从复制&#xff08;Master-Slave Replication&#xff09;&#xff0c;通过主从复制可以将一个 Redis 服务器&#xff08;主节点&#xff09;的数据复制到其他 Redis 服务器&#xff08;从节点&#xff09;&#xff0c;以实现数据的冗余备份…...

[递归] 子集 全排列和组合问题

1.1 子集I 思路可以简单概括为 二叉树&#xff0c;每一次分叉要么选择一个元素&#xff0c;要么选择空&#xff0c;总共有n次&#xff0c;因此到n1进行保存结果&#xff0c;返回。像这样&#xff1a; #include <cstdio> #include <vector> #include <algorithm&…...

ELK安装、部署、调试(四)KAFKA消息队列的安装和部署

1.简介 Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。 这种动作&#xff08;网页浏览&#xff0c;搜索和其他用户的行动&#xff09;是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通…...

半导体晶片机器视觉测量及MARK点视觉定位

半导体晶片机器视觉测量及MARK点视觉定位 客户的需求: 检测内容&#xff1a; SMT行业晶片位置角度与PCB板Mark点位置的测试测量 检测要求&#xff1a; 精度0.04mm&#xff0c;移动速度100mm/s 视觉可行性分析: 对样品进行了光学实验&#xff0c;并进行图像处理&#xff0c…...

ranger无法同步用户问题解决

1.首先就是定位日志,日志目录 cd /var/log/ranger/usersync 定位到问题报错如下: LdapDeltaUserGroupBuilder.getUsers() failed with exception:java.naming.AuthticationExceptiom :[LDAP:error code 49 - Invalid Credentials]:remaing name ‘ouPeople,dc*.dccom’ 解决办法…...

使用通信顺序进程(CSP)模型的 Go 语言通道

在并发编程中&#xff0c;许多编程语言采用共享内存/状态模型。然而&#xff0c;Go 通过实现 通信顺序进程&#xff08;CSP&#xff09;模型来区别于众多。在CSP中&#xff0c;程序由不共享状态的并行进程组成&#xff1b;相反&#xff0c;它们通过通道进行通信和同步操作。因此…...

VPN网关

阿里云VPN网关(VPN Gateway&#xff0c;简称VPN)是一款基于Internet&#xff0c;通过加密通道将企业数据中心、办公网或终端与专有网络(VPC) 安全可靠连接起来的服务。 VPN网关提供IPsec-VPN和SSL-VPN两种。 网络连接方式应用场景IPsec-VPN支持在企业本地数据中心、企业办公网…...

产品展示视频制作的要点

制作产品展示视频时通过精心策划的视频剧本和拍摄手法&#xff0c;可以准确地呈现活动的目的、主题和特点&#xff0c;让观众更好地理解和认同活动的意义。深圳产品活动视频制作公司老友记小编还为您整理了以下一些重要的制作要点&#xff1a; 1.明确目标受众&#xff1a;了解你…...

appium+python自动化测试

获取APP的包名 1、aapt即Android Asset Packaging Tool&#xff0c;在SDK的build-tools目录下。该工具可以查看apk包名和launcherActivity 2、在android-sdk里面双击SDK-manager,下载buidl-tools 3、勾选build-tools&#xff0c;随便选一个版本&#xff0c;我这里选的是24的版…...

【AI辅助办公】PDF转PPT,移除水印

PDF转PPT 将PDF上传链接即可转换成PPT。​​​​​​ ​​​​​​​ https://www.camscanner.com/pdftoppthttps://www.camscanner.com/pdftoppt​​​​​​​​​​​​​​移除水印 第一步&#xff1a;打开视图-宏 第二步&#xff1a;输入宏名&#xff08;可以是人以文字…...

ssm农业视频实时发布管理系统源码

ssm农业视频实时发布管理系统源码108 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm package com.controller;import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; impo…...

【100天精通python】Day48:python Web开发_WSGI接口与使用

目录 1 WSGI接口 1.1 CGI 简介 1.2 WSGI 简介 1.3 定义 WSGI 接口 1.3.1 应用程序&#xff08;Application&#xff09; 1.3.2 服务器&#xff08;Server&#xff09; 1.4 WSGI 接口的使用示例 1.5 WSGI接口的优势 1 WSGI接口 上一节实现了静态服务器&#xff0c;但是当…...

Understanding Lockup Cells

工具会分析扫描链和EDT逻辑之间的控制时序元素的时钟的时序关系,当必须要同步时钟并保持数据完整性时插入边沿触发寄存器(lockup cells)。 可以使用report_edt_lockup_cells命令来展示工具已经插入的lockup cells的详细报告。 Lockup Cell Insertion 工具会分析控制时序元…...

javaCV实现java图片ocr提取文字效果

引入依赖&#xff1a; <dependency><groupId>org.bytedeco</groupId><artifactId>javacv-platform</artifactId><version>1.5.5</version></dependency> 引入中文语言训练数据集&#xff1a;chi_sim GitHub - tesseract-ocr…...

七牛云OSS存储

前言: 七牛云的存储项目的附件,需要开发一套七牛云的工具类,可以使用该工具类进行七牛云服务器进行文件的上传与下载操作; 七牛云的文档学习: 相关的依赖项的配置: <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-s3…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

Web 架构之 CDN 加速原理与落地实践

文章目录 一、思维导图二、正文内容&#xff08;一&#xff09;CDN 基础概念1. 定义2. 组成部分 &#xff08;二&#xff09;CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 &#xff08;三&#xff09;CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 &#xf…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

一些实用的chrome扩展0x01

简介 浏览器扩展程序有助于自动化任务、查找隐藏的漏洞、隐藏自身痕迹。以下列出了一些必备扩展程序&#xff0c;无论是测试应用程序、搜寻漏洞还是收集情报&#xff0c;它们都能提升工作流程。 FoxyProxy 代理管理工具&#xff0c;此扩展简化了使用代理&#xff08;如 Burp…...

何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡

何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡 背景 我们以建设星云智控官网来做AI编程实践&#xff0c;很多人以为AI已经强大到不需要程序员了&#xff0c;其实不是&#xff0c;AI更加需要程序员&#xff0c;普通人…...

DeepSeek越强,Kimi越慌?

被DeepSeek吊打的Kimi&#xff0c;还有多少人在用&#xff1f; 去年&#xff0c;月之暗面创始人杨植麟别提有多风光了。90后清华学霸&#xff0c;国产大模型六小虎之一&#xff0c;手握十几亿美金的融资。旗下的AI助手Kimi烧钱如流水&#xff0c;单月光是投流就花费2个亿。 疯…...