flink-cdc同步数据到doris中
1 创建数据库和表
1.1 数据库脚本
这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4
-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

2 数据同步
2.1 flnk-cdc
参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述
2.1.1 最简单的单表同步
从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。
#Mysql的参数配置
source:type: mysqlhostname: 10.101.10.11port: 3306username: flinkpassword: 123456tables: eayc.eayc_userserver-id: 5400# server-time-zone: UTC
#Doris的参数配置
sink:type: dorisfenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user
pipeline:name: eayc to dorisparallelism: 1
注意连接mysql的server-id的要唯一,否则提示下面的错误
A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.
进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。


此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。

2.1.2 多表同步
如下配置
source:tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user- source-table: eayc.eayc_companysink-table: ods_eayc.eayc_company- source-table: eayc.eayc_company_usersink-table: ods_eayc.eayc_company_user
下面这种方式不支持,会报下面的错误:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

2.1.3 分表导入
taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。
2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.at
如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
doris权限问题,这个是FE集群有问题,更改过来就好了。
reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}
可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。
source:tables: acc.\.*
route:- source-table: acc.acc_account_balance_\.*sink-table: acc.acc_account_balance- source-table: acc.acc_account_subject_\.*sink-table: acc.acc_account_subject- source-table: acc.acc_initial_balance_\.*sink-table: acc.acc_initial_balance- source-table: acc.acc_voucher_\.*sink-table: acc.acc_voucher- source-table: acc.acc_voucher_entry_\.*sink-table: acc.acc_voucher_entry
于是将parallelism: 4,很快后台又抛异常。
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
于是调整
taskmanager.memory.process.size: 8192m # 增加 TaskManager 的内存
Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路
2.2 flink安装
2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml
flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录

2.2.2 监控


