当前位置: 首页 > news >正文

flink-cdc同步数据到doris中

1 创建数据库和表

1.1 数据库脚本

这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

1

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:type: mysqlhostname: 10.101.10.11port: 3306username: flinkpassword: 123456tables: eayc.eayc_userserver-id: 5400# server-time-zone: UTC
#Doris的参数配置
sink:type: dorisfenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user
pipeline:name: eayc to dorisparallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user- source-table: eayc.eayc_companysink-table: ods_eayc.eayc_company- source-table: eayc.eayc_company_usersink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:tables: acc.\.*
route:- source-table: acc.acc_account_balance_\.*sink-table: acc.acc_account_balance- source-table: acc.acc_account_subject_\.*sink-table: acc.acc_account_subject- source-table: acc.acc_initial_balance_\.*sink-table: acc.acc_initial_balance- source-table: acc.acc_voucher_\.*sink-table: acc.acc_voucher- source-table: acc.acc_voucher_entry_\.*sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

相关文章:

flink-cdc同步数据到doris中

1 创建数据库和表 1.1 数据库脚本 这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:12345610.101.12.82:9030/internal.eayc?charsetutf8mb4 -- 创建数据库eayc create database if not exists ods_eayc; -- 创建数据表2 数据同步 2.1 f…...

Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析

引言 在云原生时代,Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务,简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者,为微服务提供了流量管理…...

Golang教程

1. go 环境与命令 1.1 go 环境搭建 SDK 安装 Go 官网:golang.orgGo 中文社区:https://studygolang.com/dlGo API文档:https/golang.org 或 https://studygolang.com/pkgdoc 目录 api :api 存放bin:go命令src&#…...

AI 百炼成神:线性回归,预测房价

我们开始第一个项目——线性回归:预测房价。这是一个经典的机器学习入门项目,可以帮助你理解如何使用线性回归模型来预测连续的数值。 第一个项目:线性回归预测房价 项目目标 学习线性回归的基本概念。使用历史房价数据建立一个预测模型。理解如何评估模型的性能。项目步骤…...

企业软件合规性管理:构建高效、安全的软件资产生态

引言 在数字化转型的浪潮下,企业的软件使用方式日益多元化,涉及云端、订阅制、永久授权及浮动许可等多种模式。然而,随着软件资产的增多,企业面临着合规性管理的严峻挑战:非法软件使用、许可证管理不当、软件资产闲置…...

每日一题——编辑距离

编辑距离 参考资料题目描述示例 解题思路动态规划(DP)方法 代码实现复杂度分析示例详解示例1:"nowcoder" → "new"示例2:"intention" → "execution" 总结与心得 参考资料 建议先参考下…...

TensorFlow项目GPU运行 安装步骤

以下是在 Linux 系统 下搭建完整 GPU 加速环境的详细流程(适配 CUDA 11.2 和 Python 3.9): 1. 前置检查 1.1 验证 NVIDIA 驱动 # 检查驱动版本(需 ≥ 450.80.02) nvidia-smi 输出示例: CUDA Version: 11.2…...

c++进阶———继承

1.引言 在一些大的项目中,我们可能要重复定义一些类,但是很麻烦,应该怎么办呢?举个简单的例子,我要做一个全校师生统计表,统计学号,教师编号,姓名,年龄,电话…...

FreeSwitch的mod_translate模块详细,附带场景案例及代码示例

mod_translate 模块详细介绍 mod_translate 是 FreeSWITCH 中的一个拨号计划应用程序模块,用于对电话号码或字符串进行格式转换和翻译。它可以根据预定义的规则对输入的内容进行匹配和转换,常用于号码格式化、路由选择、号码屏蔽等场景。 主要功能 号码…...

前端504错误分析

前端出现504错误(网关超时)通常是由于代理服务器未能及时从上游服务获取响应。以下是详细分析步骤和解决方案: 1. 确认错误来源 504含义:代理服务器(如Nginx、Apache)在等待后端服务响应时超时。常见架构:前端 → 代理服务器 → 后端服务,问题通常出在代理与后端之间。…...

