flink-cdc同步数据到doris中
1 创建数据库和表
1.1 数据库脚本
这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4
-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

2 数据同步
2.1 flnk-cdc
参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述
2.1.1 最简单的单表同步
从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。
#Mysql的参数配置
source:type: mysqlhostname: 10.101.10.11port: 3306username: flinkpassword: 123456tables: eayc.eayc_userserver-id: 5400# server-time-zone: UTC
#Doris的参数配置
sink:type: dorisfenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user
pipeline:name: eayc to dorisparallelism: 1
注意连接mysql的server-id的要唯一,否则提示下面的错误
A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.
进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。


此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。

2.1.2 多表同步
如下配置
source:tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:- source-table: eayc.eayc_usersink-table: ods_eayc.eayc_user- source-table: eayc.eayc_companysink-table: ods_eayc.eayc_company- source-table: eayc.eayc_company_usersink-table: ods_eayc.eayc_company_user
下面这种方式不支持,会报下面的错误:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

2.1.3 分表导入
taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。
2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.at
如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
doris权限问题,这个是FE集群有问题,更改过来就好了。
reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}
可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。
source:tables: acc.\.*
route:- source-table: acc.acc_account_balance_\.*sink-table: acc.acc_account_balance- source-table: acc.acc_account_subject_\.*sink-table: acc.acc_account_subject- source-table: acc.acc_initial_balance_\.*sink-table: acc.acc_initial_balance- source-table: acc.acc_voucher_\.*sink-table: acc.acc_voucher- source-table: acc.acc_voucher_entry_\.*sink-table: acc.acc_voucher_entry
于是将parallelism: 4,很快后台又抛异常。
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
于是调整
taskmanager.memory.process.size: 8192m # 增加 TaskManager 的内存
Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路
2.2 flink安装
2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml
flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录

2.2.2 监控


