工业—使用Flink处理Kafka中的数据_ChangeRecord2

使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” , value 值为 “ 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli 以 HGETALL key方式获取 warning_last3min_everymin_out值。注:时间语义使用 Processing Time 。
-
Kafka Source
- 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
- 数据通过
SimpleStringSchema反序列化为字符串格式,再由parseMessage进行解析和提取。
-
流处理与窗口
- Flink 使用滑动时间窗口 (
SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。 - 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
- Flink 使用滑动时间窗口 (
-
窗口函数
- 在
MaxNumWarnMachineID中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。 apply方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
- 在
-
Redis Sink
- 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为
HSET。 - Redis 中的键为
warning_last3min_everymin_out,值为设备 ID。
- 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为
package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 创建流执行环境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource = buildKafkaSource()// 从Kafka读取数据并处理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 == "预警") // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute("WarningLast3MinEveryMinOut Job")}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 统计每个设备ID的预警次数val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}
相关文章:
工业—使用Flink处理Kafka中的数据_ChangeRecord2
使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” , value 值为 “ 窗口结束时间,设备id” &am…...
【Java-数据结构篇】Java 中栈和队列:构建程序逻辑的关键数据结构基石
我的个人主页 我的专栏:Java-数据结构,希望能帮助到大家!!!点赞❤ 收藏❤ 一、引言 1. 栈与队列在编程中的角色定位 栈和队列作为两种基本的数据结构,在众多编程场景中都有着独特的地位。它们为数据的有序…...
工业—使用Flink处理Kafka中的数据_ProduceRecord1
1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生…...
探索CSS版心布局:构建现代网页的黄金比例
探索CSS版心布局:构建现代网页的黄金比例 在网页设计中,版心(或称为内容区域)是页面的核心部分,通常用于放置主要内容。使用CSS3的新特性,可以创建更加灵活和响应式的版心布局。本文将详细介绍如何使用CSS…...
华为NPU服务器昇腾Ascend 910B2部署通义千问Qwen2.5——基于mindie镜像一路试错版(三)
文章目录 前言纯模型推理启动服务后面干什么?这可咋整啊?愁死了!总结前言 这是咱这个系列的第三个文章了。 毕竟,这是我好几天摸索出的经验,能帮助各位在几个小时内领会,我觉得也算是我的功劳一件了。 所以,一是希望大家耐心看下去,耐心操作下去;而是恳请各位多多关…...
详解Java数据库编程之JDBC
目录 首先创建一个Java项目 在Maven中央仓库下载mysql connector的jar包 针对MySQL版本5 针对MySQL版本8 下载之后,在IDEA中创建的项目中建立一个lib目录,然后把刚刚下载好的jar包拷贝进去,然后右键刚刚添加的jar包,点击‘添…...
基于MFC实现的人机对战五子棋游戏
基于MFC实现的人机对战五子棋游戏 1、引言 此报告将详细介绍本次课程设计的动机、设计思路及编写技术的详细过程,展现我所学过的C知识以及我通过本次课程设计所学到例如MFC等知识。在文档最后我也会记录我所编写过程遇到的问题以及解决方案。 1.1 背景 五子棋是…...
AIGC 时代的文学:变革与坚守
目录 一.AIGC 带来的文学变革 1.创作方式的改变 2.阅读体验的升级 3.文学市场的重塑 二.文学在 AIGC 时代的坚守 1.人类情感的表达 2.文学的艺术性 3.文学的社会责任 三.AIGC 与人类作家的共生之路 1.相互学习 2.合作创作 3.共同发展 另: 总结 随着人…...
InfluxDB 集成 Grafana
将InfluxDB集成到Grafana进行详细配置通常包括以下几个步骤:安装与配置InfluxDB、安装与配置Grafana、在Grafana中添加InfluxDB数据源以及创建和配置仪表板。以下是一个详细的配置指南: 一、安装与配置InfluxDB 下载与安装: 从InfluxDB的官…...
笔记本电脑usb接口没反应怎么办?原因及解决方法
笔记本电脑的USB接口是我们日常使用中非常频繁的一个功能,无论是数据传输、充电还是外接设备,都离不开它。然而,当USB接口突然没有反应时,这无疑会给我们的工作和学习带来不小的困扰。下面,我们就来探讨一下笔记本USB接…...
【开源】A060-基于Spring Boot的游戏交易系统的设计与实现
🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看项目链接获取⬇️,记得注明来意哦~🌹 赠送计算机毕业设计600个选题ex…...
static关键字在嵌入式C编程中的应用
目录 一、控制变量的存储周期和可见性 1.1. 局部静态变量 1.2. 全局静态变量 二、控制函数的可见性 2.1. 静态函数 2.2. 代码示例(假设有两个文件:file1.c和file2.c) 三、应用场景 3.1. 存储常用数据 3.2. 实现内部辅助函数 四、注…...
集合框架(1)
集合框架(1) 1、数组的特点与弊端 (1)特点: 数组初始化以后,长度就确定了。数组中的添加的元素是依次紧密排列的,有序的,可以重复的。数组声明的类型,就决定了进行元素初…...
Java 基础之泛型:类型安全的保障与灵活运用
在 Java 编程的世界里,泛型是一个至关重要且非常实用的特性。它在 Java 5 中被引入,从根本上改变了我们处理数据类型的方式,提供了更强的类型安全保障,同时也增加了代码的复用性和可读性。 一、什么是泛型 泛型(Gener…...
开发者如何使用GCC提升开发效率Opencv操作
看此篇前请先阅读 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144216351?spm=1001…...
矩阵加法
矩阵加法 C语言代码C 语言代码Java语言代码Python语言代码 💐The Begin💐点点关注,收藏不迷路💐 输入两个n行m列的矩阵A和B,输出它们的和AB。 输入 第一行包含两个整数n和m,表示矩阵的行数和列数。1 <…...
yarn : 无法加载文件 E:\node\node_global\yarn.ps1,因为在此系统上禁止运行脚本
先确保安装了yarn —— npm install -g yarn 终端输入set-ExecutionPolicy RemoteSigned 若要在本地计算机上运行您编写的未签名脚本和来自其他用户的签名脚本,请使用以下命令将计算机上的执行策略更改为RemoteSigned 再去使用yarn okk~...
详解C++类与对象(四)
文章目录 1.类型转换1.1 前言1.2 类型转换的性质 2.static成员2.1 前言2.2 static的基本概念 3.友元4.内部类5.匿名对象 1.类型转换 1.1 前言 在C中,由于程序员可以自己显示定义一个新的类。这样就会出现一个问题:程序员自己显示定义的类类型与编译器中…...
Pandas处理和分析嵌套JSON数据:从字符串到结构化DataFrame
在数据分析领域,我们经常遇到需要从非结构化数据中提取有用信息的场景。特别是当数据以JSON字符串的形式出现时,如何有效地将其转换为结构化的表格形式,以便进行进一步的分析和处理,成为了一个常见的挑战。本文将通过一个具体的例…...
【强化学习入门笔记】1.5 贝尔曼最优公式
本系列为学习赵世钰老师的《强化学习的数学原理》所作的学习笔记. 课程视频网址:https://space.bilibili.com/2044042934 1.5.1 定义 1.5.1.1 Contraction mapping theorem (收缩映射定理) fixed point(不动点) 如果 x ∗ x^* x∗满足下式, x ∗ x^* x∗称之为…...
无网络环境方案:OpenClaw离线运行千问3.5-9B
无网络环境方案:OpenClaw离线运行千问3.5-9B 1. 为什么需要离线运行方案 在金融、医疗等对数据安全要求极高的场景中,我们常常需要完全离线的AI解决方案。去年我在为某研究机构设计自动化文档处理系统时,就遇到了这样的需求——他们要求所有…...
影墨·今颜多模态应用:结合文本与图像输入的进阶生成案例
影墨今颜多模态应用:结合文本与图像输入的进阶生成案例 最近在玩一个挺有意思的模型,叫影墨今颜。它最吸引我的地方,不是单纯的文生图或者图生图,而是能把文字和图片“揉”在一起,生成一些意想不到的新东西。这感觉就…...
GSS引擎的未来发展:约束式布局在Web开发中的趋势
GSS引擎的未来发展:约束式布局在Web开发中的趋势 【免费下载链接】engine GSS engine 项目地址: https://gitcode.com/gh_mirrors/engi/engine GSS(Grid Style Sheet)引擎作为约束式布局在Web开发中的革命性解决方案,正在重…...
kys-cpp代码规范与最佳实践:如何编写高质量的C++游戏代码
kys-cpp代码规范与最佳实践:如何编写高质量的C游戏代码 【免费下载链接】kys-cpp 《金庸群侠传》c复刻版,已完工 项目地址: https://gitcode.com/gh_mirrors/ky/kys-cpp kys-cpp作为《金庸群侠传》的C复刻版项目,其代码质量直接影响游…...
5个高效乐谱资源获取技巧:音乐爱好者的MuseScore下载指南
5个高效乐谱资源获取技巧:音乐爱好者的MuseScore下载指南 【免费下载链接】dl-librescore Download sheet music 项目地址: https://gitcode.com/gh_mirrors/dl/dl-librescore 在数字音乐时代,获取高质量乐谱资源往往面临格式限制、下载门槛等问题…...
FireRedASR-AED-L模型Node.js后端调用实战:构建高并发语音处理API
FireRedASR-AED-L模型Node.js后端调用实战:构建高并发语音处理API 语音转文字的需求现在越来越普遍,从会议记录到客服录音分析,到处都能用上。如果你手头有一个像FireRedASR-AED-L这样强大的语音识别模型服务,怎么把它集成到你的…...
GLM-4-9B-Chat-1M多语言法律文书生成:中英双语合同条款自动起草
GLM-4-9B-Chat-1M多语言法律文书生成:中英双语合同条款自动起草 1. 项目简介与核心价值 法律文书起草是法律工作中的重要环节,但传统方式耗时耗力且容易出错。GLM-4-9B-Chat-1M模型的出现,为法律文书生成带来了全新的解决方案。 这个基于v…...
3步实现窗口置顶:AlwaysOnTop让重要内容不再“失踪“
3步实现窗口置顶:AlwaysOnTop让重要内容不再"失踪" 【免费下载链接】AlwaysOnTop Make a Windows application always run on top 项目地址: https://gitcode.com/gh_mirrors/al/AlwaysOnTop 在多任务处理时,你是否经常需要在多个窗口间…...
终极指南:5步彻底解决显卡驱动残留问题
终极指南:5步彻底解决显卡驱动残留问题 【免费下载链接】display-drivers-uninstaller Display Driver Uninstaller (DDU) a driver removal utility / cleaner utility 项目地址: https://gitcode.com/gh_mirrors/di/display-drivers-uninstaller 你是否曾经…...
StructBERT模型监控方案:性能与质量实时追踪
StructBERT模型监控方案:性能与质量实时追踪 1. 引言 当你把StructBERT模型部署到生产环境后,最担心的是什么?是服务突然崩溃,还是响应速度变慢,或者是模型预测质量下降?这些问题如果等到用户投诉才发现&…...
