0基础学习PyFlink——用户自定义函数之UDTAF
大纲
- UDTAF
- TableAggregateFunction的实现
- 累加器
- 定义
- 创建
- 累加
- 返回
- 类型
- 计算
- 完整代码
在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。

UDTAF
UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输,又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据(多组)计算出一个值。
举一个例子:我们拿到一个学生成绩表,每行包括:
- 学生姓名
- 英语成绩
- 数学成绩
- 年级
现在我们需要把这张表调整为:
- 学生姓名
- 成绩
- 科目
- 科目年级平均成绩
- 年级

将一行中的“英语成绩”和“数学成绩”,拆成“成绩”和“科目”,相当于把一行数据拆解成多行,如上图左侧“张三”只有一行,而右侧有两行“张三”信息。这种拆解操作就需要T类型的用户自定义函数,比如UDTF和UDTAF。
而我们需要计算一个年级一科的平均成绩,比如1年级英语的平均成绩,则需要按年级聚合之后再做计算。这个就需要A类型的用户自定义函数,比如UDAF和UDTAF。
同时要满足上述两种技术方案的就是UDTAF。我们先看下主体代码,它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。但是有两个重要区别: - 要设置成in_streaming_mode模式,否则会报错;
- udtaf要修饰一个对象,而非一个方法;
def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()
TableAggregateFunction的实现
用于计算的类要继承于TableAggregateFunction,即UDTAF中的TAF。
class SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]
我们需要通过get_result_type告诉框架,UDTAF函数返回的是什么类型的数据。一般我们都是构造一个行类型——ROW,然后定义其每个字段的值和类型:
- name:string类型,用户姓名;
- score:float类型,考分;
- avg score:float类型,科目年级平均分数;
- class:sting类型,科目名称;
累加器
accumulator(累加器)是用于参与计算的中间数据。比如这个案例中,我们会向让accumulator保存拆解后的数据(即一行拆解成多行后的数据),然后再计算各年级每科的平均成绩。
定义
def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())]))
因为只是为了保存展开的数据,于是我们只用定义均值计算之前的字段:
- name:string类型,姓名;
- score:float类型,分数;
- class:string类型,科目名称;
创建
刚开始时,我们让其是一个空数组,对应上定义中的ARRAY类型。
def create_accumulator(self):return []
累加
我们对科目进行遍历,进行行的拆分。即将(“张三”, 80.0, 60.0, “1”)拆解成(“张三”, 80.0, “english”)和(“张三”, 60.0, “math”)这样的两组数据。
def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))
返回
类型
def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])
可以看到result_type(返回类型)和accumulator_type(累加器类型)是不一样的(也可以一样,主要看怎么计算规则)。前者比后者多了“学科年级平均分”(avg score),这就更加接近我们希望获得的最终结果。
这些字段和我们目标字段只差一个grade(年级)。因为原始表中有grade,且我们会通过grade聚类,所以最终我们可以获得这个信息,而不用在这儿定义。
需要注意的是,虽然表值类型函数返回的是一组数据(若干Row),但是这儿只是返回Row的具体定义,而不是ARRAY[Row]。
计算
def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:# y[2] y[]"class"]if i == y[2]:# y[1] y["score"]total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))for x in rows: yield x
这个函数会在最后执行,它会通过累加器中的数据计算“学科年级平均分”,然后构造和“返回类型”一直的Row到rows数组中。最后通过yeild关键字返回一个生成器,我们可以将其看成还是一组Row,即拆解后的结果。
最后我们看下结果
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | grade | name | score | avg score | class |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 1 | 张三 | 80.0 | 77.5 | english |
| +I | 1 | 李四 | 75.0 | 77.5 | english |
| +I | 1 | 张三 | 60.0 | 77.5 | math |
| +I | 1 | 李四 | 95.0 | 77.5 | math |
| +I | 2 | 王五 | 90.0 | 87.5 | english |
| +I | 2 | 赵六 | 85.0 | 87.5 | english |
| +I | 2 | 王五 | 90.0 | 80.0 | math |
| +I | 2 | 赵六 | 70.0 | 80.0 | math |
| +I | 3 | 孙七 | 60.0 | 60.0 | english |
| +I | 3 | 孙七 | 0.0 | 0.0 | math |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set



