使用Flink处理Kafka中的数据
目录
使用Flink处理Kafka中的数据
前提:
一, 使用Flink消费Kafka中ProduceRecord主题的数据
具体代码为(scala)
执行结果
二, 使用Flink消费Kafka中ChangeRecord主题的数据
具体代码(scala)
具体执行代码①
重要逻辑代码②
执行结果为:
使用Flink处理Kafka中的数据
前提:
创建主题 : ChangeRecord , ProduceRecord
使用kafka-topics.sh --zookeeper bigdata1:2181/kafka --list 查看主题
kafka-topics.sh --zookeeper bigdata1:2181/kafka --list
然后开启数据生成器
./jnamake_data_file_v1
一, 使用Flink消费Kafka中ProduceRecord主题的数据
启动Flume a1, a1为所赋予的名称
flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-producerecord--name a1 -Dflume.root.logger=DEBUG,console
启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ProduceRecord
编写Scala工程代码,使用Flink消费Kafka中的数据并进行相应的数据统计计算。
一, 使用Flink消费Kafka中ProduceRecord主题的数据,统计在已经检验的产品中,各设备每五分钟生产产品总数,将结果存入Redis中,key值为“totalproduce”,value值为“设备id,最近五分钟生产总数”。使用redis cli以HGETALL key方式获取totalproduce值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔五分钟以上,第一次截图放前面,第二次放后面;
注:ProduceRecord主题,生产一个产品产生一条数据;
change_handle_state字段为1代表已经检验,0代表未检验;
时间语义使用Processing Time。
具体代码为(scala):
package gyflink
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.{FlinkJedisClusterConfig, FlinkJedisPoolConfig}
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.util.Properties
object test1{def main(args: Array[String]): Unit = {// 创建Flink流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 设置并行度env.setParallelism(1)//指定时间语义env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)// kafka的属性配置val properties = new Properties()properties.setProperty("bootstrap.servers","bigdata1:9092,bigdata2:9092,bigdata3:9092")properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("auto.offset.reset","earliest")// 读取kafka数据val FlinkKafkaConsumer = new FlinkKafkaConsumer[String]("ProduceRecord", new SimpleStringSchema(), properties)val text = env.addSource(FlinkKafkaConsumer)// TODO 使用flink算子对数据进行处理// topic的一条数据:2214,117,0002,2024-01-09 11:08:53,2024-01-09 11:08:53,2024-01-09 11:08:59,15897,1900-01-01 00:00:00,188815,0val inputMap = text.map(link => {val arr = link.split(",") // 使用‘,’作为分割符(arr(1).toInt, arr(9).toInt) // 下标取出第1个和第9个值}).filter(_._2 == 1) // 筛选条件:把第二个元素等于1.keyBy(_._1) // 将第一个元素作为key值.timeWindow(Time.minutes(5)) // 间隔5分钟进行计算.sum(1)inputMap.print("ds")// TODO 与 Redis 数据库进行连接// 创建Redis数据库的连接属性val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder() // 创建一个FlinkJedisPoolConfig对象.setHost("bigdata1") // 设置Redis数据库的主机地址.setPort(6379) // 设置Redis数据库的端口号.build()// 创建RedisSink对象,并将数据写入Redis中val redisSink = new RedisSink[(Int, Int)](config, new MyRedisMapper) // MyRedisMapper是一个自定义的映射器,将flink的数据转换为Redis的格式// 发送数据inputMap.addSink(redisSink) // 将flink的数据流和Redis数据库连接起来// 执行Flink程序env.execute("kafkaToRedis") // 向flink提交作业,开始执行}// 根据题目要求class MyRedisMapper extends RedisMapper[(Int, Int)] { // RedisMapper的方法是是将把flink的数据存储为Redis的存储格式//这里使用RedisCommand.HSET不用RedisCommand.SET,前者创建RedisHash表后者创建Redis普通的String对应表override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET,"totalproduce")override def getKeyFromData(t: (Int, Int)): String = t._1 + ""override def getValueFromData(t: (Int, Int)): String = t._2 + ""}}
执行结果:
二, 使用Flink消费Kafka中ChangeRecord主题的数据
启动Flume a1, a1为所赋予的名称
flume-ng agent --conf-file /opt/module/flume-1.9.0/job/flume-to-kafka-changerecord --name a1 -Dflume.root.logger=DEBUG,console
启动一个Kafka的消费者(consumer)来消费(读取)Kafka中的消息
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic ChangeRecord
二, 使用Flink消费Kafka中ChangeRecord主题的数据,当某设备30秒状态连续为“预警”,输出预警信息。当前预警信息输出后,最近30秒不再重复预警(即如果连续1分钟状态都为“预警”只输出两次预警信息)。将结果存入Redis中,key值为“warning30sMachine”,value值为“设备id,预警信息”。使用redis cli以HGETALL key方式获取warning30sMachine值,将结果截图粘贴至对应报告中,需两次截图,第一次截图和第二次截图间隔一分钟以上,第一次截图放前面,第二次放后面;
注:时间使用change_start_time字段,忽略数据中的change_end_time不参与任何计算。忽略数据迟到问题。
Redis的value示例:115,2022-01-01 09:53:10:设备115 连续30秒为预警状态请尽快处理!
(2022-01-01 09:53:10 为change_start_time字段值,中文内容及格式必须为示例所示内容。)
具体代码(scala):
具体执行代码①:
package gyflinkimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}import java.text.SimpleDateFormat// 定义一个Change类,这个类里面定义四个参数,这四个参数对应着分割后的元素
case class Change(ChangeId: Int, ChangeState:String, ChangeTime:String, timeStamp:Long)object flink_kafka_to_redis2 {def main(args: Array[String]): Unit = {/** 25_299_649,111,13,预警,2024-01-09 11:08:08,2024-01-09 11:08:52,15ChangeRecord的日志信息: 22_220_698,114,29,预警,2024-01-09 11:07:42,2024-01-09 11:09:00,15* */// TODO 创建flink的执行环境val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1) // 设置并行度为1,单节点运行// TODO 与kafka进行连接val kafkaSource = KafkaSource.builder().setBootstrapServers("bigdata1:9092") // 设置kafka服务器地址.setTopics("ChangeRecord") // flink需要订阅的主题.setValueOnlyDeserializer(new SimpleStringSchema()) // 设置只对value反序列化器,由于kafka使用网络进行传输,发送的是序列化数据,所以flink要做反序列化操作.setStartingOffsets(OffsetsInitializer.latest()) // 设置读取偏移量,从kafka最新的记录开始读取.build()// TODO 读取kafka数据,设置无水印val produceDataStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_flink_redis")// kafka属性 水印设置 名称val kafka_value = produceDataStream.map(x => {val data = x.split(",") // 每一条记录以‘,’进行分割val timestamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(data(4)).getTime // 将string类型的时间转换为timestamp类型,形成时间戳Change(data(1).toInt, data(3), data(4),timestamp) // 输出:Change(110,预警,2024-01-18 14:09:36,1705558176000)})// 设置水位线val waterTimeStream = kafka_value.assignTimestampsAndWatermarks( // 创建一个新的watermark策略,并应用与kafka数据流// 流过来的数据时间是递增的,将迟到的数据直接丢弃WatermarkStrategy.forMonotonousTimestamps() // 用于处理单调递增的时间戳(升序的时间戳).withTimestampAssigner(new SerializableTimestampAssigner[Change] { // 定义了一个时间戳分配器,从每个事件中提取时间戳override def extractTimestamp(change: Change, recordTimestamp: Long): Long = { // 定义了两个参数,第一个参数表示Change类型,第二个是个Long类型,这个函数返回值为Long的change.timeStamp // 从 change(Change) 提取timeStamp的参数}}))// 开始处理数据流val resultSteam = waterTimeStream.keyBy(_.ChangeId) // 按照ChangeId进行分组.process(new flink_kafka_to_redis2_Process) // 调用处理类// 与Redis建立连接val JedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("bigdata1").setPort(6379)// .setDatabase(0).build()val Warning30Machine = new RedisMapper[(Int, String)] {override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "warning30sMachine")override def getKeyFromData(t: (Int, String)): String = t._1.toStringoverride def getValueFromData(t: (Int, String)): String = t._2}// 建立Redis通道val redisSink = new RedisSink[(Int, String)](JedisPoolConfig, Warning30Machine)// 将结果流加入到通道resultSteam.addSink(redisSink)resultSteam.print()env.execute()}}
重要逻辑代码②:
package gyflink
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collectorclass flink_kafka_to_redis2_Process extends KeyedProcessFunction[Int,Change, (Int, String)] {// 键类型 输入类型 输出类型// 用于保存上一条的记录的状态private lazy val lastState:ValueState[Change] = getRuntimeContext.getState( // 延迟初始化的私有变量new ValueStateDescriptor[Change]("lastState",classOf[Change]))override def processElement(Change: Change, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#Context, out: Collector[(Int, String)]): Unit = {// 获取定时服务val timerService = ctx.timerService()// 如果是预警信息if (Change.ChangeState.equals("预警")){if (lastState.value() == null){lastState.update(Change)timerService.registerEventTimeTimer(Change.timeStamp + 30000)}} else {// 出现不是预警信息,删除存在的定时器,如果不存在定时器会忽略if (lastState.value() != null){timerService.deleteEventTimeTimer(lastState.value().timeStamp + 30000)lastState.update(null)}}}// 定时器逻辑override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, Change, (Int, String)]#OnTimerContext, out: Collector[(Int, String)]): Unit = {val record = lastState.value()// out.collect((record.ChangeId,s"${record.ChangeTime}:设备${record.ChangeId}连续30秒为预警状态请尽快处理!"))out.collect(record.ChangeId,s"${record.ChangeId},${record.ChangeTime}:设备${record.ChangeId} 连续30 秒为预警状态请尽快处理!")lastState.update(null)}}
执行结果为:
相关文章:

使用Flink处理Kafka中的数据
目录 使用Flink处理Kafka中的数据 前提: 一, 使用Flink消费Kafka中ProduceRecord主题的数据 具体代码为(scala) 执行结果 二, 使用Flink消费Kafka中ChangeRecord主题的数据 具体代码(scala) 具体执行代码① 重要逻…...

跟着pink老师前端入门教程-day07
去掉li前面的项目符号(小圆点) 语法:list-style: none; 十五、圆角边框 在CSS3中,新增了圆角边框样式,这样盒子就可以变成圆角 border-radius属性用于设置元素的外边框圆角 语法:border-radius:length…...

Pixelmator Pro Mac版 v3.5 图像处理软件 兼容 M1/M2
在当今数字化时代,图像编辑软件成为了许多人必备的工具之一。无论您是摄影师、设计师还是普通用户,您都需要一款功能强大、易于使用的图像编辑软件来处理和优化您的照片和图像。而Pixelmator Pro for Mac正是满足这一需求的理想选择。 Pixelmator Pro f…...

《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(15)-Fiddler弱网测试,知否知否,应是必知必会
1.简介 现在这个时代已经属于流量时代,用户对于App或者小程序之类的操作界面的数据和交互的要求也越来越高。对于测试人员弱网测试也是需要考验自己专业技术能力的一种技能。一个合格的测试人员,需要额外关注的场景就远不止断网、网络故障等情况了。还要…...

【vscode】远程资源管理器自动登录服务器保姆级教程
远程资源管理器自动登录服务器 介绍如何配置本地生成rsa服务端添加rsa.pub配置config文件 介绍 vscode SSH 保存密码自动登录服务器 对比通过账号密码登录,自动连接能节约更多时间效率,且通过vim修改不容易发现一些换行或者引号导致的错误,v…...

写点东西《Javascript switch 语句的替代方法》
写点东西《Javascript switch 语句的替代方法》 那么 switch 语句有什么问题? Object Literal 查找的替代方法 将我们学到的东西变成一个实用函数 您需要的一切都在一个地方# [](#javascript-version) Javascript 版本Tyepscript version🌟更多精彩 本文…...
python学习笔记10(循环结构2)
(一)循环结构2 1、扩展模式 语法: for 循环变量 in 遍历对象: 语句块1 else: 语句块2 说明:else在循环结束后执行,通常和break和continue结合使用 2、无限循环while while 表达式: 语句块…...
Codefroces 191A - Dynasty Puzzles
思路 d p dp dp d p i , j dp_{i,j} dpi,j 表示以 i i i 开始以 j j j 结尾的最长长度。方程: d p j , r m a x ( d p j , l , d p j , l l e n g t h l , r ) dp_{j,r}max(dp_{j,l}\;,\;dp_{j,l}length_{l,r}) dpj,rmax(dpj,l,dpj,llengthl,r) 有点区…...

HIVE中关联键类型不同导致数据重复,以及数据倾斜
比如左表关联键是string类型,右表关联键是bigint类型,关联后会出现多条的情况 解决方案: 关联键先统一转成string类型再进行关联 原因: 根据HIVE版本不同,数据位数上限不同, 低版本的超过16位会出现这种…...

CRM系统是如何解决企业的痛点的?
在当今竞争激烈的商业世界中,客户关系管理(CRM)数字化转型已经成为大企业成功的重要秘诀。大型跨国公司如亚马逊、苹果和微软等已经在CRM数字化方面走在了前列,实现了高度个性化的客户体验,加强了客户忠诚度。 然而&a…...

系统架构14 - 软件工程(2)
需求工程 需求工程软件需求两大过程三个层次业务需求(business requirement)用户需求(user requirement)功能需求 (functional requirement)非功能需求 概述活动阶段需求获取基本步骤获取方法 需求分析三大模型数据流图数据字典DD需求定义方法 需求验证需求管理需求基线变更控制…...

vue封装接口
目录 封装接口前缀 配置逻辑 接口存放文件 配置代理 获取数据方法 封装接口前缀 config.js const serverConfig {baseURL: "https://xxx.xxxxxxxx.com/api", // 请求基础地址,可根据环境自定义useTokenAuthorization: false, // 是否开启 token 认证};export …...

Dell戴尔XPS 8930笔记本电脑原装Win10系统 恢复出厂预装OEM系统
链接:https://pan.baidu.com/s/1eaTQeX-LnPJwWt3fBJD8lg?pwdajy2 提取码:ajy2 原厂系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office办公软件、MyDell等预装程序 文件格式:esd/wim/swm 安装方式&am…...

elasticsearch的拼音分词器安装
安装拼音分词器 第一步:下载 要实现根据字母做补全,就必须对文档按照拼音分词。在 GitHub 上恰好有 elasticsearch 的拼音分词插件。地址: 仓管的主页: https://github.com/infinilabs/analysis-pinyin 仓管的版本页 https:…...
2024阿里云优惠,云服务器61元一年起
2024年最新阿里云主机价格,最低配置2核2G3M起步,只要61元一年,还可以在阿里云CLUB中心领券 aliyun.club 专用满减优惠券。 1、云服务器ECS经济型e实例2核2G、3M固定带宽99元一年 2、轻量应用服务器2核2G3M带宽轻量服务器一年61元 3、阿里云轻…...

基于SpringBoot+Vue实现的社区养老管理平台(源码+数据库脚本+设计文档+部署视频)
系统介绍 基于SpringBootVue实现的社区养老服务管理平台采用springboot以及vue框架技术,实现了社区养老管理系统,实现了对养老院的员工、管理员对入住的老人及其健康档案实现信息化管理。 技术选型 开发工具:idea2020.3Webstorm2020.3(其他…...

【漏洞复现】CloudPanel makefile接口远程命令执行漏洞(CVE-2023-35885)
文章目录 前言声明一、CloudPanel 简介二、漏洞描述三、影响版本四、漏洞复现五、修复建议 前言 CloudPanel 是一个基于 Web 的控制面板或管理界面,旨在简化云托管环境的管理。它提供了一个集中式平台,用于管理云基础架构的各个方面,包括 &a…...
【Spring Boot 3】【Redis】集成Redisson
【Spring Boot 3】【Redis】集成Redisson 背景介绍开发环境开发步骤及源码工程目录结构总结背景 软件开发是一门实践性科学,对大多数人来说,学习一种新技术不是一开始就去深究其原理,而是先从做出一个可工作的DEMO入手。但在我个人学习和工作经历中,每次学习新技术总是要花…...

YOLOv8改进 | Conv篇 | 2024.1月最新成果可变形卷积DCNv4(适用检测、Seg、分类、Pose、OBB)
一、本文介绍 本文给大家带来的改进机制是2024-1月的最新成果DCNv4,其是DCNv3的升级版本,效果可以说是在目前的卷积中名列前茅了,同时该卷积具有轻量化的效果!一个DCNv4参数量下降越15Wparameters左右,。它主要通过两个方面对前一版本DCNv3进行改进:首先,它移除了空间聚…...

理解反向代理
反向代理是一个不可或缺的组件。 它在客户端和服务器之间充当中介,提高了安全性、负载平衡和应用性能。 一、反向代理简介 反向代理是一种服务器,它位于客户端和后端服务器之间。与常见的(正向)代理不同,反向代理代表…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...

用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...