当前位置: 首页 > news >正文

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数

1. 比较函数

API函数表达式示例
Table API===,>,<,!=,>=,<=id===1001,age>18
SQL=,>,<,!=,>=,<=id=‘1001’,age>18

2. 逻辑函数

API函数表达式示例
Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalse
SQLand,or,is false,not1>1 and 2>1,true is false

3. 算术函数

API函数表达式示例
Table API+,-,*,/,n1.power(n2)1 + 1,2.power(2)
SQL+,-,*,/,power(n1,n2)1 + 1,power(2,2)

4. 字符串函数

API函数表达式示例
Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()
SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)

5. 时间函数

API函数表达式示例
Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutes
SQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day

6. 聚合函数

API函数表达式示例
Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0
SQLcount(),sum(),rank(),row_number()count(*),sum(age)

二、用户自定义函数(UDF)

UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用

1. 标量函数

Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出

/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}

2. 表函数

Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出

/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}

3. 聚合函数

Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出

/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}

4. 表聚合函数

Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出

/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}

相关文章:

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数 1. 比较函数 API函数表达式示例Table API&#xff0c;>&#xff0c;<&#xff0c;!&#xff0c;>&#xff0c;<id1001&#xff0c;age>18SQL&#xff0c;>&#xff0c;<&#xff0c;!&#xff0c;>&#xff0c;<id‘1001’&…...

rsa加签验签C#和js以及java互通

js实现rsa加签验签 https://github.com/kjur/jsrsasign 11.1.0版本 解压选择需要的版本&#xff0c;这里选择all版本了 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>JS RSA加签验签</title&g…...

C语言中数组和指针的关系

在C语言中&#xff0c;数组和指针之间存在着密切的关系&#xff0c;尽管它们在概念上是不同的。以下是关于C语言中数组和指针关系的一些要点&#xff1a; 数组名作为指针&#xff1a; 在大多数情况下&#xff0c;数组名在表达式中会被当作指向其第一个元素的指针。例如&#x…...

idea 新建一个 JSP(JavaServer Pages)项目

环境设置&#xff1a; 确保你的开发环境中已经安装了 Java 开发工具包&#xff08;JDK&#xff09;和一个 Java Web 开发的集成开发环境&#xff08;IDE&#xff09;&#xff0c;比如 Eclipse、IntelliJ IDEA 或者 NetBeans。你还需要一个 Web 服务器&#xff0c;比如 Apache T…...

【名词解释】Unity中的表格布局组件及其使用示例

Unity中的表格布局组件通常指的是GridLayoutGroup&#xff0c;这是一个在Unity的UI系统中用来布局子对象的组件。它可以帮助开发者将UI元素按照网格的形式进行排列&#xff0c;非常适合创建表格、网格视图等布局。 名词解释&#xff1a; GridLayoutGroup&#xff1a;Unity UI…...

判断当前设备为移动端自适应 平板和pc端为375移动端样式

在libs的setRem.js中&#xff1a; let html document.querySelector("html"); function setRem() {let ui_w 375;let cl_w document.documentElement.clientWidth || document.body.clientWidth;cl_w > 750 ? cl_w 375 : "";html.style.fontSize …...

Science Advances|用于胃部pH监测和早期胃漏检测的生物可吸收无线无源柔性传感器(健康监测/柔性传感/柔性电子)

2024年4月19日,美国西北大学 John A. Rogers和中国科学技术大学吕頔(Di Lu)团队,在《Science Advances》上发布了一篇题为“Bioresorbable, wireless, passive sensors for continuous pH measurements and early detection of gastric leakage”的论文。论文内容如下: 一、…...

C# 使用 webview2 嵌入网页

需求&#xff1a;C#客户端程序, 窗口里嵌入一个web网页&#xff0c;可通过URL跳转的那种。并且&#xff0c;需要将登录的身份验证信息&#xff08;token&#xff09;设置到请求头里。 核心代码如下&#xff1a; // 打开按钮的点击事件 private void openBtn_Click(object sen…...

公司面试题总结(五)

25.谈一谈箭头函数与普通函数的区别&#xff0c;箭头函数主要解决什么问题&#xff1f; 箭头函数与普通函数的区别&#xff1a; ⚫ 语法简洁性&#xff1a; ◼ 箭头函数使用>符号定义&#xff0c;省略了 function 关键字&#xff0c;使得语法更为紧凑。 ◼ 对于单行函…...

