Flink 编程基础:Scala 版 DataStream API 入门
大家好!我是心海
流处理技术在大数据时代正变得越来越重要,而 Apache Flink 作为领先的流处理引擎,凭借其高性能、低延迟和丰富的 API 受到了广泛关注。本文将以 Scala 语言为例,详细讲解 Flink DataStream API 的基本编程模型,从数据源、数据转换、数据输出,到窗口划分与时间概念,最后结合经典的 WordCount 案例,带大家一步步动手实践。
目录
一、DataStream API:构建流处理世界的基石
二、 基本编程实践:WordCount 示例
2.1 代码示例
2.2 代码解析
三、窗口的划分:在无限流中框定边界
3.1 时间概念
3.2 窗口划分
3.3 窗口计算
3.4 窗口计算示例
四、总结
一、DataStream API:构建流处理世界的基石
想象一下,现实世界的数据就像一条奔流不息的河流,时刻都在产生新的信息。DataStream API 就是 Flink 提供给我们的工具箱,里面装满了各种强大的工具,帮助我们捕获、转换和分析这条“数据河流”。
DataStream 编程模型:三段论
一个典型的 Flink DataStream 应用程序可以概括为以下三个核心步骤:
数据源(Source):数据的起点
数据转换(Transformation):数据的加工厂
数据输出(Sink):数据的归宿
可以用一张简单的图来表示这个过程:

