当前位置: 首页 > news >正文

PiflowX如何快速开发flink程序

PiflowX如何快速开发flink程序

参考资料

Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com)

Flink SQL 背景

Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

  • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;
  • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
  • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;
  • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
  • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。
Flink SQL 常规实战应用

案例来自(Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com))!详细流程有兴趣可以参考原文示例。(如有侵犯,请请联系!)。

在此,简单总结一下flink sql的开发流程:

1.首先需要创建maven工程,确认需要的各种依赖,运气好的话,还需要花费大量的精力和时间去排查依赖冲突的问题(oh God bless me!);

2.开始balabala编写模板代码,如:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

3.数据准备和预处理;

 DataSet<String> input = env.readTextFile("score.csv");DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {@Overridepublic PlayerData map(String s) throws Exception {String[] split = s.split(",");return new PlayerData(String.valueOf(split[0]),String.valueOf(split[1]),String.valueOf(split[2]),Integer.valueOf(split[3]),Double.valueOf(split[4]),Double.valueOf(split[5]),Double.valueOf(split[6]),Double.valueOf(split[7]),Double.valueOf(split[8]));}});
其中的PlayerData类为自定义类:
public static class PlayerData {/*** 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分*/public String season;public String player;public String play_num;public Integer first_court;public Double time;public Double assists;public Double steals;public Double blocks;public Double scores;public PlayerData() {super();}public PlayerData(String season,String player,String play_num,Integer first_court,Double time,Double assists,Double steals,Double blocks,Double scores) {this.season = season;this.player = player;this.play_num = play_num;this.first_court = first_court;this.time = time;this.assists = assists;this.steals = steals;this.blocks = blocks;this.scores = scores;}}

4.终于到了真正的业务处理了,有了flink sql的强大和方便,倒是省了不少代码;

