当前位置: 首页 > news >正文

Flink之SideOutput(数据分流)

Flink在早期版本有一个split算子用来做数据分流使用的,但是在flink-1.12开始这个API就已经被删除了,在1.12版本以后我们是通过process算子来做数据分流的,这里就介绍一下如何使用prodess进行数据分流.

  • 代码
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 测流输出**/
public class FlinkSideOutput {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 这里使用的是自定义数据源为了方便测试,具体数据源根据自己的实际情况进行更换DataStreamSource<CustomizeBean> customizeSourceStream = env.addSource(new CustomizeSource());/*** 需求* 1. 将性别为M且爱好为'羽毛球运动爱好者'分到一个流* 2. 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'分到一个流* 3. 其他保留到主流**/SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性别String hobbit = value.getHobbit(); // 爱好if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {// 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if (gender.equals("W") && (hobbit.equals("篮球运动爱好者") || hobbit.equals("钓鱼爱好者"))) {// 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);} else {// 将剩下的数据保留在主流中out.collect(value);}}});// 获取'M-羽毛球'分流数据,这里也要加上类型声明DataStream<CustomizeBean> mSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)));// 打印'M-羽毛球'结果mSideOutput.print("M-羽毛球");// 获取'W-篮球/钓鱼'分流数据,这里也要加上类型声明DataStream<CustomizeBean> wSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)));// 打印结果wSideOutput.print("W-篮球/钓鱼");// 主流数据打印结果processedStream.print("主数据流");env.execute("Side Output");}
}
  • 结果数据
主数据流:2> CustomizeBean(name=AAA-641, age=44, gender=W, hobbit=非遗文化爱好者)
主数据流:3> CustomizeBean(name=AAA-17, age=62, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-429, age=25, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-218, age=33, gender=M, hobbit=旅游爱好者)
主数据流:3> CustomizeBean(name=AAA-826, age=39, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-190, age=31, gender=M, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-266, age=32, gender=W, hobbit=网吧战神)
主数据流:3> CustomizeBean(name=AAA-106, age=70, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-911, age=50, gender=M, hobbit=网吧战神)
M-羽毛球:2> CustomizeBean(name=AAA-925, age=65, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-20, age=59, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-409, age=79, gender=W, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-865, age=58, gender=W, hobbit=天文知识爱好者)
主数据流:3> CustomizeBean(name=AAA-898, age=33, gender=M, hobbit=天文知识爱好者)
主数据流:1> CustomizeBean(name=AAA-85, age=38, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-883, age=51, gender=M, hobbit=美食爱好者)
主数据流:3> CustomizeBean(name=AAA-243, age=37, gender=M, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-430, age=28, gender=W, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-127, age=65, gender=W, hobbit=网吧战神)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-986, age=52, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-840, age=50, gender=W, hobbit=旅游爱好者)
M-羽毛球:2> CustomizeBean(name=AAA-196, age=34, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-142, age=46, gender=W, hobbit=乒乓球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-985, age=78, gender=W, hobbit=美食爱好者)
W-篮球/钓鱼:2> CustomizeBean(name=AAA-490, age=50, gender=W, hobbit=钓鱼爱好者)
主数据流:3> CustomizeBean(name=AAA-295, age=77, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-754, age=50, gender=M, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-249, age=35, gender=W, hobbit=羽毛球运动爱好者)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-908, age=27, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-674, age=73, gender=M, hobbit=非遗文化爱好者)

通过结果内容可以看到数据完全按照我们分流的逻辑进行输出的,如果想在主数据流中讲所有数据保留下来,Collector<Object> out单独拎出来即可,也就是不加到判断逻辑中,代码如下,这里就只展示部分代码了

SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性别String hobbit = value.getHobbit(); // 爱好// 将所有数据保留在主流中out.collect(value);// 开始进行分流处理if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {// 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if ((gender.equals("W") && (hobbit.equals("篮球运动爱好者")) || (gender.equals("W") && hobbit.equals("钓鱼爱好者")))) {// 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);}}});

