当前位置: 首页 > article >正文

从零开始学Flink:揭开实时计算的神秘面纱

一、为什么需要Flink?

当你在电商平台秒杀商品时,1毫秒的延迟可能导致交易失败;当自动驾驶汽车遇到障碍物时,10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实:数据的价值随时间呈指数级衰减。

传统批处理(如Hadoop)像老式火车,必须等所有乘客(数据)到齐才能发车;而流处理(如Flink)如同磁悬浮列车,每个乘客(数据)上车即刻出发。Flink的诞生,让数据从"考古材料"变为"新鲜血液"。

二、初识Flink

1. 定义

Apache Flink是由德国柏林工业大学于2009年启动的研究项目,2014年进入Apache孵化器,现已成为实时计算领域的事实标准。其核心能力可用一句话概括:对无界和有界数据流进行有状态计算。

2. 核心特性

流处理优先:批处理是流处理的特例(有界数据流)
事件时间语义:按数据真实发生时间处理(而非系统接收时间)
精确一次语义:确保计算结果100%准确
亚秒级延迟:处理延迟可控制在毫秒级

3. 技术架构

Flink运行时架构包含三个关键角色:

  • JobManager:大脑中枢,负责任务调度与检查点管理
  • TaskManager:肌肉组织,执行具体计算任务
  • Dispatcher:网关系统,提供REST接口提交作业

三、环境搭建

环境要求

​1. ​Windows 10 2004 或更高版本​​(建议使用 Windows 11)
​2. ​已启用 WSL 2​​
3. 存储空间:至少 1GB 可用空间

详细安装步骤

步骤 1:启用 WSL

在 PowerShell 中以管理员身份运行以下命令:

# 启用 WSL 功能dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart# 启用虚拟机平台dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart# 设置 WSL 2 为默认版本wsl --set-default-version 2# 重启电脑(必须步骤)
步骤 2:安装 Ubuntu

​1. 打开 Microsoft Store
​2. 搜索安装 ​​Ubuntu 22.04 LTS​​
3. 启动 Ubuntu 并创建用户名和密码

步骤 3:安装 Java 17

在 Ubuntu 终端执行:

  # 更新软件包列表sudo apt update# 安装 Java 17sudo apt install -y openjdk-17-jdk# 设置环境变量echo 'export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64' >>  /etc/profileecho 'export PATH=$PATH:$JAVA_HOME/bin' >> /etc/profilesource /etc/profile# 验证安装java -version# 应显示类似:OpenJDK Runtime Environment (build 17.0.14+...)
步骤 4:下载并安装 Flink 1.20.1
  # 下载 Flinkwget https://archive.apache.org/dist/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz# 解压安装包tar xzf flink-1.20.1-bin-scala_2.12.tgz# 移动到安装目录sudo mv flink-1.20.1 /opt/flink# 设置环境变量echo 'export FLINK_HOME=/opt/flink' >>  /etc/profileecho 'export PATH=$PATH:$FLINK_HOME/bin' >> /etc/profilesource /etc/profile
步骤 5:修改内存配置

编辑配置文件:

vi /opt/flink/conf/conf.yaml

修改以下关键参数:

  jobmanager:bind-host: 0.0.0.0rpc:address: localhostport: 6123memory:process:size: 1600mexecution:failover-strategy: regiontaskmanager:bind-host: 0.0.0.0host: localhostnumberOfTaskSlots: 2memory:process:size: 2048mparallelism:default: 2rest:address: localhostbind-address: 0.0.0.0port: 8081
步骤 6:启动 Flink 集群

# 启动集群(JobManager + TaskManager)
$FLINK_HOME/bin/start-cluster.sh# 检查运行状态
jps
步骤 7:访问 Web UI

在 Windows 浏览器中访问:
http://localhost:8081

四、实战第一个Flink程序:BatchWordCount

下面将详细介绍如何在Flink环境中创建并运行第一个WordCount程序。这个经典示例将带你从项目创建到代码执行,全面体验Flink开发流程。

项目结构设计

采用多模块Gradle项目,结构清晰:

  flink-learning/├── build.gradle                 # 根项目构建配置├── settings.gradle              # 多模块配置├── libraries.gradle            # 依赖统一管理├── data/                        # 数据文件夹│   ├── input.txt               # 输入文件│   └── output.txt              # 输出文件└── wordcount/                  # WordCount模块├── build.gradle            # 模块构建配置└── src/main/java           # 源代码目录└── cn/com/daimajiangxin/flink/wordcount└── BatchWordCount.java # 主程序

