工业—使用Flink处理Kafka中的数据_ProduceRecord1
1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生产总数” 。使用 redis cli 以 HGETALL key 方式获取 totalproduce值。注: ProduceRecord 主题,生产一个产品产生一条数据; change_handle_state字段为 1 代表已经检验, 0 代表未检验; 时间语义使用Processing Time 。
package flink.connimport org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisConfigBase, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper/*** Redis连接配置类* * 该类用于配置与Redis的连接参数,并提供获取RedisSink实例的方法,* 使Flink流处理能够将数据输出到Redis。*/
class ConnRedis(host: String, port: Int) {// Redis连接配置,使用FlinkJedisPoolConfig来配置Redis的连接参数private val jedisConfig: FlinkJedisConfigBase = createJedisConfig(host, port)/*** 创建并返回Redis连接配置* * @param host Redis服务器的主机名或IP地址* @param port Redis服务器的端口号* @return FlinkJedisConfigBase 连接配置对象*/private def createJedisConfig(host: String, port: Int): FlinkJedisConfigBase = {new FlinkJedisPoolConfig.Builder().setHost(host) // 设置Redis主机地址.setPort(port) // 设置Redis端口号.build() // 构建并返回配置对象}/*** 获取RedisSink实例* * 该方法接受一个RedisMapper作为参数,并返回一个与该Mapper关联的RedisSink。* * @param redisMapper 通过该Mapper来定义如何将数据写入Redis* @tparam T 泛型参数,表示流中数据的类型* @return RedisSink[T] 一个与Redis连接的Sink实例*/def getRedisSink[T](redisMapper: RedisMapper[T]): RedisSink[T] = {new RedisSink[T](jedisConfig, redisMapper)}
}
类和方法说明
1. 类 ConnRedis
-
构造函数:
ConnRedis
类的构造函数接受两个参数host
和port
,分别表示 Redis 服务器的主机地址和端口号。这个类通过这些参数来配置与 Redis 的连接。 -
成员变量
jedisConfig
:jedisConfig
是FlinkJedisConfigBase
类型的变量,用于存储 Redis 连接的配置。它是通过createJedisConfig
方法创建的。
2. 方法 createJedisConfig
这个私有方法用于创建和配置 Redis 连接的参数,并返回一个 FlinkJedisConfigBase
类型的配置对象。
-
输入参数:
host
:Redis 服务器的主机名或 IP 地址。port
:Redis 服务器的端口号。
-
返回值:
- 返回一个
FlinkJedisConfigBase
类型的对象,这个对象是使用FlinkJedisPoolConfig.Builder()
来构建的。该构建器允许你设置 Redis 连接的主机地址和端口号,并最终生成一个配置对象。
这个配置对象后续将用于建立连接到 Redis 数据库。
- 返回一个
3. 方法 getRedisSink
这个方法用于返回一个 RedisSink
实例,这个实例允许 Flink 流处理作业将数据写入 Redis。
-
输入参数:
redisMapper
:一个RedisMapper[T]
类型的参数。RedisMapper
是 Flink 中的一个接口,它定义了如何将流中的数据(类型为T
)映射为 Redis 中的键值对。
-
返回值:
- 返回一个
RedisSink[T]
对象。RedisSink
是 Flink 用来将数据输出到外部系统(在这里是 Redis)的一种 Sink。构建时需要传入jedisConfig
(Redis 连接配置)和redisMapper
(数据映射器)两个参数。
通过
RedisSink
,Flink 流处理作业可以将数据推送到 Redis,具体如何存储数据由redisMapper
来决定。 - 返回一个
关键组件解析
-
FlinkJedisConfigBase
和FlinkJedisPoolConfig
:FlinkJedisConfigBase
是 Flink 与 Redis 连接时使用的基础配置类,具体配置会使用FlinkJedisPoolConfig
来实现,它封装了 Redis 连接池的相关设置,包括 Redis 的主机、端口等。
-
RedisSink
:RedisSink
是 Flink 提供的一个 Sink 用于将数据写入 Redis。在这里,它通过jedisConfig
和redisMapper
来配置如何与 Redis 建立连接,并将流处理结果写入 Redis。
-
RedisMapper
:RedisMapper
是 Flink 中的一个接口,用于定义如何将流中的数据映射为 Redis 的键值对。你可以实现RedisMapper
接口,并在其中定义数据的 Redis 键和值的生成方式。比如,你可以将数据的某些字段作为 Redis 键,其他字段作为 Redis 值。
package flink.calculate.ProduceRecordimport flink.conn.ConnRedis
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}/*** Flink作业:统计生产记录并将结果输出到Redis* * 本作业从Kafka消费生产记录数据,统计每个设备在最近五分钟内的生产数量,* 并将结果存储到Redis中。*/
object TotalProduceToRedis {// 定义Kafka的相关配置private val topicName: String = "ProduceRecord" // Kafka主题名private val bootstrapServer: String = "master:9092,slave1:9092,slave2:9092" // Kafka bootstrap serversprivate val redisHost: String = "master" // Redis服务器地址// 主程序入口def main(args: Array[String]): Unit = {// 创建流式执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 设置自动水位线间隔为0,意味着不使用水位线(适用于没有事件时间的场景)env.getConfig.setAutoWatermarkInterval(0L)// 设置并行度为1env.setParallelism(1)// 设置Kafka Source:消费Kafka中的数据val kafkaSource: KafkaSource[String] = KafkaSource.builder().setTopics(topicName) // 设置Kafka主题.setBootstrapServers(bootstrapServer) // 设置Kafka服务器地址.setStartingOffsets(OffsetsInitializer.latest()) // 设置偏移量策略:latest表示读取最新数据.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置数据的反序列化方式.build()// 从Kafka读取数据并生成DataStreamval dataStream: DataStream[String] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)// 打印读取的数据(可用于调试)dataStream.print("========元数据============>")// 处理数据:统计每个设备的生产数量val result: DataStream[TotalProduce] = dataStream.map(parseProduceData) // 转换为TotalProduce对象.filter(_.ProduceInspect == 1) // 过滤已检验的产品.keyBy(_.ProduceMachineID) // 按设备ID分组.window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 使用滚动窗口,窗口大小为5分钟.sum(1) // 对生产数量求和// 将结果存入Redisresult.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper))// 打印最终结果(可用于调试)result.print("=========result=========>>>>>>>")// 执行流处理作业env.execute("TotalProduceToRedis")}/*** 将输入的生产记录字符串转换为TotalProduce对象* * @param input 生产记录字符串* @return 返回转换后的TotalProduce对象*/def parseProduceData(input: String): TotalProduce = {val array: Array[String] = input.split(",") // 按逗号分割字符串TotalProduce(array(1), array(9).toInt) // 返回TotalProduce对象,分别提取生产设备ID和生产检查状态}/*** 生产记录类,包含设备ID和生产检查状态** @param ProduceMachineID 设备ID* @param ProduceInspect 生产检查状态*/case class TotalProduce(ProduceMachineID: String, ProduceInspect: Int)/*** Redis数据映射器:将TotalProduce对象映射到Redis的命令*/class TotalProduceMapper extends RedisMapper[TotalProduce] {/*** 获取Redis命令描述信息* * @return RedisCommandDescription,表示Redis操作类型及目标*/override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "totalproduce") // 使用HSET命令将数据存入Redis的哈希表中}/*** 从TotalProduce对象中获取Redis的key* * @param t TotalProduce对象* @return Redis中的键,即设备ID*/override def getKeyFromData(t: TotalProduce): String = t.ProduceMachineID/*** 从TotalProduce对象中获取Redis的value* * @param t TotalProduce对象* @return Redis中的值,表示最近五分钟内该设备生产的数量*/override def getValueFromData(t: TotalProduce): String = {s"设备 ${t.ProduceMachineID} 最近五分钟生产了 ${t.ProduceInspect} 个产品"}}}
1. Kafka 配置
- Kafka 主题和服务器配置:
topicName
:指定了 Kafka 主题名为ProduceRecord
,这是从中消费数据的主题。bootstrapServer
:定义了 Kafka 集群的地址(多个 broker),这些是连接 Kafka 的入口。
- Kafka 数据源创建:
- 通过
KafkaSource.builder()
创建 Kafka 数据源,设定了主题、Kafka 服务器地址等信息。 setStartingOffsets(OffsetsInitializer.latest())
设置了 Kafka 消费的偏移量策略为latest
,即只读取最新的数据(如果是earliest
,则从头开始读取所有数据)。setValueOnlyDeserializer(new SimpleStringSchema())
设置了数据的反序列化方式,简单地将每条记录转换成字符串。
- 通过
2. 流处理环境设置
StreamExecutionEnvironment
:创建流处理执行环境,用来定义和执行流式计算。setParallelism(1)
:设置作业的并行度为 1,意味着整个作业的任务只会在一个任务槽中执行,不会并行处理多个任务。setAutoWatermarkInterval(0L)
:关闭了水位线的使用。水位线用于处理事件时间和处理时间之间的延迟,这里设置为 0,表示不使用水位线。
3. 数据流处理
- 读取数据:通过
env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), topicName)
从 Kafka 中读取数据。这里的WatermarkStrategy.noWatermarks()
表示不使用水位线。 - 数据解析和转换:
map(parseProduceData)
:将每条 Kafka 消息(字符串格式)转换为TotalProduce
对象,提取出设备ID和生产检查状态。filter(_.ProduceInspect == 1)
:过滤数据,只保留生产检查状态为 1 的记录,意味着只关心已检验的产品。keyBy(_.ProduceMachineID)
:按照设备ID进行分组,保证每个设备的生产数据是单独处理的。window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
:使用滚动窗口(Tumbling Window)进行计算,窗口大小为 5 分钟。sum(1)
:对每个窗口内的生产数量进行求和,这里假设ProduceInspect
字段表示生产数量。
4. 输出到 Redis
- Redis 连接和写入:结果通过
result.addSink(new ConnRedis(redisHost, 6379).getRedisSink(new TotalProduceMapper))
写入 Redis。ConnRedis
类负责建立与 Redis 的连接,并通过getRedisSink
获取一个用于将数据存入 Redis 的 Sink。TotalProduceMapper
类负责将TotalProduce
对象映射为 Redis 命令。具体地,通过HSET
将数据存储到 Redis 哈希表中,键为设备ID,值为该设备在最近五分钟内生产的产品数量。
5. TotalProduceMapper
类
该类实现了 RedisMapper
接口,负责定义如何将 TotalProduce
对象映射成 Redis 命令和数据:
getCommandDescription
:指定 Redis 操作类型为HSET
,将数据存入 Redis 的哈希表totalproduce
中。getKeyFromData
:返回 Redis 中的键,即设备ID (ProduceMachineID
)。getValueFromData
:返回 Redis 中的值,即设备在最近五分钟内生产的数量。这里的值是一个字符串,包含设备ID和生产数量的信息。
6. 程序执行
env.execute("TotalProduceToRedis")
:启动 Flink 作业,开始流式计算,消费 Kafka 数据,进行窗口计算,并将结果存入 Redis。
相关文章:

