当前位置: 首页 > news >正文

flink使用StatementSet降低资源浪费

背景

项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。

一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似:

 def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dml,在insert语句中调用etl_handle进行预处理和写入tableEnv.executeSql(INSERT_DWD_TABLE1)tableEnv.executeSql(INSERT_DWD_TABLE2)... 
}

当有多个ods->dwd操作放在同一个flink作业中时,发现这种方式会导致每次insert操作都是单独的DAG,非常消耗资源,特别是这些处理都是比较轻量级的,最好是能融合在同一个DAG中共享资源。

解决方案

查看flink文档:INSERT 语句 | Apache Flink

因此,可以采用statementset的方式,将不同insert sql进行分组执行,每组的insert sql会先被缓存到 StatementSet 中,并在StatementSet.execute() 方法被调用时,同一组的 insert sql(sink) 会被优化成一张DAG共用taskmanager,减少资源浪费,即类似:

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)val configuration: Configuration = tableEnv.getConfig.getConfigurationtableEnv.createTemporarySystemFunction("etl_handle", classOf[ETLFunction])// source/sink ddltableEnv.executeSql(CREATE_DB_DDL)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE1)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE1)tableEnv.executeSql(SOURCE_KAFKA_ODS_TABLE2)tableEnv.executeSql(SINK_KAFKA_DWD_TABLE2)....// insert dmltableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE1).addInsertSql(INSERT_DWD_TABLE2).addInsertSql(INSERT_DWD_TABLE3).execute()tableEnv.createStatementSet().addInsertSql(INSERT_DWD_TABLE4).addInsertSql(INSERT_DWD_TABLE5).addInsertSql(INSERT_DWD_TABLE6).execute()
}

其他

如果是纯flink sql而不用data stream api,也是可以达到同样的效果的。

相关文章:

flink使用StatementSet降低资源浪费

