当前位置: 首页 > news >正文

Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

相关文章:

Flink Flink中的分流

一、什么是分流 所谓“分流”&#xff0c;就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream&#xff0c;定义一些筛选条件&#xff0c;将符合条件的数据拣选出来放到对应的流里。 二、基于filter算子的简单实现分流 其实根据条件筛选数据的需求…...

传输层协议[精选]

网络: 跨主机通信. 互联网通信: 两点之间的通信路径有无数条. 集线器: 把一根网线差出来两根,但是同一时刻只能有一根线跑.交换机: 组建局域网.路由器: 本质就是将两个局域网连接起来 交换机和路由器之间的区别越来越模糊. 调制解调器: 使用电话线上网的时候,需要将电话线的模…...

LeetCode算法题解|474. 一和零

474. 一和零 题目链接&#xff1a;474. 一和零 题目描述 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的长度&#xff0c;该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的元素&#xff0c;集合 x 是集合 y 的 子…...

一种太阳能风能市电互补路灯方案介绍

太阳能市电互补路灯是一种环保、节能的照明设施&#xff0c;它利用太阳能进行发电并实现照明。这种路灯在白天吸收阳光并将其转化为电能&#xff0c;到了晚上则利用储存的电能为LED灯提供电力&#xff0c;实现照明功能。下面叁仟智慧将详细介绍太阳能市电互补路灯灯的工作原理和…...

世微 dc-dc降压恒流 LED汽车大灯 单灯 14V5A 68W车灯驱动方案 AP5191

产品描述 AP5191是一款PWM工作模式,高效率、外围简单、外置功率MOS管&#xff0c;适用于4.5-150V输入的高精度降压LED恒流驱动芯片。输出最大功率150W&#xff0c;最大电流6A。AP5191可实现线性调光和PWM调光&#xff0c;线性调光脚有效电压范围0.55-2.6V.AP5191 工作频率可以…...

基于时隙的多重冗余流指纹模型

文章信息 论文题目&#xff1a;基于时隙的多重冗余流指纹模型 期刊&#xff08;会议&#xff09;&#xff1a;网络与信息安全学报 时间&#xff1a;2023 级别&#xff1a;CCF C 概述 为确保内生网络流量安全可信&#xff0c;本文在研究流水印及其扩展的流指纹机制的基础上&a…...

Visual Studio 2019 C# System.BadImageFormatException 解决方法

文章目录 1.DLL文件缺失或不匹配原因解决方法 2.系统环境变量Path下内容过多原因解决方法 3.位数错误原因解决方法 分析几种可能因素 1.DLL文件缺失或不匹配 原因 检查对应Debug路径下的DLL文件是否有缺失 解决方法 将对应的DLL文件放到Debug文件夹里面&#xff0c;检查冗余…...

深度学习之基于YoloV5车辆和行人目标检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介YOLOv5 简介YOLOv5 特点 车辆和行人目标检测系统 二、功能三、系统四. 总结 一项目简介 # 深度学习之基于 YOLOv5 车辆和行人目标检测系统介绍 深度学习在…...

Django框架之中间件

目录 一、引入 二、Django中间件介绍 【1】什么是Django中间件 【2】Django中间件的作用 【3】示例 三、Django请求生命周期流程图 四、Django中间件是Django的门户 五、Django中间件详解 六、中间件必须要掌握的两个方法 (1) process_request (2) process_respon…...

BTC 复兴:Ordinals 带来创新活力,BitVM 与 BitStream 相继问世

除了备受瞩目的 ETF&#xff0c;今年 Bitcoin 生态迎来全新的发展活力和机遇。Ordinals 协议的横空出世&#xff0c;以此为基础诞生的 BRC20 协议给整个比特币生态带去了一波新的能量&#xff0c;迎来铭文热度高涨。而诸如 BitVM、BitStream 等新技术甫一问世&#xff0c;便引发…...

STM32 CAN协议讲解以及代码

