当前位置: 首页 > news >正文

工业—使用Flink处理Kafka中的数据_ChangeRecord2

使用 Flink 消费 Kafka ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” value 值为 窗口结束时间,设备id” (窗口结束时间格式: yyyy-MM-dd HH:mm:ss )。使用 redis cli HGETALL key方式获取 warning_last3min_everymin_out值。
注:时间语义使用 Processing Time
  1. Kafka Source

    • 从 Kafka 中读取实时的设备预警数据,数据内容应当包括设备 ID 和预警状态等信息。
    • 数据通过 SimpleStringSchema 反序列化为字符串格式,再由 parseMessage 进行解析和提取。
  2. 流处理与窗口

    • Flink 使用滑动时间窗口 (SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) 来计算每 1 分钟内过去 3 分钟内的设备预警数据。
    • 这意味着每 1 分钟计算一次,在每次计算中,会考虑过去 3 分钟内的数据,因此具有滑动窗口的特点。
  3. 窗口函数

    • 在 MaxNumWarnMachineID 中,窗口内的数据按设备 ID 分组,统计每个设备的预警次数,并选出预警次数最多的设备 ID。
    • apply 方法处理窗口内的数据后,输出一个包含时间戳(窗口结束时间)和设备 ID 的元组。
  4. Redis Sink

    • 计算后的每个时间窗口的最大预警设备 ID 将通过 Redis Sink 写入 Redis,数据结构为 HSET
    • Redis 中的键为 warning_last3min_everymin_out,值为设备 ID。

 