背景 项目中有很多ods层(mysql 通过cannal)kafka,需要对这些ods kakfa做一些etl操作后写入下一层的kafka(dwd层)。 一开始采用的是executeSql方式来执行每个ods→dwd层操作,即类似: def main(…...

FineDataLink4.1.9支持Kettle调用

FDL更新至4.1.9后,新增kettle调用功能,支持不增加额外负担的情况下,将现有的Kettle任务平滑迁移到FineDataLink。 一、更新版本前存在的问题与痛点 在此次功能更新前,用户可能会遇到以下问题: 1.对于仅使用kettle的…...

SwanLinkOS首批实现与HarmonyOS NEXT互联互通,软通动力子公司鸿湖万联助力鸿蒙生态统一互联

在刚刚落下帷幕的华为开发者大会2024上,伴随全场景智能操作系统HarmonyOS Next的盛大发布,作为基于OpenHarmony的同根同源系统生态,软通动力子公司鸿湖万联全域智能操作系统SwanLinkOS首批实现与HarmonyOS NEXT互联互通,率先攻克基…...

Win11禁止右键菜单折叠的方法

背景 在使用windows11的时候,会发现默认情况下,右键菜单折叠了。以至于在使用一些软件的右键菜单时总是要点击“显示更多选项”菜单展开所有菜单,然后再点击。而且每次在显示菜单时先是全部展示,再隐藏一下,看着着实难…...

Maven列出所有的依赖树

在 IntelliJ IDEA 中,你可以使用 Maven 插件来列出项目的依赖树。Maven 插件提供了一个名为dependency:tree的目标,可以帮助你获取项目的依赖树详细信息。 要列出项目的依赖树,可以执行以下步骤: 打开 IntelliJ IDEA,…...

测试开发面试题和答案

Python 请解释Python中的列表推导式(List Comprehension)是什么,并给出一个示例。 答案: 列表推导式是Python中一种简洁的构建列表的方法。它允许从一个已存在的列表创建新列表,同时应用一个表达式来修改或选择元素。…...

llm学习-3(向量数据库的使用)

1:数据读取和加载 接着上面的常规操作 加载环境变量---》获取所有路径---》加载文档---》切分文档 代码如下: import os from dotenv import load_dotenv, find_dotenvload_dotenv(find_dotenv()) # 获取folder_path下所有文件路径,储存在…...

【01-02】Mybatis的配置文件与基于XML的使用

1、引入日志 在这里我们引入SLF4J的日志门面&#xff0c;使用logback的具体日志实现&#xff1b;引入相关依赖&#xff1a; <!--日志的依赖--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version&g…...

Linux-进程间通信(IPC)

进程间通信&#xff08;IPC&#xff09;介绍 进程间通信&#xff08;IPC&#xff0c;InterProcess Communication&#xff09;是指在不同的进程之间传播或交换信息。IPC 的方式包括管道&#xff08;无名管道和命名管道&#xff09;、消息队列、信号量、共享内存、Socket、Stre…...

C++ STL: std::vector与std::array的深入对比

什么是 std::vector 和 std::array 首先&#xff0c;让我们简要介绍一下这两种容器&#xff1a; • std::vector&#xff1a;一个动态数组&#xff0c;可以根据需要动态调整其大小。 • std::array&#xff1a;一个固定大小的数组&#xff0c;其大小在编译时确定。 虽然…...

哈哈看到这条消息感觉就像是打开了窗户

在这个信息爆炸的时代&#xff0c;每一条动态可能成为我们情绪的小小触发器。今天&#xff0c;当我无意间滑过那条由杜海涛亲自发布的“自曝式”消息时&#xff0c;不禁心头一颤——如果这是我的另一半&#xff0c;哎呀&#xff0c;那画面&#xff0c;简直比烧烤摊还要“热辣”…...

10、matlab中字符、数字、矩阵、字符串和元胞合并为字符串并将字符串以不同格式写入读出excel

1、前言 在 MATLAB 中&#xff0c;可以使用不同的数据类型&#xff08;字符、数字、矩阵、字符串和元胞&#xff09;合并为字符串&#xff0c;然后将字符串以不同格式写入 Excel 文件。 以下是一个示例代码&#xff0c;展示如何将不同数据类型合并为字符串&#xff0c;并以不…...

如何正确面对GPT-5技术突破

随着人工智能技术的快速发展&#xff0c;预训练语言模型在自然语言处理领域取得了显著的成果。其中&#xff0c;GPT系列模型作为代表之一&#xff0c;受到了广泛关注。2023年&#xff0c;GPT-5模型的发布引起了业界的热烈讨论。本文将从以下几个方面分析GPT-5的发布及其对人工智…...

HarmonyOS ArkUi 官网踩坑:单独隐藏导航条无效

环境&#xff1a; 手机&#xff1a;Mate 60 Next版本&#xff1a; NEXT.0.0.26 导航条介绍 导航条官网设计指南 setSpecificSystemBarEnabled 设置实际效果&#xff1a; navigationIndicator&#xff1a;隐藏导航条无效status&#xff1a;会把导航条和状态栏都隐藏 官方…...

解决跨域问题(vite、axios/koa)

两种方法选其一即可 一、后端koa设置中间件 app.use(async (ctx, next)> {ctx.set(Access-Control-Allow-Origin, *);ctx.set(Access-Control-Allow-Headers, Content-Type, Content-Length, Authorization, Accept, X-Requested-With , yourHeaderFeild);ctx.set(Access-C…...

echarts实现3D柱状图(视觉层面)

一、第一种效果 效果图 使用步骤 完整实例&#xff0c;copy就可直接使用 <template><div :class"className" :style"{height:height,width:width}" /> </template><script>import echarts from echartsrequire(echarts/theme/…...

K8S集群进行分布式负载测试

使用K8S集群执行分布式负载测试 本教程介绍如何使用Kubernetes部署分布式负载测试框架&#xff0c;该框架使用分布式部署的locust 产生压测流量&#xff0c;对一个部署到 K8S集群的 Web 应用执行负载测试&#xff0c;该 Web 应用公开了 REST 格式的端点&#xff0c;以响应传入…...

20.《C语言》——【移位操作符】

&#x1f339;开场语 亲爱的读者&#xff0c;大家好&#xff01;我是一名正在学习编程的高校生。在这个博客里&#xff0c;我将和大家一起探讨编程技巧、分享实用工具&#xff0c;并交流学习心得。希望通过我的博客&#xff0c;你能学到有用的知识&#xff0c;提高自己的技能&a…...

你想活出怎样的人生?

hi~好久不见&#xff0c;距离上次发文隔了有段时间了&#xff0c;这段时间&#xff0c;我是裸辞去感受了一下前端市场的水深火热&#xff0c;那么这次咱们不聊技术&#xff0c;就说一说最近这段时间的经历和一些感触吧。 先说一下自己的个人情况&#xff0c;目前做前端四年&am…...

py黑帽子学习笔记_burp

配置burp kali虚机默认装好了社区版burp和java&#xff0c;其他os需要手动装 burp是用java&#xff0c;还得下载一个jython包&#xff0c;供burp用 配apt国内源&#xff0c;然后apt install jython --download-only&#xff0c;会只下载包而不安装&#xff0c;下载的目录搜一…...

Curated Programming Resources实战案例:如何利用这些资源快速掌握新技能

Curated Programming Resources实战案例&#xff1a;如何利用这些资源快速掌握新技能 【免费下载链接】curated-programming-resources A curated list of resources for learning programming. 项目地址: https://gitcode.com/gh_mirrors/cu/curated-programming-resources …...

寻音捉影·侠客行企业应用:制药企业GMP培训录音中自动核查‘无菌操作’等SOP术语

寻音捉影侠客行企业应用&#xff1a;制药企业GMP培训录音中自动核查‘无菌操作’等SOP术语 1. 引言&#xff1a;制药企业的音频管理痛点 在制药企业的日常运营中&#xff0c;GMP&#xff08;良好生产规范&#xff09;培训是确保药品质量和生产安全的关键环节。每次培训都会产…...

Qwen3-VL-30B功能全体验:图文对话、图表分析、多图推理一网打尽

Qwen3-VL-30B功能全体验&#xff1a;图文对话、图表分析、多图推理一网打尽 1. 开篇&#xff1a;认识这个视觉语言"全能选手" 当你第一次听说Qwen3-VL-30B这个名字时&#xff0c;可能会被它的技术参数吓到——300亿参数的视觉语言模型&#xff0c;听起来像是实验室…...

窗口大小强制调整工具终极指南:如何轻松掌控任意应用程序窗口尺寸

窗口大小强制调整工具终极指南&#xff1a;如何轻松掌控任意应用程序窗口尺寸 【免费下载链接】WindowResizer 一个可以强制调整应用程序窗口大小的工具 项目地址: https://gitcode.com/gh_mirrors/wi/WindowResizer 还在为那些顽固的应用程序窗口而烦恼吗&#xff1f;某…...

终极指南:5个核心方案彻底解决AEUX插件连接失败问题

终极指南&#xff1a;5个核心方案彻底解决AEUX插件连接失败问题 【免费下载链接】AEUX Editable After Effects layers from Sketch artboards 项目地址: https://gitcode.com/gh_mirrors/ae/AEUX AEUX作为连接设计工具与After Effects的专业桥梁&#xff0c;在提升设计…...

DiskInfo终极指南:3分钟掌握硬盘健康状态,免费保护你的数据安全

DiskInfo终极指南&#xff1a;3分钟掌握硬盘健康状态&#xff0c;免费保护你的数据安全 【免费下载链接】DiskInfo DiskInfo based on CrystalDiskInfo 项目地址: https://gitcode.com/gh_mirrors/di/DiskInfo 硬盘就像电脑的"记忆仓库"&#xff0c;所有重要文…...

NoFences:重构桌面空间的区域化引擎

NoFences&#xff1a;重构桌面空间的区域化引擎 【免费下载链接】NoFences &#x1f6a7; Open Source Stardock Fences alternative 项目地址: https://gitcode.com/gh_mirrors/no/NoFences 副标题&#xff1a;三步打造个性化桌面系统 痛点分析&#xff1a;你的桌面是…...

Hearthstone-Script:炉石传说自动化脚本的革新实践

Hearthstone-Script&#xff1a;炉石传说自动化脚本的革新实践 【免费下载链接】Hearthstone-Script Hearthstone script&#xff08;炉石传说脚本&#xff09;&#xff08;2024.01.25停更至国服回归&#xff09; 项目地址: https://gitcode.com/gh_mirrors/he/Hearthstone-S…...

从PID控制器到语义分割:手把手教你复现PIDNet(附PyTorch代码与Cityscapes实战)

从PID控制器到语义分割&#xff1a;手把手教你复现PIDNet&#xff08;附PyTorch代码与Cityscapes实战&#xff09; 在计算机视觉领域&#xff0c;实时语义分割一直是个极具挑战性的任务。想象一下自动驾驶汽车需要在毫秒级时间内准确识别道路上的每个像素属于车辆、行人还是交通…...

Realistic Vision V5.1 虚拟摄影棚:QT开发跨平台AI图像生成桌面应用

Realistic Vision V5.1 虚拟摄影棚&#xff1a;QT开发跨平台AI图像生成桌面应用 想象一下&#xff0c;你是一位独立摄影师或内容创作者&#xff0c;脑海里有一个绝妙的画面构思——可能是晨曦中穿着复古长裙的少女&#xff0c;也可能是赛博朋克都市里的未来侦探。过去&#xf…...