STM32 CAN 文章目录 STM32 CAN前言一、CAN外设1.主控制寄存器CAN_MCR2.位时序寄存器CAN_BTR3.CAN的发送邮箱4.CAN的接收FIFO5.验收筛选器 二、代码配置1.初始化2.发送数据3.接收数据4.main.c 前言 前面学习了CAN的一些理论知识&#xff0c;他在我们的STM32里面是怎么用的呢 前…...

京东数据分析(京东大数据):2023年10月京东手机行业品牌销售排行榜

鲸参谋监测的京东平台10月份手机市场销售数据已出炉&#xff01; 根据鲸参谋平台的数据显示&#xff0c;今年10月份&#xff0c;京东平台手机行业的销量约340万&#xff0c;环比增长约11%&#xff0c;同比则下滑约2%&#xff1b;销售额为108亿&#xff0c;环比增长约17%&#x…...

计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…...

pop链反序列化 [MRCTF2020]Ezpop1

打开题目 网站源码为 Welcome to index.php <?php //flag is in flag.php //WTF IS THIS? //Learn From https://ctf.ieki.xyz/library/php.html#%E5%8F%8D%E5%BA%8F%E5%88%97%E5%8C%96%E9%AD%94%E6%9C%AF%E6%96%B9%E6%B3%95 //And Crack It! class Modifier {protected …...

yolov5从英伟达平台移植到华为昇腾开发板上的思路

作者&#xff1a;朱金灿 来源&#xff1a;clever101的专栏 为什么大多数人学不会人工智能编程&#xff1f;>>> 最近需要将yolov5代码从英伟达平台移植到华为昇腾开发板上。搜了一些代码和资料&#xff0c;大致明白了二者的差别。 1.二者使用的模型文件不一样 yolov…...

网络运维与网络安全 学习笔记2023.11.25

网络运维与网络安全 学习笔记 第二十六天 今日目标 ACL原理与类型、基本ACL配置、高级ACL配置 高级ACL之ICMP、高级ACL之telnet ACL原理与类型 项目背景 为了企业的业务安全&#xff0c;要求不同部门对服务器有不同的权限 PC1不能访问Server PC2允许访问Server 允许其他所…...

Trustzone/TEE/安全 面试100问

关键词:cache学习、mmu学习、cache资料、mmu资料、arm资料、armv8资料、armv9资料、 trustzone视频、tee视频、ATF视频、secureboot视频、安全启动视频、selinux视频,cache视频、mmu视频,armv8视频、armv9视频、FF-A视频、密码学视频、RME/CCA视频、学习资料下载、免费学习资…...

【数据结构】D : 图的顶点可达闭包

D : 图的顶点可达闭包 Description 给定有向图的邻接矩阵A&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向边则A[i,j]1&#xff0c;若没有有向边则A[i,j] 0。试求A的可达闭包矩阵A*&#xff0c;其元素定义为&#xff1a;若存在顶点i到顶点j的有向路径则A*[i,j…...

链表?细!详细知识点总结!

链表 定义&#xff1a;链表是一种递归的数据结构&#xff0c;它或者为空&#xff08;null)&#xff0c;或者是指向一个结点&#xff08;node&#xff09;的引用&#xff0c;该结点含有一个泛型的元素和一个指向另一条链表的引用。 ​ 其实链表就是有序的列表&#xff0c;它在内…...

【数据结构实验】排序(三)快速排序算法的改进(三者取中法)

文章目录 1. 引言2. 快速排序算法2.1 传统快速排序2.2 三者取中法 3. 实验内容3.1 实验题目&#xff08;一&#xff09;输入要求&#xff08;二&#xff09;输出要求 3.2 算法实现 4. 实验结果 1. 引言 快速排序是一种经典的排序算法&#xff0c;其核心思想是通过选择一个基准元…...

避开Fluent计算崩溃:用这3种网格划分策略彻底解决floating error问题

避开Fluent计算崩溃&#xff1a;3种网格划分策略彻底解决floating error问题 在CFD仿真工程师的日常工作中&#xff0c;没有什么比看到"floating point error"这个报错更令人沮丧的了。这个看似简单的错误提示背后&#xff0c;往往隐藏着复杂的数值计算问题。根据我们…...

嵌入式系统动态控制模型架构与实现解析