完整代码
from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf,TableAggregateFunction
import pandas as pd
from pyflink.table.udf import UserDefinedFunction
from typing import Listclass SplitClass(TableAggregateFunction):_class_keys = ["english", "math"]def emit_value(self, accumulator):rows = []for i in self._class_keys: total = 0.0student_count = 0for y in accumulator:if i == y[2]:total = total + y[1]student_count = student_count + 1avg_score = total / student_countfor y in accumulator:if i == y[2]:rows.append(Row(y[0], y[1], avg_score, y[2]))return rowsdef create_accumulator(self):return []def accumulate(self, accumulator, row):for i in self._class_keys:accumulator.append(Row(row["name"], row[i], i))def get_accumulator_type(self):return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])) def get_result_type(self):return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])def calc():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])students_score = [("张三", 80.0, 60.0, "1"),("李四", 75.0, 95.0, "1"),("王五", 90.0, 90.0, "2"),("赵六", 85.0, 70.0, "2"),("孙七", 60.0, 0.0, "3"),]tab_source = t_env.from_elements(students_score, row_type_tab_source)split_class = udtaf(SplitClass())tab_source.group_by(col('grade')) \.flat_aggregate(split_class) \.select(col('*')) \.execute().print()if __name__ == '__main__':calc()
相关文章:
0基础学习PyFlink——用户自定义函数之UDTAF
大纲 UDTAFTableAggregateFunction的实现累加器定义创建累加 返回类型计算 完整代码 在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。 UDTAF UDTAF函数即具备了…...
SQLi靶场
SQLi靶场 less1- less2 (详细讲解) less 1 Error Based-String (字符类型注入) 思路分析 判断是否存在SQL注入 已知参数名为id,输入数值和‘ 单引号‘’ 双引号来判断,它是数值类型还是字符类型 首先输入 1 , 发现…...
重庆开放大学学子们的好帮手
作为一名电大学员,我有幸目睹了一个令人惊叹的学习工具的诞生——电大搜题微信公众号。这个创新应用为重庆开放大学(广播电视大学)的学子们提供了便捷、高效的学习资源,成为他们的得力助手。 重庆开放大学是一所为全日制在职人员提…...
机器学习-学习率:从理论到实战,探索学习率的调整策略
目录 一、引言二、学习率基础定义与解释学习率与梯度下降学习率对模型性能的影响 三、学习率调整策略常量学习率时间衰减自适应学习率AdaGradRMSpropAdam 四、学习率的代码实战环境设置数据和模型常量学习率时间衰减Adam优化器 五、学习率的最佳实践学习率范围测试循环学习率&a…...
【Vue3-Flask-BS架构Web应用】实践笔记1-使用一个bat脚本自动化完整部署环境
前言 近年来,Web开发已经成为计算机科学领域中最热门和多产的领域之一。Python和Vue.js是两个备受欢迎的工具,用于构建现代Web应用程序。在本教程中,我们将探索如何使用这两个工具来创建一个完整的Web项目。我们将完成从安装Python和Vue.js到…...
工作小计-GPU硬编以及依赖库 nvcuvidnvidia-encode
工作小计-GPU编码以及依赖库 已经是第三篇关于编解码的记录了。项目中用到GPU编码很久了,因为yuv太大,所以编码显得很重要。这次遇到的问题是环境的搭建问题。需要把开发机上的环境放到docker中,以保证docker中同样可以进行GPU的编码。 1 定…...
前端 JS 经典:宏任务、微任务、事件循环(EventLoop)
1. 前言概览 js 是一门单线程的非阻塞的脚本语言 单线程:只有一个主线程处理所有任务 非阻塞:有异步任务,主线程挂起这个任务,等异步返回结果再根据一定规则执行 2. 宏任务与微任务 都是异步任务宏任务:script 标签&a…...
电子邮件发送接收原理(附 go 语言实现发送邮件)
前言 首先要了解电子邮件的发送接收,不是点到点的。我想给你传达个消息,不是直接我跑到你家里喊你:“嘿,xxx,是你的益达,快拿走”。 而是类似快递的发送收取方式,是有服务器的中转的。我先将我…...
体系结构评估——(三)风险承担者
风险承担者分为系统生产者、系统消费者、系统服务人员和其他四大类。 其中系统生产者有:软件系统架构师、开发人员、维护人员、集成人员、测试人员、标准专家、 性能工程师、安全专家、项目经理、产品线经理。 系统消费者有:客户、最终用户、应用开发…...
【HarmonyOS】元服务卡片展示动态数据,并定点更新卡片数据
【关键字】 元服务卡片、卡片展示动态数据、更新卡片数据 【写在前面】 本篇文章主要介绍开发元服务卡片时,如何实现卡片中动态显示数据功能,并实现定时数据刷新。本篇文章通过实现定时刷新卡片中日期数据为例,讲述展示动态数据与更新数据功…...
SaveFileDialog.OverwritePrompt
SaveFileDialog.OverwritePrompt 获取或设置一个值,该值指示如果用户指定的文件名已存在,Save As 对话框是否显示警告。 public bool OverwritePrompt { get; set; } OverwritePrompt 控制在将要在改写现在文件时是否提示用户 https://vimsky.com/…...
oracle统计信息
1. 查看表的统计信息 1.建表 SQL> create table test as select * from dba_objects;2.查看表的统计信息 select owner, table_name, num_rows, blocks, avg_row_lenfrom dba_tableswhere owner SCOTTand table_name TEST; OWNER TABLE_NAME NUM_ROWS BLO…...
LeetCode 面试题 16.01. 交换数字
文章目录 一、题目二、C# 题解 一、题目 编写一个函数,不用临时变量,直接交换 numbers [a, b] 中 a 与 b 的值。 示例: 输入: numbers [1,2] 输出: [2,1] 提示: numbers.length 2-2147483647 < numbers[i] < 214748364…...
手机apn介绍
公司遇到一件很棘手的事情,app发版之后,长江以北地方的用户网络信号很好,但是打开app之后网络连接不上,而长江以南的用户网络却很好。大家找了很多资料,提出一些方案: 1、是不是运营商把我们公司的ip给限制…...
垃圾回收系统小程序
在当今社会,废品回收不仅有利于环境保护,也有利于资源的再利用。随着互联网技术的发展,个人废品回收也可以通过小程序来实现。本文将介绍如何使用乔拓云网制作个人废品回收小程序。 1. 找一个合适的第三方制作平台/工具,比如乔拓云…...
【随机过程】布朗运动
这里写目录标题 Brownian motion Brownian motion The brownian motion 1D and brownian motion 2D functions, written with the cumsum command and without for loops, are used to generate a one-dimensional and two-dimensional Brownian motion, respectively. 使用cu…...
基于机器视觉的车道线检测 计算机竞赛
文章目录 1 前言2 先上成果3 车道线4 问题抽象(建立模型)5 帧掩码(Frame Mask)6 车道检测的图像预处理7 图像阈值化8 霍夫线变换9 实现车道检测9.1 帧掩码创建9.2 图像预处理9.2.1 图像阈值化9.2.2 霍夫线变换 最后 1 前言 🔥 优质竞赛项目系列,今天要分…...
C语言文件读写,文件相关操作
文章目录 C语言文件读写,文件相关操作1.C语言万物皆是地址,文件读操作2.文件的写3.文件的复制4.获取文件的大小5.文件的加密解密 C语言文件读写,文件相关操作 1.C语言万物皆是地址,文件读操作 // // Created by MagicBook on 20…...
竞赛选题 深度学习卷积神经网络的花卉识别
文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 基…...
CMake教程 - basic point
CMake教程 - basic point 1 - Building a Basic Project 最基本的CMake项目是由单个源代码文件构建的可执行文件。对于像这样简单的项目,只需要一个带有三个命令的CMakeLists.txt文件。 注意:尽管CMake支持大写、小写和混合大小写命令,但小…...
CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...
CSS | transition 和 transform的用处和区别
省流总结: transform用于变换/变形,transition是动画控制器 transform 用来对元素进行变形,常见的操作如下,它是立即生效的样式变形属性。 旋转 rotate(角度deg)、平移 translateX(像素px)、缩放 scale(倍数)、倾斜 skewX(角度…...
基于PHP的连锁酒店管理系统
有需要请加文章底部Q哦 可远程调试 基于PHP的连锁酒店管理系统 一 介绍 连锁酒店管理系统基于原生PHP开发,数据库mysql,前端bootstrap。系统角色分为用户和管理员。 技术栈 phpmysqlbootstrapphpstudyvscode 二 功能 用户 1 注册/登录/注销 2 个人中…...
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的----NTFS源代码分析--重要
根目录0xa0属性对应的Ntfs!_SCB中的FileObject是什么时候被建立的 第一部分: 0: kd> g Breakpoint 9 hit Ntfs!ReadIndexBuffer: f7173886 55 push ebp 0: kd> kc # 00 Ntfs!ReadIndexBuffer 01 Ntfs!FindFirstIndexEntry 02 Ntfs!NtfsUpda…...
