19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
- Flink 系列文章
- 7、sql clinet中应用自定义函数
- 1)、实现自定义函数
- 2)、打包并上传jar至flink的lib目录下
- 3)、验证
- 1、创建表
- 2、初始化表数据
- 3、注册函数
- 4、验证自定义函数
- 8、pojo 数据类型应用示例-表值函数
本文展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。
本文依赖flink、kafka集群能正常使用。
本文分为2个部分,即自定义函数在Flink sql client中的应用以及自定义函数中使用pojo数据类型。
本文的示例如无特殊说明则是在Flink 1.17版本中运行。
7、sql clinet中应用自定义函数
本示例将上文中自定义的函数打包后在flink sql client中进行应用。
1)、实现自定义函数
本文的所有示例需要依赖的maven见本篇的上一篇:17、Flink 之Table API: Table API 支持的操作(1)
或者引入
<!-- flink依赖引入--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
- 示例代码
package org.table_sql;import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;/*** @author alanchan**/@FunctionHint(output = @DataTypeHint("ROW<id int, name String, age int, balance int, rowtime string>"))
public class Alan_SplitFunction extends TableFunction<Row> {public void eval(String str) {String[] row = str.split(",");collect(Row.of(Integer.valueOf(row[0]), row[1], Integer.valueOf(row[2]), Integer.valueOf(row[3]), row[4]));}}
2)、打包并上传jar至flink的lib目录下
将该文件打包成jar文件,特别说明的是,注意flink运行环境与打包引入的jar文件是否冲突,推荐做法是只打包创建自定义函数所依赖的jar文件,其他jar使用flink部署环境的jar。
本示例打包后的文件名:Alan_SplitFunction.jar
上传jar文件后,并重启flink集群。
3)、验证
1、创建表
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.Flink SQL> CREATE TABLE alan_split_table (
> userString STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'alan_split',
> 'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'csv'
> );
[INFO] Execute statement succeed.Flink SQL> select * from alan_split_table;
[INFO] Result retrieval cancelled.
2、初始化表数据
本示例是通过kafka队列插入的数据,前提是kafka环境好用。
[alanchan@server1 bin]$ kafka-console-producer.sh --broker-list server1:9092 --topic alan_split
>"11,alan,18,20,1699341167461"
>"12,alan,19,25,1699341168464"
>"13,alan,20,30,1699341169472"
>"14,alanchan,18,22,1699341170479"
>"15,alanchan,19,25,1699341171482"Flink SQL> select * from alan_split_table;
+----+--------------------------------+
| op | userString |
+----+--------------------------------+
| +I | 11,alan,18,20,1699341167461 |
| +I | 12,alan,19,25,1699341168464 |
| +I | 13,alan,20,30,1699341169472 |
| +I | 14,alanchan,18,22,169934117... |
| +I | 15,alanchan,19,25,169934117... |
3、注册函数
将自定义的函数注册为flink的临时函数,临时函数只在当前的会话中起作用,如果注册成其他函数,参考如下语法
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION[IF NOT EXISTS] [[catalog_name.]db_name.]function_nameAS identifier [LANGUAGE JAVA|SCALA|PYTHON][USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]# TEMPORARY
# 创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。# TEMPORARY SYSTEM
# 创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。
本示例注册为临时函数,如下
Flink SQL> CREATE TEMPORARY FUNCTION alan_split AS 'org.table_sql.Alan_SplitFunction';
[INFO] Execute statement succeed.Flink SQL> show functions;
+-----------------------+
| function name |
+-----------------------+
| IFNULL |
| SOURCE_WATERMARK |
| TYPEOF |
| abs |
| acos |
| alan_split |
| and |
| array |
。。。。。。
4、验证自定义函数
Flink SQL> SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime
> FROM alan_split_table
> LEFT JOIN LATERAL TABLE(alan_split(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE;
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| op | userString | t_id | t_name | t_age | t_balance | t_rowtime |
+----+--------------------------------+-------------+--------------------------------+-------------+-------------+--------------------------------+
| +I | 11,alan,18,20,1699341167461 | 11 | alan | 18 | 20 | 1699341167461 |
| +I | 12,alan,19,25,1699341168464 | 12 | alan | 19 | 25 | 1699341168464 |
| +I | 13,alan,20,30,1699341169472 | 13 | alan | 20 | 30 | 1699341169472 |
| +I | 14,alanchan,18,22,169934117... | 14 | alanchan | 18 | 22 | 1699341170479 |
| +I | 15,alanchan,19,25,169934117... | 15 | alanchan | 19 | 25 | 1699341171482 |
至此,完成了自定义函数注册至flink sql client的验证。
8、pojo 数据类型应用示例-表值函数
功能参考 19、Flink 的Table API 和 SQL 中的自定义函数及示例(2) 中的【4、表值函数-自定义函数说明及示例】
本示例仅仅是展示在自定义函数中使用pojo 对象。
本示例仅仅是一种实现方式,也可以覆盖getTypeInference并以编程方式提供所有组件,不再赘述。
本示例仅仅是以表值函数作为示例,其他的自定义函数类似。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
public class TestUDTableFunctionDemo2 {@Data@NoArgsConstructor@AllArgsConstructorpublic static class User {private int id;private String name;private int age;private int balance;private String rowtime;}// @FunctionHint(output = @DataTypeHint("User<id int, name String, age int, balance int, rowtime string>"))
// public static class OverloadedFunction extends TableFunction<Row> {@FunctionHint(output =@DataTypeHint(bridgedTo = User.class))public static class OverloadedFunction extends TableFunction<User> {public void eval(String str) {String[] user = str.split(",");// 使用 Row数据类型
// collect(Row.of(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));// 使用User pojo数据类型collect(new User(Integer.valueOf(user[0]), user[1], Integer.valueOf(user[2]), Integer.valueOf(user[3]), user[4]));}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStream<String> row = env.fromCollection(//id name age balance rowtimeArrays.asList("11,alan,18,20,1699341167461","12,alan,19,25,1699341168464","13,alan,20,30,1699341169472","14,alanchan,18,22,1699341170479","15,alanchan,19,25,1699341171482"));Table usersTable2 = tenv.fromDataStream(row, $("userString"));tenv.createTemporarySystemFunction("OverloadedFunction", OverloadedFunction.class);Table result5 = usersTable2.leftOuterJoinLateral(call("OverloadedFunction", $("userString")).as("t_id","t_name","t_age","t_balance","t_rowtime")).select($("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")
// .select($("userString"),$("t_id"),$("t_name"),$("t_age"),$("t_balance"),$("t_rowtime")) ; DataStream<Tuple2<Boolean, Row>> result5DS = tenv.toRetractStream(result5, Row.class);result5DS.print();
// 15> (true,+I[15, alanchan, 19, 25, 1699341171482])
// 12> (true,+I[12, alan, 19, 25, 1699341168464])
// 13> (true,+I[13, alan, 20, 30, 1699341169472])
// 11> (true,+I[11, alan, 18, 20, 1699341167461])
// 14> (true,+I[14, alanchan, 18, 22, 1699341170479])env.execute();}}
以上,展示了自定义函数在Flink sql client的应用以及自定义函数中使用pojo的示例。
相关文章:
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...

Vue23-props配置功能
Vue2&3-props配置功能 Vue2-props配置 功能:接收从其他组件传过来的数据,将数据从静态转为动态注意: 同一层组件不能使用props,必须是父组件传子组件的形式。父组件传数据,子组件接收数据。不能什么数据都接收&a…...

怎样使用ovsyunlive在web网页上直接播放rtsp/rtmp视频
业务中需要在网页中直接播放rtsp和rtmp视频,多方比较测试发现ovsyunlive的播放器能直接播放rtsp/rtmp视频,还是非常方便简洁,使用过程如下: 1,Windows系统在github上面下载ovsyunlive绿色包下载解压。 github地址&am…...

MySQL | 查询接口性能调优、编码方式不一致导致索引失效
背景 最近业务反馈,列表查询速度过慢,需要优化。 到正式环境系统去验证,发现没筛选任何条件的情况下,查询需要三十多秒,而筛选了条件之后需要13秒。急需优化。 先说结论:连表用的字段编码方式不一致导致索…...

ASUS华硕灵耀X2 Duo UX481FA(FL,FZ)_UX4000F工厂模式原装出厂Windows10系统
下载链接:https://pan.baidu.com/s/1sRHKBOyc3zu1v0qw4dSASA?pwd7nb0 提取码:7nb0 带有ASUS RECOVERY恢复功能、自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、MyASUS华硕电脑管家等预装程序所需要工具:16G或以上…...

企业安全—三保一评
0x00 前言 本篇主要是讲解三保一评的基础知识,以及对为什么要进行这些内容的原因进行总结。 0x01 整体 1.概述 三保分别是,分保,等保,关保。 分保就是指涉密信息系统的建设使用单位根据分级保护管理办法和有关标准,…...

“深入理解机器学习性能评估指标:TP、TN、FP、FN、精确率、召回率、准确率、F1-score和mAP”
目录 引言 分类标准 示例:癌症检测 1. 精确率(Precision) 2. 召回率(Recall) 3. 准确率(Accuracy) 4. F1-score 5. mAP(均值平均精度) 总结与通俗解释 引言 机器…...

Linux软件包(源码包和二进制包)
Linux下的软件包众多,且几乎都是经 GPL 授权、免费开源(无偿公开源代码)的。这意味着如果你具备修改软件源代码的能力,只要你愿意,可以随意修改。 GPL,全称 General Public License,中文名称“通…...

Leetcode-394 字符串解码(不会,复习)
此题不会!!!!!!!!!!!! 题解思路:元组思想:数字[字符串],每次遇到中括号意味着要重复数字次字符串…...

如何在Linux上搭建本地Docker Registry并实现远程连接
Linux 本地 Docker Registry本地镜像仓库远程连接 文章目录 Linux 本地 Docker Registry本地镜像仓库远程连接1. 部署Docker Registry2. 本地测试推送镜像3. Linux 安装cpolar4. 配置Docker Registry公网访问地址5. 公网远程推送Docker Registry6. 固定Docker Registry公网地址…...
assets_common.min.js
assets_common.min.js odoo将零散的js文件主要打包成了两个文件,分别是web.assets_common.min.js 和web.assets_backend.min.js, 我们分别看看这两个文件里都有些啥? common.js最先加载,看看里面都有些啥 1、boot.js 定义了od…...

前端工程化(vue2)
一、环境准备 1.依赖环境:NodeJS 官网:Node.js 2.脚手架:Vue-cli 参考网址:安装 | Vue CLI 介绍:Vue-cli用于快速的生成一个Vue的项目模板。主要功能有:统一的目录结构,本地调试࿰…...

深度学习(生成式模型)——Classifier Guidance Diffusion
文章目录 前言问题建模条件扩散模型的前向过程条件扩散模型的反向过程条件扩散模型的训练目标 前言 几乎所有的生成式模型,发展到后期都需要引入"控制"的概念,可控制的生成式模型才能更好应用于实际场景。本文将总结《Diffusion Models Beat …...

Hadoop架构、Hive相关知识点及Hive执行流程
Hadoop架构 Hadoop由三大部分组成:HDFS、MapReduce、yarn HDFS:负责数据的存储 其中包括: namenode:主节点,用来分配任务给从节点 secondarynamenode:副节点,辅助主节点 datanode:从节点&#x…...

P1529 [USACO2.4] 回家 Bessie Come Home 题解
文章目录 题目描述输入格式输出格式样例样例输入样例输出 提示完整代码 题目描述 现在是晚餐时间,而母牛们在外面分散的牧场中。 Farmer John 按响了电铃,所以她们开始向谷仓走去。 你的工作是要指出哪只母牛会最先到达谷仓(在给出的测试数…...
Python语法基础(条件语句 循环语句 函数 切片及索引)
目录 条件语句关键字与C对照注意 循环语句while 循环语句while else 循环语句for 循环语句range() 函数 for else 循环语句循环控制语句练习:打印乘法表 函数函数定义及调用函数值传递和引用传递多返回值参数类型位置参数默认参数关键字参数可变数量的参数可变数量的…...
Debian 9 Stretch APT问题
Debian 9 Stretch APT问题 flyfish 操作系统 Debian 9 Stretch 错误提示 使用sudo apt update错误提示 Ign:1 http://mirrors.aliyun.com/debian stretch InRelease Ign:2 http://mirrors.aliyun.com/debian-security stretch/updates InRelease Ign:3 http://mirrors.al…...

遍历List集合和Map进行修改和删除报java.util.ConcurrentModificationException错误详解
一、异常产生 当我们使用foreach迭代一个ArrayList或者HashMap时,如果尝试对集合做一些修改操作(例如删除元素或新增),可能会抛出java.util.ConcurrentModificationException的异常。 javapublic static void main(String[] args)…...
Android从一个APP跳转到另外一个APP
1、从当前APP去全新启动另外一个目标APP(非覆盖同一个进程): 启动另外一个目标APP(非覆盖原来APP的方式) 1、当前APP加入获取权限声明:(不加人权限检查,没法启动目标app࿰…...
我的创作纪念日——创作者2年
机缘 我最初使用CSDN估计是在2014年左右,当时还在读研,除了在当时比较有名的BBS例如小木虫上进行学术交流外,我发现很多问题百度后,都会转到CSDN,而且文章内容颇为专业,很多问题也都有专业的回答ÿ…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...

Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...

Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...

搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...