1. 嵌入式系统动态控制模型的核心架构解析在物联网和智能设备爆发的时代&#xff0c;嵌入式系统正面临前所未有的灵活性和可扩展性挑战。传统嵌入式系统的控制策略往往在设备出厂时就被固化&#xff0c;任何策略调整都需要重新烧录固件或更换硬件。这种刚性架构已经无法满足现代…...

告别FPS采样慢!用RandLA-Net的随机采样高效处理大规模点云(附S3DIS数据集实战)

突破大规模点云处理瓶颈&#xff1a;RandLA-Net随机采样技术深度解析与实战 点云数据处理在自动驾驶、三维重建和机器人导航等领域扮演着关键角色&#xff0c;但传统方法如FPS&#xff08;最远点采样&#xff09;在面对百万级点云时往往力不从心。我曾在一个城市级三维建模项目…...

从QNX到Android Auto:车载Camera软件栈全解析,高通8155平台上的IFE、BPS、IPE都干了啥?

车载摄像头技术栈深度解析&#xff1a;从传感器到多屏协同的完整链路 在智能座舱系统中&#xff0c;摄像头已从简单的倒车影像工具演变为支撑DMS&#xff08;驾驶员监控&#xff09;、OMS&#xff08;乘员监控&#xff09;、AVM&#xff08;全景环视&#xff09;等高级功能的核…...

告别Windows!在Ubuntu 22.04上搞定NI-VISA驱动,用C++控制你的USB示波器

告别Windows&#xff01;在Ubuntu 22.04上搞定NI-VISA驱动&#xff0c;用C控制你的USB示波器 当实验室的示波器突然无法连接Windows电脑时&#xff0c;我意识到是时候拥抱Linux了。作为电子工程师&#xff0c;我们常常被Windows平台的即插即用惯坏了&#xff0c;但当你需要在科…...

Qwen2.5-VL-7B-Instruct在智能导航系统中的应用:牢记回家路

Qwen2.5-VL-7B-Instruct在智能导航系统中的应用&#xff1a;牢记回家路 想象一下&#xff0c;当你开车回家时&#xff0c;导航系统不仅能告诉你该走哪条路&#xff0c;还能认出你常去的超市、记得你喜欢的咖啡店&#xff0c;甚至提醒你&#xff1a;"今天常去的那家花店有新…...

3个思维转变:让Fiji图像处理软件启动速度提升500%的颠覆性方法

3个思维转变&#xff1a;让Fiji图像处理软件启动速度提升500%的颠覆性方法 【免费下载链接】fiji A "batteries-included" distribution of ImageJ :battery: 项目地址: https://gitcode.com/gh_mirrors/fi/fiji 你是否曾经在等待Fiji启动时&#xff0c;看着缓…...

Windows Cleaner终极指南:5分钟解决C盘爆红问题,快速释放空间提升电脑性能

Windows Cleaner终极指南&#xff1a;5分钟解决C盘爆红问题&#xff0c;快速释放空间提升电脑性能 【免费下载链接】WindowsCleaner Windows Cleaner——专治C盘爆红及各种不服&#xff01; 项目地址: https://gitcode.com/gh_mirrors/wi/WindowsCleaner Windows Cleane…...

Windows Cleaner深度解析:彻底解决C盘爆红问题的开源利器

Windows Cleaner深度解析&#xff1a;彻底解决C盘爆红问题的开源利器 【免费下载链接】WindowsCleaner Windows Cleaner——专治C盘爆红及各种不服&#xff01; 项目地址: https://gitcode.com/gh_mirrors/wi/WindowsCleaner 你是否曾经遇到过这样的窘境&#xff1f;正在…...

番茄小说下载器:终极离线阅读解决方案,随时随地畅享小说世界

番茄小说下载器&#xff1a;终极离线阅读解决方案&#xff0c;随时随地畅享小说世界 【免费下载链接】Tomato-Novel-Downloader 番茄小说下载器不精简版 项目地址: https://gitcode.com/gh_mirrors/to/Tomato-Novel-Downloader 你是否曾在地铁通勤时网络突然中断&#x…...