在 .NET 8/9 中使用 AppUser 进行 JWT 令牌身份验证

文章目录 一、引言二、什么是 JSON Web 令牌?三、什么是 JSON Web 令牌结构?四、设置 JWT 令牌身份验证4.1 创建新的 .NET 8 Web API 项目4.2 安装所需的 NuGet 软件包4.3 创建 JWT 配置模型4.4 将 JWT 配置添加到您的 appsettings.json 中4.5 为 Config…...

基于python实现机器学习的心脏病预测系统

以下是一个基于 Python 实现的简单心脏病预测系统代码示例,我们将使用 Scikit - learn 库中的机器学习算法(这里以逻辑回归为例),并使用公开的心脏病数据集。 步骤: 数据加载与预处理:加载心脏病数据集&a…...

使用 NVM 随意切换 Node.js 版本

安装nvm https://github.com/coreybutler/nvm-windows/releases nvm安装详细教程(卸载旧的nodejs,安装nvm、node、npm、cnpm、yarn及环境变量配置)-CSDN博客 验证 NVM 是否安装成功-查看版本 nvm --version安装指定版本的 Node.js nvm i…...

【Prometheus】prometheus结合pushgateway实现脚本运行状态监控

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…...

SpringBoot 项目配置动态数据源

目录 一、前言二、操作1、引入依赖2、配置默认数据库 13、定义数据源实体和 Repository4、定义动态数据源5、配置数据源6、定义切换数据源注解7、定义切面类8、使用注解切换数据源 一、前言 通过切面注解方式根据不同业务动态切换数据库 二、操作 1、引入依赖 <dependen…...

CSS基本选择器

1. 通配选择器 作用&#xff1a;可以选中所有的 HTML 元素。 语法&#xff1a; * { 属性名: 属性值; } 举例&#xff1a; <!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" …...

idea-代码补全快捷键

文章目录 前言idea-代码补全快捷键1. 基本补全2. 类型匹配补全3. 后缀补全4. 代码补全 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;…...

基于SpringBoot+vue粮油商城小程序系统

粮油商城小程序为用户提供方便快捷的在线购物体验&#xff0c;包括大米、面粉、食用油、调味品等各种粮油产品的选购&#xff0c;用户可以浏览商品详情、对比价格、下单支付等操作。同时&#xff0c;商城还提供优惠活动、积分兑换等福利&#xff0c;让用户享受到更多实惠和便利…...

挪车小程序挪车二维码php+uniapp

一款基于FastAdminThinkPHP开发的匿名通知车主挪车微信小程序&#xff0c;采用匿名通话的方式&#xff0c;用户只能在有效期内拨打车主电话&#xff0c;过期失效&#xff0c;从而保护车主和用户隐私。提供微信小程序端和服务端源码&#xff0c;支持私有化部署。 更新日志 V1.0…...

企业内部知识库:安全协作打造企业智慧运营基石

内容概要 作为企业智慧运营的核心载体&#xff0c;企业内部知识库通过结构化的信息聚合与动态化的知识流动&#xff0c;为组织提供了从数据沉淀到价值转化的系统性框架。其底层架构以权限管理为核心&#xff0c;依托数据加密技术构建多层级访问控制机制&#xff0c;确保敏感信…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

vscode(仍待补充)

写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh&#xff1f; debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现

摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序&#xff0c;以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务&#xff0c;提供稳定高效的数据处理与业务逻辑支持&#xff1b;利用 uniapp 实现跨平台前…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

【JVM】Java虚拟机(二)——垃圾回收

目录 一、如何判断对象可以回收 &#xff08;一&#xff09;引用计数法 &#xff08;二&#xff09;可达性分析算法 二、垃圾回收算法 &#xff08;一&#xff09;标记清除 &#xff08;二&#xff09;标记整理 &#xff08;三&#xff09;复制 &#xff08;四&#xff…...