当前位置: 首页 > news >正文

使用Flink处理Kafka中的数据

目录

        使用Flink处理Kafka中的数据

前提:

 一, 使用Flink消费Kafka中ProduceRecord主题的数据

具体代码为(scala)

执行结果

二, 使用Flink消费Kafka中ChangeRecord主题的数据 

         具体代码(scala)

                具体执行代码①

                重要逻辑代码② 

执行结果为:

使用Flink处理Kafka中的数据

        前提:

        创建主题  : ChangeRecord        ,     ProduceRecord

        使用kafka-topics.sh --zookeeper bigdata1:2181/kafka --list 查看主题

kafka-topics.sh --zookeeper bigdata1:2181/kafka --list

        

        然后开启数据生成器

./jnamake_data_file_v1 

        

一, 使用Flink消费Kafka中ProduceRecord主题的数据

        启动Flume a1, a1为所赋予的名称

         

        

 flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-producerecord--name a1 -Dflume.root.logger=DEBUG,console

        启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息         

kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ProduceRecord

        

编写Scala工程代码,使用Flink消费Kafka中的数据并进行相应的数据统计计算。

 

       一, 使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每五分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。使用redis cli以HGETALL key方式获取totalproduce值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔五分钟以上,第一次截图放前面,第二次放后面;

注:ProduceRecord主题,生产一个产品产生一条数据;

change_handle_state字段为1代表已经检验,0代表未检验;

时间语义使用Processing Time。

具体代码为(scala):
package gyflink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.util.Properties
object test1{def main(args: Array[String]): Unit = {// 创建Flink流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 设置并行度env.setParallelism(1)//指定时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// kafka的属性配置val properties = new Properties()properties.setProperty("bootstrap.servers","bigdata1:9092,bigdata2:9092,bigdata3:9092")properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("auto.offset.reset","earliest")// 读取kafka数据val FlinkKafkaConsumer = new FlinkKafkaConsumer[String]("ProduceRecord", new SimpleStringSchema(), properties)val text = env.addSource(FlinkKafkaConsumer)// TODO 使用flink算子对数据进行处理//    topic的一条数据:2214,117,0002,2024-01-09 11:08:53,2024-01-09 11:08:53,2024-01-09 11:08:59,15897,1900-01-01 00:00:00,188815,0val inputMap = text.map(link => {val arr = link.split(",")     // 使用‘,’作为分割符(arr(1).toInt, arr(9).toInt)   // 下标取出第1个和第9个值}).filter(_._2 == 1)    // 筛选条件:把第二个元素等于1.keyBy(_._1)    // 将第一个元素作为key值.timeWindow(Time.minutes(5))   // 间隔5分钟进行计算.sum(1)inputMap.print("ds")// TODO 与 Redis 数据库进行连接// 创建Redis数据库的连接属性val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()   // 创建一个FlinkJedisPoolConfig对象.setHost("bigdata1")    // 设置Redis数据库的主机地址.setPort(6379)          // 设置Redis数据库的端口号.build()// 创建RedisSink对象,并将数据写入Redis中val redisSink = new RedisSink[(Int, Int)](config, new MyRedisMapper)   // MyRedisMapper是一个自定义的映射器,将flink的数据转换为Redis的格式// 发送数据inputMap.addSink(redisSink)    // 将flink的数据流和Redis数据库连接起来// 执行Flink程序env.execute("kafkaToRedis")    // 向flink提交作业,开始执行}//    根据题目要求class MyRedisMapper extends RedisMapper[(Int, Int)] {     // RedisMapper的方法是是将把flink的数据存储为Redis的存储格式//这里使用RedisCommand.HSET不用RedisCommand.SET,前者创建RedisHash表后者创建Redis普通的String对应表override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"totalproduce")override def getKeyFromData(t: (Int, Int)): String = t._1 + ""override def getValueFromData(t: (Int, Int)): String = t._2 + ""}}

执行结果:

        

        

二, 使用Flink消费Kafka中ChangeRecord主题的数据 

启动Flume a1, a1为所赋予的名称

        

 flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-changerecord        --name a1 -Dflume.root.logger=DEBUG,console

        启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息         

kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ChangeRecord       

        

