Flink Flink中的分流
一、什么是分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

二、基于filter算子的简单实现分流
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
package com.flink.DataStream.SplitStream;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkSplitStreamByFilter {public static void main(String[] args) throws Exception {//TODO 创建Flink上下文执行环境StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));//.getExecutionEnvironment();//TODO 设置全局并行度为2streamExecutionEnvironment.setParallelism(2);DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);//TODO 先将输入流转为Integer类型SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {int i = Integer.parseInt(input);return i;});//TODO 使用匿名函数分流偶数流SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer a) throws Exception {return a % 2 == 0;}});//TODO 使用lamda表达式分流奇数流SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);ds1.print("偶数流");ds2.print("奇数流");streamExecutionEnvironment.execute();}
}
执行结果
奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1Process finished with exit code 130 (interrupted by signal 2: SIGINT)
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
三、使用测输出流
关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。
相关文章:
Flink Flink中的分流
一、什么是分流 所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。 二、基于filter算子的简单实现分流 其实根据条件筛选数据的需求…...
传输层协议[精选]
网络: 跨主机通信. 互联网通信: 两点之间的通信路径有无数条. 集线器: 把一根网线差出来两根,但是同一时刻只能有一根线跑.交换机: 组建局域网.路由器: 本质就是将两个局域网连接起来 交换机和路由器之间的区别越来越模糊. 调制解调器: 使用电话线上网的时候,需要将电话线的模…...
LeetCode算法题解|474. 一和零
474. 一和零 题目链接:474. 一和零 题目描述 给你一个二进制字符串数组 strs 和两个整数 m 和 n 。 请你找出并返回 strs 的最大子集的长度,该子集中 最多 有 m 个 0 和 n 个 1 。 如果 x 的所有元素也是 y 的元素,集合 x 是集合 y 的 子…...
一种太阳能风能市电互补路灯方案介绍
太阳能市电互补路灯是一种环保、节能的照明设施,它利用太阳能进行发电并实现照明。这种路灯在白天吸收阳光并将其转化为电能,到了晚上则利用储存的电能为LED灯提供电力,实现照明功能。下面叁仟智慧将详细介绍太阳能市电互补路灯灯的工作原理和…...
世微 dc-dc降压恒流 LED汽车大灯 单灯 14V5A 68W车灯驱动方案 AP5191
产品描述 AP5191是一款PWM工作模式,高效率、外围简单、外置功率MOS管,适用于4.5-150V输入的高精度降压LED恒流驱动芯片。输出最大功率150W,最大电流6A。AP5191可实现线性调光和PWM调光,线性调光脚有效电压范围0.55-2.6V.AP5191 工作频率可以…...
基于时隙的多重冗余流指纹模型
文章信息 论文题目:基于时隙的多重冗余流指纹模型 期刊(会议):网络与信息安全学报 时间:2023 级别:CCF C 概述 为确保内生网络流量安全可信,本文在研究流水印及其扩展的流指纹机制的基础上&a…...
Visual Studio 2019 C# System.BadImageFormatException 解决方法
文章目录 1.DLL文件缺失或不匹配原因解决方法 2.系统环境变量Path下内容过多原因解决方法 3.位数错误原因解决方法 分析几种可能因素 1.DLL文件缺失或不匹配 原因 检查对应Debug路径下的DLL文件是否有缺失 解决方法 将对应的DLL文件放到Debug文件夹里面,检查冗余…...
深度学习之基于YoloV5车辆和行人目标检测系统
欢迎大家点赞、收藏、关注、评论啦 ,由于篇幅有限,只展示了部分核心代码。 文章目录 一项目简介YOLOv5 简介YOLOv5 特点 车辆和行人目标检测系统 二、功能三、系统四. 总结 一项目简介 # 深度学习之基于 YOLOv5 车辆和行人目标检测系统介绍 深度学习在…...
Django框架之中间件
目录 一、引入 二、Django中间件介绍 【1】什么是Django中间件 【2】Django中间件的作用 【3】示例 三、Django请求生命周期流程图 四、Django中间件是Django的门户 五、Django中间件详解 六、中间件必须要掌握的两个方法 (1) process_request (2) process_respon…...
BTC 复兴:Ordinals 带来创新活力,BitVM 与 BitStream 相继问世
除了备受瞩目的 ETF,今年 Bitcoin 生态迎来全新的发展活力和机遇。Ordinals 协议的横空出世,以此为基础诞生的 BRC20 协议给整个比特币生态带去了一波新的能量,迎来铭文热度高涨。而诸如 BitVM、BitStream 等新技术甫一问世,便引发…...
STM32 CAN协议讲解以及代码
STM32 CAN 文章目录 STM32 CAN前言一、CAN外设1.主控制寄存器CAN_MCR2.位时序寄存器CAN_BTR3.CAN的发送邮箱4.CAN的接收FIFO5.验收筛选器 二、代码配置1.初始化2.发送数据3.接收数据4.main.c 前言 前面学习了CAN的一些理论知识,他在我们的STM32里面是怎么用的呢 前…...
京东数据分析(京东大数据):2023年10月京东手机行业品牌销售排行榜
鲸参谋监测的京东平台10月份手机市场销售数据已出炉! 根据鲸参谋平台的数据显示,今年10月份,京东平台手机行业的销量约340万,环比增长约11%,同比则下滑约2%;销售额为108亿,环比增长约17%&#x…...
计算机毕业设计 基于Hadoop的物品租赁系统的设计与实现 Java实战项目 附源码+文档+视频讲解
博主介绍:✌从事软件开发10年之余,专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ 🍅文末获取源码联系🍅 👇🏻 精…...
pop链反序列化 [MRCTF2020]Ezpop1
打开题目 网站源码为 Welcome to index.php <?php //flag is in flag.php //WTF IS THIS? //Learn From https://ctf.ieki.xyz/library/php.html#%E5%8F%8D%E5%BA%8F%E5%88%97%E5%8C%96%E9%AD%94%E6%9C%AF%E6%96%B9%E6%B3%95 //And Crack It! class Modifier {protected …...
yolov5从英伟达平台移植到华为昇腾开发板上的思路
作者:朱金灿 来源:clever101的专栏 为什么大多数人学不会人工智能编程?>>> 最近需要将yolov5代码从英伟达平台移植到华为昇腾开发板上。搜了一些代码和资料,大致明白了二者的差别。 1.二者使用的模型文件不一样 yolov…...
网络运维与网络安全 学习笔记2023.11.25
网络运维与网络安全 学习笔记 第二十六天 今日目标 ACL原理与类型、基本ACL配置、高级ACL配置 高级ACL之ICMP、高级ACL之telnet ACL原理与类型 项目背景 为了企业的业务安全,要求不同部门对服务器有不同的权限 PC1不能访问Server PC2允许访问Server 允许其他所…...
Trustzone/TEE/安全 面试100问
关键词:cache学习、mmu学习、cache资料、mmu资料、arm资料、armv8资料、armv9资料、 trustzone视频、tee视频、ATF视频、secureboot视频、安全启动视频、selinux视频,cache视频、mmu视频,armv8视频、armv9视频、FF-A视频、密码学视频、RME/CCA视频、学习资料下载、免费学习资…...
【数据结构】D : 图的顶点可达闭包
D : 图的顶点可达闭包 Description 给定有向图的邻接矩阵A,其元素定义为:若存在顶点i到顶点j的有向边则A[i,j]1,若没有有向边则A[i,j] 0。试求A的可达闭包矩阵A*,其元素定义为:若存在顶点i到顶点j的有向路径则A*[i,j…...
链表?细!详细知识点总结!
链表 定义:链表是一种递归的数据结构,它或者为空(null),或者是指向一个结点(node)的引用,该结点含有一个泛型的元素和一个指向另一条链表的引用。 其实链表就是有序的列表,它在内…...
【数据结构实验】排序(三)快速排序算法的改进(三者取中法)
文章目录 1. 引言2. 快速排序算法2.1 传统快速排序2.2 三者取中法 3. 实验内容3.1 实验题目(一)输入要求(二)输出要求 3.2 算法实现 4. 实验结果 1. 引言 快速排序是一种经典的排序算法,其核心思想是通过选择一个基准元…...
从无人机飞控到机械臂:工程师如何用四元数(Quaternion)彻底告别‘万向死锁’的烦恼
从无人机飞控到机械臂:工程师如何用四元数彻底告别万向死锁 想象一下,你正在调试一架无人机的飞控系统。当飞机俯仰角接近90度时,突然发现滚转和偏航控制开始互相干扰,原本独立的三个轴向操作突然"锁死"成两个——这就是…...
BitTorrent Tracker服务器在亚洲节点的部署优化实践
BitTorrent Tracker服务器在亚洲节点的部署优化实践 【免费下载链接】trackerslist Updated list of public BitTorrent trackers 项目地址: https://gitcode.com/GitHub_Trending/tr/trackerslist ngosang/trackerslist作为开源技术社区中维护的公共BitTorrent Tracker…...
【SITS2026权威前瞻】:全球TOP12AI代码引擎实测对比,3大生产级陷阱你避开了吗?
第一章:SITS2026圆桌:智能代码生成未来 2026奇点智能技术大会(https://ml-summit.org) 在SITS2026圆桌论坛上,来自GitHub、Tabnine、DeepMind与国内大模型实验室的七位核心研发者共同探讨了智能代码生成从“补全助手”迈向“协同编程伙伴”…...
李慕婉-仙逆-造相Z-Turbo AI编程新时代:如何利用大模型提升开发者个人效能
李慕婉-仙逆-造相Z-Turbo AI编程新时代:如何利用大模型提升开发者个人效能 作为一名写了十几年代码的老兵,我经历过从记事本到IDE,再到各种自动化工具的演变。但说实话,最近半年,我工作流里最大的变化,不是…...
DAMOYOLO-S案例分享:古建筑图像中斗拱/飞檐/彩画构件自动识别
DAMOYOLO-S案例分享:古建筑图像中斗拱/飞檐/彩画构件自动识别 1. 引言 如果你是一位古建筑爱好者、文物保护工作者,或者是一名建筑专业的学生,你可能会遇到一个共同的难题:面对一张复杂的古建筑照片,如何快速、准确地…...
Go语言怎么嵌入静态文件_Go语言embed嵌入文件教程【秒懂】
Go 1.16 用 embed 包可将文件编译进二进制,但需满足路径为相对包根的字面量、包与变量声明正确三重约束;embed.FS 要求路径不可拼接、不可跨模块、不支持 ./ 前缀;读取需用 fs.ErrNotExist 判断缺失;HTTP 服务中可直接用 http.Fil…...
RexUniNLU RexPrompt技术解析:显式图式指导器如何缓解零样本任务歧义性
RexUniNLU RexPrompt技术解析:显式图式指导器如何缓解零样本任务歧义性 1. 引言:当AI面对“未知”任务时 想象一下,你拿到一个全新的文本处理任务,比如从一段新闻里找出所有“人物”和“组织机构”,但之前没人告诉过…...
Intv_AI_MK11多模态探索:与Claude模型对比分析与应用选型
Intv_AI_MK11多模态探索:与Claude模型对比分析与应用选型 1. 两大模型概览 Intv_AI_MK11和Claude都是当前备受关注的大模型,但它们在设计理念和技术路线上有着明显差异。Intv_AI_MK11主打多模态能力,能够同时处理文本、图像、音频等多种输入…...
FRCRN降噪在车载语音助手中的应用效果实测
FRCRN降噪在车载语音助手中的应用效果实测 开车时想用语音助手,最怕什么?十有八九是“它听不清”。窗外呼啸的风声、轮胎摩擦地面的噪音、空调出风口的呼呼声,还有偶尔响起的喇叭声,这些背景音交织在一起,常常让车里的…...
python如何对图片或文件的操作
一. base64 与图片的相互转换1. base64 转图片123456789101112131415161718192021import base64from io import BytesIOfrom PIL import Image# base64 编码的图像数据(示例)base64_data "iVBn9DHASKJDjDsdSADSf8lgg"# 将 base64 编码的字符串…...