Flutter笔记:关于WebView插件的用法(上)

Flutter笔记 关于WebView插件的用法&#xff08;上&#xff09; - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…...

计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop Hive

课题研究的意义&#xff0c;国内外研究现状、水平和发展趋势 研究意义21世纪是一个信息爆炸的时代&#xff0c;人们在日常生活中可接触到的信息量非常之巨大。推荐系统逐步发展&#xff0c;其中又以个性化推荐系统最为瞩目。个性化推荐系统的核心在于个性化推荐算法&#xff0c…...

phpcms仿蚁乐购淘宝客网站模板

phpcms仿蚁乐购网站模板&#xff0c;淘宝客行业模板免费下载&#xff0c;该模板网站很容易吸引访客点击&#xff0c;提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计&#xff0c;且栏目列表以简洁&#xff0c;非常时尚大气。页面根据分辨率大小而自…...

leetcode695 岛屿的最大面积

题目 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合&#xff0c;这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0&#xff08;代表水&#xff09;包围着。 岛屿的面积是岛上值…...

小程序无法调用服务端问题排查

1、问题描述 突然有一天线上的小程序不能登录&#xff0c;经查小程序无法调用。经查无法小程序页面无法调用后台服务。 2、排查过程 由于无法登录小程序发布服务器&#xff0c;无法测试小程序前端服务器到服务端网络&#xff0c;并且小程序无法看到日志。所以就得从服务端和网…...

Linux:多线程的操作

多线程操作 进程与线程线程的创建 create_pthread创建线程池给线程传入对象的指针 线程等待 pthread_join退出线程 pthread_exit线程等待参数 retval 与 线程退出参数 retval 线程中断 pthread_cancel获取线程编号 pthread_self线程分离 pthread_detach 进程与线程 进程是资源…...

kunpeng的aarch64架构cpu、openeuler系统、昇腾服务器适配文档转换功能(doc转docx、ppt转pptx)

一、安装flatpak sudo yum install flatpak flatpak remote-add --if-not-exists flathub https://flathub.org/repo/flathub.flatpakrepo二、安装libreoffice flatpak install flathub org.libreoffice.LibreOffice三、使用 对于使用 flatpak 安装的 LibreOffice,不需要手…...

unity 打包PC安装包中常见文件的功能

目录 前言 一、打包好的文件 二、常用文件 1.文件夹XXX_Data 2.文件夹MonoBleedingEdge 3.文件夹XXX_Data内部 三、文件的应用 1.如果你替换了一个图片 2.如果你新增了或减少了图片和资源 3.场景中有变动 4.resources代码加载的资源改了 5.如果你代码替换了 四、作…...

【Ardiuno】实验使用ESP32单片机实现高级web服务器暂时动态图表功能(图文)

接下来&#xff0c;我们继续实验示例代码中的Wifi“高级web服务器”&#xff0c;配置相关的无线密码后&#xff0c;开始实验 #include <WiFi.h> #include <WiFiClient.h> #include <WebServer.h> #include <ESPmDNS.h>const char *ssid "XIAOFE…...

深入浅出服务网格(Service Mesh):现代微服务架构的护航者

什么是服务网格&#xff1f; 服务网格是一种专用于处理微服务间通信的基础设施层&#xff0c;通常以轻量级代理&#xff08;sidecar&#xff09;的形式部署在每个服务实例旁边。它主要负责以下几项任务&#xff1a; 服务发现&#xff1a;自动检测和注册服务实例&#xff0c;使…...

node调试

vscode安装插件&#xff1a;JavaScript Debugger (Nightly) 点击后生成一个launch.json文件 打断点&#xff0c;并发送一个请求来执行代码到断点处 按右上的向下箭头&#xff0c;进入源码&#xff0c;进行查看&#xff0c;左边查看变量等值...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

uniapp中使用aixos 报错

问题&#xff1a; 在uniapp中使用aixos&#xff0c;运行后报如下错误&#xff1a; AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

作为测试我们应该关注redis哪些方面

1、功能测试 数据结构操作&#xff1a;验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化&#xff1a;测试aof和aof持久化机制&#xff0c;确保数据在开启后正确恢复。 事务&#xff1a;检查事务的原子性和回滚机制。 发布订阅&#xff1a;确保消息正确传递。 2、性…...