当前位置: 首页 > news >正文

FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

相关文章:

FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 概述 用户自定义函数&#xff08;user-defined function&#xff0c;UDF&#xff09;&#xff0c;即用户可以根据…...

阿里云国际服务器设置安全防护程序

阿里云云服务器&#xff08;ECS&#xff09;提供弹性、安全、高性能、高性价比的虚拟云服务器&#xff0c;满足您的所有需求。立即在这里免费注册&#xff01; 常见 Web 应用程序 请勿对 Web 服务控制台&#xff08;如 WDCP、TOMCAT、Apache、Nginx、Jekins、PHPMyAdmin、Web…...

C++获取内存使用情况

在程序编程过程中&#xff0c;为了防止出现内存泄漏情况出现&#xff0c;需要持续关注内存程序内存占用情况。如下方式实现获取当前进程内存使用情况&#xff1a; linux&#xff1a; void my_top(string path, bool flag) {if(flag){FILE* read_top fopen("/proc/self/…...

CRMEB多商户短信开发

在使用CRMEB多商户系统的时候&#xff0c;想要二开使用其他平台的短信&#xff0c;这里以阿里云短信为例的具体实现方法。 一、加载阿里云短信的SDK&#xff0c;执行命令&#xff1a;composer require alibabacloud/dysmsapi-20170525 二、增加阿里云短信的驱动 1.在 crmeb\…...

Leetcode 1049 最后一块石头的重量II

题意理解&#xff1a; 有一堆石头&#xff0c;用整数数组 stones 表示。其中 stones[i] 表示第 i 块石头的重量。 每一回合&#xff0c;从中选出任意两块石头&#xff0c;然后将它们一起粉碎。假设石头的重量分别为 x 和 y&#xff0c;且 x < y。 思路转化&#xff1a;我们可…...

【设计模式之美】SOLID 原则之二:开闭原则方法论、开闭原则如何取舍

文章目录 一. 如何理解“对扩展开放、修改关闭”&#xff1f;二. 修改代码就意味着违背开闭原则吗&#xff1f;三. 如何做到“对扩展开放、修改关闭”&#xff1f;四. 如何在项目中灵活应用开闭原则&#xff1f; 一. 如何理解“对扩展开放、修改关闭”&#xff1f; 具体的说&a…...

Kafka 基本概念和术语

1、消息 Record&#xff1a;Kafka 是消息引擎嘛&#xff0c;这里的消息就是指 Kafka 处理的主要对象。 2、主题 Topic&#xff1a;主题是承载消息的逻辑容器&#xff0c;在实际使用中多用来区分具体的业务。在Kafka 中发布订阅的对象是 Topic。 3、分区 Partition&#xf…...

【每日面试题】Docker常见面试题精选

什么是Docker容器&#xff1f; Docker容器是一种轻量级的虚拟化技术&#xff0c;可以将应用及其依赖项打包在一个可移植的容器中&#xff0c;以便在多个环境中运行。 Docker镜像和容器之间有什么区别&#xff1f; Docker镜像是一个包含了应用程序及其依赖项的只读模板&#xf…...

uniapp项目怎么删除顶部导航栏

uniapp去掉顶部导航的方法&#xff1a; 1、去掉所有导航栏 "globalStyle": { "navigationBarTextStyle": "white", "navigationBarTitleText": "uni-app", "navigationBarBackgroundColor": "#007AFF"…...

Midjourney词库

光线与影子篇 闪耀的霓虹灯 shimmeringneon lights 黑暗中的影子 shadows in the dark 照亮城市的月光 moonlightilluminatingthe city 强烈的阳光 strong sunlight 熠熠生辉的霓虹灯 glittering neon lights 黑暗中的神秘影子 mysterious shadows in the dark 照亮城市…...

【微服务】springcloud集成skywalking实现全链路追踪

目录 一、前言 二、环境准备 2.1 软件环境 2.2 微服务模块 2.3 环境搭建...

openssl3.2 - 官方dmeo学习 - server-cmod.c

文章目录 openssl3.2 - 官方dmeo学习 - server-cmod.c概述配置文件格式样例笔记END openssl3.2 - 官方dmeo学习 - server-cmod.c 概述 从配置文件中读参数, 建立TLS服务器, 死等客户端来连接. 客户端连接后, 打印客户端发来的内容. 配置文件格式有要求 配置文件格式样例 # …...

websocket介绍并模拟股票数据推流

