当前位置: 首页 > news >正文

0基础学习PyFlink——用户自定义函数之UDTF

大纲

  • 表值函数
  • 完整代码

在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF
在这里插入图片描述

表值函数

我们对比下UDF和UDTF

def udf(f: Union[Callable, ScalarFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None, func_type: str = "general",udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:
def udtf(f: Union[Callable, TableFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_types: Union[List[DataType], DataType, str, List[str]] = None,deterministic: bool = None,name: str = None) -> Union[UserDefinedTableFunctionWrapper, Callable]:

可以发现:

  • UDF比UDTF多了func_type和udf_type参数;
  • UDTF的返回类型比UDF的丰富,多了两个List类型:List[DataType]和List[str];

特别是最后一点,可以认为是UDF和UDTF在应用上的主要区别。
换种更容易理解的说法是:UDTF可以返回任意数量的行作为输出而不是像UDF那样返回单个值(行)。
举一个例子:

word_count_data = ["A", "B", "C", "a", "C"] 

我们希望统计上面这些字符的个数,以及小写后字符的个数。这样A的个数是1,a的个数是2(因为a算一个,A小写后又算一个)。C的个数是2,g的个数是2。
这就要求统计算法在遇到大写字母时,需要统计大小写两种字母;而遇到小写字母时,只需要统计小写字母。

    @udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]

yield关键字返回的是generator生成器。Table API对rowFunc的调用最终会生成[“A”,“a”,“B”,“b”,“C”,“c”,“a”,“C”,“c”]。
和调用UDF不同的是,需要使用flat_map来调用UDTF。flat即为“打平”,可以生动的理解为将多维降为一维。

    tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()
+--------------------------------+
|                             f0 |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

由于我们没有指定经过处理的值所属的字段名称,于是会使用默认的f0作为字段名。我们可以使用alias来给它别名下。

    tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()
+--------------------------------+
|                     trans_word |
+--------------------------------+
|                              A |
|                              a |
|                              B |
|                              b |
|                              C |
|                              c |
|                              a |
|                              C |
|                              c |
+--------------------------------+
9 rows in set

最后我们就可以用这个新的表做字数统计计算

    tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()
+I[A, 1]
+I[a, 2]
+I[B, 1]
+I[b, 1]
+I[C, 2]
+I[c, 2]

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunctionword_count_data = ["A", "B", "C", "a", "C"]  def word_count():config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector('print')\.schema(sink_schema) \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)@udtf(result_types=[DataTypes.STRING()], input_types=row_type_tab_source)def rowFunc(row):if row[0].isupper():yield row[0]yield row[0].lower()else:yield row[0]tab_trans=tab_source.flat_map(rowFunc)tab_trans.execute().print()tab_trans_alias=tab_trans.alias('trans_word')tab_trans_alias.execute().print()tab_trans_alias.group_by(col('trans_word')) \.select(col('trans_word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':word_count()

相关文章:

0基础学习PyFlink——用户自定义函数之UDTF

大纲 表值函数完整代码 在《0基础学习PyFlink——用户自定义函数之UDF》中,我们讲解了UDF。本节我们将讲解表值函数——UDTF 表值函数 我们对比下UDF和UDTF def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataTy…...

【Java 进阶篇】Java Request 原理详解

在网络应用开发中,HTTP请求是一项常见而关键的任务。当我们使用Java编写网络应用时,了解HTTP请求的工作原理变得至关重要。本文将详细介绍Java中HTTP请求的原理,包括请求的结构、发送请求的方法以及处理请求的过程。 HTTP请求的基本结构 HT…...

13 结构性模式-装饰器模式

1 装饰器模式介绍 在软件设计中,装饰器模式是一种用于替代继承的技术,它通过一种无须定义子类的方式给对象动态的增加职责,使用对象之间的关联关系取代类之间的继承关系. 2 装饰器模式原理 //抽象构件类 public abstract class Component{public abstract void operation(); }…...

支持向量机(SVM)

一. 什么是SVM 1. 简介 SVM,曾经是一个特别火爆的概念。它的中文名:支持向量机(Support Vector Machine, 简称SVM)。因为它红极一时,所以关于它的资料特别多,而且杂乱。虽然如此,只要把握住SV…...

Rabbitmq----分布式场景下的应用

服务异步通信-分布式场景下的应用 如果单机模式忘记也可以看看这个快速回顾rabbitmq,在做学习 消息队列在使用过程中,面临着很多实际问题需要思考: 1.消息可靠性 消息从发送,到消费者接收,会经理多个过程: 其中的每一…...

springboot + redis实现签到与统计功能

在很多项目中都会有签到与统计功能,最容易想到的方案是创建一个签到表来记录每个用户的签到记录,比如设计一个mysql数据库表: CREATE TABLE tb_sign id bigint(20) unsigned NOT NULL AUTOINCREMENT COMMENT 主键, user_id bigint(20) unsig…...

Redis | 数据结构(02)SDS

一、键值对数据库是怎么实现的? 在开始讲数据结构之前,先给介绍下 Redis 是怎样实现键值对(key-value)数据库的。 Redis 的键值对中的 key 就是字符串对象,而 value 可以是字符串对象,也可以是集合数据类型…...

Linux C语言开发-D7D8运算符

算术运算符&#xff1a;-*/%&#xff0c;浮点数可以参与除法运算&#xff0c;但不能参与取余运算 a%b&#xff1a;表示取模或取余 关系运算符&#xff1a;<,>,>,<,,! 逻辑运算符:!,&&,|| &&,||逻辑运算符是从左到右&#xff0c;依次运算&#…...

redis 配置主从复制,哨兵模式案例

哨兵(Sentinel)模式 1 . 什么是哨兵模式&#xff1f; 反客为主的自动版&#xff0c;能够自动监控master是否发生故障&#xff0c;如果故障了会根据投票数从slave中挑选一个 作为master&#xff0c;其他的slave会自动转向同步新的master&#xff0c;实现故障自动转义 2 . 原理…...

Python---练习:使用for循环实现用户名+密码认证

案例&#xff1a; 用for循环实现用户登录 ① 输入用户名和密码 ② 判断用户名和密码是否正确&#xff08;usernamelaowang&#xff0c;passwordlw123&#xff09; ③ 登录仅有三次机会&#xff0c;超过3次会报错 思考&#xff1a; 用户登陆情况有3种: ① 用户名错误(此时…...

react中使用jquery 语法

react中使用jquery 语法 npm install jquery引入 import $ from ‘jquery’ import React from react; import ./css/App.css import { Button } from antd; import $ from jquerylet slider_img [https://cdn.jsdelivr.net/gh/xaoxuu/cdn-wallpaper/abstract/41F215B9-261F…...

服务器中了360后缀勒索病毒怎么解决,勒索病毒解密,数据恢复

近期&#xff0c;网络上的各种病毒都比较猖獗&#xff0c;而其中较为明显的就是360后缀勒索病毒&#xff0c;从这个月开始云天数据恢复中心接到很多企业的求助&#xff0c;企业的服务器遭到了360后缀勒索病毒的攻击&#xff0c;通过给用户的服务器检测与加密病毒的分析&#xf…...

使用字节流读取文件中的数据的几种方式

public class FileReader02_ {public static void main(String[] args) {}Testpublic void m1() {String filePath "e:\\hello.txt";FileReader fileReader null;int date0;try {fileReader new FileReader(filePath);//循环读取 使用readwhile ((datefileReader.…...

Android WMS——概述(一)

Android 中的 WMS 指的是 Window Manager Service(窗口管理服务)。WMS 是 Android 系统中的核心服务,主要分为四大部分,分别是窗口管理,窗口动画,输入系统中转站和 Surface 管理 。负责管理应用程序窗口的创建、移动、调整大小和显示等操作。 一、功能简介 WMS 的职责可…...

Node编写获取用户信息接口

目录 前言 初始化路由模块 使用postman发送get获取用户信息请求 初始化路由处理函数模块 获取用户基本信息 前言 在前两篇文章中已经介绍了如何编写用户注册接口以及用户登录接口&#xff0c;这篇文章介绍如何获取用户信息&#xff0c;本篇文章建立在Node编写用户登录接口…...

【从0到1设计一个网关】自研网关的设计要点以及架构设计

文章目录 请求的流程架构设计设计要点项目架构流程设计源码地址: 源码地址 请求的流程 一个HTTP请求发送到网关并完成整个生命周期通常包括以下步骤: 客户端请求: 请求始于客户端,客户端通过HTTP请求(例如GET、POST等)发送请求到API网关的入口点。 API网关接收: API…...

论文-分布式-分布式计算|容错-分布式控制下的自稳定系统

参考文献Self-stabilizing systems in spite of distributed control可以把松散耦合的 循环序列过程 间的同步任务&#xff0c;看成是要保持一个这样的不变性&#xff1a;“系统要处于一种合法状态”因此每个进程在运行每一个可能会改变不变性的步骤之前都要先检查一下是可以执…...

C#压缩图片的方法

/// <summary> /// 图片压缩 /// </summary> /// <param name"imagePath">图片文件路径</param> /// <param name"targetFolder">保存文件夹</param> /// <param name"quality">压缩质量</param&g…...

安装 fcitx + 搜狗/谷歌输入法 之后导致 死机,重启后黑屏只有鼠标可以移动

一般的原因就是 &#xff1a; fcitx 导致的问题 方法就是 先卸载搜狗&#xff0c;再卸载fcitx 解决办法&#xff1a; 首先&#xff1a;ctrlaltF6 进入命令行界面&#xff0c;如果进不去就 ctrlaltF2 接下来执行&#xff1a; sudo apt-get remove sogoupinyin sudo apt-get …...

Maven项目转为SpringBoot项目

Maven项目转为SpringBoot项目 前言创建一个maven项目前的软件的一些通用设置Maven仓库的设置其他的设置字符编码编译器注解支持 创建的Maven项目修改为Spring Boot项目修改pom.xml文件修改启动类-Main新建WAR包所需的类 添加核心配置文件 测试的控制器最后整个项目的目录结构![…...

后进先出(LIFO)详解

LIFO 是 Last In, First Out 的缩写&#xff0c;中文译为后进先出。这是一种数据结构的工作原则&#xff0c;类似于一摞盘子或一叠书本&#xff1a; 最后放进去的元素最先出来 -想象往筒状容器里放盘子&#xff1a; &#xff08;1&#xff09;你放进的最后一个盘子&#xff08…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

简易版抽奖活动的设计技术方案

1.前言 本技术方案旨在设计一套完整且可靠的抽奖活动逻辑,确保抽奖活动能够公平、公正、公开地进行,同时满足高并发访问、数据安全存储与高效处理等需求,为用户提供流畅的抽奖体验,助力业务顺利开展。本方案将涵盖抽奖活动的整体架构设计、核心流程逻辑、关键功能实现以及…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

C# SqlSugar:依赖注入与仓储模式实践

C# SqlSugar&#xff1a;依赖注入与仓储模式实践 在 C# 的应用开发中&#xff0c;数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护&#xff0c;许多开发者会选择成熟的 ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;SqlSugar 就是其中备受…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

Mysql中select查询语句的执行过程

目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析&#xff08;Parser&#xff09; 2.4、执行sql 1. 预处理&#xff08;Preprocessor&#xff09; 2. 查询优化器&#xff08;Optimizer&#xff09; 3. 执行器…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...