flink处理函数--副输出功能
背景
在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出
副输出
本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:
package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}
程序运行结果:

相关文章:
flink处理函数--副输出功能
背景 在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 副输出 本文还是基于streaming-with-flink这本…...
Java数据结构————队列
一 、队列 在Java中,Queue是个接口,底层是通过链表实现的。 只允许在一端进行插入数据操作, 在另一端进行删除数据操作的特殊线性表, 队列具有先进先出FIFO(First In First Out) 。 入队列: 进行插入操作的一端称为…...
办公网络构建
办公网络项目背景 XX州市益智软件科技有限公司是XX市第九职业技术学校校办企业,依托学校人力技术、场地资源,面向市场独立经营、服务社会,主要从事网络设备销售、网络综合布线与网络管理。该公司现租用实训基地二层作为公司的办公经营场地…...
单层神经网络
神经网络 人工神经网络(Artificial Neural Network,ANN),简称神经网络(Neural Network,NN),是一种模仿生物神经网络的结构和功能的数学模型或计算模型。1943年,McCulloc…...
htb-cozyhosting
HTB-CozyHosting https://app.hackthebox.com/machines/CozyHosting ──(kwkl㉿kwkl)-[~] └─$ tail -l /etc/hosts …...
网络安全渗透测试工具之skipfish
网络安全渗透测试工具skipfish介绍 在数字化的时代,Web 应用程序安全成为了首要任务。想象一下,您是一位勇敢的安全冒险家,迎接着那些隐藏在 Web 应用程序中的未知风险。而在这个冒险之旅中,您需要一款强大的工具来帮助您发现漏洞,揭示弱点。而这个工具就是 Skipfish。 …...
【Rust】文件系统
目录 一、读取文件的字符串行 二、避免读取写入同一文件 三、使用内存映射随机访问文件 四、过去 24 小时内修改过的文件名 五、查找给定路径的循环 六、递归查找重名文件 七、使用给定断言递归查找所有文件 八、跳过隐藏文件遍历目录 九、在给定深度的目录࿰…...
mysql双主双从读写分离
架构图: 详细内容参考: 结果展示: 178.119.30.16(从)- master 178.119.30.17(从)- slave 由上述结果可以看出,产生了主备节点同时抢占VIP的问题(即脑裂问题)…...
postgresql-物化视图
postgresql-物化视图 物化视图创建物化视图刷新物化视图修改物化视图删除物化视图 物化视图 创建物化视图 postgresql使用create materialized view 语句创建视图 create materialized view if not exists name as query [with [NO] data];-- 创建一个包含员工统计信息的物化…...
多层神经网络和激活函数
多层神经网络的结构 多层神经网络就是由单层神经网络进行叠加之后得到的,所以就形成了层的概念,常见的多层神经网络有如下结构: 1)输入层(Input layer),众多神经元(Neuronÿ…...
Visual Studio Code键盘快捷键大全
Visual Studio Code键盘快捷键大全 前言导航快捷键编辑快捷键多光标快捷键终端快捷键调试快捷键文件管理快捷键Git快捷键代码格式化快捷键代码折叠快捷键工作区快捷键Markdown快捷键Zen模式快捷键窗口管理快捷键重构快捷键IntelliSense快捷键测试快捷键扩展快捷键 前言 欢迎来…...
新手学习笔记-----⽂件操作
目录 1. 为什么使⽤⽂件? 2. 什么是⽂件? 2.1 程序⽂件 2.2 数据⽂件 2.3 ⽂件名 3. ⼆进制⽂件和⽂本⽂件? 4. ⽂件的打开和关闭 4.1 流和标准流 4.1.1 流 4.1.2 标准流 4.2 ⽂件指针 4.3 ⽂件的打开和关闭 5. ⽂件的顺序读写 …...
LeetCode 251:展开二维向量
题目 Implement an iterator to flatten a 2d vector. Example: [1,2,3,4,5,6] [1,2,3,4,5,6] Follow up: As an added challenge, try to code it using only iterators in C++ or iterators in Java. 题解: 用两个index 分别记录list 的 index 和当前 list的element index. …...
练[BSidesCF 2020]Had a bad day
[BSidesCF 2020]Had a bad day 文章目录 [BSidesCF 2020]Had a bad day掌握知识解题过程关键paylaod 掌握知识 php伪协议进行文件包含,代码审计,strpos()函数会返回字符串在另一字符串中第一次出现的位置,如果没有找到则返回 FALSE&#…...
第十五章 类和对象——友元
生活中你的家有客厅(Public),有你的卧室(Private) 客厅所有来的客人都可以进去,但是你的卧室是私有的,也就是说只有你能进去 但是呢,你也可以允许你的好闺蜜好基友进去。 在程序里,有些私有属性 也想让类外特殊的一些…...
【数仓精品理论分析】能不能学大数据?
【数仓精品理论分析】能不能学大数据? 还能不能学大数据datapulse官网: 自身情况数据行业发展情况 还能不能学大数据 首先看到这个话题的时候,我是这样想的,能不能学大数据需要参考本人的自身情况【学历、年龄、决心、有没有矿或者…...
java复习-多态性
多态性 在Java中对于多态性由两种实现的模式: 方法的多态性 方法的重载:同一个方法名称可以根据传入的参数类型和个数的不同,进行不同的处理。 方法的覆写:同一个方法可能根据使用子类的不同,由不同的实现。 对象的…...
美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程
小程序带举牌小人带菜谱流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、…...
编写IDEA插件,实现根据现有代码生成流程图
实现根据现有代码生成流程图的功能需要考虑以下几个步骤: 分析代码结构,获取代码中的变量声明、分支语句、循环语句等语句结构。 根据代码结构生成流程图的节点和边。 将生成的流程图展示在IDEA界面中。 下面逐一说明以上步骤的实现方法:…...
王杰国庆作业day6
服务器 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <my_head.h> #define PORT 2324 //端口号 #define IP "192.168.10.107" //本机IP int main(int argc, const char *argv[]) {sqlite3* d…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
SpringCloudGateway 自定义局部过滤器
场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...