工业—使用Flink处理Kafka中的数据_ProduceRecord1
1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为 “totalproduce” , value 值为 “ 设备 id ,最近五分钟生…...
探索CSS版心布局:构建现代网页的黄金比例
探索CSS版心布局:构建现代网页的黄金比例 在网页设计中,版心(或称为内容区域)是页面的核心部分,通常用于放置主要内容。使用CSS3的新特性,可以创建更加灵活和响应式的版心布局。本文将详细介绍如何使用CSS…...

华为NPU服务器昇腾Ascend 910B2部署通义千问Qwen2.5——基于mindie镜像一路试错版(三)
文章目录 前言纯模型推理启动服务后面干什么?这可咋整啊?愁死了!总结前言 这是咱这个系列的第三个文章了。 毕竟,这是我好几天摸索出的经验,能帮助各位在几个小时内领会,我觉得也算是我的功劳一件了。 所以,一是希望大家耐心看下去,耐心操作下去;而是恳请各位多多关…...

详解Java数据库编程之JDBC
目录 首先创建一个Java项目 在Maven中央仓库下载mysql connector的jar包 针对MySQL版本5 针对MySQL版本8 下载之后,在IDEA中创建的项目中建立一个lib目录,然后把刚刚下载好的jar包拷贝进去,然后右键刚刚添加的jar包,点击‘添…...

