当前位置: 首页 > news >正文

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数

1. 比较函数

API函数表达式示例
Table API===,>,<,!=,>=,<=id===1001,age>18
SQL=,>,<,!=,>=,<=id=‘1001’,age>18

2. 逻辑函数

API函数表达式示例
Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalse
SQLand,or,is false,not1>1 and 2>1,true is false

3. 算术函数

API函数表达式示例
Table API+,-,*,/,n1.power(n2)1 + 1,2.power(2)
SQL+,-,*,/,power(n1,n2)1 + 1,power(2,2)

4. 字符串函数

API函数表达式示例
Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()
SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)

5. 时间函数

API函数表达式示例
Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutes
SQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day

6. 聚合函数

API函数表达式示例
Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0
SQLcount(),sum(),rank(),row_number()count(*),sum(age)

二、用户自定义函数(UDF)

UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用

1. 标量函数

Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出

/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}

2. 表函数

Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出

/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}

3. 聚合函数

Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出

/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}

4. 表聚合函数

Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出

/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}

相关文章:

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数 1. 比较函数 API函数表达式示例Table API&#xff0c;>&#xff0c;<&#xff0c;!&#xff0c;>&#xff0c;<id1001&#xff0c;age>18SQL&#xff0c;>&#xff0c;<&#xff0c;!&#xff0c;>&#xff0c;<id‘1001’&…...

rsa加签验签C#和js以及java互通

js实现rsa加签验签 https://github.com/kjur/jsrsasign 11.1.0版本 解压选择需要的版本&#xff0c;这里选择all版本了 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>JS RSA加签验签</title&g…...

C语言中数组和指针的关系

在C语言中&#xff0c;数组和指针之间存在着密切的关系&#xff0c;尽管它们在概念上是不同的。以下是关于C语言中数组和指针关系的一些要点&#xff1a; 数组名作为指针&#xff1a; 在大多数情况下&#xff0c;数组名在表达式中会被当作指向其第一个元素的指针。例如&#x…...

idea 新建一个 JSP(JavaServer Pages)项目

环境设置&#xff1a; 确保你的开发环境中已经安装了 Java 开发工具包&#xff08;JDK&#xff09;和一个 Java Web 开发的集成开发环境&#xff08;IDE&#xff09;&#xff0c;比如 Eclipse、IntelliJ IDEA 或者 NetBeans。你还需要一个 Web 服务器&#xff0c;比如 Apache T…...

【名词解释】Unity中的表格布局组件及其使用示例

Unity中的表格布局组件通常指的是GridLayoutGroup&#xff0c;这是一个在Unity的UI系统中用来布局子对象的组件。它可以帮助开发者将UI元素按照网格的形式进行排列&#xff0c;非常适合创建表格、网格视图等布局。 名词解释&#xff1a; GridLayoutGroup&#xff1a;Unity UI…...

判断当前设备为移动端自适应 平板和pc端为375移动端样式

在libs的setRem.js中&#xff1a; let html document.querySelector("html"); function setRem() {let ui_w 375;let cl_w document.documentElement.clientWidth || document.body.clientWidth;cl_w > 750 ? cl_w 375 : "";html.style.fontSize …...

Science Advances|用于胃部pH监测和早期胃漏检测的生物可吸收无线无源柔性传感器(健康监测/柔性传感/柔性电子)

2024年4月19日,美国西北大学 John A. Rogers和中国科学技术大学吕頔(Di Lu)团队,在《Science Advances》上发布了一篇题为“Bioresorbable, wireless, passive sensors for continuous pH measurements and early detection of gastric leakage”的论文。论文内容如下: 一、…...

C# 使用 webview2 嵌入网页

需求&#xff1a;C#客户端程序, 窗口里嵌入一个web网页&#xff0c;可通过URL跳转的那种。并且&#xff0c;需要将登录的身份验证信息&#xff08;token&#xff09;设置到请求头里。 核心代码如下&#xff1a; // 打开按钮的点击事件 private void openBtn_Click(object sen…...

公司面试题总结(五)

25.谈一谈箭头函数与普通函数的区别&#xff0c;箭头函数主要解决什么问题&#xff1f; 箭头函数与普通函数的区别&#xff1a; ⚫ 语法简洁性&#xff1a; ◼ 箭头函数使用>符号定义&#xff0c;省略了 function 关键字&#xff0c;使得语法更为紧凑。 ◼ 对于单行函…...

