flink处理函数--副输出功能
背景
在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出
副输出
本文还是基于streaming-with-flink这本书的例子作为演示,它实现一个把温度低于32度的记录输出到副输出的功能,正常的记录还是从主输出中输出.代码如下:
package wikiedits.processfunc.job;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;
import wikiedits.processfunc.process.FreezingMonitor;
import wikiedits.processfunc.source.SensorSource;public class SideOutPutJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<SensorReading> readings = see.addSource(new SensorSource());SingleOutputStreamOperator<SensorReading> monitoredReadings = readings.process(new FreezingMonitor());// 打印附输出monitoredReadings.getSideOutput(new OutputTag<String>("freezing-alarms"){}).print();// 打印主输出monitoredReadings.print();see.execute();}
}package wikiedits.processfunc.process;import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import wikiedits.processfunc.pojo.SensorReading;public class FreezingMonitor extends ProcessFunction<SensorReading, SensorReading> {private OutputTag<String> freezingAlarmOutput = new OutputTag<String>("freezing-alarms") {};@Overridepublic void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {if (value.temperature < 32.0) {ctx.output(freezingAlarmOutput, "freezing alarm for " + value.id + " :" + value.temperature);}out.collect(value);}}
package wikiedits.processfunc.source;/** Copyright 2015 Fabian Hueske / Vasia Kalavri** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import wikiedits.processfunc.pojo.SensorReading;import java.util.Calendar;
import java.util.Random;/*** Flink SourceFunction to generate SensorReadings with random temperature values.** Each parallel instance of the source simulates 10 sensors which emit one sensor reading every 100 ms.** Note: This is a simple data-generating source function that does not checkpoint its state.* In case of a failure, the source does not replay any data.*/
public class SensorSource extends RichParallelSourceFunction<SensorReading> {// flag indicating whether source is still runningprivate boolean running = true;/** run() continuously emits SensorReadings by emitting them through the SourceContext. */@Overridepublic void run(SourceContext<SensorReading> srcCtx) throws Exception {// initialize random number generatorRandom rand = new Random();// look up index of this parallel taskint taskIdx = this.getRuntimeContext().getIndexOfThisSubtask();// initialize sensor ids and temperaturesString[] sensorIds = new String[10];double[] curFTemp = new double[10];for (int i = 0; i < 10; i++) {sensorIds[i] = "sensor_" + (taskIdx * 10 + i);curFTemp[i] = 65 + (rand.nextGaussian() * 20);}while (running) {// get current timelong curTime = Calendar.getInstance().getTimeInMillis();// emit SensorReadingsfor (int i = 0; i < 10; i++) {// update current temperaturecurFTemp[i] += rand.nextGaussian() * 0.5;// emit readingsrcCtx.collect(new SensorReading(sensorIds[i], curTime, curFTemp[i]));}// wait for 100 msThread.sleep(3000);}}/** Cancels this SourceFunction. */@Overridepublic void cancel() {this.running = false;}
}
程序运行结果:
相关文章:

flink处理函数--副输出功能
背景 在flink中,如果你想要访问记录的处理时间或者事件时间,注册定时器,或者是将记录输出到多个输出流中,你都需要处理函数的帮助,本文就来通过一个例子来讲解下副输出 副输出 本文还是基于streaming-with-flink这本…...

Java数据结构————队列
一 、队列 在Java中,Queue是个接口,底层是通过链表实现的。 只允许在一端进行插入数据操作, 在另一端进行删除数据操作的特殊线性表, 队列具有先进先出FIFO(First In First Out) 。 入队列: 进行插入操作的一端称为…...

办公网络构建
办公网络项目背景 XX州市益智软件科技有限公司是XX市第九职业技术学校校办企业,依托学校人力技术、场地资源,面向市场独立经营、服务社会,主要从事网络设备销售、网络综合布线与网络管理。该公司现租用实训基地二层作为公司的办公经营场地…...

单层神经网络
神经网络 人工神经网络(Artificial Neural Network,ANN),简称神经网络(Neural Network,NN),是一种模仿生物神经网络的结构和功能的数学模型或计算模型。1943年,McCulloc…...

htb-cozyhosting
HTB-CozyHosting https://app.hackthebox.com/machines/CozyHosting ──(kwkl㉿kwkl)-[~] └─$ tail -l /etc/hosts …...

网络安全渗透测试工具之skipfish
网络安全渗透测试工具skipfish介绍 在数字化的时代,Web 应用程序安全成为了首要任务。想象一下,您是一位勇敢的安全冒险家,迎接着那些隐藏在 Web 应用程序中的未知风险。而在这个冒险之旅中,您需要一款强大的工具来帮助您发现漏洞,揭示弱点。而这个工具就是 Skipfish。 …...
【Rust】文件系统
目录 一、读取文件的字符串行 二、避免读取写入同一文件 三、使用内存映射随机访问文件 四、过去 24 小时内修改过的文件名 五、查找给定路径的循环 六、递归查找重名文件 七、使用给定断言递归查找所有文件 八、跳过隐藏文件遍历目录 九、在给定深度的目录࿰…...