基于MFC实现的人机对战五子棋游戏
基于MFC实现的人机对战五子棋游戏 1、引言 此报告将详细介绍本次课程设计的动机、设计思路及编写技术的详细过程,展现我所学过的C知识以及我通过本次课程设计所学到例如MFC等知识。在文档最后我也会记录我所编写过程遇到的问题以及解决方案。 1.1 背景 五子棋是…...

AIGC 时代的文学:变革与坚守
目录 一.AIGC 带来的文学变革 1.创作方式的改变 2.阅读体验的升级 3.文学市场的重塑 二.文学在 AIGC 时代的坚守 1.人类情感的表达 2.文学的艺术性 3.文学的社会责任 三.AIGC 与人类作家的共生之路 1.相互学习 2.合作创作 3.共同发展 另: 总结 随着人…...

InfluxDB 集成 Grafana
将InfluxDB集成到Grafana进行详细配置通常包括以下几个步骤:安装与配置InfluxDB、安装与配置Grafana、在Grafana中添加InfluxDB数据源以及创建和配置仪表板。以下是一个详细的配置指南: 一、安装与配置InfluxDB 下载与安装: 从InfluxDB的官…...

笔记本电脑usb接口没反应怎么办?原因及解决方法
笔记本电脑的USB接口是我们日常使用中非常频繁的一个功能,无论是数据传输、充电还是外接设备,都离不开它。然而,当USB接口突然没有反应时,这无疑会给我们的工作和学习带来不小的困扰。下面,我们就来探讨一下笔记本USB接…...

