【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子
实验介绍
前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。
知识点
- Union
- filter
算子演示
Union
union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。
假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。
首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:
package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}
我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。
运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。

filter
使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。
疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。
在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:
package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}
上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。
打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:
张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0
在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。

实验总结
本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。
相关文章:
【Flink快速入门-5.流处理之多流转换算子】
流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子…...
react传递函数与回调函数原理
为什么 React 允许直接传递函数? 回调函数核心逻辑 例子:父组件控制 Modal 的显示与隐藏 // 父组件 (ParentComponent.tsx) import React, { useState } from react; import { Modal, Button } from antd; import ModalContent from ./ModalContent;co…...
华为云kubernetes基于keda自动伸缩deployment副本(监听redis队列长度)
1 概述 KEDA(Kubernetes-based Event-Driven Autoscaler,网址是https://keda.sh)是在 Kubernetes 中事件驱动的弹性伸缩器,功能非常强大。不仅支持根据基础的CPU和内存指标进行伸缩,还支持根据各种消息队列中的长度、…...
Spring源码分析のBean扫描流程
文章目录 前言一、scanCandidateComponents1.1 isCandidateComponent1.1.1、排除/包含过滤器1.1.2、条件装配1.1.3、重载一1.1.4、重载二1.1.5、补充:Lookup注解 总结 前言 原生的Spring在构造ApplicationContext时,会调用refresh方法。其中就包含了扫描…...
Ubuntu安装docker:docker-desktop : 依赖: docker-ce-cli 但无法安装它、无法定位软件包 docker-ce-cli
具体错误 sudo apt-get install ./docker-desktop-amd64.deb [sudo] password for weiyu: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 注意,选中 docker-desktop 而非 ./docker-desktop-amd64.de…...
基于大数据的奥运会获奖数据分析系统设计与实现
【大数据】基于大数据的奥运会获奖数据分析系统设计与实现(完整系统源码开发笔记详细部署教程)✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统通过集成先进的数据抓取、处理、存储与可视化技术,为深入理解奥运会…...
数据结构 堆和priority_queue
一、堆的定义 堆(heap),是⼀棵有着特殊性质的完全⼆叉树,可以⽤来实现优先级队列(priorityqueue)。 堆需要满⾜以下性质: 1. 是⼀棵完全⼆叉树; 2. 对于树中每个结点,如…...
Dockerfile 编写推荐
一、导读 本文主要介绍在编写 docker 镜像的时候一些需要注意的事项和推荐的做法。 虽然 Dockerfile 简化了镜像构建的过程,并且把这个过程可以进行版本控制,但是不正当的 Dockerfile 使用也会导致很多问题。 docker 镜像太大。如果你经常使用镜像或者…...
【抽象代数】1.2. 半群与群
群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合,上有二元运算,满足结合律,则称或为一个半群。 定义2. 设 为半群,若元素 满足 ,则称 为 的左幺元(右幺元:)&#…...
Django中实现简单易用的分页工具
如何在Django中实现简单易用的分页工具?📚 嗨,小伙伴们!今天我们来看看如何在 Django 中实现一个超简单的分页工具。无论你是在处理博客文章、产品列表,还是用户评论,当数据量一大时,分页显得尤…...
「软件设计模式」装饰者模式(Decorator)
深入解析装饰者模式:动态扩展功能的艺术(C实现) 一、模式思想与应用场景 1.1 模式定义 装饰者模式(Decorator Pattern)是一种结构型设计模式,它通过将对象放入包含行为的特殊封装对象中,动态地…...
CI/CD(二)docker-compose安装Jenkins
1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...
OpenCV机器学习(1)人工神经网络 - 多层感知器类cv::ml::ANN_MLP
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::ml::ANN_MLP 是 OpenCV 库中的一部分,用于实现人工神经网络 - 多层感知器(Artificial Neural Network - Multi-Layer…...
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群
ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群 一、PolarDB-X标准版主备集群搭建 三台机器上传 polardbx 包,包可以从官网https://openpolardb.com/download获取,这里提供离线rpm。 1、上传 polardbx 安装包 到 /opt目录下 rpm -ivh t-pol…...
15.1 Process(进程)类
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事,必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…...
elasticsearch8 linux版以服务的方式启动
1.创建系统服务文件 对于使用 systemd 作为系统初始化系统的 Linux 发行版(如 CentOS 7 及以上、Ubuntu 16.04 及以上),需要创建一个 systemd 服务文件。以 root 用户或具有 sudo 权限的用户身份执行以下操作: sudo vim /etc/sy…...
小米 R3G 路由器刷机教程(Pandavan)
小米 R3G 路由器刷机教程(Pandavan) 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而,原厂固件的功能相对有限,难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能,还能通过第三方固…...
某大型业务系统技术栈介绍【应对面试】
微服务架构【图】 微服务架构【概念】 微服务架构,是一种架构模式,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值。在微服务架构中,服务与服务之间通信时,通常是…...
【区块链】零知识证明基础概念详解
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…...
建筑行业安全技能竞赛流程方案
一、比赛时间: 6月23日8:30分准时到场;9:00-10:00理论考试;10:10-12:00现场隐患答疑;12:00-13:30午餐;下午13:30-15:30现场…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
React---day11
14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store: 我们在使用异步的时候理应是要使用中间件的,但是configureStore 已经自动集成了 redux-thunk,注意action里面要返回函数 import { configureS…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
【SpringBoot自动化部署】
SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一,能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时,需要添加Git仓库地址和凭证,设置构建触发器(如GitHub…...
