详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数
1. 比较函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | ===,>,<,!=,>=,<= | id===1001,age>18 |
| SQL | =,>,<,!=,>=,<= | id=‘1001’,age>18 |
2. 逻辑函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | &&,||,!,.isFalse | 1>1 && 2>1,true.isFalse |
| SQL | and,or,is false,not | 1>1 and 2>1,true is false |
3. 算术函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | +,-,*,/,n1.power(n2) | 1 + 1,2.power(2) |
| SQL | +,-,*,/,power(n1,n2) | 1 + 1,power(2,2) |
4. 字符串函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | string1 + string2,.upperCase(),.lowerCase(),.charLength() | ‘a’ + ‘b’, ‘hello’.upperCase() |
| SQL | string1 || string2,upper(),lower(),char_length() | ‘a’ || ‘b’,upper(‘hello’) |
5. 时间函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | .toDate,.toTimestamp,currentTime(),n.days,n.minutes | ‘20230107’.toDate, 2.days,10.minutes |
| SQL | Date string,Timestamp string,current_time,interval string range | Date ‘20230107’,interval ‘2’ hour/second/minute/day |
6. 聚合函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | .count,.sum,.sum0 | id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0 |
| SQL | count(),sum(),rank(),row_number() | count(*),sum(age) |
二、用户自定义函数(UDF)
UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用
1. 标量函数
Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出
/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}
2. 表函数
Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出
/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}
3. 聚合函数
Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出
/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}
4. 表聚合函数
Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出
/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}
相关文章:
详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数 1. 比较函数 API函数表达式示例Table API,>,<,!,>,<id1001,age>18SQL,>,<,!,>,<id‘1001’&…...
rsa加签验签C#和js以及java互通
js实现rsa加签验签 https://github.com/kjur/jsrsasign 11.1.0版本 解压选择需要的版本,这里选择all版本了 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>JS RSA加签验签</title&g…...
C语言中数组和指针的关系
在C语言中,数组和指针之间存在着密切的关系,尽管它们在概念上是不同的。以下是关于C语言中数组和指针关系的一些要点: 数组名作为指针: 在大多数情况下,数组名在表达式中会被当作指向其第一个元素的指针。例如&#x…...
idea 新建一个 JSP(JavaServer Pages)项目
环境设置: 确保你的开发环境中已经安装了 Java 开发工具包(JDK)和一个 Java Web 开发的集成开发环境(IDE),比如 Eclipse、IntelliJ IDEA 或者 NetBeans。你还需要一个 Web 服务器,比如 Apache T…...
【名词解释】Unity中的表格布局组件及其使用示例
Unity中的表格布局组件通常指的是GridLayoutGroup,这是一个在Unity的UI系统中用来布局子对象的组件。它可以帮助开发者将UI元素按照网格的形式进行排列,非常适合创建表格、网格视图等布局。 名词解释: GridLayoutGroup:Unity UI…...
判断当前设备为移动端自适应 平板和pc端为375移动端样式
在libs的setRem.js中: let html document.querySelector("html"); function setRem() {let ui_w 375;let cl_w document.documentElement.clientWidth || document.body.clientWidth;cl_w > 750 ? cl_w 375 : "";html.style.fontSize …...
Science Advances|用于胃部pH监测和早期胃漏检测的生物可吸收无线无源柔性传感器(健康监测/柔性传感/柔性电子)
2024年4月19日,美国西北大学 John A. Rogers和中国科学技术大学吕頔(Di Lu)团队,在《Science Advances》上发布了一篇题为“Bioresorbable, wireless, passive sensors for continuous pH measurements and early detection of gastric leakage”的论文。论文内容如下: 一、…...
C# 使用 webview2 嵌入网页
需求:C#客户端程序, 窗口里嵌入一个web网页,可通过URL跳转的那种。并且,需要将登录的身份验证信息(token)设置到请求头里。 核心代码如下: // 打开按钮的点击事件 private void openBtn_Click(object sen…...
公司面试题总结(五)
25.谈一谈箭头函数与普通函数的区别,箭头函数主要解决什么问题? 箭头函数与普通函数的区别: ⚫ 语法简洁性: ◼ 箭头函数使用>符号定义,省略了 function 关键字,使得语法更为紧凑。 ◼ 对于单行函…...
Flutter笔记:关于WebView插件的用法(上)
Flutter笔记 关于WebView插件的用法(上) - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite:http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…...
计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop Hive
课题研究的意义,国内外研究现状、水平和发展趋势 研究意义21世纪是一个信息爆炸的时代,人们在日常生活中可接触到的信息量非常之巨大。推荐系统逐步发展,其中又以个性化推荐系统最为瞩目。个性化推荐系统的核心在于个性化推荐算法,…...
phpcms仿蚁乐购淘宝客网站模板
phpcms仿蚁乐购网站模板,淘宝客行业模板免费下载,该模板网站很容易吸引访客点击,提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计,且栏目列表以简洁,非常时尚大气。页面根据分辨率大小而自…...
leetcode695 岛屿的最大面积
题目 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合,这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0(代表水)包围着。 岛屿的面积是岛上值…...
小程序无法调用服务端问题排查
1、问题描述 突然有一天线上的小程序不能登录,经查小程序无法调用。经查无法小程序页面无法调用后台服务。 2、排查过程 由于无法登录小程序发布服务器,无法测试小程序前端服务器到服务端网络,并且小程序无法看到日志。所以就得从服务端和网…...
Linux:多线程的操作
多线程操作 进程与线程线程的创建 create_pthread创建线程池给线程传入对象的指针 线程等待 pthread_join退出线程 pthread_exit线程等待参数 retval 与 线程退出参数 retval 线程中断 pthread_cancel获取线程编号 pthread_self线程分离 pthread_detach 进程与线程 进程是资源…...
kunpeng的aarch64架构cpu、openeuler系统、昇腾服务器适配文档转换功能(doc转docx、ppt转pptx)
一、安装flatpak sudo yum install flatpak flatpak remote-add --if-not-exists flathub https://flathub.org/repo/flathub.flatpakrepo二、安装libreoffice flatpak install flathub org.libreoffice.LibreOffice三、使用 对于使用 flatpak 安装的 LibreOffice,不需要手…...
unity 打包PC安装包中常见文件的功能
目录 前言 一、打包好的文件 二、常用文件 1.文件夹XXX_Data 2.文件夹MonoBleedingEdge 3.文件夹XXX_Data内部 三、文件的应用 1.如果你替换了一个图片 2.如果你新增了或减少了图片和资源 3.场景中有变动 4.resources代码加载的资源改了 5.如果你代码替换了 四、作…...
【Ardiuno】实验使用ESP32单片机实现高级web服务器暂时动态图表功能(图文)
接下来,我们继续实验示例代码中的Wifi“高级web服务器”,配置相关的无线密码后,开始实验 #include <WiFi.h> #include <WiFiClient.h> #include <WebServer.h> #include <ESPmDNS.h>const char *ssid "XIAOFE…...
深入浅出服务网格(Service Mesh):现代微服务架构的护航者
什么是服务网格? 服务网格是一种专用于处理微服务间通信的基础设施层,通常以轻量级代理(sidecar)的形式部署在每个服务实例旁边。它主要负责以下几项任务: 服务发现:自动检测和注册服务实例,使…...
node调试
vscode安装插件:JavaScript Debugger (Nightly) 点击后生成一个launch.json文件 打断点,并发送一个请求来执行代码到断点处 按右上的向下箭头,进入源码,进行查看,左边查看变量等值...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...
背包问题双雄:01 背包与完全背包详解(Java 实现)
一、背包问题概述 背包问题是动态规划领域的经典问题,其核心在于如何在有限容量的背包中选择物品,使得总价值最大化。根据物品选择规则的不同,主要分为两类: 01 背包:每件物品最多选 1 次(选或不选&#…...
