当前位置: 首页 > article >正文

Flink 编程基础:Scala 版 DataStream API 入门

大家好!我是心海

流处理技术在大数据时代正变得越来越重要,而 Apache Flink 作为领先的流处理引擎,凭借其高性能、低延迟和丰富的 API 受到了广泛关注。本文将以 Scala 语言为例,详细讲解 Flink DataStream API 的基本编程模型,从数据源、数据转换、数据输出,到窗口划分与时间概念,最后结合经典的 WordCount 案例,带大家一步步动手实践。

目录

一、DataStream API:构建流处理世界的基石 

二、 基本编程实践:WordCount 示例

2.1 代码示例

2.2 代码解析

三、窗口的划分:在无限流中框定边界 

3.1 时间概念

3.2 窗口划分

 3.3 窗口计算

3.4 窗口计算示例

 四、总结


 

一、DataStream API:构建流处理世界的基石 

想象一下,现实世界的数据就像一条奔流不息的河流,时刻都在产生新的信息。DataStream API 就是 Flink 提供给我们的工具箱,里面装满了各种强大的工具,帮助我们捕获、转换和分析这条“数据河流”。

DataStream 编程模型:三段论

一个典型的 Flink DataStream 应用程序可以概括为以下三个核心步骤:

  1. 数据源(Source):数据的起点

  2. 数据转换(Transformation):数据的加工厂

  3. 数据输出(Sink):数据的归宿

可以用一张简单的图来表示这个过程:


二、 基本编程实践:WordCount 示例

为了让大家更直观地理解 Flink 编程,我们以经典的 WordCount 案例来讲解。下面的示例代码使用 Scala 编写,涵盖了数据源、数据转换、窗口计算以及数据输出的全流程。

