详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数
1. 比较函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | ===,>,<,!=,>=,<= | id===1001,age>18 |
| SQL | =,>,<,!=,>=,<= | id=‘1001’,age>18 |
2. 逻辑函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | &&,||,!,.isFalse | 1>1 && 2>1,true.isFalse |
| SQL | and,or,is false,not | 1>1 and 2>1,true is false |
3. 算术函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | +,-,*,/,n1.power(n2) | 1 + 1,2.power(2) |
| SQL | +,-,*,/,power(n1,n2) | 1 + 1,power(2,2) |
4. 字符串函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | string1 + string2,.upperCase(),.lowerCase(),.charLength() | ‘a’ + ‘b’, ‘hello’.upperCase() |
| SQL | string1 || string2,upper(),lower(),char_length() | ‘a’ || ‘b’,upper(‘hello’) |
5. 时间函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | .toDate,.toTimestamp,currentTime(),n.days,n.minutes | ‘20230107’.toDate, 2.days,10.minutes |
| SQL | Date string,Timestamp string,current_time,interval string range | Date ‘20230107’,interval ‘2’ hour/second/minute/day |
6. 聚合函数
| API | 函数表达式 | 示例 |
|---|---|---|
| Table API | .count,.sum,.sum0 | id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0 |
| SQL | count(),sum(),rank(),row_number() | count(*),sum(age) |
二、用户自定义函数(UDF)
UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用
1. 标量函数
Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出
/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}
2. 表函数
Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出
/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}
3. 聚合函数
Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出
/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}
4. 表聚合函数
Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出
/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}
相关文章:
详解 Flink Table API 和 Flink SQL 之函数
一、系统内置函数 1. 比较函数 API函数表达式示例Table API,>,<,!,>,<id1001,age>18SQL,>,<,!,>,<id‘1001’&…...
rsa加签验签C#和js以及java互通
js实现rsa加签验签 https://github.com/kjur/jsrsasign 11.1.0版本 解压选择需要的版本,这里选择all版本了 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>JS RSA加签验签</title&g…...
C语言中数组和指针的关系
在C语言中,数组和指针之间存在着密切的关系,尽管它们在概念上是不同的。以下是关于C语言中数组和指针关系的一些要点: 数组名作为指针: 在大多数情况下,数组名在表达式中会被当作指向其第一个元素的指针。例如&#x…...
idea 新建一个 JSP(JavaServer Pages)项目
环境设置: 确保你的开发环境中已经安装了 Java 开发工具包(JDK)和一个 Java Web 开发的集成开发环境(IDE),比如 Eclipse、IntelliJ IDEA 或者 NetBeans。你还需要一个 Web 服务器,比如 Apache T…...
【名词解释】Unity中的表格布局组件及其使用示例
Unity中的表格布局组件通常指的是GridLayoutGroup,这是一个在Unity的UI系统中用来布局子对象的组件。它可以帮助开发者将UI元素按照网格的形式进行排列,非常适合创建表格、网格视图等布局。 名词解释: GridLayoutGroup:Unity UI…...
判断当前设备为移动端自适应 平板和pc端为375移动端样式
在libs的setRem.js中: let html document.querySelector("html"); function setRem() {let ui_w 375;let cl_w document.documentElement.clientWidth || document.body.clientWidth;cl_w > 750 ? cl_w 375 : "";html.style.fontSize …...
Science Advances|用于胃部pH监测和早期胃漏检测的生物可吸收无线无源柔性传感器(健康监测/柔性传感/柔性电子)
2024年4月19日,美国西北大学 John A. Rogers和中国科学技术大学吕頔(Di Lu)团队,在《Science Advances》上发布了一篇题为“Bioresorbable, wireless, passive sensors for continuous pH measurements and early detection of gastric leakage”的论文。论文内容如下: 一、…...
C# 使用 webview2 嵌入网页
需求:C#客户端程序, 窗口里嵌入一个web网页,可通过URL跳转的那种。并且,需要将登录的身份验证信息(token)设置到请求头里。 核心代码如下: // 打开按钮的点击事件 private void openBtn_Click(object sen…...
公司面试题总结(五)
25.谈一谈箭头函数与普通函数的区别,箭头函数主要解决什么问题? 箭头函数与普通函数的区别: ⚫ 语法简洁性: ◼ 箭头函数使用>符号定义,省略了 function 关键字,使得语法更为紧凑。 ◼ 对于单行函…...
Flutter笔记:关于WebView插件的用法(上)
Flutter笔记 关于WebView插件的用法(上) - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite:http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…...
计算机毕业设计Python+Django农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop Hive
课题研究的意义,国内外研究现状、水平和发展趋势 研究意义21世纪是一个信息爆炸的时代,人们在日常生活中可接触到的信息量非常之巨大。推荐系统逐步发展,其中又以个性化推荐系统最为瞩目。个性化推荐系统的核心在于个性化推荐算法,…...
phpcms仿蚁乐购淘宝客网站模板
phpcms仿蚁乐购网站模板,淘宝客行业模板免费下载,该模板网站很容易吸引访客点击,提升ip流量和pv是非常有利的。本套模板采用现在非常流行的全屏自适应布局设计,且栏目列表以简洁,非常时尚大气。页面根据分辨率大小而自…...
leetcode695 岛屿的最大面积
题目 给你一个大小为 m x n 的二进制矩阵 grid 。 岛屿 是由一些相邻的 1 (代表土地) 构成的组合,这里的「相邻」要求两个 1 必须在 水平或者竖直的四个方向上 相邻。你可以假设 grid 的四个边缘都被 0(代表水)包围着。 岛屿的面积是岛上值…...
小程序无法调用服务端问题排查
1、问题描述 突然有一天线上的小程序不能登录,经查小程序无法调用。经查无法小程序页面无法调用后台服务。 2、排查过程 由于无法登录小程序发布服务器,无法测试小程序前端服务器到服务端网络,并且小程序无法看到日志。所以就得从服务端和网…...
Linux:多线程的操作
多线程操作 进程与线程线程的创建 create_pthread创建线程池给线程传入对象的指针 线程等待 pthread_join退出线程 pthread_exit线程等待参数 retval 与 线程退出参数 retval 线程中断 pthread_cancel获取线程编号 pthread_self线程分离 pthread_detach 进程与线程 进程是资源…...
kunpeng的aarch64架构cpu、openeuler系统、昇腾服务器适配文档转换功能(doc转docx、ppt转pptx)
一、安装flatpak sudo yum install flatpak flatpak remote-add --if-not-exists flathub https://flathub.org/repo/flathub.flatpakrepo二、安装libreoffice flatpak install flathub org.libreoffice.LibreOffice三、使用 对于使用 flatpak 安装的 LibreOffice,不需要手…...
unity 打包PC安装包中常见文件的功能
目录 前言 一、打包好的文件 二、常用文件 1.文件夹XXX_Data 2.文件夹MonoBleedingEdge 3.文件夹XXX_Data内部 三、文件的应用 1.如果你替换了一个图片 2.如果你新增了或减少了图片和资源 3.场景中有变动 4.resources代码加载的资源改了 5.如果你代码替换了 四、作…...
【Ardiuno】实验使用ESP32单片机实现高级web服务器暂时动态图表功能(图文)
接下来,我们继续实验示例代码中的Wifi“高级web服务器”,配置相关的无线密码后,开始实验 #include <WiFi.h> #include <WiFiClient.h> #include <WebServer.h> #include <ESPmDNS.h>const char *ssid "XIAOFE…...
深入浅出服务网格(Service Mesh):现代微服务架构的护航者
什么是服务网格? 服务网格是一种专用于处理微服务间通信的基础设施层,通常以轻量级代理(sidecar)的形式部署在每个服务实例旁边。它主要负责以下几项任务: 服务发现:自动检测和注册服务实例,使…...
node调试
vscode安装插件:JavaScript Debugger (Nightly) 点击后生成一个launch.json文件 打断点,并发送一个请求来执行代码到断点处 按右上的向下箭头,进入源码,进行查看,左边查看变量等值...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...
Qt 事件处理中 return 的深入解析
Qt 事件处理中 return 的深入解析 在 Qt 事件处理中,return 语句的使用是另一个关键概念,它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别:不同层级的事件处理 方…...
前端中slice和splic的区别
1. slice slice 用于从数组中提取一部分元素,返回一个新的数组。 特点: 不修改原数组:slice 不会改变原数组,而是返回一个新的数组。提取数组的部分:slice 会根据指定的开始索引和结束索引提取数组的一部分。不包含…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...
Vue 3 + WebSocket 实战:公司通知实时推送功能详解
📢 Vue 3 WebSocket 实战:公司通知实时推送功能详解 📌 收藏 点赞 关注,项目中要用到推送功能时就不怕找不到了! 实时通知是企业系统中常见的功能,比如:管理员发布通知后,所有用户…...
验证redis数据结构
一、功能验证 1.验证redis的数据结构(如字符串、列表、哈希、集合、有序集合等)是否按照预期工作。 2、常见的数据结构验证方法: ①字符串(string) 测试基本操作 set、get、incr、decr 验证字符串的长度和内容是否正…...
Linux中INADDR_ANY详解
在Linux网络编程中,INADDR_ANY 是一个特殊的IPv4地址常量(定义在 <netinet/in.h> 头文件中),用于表示绑定到所有可用网络接口的地址。它是服务器程序中的常见用法,允许套接字监听所有本地IP地址上的连接请求。 关…...
