当前位置: 首页 > news >正文

FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

相关文章:

FlinkAPI开发之自定义函数UDF

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 概述 用户自定义函数&#xff08;user-defined function&#xff0c;UDF&#xff09;&#xff0c;即用户可以根据…...

阿里云国际服务器设置安全防护程序

阿里云云服务器&#xff08;ECS&#xff09;提供弹性、安全、高性能、高性价比的虚拟云服务器&#xff0c;满足您的所有需求。立即在这里免费注册&#xff01; 常见 Web 应用程序 请勿对 Web 服务控制台&#xff08;如 WDCP、TOMCAT、Apache、Nginx、Jekins、PHPMyAdmin、Web…...

C++获取内存使用情况

在程序编程过程中&#xff0c;为了防止出现内存泄漏情况出现&#xff0c;需要持续关注内存程序内存占用情况。如下方式实现获取当前进程内存使用情况&#xff1a; linux&#xff1a; void my_top(string path, bool flag) {if(flag){FILE* read_top fopen("/proc/self/…...

CRMEB多商户短信开发

在使用CRMEB多商户系统的时候&#xff0c;想要二开使用其他平台的短信&#xff0c;这里以阿里云短信为例的具体实现方法。 一、加载阿里云短信的SDK&#xff0c;执行命令&#xff1a;composer require alibabacloud/dysmsapi-20170525 二、增加阿里云短信的驱动 1.在 crmeb\…...

Leetcode 1049 最后一块石头的重量II

题意理解&#xff1a; 有一堆石头&#xff0c;用整数数组 stones 表示。其中 stones[i] 表示第 i 块石头的重量。 每一回合&#xff0c;从中选出任意两块石头&#xff0c;然后将它们一起粉碎。假设石头的重量分别为 x 和 y&#xff0c;且 x < y。 思路转化&#xff1a;我们可…...

【设计模式之美】SOLID 原则之二:开闭原则方法论、开闭原则如何取舍

文章目录 一. 如何理解“对扩展开放、修改关闭”&#xff1f;二. 修改代码就意味着违背开闭原则吗&#xff1f;三. 如何做到“对扩展开放、修改关闭”&#xff1f;四. 如何在项目中灵活应用开闭原则&#xff1f; 一. 如何理解“对扩展开放、修改关闭”&#xff1f; 具体的说&a…...

Kafka 基本概念和术语

1、消息 Record&#xff1a;Kafka 是消息引擎嘛&#xff0c;这里的消息就是指 Kafka 处理的主要对象。 2、主题 Topic&#xff1a;主题是承载消息的逻辑容器&#xff0c;在实际使用中多用来区分具体的业务。在Kafka 中发布订阅的对象是 Topic。 3、分区 Partition&#xf…...

【每日面试题】Docker常见面试题精选

什么是Docker容器&#xff1f; Docker容器是一种轻量级的虚拟化技术&#xff0c;可以将应用及其依赖项打包在一个可移植的容器中&#xff0c;以便在多个环境中运行。 Docker镜像和容器之间有什么区别&#xff1f; Docker镜像是一个包含了应用程序及其依赖项的只读模板&#xf…...

uniapp项目怎么删除顶部导航栏

uniapp去掉顶部导航的方法&#xff1a; 1、去掉所有导航栏 "globalStyle": { "navigationBarTextStyle": "white", "navigationBarTitleText": "uni-app", "navigationBarBackgroundColor": "#007AFF"…...

Midjourney词库

光线与影子篇 闪耀的霓虹灯 shimmeringneon lights 黑暗中的影子 shadows in the dark 照亮城市的月光 moonlightilluminatingthe city 强烈的阳光 strong sunlight 熠熠生辉的霓虹灯 glittering neon lights 黑暗中的神秘影子 mysterious shadows in the dark 照亮城市…...

【微服务】springcloud集成skywalking实现全链路追踪