核心文件配置

详细配置参考代码仓库:https://gitee.com/daimajiangxin/flink-learning.git

WordCount代码实现

package cn.com.daimajiangxin.flink.wordcount;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;public class BatchWordCount {public static void main(String[] args) throws Exception {// 转换Windows路径格式args = convertWindowsPaths(args);// 参数校验if (args.length < 2) {System.err.println("Usage: BatchWordCount <input> <output> [--parallelism=N]");System.err.println("Example: BatchWordCount input.txt output.txt --parallelism=4");System.exit(1);}final String inputPath = args[0];final String outputPath = args[1];int parallelism = 1; // 默认并行度// 1. 创建流批一体执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 明确指定批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 设置并行度和作业名称env.setParallelism(parallelism);env.getConfig().enableObjectReuse();// 2. 使用最新的FileSource API读取输入数据DataStream<String> text = createFileSource(env, inputPath, parallelism);// 3. 定义处理逻辑SingleOutputStreamOperator<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).name("Tokenizer").setParallelism(parallelism).keyBy(value -> value.f0).reduce(new SumReducer()).name("SumReducer").setParallelism(parallelism).returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 4. 输出结果到文件counts.writeAsText(outputPath).name("FileSink").setParallelism(1);// 5. 执行作业try {System.out.println("Starting Flink WordCount job...");System.out.println("Input path: " + inputPath);System.out.println("Output path: " + outputPath);System.out.println("Parallelism: " + parallelism);env.execute("Flink Batch WordCount Example");System.out.println("Job completed successfully!");} catch (Exception e) {System.err.println("Job execution failed: " + e.getMessage());e.printStackTrace();}}// Windows路径转换private static String[] convertWindowsPaths(String[] args) {if (args.length >= 1) {args[0] = "file:///" + args[0].replace("\\", "/").replace(" ", "%20");}if (args.length >= 2) {args[1] = "file:///" + args[1].replace("\\", "/").replace(" ", "%20");}return args;}// 创建文件源private static DataStream<String> createFileSource(StreamExecutionEnvironment env, String path, int parallelism) {// 使用file://前缀Path filePath = new Path(path);System.out.println("Loading file from: " + filePath);TextLineFormat format = new TextLineFormat(StandardCharsets.UTF_8);FileSource<String> fileSource = FileSource.forRecordStreamFormat(format, filePath).build();WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(10));return env.fromSource(fileSource,watermarkStrategy,"FileSource").name("FileSource").setParallelism(1);}// 分词器public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 过滤空行if (value == null || value.trim().isEmpty()) return;// 转换为小写并分割单词String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (!word.isEmpty()) {out.collect(Tuple2.of(word, 1));}}}}// 累加器public static final class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {return Tuple2.of(v1.f0, v1.f1 + v2.f1);}}
}

输入文件示例 (input.txt)

input.txt参考代码仓库:https://gitee.com/daimajiangxin/flink-learning.git

运行Flink作业

这里讲述在IDEA中运行刚刚写的BatchWordCount 任务,配置IDEA的APPlication。

VM选项配置
  --add-exports=java.base/sun.net.util=ALL-UNNAMED--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED--add-opens=java.base/java.lang=ALL-UNNAMED--add-opens=java.base/java.net=ALL-UNNAMED--add-opens=java.base/java.io=ALL-UNNAMED--add-opens=java.base/java.nio=ALL-UNNAMED--add-opens=java.base/sun.nio.ch=ALL-UNNAMED--add-opens=java.base/java.lang.reflect=ALL-UNNAMED--add-opens=java.base/java.text=ALL-UNNAMED--add-opens=java.base/java.time=ALL-UNNAMED--add-opens=java.base/java.util=ALL-UNNAMED--add-opens=java.base/java.util.concurrent=ALL-UNNAMED--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
程序参数
 代码放置路径\\flink-learning\\data\\input.txt代码放置路径\bigdata\\flink-learning\\data\\output.txt
运行BatchWordCount类

Run 或者Debug BatchWordCount的 APPlication.

20250608143813

预期输出

运行成功data目录下会生成output的文件。

(processing,1)
(batch,2)
(flink,2)
(hello,2)

20250608143152

五、技术要点解析

  • 流批一体API:Flink 1.20+使用StreamExecutionEnvironment统一处理批流
  • 文件源:使用FileSource API
  • 精确一次处理:批处理天然支持Exactly-Once语义
  • 并行度控制:通过setParallelism控制任务并行度
  • Windows路径适配:统一转换为file:///开头的URI格式

六、学习路线建议

完成WordCount后,可逐步探索:

  • 实时流处理(SocketWordCount)
  • 状态管理(StatefulProcessing)
  • 事件时间处理(EventTimeProcessing)
  • 窗口计算(TumblingWindow、SlidingWindow)
  • CEP复杂事件处理
  • Table API和SQL
    通过这个完整的BatchWordCount实例,你已经掌握了Flink项目的搭建、编码和运行全流程。随着Flink在实时数据处理领域的广泛应用,这些技能将成为大数据开发的宝贵资产。

相关文章:

从零开始学Flink:揭开实时计算的神秘面纱

一、为什么需要Flink&#xff1f; 当你在电商平台秒杀商品时&#xff0c;1毫秒的延迟可能导致交易失败&#xff1b;当自动驾驶汽车遇到障碍物时&#xff0c;10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实&#xff1a;数据的价值随时间呈指数级衰减。 传统批处理…...

一、ES6-let声明变量【解刨分析最详细】

一、块级作用域 { let Tim"Tim是靓仔&#xff01;" } console.log("Tim:",Tim) 打印结果&#xff1a;Tim未进行任何定义&#xff01; 原因&#xff1a;因为Tim定义再块级{}里面&#xff0c;它的声音Tim只服务于该块级里面。而打印结果是再块级外面&#…...

Appium如何支持ios真机测试

ios模拟器上UI自动化测试 以appiumwebdriverio为例&#xff0c;详细介绍如何在模拟器上安装和测试app。在使用ios模拟器前&#xff0c;需要安装xcode&#xff0c;创建和启动一个simulator。simulator创建好后&#xff0c;就可以使用xcrun simctl命令安装被测应用并开始测试了。…...

JDK17 Http Request 异步处理 源码刨析

为什么可以异步&#xff1f; #调用起始源码 // 3. 发送异步请求并处理响应 CompletableFuture future client.sendAsync( request, HttpResponse.BodyHandlers.ofString() // 响应体转为字符串 ).thenApply(response -> { // 状态码检查&#xff08;非200系列抛出异常&…...

【Zephyr 系列 8】构建完整 BLE 产品架构:状态机 + AT 命令 + 双通道通信实战

🧠关键词:Zephyr、BLE、状态机、双向透传、AT 命令、Buffer、主从共存、系统架构 📌适合人群:希望开发 BLE 产品(模块/标签/终端)具备可控、可测、可维护架构的开发者 🧭 引言:从“点功能”到“系统架构” 前面几篇我们已经逐步构建了 BLE 广播、连接、数据透传系统…...

【Mac 从 0 到 1 保姆级配置教程 16】- Docker 快速安装配置、常用命令以及实际项目演示

文章目录 前言1. Docker 是什么&#xff1f;2. 为什么要使用 Docker&#xff1f; 安装 Docker1. 安装 Docker Desktop2. 安装 OrbStack3. Docker Desktop VS OrbStack5. 验证安装 使用 Docker 运行项目1. 克隆项目到本地2. 进入项目目录3. 启动容器: 查看运行效果1. OrbStack 中…...

2025-05-01-决策树算法及应用

决策树算法及应用 参考资料 GitHub - zhaoyichanghong/machine_learing_algo_python: implement the machine learning algorithms by p(机器学习相关的 github 仓库)决策树实现与应用决策树 概述 机器学习算法分类 决策树算法 决策树是一种以树状结构对数据进行划分的分类…...

Redis知识体系

1. 概述 本文总结了Redis基本的核心知识体系&#xff0c;在学习Redis的过程中&#xff0c;可以将其作为学习框架&#xff0c;以此更好的从整体的角度去理解和学习Redis的内容和设计思想。同时知识框架带来的好处是可以帮助我们更好的进行记忆&#xff0c;在大脑中形成相应的知识…...

mysql-MySQL体系结构和存储引擎

1. MySQL体系结构和存储引擎 MySQL被设计成一个单进程多线程架构的数据库&#xff0c;MySQL数据库实例在系统上的表现就是一个进 程当启动实例时&#xff0c;读取配置文件&#xff0c;根据配置文件的参数来启动数据库实例&#xff1b;若没有&#xff0c;按编译时的默认 参数设…...

Pycharm 函数注释

1 Docstring format File -> Settings -> Tools -> Python Integrated Tools -> Docstrings -> Docstring format&#xff0c;选择google File -> Settings -> Editor -> General -> Smart Keys -> Insert type placeholders in the documenta…...

如何使用 Redis 快速实现布隆过滤器?

以下是使用 Redis 实现布隆过滤器的两种方案&#xff0c;结合原理说明和操作步骤&#xff1a; 方案一&#xff1a;手动实现&#xff08;基于 Redis Bitmap&#xff09; 原理 利用 Redis 的 SETBIT 和 GETBIT 操作位数组&#xff0c;结合多个哈希函数计算位置。 步骤 确定参数…...

黑马Javaweb Request和Response

一.介绍 在 Web 开发中&#xff0c;HttpServletRequest 和 HttpServletResponse 是两个非常重要的类&#xff0c;它们分别用于处理客户端的请求和服务器的响应。以下是它们的详细说明和使用方法&#xff1a; 1. HttpServletRequest HttpServletRequest 是一个接口&#xff0…...

山东大学深度学习2025年期末考试

一、名词解释&#xff08;24&#xff09; 1.反向传播 2.激活函数 3.梯度裁剪 4.数据增强 5.迁移学习 6.过拟合 7.word2Vec 8.注意力机制 二、简答题&#xff08;48&#xff09; 1.池化的概念&#xff08;作用&#xff09;以及常见的两种池化操作 2.LSTM为什么能解决…...

添加按钮跳转页面并且根据网站的用户状态判断是否显示按钮

现在我们需要的是为页面添加一个按钮&#xff0c;这个按钮是动态的&#xff0c;需要根据网站用户登录过后是否是vip来判断是否显示&#xff0c;然后按钮的效果是跳转到某个页面。 首先我们需要在页面中找到我们需要添加按钮的位置&#xff0c;找到对应的文件&#xff0c;然后比…...

Gerrit+repo管理git仓库,如果本地有新分支不能执行repo sync来同步远程所有修改,会报错

问题&#xff1a;创建一个本地分支TEST 来关联远程已有分支origin/TEST&#xff0c;直接执行repo sync可能会出现问题&#xff1a;比如&#xff0c;本地分支TES会错乱关联到origin/master&#xff0c;或者拉不下最新代码等问题。 // git checkout -b 新分支名 远程分支名字 git…...

豆瓣图书评论数据分析与可视化

【题目描述】豆瓣图书评论数据爬取。以《平凡的世界》、《都挺好》等为分析对象&#xff0c;编写程序爬取豆瓣读书上针对该图书的短评信息&#xff0c;要求&#xff1a; &#xff08;1&#xff09;对前3页短评信息进行跨页连续爬取&#xff1b; &#xff08;2&#xff09;爬取…...

Vue ④-组件通信 || 进阶语法

组件三大部分 template&#xff1a;只有能一个根元素 style&#xff1a;全局样式(默认)&#xff1a;影响所有组件。局部样式&#xff1a;scoped 下样式&#xff0c;只作用于当前组件 script&#xff1a;el 根实例独有&#xff0c;data 是一个函数&#xff0c;其他配置项一致…...

0x-2-Oracle Linux 9上安装JDK配置环境变量

一、JDK选择和使用 安装完Oracle Linux9.6&#xff0c;同时使用rpm包安装Oracle 23 ai free后&#xff0c; 将面临sqlcl程序无法使用和java无法使用&#xff0c;需要相应进行变量配置问题。 1、java 环境运行不存在&#xff0c;Oracle 23ai free安装后默认安装JDK 11 /opt/…...

深入理解卷积神经网络:从原理到应用

在人工智能领域&#xff0c;卷积神经网络&#xff08;Convolutional Neural Network, CNN&#xff09;无疑是计算机视觉领域的璀璨明珠。从 1998 年 Yann LeCun 提出 LeNet-5 实现手写数字识别&#xff0c;到 2012 年 AlexNet 在 ImageNet 大赛上创造历史性突破&#xff0c;CNN…...

从入门到实战:AI学习路线全解析——避坑指南

分享一下阿里的人工智能学习路线,为感兴趣系统学习的小伙伴们探路。 一、谁适合学这门AI课程?五类人群的精准定位 无论你是零基础小白还是职场转型者,这套系统化课程都能为你量身定制成长路径: 零基础爱好者(无编程/数学背景) 课程提供Python和数学前置学习建议,先补基…...

Spring Boot + Thymeleaf 防重复提交

在 Spring Boot 与 Thymeleaf 结合的 Web 应用中&#xff0c;防止重复提交可以采用token 机制 客户端禁用按钮的方式实现&#xff0c;在高并发场景下&#xff0c;考虑使用 Redis 存储 token 而非 Session。 第一步&#xff1a;后端实现 Controller public class FormControl…...

uniapp实现的简约美观的星级评分组件

采用 uniapp 实现的一款简约美观的星级评分模板&#xff0c;提供丝滑动画效果&#xff0c;用户可根据自身需求进行自定义修改、扩展&#xff0c;纯CSS、HTML实现&#xff0c;支持web、H5、微信小程序&#xff08;其他小程序请自行测试&#xff09; 可到插件市场下载尝试&#x…...

AWS Elastic Beanstalk + CodePipeline(Python Flask Web的国区CI/CD)

目标 需要使用AWS Elastic Beanstalk 部署一个Python的Flask Web应用&#xff0c;并且使用CodePipeline作为CI/CD工作流。 eb部署图 前提 假设你已经有一个能够正常运行的Python的Flask Web应用项目代码&#xff0c;而且需要对已有Flask工程做一些调整。由于AWS Elastic Bea…...

多线程语音识别工具

软件介绍 本文介绍一款支持大厂接口的语音转文字工具&#xff0c;具备免配置、免费使用的特点。 软件特性 该工具是一款完全免费的桌面端应用程序&#xff0c;部署于开源社区平台&#xff0c;其核心优势在于整合了多家技术供应商的接口资源。 操作方式 用户只需将音频…...

前端对WebSocket进行封装,并建立心跳监测

WebSocket的介绍&#xff1a; WebSocket 是一种在客户端和服务器之间进行全双工、双向通信的协议。它是基于 HTTP 协议&#xff0c;但通过升级&#xff08;HTTP 升级请求&#xff09;将连接转换为 WebSocket 协议&#xff0c;从而提供更高效的实时数据交换。 WebSocket 的特点…...

DiMTAIC 2024 数字医学技术及应用创新大赛-甲状腺B超静态及动态影像算法赛-参赛项目

参赛成绩 项目介绍 去年参加完这个比赛之后&#xff0c;整理了项目文件和代码&#xff0c;虽然比赛没有获奖&#xff0c;但是参赛过程中自己也很有收获&#xff0c;自己一个人搭建了完整的pipeline并基于此提交了多次提高成绩&#xff0c;现在把这个项目梳理成博客&#xff0c…...

window安装docker\docker-compose

安装前配置 打开控制面板,参照下图打开“启动或关闭windows功能”,Hyper-V 和容器需要启用 程序和功能 启动或关闭windows功能 勾选Hyper-V 安装路径配置 Docker在Windows上的默认安装路径为C:\Program Files\Docker。 以管理员身份运行CMD在D盘,dev文件夹下创建Docker文…...

Jenkins的学习与使用(CI/CD)

文章目录 前言背景CI/CDJenkins简介Jenkins特性 安装Jenkins工作流程&#xff08;仅供参考&#xff09;安装maven和其他插件新建任务任务源码管理配置maven配置git&#xff08;非必需&#xff09; 尝试手动构建jar包可能遇到的错误 发布到远程服务器前置清理工作构建触发器git钩…...

vue-14(使用 ‘router.push‘ 和 ‘router.replace‘ 进行编程导航)

使用 ‘router.push’ 和 ‘router.replace’ 进行编程导航 编程导航是使用 Vue Router 构建动态和交互式 Web 应用程序的一个重要方面。它允许您根据应用程序逻辑、用户作或特定条件控制用户的导航流。您可以使用 router.push 和 router.replace 方法以编程方式导航到不同的路…...

使用WPF的Microsoft.Xaml.Behaviors.Wpf中通用 UI 元素事件

Nuget下载之后记得要先引用下面的 xmlns:i"http://schemas.microsoft.com/xaml/behaviors" <!-- 鼠标事件 --> <i:EventTrigger EventName"MouseEnter"/> <!-- 鼠标进入 --> <i:EventTrigger EventName"MouseLeave"/&g…...