当前位置: 首页 > news >正文

flink-cdc同步数据到doris中

1 创建数据库和表

1.1 数据库脚本

这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

1

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:type: mysqlhostname: 10.101.10.11port: 3306username: flinkpassword: 123456tables: eayc.eayc_userserver-id: 5400# server-time-zone: UTC
#Doris的参数配置
sink:type: dorisfenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user
pipeline:name: eayc to dorisparallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user- source-table: eayc.eayc_companysink-table: ods_eayc.eayc_company- source-table: eayc.eayc_company_usersink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:tables: acc.\.*
route:- source-table: acc.acc_account_balance_\.*sink-table: acc.acc_account_balance- source-table: acc.acc_account_subject_\.*sink-table: acc.acc_account_subject- source-table: acc.acc_initial_balance_\.*sink-table: acc.acc_initial_balance- source-table: acc.acc_voucher_\.*sink-table: acc.acc_voucher- source-table: acc.acc_voucher_entry_\.*sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

相关文章:

flink-cdc同步数据到doris中

1 创建数据库和表 1.1 数据库脚本 这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:12345610.101.12.82:9030/internal.eayc?charsetutf8mb4 -- 创建数据库eayc create database if not exists ods_eayc; -- 创建数据表2 数据同步 2.1 f…...

Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析

引言 在云原生时代,Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务,简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者,为微服务提供了流量管理…...

Golang教程

1. go 环境与命令 1.1 go 环境搭建 SDK 安装 Go 官网:golang.orgGo 中文社区:https://studygolang.com/dlGo API文档:https/golang.org 或 https://studygolang.com/pkgdoc 目录 api :api 存放bin:go命令src&#…...

AI 百炼成神:线性回归,预测房价

我们开始第一个项目——线性回归:预测房价。这是一个经典的机器学习入门项目,可以帮助你理解如何使用线性回归模型来预测连续的数值。 第一个项目:线性回归预测房价 项目目标 学习线性回归的基本概念。使用历史房价数据建立一个预测模型。理解如何评估模型的性能。项目步骤…...

企业软件合规性管理:构建高效、安全的软件资产生态

引言 在数字化转型的浪潮下,企业的软件使用方式日益多元化,涉及云端、订阅制、永久授权及浮动许可等多种模式。然而,随着软件资产的增多,企业面临着合规性管理的严峻挑战:非法软件使用、许可证管理不当、软件资产闲置…...

每日一题——编辑距离

编辑距离 参考资料题目描述示例 解题思路动态规划(DP)方法 代码实现复杂度分析示例详解示例1:"nowcoder" → "new"示例2:"intention" → "execution" 总结与心得 参考资料 建议先参考下…...

TensorFlow项目GPU运行 安装步骤

以下是在 Linux 系统 下搭建完整 GPU 加速环境的详细流程(适配 CUDA 11.2 和 Python 3.9): 1. 前置检查 1.1 验证 NVIDIA 驱动 # 检查驱动版本(需 ≥ 450.80.02) nvidia-smi 输出示例: CUDA Version: 11.2…...

c++进阶———继承

1.引言 在一些大的项目中,我们可能要重复定义一些类,但是很麻烦,应该怎么办呢?举个简单的例子,我要做一个全校师生统计表,统计学号,教师编号,姓名,年龄,电话…...

FreeSwitch的mod_translate模块详细,附带场景案例及代码示例

mod_translate 模块详细介绍 mod_translate 是 FreeSWITCH 中的一个拨号计划应用程序模块,用于对电话号码或字符串进行格式转换和翻译。它可以根据预定义的规则对输入的内容进行匹配和转换,常用于号码格式化、路由选择、号码屏蔽等场景。 主要功能 号码…...

前端504错误分析

前端出现504错误(网关超时)通常是由于代理服务器未能及时从上游服务获取响应。以下是详细分析步骤和解决方案: 1. 确认错误来源 504含义:代理服务器(如Nginx、Apache)在等待后端服务响应时超时。常见架构:前端 → 代理服务器 → 后端服务,问题通常出在代理与后端之间。…...

在 .NET 8/9 中使用 AppUser 进行 JWT 令牌身份验证

文章目录 一、引言二、什么是 JSON Web 令牌?三、什么是 JSON Web 令牌结构?四、设置 JWT 令牌身份验证4.1 创建新的 .NET 8 Web API 项目4.2 安装所需的 NuGet 软件包4.3 创建 JWT 配置模型4.4 将 JWT 配置添加到您的 appsettings.json 中4.5 为 Config…...

