当前位置: 首页 > article >正文

Flink流处理:实时计算URL访问量TopN(基于时间窗口)

目录

代码分析

背景知识拓展

代码调优

1. 性能优化

1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll

1.2 使用 ReduceFunction 优化聚合

2. 功能扩展

2.1 支持动态窗口大小

2.2 支持多维度统计

2.3 支持持久化存储

3. 代码可读性

3.1 提取公共逻辑

3.2 使用 Scala 的高级特性

4. 异常处理

4.1 处理数据异常

4.2 处理作业异常

5. 完整改进代码

总结


 

package processfunctionimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import source.ClickSourceimport scala.collection.mutable/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: processfunction* @author: 赵嘉盟-HONOR* @data: 2023-11-24 21:32* @DESCRIPTION**/
object TopNProcessAllWindowExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)data.map(_.url).windowAll(TumblingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).process(new ProcessAllWindowFunction[String,String,TimeWindow] {override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {val urlCountMap=mutable.Map[String,Int]()elements.foreach(data=>{urlCountMap.get(data) match {case Some(value) => urlCountMap.put(data,value+1)case None => urlCountMap.put(data,1)}})val tuples = urlCountMap.toList.sortBy(-_._2).take(10)val builder = new StringBuilder()builder.append(s"=========窗口:${context.window.getStart} ~ ${context.window.getEnd}========\n")for (i <- tuples.indices){val tuple = tuples(i)builder.append(s"浏览量Top ${i+1} url:${tuple._1} 浏览量是: ${tuple._2} \n")}out.collect(builder.toString())}}).print()env.execute("TopNDemo1")}
}

代码分析

 这段代码使用 Apache Flink 处理流数据,并计算每个时间窗口内 URL 的访问量 TopN。以下是代码的详细解释:

  1. 环境设置

    • val env = StreamExecutionEnvironment.getExecutionEnvironment:获取 Flink 的执行环境。
    • env.setParallelism(1):设置并行度为 1,即单线程执行。
  2. 数据源

    • val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp):从 ClickSource 获取数据,并分配时间戳。ClickSource 是一个自定义的数据源,生成模拟的点击事件。
  3. 窗口操作

    • data.map(_.url):将数据流中的每个事件映射为 URL。
    • .windowAll(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))):定义一个滚动窗口,窗口大小为 10 秒,滑动步长为 5 秒。windowAll 表示对所有数据进行窗口操作。
  4. 处理函数

    • .process(new ProcessAllWindowFunction[String, String, TimeWindow] { ... }):使用 ProcessAllWindowFunction 处理窗口内的所有数据。
    • urlCountMap:用于存储每个 URL 的访问次数。
    • elements.foreach(data => { ... }):遍历窗口内的所有 URL,更新 urlCountMap
    • val tuples = urlCountMap.toList.sortBy(-_._2).take(10):将 urlCountMap 转换为列表,按访问量降序排序,并取前 10 个。
    • builder.append(...):构建输出字符串,包含窗口时间和 TopN URL 的访问量。
    • out.collect(builder.toString()):将结果输出。
  5. 执行

    • env.execute("TopNDemo1"):启动 Flink 作业。

背景知识拓展

  1. Apache Flink

    • Apache Flink 是一个分布式流处理框架,支持高吞吐、低延迟的实时数据处理。
    • Flink 提供了丰富的 API,包括 DataStream API(用于流处理)和 DataSet API(用于批处理)。
  2. 流处理

    • 流处理是一种处理无界数据流的技术,适用于实时数据分析、监控等场景。
    • 与批处理不同,流处理是连续进行的,数据到达即处理。
  3. 窗口操作

    • 窗口操作是流处理中的核心概念,用于将无界数据流划分为有限的数据集进行处理。
    • 常见的窗口类型包括滚动窗口、滑动窗口和会话窗口。
  4. 时间语义

    • Flink 支持三种时间语义:事件时间(Event Time)、处理时间(Processing Time)和摄入时间(Ingestion Time)。
    • 事件时间是指事件实际发生的时间,通常用于处理乱序事件。
  5. ProcessFunction

    • ProcessFunction 是 Flink 提供的一种底层 API,允许用户自定义处理逻辑,包括访问时间戳、状态管理等。
    • ProcessAllWindowFunction 是 ProcessFunction 的一种,用于处理窗口内的所有数据。
  6. TopN 计算

    • TopN 计算是一种常见的分析任务,用于找出数据集中最频繁或最重要的元素。
    • 在流处理中,TopN 计算通常结合窗口操作进行,以实时更新结果。