相关文章:
flink-cdc同步数据到doris中
1 创建数据库和表 1.1 数据库脚本 这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:12345610.101.12.82:9030/internal.eayc?charsetutf8mb4 -- 创建数据库eayc create database if not exists ods_eayc; -- 创建数据表2 数据同步 2.1 f…...
Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析
引言 在云原生时代,Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务,简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者,为微服务提供了流量管理…...
Golang教程
1. go 环境与命令 1.1 go 环境搭建 SDK 安装 Go 官网:golang.orgGo 中文社区:https://studygolang.com/dlGo API文档:https/golang.org 或 https://studygolang.com/pkgdoc 目录 api :api 存放bin:go命令src&#…...
AI 百炼成神:线性回归,预测房价
我们开始第一个项目——线性回归:预测房价。这是一个经典的机器学习入门项目,可以帮助你理解如何使用线性回归模型来预测连续的数值。 第一个项目:线性回归预测房价 项目目标 学习线性回归的基本概念。使用历史房价数据建立一个预测模型。理解如何评估模型的性能。项目步骤…...
企业软件合规性管理:构建高效、安全的软件资产生态
引言 在数字化转型的浪潮下,企业的软件使用方式日益多元化,涉及云端、订阅制、永久授权及浮动许可等多种模式。然而,随着软件资产的增多,企业面临着合规性管理的严峻挑战:非法软件使用、许可证管理不当、软件资产闲置…...
每日一题——编辑距离
编辑距离 参考资料题目描述示例 解题思路动态规划(DP)方法 代码实现复杂度分析示例详解示例1:"nowcoder" → "new"示例2:"intention" → "execution" 总结与心得 参考资料 建议先参考下…...
TensorFlow项目GPU运行 安装步骤
以下是在 Linux 系统 下搭建完整 GPU 加速环境的详细流程(适配 CUDA 11.2 和 Python 3.9): 1. 前置检查 1.1 验证 NVIDIA 驱动 # 检查驱动版本(需 ≥ 450.80.02) nvidia-smi 输出示例: CUDA Version: 11.2…...
c++进阶———继承
1.引言 在一些大的项目中,我们可能要重复定义一些类,但是很麻烦,应该怎么办呢?举个简单的例子,我要做一个全校师生统计表,统计学号,教师编号,姓名,年龄,电话…...
FreeSwitch的mod_translate模块详细,附带场景案例及代码示例
mod_translate 模块详细介绍 mod_translate 是 FreeSWITCH 中的一个拨号计划应用程序模块,用于对电话号码或字符串进行格式转换和翻译。它可以根据预定义的规则对输入的内容进行匹配和转换,常用于号码格式化、路由选择、号码屏蔽等场景。 主要功能 号码…...
前端504错误分析
前端出现504错误(网关超时)通常是由于代理服务器未能及时从上游服务获取响应。以下是详细分析步骤和解决方案: 1. 确认错误来源 504含义:代理服务器(如Nginx、Apache)在等待后端服务响应时超时。常见架构:前端 → 代理服务器 → 后端服务,问题通常出在代理与后端之间。…...
在 .NET 8/9 中使用 AppUser 进行 JWT 令牌身份验证
文章目录 一、引言二、什么是 JSON Web 令牌?三、什么是 JSON Web 令牌结构?四、设置 JWT 令牌身份验证4.1 创建新的 .NET 8 Web API 项目4.2 安装所需的 NuGet 软件包4.3 创建 JWT 配置模型4.4 将 JWT 配置添加到您的 appsettings.json 中4.5 为 Config…...
基于python实现机器学习的心脏病预测系统
以下是一个基于 Python 实现的简单心脏病预测系统代码示例,我们将使用 Scikit - learn 库中的机器学习算法(这里以逻辑回归为例),并使用公开的心脏病数据集。 步骤: 数据加载与预处理:加载心脏病数据集&a…...
使用 NVM 随意切换 Node.js 版本
安装nvm https://github.com/coreybutler/nvm-windows/releases nvm安装详细教程(卸载旧的nodejs,安装nvm、node、npm、cnpm、yarn及环境变量配置)-CSDN博客 验证 NVM 是否安装成功-查看版本 nvm --version安装指定版本的 Node.js nvm i…...
【Prometheus】prometheus结合pushgateway实现脚本运行状态监控
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全…...
SpringBoot 项目配置动态数据源
目录 一、前言二、操作1、引入依赖2、配置默认数据库 13、定义数据源实体和 Repository4、定义动态数据源5、配置数据源6、定义切换数据源注解7、定义切面类8、使用注解切换数据源 一、前言 通过切面注解方式根据不同业务动态切换数据库 二、操作 1、引入依赖 <dependen…...
CSS基本选择器
1. 通配选择器 作用:可以选中所有的 HTML 元素。 语法: * { 属性名: 属性值; } 举例: <!DOCTYPE html> <html lang"zh-cn"> <head><meta charset"UTF-8"><meta name"viewport" …...
idea-代码补全快捷键
文章目录 前言idea-代码补全快捷键1. 基本补全2. 类型匹配补全3. 后缀补全4. 代码补全 前言 如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。 而且听说点赞的人每天的运气都不会太差,…...
基于SpringBoot+vue粮油商城小程序系统
粮油商城小程序为用户提供方便快捷的在线购物体验,包括大米、面粉、食用油、调味品等各种粮油产品的选购,用户可以浏览商品详情、对比价格、下单支付等操作。同时,商城还提供优惠活动、积分兑换等福利,让用户享受到更多实惠和便利…...
挪车小程序挪车二维码php+uniapp
一款基于FastAdminThinkPHP开发的匿名通知车主挪车微信小程序,采用匿名通话的方式,用户只能在有效期内拨打车主电话,过期失效,从而保护车主和用户隐私。提供微信小程序端和服务端源码,支持私有化部署。 更新日志 V1.0…...
企业内部知识库:安全协作打造企业智慧运营基石
内容概要 作为企业智慧运营的核心载体,企业内部知识库通过结构化的信息聚合与动态化的知识流动,为组织提供了从数据沉淀到价值转化的系统性框架。其底层架构以权限管理为核心,依托数据加密技术构建多层级访问控制机制,确保敏感信…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