所有的内容到这里就结束了.

相关文章:

Flink之SideOutput(数据分流)

Flink在早期版本有一个split算子用来做数据分流使用的,但是在flink-1.12开始这个API就已经被删除了,在1.12版本以后我们是通过process算子来做数据分流的,这里就介绍一下如何使用prodess进行数据分流. 代码 import org.apache.flink.api.common.typeinfo.TypeInformation; im…...

Android Studio新版本logcat过滤说明

按包名过滤 //输入package:&#xff08;输入一个p就会有提示的&#xff09; &#xff0c;后面加上包名 比如: package:com.xal.runcontrol package:包名可以完整或者输部分包名即可 package:包名需要输完整准确 package~:正则表达式过滤 不了解正则表达式的可以参考&#…...

carsim与matlab仿真

matlab2021a安装教程&#xff0c;亲测。 百度网盘&#xff1a; matlab2021a安装包 提取码&#xff1a;1223 CarSim2020安装教程&#xff0c; 亲测。 百度网盘&#xff1a; CarSim2020安装包 提取码&#xff1a;1223 &#xff0c;破解可参考 b站视频...

rust里如何快速实现一个LRU 本地缓存?

LRU是Least Recently Used&#xff08;最近最少使用&#xff09;的缩写&#xff0c;是一种常见的缓存淘汰算法。LRU算法的基本思想是&#xff0c;当缓存空间已满时&#xff0c;优先淘汰最近最少使用的数据&#xff0c;以保留最常用的数据。 在计算机系统中&#xff0c;LRU算法…...

MQTT 订阅接收消息 mosquitto 方式

1 说明 采用 mosquitto 库&#xff0c;实现订阅主题&#xff0c;并接收消息。其中服务器有做限制&#xff0c;需要对应的 cilent id &#xff0c;cafile 、certfile 、keyfile 等配置2 环境 采用ubuntu 直接编译调试 安装mosquitto 库 sudo apt install libmosquitto-dev su…...

以mod_jk方式整合apache与tomcat(动静分离)

前言&#xff1a; 为什么要整合apache和tomcat apache对静态页面的处理能力强&#xff0c;而tomcat对静态页面的处理不如apache&#xff0c;整合后有以下好处 提升对静态文件的处理性能 利用 Web 服务器来做负载均衡以及容错 更完善地去升级应用程序 jk整合方式介绍&#…...

springboot动态数据源切换

1&#xff09;、就是将多个数据源全部注入到bean中&#xff0c;根据需要实现多数据源之间的切换。 2&#xff09;、使用baomidou的DS注解。见文章DS注解实现数据源动态切换 com.baomidou dynamic-datasource-spring-boot-starter 3.5.1 ##设置默认的数据源或者数据源组,默认值…...

代码随想录训练营day14