目录 一、前言 二、环境准备 2.1 软件环境 2.2 微服务模块 2.3 环境搭建...

openssl3.2 - 官方dmeo学习 - server-cmod.c

文章目录 openssl3.2 - 官方dmeo学习 - server-cmod.c概述配置文件格式样例笔记END openssl3.2 - 官方dmeo学习 - server-cmod.c 概述 从配置文件中读参数, 建立TLS服务器, 死等客户端来连接. 客户端连接后, 打印客户端发来的内容. 配置文件格式有要求 配置文件格式样例 # …...

websocket介绍并模拟股票数据推流

Websockt概念 Websockt是一种网络通信协议&#xff0c;允许客户端和服务器双向通信。最大的特点就是允许服务器主动推送数据给客户端&#xff0c;比如股票数据在客户端实时更新&#xff0c;就能利用websocket。 Websockt和http协议一样&#xff0c;并不是设置在linux内核中&a…...

Python获取本机IP

以下代码Python3.11.6、MacOS系统中测试通过 import socketdef get_ip() -> str:with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.settimeout(0)try:# doesnt even have to be reachables.connect((10.254.254.254, 1))IP s.getsockname()[0]except Except…...

HTTP 3xx状态码:重定向的场景与区别

HTTP 状态码是服务器响应请求时传递给客户端的重要信息。3xx 系列的状态码主要与重定向有关&#xff0c;用于指示请求的资源已被移动到不同的位置&#xff0c;需要采取不同的操作来访问。 一、301 Moved Permanently 定义&#xff1a; 服务器表明请求的资源已永久移动到一个新…...

LangChain 69 向量数据库Pinecone入门

LangChain系列文章 LangChain 50 深入理解LangChain 表达式语言十三 自定义pipeline函数 LangChain Expression Language (LCEL)LangChain 51 深入理解LangChain 表达式语言十四 自动修复配置RunnableConfig LangChain Expression Language (LCEL)LangChain 52 深入理解LangCh…...

解决STM32F7系列芯片TIM无法触发ADC采样的问题

我在测试STM32F746 ADC DMA TIM 做AD采样时候发现 使用cubeMX 库生成的代码无法进入DMA中断&#xff0c;发现官方勘误手册有做解释&#xff0c;需要打开DAC时钟。如下 如上图&#xff0c;在ADC初始化代码中加入 __HAL_RCC_DAC_CLK_ENABLE();...

观察者设计模式

行为型设计模式 行为型模式&#xff08;Behavioral Patterns&#xff09;&#xff1a;这类模式主要关注对象之间的通信。它们 分别是&#xff1a; 职责链模式&#xff08;Chain of Responsibility&#xff09;命令模式&#xff08;Command&#xff09;解释器模式&#xff08;…...

创建mysql普通用户

一、创建mysql普通用户的原因&#xff1a; 权限控制&#xff1a;MySQL的权限系统允许您为每个用户分配特定的权限。通过创建普通用户&#xff0c;您可以根据需要为每个用户分配特定的数据库和表权限&#xff0c;而不是将所有权限授予一个全局管理员用户。这有助于提高数据库的…...

基于多反应堆的高并发服务器【C/C++/Reactor】(中)完整代码

Buffer.h #pragma oncestruct Buffer {// 指向内存的指针char* data;int capacity;int readPos;int writePos; };// 初始化 struct Buffer* bufferInit(int size);// 销毁 void bufferDestroy(struct Buffer* buf);// 扩容 void bufferExtendRoom(struct Buffer* buf, int siz…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

3-11单元格区域边界定位(End属性)学习笔记

返回一个Range 对象&#xff0c;只读。该对象代表包含源区域的区域上端下端左端右端的最后一个单元格。等同于按键 End 向上键(End(xlUp))、End向下键(End(xlDown))、End向左键(End(xlToLeft)End向右键(End(xlToRight)) 注意&#xff1a;它移动的位置必须是相连的有内容的单元格…...