19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
- Flink 系列文章
- 7、sql clinet中应用自定义函数
- 1)、实现自定义函数
- 2)、打包并上传jar至flink的lib目录下
- 3)、验证
- 1、创建表
- 2、初始化表数据
- 3、注册函数
- 4、验证自定义函数
- 8、pojo 数据类型应用示例-表值函数
本文展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。
本文依赖flink、kafka集群能正常使用。
本文分为2个部分,即自定义函数在Flink sql client中的应用以及自定义函数中使用pojo数据类型。
本文的示例如无特殊说明则是在Flink 1.17版本中运行。
7、sql clinet中应用自定义函数
本示例将上文中自定义的函数打包后在flink sql client中进行应用。
1)、实现自定义函数
本文的所有示例需要依赖的maven见本篇的上一篇:17、Flink 之Table API: Table API 支持的操作(1)
或者引入
<!-- flink依赖引入--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
- 示例代码
package org.table_sql;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;/*** @author alanchan**/@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))
public class Alan_SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] row = str.split(",");collect(Row.of(Integer.valueOf(row[0]), row[1], Integer.valueOf(row[2]), Integer.valueOf(row[3]), row[4]));}}
2)、打包并上传jar至flink的lib目录下
将该文件打包成jar文件,特别说明的是,注意flink运行环境与打包引入的jar文件是否冲突,推荐做法是只打包创建自定义函数所依赖的jar文件,其他jar使用flink部署环境的jar。
本示例打包后的文件名:Alan_SplitFunction.jar
上传jar文件后,并重启flink集群。
3)、验证
1、创建表
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.Flink SQL> CREATE TABLE alan_split_table (
> userString STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_split',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> select * from alan_split_table;
[INFO] Result retrieval cancelled.
2、初始化表数据
本示例是通过kafka队列插入的数据,前提是kafka环境好用。
[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_split
>"11,alan,18,20,1699341167461"
>"12,alan,19,25,1699341168464"
>"13,alan,20,30,1699341169472"
>"14,alanchan,18,22,1699341170479"
>"15,alanchan,19,25,1699341171482"Flink SQL> select * from alan_split_table;
+----+--------------------------------+
| op | userString |
+----+--------------------------------+
| +I | 11,alan,18,20,1699341167461 |
| +I | 12,alan,19,25,1699341168464 |
| +I | 13,alan,20,30,1699341169472 |
| +I | 14,alanchan,18,22,169934117... |
| +I | 15,alanchan,19,25,169934117... |
3、注册函数
将自定义的函数注册为flink的临时函数,临时函数只在当前的会话中起作用,如果注册成其他函数,参考如下语法
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION[IF NOT EXISTS] [[catalog_name.]db_name.]function_nameAS identifier [LANGUAGE JAVA|SCALA|PYTHON][USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]# TEMPORARY
# 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。# TEMPORARY SYSTEM
# 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。
本示例注册为临时函数,如下
Flink SQL> CREATE TEMPORARY FUNCTION alan_split AS 'org.table_sql.Alan_SplitFunction';
[INFO] Execute statement succeed.Flink SQL> show functions;
+-----------------------+
| function name |
+-----------------------+
| IFNULL |
| SOURCE_WATERMARK |
| TYPEOF |
| abs |
| acos |
| alan_split |
| and |
| array |
。。。。。。
4、验证自定义函数
Flink SQL> SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime
> FROM alan_split_table
> LEFT JOIN LATERAL TABLE(alan_split(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE;
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| op | userString | t_id | t_name | t_age | t_balance | t_rowtime |
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| +I | 11,alan,18,20,1699341167461 | 11 | alan | 18 | 20 | 1699341167461 |
| +I | 12,alan,19,25,1699341168464 | 12 | alan | 19 | 25 | 1699341168464 |
| +I | 13,alan,20,30,1699341169472 | 13 | alan | 20 | 30 | 1699341169472 |
| +I | 14,alanchan,18,22,169934117... | 14 | alanchan | 18 | 22 | 1699341170479 |
| +I | 15,alanchan,19,25,169934117... | 15 | alanchan | 19 | 25 | 1699341171482 |
至此,完成了自定义函数注册至flink sql client的验证。
8、pojo 数据类型应用示例-表值函数
功能参考 19、Flink 的Table API 和 SQL 中的自定义函数及示例(2) 中的【4、表值函数-自定义函数说明及示例】
本示例仅仅是展示在自定义函数中使用pojo 对象。
本示例仅仅是一种实现方式,也可以覆盖getTypeInference并以编程方式提供所有组件,不再赘述。
本示例仅仅是以表值函数作为示例,其他的自定义函数类似。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo2 {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private int id;private String name;private int age;private int balance;private String rowtime;}// @FunctionHint(output = @DataTypeHint("User<id int, name String, age int, balance int, rowtime string>"))
// public static class OverloadedFunction extends TableFunction<Row> {@FunctionHint(output =@DataTypeHint(bridgedTo = User.class))public static class OverloadedFunction extends TableFunction<User> {public void eval(String str) {String[] user = str.split(",");// 使用 Row数据类型
// collect(Row.of(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));// 使用User pojo数据类型collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")
// .select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")) ; DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);result5DS.print();
// 15> (true,+I[15, alanchan, 19, 25, 1699341171482])
// 12> (true,+I[12, alan, 19, 25, 1699341168464])
// 13> (true,+I[13, alan, 20, 30, 1699341169472])
// 11> (true,+I[11, alan, 18, 20, 1699341167461])
// 14> (true,+I[14, alanchan, 18, 22, 1699341170479])env.execute();}}
以上,展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。
相关文章:
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...
Vue23-props配置功能
Vue2&3-props配置功能 Vue2-props配置 功能:接收从其他组件传过来的数据,将数据从静态转为动态注意: 同一层组件不能使用props,必须是父组件传子组件的形式。父组件传数据,子组件接收数据。不能什么数据都接收&a…...
怎样使用ovsyunlive在web网页上直接播放rtsp/rtmp视频
业务中需要在网页中直接播放rtsp和rtmp视频,多方比较测试发现ovsyunlive的播放器能直接播放rtsp/rtmp视频,还是非常方便简洁,使用过程如下: 1,Windows系统在github上面下载ovsyunlive绿色包下载解压。 github地址&am…...
MySQL | 查询接口性能调优、编码方式不一致导致索引失效
背景 最近业务反馈,列表查询速度过慢,需要优化。 到正式环境系统去验证,发现没筛选任何条件的情况下,查询需要三十多秒,而筛选了条件之后需要13秒。急需优化。 先说结论:连表用的字段编码方式不一致导致索…...
ASUS华硕灵耀X2 Duo UX481FA(FL,FZ)_UX4000F工厂模式原装出厂Windows10系统
下载链接:https://pan.baidu.com/s/1sRHKBOyc3zu1v0qw4dSASA?pwd7nb0 提取码:7nb0 带有ASUS RECOVERY恢复功能、自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、MyASUS华硕电脑管家等预装程序所需要工具:16G或以上…...
企业安全—三保一评
0x00 前言 本篇主要是讲解三保一评的基础知识,以及对为什么要进行这些内容的原因进行总结。 0x01 整体 1.概述 三保分别是,分保,等保,关保。 分保就是指涉密信息系统的建设使用单位根据分级保护管理办法和有关标准,…...
“深入理解机器学习性能评估指标:TP、TN、FP、FN、精确率、召回率、准确率、F1-score和mAP”
目录 引言 分类标准 示例:癌症检测 1. 精确率(Precision) 2. 召回率(Recall) 3. 准确率(Accuracy) 4. F1-score 5. mAP(均值平均精度) 总结与通俗解释 引言 机器…...
Linux软件包(源码包和二进制包)
Linux下的软件包众多,且几乎都是经 GPL 授权、免费开源(无偿公开源代码)的。这意味着如果你具备修改软件源代码的能力,只要你愿意,可以随意修改。 GPL,全称 General Public License,中文名称“通…...
Leetcode-394 字符串解码(不会,复习)
此题不会!!!!!!!!!!!! 题解思路:元组思想:数字[字符串],每次遇到中括号意味着要重复数字次字符串…...
如何在Linux上搭建本地Docker Registry并实现远程连接
Linux 本地 Docker Registry本地镜像仓库远程连接 文章目录 Linux 本地 Docker Registry本地镜像仓库远程连接1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址…...
assets_common.min.js
assets_common.min.js odoo将零散的js文件主要打包成了两个文件,分别是web.assets_common.min.js 和web.assets_backend.min.js, 我们分别看看这两个文件里都有些啥? common.js最先加载,看看里面都有些啥 1、boot.js 定义了od…...
前端工程化(vue2)
一、环境准备 1.依赖环境:NodeJS 官网:Node.js 2.脚手架:Vue-cli 参考网址:安装 | Vue CLI 介绍:Vue-cli用于快速的生成一个Vue的项目模板。主要功能有:统一的目录结构,本地调试࿰…...
深度学习(生成式模型)——Classifier Guidance Diffusion
文章目录 前言问题建模条件扩散模型的前向过程条件扩散模型的反向过程条件扩散模型的训练目标 前言 几乎所有的生成式模型,发展到后期都需要引入"控制"的概念,可控制的生成式模型才能更好应用于实际场景。本文将总结《Diffusion Models Beat …...
Hadoop架构、Hive相关知识点及Hive执行流程
Hadoop架构 Hadoop由三大部分组成:HDFS、MapReduce、yarn HDFS:负责数据的存储 其中包括: namenode:主节点,用来分配任务给从节点 secondarynamenode:副节点,辅助主节点 datanode:从节点&#x…...
P1529 [USACO2.4] 回家 Bessie Come Home 题解
文章目录 题目描述输入格式输出格式样例样例输入样例输出 提示完整代码 题目描述 现在是晚餐时间,而母牛们在外面分散的牧场中。 Farmer John 按响了电铃,所以她们开始向谷仓走去。 你的工作是要指出哪只母牛会最先到达谷仓(在给出的测试数…...
Python语法基础(条件语句 循环语句 函数 切片及索引)
目录 条件语句关键字与C对照注意 循环语句while 循环语句while else 循环语句for 循环语句range() 函数 for else 循环语句循环控制语句练习:打印乘法表 函数函数定义及调用函数值传递和引用传递多返回值参数类型位置参数默认参数关键字参数可变数量的参数可变数量的…...
Debian 9 Stretch APT问题
Debian 9 Stretch APT问题 flyfish 操作系统 Debian 9 Stretch 错误提示 使用sudo apt update错误提示 Ign:1 http://mirrors.aliyun.com/debian stretch InRelease Ign:2 http://mirrors.aliyun.com/debian-security stretch/updates InRelease Ign:3 http://mirrors.al…...
遍历List集合和Map进行修改和删除报java.util.ConcurrentModificationException错误详解
一、异常产生 当我们使用foreach迭代一个ArrayList或者HashMap时,如果尝试对集合做一些修改操作(例如删除元素或新增),可能会抛出java.util.ConcurrentModificationException的异常。 javapublic static void main(String[] args)…...
Android从一个APP跳转到另外一个APP
1、从当前APP去全新启动另外一个目标APP(非覆盖同一个进程): 启动另外一个目标APP(非覆盖原来APP的方式) 1、当前APP加入获取权限声明:(不加人权限检查,没法启动目标app࿰…...
我的创作纪念日——创作者2年
机缘 我最初使用CSDN估计是在2014年左右,当时还在读研,除了在当时比较有名的BBS例如小木虫上进行学术交流外,我发现很多问题百度后,都会转到CSDN,而且文章内容颇为专业,很多问题也都有专业的回答ÿ…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
Java - Mysql数据类型对应
Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