基于python实现机器学习的心脏病预测系统

以下是一个基于 Python 实现的简单心脏病预测系统代码示例,我们将使用 Scikit - learn 库中的机器学习算法(这里以逻辑回归为例),并使用公开的心脏病数据集。 步骤: 数据加载与预处理:加载心脏病数据集&a…...

使用 NVM 随意切换 Node.js 版本

安装nvm https://github.com/coreybutler/nvm-windows/releases nvm安装详细教程(卸载旧的nodejs,安装nvm、node、npm、cnpm、yarn及环境变量配置)-CSDN博客 验证 NVM 是否安装成功-查看版本 nvm --version安装指定版本的 Node.js nvm i…...

【Prometheus】prometheus结合pushgateway实现脚本运行状态监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…...

SpringBoot 项目配置动态数据源

目录 一、前言二、操作1、引入依赖2、配置默认数据库 13、定义数据源实体和 Repository4、定义动态数据源5、配置数据源6、定义切换数据源注解7、定义切面类8、使用注解切换数据源 一、前言 通过切面注解方式根据不同业务动态切换数据库 二、操作 1、引入依赖 <dependen…...

CSS基本选择器

1. 通配选择器 作用&#xff1a;可以选中所有的 HTML 元素。 语法&#xff1a; * { 属性名: 属性值; } 举例&#xff1a; <!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" …...

idea-代码补全快捷键

文章目录 前言idea-代码补全快捷键1. 基本补全2. 类型匹配补全3. 后缀补全4. 代码补全 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;…...

基于SpringBoot+vue粮油商城小程序系统

粮油商城小程序为用户提供方便快捷的在线购物体验&#xff0c;包括大米、面粉、食用油、调味品等各种粮油产品的选购&#xff0c;用户可以浏览商品详情、对比价格、下单支付等操作。同时&#xff0c;商城还提供优惠活动、积分兑换等福利&#xff0c;让用户享受到更多实惠和便利…...

挪车小程序挪车二维码php+uniapp

一款基于FastAdminThinkPHP开发的匿名通知车主挪车微信小程序&#xff0c;采用匿名通话的方式&#xff0c;用户只能在有效期内拨打车主电话&#xff0c;过期失效&#xff0c;从而保护车主和用户隐私。提供微信小程序端和服务端源码&#xff0c;支持私有化部署。 更新日志 V1.0…...

企业内部知识库:安全协作打造企业智慧运营基石

内容概要 作为企业智慧运营的核心载体&#xff0c;企业内部知识库通过结构化的信息聚合与动态化的知识流动&#xff0c;为组织提供了从数据沉淀到价值转化的系统性框架。其底层架构以权限管理为核心&#xff0c;依托数据加密技术构建多层级访问控制机制&#xff0c;确保敏感信…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题

【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要&#xff1a; 近期&#xff0c;在使用较新版本的OpenSSH客户端连接老旧SSH服务器时&#xff0c;会遇到 "no matching key exchange method found"​, "n…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...

SpringAI实战:ChatModel智能对话全解

一、引言&#xff1a;Spring AI 与 Chat Model 的核心价值 &#x1f680; 在 Java 生态中集成大模型能力&#xff0c;Spring AI 提供了高效的解决方案 &#x1f916;。其中 Chat Model 作为核心交互组件&#xff0c;通过标准化接口简化了与大语言模型&#xff08;LLM&#xff0…...

node.js的初步学习

那什么是node.js呢&#xff1f; 和JavaScript又是什么关系呢&#xff1f; node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说&#xff0c; 需要在node.js的环境上进行当JavaScript作为前端开发语言来说&#xff0c;需要在浏览器的环境上进行 Node.js 可…...

ArcGIS Pro+ArcGIS给你的地图加上北回归线!

今天来看ArcGIS Pro和ArcGIS中如何给制作的中国地图或者其他大范围地图加上北回归线。 我们将在ArcGIS Pro和ArcGIS中一同介绍。 1 ArcGIS Pro中设置北回归线 1、在ArcGIS Pro中初步设置好经纬格网等&#xff0c;设置经线、纬线都以10间隔显示。 2、需要插入背会归线&#xf…...