代码调优

1. 性能优化

1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll
  • 问题windowAll 会将所有数据发送到单个任务中,无法并行处理,性能较差。
  • 改进:使用 KeyedStream 对数据进行分组(例如按 URL 分组),然后使用 ProcessWindowFunction 处理每个窗口内的数据。
  • 代码示例
val keyedData = data.keyBy(_.url) // 按 URL 分组
keyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessWindowFunction[ClickEvent, (String, Int), String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[ClickEvent], out: Collector[(String, Int)]): Unit = {out.collect((key, elements.size)) // 输出 URL 及其访问次数}})
1.2 使用 ReduceFunction 优化聚合
  • 问题:在 ProcessAllWindowFunction 中,每次都需要遍历所有数据,效率较低。
  • 改进:使用 ReduceFunction 或 AggregateFunction 进行增量聚合,减少数据遍历次数。
  • 代码示例
val reducedData = keyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).reduce((event1, event2) => event1) // 仅保留一个事件,统计访问次数

2. 功能扩展

2.1 支持动态窗口大小
  • 问题:窗口大小和滑动步长是固定的,无法动态调整。
  • 改进:通过配置文件或外部参数动态设置窗口大小和滑动步长。
  • 代码示例
val windowSize = Time.seconds(10) // 从配置中读取
val slideSize = Time.seconds(5)   // 从配置中读取
data.windowAll(TumblingEventTimeWindows.of(windowSize, slideSize))
2.2 支持多维度统计
  • 问题:当前仅统计 URL 的访问量,无法支持其他维度的统计(如用户、设备等)。
  • 改进:扩展统计维度,支持按用户、设备等多维度统计。
  • 代码示例
val userKeyedData = data.keyBy(_.user) // 按用户分组
userKeyedData.window(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessWindowFunction[ClickEvent, (String, Int), String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[ClickEvent], out: Collector[(String, Int)]): Unit = {out.collect((key, elements.size)) // 输出用户及其访问次数}})
2.3 支持持久化存储
  • 问题:结果仅打印到控制台,无法持久化存储。
  • 改进:将结果写入外部存储系统(如 Kafka、HDFS、数据库等)。
  • 代码示例
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
val kafkaProducer = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema(), properties)
resultStream.addSink(kafkaProducer)

3. 代码可读性

3.1 提取公共逻辑
  • 问题:统计逻辑和输出逻辑耦合在一起,代码可读性较差。
  • 改进:将统计逻辑和输出逻辑分离,提取为独立的方法或类。
  • 代码示例
def countUrlAccess(elements: Iterable[String]): List[(String, Int)] = {val urlCountMap = mutable.Map[String, Int]()elements.foreach(url => urlCountMap.update(url, urlCountMap.getOrElse(url, 0) + 1))urlCountMap.toList.sortBy(-_._2).take(10)
}def formatResult(window: TimeWindow, tuples: List[(String, Int)]): String = {val builder = new StringBuilder()builder.append(s"=========窗口:${window.getStart} ~ ${window.getEnd}========\n")tuples.zipWithIndex.foreach { case ((url, count), index) =>builder.append(s"浏览量Top ${index + 1} url: $url 浏览量是:$count \n")}builder.toString()
}
3.2 使用 Scala 的高级特性
  • 问题:代码风格较为传统,未充分利用 Scala 的高级特性。
  • 改进:使用 Scala 的 case classOption 和函数式编程特性。
  • 代码示例
case class UrlAccess(url: String, count: Int)val result = elements.groupBy(identity).map { case (url, urls) => UrlAccess(url, urls.size) }.toList.sortBy(-_.count).take(10)

4. 异常处理

4.1 处理数据异常
  • 问题:未处理数据异常(如空数据、非法数据等)。
  • 改进:添加异常处理逻辑,确保作业的健壮性。
  • 代码示例
try {val tuples = countUrlAccess(elements)out.collect(formatResult(context.window, tuples))
} catch {case e: Exception => println(s"处理窗口数据时发生异常:${e.getMessage}")
}
4.2 处理作业异常
  • 问题:未处理作业级别的异常(如数据源故障、网络中断等)。
  • 改进:使用 Flink 的 RestartStrategy 和 Checkpointing 机制。
  • 代码示例
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)))
env.enableCheckpointing(5000) // 每 5 秒进行一次 checkpoint

5. 完整改进代码

以下是结合上述改进的完整代码:

package processfunctionimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import source.ClickSourceimport scala.collection.mutablecase class UrlAccess(url: String, count: Int)object TopNProcessAllWindowExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val data = env.addSource(new ClickSource).assignAscendingTimestamps(_.timestamp)data.map(_.url).windowAll(TumblingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction[String, String, TimeWindow] {override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {try {val tuples = countUrlAccess(elements)out.collect(formatResult(context.window, tuples))} catch {case e: Exception => println(s"处理窗口数据时发生异常:${e.getMessage}")}}}).print()env.execute("TopNDemo1")}def countUrlAccess(elements: Iterable[String]): List[UrlAccess] = {elements.groupBy(identity).map { case (url, urls) => UrlAccess(url, urls.size) }.toList.sortBy(-_.count).take(10)}def formatResult(window: TimeWindow, tuples: List[UrlAccess]): String = {val builder = new StringBuilder()builder.append(s"=========窗口:${window.getStart} ~ ${window.getEnd}========\n")tuples.zipWithIndex.foreach { case (UrlAccess(url, count), index) =>builder.append(s"浏览量Top ${index + 1} url: $url 浏览量是:$count \n")}builder.toString()}
}

总结

通过上述改进,代码在 性能功能可读性 和 健壮性 方面都得到了显著提升。你可以根据实际需求进一步调整和扩展代码,例如支持更多维度统计、集成外部存储系统等。

相关文章:

Flink流处理:实时计算URL访问量TopN(基于时间窗口)

目录 代码分析 背景知识拓展 代码调优 1. 性能优化 1.1 使用 KeyedStream 和 ProcessWindowFunction 替代 windowAll 1.2 使用 ReduceFunction 优化聚合 2. 功能扩展 2.1 支持动态窗口大小 2.2 支持多维度统计 2.3 支持持久化存储 3. 代码可读性 3.1 提取公共逻辑 …...

初识函数------了解函数的定义、函数的参数、函数的返回值、说明文档的书写、函数的嵌套使用、变量的作用域(全局变量与局部变量)

文章目录 一、什么是函数&#xff1f;二、函数定义与调用2.1 基本语法2.2 示例演示 三、函数参数详解3.1 位置参数3.2 默认参数3.3 可变参数3.4 关键字参数 四、返回值与文档说明4.1 返回多个值4.2 编写文档字符串 五、函数嵌套与作用域5.1 嵌套函数示例5.2 变量作用域5.3 glob…...

java collection集合特点知识点详解

在 Java 中&#xff0c;Collection 是所有集合类的根接口&#xff0c;它定义了一组对象的基本操作。Java 集合框架提供了丰富的实现类&#xff08;如List、Set、Queue&#xff09;&#xff0c;具有以下核心特点&#xff1a; 一、统一的接口设计 1. 核心接口层次 Collection …...

ngx_http_realip_module 模块概述

一、使用场景 日志记录 记录真实客户端 IP 而非反向代理的 IP&#xff0c;有助于流量分析和安全审计。访问控制 基于真实 IP 实现防火墙规则&#xff08;allow/deny&#xff09;或限流&#xff0c;而非误将上游 IP 视为客户端。GeoIP、WAF、限速等功能 模块化的上游真实 IP 支…...

自定义CString类与MFC CString类接口对比

接口对比表格 功能分类 你的 CString 接口 MFC CString 接口&#xff08;ANSI&#xff09; 一致性 差异说明 构造函数 CString() CString(const char*) CString(char) CString(const CString&) CString() CString(LPCSTR) CString(TCHAR) CString(const CString&…...