Table queryResult = tableEnv.sqlQuery("
select player, count(season) as num FROM score GROUP BY player ORDER BY num desc LIMIT 3
");

5.ok,到此,数据处理和计算逻辑完毕,处理结果写入到sink,可以完结散花咯,哈哈;

DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();

6.哦!好像还需要调试运行,好吧,再辛苦一会,便可大功告成!
在这里插入图片描述

7.完美,上线。。。。。。
在这里插入图片描述

(以上,纯属娱乐,如有不当,敬请谅解!)

可见,在平日开发一个flink任务虽已尽可能简单,但开发周期也得1-2个工作日,甚至更长,有没有简单粗暴的,让我分分钟领盒饭,不,让我分分钟高效完成任务的!

当然有啦!!!接下来让我隆重的介绍一下今天的主角—PilfowX—大数据流水线系统。有兴趣可以查看之前的文章(StreamPark + PiflowX 打造新一代大数据计算处理平台-CSDN博客)。

PiflowX是基于Piflow和StreamPark二开实现的,在其基础上,实现了图像化拖拉拽的方式开发spark或flink作业,这里我将介绍flink任务的开发流程,以及如何零代码实现flink sql的开发。

PiflowX的flink组件算子基本都是基于flink table和sql实现的,我们只需在UI界面填写组件相关参数,之后的工作交给底层框架即可。

在这里插入图片描述

我们回顾一下flink sql语法定义。

Flink SQL 的语法和算子

Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n][ <watermark_definition> ][ <table_constraint> ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1=val1, key2=val2, ...)[ LIKE source_table [( <like_options> )] | AS select_query ]<physical_column_definition>:column_name column_type [ <column_constraint> ] [COMMENT column_comment]<column_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED<table_constraint>:[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED<metadata_column_definition>:column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]<computed_column_definition>:column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:WATERMARK FOR rowtime_column_name AS watermark_strategy_expression<source_table>:[catalog_name.][db_name.]table_name<like_options>:
{{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]
PiflowX组件flink table实现

在了解了flink sql的定义后,一切便简单多了,那么,我们只需要根据业务需要,设计出一个表单输入,填写我们的业务参数,然后,由框架自动生成sql不就可以了么。

以下介绍如何配置一个mysqlcdc组件:

1.首先从组件列表中拖入一个MysqlCdc组件到画布中,点击节点,右侧会显示出节点参数表单区域和参数说明和示例。参数解释可以查看之前的文章(PiflowX-MysqlCdc组件-CSDN博客)。

在这里插入图片描述

在这里插入图片描述

2.填写相关参数,其实就是在定义flink table中的with属性。

在属性输入框中,点击预览可以实时查看生成的flink sql。

在这里插入图片描述
在这里插入图片描述

生成的flink sql 语句仅供参考,最终执行的语句会在引擎执行侧生成。
在这里插入图片描述

3.接下来我们可以根据需要来定义flink table结构,此步骤和其他步骤没有先后顺序。点击表单属性tableDefinition,在此表单中我们可以输入flink table中的结构属性定义。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

可以看到,我们可以在此定义flink table中的表基本信息,物理列,元数据列,计算列,水印等,具体说明在此就不赘述了,以后会有具体文章来说明。看看最终的效果:

在这里插入图片描述

至此,我们通过简单的表单填写,便可开发一个flink任务,最后,点击运行,系统便可自动提交到flink环境,并可实时查看运行日志,是不是很方便快捷!

当然,目前系统处于初期研发阶段,还有很多不完善的地方,敬请谅解。最后,我们来看一个简单的实例,如果通过PiflowX开发一个mysql cdc实时同步和flink读取doris的任务。

PiflowX-Droris读写组件

PiflowX-MysqlCdc组件

相关文章:

PiflowX如何快速开发flink程序

PiflowX如何快速开发flink程序 参考资料 Flink最锋利的武器&#xff1a;Flink SQL入门和实战 | 附完整实现代码-腾讯云开发者社区-腾讯云 (tencent.com) Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型&#xff0c;降低用户使用实时计算门槛而设计的一套符合标…...

Mysql运算符

文章目录 比较运算符< > !IS NULL \ IS NOT NULL \ ISNULLLEAST() \ GREATEST() 查询数据大小&#xff08;字典序&#xff09;BETWEEN...AND...IN (SET) \ NOT IN (SET)LIKE 模糊查询REGEXP \ RLIKE 逻辑运算符逻辑运算符&#xff1a; OR &#xff08;||&#xff09;、A…...

软件架构之事件驱动架构

一、定义 事件驱动的架构是围绕事件的发布、捕获、处理和存储&#xff08;或持久化&#xff09;而构建的集成模型。 某个应用或服务执行一项操作或经历另一个应用或服务可能想知道的更改时&#xff0c;就会发布一个事件&#xff08;也就是对该操作或更改的记录&#xff09;&am…...

C++ 后端面试 - 题目汇总

文章目录 &#x1f37a; 非技术问题&#x1f37b; 基本问题&#x1f942; 请自我介绍&#xff1f;&#x1f942; 你有什么问题需要问我的&#xff1f; &#x1f37b; 加班薪资&#x1f942; 你对加班有什么看法&#xff1f;&#x1f942; 你的薪资期望是多少&#xff1f;【待回…...

zds1104示波器使用指南

1、设置语言 2、功能检测验证示波器是否正常工作 3、示波器面板按钮详解 3.1、软键 3.2、运行控制与操作区 3.3、水平控制区 3.4、垂直控制区 3.5、多功能控制区 3.6、断电启动恢复&#xff0c;auto setup&#xff0c;default setup&#xff0c;恢复出厂设置详细解释 3.7、触…...

uni-app修改头像和个人信息

效果图 代码&#xff08;总&#xff09; <script setup lang"ts"> import { reqMember, reqMemberProfile } from /services/member/member import type { MemberResult, Gender } from /services/member/type import { onLoad } from dcloudio/uni-app impor…...

IDEA 中搭建 Spring Boot Maven 多模块项目 (父SpringBoot+子Maven)

第1步&#xff1a;新建一个SpringBoot 项目 作为 父工程 [Ref] 新建一个SpringBoot项目 删除无用的 .mvn 目录、 src 目录、 mvnw 及 mvnw.cmd 文件&#xff0c;最终只留 .gitignore 和 pom.xml 第2步&#xff1a;创建 子maven模块 第3步&#xff1a;整理 父 pom 文件 ① …...

竞赛保研 基于计算机视觉的身份证识别系统

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于机器视觉的身份证识别系统 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-sen…...

在visual studio中调试时无法查看std::wstring

1.问题 在调试的时候发现std::wstring类型的变量查看不了&#xff0c;会显示(error)|0&#xff0c;百思不得其解。 2.解决方法 参考的&#xff1a;vs2015调试时无法显示QString变量的值&#xff0c;只显示地址_vs调试qstring的时候如何查看字符串-CSDN博客 在工具/选项/调试…...

2023年全国职业院校技能大赛高职组应用软件系统开发正式赛题—模块三:系统部署测试

模块三&#xff1a;系统部署测试&#xff08;3 小时&#xff09; 一、模块考核点 模块时长&#xff1a;3 小时模块分值&#xff1a;20 分本模块重点考查参赛选手的系统部署、功能测试、Bug 排查修复及文档编写能力&#xff0c;具体包括&#xff1a;系统部署。将给定项目发布到…...

微信小程序上传并显示图片

实现效果&#xff1a; 上传前显示&#xff1a; 点击后可上传&#xff0c;上传后显示&#xff1a; 源代码&#xff1a; .wxml <view class"{{company_logo_src?blank-area:}}" style"position:absolute;top:30rpx;right:30rpx;height:100rpx;width:100rp…...

java基础知识点系列——数据输入(五)

java基础知识点系列——数据输入&#xff08;五&#xff09; 数据输入概述 Scanner使用步骤 &#xff08;1&#xff09;导包 import java.util.Scanner&#xff08;2&#xff09;创建对象 Scanner sc new Scanner(System.in)&#xff08;3&#xff09;接收数据 int i sc…...

MySQL面试题 | 07.精选MySQL面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…...

C语言中关于指针的理解及用法

关于指针意思的参考&#xff1a;https://baike.baidu.com/item/%e6%8c%87%e9%92%88/2878304 指针 指针变量 地址 野指针 野指针就是指针指向的位置是不可知的&#xff08;随机的&#xff0c;不正确的&#xff0c;没有明确限制的&#xff09; 以下是导致野指针的原因 1.指针…...

软件测试|深入理解Python中的re.search()和re.findall()区别

前言 在Python中&#xff0c;正则表达式是一种强大的工具&#xff0c;用于在文本中查找、匹配和处理模式。re 模块提供了许多函数来处理正则表达式&#xff0c;其中 re.search()和 re.findall() 是常用的两个函数&#xff0c;用于在字符串中查找匹配的模式。本文将深入介绍这两…...

❤ Vue3 完整项目太白搭建 Vue3+Pinia+Vant3/ElementPlus+typerscript(一)yarn 版本控制 ltb (太白)

❤ 项目搭建 一、项目信息 Vue3 完整项目搭建 Vue3PiniaVant3/ElementPlustyperscript&#xff08;一&#xff09;yarn 版本控制 项目地址&#xff1a; 二、项目搭建 &#xff08;1&#xff09;创建项目 yarn create vite <ProjectName> --template vueyarn install …...

linux搭建SRS服务器

linux搭建SRS服务器 文章目录 linux搭建SRS服务器SRS说明实验说明搭建步骤推流步骤查看web端服务器拉流步骤final SRS说明 SRS&#xff08;simple Rtmp Server&#xff09;,是一个简单高效的实时视频服务器&#xff0c;支持RTMP/WebRTC/HLS/HTTP-FLV/SRT, 是国人自己开发的一款…...

系列六、Spring Security中的认证 授权 角色继承

一、Spring Security中的认证 & 授权 & 角色继承 1.1、概述 关于Spring Security中的授权&#xff0c;请参考【系列一、认证 & 授权】&#xff0c;这里不再赘述。 1.2、资源类 /*** Author : 一叶浮萍归大海* Date: 2024/1/11 20:58* Description: 测试资源*/ Re…...

云原生周刊:OpenTofu 宣布正式发布 | 2023.1.15

开源项目推荐 kubeaudit kubeaudit 是一个开源项目&#xff0c;旨在帮助用户对其 Kubernetes 集群进行常见安全控制的审计。该项目提供了工具和检查规则&#xff0c;可以帮助用户发现潜在的安全漏洞和配置问题。 Chronos Chronos 是一款综合性开发人员工具&#xff0c;可监…...

【如何在 GitHub上面找项目】【转载】

很多的小伙伴&#xff0c;经常会有这样的困惑&#xff0c;我看了很多技术的学习文档、书籍、甚至视频&#xff0c;我想动手实践&#xff0c;于是我打开了GitHub&#xff0c;想找个开源项目&#xff0c;进行学习&#xff0c;获取项目实战经验。这个时候很多小伙伴就会面临这样的…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

Unity VR/MR开发-VR开发与传统3D开发的差异

视频讲解链接&#xff1a;【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...

前端调试HTTP状态码

1xx&#xff08;信息类状态码&#xff09; 这类状态码表示临时响应&#xff0c;需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分&#xff0c;客户端应继续发送剩余部分。 2xx&#xff08;成功类状态码&#xff09; 表示请求已成功被服务器接收、理解并处…...

客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践

01技术背景与业务挑战 某短视频点播企业深耕国内用户市场&#xff0c;但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大&#xff0c;传统架构已较难满足当前企业发展的需求&#xff0c;企业面临着三重挑战&#xff1a; ① 业务&#xff1a;国内用户访问海外服…...

Java设计模式:责任链模式

一、什么是责任链模式&#xff1f; 责任链模式&#xff08;Chain of Responsibility Pattern&#xff09; 是一种 行为型设计模式&#xff0c;它通过将请求沿着一条处理链传递&#xff0c;直到某个对象处理它为止。这种模式的核心思想是 解耦请求的发送者和接收者&#xff0c;…...