当前位置: 首页 > news >正文

Flink多流处理之connect拼接流

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStreamrightStream,也可以使用不同的逻辑处理leftStreamrightStream.
如下图:
在这里插入图片描述

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.

  • 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 多流操作-流连接**/
public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/*** map1作为左流**/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/*** map2作为右流**/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}
}
  • 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c

相关文章:

Flink多流处理之connect拼接流

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream和rightStream,也可以…...

对任意类型数都可以排序的函数:qsort函数

之前我们学习过冒泡排序&#xff1a; int main() {int arr[] { 9,7,8,6,5,4,3,2,1,0 };int sz sizeof(arr)/sizeof(arr[0]);int i 0;for (i 0; i < sz-1; i) {int j 0;for (j 0; j < sz-1-i; j) {if (arr[j] > arr[j 1]){int temp 0;temp arr[j];arr[j] ar…...

vue数据更新table内容不更新解决方法

场景&#xff1a; table组件绑定的数据变化时&#xff0c;页面没有重新渲染&#xff0c;常见于子组件中使用table组件 原理&#xff1a; 创建实例时 数组在vue中没有被监听到&#xff0c;属于非响应式数据&#xff0c;数组的下标变化无法监听到 解决方式&#xff1a; <e…...

合宙Air724UG LuatOS-Air script lib API--record

record Table of Contents record record.start(seconds, cbFnc, type, quality, rcdType, format, streamRptLen) record.stop(cbFnc) record.getFilePath() record.getData(offset, len) record.getSize() record.delete() record.exists() record.isBusy() record 模块功能&…...

基于Vgg16和Vgg19深度学习网络的步态识别系统matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ................................................................ % 设置训练选项options …...

Java分布式微服务3——Docker

文章目录 Docker介绍安装DockerDocker基础操作Docker服务的启动镜像命令容器命令1. 从docker hub去查看Nginx容器的运行命令2. 查看所有容器状态3. 查看容器日志4. 进入Nginx容器执行命令&#xff0c;修改Html内容&#xff0c;添加“Hello World”5. 停止与开始容器6. 删除容器…...

js字符串替换

在JavaScript中&#xff0c;字符串替换 有多种方法&#xff0c;下面介绍其中一些比较常用的方法。 使用replace()方法、 replace()方法用于在字符串中查…...

网络防御(2)

1. 什么是防火墙&#xff1f; 2. 状态防火墙工作原理&#xff1f; 3. 防火墙如何处理双通道协议&#xff1f; 一、什么是防火墙&#xff1f; 防火墙是一种网络安全设备或软件&#xff0c;用于保护计算机网络免受未经授权的访问&#xff0c;并管理网络流量。它作为一个安全边界…...

[RCTF2019]DontEatMe

前言 一道迷宫题&#xff0c;但是输入被加密后使用&#xff0c;迷宫也需要在程序中找出并没有直接输出 分析 反调试 发现有两个比较特殊的地方&#xff0c;随机数和创建了新线程&#xff0c;随机数后面又被重新赋值给覆盖了&#xff0c;暂时不用管&#xff0c;ZwSetInformat…...

6. CSS(三)

目录 一、盒子模型 &#xff08;一&#xff09;网页布局的本质 &#xff08;二&#xff09;盒子模型组成 &#xff08;三&#xff09;边框&#xff08;border&#xff09; &#xff08;四&#xff09;表格的细线边框 &#xff08;五&#xff09;内边距&#xff08;padding…...

计算机网络—HTTP

这里写目录标题 HTTP是什么HTTP常见状态码HTTP常见字段GET与POST的区别Get和Post是安全和幂等吗PUT幂等&#xff0c;不安全DELETE幂等&#xff0c;不是安全 HTTP缓存技术HTTP缓存实现技术 HTTP1.0优缺点和性能HTTP1.1优缺点和性能HTTP2优缺点和性能HTTP3优缺点和性能HTTP和HTTP…...

Tomcat线程池原理

