Flink流计算处理-旁路输出
使用Flink做流数据处理时,除了主流数据输出,还自定义侧流输出即旁路输出,以实现灵活的数据拆分。
定义旁路输出标签
首先需要定义一个OutputTag,代码如下:
// 这需要是一个匿名的内部类,以便我们分析类型
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
在ProcessFunction使用Context调用
可以通过以下Function中,将outputTag作为参数传递到Context中
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
代码示例:
DataStream<Integer> input = ...;final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};SingleOutputStreamOperator<Integer> mainDataStream = input.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value,Context ctx,Collector<Integer> out) throws Exception {// 发送数据到主要的输出out.collect(value);// 发送数据到旁路输出ctx.output(outputTag, "sideout-" + String.valueOf(value));}});
在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
// 获取到侧流输出DataStream,输出结果类型要与outputTag 定义的一致
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
本文中只列出了Java代码的实现;
Flink官网还有Scala/python代码实现
参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/datastream/side_output/
相关文章:
Flink流计算处理-旁路输出
使用Flink做流数据处理时,除了主流数据输出,还自定义侧流输出即旁路输出,以实现灵活的数据拆分。 定义旁路输出标签 首先需要定义一个OutputTag,代码如下: // 这需要是一个匿名的内部类,以便我们分析类型…...
nginx正向代理的配置和使用
nginx正向代理的配置和使用 nginx正向代理的配置和使用nginx正向代理的配置和使用安装包准备下载nginx安装包下载正向代理模块的包版本与模块对照表部署nginx服务上传nginx包和正向模块包解压,改名安装nginx配置正向代理创建nginx用户检查nginx配置并启动nginx服务所在服务器验…...
Oracle Trace File Analyzer 介绍及简单使用
一、什么是Oracle Trace File Analyzer Oracle Autonomous Health Framework(AHF) 包含 Oracle ORAchk, Oracle EXAchk, and Oracle Trace File Analyzer(TFA). AHF工具包包含了Oracle常用的多种诊断工具,如 ORAchk, Oracle EXAchk, and Oracle Trace File Analyzer…...
面试实战篇 | 快手本地生活,结合项目谈Redis实战项目场景?MySQL InnoDB存储引擎如何工作的?策略模式?
本期是【你好,面试官】系列文章的第21期,持续更新中…。 《你好,面试官》系列目前已经连载20篇了,据说看了这个系列的朋友都拿到了大厂offer~ 你好,面试官 | 你真的理解面向 “对象”?你好,面…...
Hadoop之——WordCount案例与执行本地jar包
目录 一、WordCount代码 (一)WordCount简介 1.wordcount.txt (二)WordCount的java代码 1.WordCountMapper 2.WordCountReduce 3.WordCountDriver (三)IDEA运行结果 (四)Hadoop运行wordcount 1.在HDFS上新建一个文件目录 2.新建一个文件,并上传至该目录下…...
利用git reflog 命令来查看历史提交记录,并使用提交记录恢复已经被删除掉的分支
一.问题描述 当我们在操作中手误删除了某个分支,那该分支中提交的内容也没有了,我们可以利用git reflog这个命令来查看历史提交的记录从而恢复被删除的分支和提交的内容 二.模拟问题 1.创建git仓库,并提交一个文件 [rootcentos7-temp /da…...
【软件测试】大厂测试开发你真的了解吗?测试开发养成记......
目录:导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜)前言 在一些大公司里&…...
Redis中的hash结构和扩容机制
1.rehash原理 hash包含两个数据结构为字典数组ht[0]和ht[1]。其中ht[0]用来存放数据,ht[1]在rehash时使用。 扩容时,ht[1]的大小为第一个大于等于ht[0].used*2的2的幂次方的数; 收缩时,ht[1]的大小为第一个大于等于ht[0].used的…...
【C++奇技淫巧】前置自增与后置自增的区别(++i,i++)【2023.02.08】
简介 先说i和i的区别,判断语句中if(i)是拿i的值先判断,而后自增;if(i)是先自增i再进行判断。涉及到左值与右值也有点区别,i返回的是右值,i返回的是左值。也就是下面的代码要解释的东西。 #include <iostream>i…...
实战打靶集锦-005-HL
**写在前面:**记录一次曲折的打靶经历。 目录1. 主机发现2. 端口扫描3. 服务枚举4. 服务探查4.1 浏览器访问4.2 目录枚举4.3 探查admin4.4 探查index4.5 探查login5 公共EXP搜索6. 再次目录枚举6.1 探查superadmin.php6.2 查看页面源代码6.3 base64绕过6.4 构建反弹…...
铁路系统各专业介绍(车机工电辆)
目录 1 车务段 1.1 职能简介 1.2 路段名单 1.3 岗位级别 2 机务段 2.1 职能简介 2.2 路段名单 2.3 岗位级别 3 工务段 3.1 职能简介 3.2 路段名单 3.3 岗位级别 4 电务段 4.1 职能简介 4.2 路段名单 4.3 岗位级别 5 车辆段 5.1 职能简介 5.2 路段名单 5.3 …...
2/11考试总结
时间安排 7:30–7:50 读题,T1貌似是个 dp ,T2 数据结构,T3 可能是数据结构。 7:50–9:45 T1,点规模非常大,可以达到 1e18 级别,感觉应该没法直接做,考虑每条新增的边的贡献,想到用 …...
Java Set集合
7 Set集合 7.1 Set集合的概述和特点 Set集合的特点 不包含重复元素的集合没有带索引的方法,所以不能使用普通for循环 Set集合是接口通过实现类实例化(多态的形式) HashSet:添加的元素是无序,不重复,无索引…...
【手写 Vuex 源码】第七篇 - Vuex 的模块安装
一,前言 上一篇,主要介绍了 Vuex 模块收集的实现,主要涉及以下几个点: Vuex 模块的概念;Vuex 模块和命名空间的使用;Vuex 模块收集的实现-构建“模块树”; 本篇,继续介绍 Vuex 模…...
EOC第六章《块与中枢派发》
文章目录第37条:理解block这一概念第38条:为常用的块类型创建typedef第39条:用handler块降低代码分散程度第41条:多用派发队列,少用同步锁方案一:使用串行同步队列来将读写操作都安排到同一个队列里&#x…...
八、Git远程仓库操作——跨团队成员的协作
前言 前面一篇博文介绍了git团队成员之间的协作,现在在介绍下如果是跨团队成员的话,如何协作? 跨团队成员协作,其实就是你不属于那个项目的成员,你没有权限向那个仓库提交代码。但是github还有另一种 pull request&a…...
算法刷题打卡第88天:字母板上的路径
字母板上的路径 难度:中等 我们从一块字母板上的位置 (0, 0) 出发,该坐标对应的字符为 board[0][0]。 在本题里,字母板为board ["abcde", "fghij", "klmno", "pqrst", "uvwxy", "…...
UVa The Morning after Halloween 万圣节后的早晨 双向BFS
题目链接:The Morning after Halloween 题目描述: 给定一个二维矩阵,图中有障碍物和字母,你需要把小写字母移动到对应的大写字母位置,不同的小写字母可以同时移动(上下左右四个方向或者保持不动 ࿰…...
Connext DDS属性配置参考大全(3)
Transport传输dds.participant.logging.time_based_logging.process_received_messagedds.participant.logging.time_based_logging.process_received_message.timeout...
Docker-安装Jenkins-使用jenkins发版Java项目
文章目录0.前言环境背景1.操作流程1.1前期准备工作1.1.1环境变量的配置1.2使用流水线的方式进行发版1.2.1新建流水线任务1.2.2流水线操作工具tools步骤stages步骤1:拉取代码编译步骤2:发送文件并启动0.前言 学海无涯,旅“途”漫漫,“途”中小记ÿ…...
使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?
编辑:陈萍萍的公主一点人工一点智能 未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战,在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
基于 TAPD 进行项目管理
起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...
CVE-2020-17519源码分析与漏洞复现(Flink 任意文件读取)
漏洞概览 漏洞名称:Apache Flink REST API 任意文件读取漏洞CVE编号:CVE-2020-17519CVSS评分:7.5影响版本:Apache Flink 1.11.0、1.11.1、1.11.2修复版本:≥ 1.11.3 或 ≥ 1.12.0漏洞类型:路径遍历&#x…...