相关文章:
flink-cdc同步数据到doris中
1 创建数据库和表 1.1 数据库脚本 这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:12345610.101.12.82:9030/internal.eayc?charsetutf8mb4 -- 创建数据库eayc create database if not exists ods_eayc; -- 创建数据表2 数据同步 2.1 f…...
Git命令行入门
诸神缄默不语-个人CSDN博文目录 之前写过一篇VSCode Git的博文:VSCode上的Git使用手记(持续更新ing…) 现在随着开发经历增加,感觉用到命令行之类复杂功能的机会越来越多了,所以我专门再写一篇Git命令行的文章。 G…...
DeepSeek R1/V3满血版——在线体验与API调用
前言:在人工智能的大模型发展进程中,每一次新模型的亮相都宛如一颗投入湖面的石子,激起层层波澜。如今,DeepSeek R1/V3 满血版强势登场,为大模型应用领域带来了全新的活力与变革。 本文不但介绍在线体验 DeepSeek R1/…...
关于 BK3633 上电时受串口 UART2 影响而无法启动的问题说明
1. 问题描述 BK3633 SDK 版本:BK3633_DesignKit_V06_2310 使用 BK3633 UART2 与指纹模块进行通讯,为了降低功耗,通过 GPIO 控制了指纹模块的供电电源。但每次给整个系统板子上电时,BK3633 很大概率会实际而无法正常运行程序&…...
Redis7——基础篇(六)
前言:此篇文章系本人学习过程中记录下来的笔记,里面难免会有不少欠缺的地方,诚心期待大家多多给予指教。 基础篇: Redis(一)Redis(二)Redis(三)Redis&#x…...
简单工厂模式 (Simple Factory Pattern) 在Spring Boot 中的应用
简单工厂模式(Simple Factory Pattern)虽然不属于 GoF 23 种经典设计模式,但在实际开发中非常常用,尤其是在 Spring Boot 项目中。它提供了一种简单的方式来创建对象,将对象的创建逻辑集中到一个工厂类中。 一、简单工…...
Python简单使用MinerU
Python简单使用MinerU 1 简介 MinerU是国产的一款将PDF转化为机器可读格式的工具(如markdown、json),可以很方便地抽取为任意格式。目前支持图像(.jpg及.png)、PDF、Word(.doc及.docx)、以及P…...
使用AI创建流程图和图表的 3 种简单方法
你可能已经尝试过使用 LLMs 生成图像,但你有没有想过用它们来创建 流程图和图表?这些可视化工具对于展示流程、工作流和系统架构至关重要。 通常,在在线工具上手动绘制图表可能会耗费大量时间。但你知道吗?你可以使用 LLMs 通过简…...
ImportError: cannot import name ‘FixtureDef‘ from ‘pytest‘
错误信息表明 pytest 在尝试导入 FixtureDef 时出现了问题。通常是由于 pytest 版本不兼容 或 插件版本冲突 引起的。以下是详细的排查步骤和解决方案: 1. 检查 pytest 版本 首先,确认当前安装的 pytest 版本。某些插件可能需要特定版本的 pytest 才能…...
机器学习实战(7):聚类算法——发现数据中的隐藏模式
第7集:聚类算法——发现数据中的隐藏模式 在机器学习中,聚类(Clustering) 是一种无监督学习方法,用于发现数据中的隐藏模式或分组。与分类任务不同,聚类不需要标签,而是根据数据的相似性将其划…...
z-score算法
z-score算法原理参考网址 https://blog.csdn.net/m0_59596937/article/details/128378641 具体实现代码如下: import numpy as npclass ZScoreOutlierDetector:def __init__(self, threshold3):"""构造函数"""self.threshold thre…...
企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow
企业级 RAG GitHub 开源项目深度分享:Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟,检索增强生成(RAG)已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型ÿ…...
open webui 部署 以及解决,首屏加载缓慢,nginx反向代理访问404,WebSocket后端服务器链接失败等问题
项目地址:GitHub - open-webui/open-webui: User-friendly AI Interface (Supports Ollama, OpenAI API, ...) 选择了docker部署 如果 Ollama 在您的计算机上,请使用以下命令 docker run -d -p 3000:8080 --add-hosthost.docker.internal:host-gatewa…...
C++ 智能指针 unique_ptr shared_ptr weak_ptr小练习
智能指针是 C11 引入的一项重要特性,它可以帮助我们管理动态分配的内存,自动释放内存,避免内存泄漏和悬空指针的问题。智能指针有三种常用类型:std::unique_ptr、std::shared_ptr 和 std::weak_ptr。 为了帮助你熟悉智能指针的使…...
Netstat(Network Statistics)网络工具介绍
Netstat 工具详细介绍及常见指令应用 Netstat(Network Statistics)是一个常用的命令行工具,用于显示网络连接、路由表、接口统计信息、伪装连接等信息。它可以帮助用户监控计算机的网络状态,尤其在诊断网络问题时非常有用。Netst…...
内容中台架构下智能推荐系统的算法优化与分发策略
内容概要 在数字化内容生态中,智能推荐系统作为内容中台的核心引擎,承担着用户需求与内容资源精准匹配的关键任务。其算法架构的优化路径围绕动态特征建模与多模态数据融合展开,通过深度强化学习技术实现用户行为特征的实时捕捉与动态更新&a…...
React 高阶组件的优缺点
React 高阶组件的优缺点 优点 1. 代码复用性高 公共逻辑封装:当多个组件需要实现相同的功能或逻辑时,高阶组件可以将这些逻辑封装起来,避免代码重复。例如,多个组件都需要在挂载时进行数据获取操作,就可以创建一个数…...
最新版IDEA下载安装教程
一、下载IDEA 点击前往官网下载 或者去网盘下载 点击前往百度网盘下载 点击前往夸克网盘下载 进去后点击IDEA 然后点击Download 选择自己电脑对应的系统 点击下载 等待下载即可 二、安装IDEA 下载好后双击应用程序 点击下一步 选择好安装目录后点击下一步 勾选这两项后点击…...
DeepSeek最新开源动态:核心技术公布
2月21日午间,DeepSeek在社交平台X发文称,从下周开始,他们将开源5个代码库,以完全透明的方式与全球开发者社区分享他们的研究进展。并将这一计划定义为“Open Source Week”。 DeepSeek表示,即将开源的代码库是他们在线…...
《炒股养家心法.pdf》 kimi总结
《炒股养家心法.pdf》这篇文章详细阐述了一位超级游资炒股养家的心得与技巧,展示了其从40万到10亿的股市传奇。以下是文章中炒股技巧和心得的详细总结: 1.核心理念 市场情绪的理解:炒股养家强调,股市的本质是群体博弈,…...
运维脚本——8.证书自动化管理
场景:自动化SSL/TLS证书的申请、续期和部署,避免证书过期导致服务中断。 示例:使用Shell脚本配合Lets Encrypt的Certbot工具自动续期证书。 #!/bin/bash # 自动续期Lets Encrypt证书并重启服务 certbot renew --quiet --post-hook "syst…...
RDMA ibverbs_API功能说明
设备管理 获取当前活动网卡 返回当前rdma设备列表 struct ibv_device **ibv_get_device_list(int *num_devices);//使用 struct ibv_device **dev_list ibv_get_device_list(NULL);获取网卡名 返回网卡名字字符串:如"mlx5_0",一般通过网卡…...
第15届 蓝桥杯 C++编程青少组中/高级选拔赛 202401 真题答案及解析
第 1 题 【 单选题 】 表达式117 % 16 的结果是( )。 A:0 B:5 C:7 D:10 解析: % 是取模运算符,用于计算两个数相除后的余数。 计算 117 / 16,结果是 7,余数是 5。因此,117 % 16 = 5。答案: B 第 2 题 【 单选题 】 下列选项中,字符数组定义正确的是( …...
【R语言】绘图
一、散点图 散点图也叫X-Y图,它将所有的数据以点的形式展现在坐标系上,用来显示变量之间的相互影响程度。 ggplot2包中用来绘制散点图的函数是geom_point(),但在绘制前需要先用ggplot()函数指定数据集和变量。 下面用mtcars数据集做演示&a…...
Linux基本指令(三)+ 权限
文章目录 基本指令grep打包和压缩zip/unzipLinux和windows压缩包互传tar(重要)Linux和Linux压缩包互传 bcuname -r常用的热键关机外壳程序 知识点打包和压缩 Linux中的权限用户权限 基本指令 grep 1. grep可以过滤文本行 done用于标记循环的结束&#x…...
容器化部署tomcat
容器化部署tomcat 需求在docker容器中部署tomcat,并通过外部机器访问tomcat部署的项目 容器化部署要先装好docker容器(docker安装配置) 实现步骤: 拉取tomcat docker pull tomcat用于列出本地Docker主机上存储的所有镜像 docker images在root目录里面创建tomc…...
vscode软件中引入vant组件
一、vant简介 Vant 是一个轻量、可靠的移动端组件库,于 2017 年开源。 目前 Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本,并由社区团队维护 React 版本和支付宝小程序版本。 官网:介绍 - Vant Weapp 里面的快速上手的教程&a…...
DeepSeek vs ChatGPT:AI 领域的华山论剑,谁主沉浮?
一、引言 在当今科技飞速发展的时代,人工智能(AI)已然成为推动各领域变革的核心力量。而在人工智能的众多分支中,自然语言处理(NLP)因其与人类日常交流和信息处理的紧密联系,成为了最受瞩目的领…...
Ubuntu 22.04 Install deepseek
前言 deepseekAI助手。它具有聊天机器人功能,可以与用户进行自然语言交互,回答问题、提供建议和帮助解决问题。DeepSeek 的特点包括: 强大的语言理解能力:能够理解和生成自然语言,与用户进行流畅的对话。多领域知识&…...
如何将公钥正确添加到服务器的 authorized_keys 文件中以实现免密码 SSH 登录
1. 下载密钥文件 2. RSA 解析 将 id_ed25519 类型的私钥转换为 RSA 类型,要将 ED25519 私钥转换为 RSA 私钥,需要重新生成一个新的 RSA 密钥对。 步骤: 生成新的 RSA 密钥对 使用 ssh-keygen 来生成一个新的 RSA 密钥对。比如,执…...
