flink处理函数--副输出功能
背景
在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出
副输出
本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:
package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}
程序运行结果:

相关文章:
flink处理函数--副输出功能
背景 在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 副输出 本文还是基于streaming-with-flink这本…...
Java数据结构————队列
一 、队列 在Java中,Queue是个接口,底层是通过链表实现的。 只允许在一端进行插入数据操作, 在另一端进行删除数据操作的特殊线性表, 队列具有先进先出FIFO(First In First Out) 。 入队列: 进行插入操作的一端称为…...
办公网络构建
办公网络项目背景 XX州市益智软件科技有限公司是XX市第九职业技术学校校办企业,依托学校人力技术、场地资源,面向市场独立经营、服务社会,主要从事网络设备销售、网络综合布线与网络管理。该公司现租用实训基地二层作为公司的办公经营场地…...
单层神经网络
神经网络 人工神经网络(Artificial Neural Network,ANN),简称神经网络(Neural Network,NN),是一种模仿生物神经网络的结构和功能的数学模型或计算模型。1943年,McCulloc…...
htb-cozyhosting
HTB-CozyHosting https://app.hackthebox.com/machines/CozyHosting ──(kwkl㉿kwkl)-[~] └─$ tail -l /etc/hosts …...
网络安全渗透测试工具之skipfish
网络安全渗透测试工具skipfish介绍 在数字化的时代,Web 应用程序安全成为了首要任务。想象一下,您是一位勇敢的安全冒险家,迎接着那些隐藏在 Web 应用程序中的未知风险。而在这个冒险之旅中,您需要一款强大的工具来帮助您发现漏洞,揭示弱点。而这个工具就是 Skipfish。 …...
【Rust】文件系统
目录 一、读取文件的字符串行 二、避免读取写入同一文件 三、使用内存映射随机访问文件 四、过去 24 小时内修改过的文件名 五、查找给定路径的循环 六、递归查找重名文件 七、使用给定断言递归查找所有文件 八、跳过隐藏文件遍历目录 九、在给定深度的目录࿰…...
mysql双主双从读写分离
架构图: 详细内容参考: 结果展示: 178.119.30.16(从)- master 178.119.30.17(从)- slave 由上述结果可以看出,产生了主备节点同时抢占VIP的问题(即脑裂问题)…...
postgresql-物化视图
postgresql-物化视图 物化视图创建物化视图刷新物化视图修改物化视图删除物化视图 物化视图 创建物化视图 postgresql使用create materialized view 语句创建视图 create materialized view if not exists name as query [with [NO] data];-- 创建一个包含员工统计信息的物化…...
多层神经网络和激活函数
多层神经网络的结构 多层神经网络就是由单层神经网络进行叠加之后得到的,所以就形成了层的概念,常见的多层神经网络有如下结构: 1)输入层(Input layer),众多神经元(Neuronÿ…...
Visual Studio Code键盘快捷键大全
Visual Studio Code键盘快捷键大全 前言导航快捷键编辑快捷键多光标快捷键终端快捷键调试快捷键文件管理快捷键Git快捷键代码格式化快捷键代码折叠快捷键工作区快捷键Markdown快捷键Zen模式快捷键窗口管理快捷键重构快捷键IntelliSense快捷键测试快捷键扩展快捷键 前言 欢迎来…...
新手学习笔记-----⽂件操作
目录 1. 为什么使⽤⽂件? 2. 什么是⽂件? 2.1 程序⽂件 2.2 数据⽂件 2.3 ⽂件名 3. ⼆进制⽂件和⽂本⽂件? 4. ⽂件的打开和关闭 4.1 流和标准流 4.1.1 流 4.1.2 标准流 4.2 ⽂件指针 4.3 ⽂件的打开和关闭 5. ⽂件的顺序读写 …...
LeetCode 251:展开二维向量
题目 Implement an iterator to flatten a 2d vector. Example: [1,2,3,4,5,6] [1,2,3,4,5,6] Follow up: As an added challenge, try to code it using only iterators in C++ or iterators in Java. 题解: 用两个index 分别记录list 的 index 和当前 list的element index. …...
练[BSidesCF 2020]Had a bad day
[BSidesCF 2020]Had a bad day 文章目录 [BSidesCF 2020]Had a bad day掌握知识解题过程关键paylaod 掌握知识 php伪协议进行文件包含,代码审计,strpos()函数会返回字符串在另一字符串中第一次出现的位置,如果没有找到则返回 FALSE&#…...
第十五章 类和对象——友元
生活中你的家有客厅(Public),有你的卧室(Private) 客厅所有来的客人都可以进去,但是你的卧室是私有的,也就是说只有你能进去 但是呢,你也可以允许你的好闺蜜好基友进去。 在程序里,有些私有属性 也想让类外特殊的一些…...
【数仓精品理论分析】能不能学大数据?
【数仓精品理论分析】能不能学大数据? 还能不能学大数据datapulse官网: 自身情况数据行业发展情况 还能不能学大数据 首先看到这个话题的时候,我是这样想的,能不能学大数据需要参考本人的自身情况【学历、年龄、决心、有没有矿或者…...
java复习-多态性
多态性 在Java中对于多态性由两种实现的模式: 方法的多态性 方法的重载:同一个方法名称可以根据传入的参数类型和个数的不同,进行不同的处理。 方法的覆写:同一个方法可能根据使用子类的不同,由不同的实现。 对象的…...
美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程
小程序带举牌小人带菜谱流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、…...
编写IDEA插件,实现根据现有代码生成流程图
实现根据现有代码生成流程图的功能需要考虑以下几个步骤: 分析代码结构,获取代码中的变量声明、分支语句、循环语句等语句结构。 根据代码结构生成流程图的节点和边。 将生成的流程图展示在IDEA界面中。 下面逐一说明以上步骤的实现方法:…...
王杰国庆作业day6
服务器 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <my_head.h> #define PORT 2324 //端口号 #define IP "192.168.10.107" //本机IP int main(int argc, const char *argv[]) {sqlite3* d…...
HunyuanVideo-Foley命令行教程:infer.py参数详解与批量音效生成脚本编写
HunyuanVideo-Foley命令行教程:infer.py参数详解与批量音效生成脚本编写 1. 环境准备与快速部署 在开始使用HunyuanVideo-Foley进行音效生成前,我们需要确保环境已经正确部署。本教程基于RTX 4090D 24GB显存显卡和CUDA 12.4优化环境。 1.1 镜像启动与…...
5分钟搭建专业级缠论可视化分析平台:从零到实战的完整指南
5分钟搭建专业级缠论可视化分析平台:从零到实战的完整指南 【免费下载链接】chanvis 基于TradingView本地SDK的可视化前后端代码,适用于缠论量化研究,和其他的基于几何交易的量化研究。 缠论量化 摩尔缠论 缠论可视化 TradingView TV-SDK 项…...
探索电池2RC等效电路模型:从参数辨识到SOC估计
电池2RC等效电路模型,最小二乘法参数辩识,电池端电压误差小,扩展卡尔曼估计SOC精度高。 有文档,数据,视频,仿真图。在电池研究领域,准确建模和参数估计对于理解电池行为至关重要。今天咱就唠唠电…...
vLLM-v0.17.1惊艳效果:束搜索+并行采样在长文本生成中的稳定性展示
vLLM-v0.17.1惊艳效果:束搜索并行采样在长文本生成中的稳定性展示 1. vLLM框架核心能力概览 vLLM是一个专为大型语言模型(LLM)设计的高性能推理和服务库,其最新版本v0.17.1在长文本生成稳定性方面取得了显著突破。这个开源项目最初由加州大学伯克利分校…...
选吉他不踩坑:合板、单板、全单材质深度解析,新手看懂这篇就够
对于新手来说,挑选吉他时最容易被“合板”“单板”“全单”这些专业术语绕晕。其实,这三者的核心区别在于木材的构成方式,而木材直接决定了吉他的音色、手感以及使用寿命。今天我们就抛开品牌干扰,纯科普这三种材质的底层逻辑&…...
【声音克隆】Qwen3-TTS-12Hz-1.7B-Base优化技巧:如何生成更自然、更逼真的语音
【声音克隆】Qwen3-TTS-12Hz-1.7B-Base优化技巧:如何生成更自然、更逼真的语音 1. 理解Qwen3-TTS的核心能力 1.1 多语言与方言支持 Qwen3-TTS-12Hz-1.7B-Base模型支持10种主要语言和多种方言风格,包括中文、英文、日文等。这种广泛的语言覆盖能力使其…...
OpenClaw异常处理:配置nanobot自动重试失败任务
OpenClaw异常处理:配置nanobot自动重试失败任务 1. 为什么需要自动重试机制 上周我让OpenClaw执行一个简单的夜间数据收集任务时,遇到了一个令人头疼的问题。凌晨3点,网络突然波动导致任务中断,而当我早上打开电脑时,…...
仿真模型中硅胶减震器的特征频率与谐振频率的受力分析
COMSOL仿真模型硅胶减震器减振器特征频率谐振频率受力分析仿真模型最近在研究硅胶减震器的特性,发现用COMSOL来仿真这东西还挺有意思的。硅胶减震器嘛,主要就是用来减振的,比如在一些精密仪器或者机械设备上,防止振动对设备造成损…...
Cesium使用
Cesium官网:https://cesiumjs.org 官方API文档:https://cesium.com/learn/ion-sdk/ref-doc 中文API文档:https://cesium.xin/cesium/cn/Documentation1.95 https://cesium.xin Cesium中文社区:http://cesiumcn.org …...
OpenClaw多通道管理:GLM-4.7-Flash同时对接飞书与钉钉的配置技巧
OpenClaw多通道管理:GLM-4.7-Flash同时对接飞书与钉钉的配置技巧 1. 为什么需要多通道管理? 上周我接到一个技术咨询需求:一个小型内容团队需要同时在飞书和钉钉两个平台上接收AI助手服务。他们的编辑用飞书,运营用钉钉…...