       二, 使用Flink消费Kafka中ChangeRecord主题的数据,当某设备30秒状态连续为“预警”,输出预警信息。当前预警信息输出后,最近30秒不再重复预警(即如果连续1分钟状态都为“预警”只输出两次预警信息)。将结果存入Redis中,key值为“warning30sMachine”,value值为“设备id,预警信息”。使用redis cli以HGETALL key方式获取warning30sMachine值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔一分钟以上,第一次截图放前面,第二次放后面;

注:时间使用change_start_time字段,忽略数据中的change_end_time不参与任何计算。忽略数据迟到问题。

Redis的value示例:115,2022-01-01 09:53:10:设备115 连续30秒为预警状态请尽快处理!

(2022-01-01 09:53:10 为change_start_time字段值,中文内容及格式必须为示例所示内容。)

具体代码(scala):

        具体执行代码①:
package gyflinkimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.text.SimpleDateFormat// 定义一个Change类,这个类里面定义四个参数,这四个参数对应着分割后的元素
case class Change(ChangeId: Int, ChangeState:String, ChangeTime:String, timeStamp:Long)object flink_kafka_to_redis2 {def main(args: Array[String]): Unit = {/**                         25_299_649,111,13,预警,2024-01-09 11:08:08,2024-01-09 11:08:52,15ChangeRecord的日志信息: 22_220_698,114,29,预警,2024-01-09 11:07:42,2024-01-09 11:09:00,15* */// TODO 创建flink的执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)   // 设置并行度为1,单节点运行// TODO 与kafka进行连接val kafkaSource = KafkaSource.builder().setBootstrapServers("bigdata1:9092") // 设置kafka服务器地址.setTopics("ChangeRecord") // flink需要订阅的主题.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置只对value反序列化器,由于kafka使用网络进行传输,发送的是序列化数据,所以flink要做反序列化操作.setStartingOffsets(OffsetsInitializer.latest()) // 设置读取偏移量,从kafka最新的记录开始读取.build()// TODO 读取kafka数据,设置无水印val produceDataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_flink_redis")//                                      kafka属性                 水印设置                    名称val kafka_value = produceDataStream.map(x => {val data = x.split(",")   // 每一条记录以‘,’进行分割val timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(data(4)).getTime    // 将string类型的时间转换为timestamp类型,形成时间戳Change(data(1).toInt, data(3), data(4),timestamp)     // 输出:Change(110,预警,2024-01-18 14:09:36,1705558176000)})// 设置水位线val waterTimeStream = kafka_value.assignTimestampsAndWatermarks(   // 创建一个新的watermark策略,并应用与kafka数据流// 流过来的数据时间是递增的,将迟到的数据直接丢弃WatermarkStrategy.forMonotonousTimestamps()    // 用于处理单调递增的时间戳(升序的时间戳).withTimestampAssigner(new SerializableTimestampAssigner[Change] {    // 定义了一个时间戳分配器,从每个事件中提取时间戳override def extractTimestamp(change: Change, recordTimestamp: Long): Long = {    // 定义了两个参数,第一个参数表示Change类型,第二个是个Long类型,这个函数返回值为Long的change.timeStamp    // 从 change(Change) 提取timeStamp的参数}}))// 开始处理数据流val resultSteam = waterTimeStream.keyBy(_.ChangeId) // 按照ChangeId进行分组.process(new flink_kafka_to_redis2_Process)   // 调用处理类// 与Redis建立连接val JedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("bigdata1").setPort(6379)//      .setDatabase(0).build()val Warning30Machine = new RedisMapper[(Int, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning30sMachine")override def getKeyFromData(t: (Int, String)): String = t._1.toStringoverride def getValueFromData(t: (Int, String)): String = t._2}// 建立Redis通道val redisSink = new RedisSink[(Int, String)](JedisPoolConfig, Warning30Machine)// 将结果流加入到通道resultSteam.addSink(redisSink)resultSteam.print()env.execute()}}
        重要逻辑代码②:
package gyflink
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collectorclass flink_kafka_to_redis2_Process extends KeyedProcessFunction[Int,Change, (Int, String)] {//                                                           键类型  输入类型    输出类型// 用于保存上一条的记录的状态private lazy val lastState:ValueState[Change] = getRuntimeContext.getState(    // 延迟初始化的私有变量new ValueStateDescriptor[Change]("lastState",classOf[Change]))override def processElement(Change: Change, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#Context, out: Collector[(Int, String)]): Unit = {// 获取定时服务val timerService = ctx.timerService()// 如果是预警信息if (Change.ChangeState.equals("预警")){if (lastState.value() == null){lastState.update(Change)timerService.registerEventTimeTimer(Change.timeStamp + 30000)}} else {// 出现不是预警信息,删除存在的定时器,如果不存在定时器会忽略if (lastState.value() != null){timerService.deleteEventTimeTimer(lastState.value().timeStamp + 30000)lastState.update(null)}}}// 定时器逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#OnTimerContext, out: Collector[(Int, String)]): Unit = {val record = lastState.value()//    out.collect((record.ChangeId,s"${record.ChangeTime}:设备${record.ChangeId}连续30秒为预警状态请尽快处理!"))out.collect(record.ChangeId,s"${record.ChangeId},${record.ChangeTime}:设备${record.ChangeId} 连续30 秒为预警状态请尽快处理!")lastState.update(null)}}
执行结果为:

相关文章:

使用Flink处理Kafka中的数据

目录 使用Flink处理Kafka中的数据 前提: 一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据 具体代码(scala) 具体执行代码① 重要逻…...

跟着pink老师前端入门教程-day07

去掉li前面的项目符号(小圆点) 语法:list-style: none; 十五、圆角边框 在CSS3中,新增了圆角边框样式,这样盒子就可以变成圆角 border-radius属性用于设置元素的外边框圆角 语法:border-radius:length…...

Pixelmator Pro Mac版 v3.5 图像处理软件 兼容 M1/M2

在当今数字化时代,图像编辑软件成为了许多人必备的工具之一。无论您是摄影师、设计师还是普通用户,您都需要一款功能强大、易于使用的图像编辑软件来处理和优化您的照片和图像。而Pixelmator Pro for Mac正是满足这一需求的理想选择。 Pixelmator Pro f…...

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(15)-Fiddler弱网测试,知否知否,应是必知必会

1.简介 现在这个时代已经属于流量时代,用户对于App或者小程序之类的操作界面的数据和交互的要求也越来越高。对于测试人员弱网测试也是需要考验自己专业技术能力的一种技能。一个合格的测试人员,需要额外关注的场景就远不止断网、网络故障等情况了。还要…...

【vscode】远程资源管理器自动登录服务器保姆级教程

远程资源管理器自动登录服务器 介绍如何配置本地生成rsa服务端添加rsa.pub配置config文件 介绍 vscode SSH 保存密码自动登录服务器 对比通过账号密码登录,自动连接能节约更多时间效率,且通过vim修改不容易发现一些换行或者引号导致的错误,v…...

写点东西《Javascript switch 语句的替代方法》

写点东西《Javascript switch 语句的替代方法》 那么 switch 语句有什么问题? Object Literal 查找的替代方法 将我们学到的东西变成一个实用函数 您需要的一切都在一个地方# [](#javascript-version) Javascript 版本Tyepscript version🌟更多精彩 本文…...

python学习笔记10(循环结构2)

(一)循环结构2 1、扩展模式 语法: for 循环变量 in 遍历对象: 语句块1 else: 语句块2 说明:else在循环结束后执行,通常和break和continue结合使用 2、无限循环while while 表达式: 语句块…...

Codefroces 191A - Dynasty Puzzles

思路 d p dp dp d p i , j dp_{i,j} dpi,j​ 表示以 i i i 开始以 j j j 结尾的最长长度。方程: d p j , r m a x ( d p j , l , d p j , l l e n g t h l , r ) dp_{j,r}max(dp_{j,l}\;,\;dp_{j,l}length_{l,r}) dpj,r​max(dpj,l​,dpj,l​lengthl,r​) 有点区…...

HIVE中关联键类型不同导致数据重复,以及数据倾斜

比如左表关联键是string类型,右表关联键是bigint类型,关联后会出现多条的情况 解决方案: 关联键先统一转成string类型再进行关联 原因: 根据HIVE版本不同,数据位数上限不同, 低版本的超过16位会出现这种…...

CRM系统是如何解决企业的痛点的?

在当今竞争激烈的商业世界中,客户关系管理(CRM)数字化转型已经成为大企业成功的重要秘诀。大型跨国公司如亚马逊、苹果和微软等已经在CRM数字化方面走在了前列,实现了高度个性化的客户体验,加强了客户忠诚度。 然而&a…...

系统架构14 - 软件工程(2)

需求工程 需求工程软件需求两大过程三个层次业务需求(business requirement)用户需求(user requirement)功能需求 (functional requirement)非功能需求 概述活动阶段需求获取基本步骤获取方法 需求分析三大模型数据流图数据字典DD需求定义方法 需求验证需求管理需求基线变更控制…...

vue封装接口

目录 封装接口前缀 配置逻辑 接口存放文件 配置代理 获取数据方法 封装接口前缀 config.js const serverConfig {baseURL: "https://xxx.xxxxxxxx.com/api", // 请求基础地址,可根据环境自定义useTokenAuthorization: false, // 是否开启 token 认证};export …...

Dell戴尔XPS 8930笔记本电脑原装Win10系统 恢复出厂预装OEM系统

链接:https://pan.baidu.com/s/1eaTQeX-LnPJwWt3fBJD8lg?pwdajy2 提取码:ajy2 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office办公软件、MyDell等预装程序 文件格式:esd/wim/swm 安装方式&am…...

elasticsearch的拼音分词器安装

安装拼音分词器 第一步:下载 要实现根据字母做补全,就必须对文档按照拼音分词。在 GitHub 上恰好有 elasticsearch 的拼音分词插件。地址: 仓管的主页: https://github.com/infinilabs/analysis-pinyin 仓管的版本页 https:…...

2024阿里云优惠,云服务器61元一年起

2024年最新阿里云主机价格,最低配置2核2G3M起步,只要61元一年,还可以在阿里云CLUB中心领券 aliyun.club 专用满减优惠券。 1、云服务器ECS经济型e实例2核2G、3M固定带宽99元一年 2、轻量应用服务器2核2G3M带宽轻量服务器一年61元 3、阿里云轻…...

基于SpringBoot+Vue实现的社区养老管理平台(源码+数据库脚本+设计文档+部署视频)

系统介绍 基于SpringBootVue实现的社区养老服务管理平台采用springboot以及vue框架技术,实现了社区养老管理系统,实现了对养老院的员工、管理员对入住的老人及其健康档案实现信息化管理。 技术选型 开发工具:idea2020.3Webstorm2020.3(其他…...

【漏洞复现】CloudPanel makefile接口远程命令执行漏洞(CVE-2023-35885)

文章目录 前言声明一、CloudPanel 简介二、漏洞描述三、影响版本四、漏洞复现五、修复建议 前言 CloudPanel 是一个基于 Web 的控制面板或管理界面,旨在简化云托管环境的管理。它提供了一个集中式平台,用于管理云基础架构的各个方面,包括 &a…...

【Spring Boot 3】【Redis】集成Redisson

【Spring Boot 3】【Redis】集成Redisson 背景介绍开发环境开发步骤及源码工程目录结构总结背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花…...

YOLOv8改进 | Conv篇 | 2024.1月最新成果可变形卷积DCNv4(适用检测、Seg、分类、Pose、OBB)

一、本文介绍 本文给大家带来的改进机制是2024-1月的最新成果DCNv4,其是DCNv3的升级版本,效果可以说是在目前的卷积中名列前茅了,同时该卷积具有轻量化的效果!一个DCNv4参数量下降越15Wparameters左右,。它主要通过两个方面对前一版本DCNv3进行改进:首先,它移除了空间聚…...

理解反向代理

反向代理是一个不可或缺的组件。 它在客户端和服务器之间充当中介,提高了安全性、负载平衡和应用性能。 一、反向代理简介 反向代理是一种服务器,它位于客户端和后端服务器之间。与常见的(正向)代理不同,反向代理代表…...

vscode里如何用git

打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...

【JavaEE】-- HTTP

1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

通过Wrangler CLI在worker中创建数据库和表

官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...

AtCoder 第409​场初级竞赛 A~E题解

A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...