华为OD机试真题——考勤信息(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现

2025 A卷 100分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…...

Go语言测试用例的执行与分析

在软件开发过程中&#xff0c;测试用例是确保代码质量的关键环节。Go语言作为一种现代的编程语言&#xff0c;它内置了强大的测试框架&#xff0c;可以帮助开发者轻松编写和执行测试用例。本文将介绍如何在 Go 语言中编写、执行测试用例&#xff0c;并对测试结果进行分析。 ## …...

vue3 vite 路由

如路由是这种格式 http://localhost:7058/admin/product/brand路由配置如下 import { createRouter, createWebHistory } from vue-router import HomeView from ../views/HomeView.vue import NProgress from nprogress; import nprogress/nprogress.css; import {errorRour…...

MyBatis:动态SQL

文章目录 动态SQLif标签trim标签where标签set标签foreach标签include标签和sql标签 Mybatis动态SQL的官方文档&#xff1a; https://mybatis.net.cn/dynamic-sql.html 动态SQL 动态SQL是 MyBatis的强大特性之一,如果是使用JDBC根据不同条件拼接sql很麻烦&#xff0c;例如拼接…...

游戏引擎学习第280天:精简化的流式实体sim

回顾并为今天的内容做铺垫 今天的任务是让之前关于实体存储方式的改动真正运行起来。我们现在希望让实体系统变得更加真实和实用&#xff0c;能够支撑我们游戏实际所需的功能。这就要求我们对它进行更合理的实现和调试。 昨天我们基本让代码编译通过了&#xff0c;但实际上还…...

femap许可与多用户共享

随着电磁仿真技术的发展&#xff0c;Femap作为一款领先的工具&#xff0c;在多个领域中发挥着不可替代的作用。然而&#xff0c;对于许多团队和企业来说&#xff0c;如何高效、经济地管理和使用Femap许可证成为了一个亟待解决的问题。本文将探讨Femap许可与多用户共享的概念、优…...

王树森推荐系统公开课 排序03:预估分数融合

融合预估分数 p c l i c k ⋅ p l i k e p_{click} \cdot p_{like} pclick​⋅plike​ 有实际意义&#xff0c;等于在曝光中点赞的概率。 p c l i c k ⋅ p c o l l e c t p_{click} \cdot p_{collect} pclick​⋅pcollect​ 同理。 按多种排名做 ensemble sort。 某电商的融…...

网络I/O学习-poll(三)

一、为什么要用Poll 由于select参数太多&#xff0c;较于复杂&#xff0c;调用起来较为麻烦&#xff1b;poll对其进行了优化 二、poll机制 poll也是一个系统调用&#xff0c;每次调用都会将所有客户端的fd拷贝到内核空间&#xff0c;然后进行轮询&#xff0c;判断IO是否就绪…...

k8s(12) — 版本控制和滚动更新(金丝雀部署理念)

金丝雀部署简介&#xff1a; 1、基本概念 金丝雀部署是一种软件开发中的渐进式发布策略&#xff0c;其核心思想是通过将新版本应用逐步发布给一小部分用户&#xff08;即 “金丝雀” 用户&#xff09;&#xff0c;在真实环境中验证功能稳定性和性能表现&#xff0c;再逐步扩大发…...

【git config --global alias | Git分支操作效率提升实践指南】

git config --global alias | Git分支操作效率提升实践指南 背景与痛点分析 在现代软件开发团队中&#xff0c;Git分支管理是日常工作的重要组成部分。特别是在规范的开发流程中&#xff0c;我们经常会遇到类似 feature/user-management、bugfix/login-issue 或 per/cny/dev …...

chrome源码中WeakPtr 跨线程使用详解:原理、风险与最佳实践

base::WeakPtr 在 Chromium 中 不能安全地跨线程使用。这是一个很关键的点&#xff0c;下面详细解释原因及正确用法。 &#x1f50d;原理与使用 ✅ 先说答案&#xff1a; base::WeakPtr 本质上是**线程绑定&#xff08;thread-affine&#xff09;**的。不能在多个线程之间创建…...

【Go】从0开始学习Go

文章目录 从0开始学习Go0 与C对比1 代码框架1.1 helloworld式代码示例1.2 主体代码元素&#xff08;核心三部分&#xff09;1.3 其他 2 与C/C区别3 有用的小工具4 注意事项 从0开始学习Go 0 与C对比 特性CGo编译型语言需要编译为机器码直接编译为二进制可执行文件静态类型类型…...

Windows 安装显卡驱动

1.第一步&#xff1a;打开Nvidia 官网驱动下载页面 2.第二步&#xff1a;选择相关信息&#xff0c; 玩游戏选择&#xff0c;GeForce Game Ready ,创意设计、摄影直播 选择 NVIDIA Studio 驱动程序 &#xff08;NVIDIA Studio Driver - WHQL.&#xff09; 2.第三步&#xff1…...

模块与包的导入

一、导入官方库 我们复盘下学习python的逻辑&#xff0c;所谓学习python就是学习python常见的基础语法学习你所处理任务需要用到的第三方库 类别典型库解决的问题学习门槛基础工具os、sys、json操作系统交互、序列化数据&#xff08;如读写 JSON 文件&#xff09;低科学计算n…...

Google设置app-ads.txt

问题&#xff1a; 应用上架后admob后台显示应用广告投放量受限&#xff0c;需要设置app-ads.txt才行。 如何解决&#xff1a; 官方教程: 看了下感觉不难&#xff0c;创建一个txt&#xff0c;将第二条的代码复制进行就得到app-ads.txt了。 然后就是要把这个txt放到哪才可以…...

docker安装rockerMQ

参考Docker部署RocketMQ5.x (单机部署配置参数详解不使用docker-compose直接部署)_rocketmq不推荐用docker部署-CSDN博客 镜像拉取 镜像地址&#xff1a; https://hub.docker.com/r/apache/rocketmq/tags 我在部署的时候最新发行版是5.1.0可以根据需求自行选择一个5.x的版本&a…...

交叉引用、多个参考文献插入、跨文献插入word/wps中之【插入[1-3]、连续文献】

我们在写论文时&#xff0c;需要插入大量参考文献。 有时&#xff0c;一句话需要引用多个文献&#xff0c;如&#xff1a;[1-3]或者[1,3,4]这种形式多个文献插入、跨文献插入。 在上一篇文章中&#xff0c;我们提到可以直接打“-”或者“&#xff0c;”&#xff0c;但是word导出…...

PLC双人舞:profinet转ethernet ip网关奏响施耐德与AB的协奏曲

PLC双人舞&#xff1a;ethernet ip转profinet网关奏响施耐德与AB的协奏曲 案例分析&#xff1a;施耐德PLC与AB PLC的互联互通 在现代工业自动化中&#xff0c;设备之间的互联互通至关重要。本案例旨在展示如何通过北京倍讯科技的EtherNet/IP转Modbus网关&#xff0c;将施耐德P…...

Image and depth from a conventional camera with a coded aperture论文阅读

Image and depth from a conventional camera with a coded aperture 1. 研究目标与实际意义1.1 研究目标1.2 实际问题与产业意义2. 创新方法:编码光圈设计与统计模型2.1 核心思路2.2 关键公式与模型架构2.2.1 图像形成模型2.2.2 深度可区分性准则2.2.3 统计模型与优化框架2.2…...

缺乏团队建设活动,如何增强凝聚力?

当一个团队缺乏系统性的建设活动时&#xff0c;成员之间容易产生疏离感、误解与信任缺失&#xff0c;最终影响整体执行力和目标达成。要有效增强团队凝聚力&#xff0c;应从设计高参与感的团队活动、结合业务与人文目标、营造持续共创的文化机制、推动跨层级协作互动等层面着手…...

特征筛选方法总结

非模型方法 一.FILTER过滤法&#xff1a; 1.缺失值比例&#xff08;80%以上缺失则删除&#xff09;/方差 注意&#xff1a; 连续变量只删方差为0的&#xff0c;因为变量取值范围会影响方差大小。 离散类的看各类取值占比,如果是三分类变量可以视作连续变量。 函数&#xff1a;V…...

力扣HOT100之二叉树:230. 二叉搜索树中第 K 小的元素

这道题直接用最笨的办法来做的&#xff0c;用递归来做&#xff0c;我们定义一个全局变量vector<int> element&#xff0c;然后使用中序遍历&#xff0c;每当碰到一个非空节点就将其加入到向量中&#xff0c;这样依赖当向量中的元素小于k时&#xff0c;就返回0&#xff0c…...

pinia.defineStore is not a function

错误信息表明 pinia.defineStore 不是一个函数,这通常意味着 pinia 没有被正确导入或初始化。 解决方案 检查 Pinia 的导入 确保你从 pinia 中正确导入了 defineStore。正确的导入方式应该是: javascript import { defineStore } from ‘pinia’; 如果你使用的是 createPin…...

入职软件开发与实施工程师了后........

时隔几个月没有创作的我又回来了&#xff0c;这几个月很忙&#xff0c;我一直在找工作&#xff0c;在自考&#xff08;顺便还处理了一下分手的事&#xff09;&#xff0c;到处奔波&#xff0c;心力交瘁。可能我骨子里比较傲吧。我不愿意着急谋生&#xff0c;做我不愿意做的普通…...

PCL点云库点云数据处理入门系列教材目录(2025年5月更新....)

PCL点云库点云数据处理入门系列教材目录 基础阶段 第 1 讲&#xff1a;PCL库简介和安装&#xff08;Win10/11VS2019PCL 1.12.0&#xff09;第 2 讲&#xff1a;PCL库中点云基本知识和数据类型结构第 3 讲&#xff1a;PCL库中点云数据格式PCD和PLY及其输入输出&#xff08;IO&…...