Flink多流处理之connect拼接流
Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream和rightStream,也可以使用不同的逻辑处理leftStream和rightStream.
如下图:

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.
- 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 多流操作-流连接**/
public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/*** map1作为左流**/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/*** map2作为右流**/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}
}
- 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c
相关文章:
Flink多流处理之connect拼接流
Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStream和rightStream,也可以…...
对任意类型数都可以排序的函数:qsort函数
之前我们学习过冒泡排序: int main() {int arr[] { 9,7,8,6,5,4,3,2,1,0 };int sz sizeof(arr)/sizeof(arr[0]);int i 0;for (i 0; i < sz-1; i) {int j 0;for (j 0; j < sz-1-i; j) {if (arr[j] > arr[j 1]){int temp 0;temp arr[j];arr[j] ar…...
vue数据更新table内容不更新解决方法
场景: table组件绑定的数据变化时,页面没有重新渲染,常见于子组件中使用table组件 原理: 创建实例时 数组在vue中没有被监听到,属于非响应式数据,数组的下标变化无法监听到 解决方式: <e…...
合宙Air724UG LuatOS-Air script lib API--record
record Table of Contents record record.start(seconds, cbFnc, type, quality, rcdType, format, streamRptLen) record.stop(cbFnc) record.getFilePath() record.getData(offset, len) record.getSize() record.delete() record.exists() record.isBusy() record 模块功能&…...
基于Vgg16和Vgg19深度学习网络的步态识别系统matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022A 3.部分核心程序 ................................................................ % 设置训练选项options …...
Java分布式微服务3——Docker
文章目录 Docker介绍安装DockerDocker基础操作Docker服务的启动镜像命令容器命令1. 从docker hub去查看Nginx容器的运行命令2. 查看所有容器状态3. 查看容器日志4. 进入Nginx容器执行命令,修改Html内容,添加“Hello World”5. 停止与开始容器6. 删除容器…...
js字符串替换
在JavaScript中,字符串替换 有多种方法,下面介绍其中一些比较常用的方法。 使用replace()方法、 replace()方法用于在字符串中查…...
网络防御(2)
1. 什么是防火墙? 2. 状态防火墙工作原理? 3. 防火墙如何处理双通道协议? 一、什么是防火墙? 防火墙是一种网络安全设备或软件,用于保护计算机网络免受未经授权的访问,并管理网络流量。它作为一个安全边界…...
[RCTF2019]DontEatMe
前言 一道迷宫题,但是输入被加密后使用,迷宫也需要在程序中找出并没有直接输出 分析 反调试 发现有两个比较特殊的地方,随机数和创建了新线程,随机数后面又被重新赋值给覆盖了,暂时不用管,ZwSetInformat…...
6. CSS(三)
目录 一、盒子模型 (一)网页布局的本质 (二)盒子模型组成 (三)边框(border) (四)表格的细线边框 (五)内边距(padding…...
计算机网络—HTTP
这里写目录标题 HTTP是什么HTTP常见状态码HTTP常见字段GET与POST的区别Get和Post是安全和幂等吗PUT幂等,不安全DELETE幂等,不是安全 HTTP缓存技术HTTP缓存实现技术 HTTP1.0优缺点和性能HTTP1.1优缺点和性能HTTP2优缺点和性能HTTP3优缺点和性能HTTP和HTTP…...
Tomcat线程池原理
1. 一个 SpringBoot 项目能同时处理多少请求?tomcat容器, 200 次。 2. 怎么来的? 而点击这些线程,查看其堆栈消息,可以看到 Tomcat、threads、ThreadPoolExecutor 等关键字 基于“短时间内有 200 个请求被立马处理…...
踩坑 视觉SLAM 十四讲第二版 ch13 编译及运行问题
一、安装Geset 库 sudo apt-get install libgtest-dev cd /usr/src/gtest sudo mkdir build cd build sudo cmake .. //一定要以sudo的方式运行,否则没有写入权限 sudo make //这个也一样要以sudo的方式 sudo cp libgtest*.a /usr/local/lib //将生成…...
【设计模式】-装饰器模式
Java 设计模式之装饰器模式 前言 在软件开发中,经常有需求对已有的对象进行功能的扩展,但是传统的继承方式会导致类的数量快速增多,且难以维护。为了解决这个问题,装饰器模式应运而生。 装饰器模式是一种结构型设计模式ÿ…...
七月学习总结
一晃暑期七月份已经结束了,八月份需要做的事情更多。 在成长的路上不断地迷茫,不断地前进。到底才能完成对自己地救赎。 目前想的就是以后走软件开发,往架构方向做,主语言Java或者go,408基础一定要扎实,计…...
Camunda 7.x 系列【6】Spring Boot 集成 Camunda 7.19
有道无术,术尚可求,有术无道,止于术。 本系列Spring Boot 版本 2.7.9 本系列Camunda 版本 7.19.0 源码地址:https://gitee.com/pearl-organization/camunda-study-demo 文章目录 1. 前言2. Camunda Platform Run3. Spring Boot 版本兼容性4. 集成 Spring Boot5. 启动项目…...
Kubernetes —调度器配置
目录 配置文件 扩展点 调度插件 多配置文件 应用于多个扩展点的插件 调度程序配置迁移 你可以通过编写配置文件,并将其路径传给 kube-scheduler 的命令行参数,定制 kube-scheduler 的行为。 调度模板(Profile)允许你配置 k…...
【微信小程序】申请蓝牙、位置和数据库等相关权限
在小程序的app.json文件中配置requiredPermissions字段,并在其中添加相应的权限。 以下是一个示例app.json文件的配置,包括了蓝牙、位置和数据库等权限的申请: {"pages": ["pages/index/index"],"requiredPermiss…...
ORB-SLAM2学习笔记6之D435i双目IR相机运行ROS版ORB-SLAM2并发布位姿pose的rostopic
文章目录 0 引言1 D435i相机配置2 新增发布双目位姿功能2.1 新增d435i_stereo.cc代码2.2 修改CMakeLists.txt2.3 新增配置文件D435i.yaml 3 编译运行和结果3.1 编译运行3.2 结果3.3 可能出现的问题 0 引言 ORB-SLAM2学习笔记1已成功编译安装ROS版本ORB-SLAM2到本地,…...
【数据结构与算法——TypeScript】哈希表
【数据结构与算法——TypeScript】 哈希表(HashTable) 哈希表介绍和特性 哈希表是一种非常重要的数据结构,但是很多学习编程的人一直搞不懂哈希表到底是如何实现的。 在这一章节中,我门就一点点来实现一个自己的哈希表。通过实现来理解哈希表背后的原理…...
Java 使用国密算法实现数据加密传输
本文是混合加密:前端 SM2 SM4,后端 Spring Boot Hutool 解密的完整示例。 方案的逻辑是: 前端随机生成一个 SM4 key 用 SM4 加密整个业务 JSON 用后端提供的 SM2 公钥 加密这个 SM4 key 后端先用 SM2 私钥 解出 SM4 key 再用 SM4 解出…...
Leather Dress Collection镜像免配置:预装SD1.5+12LoRA+app.py开箱即用
Leather Dress Collection镜像免配置:预装SD1.512LoRAapp.py开箱即用 想快速生成各种酷炫的皮革服装设计图,但被繁琐的模型下载、环境配置和参数调试劝退?今天介绍的Leather Dress Collection镜像,就是为你准备的“开箱即用”解决…...
Realistic Vision V5.1本地AI摄影方案:支持HDR合成与多曝光融合预处理
Realistic Vision V5.1本地AI摄影方案:支持HDR合成与多曝光融合预处理 1. 项目概述 Realistic Vision V5.1虚拟摄影棚是一款基于Stable Diffusion 1.5生态顶级写实模型开发的本地化AI摄影工具。它通过深度优化模型参数和显存管理,让普通用户无需专业摄…...
OpenClaw+Qwen3.5-4B-Claude镜像:30分钟搭建逻辑推理自动化工作流
OpenClawQwen3.5-4B-Claude镜像:30分钟搭建逻辑推理自动化工作流 1. 为什么需要逻辑推理自动化 上周我遇到一个典型的技术问题:需要从200多行Python日志中找出导致接口超时的根本原因。手动排查不仅耗时,还容易遗漏关键线索。这让我开始思考…...
大麦抢票自动化终极指南:5分钟快速上手教程
大麦抢票自动化终极指南:5分钟快速上手教程 【免费下载链接】ticket-purchase 大麦自动抢票,支持人员、城市、日期场次、价格选择 项目地址: https://gitcode.com/GitHub_Trending/ti/ticket-purchase 您是否曾因热门演唱会门票秒光而遗憾&#x…...
探索SillyTavern角色卡片系统:从数据封装到沉浸式互动的技术解析
探索SillyTavern角色卡片系统:从数据封装到沉浸式互动的技术解析 【免费下载链接】SillyTavern LLM Frontend for Power Users. 项目地址: https://gitcode.com/GitHub_Trending/si/SillyTavern 核心价值:重新定义AI角色的数字存在形式 当我们与…...
如何利用多渠道SEO推广提高网站流量
<h2>多渠道SEO推广:如何提高网站流量</h2> <p>在当前竞争激烈的互联网环境中,网站流量是衡量网站成功与否的重要指标之一。如何利用多渠道SEO推广提高网站流量,成为了每一个网站运营者关注的焦点。本文将从问题分析、原因说…...
不用编译!快速修改Scratch-blocks积木字体的偷懒方法
零编译实战:Scratch-blocks字体调整极简方案 在Scratch 3.0的二次开发过程中,积木字体过小是开发者普遍遇到的痛点。官方移除了字体调节功能后,低分辨率设备上的中文显示尤为模糊。传统解决方案需要配置Python环境并重新编译scratch-blocks库…...
从零开始:在VMware虚拟机中部署Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF进行开发测试
从零开始:在VMware虚拟机中部署Qwen3.5-4B-Claude-4.6-Opus-Reasoning-Distilled-GGUF进行开发测试 1. 准备工作与环境搭建 在开始之前,我们需要准备好必要的软件和资源。首先确保你的主机系统满足以下要求: 至少16GB内存(推荐…...
TouchGal:一站式Galgame社区解决方案终极指南
TouchGal:一站式Galgame社区解决方案终极指南 【免费下载链接】kun-touchgal-next TouchGAL是立足于分享快乐的一站式Galgame文化社区, 为Gal爱好者提供一片净土! 项目地址: https://gitcode.com/gh_mirrors/ku/kun-touchgal-next 还在为寻找Galgame资源而四…...