【开源】A060-基于Spring Boot的游戏交易系统的设计与实现
🙊作者简介:在校研究生,拥有计算机专业的研究生开发团队,分享技术代码帮助学生学习,独立完成自己的网站项目。 代码可以查看项目链接获取⬇️,记得注明来意哦~🌹 赠送计算机毕业设计600个选题ex…...

static关键字在嵌入式C编程中的应用
目录 一、控制变量的存储周期和可见性 1.1. 局部静态变量 1.2. 全局静态变量 二、控制函数的可见性 2.1. 静态函数 2.2. 代码示例(假设有两个文件:file1.c和file2.c) 三、应用场景 3.1. 存储常用数据 3.2. 实现内部辅助函数 四、注…...
集合框架(1)
集合框架(1) 1、数组的特点与弊端 (1)特点: 数组初始化以后,长度就确定了。数组中的添加的元素是依次紧密排列的,有序的,可以重复的。数组声明的类型,就决定了进行元素初…...
Java 基础之泛型:类型安全的保障与灵活运用
在 Java 编程的世界里,泛型是一个至关重要且非常实用的特性。它在 Java 5 中被引入,从根本上改变了我们处理数据类型的方式,提供了更强的类型安全保障,同时也增加了代码的复用性和可读性。 一、什么是泛型 泛型(Gener…...

开发者如何使用GCC提升开发效率Opencv操作
看此篇前请先阅读 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144134160?spm=1001.2014.3001.5502 https://blog.csdn.net/qq_20330595/article/details/144216351?spm=1001…...

矩阵加法
矩阵加法 C语言代码C 语言代码Java语言代码Python语言代码 💐The Begin💐点点关注,收藏不迷路💐 输入两个n行m列的矩阵A和B,输出它们的和AB。 输入 第一行包含两个整数n和m,表示矩阵的行数和列数。1 <…...

yarn : 无法加载文件 E:\node\node_global\yarn.ps1,因为在此系统上禁止运行脚本
先确保安装了yarn —— npm install -g yarn 终端输入set-ExecutionPolicy RemoteSigned 若要在本地计算机上运行您编写的未签名脚本和来自其他用户的签名脚本,请使用以下命令将计算机上的执行策略更改为RemoteSigned 再去使用yarn okk~...

详解C++类与对象(四)
文章目录 1.类型转换1.1 前言1.2 类型转换的性质 2.static成员2.1 前言2.2 static的基本概念 3.友元4.内部类5.匿名对象 1.类型转换 1.1 前言 在C中,由于程序员可以自己显示定义一个新的类。这样就会出现一个问题:程序员自己显示定义的类类型与编译器中…...
Pandas处理和分析嵌套JSON数据:从字符串到结构化DataFrame
在数据分析领域,我们经常遇到需要从非结构化数据中提取有用信息的场景。特别是当数据以JSON字符串的形式出现时,如何有效地将其转换为结构化的表格形式,以便进行进一步的分析和处理,成为了一个常见的挑战。本文将通过一个具体的例…...

【强化学习入门笔记】1.5 贝尔曼最优公式
本系列为学习赵世钰老师的《强化学习的数学原理》所作的学习笔记. 课程视频网址:https://space.bilibili.com/2044042934 1.5.1 定义 1.5.1.1 Contraction mapping theorem (收缩映射定理) fixed point(不动点) 如果 x ∗ x^* x∗满足下式, x ∗ x^* x∗称之为…...
编码问题技术探讨:IDE全局GBK与项目UTF-8引发的中文乱码
在软件开发过程中,编码问题一直是开发者们需要面对和解决的难题之一。尤其是在使用IDE(集成开发环境)时,如果全局编码设置与项目编码设置不一致,往往会导致中文乱码的问题。本文将深入探讨这一问题的背景、示例以及解决…...

SpringBoot两天
SpringBoot讲义 什么是SpringBoot? Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式…...

练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...

ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...

uniapp 小程序 学习(一)
利用Hbuilder 创建项目 运行到内置浏览器看效果 下载微信小程序 安装到Hbuilder 下载地址 :开发者工具默认安装 设置服务端口号 在Hbuilder中设置微信小程序 配置 找到运行设置,将微信开发者工具放入到Hbuilder中, 打开后出现 如下 bug 解…...

【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL
ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...

【java面试】微服务篇
【java面试】微服务篇 一、总体框架二、Springcloud(一)Springcloud五大组件(二)服务注册和发现1、Eureka2、Nacos (三)负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...

高端性能封装正在突破性能壁垒,其芯片集成技术助力人工智能革命。
2024 年,高端封装市场规模为 80 亿美元,预计到 2030 年将超过 280 亿美元,2024-2030 年复合年增长率为 23%。 细分到各个终端市场,最大的高端性能封装市场是“电信和基础设施”,2024 年该市场创造了超过 67% 的收入。…...