2.1 代码示例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCount {def main(args: Array[String]): Unit = {// 创建流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 数据源:从本地集合创建数据流(实际项目中可替换为读取文件或 Kafka)val text = env.fromElements("Flink is a streaming engine","Flink supports batch processing","Scala makes Flink programming concise")// 2. 数据转换:拆分单词,转换为 (word, 1) 格式val counts = text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(word => (word, 1)).keyBy(_._1).reduce((a, b) => (a._1, a._2 + b._2))// 3. 数据输出:打印到控制台(实际项目中可写入文件、Kafka 或数据库)counts.print()// 执行程序env.execute("Scala Flink WordCount Example")}
}

2.2 代码解析

  • 数据源
    使用 env.fromElements 创建一个包含多条文本数据的流。在实际生产中,可以使用 env.readTextFile 或者 KafkaSource 读取实时数据。

  • 数据转换

    1. flatMap:将每行文本拆分成单词,并转换为小写。

    2. filter:过滤掉空字符串。

    3. map:将每个单词映射成 (word, 1) 元组。

    4. keyBy:根据单词进行分组。

    5. reduce:聚合相同单词的计数。

  • 数据输出
    使用 print 将计算结果输出到控制台。在实际项目中可以替换成其他 Sink(例如写入 Kafka、数据库或文件)。


三、窗口的划分:在无限流中框定边界 

由于流数据是无限的,我们需要将无限的流划分成有限大小的“窗口”,然后在每个窗口上进行计算。Flink 提供了灵活的窗口机制,可以根据时间、数量或其他条件来划分窗口。

无限延伸的蓝色波浪线代表数据流,上面被垂直的虚线分割成若干个矩形区域,每个矩形区域代表一个窗口。每个窗口内部包含若干个数据点。 

3.1 时间概念

在定义和计算窗口时,时间是一个至关重要的概念

  • 事件时间(Event Time)
    指数据中携带的时间戳,反映数据生成的真实时间

  • 处理时间(Processing Time)
    指系统接收到数据时的时间,不受数据本身时间戳影响。

  • 摄取时间(Ingestion Time)
    数据进入 Flink 系统的时间,一般介于事件时间和处理时间之间。

选择哪种时间语义取决于你的应用场景和对时间准确性的要求。事件时间通常是最准确的,但也可能涉及到处理乱序事件的问题。

3.2 窗口划分

常见的窗口类型有:

  • 滚动窗口(Tumbling Window)
    固定长度且互不重叠的窗口,如上面 WordCount 示例中的 5 秒窗口。

  • 滑动窗口(Sliding Window)
    窗口大小固定,但窗口之间存在重叠部分,可设置窗口滑动步长

  • 会话窗口(Session Window)
    根据数据之间的间隔动态划分,当间隔超过设定阈值时视为新窗口。

 3.3 窗口计算

在窗口内执行聚合

一旦我们定义了窗口的划分方式和使用的时间语义,就可以在每个窗口内执行各种计算,例如计数、求和、平均值、最大值、最小值等等。

Flink 提供了不同的 Window Assigners(窗口分配器)来定义如何将数据分配到窗口中,常见的有:

  • 时间窗口(Time Windows): 基于时间长度划分窗口,例如滚动时间窗口(Tumbling Time Window)、滑动时间窗口(Sliding Time Window)、会话窗口(Session Window)。

  • 计数窗口(Count Windows): 基于元素的数量划分窗口,例如滚动计数窗口(Tumbling Count Window)、滑动计数窗口(Sliding Count Window)。

3.4 窗口计算示例

下面是一个使用滑动窗口的示例,统计每个单词在 10 秒窗口内每隔 5 秒统计一次的计数:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject SlidingWindowWordCount {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.fromElements("Flink streaming window example","Flink supports different time semantics","Scala and Flink make a great combination")val counts = text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(word => (word, 1)).keyBy(_._1)// 定义滑动窗口:窗口长度 10 秒,滑动步长 5 秒.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)counts.print()env.execute("Scala Flink Sliding Window WordCount")}
}
  • flatMap(_.toLowerCase.split("\\W+")):将每行文本转换为小写,然后按非单词字符(如空格、标点符号)进行拆分,将每个单词作为一个独立的元素输出。
  • filter(_.nonEmpty):过滤掉空字符串。
  • map(word => (word, 1)):将每个单词映射为一个二元组(word, 1),其中word是单词,1表示该单词出现了一次。
  • keyBy(_._1):根据二元组的第一个元素(即单词)进行分组,以便后续对每个单词进行独立的统计。
  • .timeWindow(Time.seconds(10), Time.seconds(5)):定义一个滑动窗口,窗口长度为 10 秒,滑动步长为 5 秒。这意味着每 5 秒会生成一个新的窗口,每个窗口包含最近 10 秒内的数据。
  • .sum(1):对每个窗口内的二元组的第二个元素(即计数)进行求和,得到每个单词在该窗口内的出现次数。

 


 

 四、总结

恭喜你!通过本篇文章,你已经对 Scala 版 Flink DataStream API 的编程基础有了初步的了解。我们学习了 DataStream API 的核心组成部分:数据源、数据转换和数据输出,并通过 WordCount 示例进行了实践。同时,我们也初步接触了窗口的概念、时间语义和基本的窗口计算。

Flink DataStream API 的功能远不止于此,还有更高级的转换算子、更灵活的窗口操作、状态管理、容错机制等等等待我们去探索。在接下来的文章中,我们将继续深入学习这些更高级的主题,带你逐步成为 Flink 流处理的专家!

希望这篇文章能够帮助你迈出 Flink Scala 编程的第一步。如果你有任何问题或建议,欢迎在评论区留言交流。让我们一起在 Flink 的世界里扬帆起航!

 如果这篇文章对你有所启发,期待你的点赞关注!

相关文章:

Flink 编程基础:Scala 版 DataStream API 入门

大家好!我是心海 流处理技术在大数据时代正变得越来越重要,而 Apache Flink 作为领先的流处理引擎,凭借其高性能、低延迟和丰富的 API 受到了广泛关注。本文将以 Scala 语言为例,详细讲解 Flink DataStream API 的基本编程模型&am…...

实战|使用环信Flutter SDK构建鸿蒙HarmonyOS应用及推送配置

本文为大家介绍如何在 Flutter 环境创建 Harmony 项目并集成环信即时通讯IM以及环信 Flutter Harmony 推送配置。 已经基于环信的 Flutter 项目也可以参考本文适配鸿蒙端。 一、开发环境要求 前置条件 1.安装DevEco-Studio 2.安装模拟器 DevEco-Studio 下载与操作指导&…...

HTML5好看的水果蔬菜在线商城网站源码系列模板5

文章目录 1.设计来源1.1 主界面1.2 关于我们1.3 商品服务1.4 果蔬展示1.5 联系我们1.6 商品具体信息1.7 登录注册 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh 文章地址&#…...

宜搭与金蝶互通——连接器建立

一、 进入连接器工厂 图1 连接器入口 二、 新建连接器 图2 新建连接器第一步 1、 连接器显示名,如图2中①所示; 2、 图2中②域名,是金蝶系统API接口里面的“完整服务地址”com之前的信息,不含“https”,如图3中①所示; 3、 Base Url通常为“/”,如图2…...

SP7733:HPYNOS - Happy Numbers I(参考我之前的文章,哈希)

题目大意 我们定义“破坏”整数的过程是对其每一位上的数字的平方求和成为一个新数,如果一个数在经过若干次“破坏”以后变成了 1,那么这个数就是一个高兴的数字,输出变化次数,否则如果永远不会变成 1,输出 −1。 例如…...

《Java 泛型的作用与常见用法详解》

大家好呀!👋 今天我们要聊的是Java中一个超级重要但又让很多初学者头疼的概念——泛型(Generics)。带你彻底搞懂它!💪 准备好你的小本本,我们开始啦~📝 一、为什么需要泛型?&#x…...

【JavaWeb】详细讲解 HTTP 协议

文章目录 一、HTTP简介1.1 概念1.2 特点 二、协议2.1 HTTP-请求协议(1)GET方式(2)POST方式(3)GET和POST的区别: 2.2 HTTP-响应协议(1)格式(2)响应…...

【android bluetooth 框架分析 02】【Module详解 4】【Btaa 模块介绍】

1. 背景 我们在上一篇文章中介绍 HciHal 模块时&#xff0c;有如下代码 // system/gd/hal/hci_hal_android_hidl.ccvoid ListDependencies(ModuleList* list) const {list->add<SnoopLogger>();if (common::init_flags::btaa_hci_is_enabled()) {list->add<ac…...

“星睿O6” AI PC开发套件评测 - Windows on Arm 安装指南和性能测评

引言 Radxa联合此芯科技和安谋科技推出全新的"星睿O6"迷你 ITX 主板。该系统搭载了 CIX P1&#xff08;CD8180&#xff09;12 核 Armv9 处理器&#xff0c;拥有高达30T算力的NPU和高性能的GPU&#xff0c;最高配备64GB LPDDR内存&#xff0c;并提供了如 5GbE、HDMI …...

Python 调用 YOLOv11 ONNX

Python 调用 YOLO ONNX 1 下载ONNX文件2 Python代码 1 下载ONNX文件 ONNX下载地址 2 Python代码 import cv2 from ultralytics import YOLOdef check(yolo:str, path:str):# 加载 YOLOv11model YOLO(yolo)# 读取图片img cv2.imread(path)# 推理&#xff08;可以传文件路径…...

数据通信学习笔记之OSPF路由汇总

区域间路由汇总 路由汇总又被称为路由聚合&#xff0c;即是将一组前缀相同的路由汇聚成一条路由&#xff0c;从而达到减小路由表规模以及优化设备资源利用率的目的&#xff0c;我们把汇聚之前的这组路由称为精细路由或明细路由&#xff0c;把汇聚之后的这条路由称为汇总路由或…...

ASP.NET Core Web API 配置系统集成

文章目录 前言一、配置源与默认设置二、使用步骤1&#xff09;创建项目并添加配置2&#xff09;配置文件3&#xff09;强类型配置类4&#xff09;配置Program.cs5&#xff09;控制器中使用配置6&#xff09;配置优先级测试7&#xff09;动态重载配置测试8&#xff09;运行结果示…...

如何判断单片机性能极限?

目录 1、CPU 负载 2、内存使用情况 3、实时性能 4、外设带宽 5、功耗与温度 在嵌入式系统设计中&#xff0c;当系统变得复杂、功能增加时&#xff0c;单片机可能会逐渐逼近其性能极限。及时识别这些极限点对于保证产品质量、稳定性和用户体验至关重要。 当你的嵌入式系统…...

AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析

以下是 AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析&#xff1a; 1. 多Agent协同的定义与核心目标 多Agent系统&#xff08;MAS, Multi-Agent System&#xff09;&#xff1a; 由多个独立或协作的智能体&#xff08;Agent&#xff09;组成&#xff…...

1.凸包、极点、极边基础概念

目录 1.凸包 2.调色问题 3.极性(Extrem) 4.凸组合(Convex Combination) 5.问题转化(Strategy)​编辑 6.In-Triangle test 7.To-Left-test 8.极边&#xff08;Extream Edges&#xff09; 1.凸包 凸包就是上面蓝色皮筋围出来的范围 这些钉子可以转换到坐标轴中&#xff0…...

OSCP - Proving Grounds - DriftingBlues6

主要知识点 路径爆破dirtycow内核漏洞提权 具体步骤 总体来讲&#xff0c;这台靶机还是比较直接的&#xff0c;没有那么多的陷阱,非常适合用来学习 依旧是nmap开始,只开放了80端口 Nmap scan report for 192.168.192.219 Host is up (0.42s latency). Not shown: 65534 cl…...

深度理解指针之例题

文章目录 前言题目分析与讲解涉及知识点 前言 对指针有一定了解后&#xff0c;讲一下一道初学者的易错题 题目分析与讲解 先定义一个数组跟一个指针变量 然后把数组名赋值给指针变量————也就是把首地址传到pulPtr中 重点是分析这一句&#xff1a; *&#xff08;pulPtr…...

java 设计模式之策略模式

简介 策略模式&#xff1a;策略模式可以定制目标对象的行为&#xff0c;它尅通过传入不同的策略实现&#xff0c;来配置目标对象的行为。使用策略模式&#xff0c;就是为了定制目标对象在某个关键点的行为。 策略模式中的角色&#xff1a; 上下文类&#xff1a;持有一个策略…...

LeetCode算法题(Go语言实现)_51

题目 给你两个下标从 0 开始的整数数组 nums1 和 nums2 &#xff0c;两者长度都是 n &#xff0c;再给你一个正整数 k 。你必须从 nums1 中选一个长度为 k 的 子序列 对应的下标。 对于选择的下标 i0 &#xff0c;i1 &#xff0c;…&#xff0c; ik - 1 &#xff0c;你的 分数 …...

条件变量condition_variable

1.条件变量用来控制线程同步和协调。 2.调用wait方法&#xff0c;如果条件满足就不挂起&#xff0c;如果条件不满足就挂起线程并释放锁。 3.等到通知后&#xff0c;挂起线程先 竞争锁&#xff0c;然后 判断条件,如果满足条件就往下走&#xff0c;如果不满足就再次挂起并释放锁…...

Solon AI MCP Server 入门:Helloworld (支持 java8 到 java24。国产解决方案)

目前网上能看到的 MCP Server 基本上都是基于 Python 或者 nodejs &#xff0c;虽然也有 Java 版本的 MCP SDK&#xff0c;但是鲜有基于 Java 开发的。 作为Java 开发中的国产顶级框架 Solon 已经基于 MCP SDK 在进行 Solon AI MCP 框架开发了&#xff0c;本文将使用 Solon AI …...

Maven工具学习使用(十一)——部署项目到仓库

1、使用Maven默认方式 Maven 部署项目时默认使用的上传文件方式是通过 HTTP/HTTPS 协议。要在 Maven 项目中配置部署&#xff0c;您需要在项目的 pom.xml 文件中添加 部分。这个部分定义了如何部署项目的构件&#xff08;如 JAR 文件&#xff09;到仓库。。这个部分定义了如何…...

公司内部自建知识共享的方式分类、详细步骤及表格总结,分为开源(对外公开)和闭源(仅限内部),以及公共(全员可访问)和内部(特定团队/项目组)四个维度

以下是公司内部自建知识共享的方式分类、详细步骤及表格总结&#xff0c;分为开源&#xff08;对外公开&#xff09;和闭源&#xff08;仅限内部&#xff09;&#xff0c;以及公共&#xff08;全员可访问&#xff09;和内部&#xff08;特定团队/项目组&#xff09;四个维度&am…...

Oracle 19c部署之初始化实例(三)

上一篇文章中&#xff0c;我们已经完成了数据库软件安装&#xff0c;接下来我们需要进行实例初始化工作。 一、初始化实例的两种方式 1.1 图形化初始化实例 描述&#xff1a;图形化初始化实例是通过Oracle的Database Configuration Assistant (DBCA)工具完成的。用户通过一系…...

医疗设备预测性维护合规架构:从法规遵循到技术实现的深度解析

在医疗行业数字化转型加速推进的当下&#xff0c;医疗设备预测性维护已成为提升设备可用性、保障医疗安全的核心技术。然而&#xff0c;该技术的有效落地必须建立在严格的合规框架之上。医疗设备直接关乎患者生命健康&#xff0c;其维护过程涉及医疗法规、数据安全、质量管控等…...

Openfeign的最佳实践

文章目录 问题引入一、继承的方式1. 建立独立的Moudle服务2. 服务调用方继承jar包中的接口3. 直接注入继承后的接口进行使用 二、抽取的方式1. 建立独立的Moudle服务2.服务调用方依赖注入 问题引入 openfeign接口的实现和服务提供方的controller非常相似&#xff0c;例如&…...

Python中如何加密/解密敏感信息(如用户密码、token)

敏感信息,如用户密码、API密钥、访问令牌(token)、信用卡号以及其他个人身份信息(PII),构成了现代应用程序和系统中最为关键的部分。这些信息一旦被未经授权的第三方获取,可能引发灾难性的后果,从个人隐私泄露到企业经济损失,甚至是大规模的社会安全问题。保护这些敏感…...

【Java面试系列】Spring Cloud微服务架构中的分布式事务解决方案与Seata框架实现原理详解 - 3-5年Java开发必备知识

【Java面试系列】Spring Cloud微服务架构中的分布式事务解决方案与Seata框架实现原理详解 - 3-5年Java开发必备知识 引言 在微服务架构中&#xff0c;分布式事务是一个不可避免的挑战。随着业务复杂度的提升&#xff0c;如何保证跨服务的数据一致性成为了面试中的高频问题。本…...

从万维网到人工智能基石:大数据技术三十年演进史(1991-2025)

一、万维网的创世纪&#xff08;1991&#xff09; 1.1 信息共享的革命性突破 1991年8月6日&#xff0c;蒂姆伯纳斯-李在欧洲核子研究中心&#xff08;CERN&#xff09;发布首个万维网&#xff08;World Wide Web&#xff09;网站&#xff0c;构建了信息互联的三项核心技术&…...

Buildroot编译过程中下载源码失败

RK3588编译一下recovery&#xff0c;需要把buildroot源码编译一遍。遇到好几个文件都下载失败&#xff0c;如下所示 pm-utils 1.4.1这个包下载失败&#xff0c;下载地址http://pm-utils.freedesktop.org/releases 解决办法&#xff0c;换个网络用windows浏览器下载后&#xff…...