1. 一个 SpringBoot 项目能同时处理多少请求&#xff1f;tomcat容器&#xff0c; 200 次。 2. 怎么来的&#xff1f; 而点击这些线程&#xff0c;查看其堆栈消息&#xff0c;可以看到 Tomcat、threads、ThreadPoolExecutor 等关键字 基于“短时间内有 200 个请求被立马处理…...

踩坑 视觉SLAM 十四讲第二版 ch13 编译及运行问题

一、安装Geset 库 sudo apt-get install libgtest-dev cd /usr/src/gtest sudo mkdir build cd build sudo cmake .. //一定要以sudo的方式运行&#xff0c;否则没有写入权限 sudo make //这个也一样要以sudo的方式 sudo cp libgtest*.a /usr/local/lib //将生成…...

【设计模式】-装饰器模式

Java 设计模式之装饰器模式 前言 在软件开发中&#xff0c;经常有需求对已有的对象进行功能的扩展&#xff0c;但是传统的继承方式会导致类的数量快速增多&#xff0c;且难以维护。为了解决这个问题&#xff0c;装饰器模式应运而生。 装饰器模式是一种结构型设计模式&#xff…...

七月学习总结

一晃暑期七月份已经结束了&#xff0c;八月份需要做的事情更多。 在成长的路上不断地迷茫&#xff0c;不断地前进。到底才能完成对自己地救赎。 目前想的就是以后走软件开发&#xff0c;往架构方向做&#xff0c;主语言Java或者go&#xff0c;408基础一定要扎实&#xff0c;计…...

Camunda 7.x 系列【6】Spring Boot 集成 Camunda 7.19

有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.9 本系列Camunda 版本 7.19.0 源码地址:https://gitee.com/pearl-organization/camunda-study-demo 文章目录 1. 前言2. Camunda Platform Run3. Spring Boot 版本兼容性4. 集成 Spring Boot5. 启动项目…...

Kubernetes —调度器配置

目录 配置文件 扩展点 调度插件 多配置文件 应用于多个扩展点的插件 调度程序配置迁移 你可以通过编写配置文件&#xff0c;并将其路径传给 kube-scheduler 的命令行参数&#xff0c;定制 kube-scheduler 的行为。 调度模板&#xff08;Profile&#xff09;允许你配置 k…...

【微信小程序】申请蓝牙、位置和数据库等相关权限

在小程序的app.json文件中配置requiredPermissions字段&#xff0c;并在其中添加相应的权限。 以下是一个示例app.json文件的配置&#xff0c;包括了蓝牙、位置和数据库等权限的申请&#xff1a; {"pages": ["pages/index/index"],"requiredPermiss…...

ORB-SLAM2学习笔记6之D435i双目IR相机运行ROS版ORB-SLAM2并发布位姿pose的rostopic

文章目录 0 引言1 D435i相机配置2 新增发布双目位姿功能2.1 新增d435i_stereo.cc代码2.2 修改CMakeLists.txt2.3 新增配置文件D435i.yaml 3 编译运行和结果3.1 编译运行3.2 结果3.3 可能出现的问题 0 引言 ORB-SLAM2学习笔记1已成功编译安装ROS版本ORB-SLAM2到本地&#xff0c…...

【数据结构与算法——TypeScript】哈希表

【数据结构与算法——TypeScript】 哈希表(HashTable) 哈希表介绍和特性 哈希表是一种非常重要的数据结构&#xff0c;但是很多学习编程的人一直搞不懂哈希表到底是如何实现的。 在这一章节中&#xff0c;我门就一点点来实现一个自己的哈希表。通过实现来理解哈希表背后的原理…...

高性能企业级数据集成架构设计:Pentaho Kettle 11.0核心引擎深度解析与部署指南

高性能企业级数据集成架构设计&#xff1a;Pentaho Kettle 11.0核心引擎深度解析与部署指南 【免费下载链接】pentaho-kettle Pentaho Data Integration ( ETL ) a.k.a Kettle 项目地址: https://gitcode.com/gh_mirrors/pe/pentaho-kettle Pentaho Data Integration&am…...

