【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子
实验介绍
前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。
知识点
- Union
- filter
算子演示
Union
union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。
假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。
首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:
package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}
我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。
运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。

filter
使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。
疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。
在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:
package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}
上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。
打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:
张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0
在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。

实验总结
本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。
相关文章:
【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子…...
react传递函数与回调函数原理
为什么 React 允许直接传递函数? 回调函数核心逻辑 例子:父组件控制 Modal 的显示与隐藏 // 父组件 (ParentComponent.tsx) import React, { useState } from react; import { Modal, Button } from antd; import ModalContent from ./ModalContent;co…...
华为云kubernetes基于keda自动伸缩deployment副本(监听redis队列长度)
1 概述 KEDA(Kubernetes-based Event-Driven Autoscaler,网址是https://keda.sh)是在 Kubernetes 中事件驱动的弹性伸缩器,功能非常强大。不仅支持根据基础的CPU和内存指标进行伸缩,还支持根据各种消息队列中的长度、…...
Spring源码分析のBean扫描流程
文章目录 前言一、scanCandidateComponents1.1 isCandidateComponent1.1.1、排除/包含过滤器1.1.2、条件装配1.1.3、重载一1.1.4、重载二1.1.5、补充:Lookup注解 总结 前言 原生的Spring在构造ApplicationContext时,会调用refresh方法。其中就包含了扫描…...
Ubuntu安装docker:docker-desktop : 依赖: docker-ce-cli 但无法安装它、无法定位软件包 docker-ce-cli
具体错误 sudo apt-get install ./docker-desktop-amd64.deb [sudo] password for weiyu: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 注意,选中 docker-desktop 而非 ./docker-desktop-amd64.de…...
基于大数据的奥运会获奖数据分析系统设计与实现
【大数据】基于大数据的奥运会获奖数据分析系统设计与实现(完整系统源码开发笔记详细部署教程)✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统通过集成先进的数据抓取、处理、存储与可视化技术,为深入理解奥运会…...
数据结构 堆和priority_queue
一、堆的定义 堆(heap),是⼀棵有着特殊性质的完全⼆叉树,可以⽤来实现优先级队列(priorityqueue)。 堆需要满⾜以下性质: 1. 是⼀棵完全⼆叉树; 2. 对于树中每个结点,如…...
Dockerfile 编写推荐
一、导读 本文主要介绍在编写 docker 镜像的时候一些需要注意的事项和推荐的做法。 虽然 Dockerfile 简化了镜像构建的过程,并且把这个过程可以进行版本控制,但是不正当的 Dockerfile 使用也会导致很多问题。 docker 镜像太大。如果你经常使用镜像或者…...
【抽象代数】1.2. 半群与群
群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合,上有二元运算,满足结合律,则称或为一个半群。 定义2. 设 为半群,若元素 满足 ,则称 为 的左幺元(右幺元:)&#…...
Django中实现简单易用的分页工具
如何在Django中实现简单易用的分页工具?📚 嗨,小伙伴们!今天我们来看看如何在 Django 中实现一个超简单的分页工具。无论你是在处理博客文章、产品列表,还是用户评论,当数据量一大时,分页显得尤…...
「软件设计模式」装饰者模式(Decorator)
深入解析装饰者模式:动态扩展功能的艺术(C实现) 一、模式思想与应用场景 1.1 模式定义 装饰者模式(Decorator Pattern)是一种结构型设计模式,它通过将对象放入包含行为的特殊封装对象中,动态地…...
CI/CD(二)docker-compose安装Jenkins
1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...
OpenCV机器学习(1)人工神经网络 - 多层感知器类cv::ml::ANN_MLP
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::ml::ANN_MLP 是 OpenCV 库中的一部分,用于实现人工神经网络 - 多层感知器(Artificial Neural Network - Multi-Layer…...
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群 一、PolarDB-X标准版主备集群搭建 三台机器上传 polardbx 包,包可以从官网https://openpolardb.com/download获取,这里提供离线rpm。 1、上传 polardbx 安装包 到 /opt目录下 rpm -ivh t-pol…...
15.1 Process(进程)类
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事,必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…...
elasticsearch8 linux版以服务的方式启动
1.创建系统服务文件 对于使用 systemd 作为系统初始化系统的 Linux 发行版(如 CentOS 7 及以上、Ubuntu 16.04 及以上),需要创建一个 systemd 服务文件。以 root 用户或具有 sudo 权限的用户身份执行以下操作: sudo vim /etc/sy…...
小米 R3G 路由器刷机教程(Pandavan)
小米 R3G 路由器刷机教程(Pandavan) 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而,原厂固件的功能相对有限,难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能,还能通过第三方固…...
某大型业务系统技术栈介绍【应对面试】
微服务架构【图】 微服务架构【概念】 微服务架构,是一种架构模式,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。在微服务架构中,服务与服务之间通信时,通常是…...
【区块链】零知识证明基础概念详解
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…...
建筑行业安全技能竞赛流程方案
一、比赛时间: 6月23日8:30分准时到场;9:00-10:00理论考试;10:10-12:00现场隐患答疑;12:00-13:30午餐;下午13:30-15:30现场…...
【Axure高保真原型】引导弹窗
今天和大家中分享引导弹窗的原型模板,载入页面后,会显示引导弹窗,适用于引导用户使用页面,点击完成后,会显示下一个引导弹窗,直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving
地址:LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂,正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...
数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !
我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...
数据分析六部曲?
引言 上一章我们说到了数据分析六部曲,何谓六部曲呢? 其实啊,数据分析没那么难,只要掌握了下面这六个步骤,也就是数据分析六部曲,就算你是个啥都不懂的小白,也能慢慢上手做数据分析啦。 第一…...