mysql双主双从读写分离
架构图: 详细内容参考: 结果展示: 178.119.30.16(从)- master 178.119.30.17(从)- slave 由上述结果可以看出,产生了主备节点同时抢占VIP的问题(即脑裂问题)…...

postgresql-物化视图
postgresql-物化视图 物化视图创建物化视图刷新物化视图修改物化视图删除物化视图 物化视图 创建物化视图 postgresql使用create materialized view 语句创建视图 create materialized view if not exists name as query [with [NO] data];-- 创建一个包含员工统计信息的物化…...

多层神经网络和激活函数
多层神经网络的结构 多层神经网络就是由单层神经网络进行叠加之后得到的,所以就形成了层的概念,常见的多层神经网络有如下结构: 1)输入层(Input layer),众多神经元(Neuronÿ…...
Visual Studio Code键盘快捷键大全
Visual Studio Code键盘快捷键大全 前言导航快捷键编辑快捷键多光标快捷键终端快捷键调试快捷键文件管理快捷键Git快捷键代码格式化快捷键代码折叠快捷键工作区快捷键Markdown快捷键Zen模式快捷键窗口管理快捷键重构快捷键IntelliSense快捷键测试快捷键扩展快捷键 前言 欢迎来…...

新手学习笔记-----⽂件操作
目录 1. 为什么使⽤⽂件? 2. 什么是⽂件? 2.1 程序⽂件 2.2 数据⽂件 2.3 ⽂件名 3. ⼆进制⽂件和⽂本⽂件? 4. ⽂件的打开和关闭 4.1 流和标准流 4.1.1 流 4.1.2 标准流 4.2 ⽂件指针 4.3 ⽂件的打开和关闭 5. ⽂件的顺序读写 …...
LeetCode 251:展开二维向量
题目 Implement an iterator to flatten a 2d vector. Example: [1,2,3,4,5,6] [1,2,3,4,5,6] Follow up: As an added challenge, try to code it using only iterators in C++ or iterators in Java. 题解: 用两个index 分别记录list 的 index 和当前 list的element index. …...

练[BSidesCF 2020]Had a bad day
[BSidesCF 2020]Had a bad day 文章目录 [BSidesCF 2020]Had a bad day掌握知识解题过程关键paylaod 掌握知识 php伪协议进行文件包含,代码审计,strpos()函数会返回字符串在另一字符串中第一次出现的位置,如果没有找到则返回 FALSE&#…...

第十五章 类和对象——友元
生活中你的家有客厅(Public),有你的卧室(Private) 客厅所有来的客人都可以进去,但是你的卧室是私有的,也就是说只有你能进去 但是呢,你也可以允许你的好闺蜜好基友进去。 在程序里,有些私有属性 也想让类外特殊的一些…...

【数仓精品理论分析】能不能学大数据?
【数仓精品理论分析】能不能学大数据? 还能不能学大数据datapulse官网: 自身情况数据行业发展情况 还能不能学大数据 首先看到这个话题的时候,我是这样想的,能不能学大数据需要参考本人的自身情况【学历、年龄、决心、有没有矿或者…...
java复习-多态性
多态性 在Java中对于多态性由两种实现的模式: 方法的多态性 方法的重载:同一个方法名称可以根据传入的参数类型和个数的不同,进行不同的处理。 方法的覆写:同一个方法可能根据使用子类的不同,由不同的实现。 对象的…...

美团外卖优惠券小程序 美团优惠券微信小程序 自带流量主模式 带教程
小程序带举牌小人带菜谱流量主模式,挺多外卖小程序的,但是都没有搭建教程 搭建: 1、下载源码,去微信公众平台注册自己的账号 2、解压到桌面 3、打开微信开发者工具添加小程序-把解压的源码添加进去-appid改成自己小程序的 4、…...
编写IDEA插件,实现根据现有代码生成流程图
实现根据现有代码生成流程图的功能需要考虑以下几个步骤: 分析代码结构,获取代码中的变量声明、分支语句、循环语句等语句结构。 根据代码结构生成流程图的节点和边。 将生成的流程图展示在IDEA界面中。 下面逐一说明以上步骤的实现方法:…...

王杰国庆作业day6
服务器 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <my_head.h> #define PORT 2324 //端口号 #define IP "192.168.10.107" //本机IP int main(int argc, const char *argv[]) {sqlite3* d…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘
美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)
文章目录 1.什么是Redis?2.为什么要使用redis作为mysql的缓存?3.什么是缓存雪崩、缓存穿透、缓存击穿?3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...

SpringCloudGateway 自定义局部过滤器
场景: 将所有请求转化为同一路径请求(方便穿网配置)在请求头内标识原来路径,然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...