如何高效解决多云存储兼容问题?Alibaba Cloud OSS SDK实战指南

如何高效解决多云存储兼容问题&#xff1f;Alibaba Cloud OSS SDK实战指南 【免费下载链接】alibabacloud-oss-sdk The OSS SDK. Powered by Darabonba. 项目地址: https://gitcode.com/gh_mirrors/al/alibabacloud-oss-sdk 面对日益复杂的多云存储环境&#xff0c;开发…...

<el-button type=“primary“><el-icon><Plus /></el-icon> 上传照片</el-button>的庖丁解牛

它的本质是&#xff1a;**这行代码不仅仅是一个按钮&#xff0c;它是一个 复合交互单元 (Composite Interaction Unit)。它通过 语义化标签 (el-button)、视觉信号 (type"primary", Plus Icon) 和 文本提示 (“上传照片”) 的组合&#xff0c;向用户传达了一个明确的…...

2026终极测评:16款降AIGC工具横评,论文降重降ai率终极答案!

随着AI写作技术的迅猛发展&#xff0c;越来越多的学术创作者开始依赖各类生成工具提升效率。然而&#xff0c;2026年各大高校与科研机构对AIGC内容的检测标准愈发严格&#xff0c;论文中的一丝AI痕迹都可能成为被质疑的导火索。面对日益严峻的查重与AIGC检测压力&#xff0c;如…...

TokUnion 技术架构解析:AI+GEO 驱动的跨境增长数据闭环设计

摘要最近这个时间段&#xff0c;是国货出海精细化与合规化转型背景的深度期&#xff0c;传统粗放式广告投放&#xff0c;和单一渠道运营模式面临获客成本高、ROI 不可控、数据孤岛、合规风险突出等问题。下面这个文章&#xff0c;我会以TokUnion数字化协同体系为研究对象&#…...

如何快速修复损坏的QR码:QrazyBox终极指南

如何快速修复损坏的QR码&#xff1a;QrazyBox终极指南 【免费下载链接】qrazybox QR Code Analysis and Recovery Toolkit 项目地址: https://gitcode.com/gh_mirrors/qr/qrazybox 你是否曾遇到过打印的二维码被咖啡渍污染&#xff0c;或者手机拍摄的二维码模糊不清&…...

NotebookLM落地失败真相:为什么83%团队卡在第2阶段?3个权威诊断指标立即自检

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;NotebookLM落地失败的核心归因诊断 NotebookLM 作为 Google 推出的面向文档理解的实验性 AI 工具&#xff0c;其本地化部署与企业级集成常遭遇系统性失效。深入分析表明&#xff0c;失败并非源于单一技术缺陷…...

保姆级教程:在Vue3项目中用ZLMediaKit+WebRTC实现超低延迟监控直播(附完整代码)

Vue3WebRTC超低延迟监控直播实战指南 在实时视频监控领域&#xff0c;延迟是衡量系统性能的核心指标之一。传统RTSP流媒体方案在Web端实现时&#xff0c;往往面临秒级甚至更长的延迟&#xff0c;这在对实时性要求极高的安防监控、工业检测等场景中成为致命短板。本文将深入探讨…...

WorkshopDL神器秘籍:零门槛解锁Steam创意工坊的终极跨平台方案

WorkshopDL神器秘籍&#xff1a;零门槛解锁Steam创意工坊的终极跨平台方案 【免费下载链接】WorkshopDL WorkshopDL - The Best Steam Workshop Downloader 项目地址: https://gitcode.com/gh_mirrors/wo/WorkshopDL 你是不是曾经在Epic商店买了《盖瑞的模组》&#xff…...

KMS_VL_ALL_AIO:三步永久激活Windows和Office的智能解决方案

KMS_VL_ALL_AIO&#xff1a;三步永久激活Windows和Office的智能解决方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows系统频繁弹出激活提示而烦恼吗&#xff1f;Office文档突然…...