101. 对称二叉树 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 func isSymmetric(root *TreeNode) bool {if root nil{ return true}return judge(root.Left,root.Right) }func judge(lf *TreeNode , ri *TreeNode)bool{if lf nil && ri nil{ retu…...

功能测试进阶自动化测试如何摸清学习方向,少走弯路呢?

目录 抛开疑问&#xff0c;只做学术探讨 小白在想什么&#xff1f; 盖楼之前先打好地基&#xff0c;首先需要学习一门语言 语言入门后&#xff0c;正式踏上开始自动化成神之路&#xff0c;入门篇Selenium 玩腻了Selenium 开始接触自动化框架unittest/testNG 不满足于单元…...

检测前端是否可以ping通后端返回的ip地址

检测前端是否可以ping通后端返回的ip地址 前端检测是否可ping通ip地址&#xff08;PC端&#xff09;前端检测是否可ping通ip地址&#xff08;uniapp小程序端&#xff09; 前端检测是否可ping通ip地址&#xff08;PC端&#xff09; // 前端检测是否可ping通ip地址 ping…...

SMART司马他法则(目标管理)

S代表具体(Specific)&#xff0c;指绩效考核要切中特定的工作指标&#xff0c;不能笼统&#xff1b; M代表可度量(Measurable)&#xff0c;指绩效指标是数量化或者行为化的&#xff0c;验证这些绩效指标的数据或者信息是可以获得的&#xff1b; A代表可实现(Attainable)&…...

【LeetCode】删除并获得点数

删除并获得点数 题目描述算法分析编程代码空间优化 链接: 删除并获得点数 题目描述 算法分析 编程代码 class Solution { public:int deleteAndEarn(vector<int>& nums) {const int N 10001;int arr[N] {0};for(const auto& n : nums){arr[n]n;}vector<in…...

SciencePub学术 | 传感器类重点SCIE征稿中

SciencePub学术 刊源推荐: 传感器类重点SCIE征稿中&#xff01;信息如下&#xff0c;录满为止&#xff1a; 一、期刊概况&#xff1a; 传感器类重点SCIE 【期刊简介】IF&#xff1a;2.0-2.5&#xff0c;JCR3区&#xff0c;中科院4区&#xff1b; 【版面类型】正刊&#xff1…...

移动端开发基础总结

移动端学习总结 (适合于复习) 移动端基础 技术选型&#xff1a; 单独制作移动端页面&#xff08;主流&#xff09; 流式布局&#xff08;百分比布局&#xff09;flex弹性布局&#xff08;强烈推荐&#xff09;lessrem媒体查询布局混合布局 响应式页面兼容移动端&#xff08;…...

小X学游泳(深搜)

第一题 题目描述 小X想要学游泳。 这天&#xff0c;小X来到了游泳池&#xff0c;发现游泳池可以用N行M列的格子来表示&#xff0c;每个格子的面积都是1&#xff0c;且格子内水深相同。 由于小X刚刚入门&#xff0c;他只能在水深相同的地方游泳。为此&#xff0c;他把整个游泳池…...

分布式协议与算法——拜占庭将军问题

拜占庭将军问题 背景&#xff1a;以战国时期为背景 战国时期&#xff0c;齐、楚、燕、韩、赵、魏、秦七雄并立&#xff0c;后来秦国的势力不断强大起来&#xff0c;成了东方六国的共同威胁。于是&#xff0c;这六个国家决定联合&#xff0c;全力抗秦&#xff0c;免得被秦国各个…...

MySQL数据库管理的基本原则和技巧

MySQL数据库是一种常用的关系型数据库管理系统&#xff0c;用于存储和管理大量的数据。在进行MySQL数据库管理时&#xff0c;有一些基本原则和技巧可以帮助我们更有效地管理数据库。 数据库设计原则&#xff1a; 合理规划数据表结构&#xff1a; 根据数据之间的关系和业务需求…...

SQL-每日一题【1193. 每月交易 I】

题目 Table: Transactions 编写一个 sql 查询来查找每个月和每个国家/地区的事务数及其总金额、已批准的事务数及其总金额。 以 任意顺序 返回结果表。 查询结果格式如下所示。 示例 1: 解题思路 1.题目要求我们查找每个月和每个国家/地区的事务数及其总金额、已批准的事务数…...

探析青少年口才训练在个人发展中的重要性与影响

论文题目&#xff1a;探析青少年口才训练在个人发展中的重要性与影响 摘要&#xff1a; 本论文旨在探讨青少年口才训练对个人发展的重要性和影响。通过对相关文献的综述和实证研究的分析&#xff0c;论文将阐述口才训练对青少年自信心、表达能力和思维能力的提升&#xff0c;以…...

HTML 元素的 class 和 id 属性有何区别?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 唯一性⭐ 选择器权重⭐ JS操作⭐ CSS和JavaScript引用⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏…...

图片调色思路分享

图片调色是摄影后期处理的核心环节&#xff0c;旨在塑造画面的色彩氛围、统一风格、突出主题或表达情感。以下是一个系统的调色思路&#xff0c;结合了您提纲中的基础调整与色彩管理部分&#xff1a;1. 基础定调与校正 (奠定基础)审视直方图与曝光&#xff1a;首先观察图像的直…...

终极Label Studio数据标注指南:从零开始构建AI训练数据集

终极Label Studio数据标注指南&#xff1a;从零开始构建AI训练数据集 【免费下载链接】label-studio Label Studio is a multi-type data labeling and annotation tool with standardized output format 项目地址: https://gitcode.com/GitHub_Trending/la/label-studio …...

3分钟搞定AdGuard浏览器扩展安装:终极广告拦截与隐私保护指南

3分钟搞定AdGuard浏览器扩展安装&#xff1a;终极广告拦截与隐私保护指南 【免费下载链接】AdguardBrowserExtension AdGuard browser extension 项目地址: https://gitcode.com/gh_mirrors/ad/AdguardBrowserExtension AdGuard浏览器扩展是一款功能强大的开源广告拦截工…...

数据仓库实战:复杂多层级维度建模全解 + 模型优化最佳实践

数据仓库实战&#xff1a;复杂多层级维度建模全解 模型优化最佳实践摘要一、基础认知&#xff1a;什么是复杂多层级维度&#xff1f;1.1 核心定义1.2 典型多层级维度场景1.3 多层级维度三大特征二、标准流程&#xff1a;多层级维度建模完整流程2.1 建模流程图2.2 分步流程说明…...

AnimateDiff终极指南:3步将静态图片变生动动画的免费神器

AnimateDiff终极指南&#xff1a;3步将静态图片变生动动画的免费神器 【免费下载链接】animatediff 项目地址: https://ai.gitcode.com/hf_mirrors/ai-gitcode/animatediff 你是否曾经幻想过&#xff0c;只需一行文字描述&#xff0c;就能让静态图片"活"起来…...

突破视频下载壁垒:yt-dlp-gui的全场景应用指南

突破视频下载壁垒&#xff1a;yt-dlp-gui的全场景应用指南 【免费下载链接】yt-dlp-gui Windows GUI for yt-dlp 项目地址: https://gitcode.com/gh_mirrors/yt/yt-dlp-gui 在数字化时代&#xff0c;视频内容已成为信息传递与知识获取的重要载体。然而&#xff0c;多数平…...

AI辅助开发:利用快马平台为520888·moc中创造交互式视觉艺术效果

AI辅助开发&#xff1a;利用快马平台为520888moc中创造交互式视觉艺术效果 最近在做一个创意项目&#xff0c;需要把"520888moc中"这个特殊字符串做成一个酷炫的交互式视觉艺术效果。作为一个前端开发新手&#xff0c;我一开始有点无从下手&#xff0c;直到发现了In…...

从底层源码深入分析Bean的实例化

在技术领域&#xff0c;我们常常被那些闪耀的、可见的成果所吸引。今天&#xff0c;这个焦点无疑是大语言模型技术。它们的流畅对话、惊人的创造力&#xff0c;让我们得以一窥未来的轮廓。然而&#xff0c;作为在企业一线构建、部署和维护复杂系统的实践者&#xff0c;我们深知…...

【learn-claude-code】S06ContextCompact - 上下文压缩:上下文会满,你需要腾出空间

核心理念 “上下文会满&#xff0c;你需要腾出空间” – 三层压缩策略&#xff0c;实现无限会话。 源码&#xff1a;https://github.com/xiayongchao/learn-claude-code-4j/blob/main/src/main/java/org/jc/agents/S06ContextCompact.java原版&#xff1a;https://github.com…...

3大核心功能解析:飞秋Mac版如何实现高效局域网通信

3大核心功能解析&#xff1a;飞秋Mac版如何实现高效局域网通信 【免费下载链接】feiq 基于qt实现的mac版飞秋&#xff0c;遵循飞秋协议(飞鸽扩展协议)&#xff0c;支持多项飞秋特有功能 项目地址: https://gitcode.com/gh_mirrors/fe/feiq 还在为Mac与Windows设备间的通…...