当前位置: 首页 > news >正文

【Flink快速入门-5.流处理之多流转换算子】

流处理之多流转换算子

实验介绍

前面实验中介绍的算子已经能够满足我们的大部分开发需求了,但是在实际工作中有时候还会遇到一些业务场景,例如需要摄入多个输入流并将其合并处理,或者需要将一条输入流分割为多条子流,在不同的子流中处理不同的业务逻辑。所以本节实验的内容我们将学习 DataSteam API 中的可以将多条输入流合并为一个输入流,或者将一个输入流分割为多个子流的算子,我们将其统称为“多流转换算子”。

知识点
  • Union
  • filter

算子演示

Union

union 顾名思义就是连接的意思,所以 union 算子的作用就是合并两条或者多条相同类型的 DataStream,生成一个新的类型相同的 DataStream。如图所示:
在这里插入图片描述

需要注意的是,事件合流的方式为 FIFO 方式。操作符并不会产生一个特定顺序的事件流。union 操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。

假设某公司分别在淘宝和天猫都开设了自己的直营店,公司高层需要实时监控到两个店铺的交易数据,并希望通过大屏展示的方式实时滚动。我们可以通过两条 Socket 输入流来模拟这样的场景。

首先在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个 UnionOperator 的 Scala object,输入如下代码:

package com.vlab.operatorimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}object UnionOperator {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 接收京东订单val jdOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9999)// 接收拼刀刀订单val pindaoOrder:DataStream[String] = env.socketTextStream("192.168.137.81", 9998)// 将两条输入流合并为一条输入流val unionStream:DataStream[String] = jdOrder.union(pindaoOrder)// 设置并行度unionStream.print().setParallelism(1)// 执行env.execute("UnionOperator")}}

我们使用 netcat 监控两个端口来模拟发送淘宝和天猫的订单信息,然后使用 Flink 接收。打开终端窗口,执行 nc -l -p 9998 命令,紧接着打开另一个终端窗口,执行 nc -l -p 9999 命令。这样的话我们监控了 9998 和 9999 两个端口,接下来在 Flink 中进行接收。

运行刚刚的代码,然后在前面打开的两个终端中交替发送订单数据,观察 idea 控制台输出。
在这里插入图片描述

filter

使用 filter 来根据体温的阈值将流拆分为两个子流:一个是正常体温流,另一个是发烧体温流。然后我们可以对每个子流进行不同的业务逻辑处理。

疫情期间,全国各地的超市、医院、机场等公共场所入口都有温度监控设备,当该设备检测到某个人体温异常之后就会报警。假设鉴别正常体温和发烧体温的阈值为 36.0 摄氏度,也就是说,只要体温大于等于 36.0 摄氏度我们就认为其为发烧状态。我们使用 检测体温是否异常,我们可以使用 filter 来将流分为两条子流,一个代表 正常体温,另一个代表 发烧体温,然后可以对这些流进行不同的业务逻辑处理。

在我们 FlinkLearning 工程的 com.vlab.operator 包下创建一个名为 SelectOperator 的 Scala object,代码如下:

package com.shiyanlou.operatorimport org.apache.flink.streaming.api.scala._object SelectOperator {def main(args: Array[String]): Unit = {// 设置流环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 读取socket文本数据流val inputDS: DataStream[String] = env.socketTextStream("192.168.137.81", 9999)val peopleStream = inputDS.map(line => {val arr = line.split(" ")People(arr(0), arr(1).toFloat)})// 使用 keyBy 按照温度类型(high 或 normal)进行分组val highTempStream = peopleStream.filter(_.temperature > 36.5)val normalTempStream = peopleStream.filter(_.temperature <= 36.5)// 打印输出highTempStream.print("发烧")normalTempStream.print("体温正常")env.execute("SelectOperator")}case class People(name: String, temperature: Float)
}

上面的代码中,我们创建了一个 Socket 输入流监控localhost下的 9999 端口,然后将输入的文本使用空格分隔之后转换为People类。紧接着使用 Split 算子将体温大于 36.0 的人群定义为fever,将体温小于等于 36.0 的人群定义为normal,最后使用select算子选择了fever(发烧)状态的人群并输出到控制台。

打开终端,执行nc -l -p 9999,在 idea 运行以上代码,并在终端中依次发送下面的信息:

张小明 35.6
李鹏程 36.3
赵露 36.7
李阳 35.5
刘明 37.0

在 idea 的控制台会看到将体温高于 36.5 的做了打印(赵露、刘明)。
在这里插入图片描述

实验总结

本节实验中我们介绍了 Flink 中的多流转换算子,其中 Union 是将两个或者多个类型相同的输入流转换成一个输入流,而filter是将一个输入流根据给定的条件切分成多个子输入流。这部分内容在工作中会经常用到,大家一定要理解。

相关文章:

【Flink快速入门-5.流处理之多流转换算子】

流处理之多流转换算子 实验介绍 前面实验中介绍的算子已经能够满足我们的大部分开发需求了&#xff0c;但是在实际工作中有时候还会遇到一些业务场景&#xff0c;例如需要摄入多个输入流并将其合并处理&#xff0c;或者需要将一条输入流分割为多条子流&#xff0c;在不同的子…...

react传递函数与回调函数原理

为什么 React 允许直接传递函数&#xff1f; 回调函数核心逻辑 例子&#xff1a;父组件控制 Modal 的显示与隐藏 // 父组件 (ParentComponent.tsx) import React, { useState } from react; import { Modal, Button } from antd; import ModalContent from ./ModalContent;co…...

华为云kubernetes基于keda自动伸缩deployment副本(监听redis队列长度)

1 概述 KEDA&#xff08;Kubernetes-based Event-Driven Autoscaler&#xff0c;网址是https://keda.sh&#xff09;是在 Kubernetes 中事件驱动的弹性伸缩器&#xff0c;功能非常强大。不仅支持根据基础的CPU和内存指标进行伸缩&#xff0c;还支持根据各种消息队列中的长度、…...

Spring源码分析のBean扫描流程

文章目录 前言一、scanCandidateComponents1.1 isCandidateComponent1.1.1、排除/包含过滤器1.1.2、条件装配1.1.3、重载一1.1.4、重载二1.1.5、补充&#xff1a;Lookup注解 总结 前言 原生的Spring在构造ApplicationContext时&#xff0c;会调用refresh方法。其中就包含了扫描…...

Ubuntu安装docker:docker-desktop : 依赖: docker-ce-cli 但无法安装它、无法定位软件包 docker-ce-cli

具体错误 sudo apt-get install ./docker-desktop-amd64.deb [sudo] password for weiyu: 正在读取软件包列表... 完成 正在分析软件包的依赖关系树... 完成 正在读取状态信息... 完成 注意&#xff0c;选中 docker-desktop 而非 ./docker-desktop-amd64.de…...

基于大数据的奥运会获奖数据分析系统设计与实现

【大数据】基于大数据的奥运会获奖数据分析系统设计与实现&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 该系统通过集成先进的数据抓取、处理、存储与可视化技术&#xff0c;为深入理解奥运会…...

数据结构 堆和priority_queue

一、堆的定义 堆&#xff08;heap&#xff09;&#xff0c;是⼀棵有着特殊性质的完全⼆叉树&#xff0c;可以⽤来实现优先级队列&#xff08;priorityqueue&#xff09;。 堆需要满⾜以下性质&#xff1a; 1. 是⼀棵完全⼆叉树&#xff1b; 2. 对于树中每个结点&#xff0c;如…...

Dockerfile 编写推荐

一、导读 本文主要介绍在编写 docker 镜像的时候一些需要注意的事项和推荐的做法。 虽然 Dockerfile 简化了镜像构建的过程&#xff0c;并且把这个过程可以进行版本控制&#xff0c;但是不正当的 Dockerfile 使用也会导致很多问题。 docker 镜像太大。如果你经常使用镜像或者…...

【抽象代数】1.2. 半群与群

群的定义 群非空集合二元运算性质 定义1. 设 为一个非空集合&#xff0c;上有二元运算&#xff0c;满足结合律&#xff0c;则称或为一个半群。 定义2. 设 为半群&#xff0c;若元素 满足 &#xff0c;则称 为 的左幺元&#xff08;右幺元&#xff1a;&#xff09;&#…...

Django中实现简单易用的分页工具

如何在Django中实现简单易用的分页工具&#xff1f;&#x1f4da; 嗨&#xff0c;小伙伴们&#xff01;今天我们来看看如何在 Django 中实现一个超简单的分页工具。无论你是在处理博客文章、产品列表&#xff0c;还是用户评论&#xff0c;当数据量一大时&#xff0c;分页显得尤…...

「软件设计模式」装饰者模式(Decorator)

深入解析装饰者模式&#xff1a;动态扩展功能的艺术&#xff08;C实现&#xff09; 一、模式思想与应用场景 1.1 模式定义 装饰者模式&#xff08;Decorator Pattern&#xff09;是一种结构型设计模式&#xff0c;它通过将对象放入包含行为的特殊封装对象中&#xff0c;动态地…...

CI/CD(二)docker-compose安装Jenkins

1、docker-compose.yml version: 3.8services:jenkins:image: jenkins/jenkins:lts # 使用官方的 Jenkins LTS 镜像container_name: jenkinsuser: root # 如果需要以 root 用户运行ports:- "8080:8080" # Jenkins Web 界面端口- "50000:50000" # 用于 Jen…...

OpenCV机器学习(1)人工神经网络 - 多层感知器类cv::ml::ANN_MLP

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::ml::ANN_MLP 是 OpenCV 库中的一部分&#xff0c;用于实现人工神经网络 - 多层感知器&#xff08;Artificial Neural Network - Multi-Layer…...

ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群

ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群 一、PolarDB-X标准版主备集群搭建 三台机器上传 polardbx 包&#xff0c;包可以从官网https://openpolardb.com/download获取&#xff0c;这里提供离线rpm。 1、上传 polardbx 安装包 到 /opt目录下 rpm -ivh t-pol…...

15.1 Process(进程)类

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事&#xff0c;必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…...

elasticsearch8 linux版以服务的方式启动

1.创建系统服务文件 对于使用 systemd 作为系统初始化系统的 Linux 发行版&#xff08;如 CentOS 7 及以上、Ubuntu 16.04 及以上&#xff09;&#xff0c;需要创建一个 systemd 服务文件。以 root 用户或具有 sudo 权限的用户身份执行以下操作&#xff1a; sudo vim /etc/sy…...

小米 R3G 路由器刷机教程(Pandavan)

小米 R3G 路由器刷机教程&#xff08;Pandavan&#xff09; 一、前言 小米 R3G 路由器以其高性价比和稳定的性能备受用户青睐。然而&#xff0c;原厂固件的功能相对有限&#xff0c;难以满足高级用户的个性化需求。刷机不仅可以解锁路由器的潜能&#xff0c;还能通过第三方固…...

某大型业务系统技术栈介绍【应对面试】

微服务架构【图】 微服务架构【概念】 微服务架构&#xff0c;是一种架构模式&#xff0c;它提倡将单一应用程序划分成一组小的服务&#xff0c;服务之间互相协调、互相配合&#xff0c;为用户提供最终价值。在微服务架构中&#xff0c;服务与服务之间通信时&#xff0c;通常是…...

【区块链】零知识证明基础概念详解

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…...

建筑行业安全技能竞赛流程方案

一、比赛时间&#xff1a; 6月23日8&#xff1a;30分准时到场&#xff1b;9&#xff1a;00&#xff0d;10&#xff1a;00理论考试&#xff1b;10&#xff1a;10-12:00现场隐患答疑&#xff1b;12:00-13&#xff1a;30午餐&#xff1b;下午13&#xff1a;30-15&#xff1a;30现场…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元&#xff08;GRU&#xff09;长短期记忆神经网络&#xff08;LSTM&#xff09…...

Mobile ALOHA全身模仿学习

一、题目 Mobile ALOHA&#xff1a;通过低成本全身远程操作学习双手移动操作 传统模仿学习&#xff08;Imitation Learning&#xff09;缺点&#xff1a;聚焦与桌面操作&#xff0c;缺乏通用任务所需的移动性和灵活性 本论文优点&#xff1a;&#xff08;1&#xff09;在ALOHA…...

管理学院权限管理系统开发总结

文章目录 &#x1f393; 管理学院权限管理系统开发总结 - 现代化Web应用实践之路&#x1f4dd; 项目概述&#x1f3d7;️ 技术架构设计后端技术栈前端技术栈 &#x1f4a1; 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 &#x1f5c4;️ 数据库设…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

Netty从入门到进阶(二)

二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架&#xff0c;用于…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...