package flink.calculate.ChangeRecordimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.{KafkaSource, KafkaSourceBuilder}
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable// 定义常量
object Constants {val TOPIC_NAME = "ChangeRecord"val BOOTSTRAP_SERVERS = "192.168.222.101:9092,192.168.222.102:9092,192.168.222.103:9092"val REDIS_HOST = "192.168.222.101"
}// 主程序逻辑
object WarningLast3MinEveryMinOut {def main(args: Array[String]): Unit = {// 创建流执行环境并配置val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置作业并行度// 构建Kafka数据源val kafkaSource = buildKafkaSource()// 从Kafka读取数据并处理val dataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), Constants.TOPIC_NAME).map(parseMessage) // 解析消息为 (标识符, 设备ID, 状态).filter(_._3 == "预警") // 过滤非预警状态的数据.keyBy(_._1) // 按标识符分组.windowAll(SlidingProcessingTimeWindows.of(Time.minutes(3), Time.minutes(1))) // 滑动窗口.apply(new MaxNumWarnMachineID) // 应用窗口函数计算每分钟内过去3分钟的最多预警设备// 输出到控制台和RedisdataStream.print("Result =>")dataStream.addSink(buildRedisSink())// 执行Flink作业env.execute("WarningLast3MinEveryMinOut Job")}// 构建Kafka数据源private def buildKafkaSource(): KafkaSource[String] = {KafkaSource.builder[String]().setTopics(Constants.TOPIC_NAME).setBootstrapServers(Constants.BOOTSTRAP_SERVERS).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build()}// 解析来自Kafka的消息为元组private def parseMessage(message: String): (String, String, String) = {val fields = message.split(",")("warning_last3min_everymin_out", fields(1), fields(3))}// 构建Redis Sinkprivate def buildRedisSink(): ConnRedis.RedisSink[(String, String)] = {new ConnRedis(Constants.REDIS_HOST, 6379).getRedisSink(new Last3MinRedisMapper)}
}// 预警设备计数窗口函数
class MaxNumWarnMachineID extends AllWindowFunction[(String, String, String), (String, String), TimeWindow] {override def apply(window: TimeWindow, input: Iterable[(String, String, String)], out: Collector[(String, String)]): Unit = {// 统计每个设备ID的预警次数val machineCounts = input.groupBy(_._2).view.mapValues(_.size)// 获取窗口结束时间val windowEndTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getEnd))// 获取预警次数最多的设备IDif (machineCounts.nonEmpty) {val maxMachineId = machineCounts.maxBy(_._2)._1out.collect((windowEndTime, maxMachineId))}}
}// Redis映射器
private class Last3MinRedisMapper extends RedisMapper[(String, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning_last3min_everymin_out")override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

 

相关文章:

工业—使用Flink处理Kafka中的数据_ChangeRecord2

使用 Flink 消费 Kafka 中 ChangeRecord 主题的数据,每隔 1 分钟输出最近 3 分钟的预警次数最多的 设备,将结果存入Redis 中, key 值为 “warning_last3min_everymin_out” , value 值为 “ 窗口结束时间,设备id” &am…...

【Java-数据结构篇】Java 中栈和队列:构建程序逻辑的关键数据结构基石

我的个人主页 我的专栏:Java-数据结构,希望能帮助到大家!!!点赞❤ 收藏❤ 一、引言 1. 栈与队列在编程中的角色定位 栈和队列作为两种基本的数据结构,在众多编程场景中都有着独特的地位。它们为数据的有序…...

工业—使用Flink处理Kafka中的数据_ProduceRecord1

1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生…...

探索CSS版心布局:构建现代网页的黄金比例

探索CSS版心布局:构建现代网页的黄金比例 在网页设计中,版心(或称为内容区域)是页面的核心部分,通常用于放置主要内容。使用CSS3的新特性,可以创建更加灵活和响应式的版心布局。本文将详细介绍如何使用CSS…...

华为NPU服务器昇腾Ascend 910B2部署通义千问Qwen2.5——基于mindie镜像一路试错版(三)

文章目录 前言纯模型推理启动服务后面干什么?这可咋整啊?愁死了!总结前言 这是咱这个系列的第三个文章了。 毕竟,这是我好几天摸索出的经验,能帮助各位在几个小时内领会,我觉得也算是我的功劳一件了。 所以,一是希望大家耐心看下去,耐心操作下去;而是恳请各位多多关…...

详解Java数据库编程之JDBC

目录 首先创建一个Java项目 在Maven中央仓库下载mysql connector的jar包 针对MySQL版本5 针对MySQL版本8 下载之后,在IDEA中创建的项目中建立一个lib目录,然后把刚刚下载好的jar包拷贝进去,然后右键刚刚添加的jar包,点击‘添…...

基于MFC实现的人机对战五子棋游戏

基于MFC实现的人机对战五子棋游戏 1、引言 此报告将详细介绍本次课程设计的动机、设计思路及编写技术的详细过程,展现我所学过的C知识以及我通过本次课程设计所学到例如MFC等知识。在文档最后我也会记录我所编写过程遇到的问题以及解决方案。 1.1 背景 五子棋是…...

AIGC 时代的文学:变革与坚守

目录 一.AIGC 带来的文学变革 1.创作方式的改变 2.阅读体验的升级 3.文学市场的重塑 二.文学在 AIGC 时代的坚守 1.人类情感的表达 2.文学的艺术性 3.文学的社会责任 三.AIGC 与人类作家的共生之路 1.相互学习 2.合作创作 3.共同发展 另: 总结 随着人…...

InfluxDB 集成 Grafana

将InfluxDB集成到Grafana进行详细配置通常包括以下几个步骤:安装与配置InfluxDB、安装与配置Grafana、在Grafana中添加InfluxDB数据源以及创建和配置仪表板。以下是一个详细的配置指南: 一、安装与配置InfluxDB 下载与安装: 从InfluxDB的官…...

笔记本电脑usb接口没反应怎么办?原因及解决方法

笔记本电脑的USB接口是我们日常使用中非常频繁的一个功能,无论是数据传输、充电还是外接设备,都离不开它。然而,当USB接口突然没有反应时,这无疑会给我们的工作和学习带来不小的困扰。下面,我们就来探讨一下笔记本USB接…...

【开源】A060-基于Spring Boot的游戏交易系统的设计与实现

🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看项目链接获取⬇️,记得注明来意哦~🌹 赠送计算机毕业设计600个选题ex…...

static关键字在嵌入式C编程中的应用

目录 一、控制变量的存储周期和可见性 1.1. 局部静态变量 1.2. 全局静态变量 二、控制函数的可见性 2.1. 静态函数 2.2. 代码示例(假设有两个文件:file1.c和file2.c) 三、应用场景 3.1. 存储常用数据 3.2. 实现内部辅助函数 四、注…...

集合框架(1)

集合框架(1) 1、数组的特点与弊端 (1)特点: 数组初始化以后,长度就确定了。数组中的添加的元素是依次紧密排列的,有序的,可以重复的。数组声明的类型,就决定了进行元素初…...

Java 基础之泛型:类型安全的保障与灵活运用

在 Java 编程的世界里,泛型是一个至关重要且非常实用的特性。它在 Java 5 中被引入,从根本上改变了我们处理数据类型的方式,提供了更强的类型安全保障,同时也增加了代码的复用性和可读性。 一、什么是泛型 泛型(Gener…...

开发者如何使用GCC提升开发效率Opencv操作

看此篇前请先阅读 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144216351?spm=1001…...

矩阵加法        ‌‍‎‏

矩阵加法 C语言代码C 语言代码Java语言代码Python语言代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 输入两个n行m列的矩阵A和B&#xff0c;输出它们的和AB。 输入 第一行包含两个整数n和m&#xff0c;表示矩阵的行数和列数。1 <…...

yarn : 无法加载文件 E:\node\node_global\yarn.ps1,因为在此系统上禁止运行脚本

先确保安装了yarn —— npm install -g yarn 终端输入set-ExecutionPolicy RemoteSigned 若要在本地计算机上运行您编写的未签名脚本和来自其他用户的签名脚本&#xff0c;请使用以下命令将计算机上的执行策略更改为RemoteSigned 再去使用yarn okk~...

详解C++类与对象(四)

文章目录 1.类型转换1.1 前言1.2 类型转换的性质 2.static成员2.1 前言2.2 static的基本概念 3.友元4.内部类5.匿名对象 1.类型转换 1.1 前言 在C中&#xff0c;由于程序员可以自己显示定义一个新的类。这样就会出现一个问题&#xff1a;程序员自己显示定义的类类型与编译器中…...

Pandas处理和分析嵌套JSON数据:从字符串到结构化DataFrame

在数据分析领域&#xff0c;我们经常遇到需要从非结构化数据中提取有用信息的场景。特别是当数据以JSON字符串的形式出现时&#xff0c;如何有效地将其转换为结构化的表格形式&#xff0c;以便进行进一步的分析和处理&#xff0c;成为了一个常见的挑战。本文将通过一个具体的例…...

【强化学习入门笔记】1.5 贝尔曼最优公式

本系列为学习赵世钰老师的《强化学习的数学原理》所作的学习笔记. 课程视频网址&#xff1a;https://space.bilibili.com/2044042934 1.5.1 定义 1.5.1.1 Contraction mapping theorem (收缩映射定理) fixed point(不动点) 如果 x ∗ x^* x∗满足下式, x ∗ x^* x∗称之为…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

快刀集(1): 一刀斩断视频片头广告

一刀流&#xff1a;用一个简单脚本&#xff0c;秒杀视频片头广告&#xff0c;还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农&#xff0c;平时写代码之余看看电影、补补片&#xff0c;是再正常不过的事。 电影嘛&#xff0c;要沉浸&#xff0c;…...

为什么要创建 Vue 实例

核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

高保真组件库:开关

一:制作关状态 拖入一个矩形作为关闭的底色:44 x 22,填充灰色CCCCCC,圆角23,边框宽度0,文本为”关“,右对齐,边距2,2,6,2,文本颜色白色FFFFFF。 拖拽一个椭圆,尺寸18 x 18,边框为0。3. 全选转为动态面板状态1命名为”关“。 二:制作开状态 复制关状态并命名为”开…...

第22节 Node.js JXcore 打包

Node.js是一个开放源代码、跨平台的、用于服务器端和网络应用的运行环境。 JXcore是一个支持多线程的 Node.js 发行版本&#xff0c;基本不需要对你现有的代码做任何改动就可以直接线程安全地以多线程运行。 本文主要介绍JXcore的打包功能。 JXcore 安装 下载JXcore安装包&a…...

数据挖掘是什么?数据挖掘技术有哪些?

目录 一、数据挖掘是什么 二、常见的数据挖掘技术 1. 关联规则挖掘 2. 分类算法 3. 聚类分析 4. 回归分析 三、数据挖掘的应用领域 1. 商业领域 2. 医疗领域 3. 金融领域 4. 其他领域 四、数据挖掘面临的挑战和未来趋势 1. 面临的挑战 2. 未来趋势 五、总结 数据…...

自定义线程池1.2

自定义线程池 1.2 1. 简介 上次我们实现了 1.1 版本&#xff0c;将线程池中的线程数量交给使用者决定&#xff0c;并且将线程的创建延迟到任务提交的时候&#xff0c;在本文中我们将对这个版本进行如下的优化&#xff1a; 在新建线程时交给线程一个任务。让线程在某种情况下…...

Gitlab + Jenkins 实现 CICD

CICD 是持续集成&#xff08;Continuous Integration, CI&#xff09;和持续交付/部署&#xff08;Continuous Delivery/Deployment, CD&#xff09;的缩写&#xff0c;是现代软件开发中的一种自动化流程实践。下面介绍 Web 项目如何在代码提交到 Gitlab 后&#xff0c;自动发布…...