使用Flink处理Kafka中的数据
目录
使用Flink处理Kafka中的数据
前提:
一, 使用Flink消费Kafka中ProduceRecord主题的数据
具体代码为(scala)
执行结果
二, 使用Flink消费Kafka中ChangeRecord主题的数据
具体代码(scala)
具体执行代码①
重要逻辑代码②
执行结果为:
使用Flink处理Kafka中的数据
前提:
创建主题 : ChangeRecord , ProduceRecord
使用kafka-topics.sh --zookeeper bigdata1:2181/kafka --list 查看主题
kafka-topics.sh --zookeeper bigdata1:2181/kafka --list

然后开启数据生成器
./jnamake_data_file_v1

一, 使用Flink消费Kafka中ProduceRecord主题的数据
启动Flume a1, a1为所赋予的名称


flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-producerecord--name a1 -Dflume.root.logger=DEBUG,console
启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ProduceRecord

编写Scala工程代码,使用Flink消费Kafka中的数据并进行相应的数据统计计算。
一, 使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每五分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。使用redis cli以HGETALL key方式获取totalproduce值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔五分钟以上,第一次截图放前面,第二次放后面;
注:ProduceRecord主题,生产一个产品产生一条数据;
change_handle_state字段为1代表已经检验,0代表未检验;
时间语义使用Processing Time。
具体代码为(scala):
package gyflink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.util.Properties
object test1{def main(args: Array[String]): Unit = {// 创建Flink流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 设置并行度env.setParallelism(1)//指定时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// kafka的属性配置val properties = new Properties()properties.setProperty("bootstrap.servers","bigdata1:9092,bigdata2:9092,bigdata3:9092")properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("auto.offset.reset","earliest")// 读取kafka数据val FlinkKafkaConsumer = new FlinkKafkaConsumer[String]("ProduceRecord", new SimpleStringSchema(), properties)val text = env.addSource(FlinkKafkaConsumer)// TODO 使用flink算子对数据进行处理// topic的一条数据:2214,117,0002,2024-01-09 11:08:53,2024-01-09 11:08:53,2024-01-09 11:08:59,15897,1900-01-01 00:00:00,188815,0val inputMap = text.map(link => {val arr = link.split(",") // 使用‘,’作为分割符(arr(1).toInt, arr(9).toInt) // 下标取出第1个和第9个值}).filter(_._2 == 1) // 筛选条件:把第二个元素等于1.keyBy(_._1) // 将第一个元素作为key值.timeWindow(Time.minutes(5)) // 间隔5分钟进行计算.sum(1)inputMap.print("ds")// TODO 与 Redis 数据库进行连接// 创建Redis数据库的连接属性val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder() // 创建一个FlinkJedisPoolConfig对象.setHost("bigdata1") // 设置Redis数据库的主机地址.setPort(6379) // 设置Redis数据库的端口号.build()// 创建RedisSink对象,并将数据写入Redis中val redisSink = new RedisSink[(Int, Int)](config, new MyRedisMapper) // MyRedisMapper是一个自定义的映射器,将flink的数据转换为Redis的格式// 发送数据inputMap.addSink(redisSink) // 将flink的数据流和Redis数据库连接起来// 执行Flink程序env.execute("kafkaToRedis") // 向flink提交作业,开始执行}// 根据题目要求class MyRedisMapper extends RedisMapper[(Int, Int)] { // RedisMapper的方法是是将把flink的数据存储为Redis的存储格式//这里使用RedisCommand.HSET不用RedisCommand.SET,前者创建RedisHash表后者创建Redis普通的String对应表override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"totalproduce")override def getKeyFromData(t: (Int, Int)): String = t._1 + ""override def getValueFromData(t: (Int, Int)): String = t._2 + ""}}
执行结果:


二, 使用Flink消费Kafka中ChangeRecord主题的数据
启动Flume a1, a1为所赋予的名称

flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-changerecord --name a1 -Dflume.root.logger=DEBUG,console
启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ChangeRecord

二, 使用Flink消费Kafka中ChangeRecord主题的数据,当某设备30秒状态连续为“预警”,输出预警信息。当前预警信息输出后,最近30秒不再重复预警(即如果连续1分钟状态都为“预警”只输出两次预警信息)。将结果存入Redis中,key值为“warning30sMachine”,value值为“设备id,预警信息”。使用redis cli以HGETALL key方式获取warning30sMachine值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔一分钟以上,第一次截图放前面,第二次放后面;
注:时间使用change_start_time字段,忽略数据中的change_end_time不参与任何计算。忽略数据迟到问题。
Redis的value示例:115,2022-01-01 09:53:10:设备115 连续30秒为预警状态请尽快处理!
(2022-01-01 09:53:10 为change_start_time字段值,中文内容及格式必须为示例所示内容。)
具体代码(scala):
具体执行代码①:
package gyflinkimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.text.SimpleDateFormat// 定义一个Change类,这个类里面定义四个参数,这四个参数对应着分割后的元素
case class Change(ChangeId: Int, ChangeState:String, ChangeTime:String, timeStamp:Long)object flink_kafka_to_redis2 {def main(args: Array[String]): Unit = {/** 25_299_649,111,13,预警,2024-01-09 11:08:08,2024-01-09 11:08:52,15ChangeRecord的日志信息: 22_220_698,114,29,预警,2024-01-09 11:07:42,2024-01-09 11:09:00,15* */// TODO 创建flink的执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置并行度为1,单节点运行// TODO 与kafka进行连接val kafkaSource = KafkaSource.builder().setBootstrapServers("bigdata1:9092") // 设置kafka服务器地址.setTopics("ChangeRecord") // flink需要订阅的主题.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置只对value反序列化器,由于kafka使用网络进行传输,发送的是序列化数据,所以flink要做反序列化操作.setStartingOffsets(OffsetsInitializer.latest()) // 设置读取偏移量,从kafka最新的记录开始读取.build()// TODO 读取kafka数据,设置无水印val produceDataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_flink_redis")// kafka属性 水印设置 名称val kafka_value = produceDataStream.map(x => {val data = x.split(",") // 每一条记录以‘,’进行分割val timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(data(4)).getTime // 将string类型的时间转换为timestamp类型,形成时间戳Change(data(1).toInt, data(3), data(4),timestamp) // 输出:Change(110,预警,2024-01-18 14:09:36,1705558176000)})// 设置水位线val waterTimeStream = kafka_value.assignTimestampsAndWatermarks( // 创建一个新的watermark策略,并应用与kafka数据流// 流过来的数据时间是递增的,将迟到的数据直接丢弃WatermarkStrategy.forMonotonousTimestamps() // 用于处理单调递增的时间戳(升序的时间戳).withTimestampAssigner(new SerializableTimestampAssigner[Change] { // 定义了一个时间戳分配器,从每个事件中提取时间戳override def extractTimestamp(change: Change, recordTimestamp: Long): Long = { // 定义了两个参数,第一个参数表示Change类型,第二个是个Long类型,这个函数返回值为Long的change.timeStamp // 从 change(Change) 提取timeStamp的参数}}))// 开始处理数据流val resultSteam = waterTimeStream.keyBy(_.ChangeId) // 按照ChangeId进行分组.process(new flink_kafka_to_redis2_Process) // 调用处理类// 与Redis建立连接val JedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("bigdata1").setPort(6379)// .setDatabase(0).build()val Warning30Machine = new RedisMapper[(Int, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning30sMachine")override def getKeyFromData(t: (Int, String)): String = t._1.toStringoverride def getValueFromData(t: (Int, String)): String = t._2}// 建立Redis通道val redisSink = new RedisSink[(Int, String)](JedisPoolConfig, Warning30Machine)// 将结果流加入到通道resultSteam.addSink(redisSink)resultSteam.print()env.execute()}}
重要逻辑代码②:
package gyflink
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collectorclass flink_kafka_to_redis2_Process extends KeyedProcessFunction[Int,Change, (Int, String)] {// 键类型 输入类型 输出类型// 用于保存上一条的记录的状态private lazy val lastState:ValueState[Change] = getRuntimeContext.getState( // 延迟初始化的私有变量new ValueStateDescriptor[Change]("lastState",classOf[Change]))override def processElement(Change: Change, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#Context, out: Collector[(Int, String)]): Unit = {// 获取定时服务val timerService = ctx.timerService()// 如果是预警信息if (Change.ChangeState.equals("预警")){if (lastState.value() == null){lastState.update(Change)timerService.registerEventTimeTimer(Change.timeStamp + 30000)}} else {// 出现不是预警信息,删除存在的定时器,如果不存在定时器会忽略if (lastState.value() != null){timerService.deleteEventTimeTimer(lastState.value().timeStamp + 30000)lastState.update(null)}}}// 定时器逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#OnTimerContext, out: Collector[(Int, String)]): Unit = {val record = lastState.value()// out.collect((record.ChangeId,s"${record.ChangeTime}:设备${record.ChangeId}连续30秒为预警状态请尽快处理!"))out.collect(record.ChangeId,s"${record.ChangeId},${record.ChangeTime}:设备${record.ChangeId} 连续30 秒为预警状态请尽快处理!")lastState.update(null)}}
执行结果为:


相关文章:
使用Flink处理Kafka中的数据
目录 使用Flink处理Kafka中的数据 前提: 一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据 具体代码(scala) 具体执行代码① 重要逻…...
跟着pink老师前端入门教程-day07
去掉li前面的项目符号(小圆点) 语法:list-style: none; 十五、圆角边框 在CSS3中,新增了圆角边框样式,这样盒子就可以变成圆角 border-radius属性用于设置元素的外边框圆角 语法:border-radius:length…...
Pixelmator Pro Mac版 v3.5 图像处理软件 兼容 M1/M2
在当今数字化时代,图像编辑软件成为了许多人必备的工具之一。无论您是摄影师、设计师还是普通用户,您都需要一款功能强大、易于使用的图像编辑软件来处理和优化您的照片和图像。而Pixelmator Pro for Mac正是满足这一需求的理想选择。 Pixelmator Pro f…...
《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(15)-Fiddler弱网测试,知否知否,应是必知必会
1.简介 现在这个时代已经属于流量时代,用户对于App或者小程序之类的操作界面的数据和交互的要求也越来越高。对于测试人员弱网测试也是需要考验自己专业技术能力的一种技能。一个合格的测试人员,需要额外关注的场景就远不止断网、网络故障等情况了。还要…...
【vscode】远程资源管理器自动登录服务器保姆级教程
远程资源管理器自动登录服务器 介绍如何配置本地生成rsa服务端添加rsa.pub配置config文件 介绍 vscode SSH 保存密码自动登录服务器 对比通过账号密码登录,自动连接能节约更多时间效率,且通过vim修改不容易发现一些换行或者引号导致的错误,v…...
写点东西《Javascript switch 语句的替代方法》
写点东西《Javascript switch 语句的替代方法》 那么 switch 语句有什么问题? Object Literal 查找的替代方法 将我们学到的东西变成一个实用函数 您需要的一切都在一个地方# [](#javascript-version) Javascript 版本Tyepscript version🌟更多精彩 本文…...
python学习笔记10(循环结构2)
(一)循环结构2 1、扩展模式 语法: for 循环变量 in 遍历对象: 语句块1 else: 语句块2 说明:else在循环结束后执行,通常和break和continue结合使用 2、无限循环while while 表达式: 语句块…...
Codefroces 191A - Dynasty Puzzles
思路 d p dp dp d p i , j dp_{i,j} dpi,j 表示以 i i i 开始以 j j j 结尾的最长长度。方程: d p j , r m a x ( d p j , l , d p j , l l e n g t h l , r ) dp_{j,r}max(dp_{j,l}\;,\;dp_{j,l}length_{l,r}) dpj,rmax(dpj,l,dpj,llengthl,r) 有点区…...
HIVE中关联键类型不同导致数据重复,以及数据倾斜
比如左表关联键是string类型,右表关联键是bigint类型,关联后会出现多条的情况 解决方案: 关联键先统一转成string类型再进行关联 原因: 根据HIVE版本不同,数据位数上限不同, 低版本的超过16位会出现这种…...
CRM系统是如何解决企业的痛点的?
在当今竞争激烈的商业世界中,客户关系管理(CRM)数字化转型已经成为大企业成功的重要秘诀。大型跨国公司如亚马逊、苹果和微软等已经在CRM数字化方面走在了前列,实现了高度个性化的客户体验,加强了客户忠诚度。 然而&a…...
系统架构14 - 软件工程(2)
需求工程 需求工程软件需求两大过程三个层次业务需求(business requirement)用户需求(user requirement)功能需求 (functional requirement)非功能需求 概述活动阶段需求获取基本步骤获取方法 需求分析三大模型数据流图数据字典DD需求定义方法 需求验证需求管理需求基线变更控制…...
vue封装接口
目录 封装接口前缀 配置逻辑 接口存放文件 配置代理 获取数据方法 封装接口前缀 config.js const serverConfig {baseURL: "https://xxx.xxxxxxxx.com/api", // 请求基础地址,可根据环境自定义useTokenAuthorization: false, // 是否开启 token 认证};export …...
Dell戴尔XPS 8930笔记本电脑原装Win10系统 恢复出厂预装OEM系统
链接:https://pan.baidu.com/s/1eaTQeX-LnPJwWt3fBJD8lg?pwdajy2 提取码:ajy2 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office办公软件、MyDell等预装程序 文件格式:esd/wim/swm 安装方式&am…...
elasticsearch的拼音分词器安装
安装拼音分词器 第一步:下载 要实现根据字母做补全,就必须对文档按照拼音分词。在 GitHub 上恰好有 elasticsearch 的拼音分词插件。地址: 仓管的主页: https://github.com/infinilabs/analysis-pinyin 仓管的版本页 https:…...
2024阿里云优惠,云服务器61元一年起
2024年最新阿里云主机价格,最低配置2核2G3M起步,只要61元一年,还可以在阿里云CLUB中心领券 aliyun.club 专用满减优惠券。 1、云服务器ECS经济型e实例2核2G、3M固定带宽99元一年 2、轻量应用服务器2核2G3M带宽轻量服务器一年61元 3、阿里云轻…...
基于SpringBoot+Vue实现的社区养老管理平台(源码+数据库脚本+设计文档+部署视频)
系统介绍 基于SpringBootVue实现的社区养老服务管理平台采用springboot以及vue框架技术,实现了社区养老管理系统,实现了对养老院的员工、管理员对入住的老人及其健康档案实现信息化管理。 技术选型 开发工具:idea2020.3Webstorm2020.3(其他…...
【漏洞复现】CloudPanel makefile接口远程命令执行漏洞(CVE-2023-35885)
文章目录 前言声明一、CloudPanel 简介二、漏洞描述三、影响版本四、漏洞复现五、修复建议 前言 CloudPanel 是一个基于 Web 的控制面板或管理界面,旨在简化云托管环境的管理。它提供了一个集中式平台,用于管理云基础架构的各个方面,包括 &a…...
【Spring Boot 3】【Redis】集成Redisson
【Spring Boot 3】【Redis】集成Redisson 背景介绍开发环境开发步骤及源码工程目录结构总结背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花…...
YOLOv8改进 | Conv篇 | 2024.1月最新成果可变形卷积DCNv4(适用检测、Seg、分类、Pose、OBB)
一、本文介绍 本文给大家带来的改进机制是2024-1月的最新成果DCNv4,其是DCNv3的升级版本,效果可以说是在目前的卷积中名列前茅了,同时该卷积具有轻量化的效果!一个DCNv4参数量下降越15Wparameters左右,。它主要通过两个方面对前一版本DCNv3进行改进:首先,它移除了空间聚…...
理解反向代理
反向代理是一个不可或缺的组件。 它在客户端和服务器之间充当中介,提高了安全性、负载平衡和应用性能。 一、反向代理简介 反向代理是一种服务器,它位于客户端和后端服务器之间。与常见的(正向)代理不同,反向代理代表…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
什么是库存周转?如何用进销存系统提高库存周转率?
你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
macOS 终端智能代理检测
🧠 终端智能代理检测:自动判断是否需要设置代理访问 GitHub 在开发中,使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新,例如: fatal: unable to access https://github.com/ohmyzsh/oh…...
Linux中INADDR_ANY详解
在Linux网络编程中,INADDR_ANY 是一个特殊的IPv4地址常量(定义在 <netinet/in.h> 头文件中),用于表示绑定到所有可用网络接口的地址。它是服务器程序中的常见用法,允许套接字监听所有本地IP地址上的连接请求。 关…...
MySQL基本操作(续)
第3章:MySQL基本操作(续) 3.3 表操作 表是关系型数据库中存储数据的基本结构,由行和列组成。在MySQL中,表操作包括创建表、查看表结构、修改表和删除表等。本节将详细介绍这些操作。 3.3.1 创建表 在MySQL中&#…...
break 语句和 continue 语句
break语句和continue语句都具有跳转作用,可以让代码不按既有的顺序执行 break break语句用于跳出代码块或循环 1 2 3 4 5 6 for (var i 0; i < 5; i) { if (i 3){ break; } console.log(i); } continue continue语句用于立即终…...