二、 基本编程实践:WordCount 示例
为了让大家更直观地理解 Flink 编程,我们以经典的 WordCount 案例来讲解。下面的示例代码使用 Scala 编写,涵盖了数据源、数据转换、窗口计算以及数据输出的全流程。
2.1 代码示例
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject WordCount {def main(args: Array[String]): Unit = {// 创建流执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// 1. 数据源:从本地集合创建数据流(实际项目中可替换为读取文件或 Kafka)val text = env.fromElements("Flink is a streaming engine","Flink supports batch processing","Scala makes Flink programming concise")// 2. 数据转换:拆分单词,转换为 (word, 1) 格式val counts = text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(word => (word, 1)).keyBy(_._1).reduce((a, b) => (a._1, a._2 + b._2))// 3. 数据输出:打印到控制台(实际项目中可写入文件、Kafka 或数据库)counts.print()// 执行程序env.execute("Scala Flink WordCount Example")}
}
2.2 代码解析
-
数据源
使用env.fromElements创建一个包含多条文本数据的流。在实际生产中,可以使用env.readTextFile或者 KafkaSource 读取实时数据。 -
数据转换
-
flatMap:将每行文本拆分成单词,并转换为小写。
-
filter:过滤掉空字符串。
-
map:将每个单词映射成 (word, 1) 元组。
-
keyBy:根据单词进行分组。
-
reduce:聚合相同单词的计数。
-
-
数据输出
使用print将计算结果输出到控制台。在实际项目中可以替换成其他 Sink(例如写入 Kafka、数据库或文件)。
三、窗口的划分:在无限流中框定边界
由于流数据是无限的,我们需要将无限的流划分成有限大小的“窗口”,然后在每个窗口上进行计算。Flink 提供了灵活的窗口机制,可以根据时间、数量或其他条件来划分窗口。

无限延伸的蓝色波浪线代表数据流,上面被垂直的虚线分割成若干个矩形区域,每个矩形区域代表一个窗口。每个窗口内部包含若干个数据点。
3.1 时间概念
在定义和计算窗口时,时间是一个至关重要的概念
事件时间(Event Time)
指数据中携带的时间戳,反映数据生成的真实时间。处理时间(Processing Time)
指系统接收到数据时的时间,不受数据本身时间戳影响。摄取时间(Ingestion Time)
数据进入 Flink 系统的时间,一般介于事件时间和处理时间之间。
选择哪种时间语义取决于你的应用场景和对时间准确性的要求。事件时间通常是最准确的,但也可能涉及到处理乱序事件的问题。
3.2 窗口划分
常见的窗口类型有:
滚动窗口(Tumbling Window)
固定长度且互不重叠的窗口,如上面 WordCount 示例中的 5 秒窗口。滑动窗口(Sliding Window)
窗口大小固定,但窗口之间存在重叠部分,可设置窗口滑动步长。会话窗口(Session Window)
根据数据之间的间隔动态划分,当间隔超过设定阈值时视为新窗口。
3.3 窗口计算
在窗口内执行聚合
一旦我们定义了窗口的划分方式和使用的时间语义,就可以在每个窗口内执行各种计算,例如计数、求和、平均值、最大值、最小值等等。
Flink 提供了不同的 Window Assigners(窗口分配器)来定义如何将数据分配到窗口中,常见的有:
-
时间窗口(Time Windows): 基于时间长度划分窗口,例如滚动时间窗口(Tumbling Time Window)、滑动时间窗口(Sliding Time Window)、会话窗口(Session Window)。
-
计数窗口(Count Windows): 基于元素的数量划分窗口,例如滚动计数窗口(Tumbling Count Window)、滑动计数窗口(Sliding Count Window)。
3.4 窗口计算示例
下面是一个使用滑动窗口的示例,统计每个单词在 10 秒窗口内每隔 5 秒统计一次的计数:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject SlidingWindowWordCount {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval text = env.fromElements("Flink streaming window example","Flink supports different time semantics","Scala and Flink make a great combination")val counts = text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(word => (word, 1)).keyBy(_._1)// 定义滑动窗口:窗口长度 10 秒,滑动步长 5 秒.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)counts.print()env.execute("Scala Flink Sliding Window WordCount")}
}
flatMap(_.toLowerCase.split("\\W+")):将每行文本转换为小写,然后按非单词字符(如空格、标点符号)进行拆分,将每个单词作为一个独立的元素输出。filter(_.nonEmpty):过滤掉空字符串。map(word => (word, 1)):将每个单词映射为一个二元组(word, 1),其中word是单词,1表示该单词出现了一次。keyBy(_._1):根据二元组的第一个元素(即单词)进行分组,以便后续对每个单词进行独立的统计。.timeWindow(Time.seconds(10), Time.seconds(5)):定义一个滑动窗口,窗口长度为 10 秒,滑动步长为 5 秒。这意味着每 5 秒会生成一个新的窗口,每个窗口包含最近 10 秒内的数据。.sum(1):对每个窗口内的二元组的第二个元素(即计数)进行求和,得到每个单词在该窗口内的出现次数。
四、总结
恭喜你!通过本篇文章,你已经对 Scala 版 Flink DataStream API 的编程基础有了初步的了解。我们学习了 DataStream API 的核心组成部分:数据源、数据转换和数据输出,并通过 WordCount 示例进行了实践。同时,我们也初步接触了窗口的概念、时间语义和基本的窗口计算。
Flink DataStream API 的功能远不止于此,还有更高级的转换算子、更灵活的窗口操作、状态管理、容错机制等等等待我们去探索。在接下来的文章中,我们将继续深入学习这些更高级的主题,带你逐步成为 Flink 流处理的专家!
希望这篇文章能够帮助你迈出 Flink Scala 编程的第一步。如果你有任何问题或建议,欢迎在评论区留言交流。让我们一起在 Flink 的世界里扬帆起航!
如果这篇文章对你有所启发,期待你的点赞关注!

相关文章:
Flink 编程基础:Scala 版 DataStream API 入门
大家好!我是心海 流处理技术在大数据时代正变得越来越重要,而 Apache Flink 作为领先的流处理引擎,凭借其高性能、低延迟和丰富的 API 受到了广泛关注。本文将以 Scala 语言为例,详细讲解 Flink DataStream API 的基本编程模型&am…...
实战|使用环信Flutter SDK构建鸿蒙HarmonyOS应用及推送配置
本文为大家介绍如何在 Flutter 环境创建 Harmony 项目并集成环信即时通讯IM以及环信 Flutter Harmony 推送配置。 已经基于环信的 Flutter 项目也可以参考本文适配鸿蒙端。 一、开发环境要求 前置条件 1.安装DevEco-Studio 2.安装模拟器 DevEco-Studio 下载与操作指导&…...
HTML5好看的水果蔬菜在线商城网站源码系列模板5
文章目录 1.设计来源1.1 主界面1.2 关于我们1.3 商品服务1.4 果蔬展示1.5 联系我们1.6 商品具体信息1.7 登录注册 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh 文章地址&#…...
宜搭与金蝶互通——连接器建立
一、 进入连接器工厂 图1 连接器入口 二、 新建连接器 图2 新建连接器第一步 1、 连接器显示名,如图2中①所示; 2、 图2中②域名,是金蝶系统API接口里面的“完整服务地址”com之前的信息,不含“https”,如图3中①所示; 3、 Base Url通常为“/”,如图2…...
SP7733:HPYNOS - Happy Numbers I(参考我之前的文章,哈希)
题目大意 我们定义“破坏”整数的过程是对其每一位上的数字的平方求和成为一个新数,如果一个数在经过若干次“破坏”以后变成了 1,那么这个数就是一个高兴的数字,输出变化次数,否则如果永远不会变成 1,输出 −1。 例如…...
《Java 泛型的作用与常见用法详解》
大家好呀!👋 今天我们要聊的是Java中一个超级重要但又让很多初学者头疼的概念——泛型(Generics)。带你彻底搞懂它!💪 准备好你的小本本,我们开始啦~📝 一、为什么需要泛型?&#x…...
【JavaWeb】详细讲解 HTTP 协议
文章目录 一、HTTP简介1.1 概念1.2 特点 二、协议2.1 HTTP-请求协议(1)GET方式(2)POST方式(3)GET和POST的区别: 2.2 HTTP-响应协议(1)格式(2)响应…...
【android bluetooth 框架分析 02】【Module详解 4】【Btaa 模块介绍】
1. 背景 我们在上一篇文章中介绍 HciHal 模块时,有如下代码 // system/gd/hal/hci_hal_android_hidl.ccvoid ListDependencies(ModuleList* list) const {list->add<SnoopLogger>();if (common::init_flags::btaa_hci_is_enabled()) {list->add<ac…...
“星睿O6” AI PC开发套件评测 - Windows on Arm 安装指南和性能测评
引言 Radxa联合此芯科技和安谋科技推出全新的"星睿O6"迷你 ITX 主板。该系统搭载了 CIX P1(CD8180)12 核 Armv9 处理器,拥有高达30T算力的NPU和高性能的GPU,最高配备64GB LPDDR内存,并提供了如 5GbE、HDMI …...
Python 调用 YOLOv11 ONNX
Python 调用 YOLO ONNX 1 下载ONNX文件2 Python代码 1 下载ONNX文件 ONNX下载地址 2 Python代码 import cv2 from ultralytics import YOLOdef check(yolo:str, path:str):# 加载 YOLOv11model YOLO(yolo)# 读取图片img cv2.imread(path)# 推理(可以传文件路径…...
数据通信学习笔记之OSPF路由汇总
区域间路由汇总 路由汇总又被称为路由聚合,即是将一组前缀相同的路由汇聚成一条路由,从而达到减小路由表规模以及优化设备资源利用率的目的,我们把汇聚之前的这组路由称为精细路由或明细路由,把汇聚之后的这条路由称为汇总路由或…...
ASP.NET Core Web API 配置系统集成
文章目录 前言一、配置源与默认设置二、使用步骤1)创建项目并添加配置2)配置文件3)强类型配置类4)配置Program.cs5)控制器中使用配置6)配置优先级测试7)动态重载配置测试8)运行结果示…...
如何判断单片机性能极限?
目录 1、CPU 负载 2、内存使用情况 3、实时性能 4、外设带宽 5、功耗与温度 在嵌入式系统设计中,当系统变得复杂、功能增加时,单片机可能会逐渐逼近其性能极限。及时识别这些极限点对于保证产品质量、稳定性和用户体验至关重要。 当你的嵌入式系统…...
AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析
以下是 AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析: 1. 多Agent协同的定义与核心目标 多Agent系统(MAS, Multi-Agent System): 由多个独立或协作的智能体(Agent)组成ÿ…...
1.凸包、极点、极边基础概念
目录 1.凸包 2.调色问题 3.极性(Extrem) 4.凸组合(Convex Combination) 5.问题转化(Strategy)编辑 6.In-Triangle test 7.To-Left-test 8.极边(Extream Edges) 1.凸包 凸包就是上面蓝色皮筋围出来的范围 这些钉子可以转换到坐标轴中࿰…...
OSCP - Proving Grounds - DriftingBlues6
主要知识点 路径爆破dirtycow内核漏洞提权 具体步骤 总体来讲,这台靶机还是比较直接的,没有那么多的陷阱,非常适合用来学习 依旧是nmap开始,只开放了80端口 Nmap scan report for 192.168.192.219 Host is up (0.42s latency). Not shown: 65534 cl…...
深度理解指针之例题
文章目录 前言题目分析与讲解涉及知识点 前言 对指针有一定了解后,讲一下一道初学者的易错题 题目分析与讲解 先定义一个数组跟一个指针变量 然后把数组名赋值给指针变量————也就是把首地址传到pulPtr中 重点是分析这一句: *(pulPtr…...
java 设计模式之策略模式
简介 策略模式:策略模式可以定制目标对象的行为,它尅通过传入不同的策略实现,来配置目标对象的行为。使用策略模式,就是为了定制目标对象在某个关键点的行为。 策略模式中的角色: 上下文类:持有一个策略…...
LeetCode算法题(Go语言实现)_51
题目 给你两个下标从 0 开始的整数数组 nums1 和 nums2 ,两者长度都是 n ,再给你一个正整数 k 。你必须从 nums1 中选一个长度为 k 的 子序列 对应的下标。 对于选择的下标 i0 ,i1 ,…, ik - 1 ,你的 分数 …...
条件变量condition_variable
1.条件变量用来控制线程同步和协调。 2.调用wait方法,如果条件满足就不挂起,如果条件不满足就挂起线程并释放锁。 3.等到通知后,挂起线程先 竞争锁,然后 判断条件,如果满足条件就往下走,如果不满足就再次挂起并释放锁…...
Solon AI MCP Server 入门:Helloworld (支持 java8 到 java24。国产解决方案)
目前网上能看到的 MCP Server 基本上都是基于 Python 或者 nodejs ,虽然也有 Java 版本的 MCP SDK,但是鲜有基于 Java 开发的。 作为Java 开发中的国产顶级框架 Solon 已经基于 MCP SDK 在进行 Solon AI MCP 框架开发了,本文将使用 Solon AI …...
Maven工具学习使用(十一)——部署项目到仓库
1、使用Maven默认方式 Maven 部署项目时默认使用的上传文件方式是通过 HTTP/HTTPS 协议。要在 Maven 项目中配置部署,您需要在项目的 pom.xml 文件中添加 部分。这个部分定义了如何部署项目的构件(如 JAR 文件)到仓库。。这个部分定义了如何…...
公司内部自建知识共享的方式分类、详细步骤及表格总结,分为开源(对外公开)和闭源(仅限内部),以及公共(全员可访问)和内部(特定团队/项目组)四个维度
以下是公司内部自建知识共享的方式分类、详细步骤及表格总结,分为开源(对外公开)和闭源(仅限内部),以及公共(全员可访问)和内部(特定团队/项目组)四个维度&am…...
Oracle 19c部署之初始化实例(三)
上一篇文章中,我们已经完成了数据库软件安装,接下来我们需要进行实例初始化工作。 一、初始化实例的两种方式 1.1 图形化初始化实例 描述:图形化初始化实例是通过Oracle的Database Configuration Assistant (DBCA)工具完成的。用户通过一系…...
医疗设备预测性维护合规架构:从法规遵循到技术实现的深度解析
在医疗行业数字化转型加速推进的当下,医疗设备预测性维护已成为提升设备可用性、保障医疗安全的核心技术。然而,该技术的有效落地必须建立在严格的合规框架之上。医疗设备直接关乎患者生命健康,其维护过程涉及医疗法规、数据安全、质量管控等…...
Openfeign的最佳实践
文章目录 问题引入一、继承的方式1. 建立独立的Moudle服务2. 服务调用方继承jar包中的接口3. 直接注入继承后的接口进行使用 二、抽取的方式1. 建立独立的Moudle服务2.服务调用方依赖注入 问题引入 openfeign接口的实现和服务提供方的controller非常相似,例如&…...
Python中如何加密/解密敏感信息(如用户密码、token)
敏感信息,如用户密码、API密钥、访问令牌(token)、信用卡号以及其他个人身份信息(PII),构成了现代应用程序和系统中最为关键的部分。这些信息一旦被未经授权的第三方获取,可能引发灾难性的后果,从个人隐私泄露到企业经济损失,甚至是大规模的社会安全问题。保护这些敏感…...
【Java面试系列】Spring Cloud微服务架构中的分布式事务解决方案与Seata框架实现原理详解 - 3-5年Java开发必备知识
【Java面试系列】Spring Cloud微服务架构中的分布式事务解决方案与Seata框架实现原理详解 - 3-5年Java开发必备知识 引言 在微服务架构中,分布式事务是一个不可避免的挑战。随着业务复杂度的提升,如何保证跨服务的数据一致性成为了面试中的高频问题。本…...
从万维网到人工智能基石:大数据技术三十年演进史(1991-2025)
一、万维网的创世纪(1991) 1.1 信息共享的革命性突破 1991年8月6日,蒂姆伯纳斯-李在欧洲核子研究中心(CERN)发布首个万维网(World Wide Web)网站,构建了信息互联的三项核心技术&…...
Buildroot编译过程中下载源码失败
RK3588编译一下recovery,需要把buildroot源码编译一遍。遇到好几个文件都下载失败,如下所示 pm-utils 1.4.1这个包下载失败,下载地址http://pm-utils.freedesktop.org/releases 解决办法,换个网络用windows浏览器下载后ÿ…...