Websockt概念 Websockt是一种网络通信协议&#xff0c;允许客户端和服务器双向通信。最大的特点就是允许服务器主动推送数据给客户端&#xff0c;比如股票数据在客户端实时更新&#xff0c;就能利用websocket。 Websockt和http协议一样&#xff0c;并不是设置在linux内核中&a…...

Python获取本机IP

以下代码Python3.11.6、MacOS系统中测试通过 import socketdef get_ip() -> str:with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.settimeout(0)try:# doesnt even have to be reachables.connect((10.254.254.254, 1))IP s.getsockname()[0]except Except…...

HTTP 3xx状态码:重定向的场景与区别

HTTP 状态码是服务器响应请求时传递给客户端的重要信息。3xx 系列的状态码主要与重定向有关&#xff0c;用于指示请求的资源已被移动到不同的位置&#xff0c;需要采取不同的操作来访问。 一、301 Moved Permanently 定义&#xff1a; 服务器表明请求的资源已永久移动到一个新…...

LangChain 69 向量数据库Pinecone入门

LangChain系列文章 LangChain 50 深入理解LangChain 表达式语言十三 自定义pipeline函数 LangChain Expression Language (LCEL)LangChain 51 深入理解LangChain 表达式语言十四 自动修复配置RunnableConfig LangChain Expression Language (LCEL)LangChain 52 深入理解LangCh…...

解决STM32F7系列芯片TIM无法触发ADC采样的问题

我在测试STM32F746 ADC DMA TIM 做AD采样时候发现 使用cubeMX 库生成的代码无法进入DMA中断&#xff0c;发现官方勘误手册有做解释&#xff0c;需要打开DAC时钟。如下 如上图&#xff0c;在ADC初始化代码中加入 __HAL_RCC_DAC_CLK_ENABLE();...

观察者设计模式

行为型设计模式 行为型模式&#xff08;Behavioral Patterns&#xff09;&#xff1a;这类模式主要关注对象之间的通信。它们 分别是&#xff1a; 职责链模式&#xff08;Chain of Responsibility&#xff09;命令模式&#xff08;Command&#xff09;解释器模式&#xff08;…...

创建mysql普通用户

一、创建mysql普通用户的原因&#xff1a; 权限控制&#xff1a;MySQL的权限系统允许您为每个用户分配特定的权限。通过创建普通用户&#xff0c;您可以根据需要为每个用户分配特定的数据库和表权限&#xff0c;而不是将所有权限授予一个全局管理员用户。这有助于提高数据库的…...

基于多反应堆的高并发服务器【C/C++/Reactor】(中)完整代码

Buffer.h #pragma oncestruct Buffer {// 指向内存的指针char* data;int capacity;int readPos;int writePos; };// 初始化 struct Buffer* bufferInit(int size);// 销毁 void bufferDestroy(struct Buffer* buf);// 扩容 void bufferExtendRoom(struct Buffer* buf, int siz…...

vscode里如何用git

打开vs终端执行如下&#xff1a; 1 初始化 Git 仓库&#xff08;如果尚未初始化&#xff09; git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

【Java学习笔记】Arrays类

Arrays 类 1. 导入包&#xff1a;import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序&#xff08;自然排序和定制排序&#xff09;Arrays.binarySearch()通过二分搜索法进行查找&#xff08;前提&#xff1a;数组是…...

Linux相关概念和易错知识点(42)(TCP的连接管理、可靠性、面临复杂网络的处理)

目录 1.TCP的连接管理机制&#xff08;1&#xff09;三次握手①握手过程②对握手过程的理解 &#xff08;2&#xff09;四次挥手&#xff08;3&#xff09;握手和挥手的触发&#xff08;4&#xff09;状态切换①挥手过程中状态的切换②握手过程中状态的切换 2.TCP的可靠性&…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

关于 WASM:1. WASM 基础原理

一、WASM 简介 1.1 WebAssembly 是什么&#xff1f; WebAssembly&#xff08;WASM&#xff09; 是一种能在现代浏览器中高效运行的二进制指令格式&#xff0c;它不是传统的编程语言&#xff0c;而是一种 低级字节码格式&#xff0c;可由高级语言&#xff08;如 C、C、Rust&am…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...

Windows安装Miniconda

一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...

4. TypeScript 类型推断与类型组合

一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式&#xff0c;自动确定它们的类型。 这一特性减少了显式类型注解的需要&#xff0c;在保持类型安全的同时简化了代码。通过分析上下文和初始值&#xff0c;TypeSc…...