Flutter笔记:关于WebView插件的用法(上)

Flutter笔记 关于WebView插件的用法&#xff08;上&#xff09; - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…...

计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop Hive

课题研究的意义&#xff0c;国内外研究现状、水平和发展趋势 研究意义21世纪是一个信息爆炸的时代&#xff0c;人们在日常生活中可接触到的信息量非常之巨大。推荐系统逐步发展&#xff0c;其中又以个性化推荐系统最为瞩目。个性化推荐系统的核心在于个性化推荐算法&#xff0c…...

phpcms仿蚁乐购淘宝客网站模板

phpcms仿蚁乐购网站模板&#xff0c;淘宝客行业模板免费下载&#xff0c;该模板网站很容易吸引访客点击&#xff0c;提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计&#xff0c;且栏目列表以简洁&#xff0c;非常时尚大气。页面根据分辨率大小而自…...

leetcode695 岛屿的最大面积

题目 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合&#xff0c;这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0&#xff08;代表水&#xff09;包围着。 岛屿的面积是岛上值…...

小程序无法调用服务端问题排查

1、问题描述 突然有一天线上的小程序不能登录&#xff0c;经查小程序无法调用。经查无法小程序页面无法调用后台服务。 2、排查过程 由于无法登录小程序发布服务器&#xff0c;无法测试小程序前端服务器到服务端网络&#xff0c;并且小程序无法看到日志。所以就得从服务端和网…...

Linux:多线程的操作

多线程操作 进程与线程线程的创建 create_pthread创建线程池给线程传入对象的指针 线程等待 pthread_join退出线程 pthread_exit线程等待参数 retval 与 线程退出参数 retval 线程中断 pthread_cancel获取线程编号 pthread_self线程分离 pthread_detach 进程与线程 进程是资源…...

kunpeng的aarch64架构cpu、openeuler系统、昇腾服务器适配文档转换功能(doc转docx、ppt转pptx)

一、安装flatpak sudo yum install flatpak flatpak remote-add --if-not-exists flathub https://flathub.org/repo/flathub.flatpakrepo二、安装libreoffice flatpak install flathub org.libreoffice.LibreOffice三、使用 对于使用 flatpak 安装的 LibreOffice,不需要手…...

unity 打包PC安装包中常见文件的功能

目录 前言 一、打包好的文件 二、常用文件 1.文件夹XXX_Data 2.文件夹MonoBleedingEdge 3.文件夹XXX_Data内部 三、文件的应用 1.如果你替换了一个图片 2.如果你新增了或减少了图片和资源 3.场景中有变动 4.resources代码加载的资源改了 5.如果你代码替换了 四、作…...

【Ardiuno】实验使用ESP32单片机实现高级web服务器暂时动态图表功能(图文)

接下来&#xff0c;我们继续实验示例代码中的Wifi“高级web服务器”&#xff0c;配置相关的无线密码后&#xff0c;开始实验 #include <WiFi.h> #include <WiFiClient.h> #include <WebServer.h> #include <ESPmDNS.h>const char *ssid "XIAOFE…...

深入浅出服务网格(Service Mesh):现代微服务架构的护航者

什么是服务网格&#xff1f; 服务网格是一种专用于处理微服务间通信的基础设施层&#xff0c;通常以轻量级代理&#xff08;sidecar&#xff09;的形式部署在每个服务实例旁边。它主要负责以下几项任务&#xff1a; 服务发现&#xff1a;自动检测和注册服务实例&#xff0c;使…...

node调试

vscode安装插件&#xff1a;JavaScript Debugger (Nightly) 点击后生成一个launch.json文件 打断点&#xff0c;并发送一个请求来执行代码到断点处 按右上的向下箭头&#xff0c;进入源码&#xff0c;进行查看&#xff0c;左边查看变量等值...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序

一、开发准备 ​​环境搭建​​&#xff1a; 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 ​​项目创建​​&#xff1a; File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元&#xff08;GRU&#xff09;长短期记忆神经网络&#xff08;LSTM&#xff09…...

网络编程(UDP编程)

思维导图 UDP基础编程&#xff08;单播&#xff09; 1.流程图 服务器&#xff1a;短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...