工业—使用Flink处理Kafka中的数据_ProduceRecord1

1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生产总数” 。使用 redis cli 以 HGETALL key 方式获取 totalproduce值。注: ProduceRecord 主题,生产一个产品产生一条数据; change_handle_state字段为 1 代表已经检验, 0 代表未检验; 时间语义使用Processing Time 。
package flink.connimport org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper/*** Redis连接配置类* * 该类用于配置与Redis的连接参数,并提供获取RedisSink实例的方法,* 使Flink流处理能够将数据输出到Redis。*/
class ConnRedis(host: String, port: Int) {// Redis连接配置,使用FlinkJedisPoolConfig来配置Redis的连接参数private val jedisConfig: FlinkJedisConfigBase = createJedisConfig(host, port)/*** 创建并返回Redis连接配置* * @param host Redis服务器的主机名或IP地址* @param port Redis服务器的端口号* @return FlinkJedisConfigBase 连接配置对象*/private def createJedisConfig(host: String, port: Int): FlinkJedisConfigBase = {new FlinkJedisPoolConfig.Builder().setHost(host) // 设置Redis主机地址.setPort(port) // 设置Redis端口号.build() // 构建并返回配置对象}/*** 获取RedisSink实例* * 该方法接受一个RedisMapper作为参数,并返回一个与该Mapper关联的RedisSink。* * @param redisMapper 通过该Mapper来定义如何将数据写入Redis* @tparam T 泛型参数,表示流中数据的类型* @return RedisSink[T] 一个与Redis连接的Sink实例*/def getRedisSink[T](redisMapper: RedisMapper[T]): RedisSink[T] = {new RedisSink[T](jedisConfig, redisMapper)}
}
类和方法说明
1. 类 ConnRedis
-
构造函数:
ConnRedis类的构造函数接受两个参数host和port,分别表示 Redis 服务器的主机地址和端口号。这个类通过这些参数来配置与 Redis 的连接。 -
成员变量
jedisConfig:jedisConfig是FlinkJedisConfigBase类型的变量,用于存储 Redis 连接的配置。它是通过createJedisConfig方法创建的。
2. 方法 createJedisConfig
这个私有方法用于创建和配置 Redis 连接的参数,并返回一个 FlinkJedisConfigBase 类型的配置对象。
-
输入参数:
host:Redis 服务器的主机名或 IP 地址。port:Redis 服务器的端口号。
-
返回值:
- 返回一个
FlinkJedisConfigBase类型的对象,这个对象是使用FlinkJedisPoolConfig.Builder()来构建的。该构建器允许你设置 Redis 连接的主机地址和端口号,并最终生成一个配置对象。
这个配置对象后续将用于建立连接到 Redis 数据库。
- 返回一个
3. 方法 getRedisSink
这个方法用于返回一个 RedisSink 实例,这个实例允许 Flink 流处理作业将数据写入 Redis。
-
输入参数:
redisMapper:一个RedisMapper[T]类型的参数。RedisMapper是 Flink 中的一个接口,它定义了如何将流中的数据(类型为T)映射为 Redis 中的键值对。
-
返回值:
- 返回一个
RedisSink[T]对象。RedisSink是 Flink 用来将数据输出到外部系统(在这里是 Redis)的一种 Sink。构建时需要传入jedisConfig(Redis 连接配置)和redisMapper(数据映射器)两个参数。
通过
RedisSink,Flink 流处理作业可以将数据推送到 Redis,具体如何存储数据由redisMapper来决定。 - 返回一个
关键组件解析
-
FlinkJedisConfigBase和FlinkJedisPoolConfig:FlinkJedisConfigBase是 Flink 与 Redis 连接时使用的基础配置类,具体配置会使用FlinkJedisPoolConfig来实现,它封装了 Redis 连接池的相关设置,包括 Redis 的主机、端口等。
-
RedisSink:RedisSink是 Flink 提供的一个 Sink 用于将数据写入 Redis。在这里,它通过jedisConfig和redisMapper来配置如何与 Redis 建立连接,并将流处理结果写入 Redis。
-
RedisMapper:RedisMapper是 Flink 中的一个接口,用于定义如何将流中的数据映射为 Redis 的键值对。你可以实现RedisMapper接口,并在其中定义数据的 Redis 键和值的生成方式。比如,你可以将数据的某些字段作为 Redis 键,其他字段作为 Redis 值。
package flink.calculate.ProduceRecordimport flink.conn.ConnRedis
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}/*** Flink作业:统计生产记录并将结果输出到Redis* * 本作业从Kafka消费生产记录数据,统计每个设备在最近五分钟内的生产数量,* 并将结果存储到Redis中。*/
object TotalProduceToRedis {// 定义Kafka的相关配置private val topicName: String = "ProduceRecord" // Kafka主题名private val bootstrapServer: String = "master:9092,slave1:9092,slave2:9092" // Kafka bootstrap serversprivate val redisHost: String = "master" // Redis服务器地址// 主程序入口def main(args: Array[String]): Unit = {// 创建流式执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置自动水位线间隔为0,意味着不使用水位线(适用于没有事件时间的场景)env.getConfig.setAutoWatermarkInterval(0L)// 设置并行度为1env.setParallelism(1)// 设置Kafka Source:消费Kafka中的数据val kafkaSource: KafkaSource[String] = KafkaSource.builder().setTopics(topicName) // 设置Kafka主题.setBootstrapServers(bootstrapServer) // 设置Kafka服务器地址.setStartingOffsets(OffsetsInitializer.latest()) // 设置偏移量策略:latest表示读取最新数据.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置数据的反序列化方式.build()// 从Kafka读取数据并生成DataStreamval dataStream: DataStream[String] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)// 打印读取的数据(可用于调试)dataStream.print("========元数据============>")// 处理数据:统计每个设备的生产数量val result: DataStream[TotalProduce] = dataStream.map(parseProduceData) // 转换为TotalProduce对象.filter(_.ProduceInspect == 1) // 过滤已检验的产品.keyBy(_.ProduceMachineID) // 按设备ID分组.window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 使用滚动窗口,窗口大小为5分钟.sum(1) // 对生产数量求和// 将结果存入Redisresult.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper))// 打印最终结果(可用于调试)result.print("=========result=========>>>>>>>")// 执行流处理作业env.execute("TotalProduceToRedis")}/*** 将输入的生产记录字符串转换为TotalProduce对象* * @param input 生产记录字符串* @return 返回转换后的TotalProduce对象*/def parseProduceData(input: String): TotalProduce = {val array: Array[String] = input.split(",") // 按逗号分割字符串TotalProduce(array(1), array(9).toInt) // 返回TotalProduce对象,分别提取生产设备ID和生产检查状态}/*** 生产记录类,包含设备ID和生产检查状态** @param ProduceMachineID 设备ID* @param ProduceInspect 生产检查状态*/case class TotalProduce(ProduceMachineID: String, ProduceInspect: Int)/*** Redis数据映射器:将TotalProduce对象映射到Redis的命令*/class TotalProduceMapper extends RedisMapper[TotalProduce] {/*** 获取Redis命令描述信息* * @return RedisCommandDescription,表示Redis操作类型及目标*/override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "totalproduce") // 使用HSET命令将数据存入Redis的哈希表中}/*** 从TotalProduce对象中获取Redis的key* * @param t TotalProduce对象* @return Redis中的键,即设备ID*/override def getKeyFromData(t: TotalProduce): String = t.ProduceMachineID/*** 从TotalProduce对象中获取Redis的value* * @param t TotalProduce对象* @return Redis中的值,表示最近五分钟内该设备生产的数量*/override def getValueFromData(t: TotalProduce): String = {s"设备 ${t.ProduceMachineID} 最近五分钟生产了 ${t.ProduceInspect} 个产品"}}}
1. Kafka 配置
- Kafka 主题和服务器配置:
topicName:指定了 Kafka 主题名为ProduceRecord,这是从中消费数据的主题。bootstrapServer:定义了 Kafka 集群的地址(多个 broker),这些是连接 Kafka 的入口。
- Kafka 数据源创建:
- 通过
KafkaSource.builder()创建 Kafka 数据源,设定了主题、Kafka 服务器地址等信息。 setStartingOffsets(OffsetsInitializer.latest())设置了 Kafka 消费的偏移量策略为latest,即只读取最新的数据(如果是earliest,则从头开始读取所有数据)。setValueOnlyDeserializer(new SimpleStringSchema())设置了数据的反序列化方式,简单地将每条记录转换成字符串。
- 通过
2. 流处理环境设置
StreamExecutionEnvironment:创建流处理执行环境,用来定义和执行流式计算。setParallelism(1):设置作业的并行度为 1,意味着整个作业的任务只会在一个任务槽中执行,不会并行处理多个任务。setAutoWatermarkInterval(0L):关闭了水位线的使用。水位线用于处理事件时间和处理时间之间的延迟,这里设置为 0,表示不使用水位线。
3. 数据流处理
- 读取数据:通过
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)从 Kafka 中读取数据。这里的WatermarkStrategy.noWatermarks()表示不使用水位线。 - 数据解析和转换:
map(parseProduceData):将每条 Kafka 消息(字符串格式)转换为TotalProduce对象,提取出设备ID和生产检查状态。filter(_.ProduceInspect == 1):过滤数据,只保留生产检查状态为 1 的记录,意味着只关心已检验的产品。keyBy(_.ProduceMachineID):按照设备ID进行分组,保证每个设备的生产数据是单独处理的。window(TumblingProcessingTimeWindows.of(Time.minutes(5))):使用滚动窗口(Tumbling Window)进行计算,窗口大小为 5 分钟。sum(1):对每个窗口内的生产数量进行求和,这里假设ProduceInspect字段表示生产数量。
4. 输出到 Redis
- Redis 连接和写入:结果通过
result.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper))写入 Redis。ConnRedis类负责建立与 Redis 的连接,并通过getRedisSink获取一个用于将数据存入 Redis 的 Sink。TotalProduceMapper类负责将TotalProduce对象映射为 Redis 命令。具体地,通过HSET将数据存储到 Redis 哈希表中,键为设备ID,值为该设备在最近五分钟内生产的产品数量。
5. TotalProduceMapper 类
该类实现了 RedisMapper 接口,负责定义如何将 TotalProduce 对象映射成 Redis 命令和数据:
getCommandDescription:指定 Redis 操作类型为HSET,将数据存入 Redis 的哈希表totalproduce中。getKeyFromData:返回 Redis 中的键,即设备ID (ProduceMachineID)。getValueFromData:返回 Redis 中的值,即设备在最近五分钟内生产的数量。这里的值是一个字符串,包含设备ID和生产数量的信息。
6. 程序执行
env.execute("TotalProduceToRedis"):启动 Flink 作业,开始流式计算,消费 Kafka 数据,进行窗口计算,并将结果存入 Redis。
相关文章:
工业—使用Flink处理Kafka中的数据_ProduceRecord1
1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生…...
探索CSS版心布局:构建现代网页的黄金比例
探索CSS版心布局:构建现代网页的黄金比例 在网页设计中,版心(或称为内容区域)是页面的核心部分,通常用于放置主要内容。使用CSS3的新特性,可以创建更加灵活和响应式的版心布局。本文将详细介绍如何使用CSS…...
华为NPU服务器昇腾Ascend 910B2部署通义千问Qwen2.5——基于mindie镜像一路试错版(三)
文章目录 前言纯模型推理启动服务后面干什么?这可咋整啊?愁死了!总结前言 这是咱这个系列的第三个文章了。 毕竟,这是我好几天摸索出的经验,能帮助各位在几个小时内领会,我觉得也算是我的功劳一件了。 所以,一是希望大家耐心看下去,耐心操作下去;而是恳请各位多多关…...
详解Java数据库编程之JDBC
目录 首先创建一个Java项目 在Maven中央仓库下载mysql connector的jar包 针对MySQL版本5 针对MySQL版本8 下载之后,在IDEA中创建的项目中建立一个lib目录,然后把刚刚下载好的jar包拷贝进去,然后右键刚刚添加的jar包,点击‘添…...
基于MFC实现的人机对战五子棋游戏
基于MFC实现的人机对战五子棋游戏 1、引言 此报告将详细介绍本次课程设计的动机、设计思路及编写技术的详细过程,展现我所学过的C知识以及我通过本次课程设计所学到例如MFC等知识。在文档最后我也会记录我所编写过程遇到的问题以及解决方案。 1.1 背景 五子棋是…...
AIGC 时代的文学:变革与坚守
目录 一.AIGC 带来的文学变革 1.创作方式的改变 2.阅读体验的升级 3.文学市场的重塑 二.文学在 AIGC 时代的坚守 1.人类情感的表达 2.文学的艺术性 3.文学的社会责任 三.AIGC 与人类作家的共生之路 1.相互学习 2.合作创作 3.共同发展 另: 总结 随着人…...
InfluxDB 集成 Grafana
将InfluxDB集成到Grafana进行详细配置通常包括以下几个步骤:安装与配置InfluxDB、安装与配置Grafana、在Grafana中添加InfluxDB数据源以及创建和配置仪表板。以下是一个详细的配置指南: 一、安装与配置InfluxDB 下载与安装: 从InfluxDB的官…...
笔记本电脑usb接口没反应怎么办?原因及解决方法
笔记本电脑的USB接口是我们日常使用中非常频繁的一个功能,无论是数据传输、充电还是外接设备,都离不开它。然而,当USB接口突然没有反应时,这无疑会给我们的工作和学习带来不小的困扰。下面,我们就来探讨一下笔记本USB接…...
【开源】A060-基于Spring Boot的游戏交易系统的设计与实现
🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看项目链接获取⬇️,记得注明来意哦~🌹 赠送计算机毕业设计600个选题ex…...
static关键字在嵌入式C编程中的应用
目录 一、控制变量的存储周期和可见性 1.1. 局部静态变量 1.2. 全局静态变量 二、控制函数的可见性 2.1. 静态函数 2.2. 代码示例(假设有两个文件:file1.c和file2.c) 三、应用场景 3.1. 存储常用数据 3.2. 实现内部辅助函数 四、注…...
集合框架(1)
集合框架(1) 1、数组的特点与弊端 (1)特点: 数组初始化以后,长度就确定了。数组中的添加的元素是依次紧密排列的,有序的,可以重复的。数组声明的类型,就决定了进行元素初…...
Java 基础之泛型:类型安全的保障与灵活运用
在 Java 编程的世界里,泛型是一个至关重要且非常实用的特性。它在 Java 5 中被引入,从根本上改变了我们处理数据类型的方式,提供了更强的类型安全保障,同时也增加了代码的复用性和可读性。 一、什么是泛型 泛型(Gener…...
开发者如何使用GCC提升开发效率Opencv操作
看此篇前请先阅读 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144216351?spm=1001…...
矩阵加法
矩阵加法 C语言代码C 语言代码Java语言代码Python语言代码 💐The Begin💐点点关注,收藏不迷路💐 输入两个n行m列的矩阵A和B,输出它们的和AB。 输入 第一行包含两个整数n和m,表示矩阵的行数和列数。1 <…...
yarn : 无法加载文件 E:\node\node_global\yarn.ps1,因为在此系统上禁止运行脚本
先确保安装了yarn —— npm install -g yarn 终端输入set-ExecutionPolicy RemoteSigned 若要在本地计算机上运行您编写的未签名脚本和来自其他用户的签名脚本,请使用以下命令将计算机上的执行策略更改为RemoteSigned 再去使用yarn okk~...
详解C++类与对象(四)
文章目录 1.类型转换1.1 前言1.2 类型转换的性质 2.static成员2.1 前言2.2 static的基本概念 3.友元4.内部类5.匿名对象 1.类型转换 1.1 前言 在C中,由于程序员可以自己显示定义一个新的类。这样就会出现一个问题:程序员自己显示定义的类类型与编译器中…...
Pandas处理和分析嵌套JSON数据:从字符串到结构化DataFrame
在数据分析领域,我们经常遇到需要从非结构化数据中提取有用信息的场景。特别是当数据以JSON字符串的形式出现时,如何有效地将其转换为结构化的表格形式,以便进行进一步的分析和处理,成为了一个常见的挑战。本文将通过一个具体的例…...
【强化学习入门笔记】1.5 贝尔曼最优公式
本系列为学习赵世钰老师的《强化学习的数学原理》所作的学习笔记. 课程视频网址:https://space.bilibili.com/2044042934 1.5.1 定义 1.5.1.1 Contraction mapping theorem (收缩映射定理) fixed point(不动点) 如果 x ∗ x^* x∗满足下式, x ∗ x^* x∗称之为…...
编码问题技术探讨:IDE全局GBK与项目UTF-8引发的中文乱码
在软件开发过程中,编码问题一直是开发者们需要面对和解决的难题之一。尤其是在使用IDE(集成开发环境)时,如果全局编码设置与项目编码设置不一致,往往会导致中文乱码的问题。本文将深入探讨这一问题的背景、示例以及解决…...
SpringBoot两天
SpringBoot讲义 什么是SpringBoot? Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式…...
CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型
CVPR 2025 | MIMO:支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题:MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者:Yanyuan Chen, Dexuan Xu, Yu Hu…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
高防服务器能够抵御哪些网络攻击呢?
高防服务器作为一种有着高度防御能力的服务器,可以帮助网站应对分布式拒绝服务攻击,有效识别和清理一些恶意的网络流量,为用户提供安全且稳定的网络环境,那么,高防服务器一般都可以抵御哪些网络攻击呢?下面…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
Web中间件--tomcat学习
Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...
《Docker》架构
文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器,docker,镜像,k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...
