当前位置: 首页 > article >正文

Apache Flink 快速入门

Flink开发环境准备学习一门新的编程语言时往往会从hello world程序开始而接触一套新的大数据计算框架时则一般会从WordCount案例入手下面以大数据中最经典入门案例WordCount为例来编写Flink代码Flink底层源码是基于Java代码进行开发在Flink编程中我们除了可以使用Java语言来进行编写Flink程序外还可以使用Scala、Python语言来进行编写Flink程序在后续章节中我们将会主要使用Java和Scala来编写Flink程序。下面来准备下Flink开发环境。Flink版本本套课程中我们采用Flink最新版本1.16.0Flink1.16.0版本官方文档地址https://nightlies.apache.org/flink/flink-docs-release-1.16/JDK环境Flink核心模块均采用Java开发所以运行环境需要依赖JDK,Flink可以基于类UNIX 环境中运行例如Linux、Max OS、Windows等在这些系统上运行Flink时都需要配置JDK环境Flink 1.16.0版本需要JDK版本为JDK11,目前版本也支持使用JDK8后续版本对JDK8的支持将会移除。考虑到Flink后期与一些大数据框架进行整合这些大数据框架对JDK11的支持并不完善例如Hive3.1.3版本还不支持JDK11所以本课程采用JDK8来开发Flink。对JDK8安装及配置不再详述。附JDK11 下载地址如下https://www.oracle.com/java/technologies/javase-jdk11-downloads.html。开发工具我们可以选择IntelliJ IDEA或者Eclipse作为Flink应用的开发IDEFlink开发官方建议使用IntelliJ IDEA因为它默认集成了Scala和Maven环境使用更加方便我们这门课使用IntelliJ IDEA开发工具具体安装步骤不再详述。Maven环境通过IntelliJ IDEA进行开发Flink Application时可以使用Maven来作为项目jar包管理工具需要在本地安装Maven及配置Maven的环境变量需要注意的是Maven版本需要使用3.0.4及以上否则编译或开发过程中会有问题。这里使用Maven 3.2.5版本。Scala环境Flink开发语言可以选择Java、Scala、Python如果用户选择使用Scala作为Flink应用开发语言则需要安装Scala执行环境。在Flink1.15之前版本如果只是使用Flink的Java api 对于一些没有Scala模块的包和表相关模块的包需要在Maven引入对应的包中加入scala后缀例如flink-table-planner_2.11后缀2.11代表的就是Scala版本。在Flink1.15.0版本后Flink添加对opting-out排除 Scala的支持如果你只使用Flink的Java api导入包也不必包含scala后缀你可以使用任何Scala版本。如果使用Flink的Scala api需要选择匹配的Scala版本。从Flink1.7版本往后支持Scala 2.11和2.12版本从Flink1.15.0版本后只支持Scala 2.12不再支持Scala 2.11。Scala环境可以通过本地安装Scala执行环境也可以通过Maven依赖Scala-lib引入如果本地安装了Scala某个版本建议在Maven中添加Scala-lib依赖。Scala2.12.8之后的版本与之前的2.12.x版本不兼容,建议使用Scala2.12.8之后版本。Hadoop环境Flink可以操作HDFS中的数据及基于Yarn进行资源调度所以需要对应的Hadoop环境Flink1.16.0版本支持的Hadoop最低版本为2.8.5本课程中我们使用Hadoop3.3.4版本。关于Hadoop3.3.4版本搭建参照第三章节Flink入门案例需求读取本地数据文件统计文件中每个单词出现的次数。IDEA Project创建及配置本课程编写Flink代码选择语言为Java和Scala所以这里我们通过IntelliJ IDEA创建一个目录其中包括Java项目模块和Scala项目模块将Flink Java api和Flink Scala api分别在不同项目模块中实现。步骤如下打开IDEA创建空项目在IntelliJ IDEA 中安装Scala插件使用IntelliJ IDEA开发Flink如果使用Scala api 那么还需在IntelliJ IDEA中安装Scala的插件如果已经安装可以忽略此步骤下图为以安装Scala插件。打开Structure,创建项目新模块FlinkScalaCode模块导入Flink Maven依赖如下properties project.build.sourceEncodingUTF-8/project.build.sourceEncoding maven.compiler.source1.8/maven.compiler.source maven.compiler.target1.8/maven.compiler.target flink.version1.16.0/flink.version slf4j.version1.7.31/slf4j.version log4j.version2.17.1/log4j.version scala.version2.12.10/scala.version scala.binary.version2.12/scala.binary.version /properties dependencies !-- Flink批和流开发依赖包 -- dependency groupIdorg.apache.flink/groupId artifactIdflink-scala_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-streaming-scala_${scala.binary.version}/artifactId version${flink.version}/version /dependency dependency groupIdorg.apache.flink/groupId artifactIdflink-clients/artifactId version${flink.version}/version /dependency !-- Scala包 -- dependency groupIdorg.scala-lang/groupId artifactIdscala-library/artifactId version${scala.version}/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-compiler/artifactId version${scala.version}/version /dependency dependency groupIdorg.scala-lang/groupId artifactIdscala-reflect/artifactId version${scala.version}/version /dependency !-- slf4jlog4j 日志相关包 -- dependency groupIdorg.slf4j/groupId artifactIdslf4j-log4j12/artifactId version${slf4j.version}/version /dependency dependency groupIdorg.apache.logging.log4j/groupId artifactIdlog4j-to-slf4j/artifactId version${log4j.version}/version /dependency /dependencies案例数据准备Flink案例实现数据源分为有界和无界之分有界数据源可以编写批处理程序无界数据源可以编写流式程序。DataSet API用于批处理DataStream API用于流式处理。批处理使用ExecutionEnvironment和DataSet流式处理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示数据的特殊类DataSet处理的数据是有界的DataStream处理的数据是无界的这两个类都是不可变的一旦创建出来就无法添加或者删除数据元。Flink 批数据处理案例以上输出结果开头展示的是处理当前数据的线程一个Flink应用程序执行时默认的线程数与当前节点cpu的总线程数有关。Flink批和流案例总结关于以上Flink 批数据处理和流式数据处理案例有以下几个点需要注意Flink程序编写流程总结编写Flink代码要符合一定的流程Flink代码编写流程如下a. 获取flink的执行环境批和流不同Execution Environment。b. 加载数据数据-- soure。c. 对加载的数据进行转换-- transformation。d. 对结果进行保存或者打印-- sink。e. 触发flink程序的执行 --env.execute()在Flink批处理过程中不需要执行execute触发执行在流式处理过程中需要执行env.execute触发程序执行。关于Flink的批处理和流处理上下文环境创建Flink批和流上下文环境有以下三种方式批处理上下文创建环境如下//设置Flink运行环境如果在本地启动则创建本地环境如果是在集群中启动则创建集群环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();//指定并行度创建本地环境LocalEnvironment localEnv ExecutionEnvironment.createLocalEnvironment(10);//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包ExecutionEnvironment remoteEnv ExecutionEnvironment.createRemoteEnvironment(JobManagerHost, 6021, 5, application.jar);流处理上下文创建环境如下//设置Flink运行环境如果在本地启动则创建本地环境如果是在集群中启动则创建集群环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//指定并行度创建本地环境LocalStreamEnvironment localEnv StreamExecutionEnvironment.createLocalEnvironment(5);//指定远程JobManagerIp 和RPC 端口以及运行程序所在Jar包及其依赖包StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(JobManagerHost, 6021, 5, application.jar);同样在Scala api 中批和流创建Flink 上下文环境也有以上三种方式在实际开发中建议批处理使用ExecutionEnvironment.getExecutionEnvironment()方式创建。流处理使用StreamExecutionEnvironment.getExecution-Environment()方式创建。Flink批和流 Java 和 Scala导入包不同在编写Flink Java api代码和Flink Scala api代码处理批或者流数据时引入的ExecutionEnvironment或StreamExecutionEnvironment包不同在编写代码时导入错误的包会导致编程有问题。批处理不同API引入ExecutionEnvironment如下//Flink Java api 引入的包import org.apache.flink.api.java.ExecutionEnvironment;//Flink Scala api 引入的包import org.apache.flink.api.scala.ExecutionEnvironment流处理不同API引入StreamExecutionEnvironment如下//Flink Java api 引入的包import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;//Flink Scala api 引入的包import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentFlink Java Api中创建 Tuple 方式在Flink Java api中创建Tuple2时可以通过new Tuple2方式也可以通过Tuple2.of方式两者本质一样。Flink Scala api需要导入隐式转换在Flink Scala api中批处理和流处理代码编写过程中需要导入对应的隐式转换来推断函数操作后的类型在批和流中导入隐式转换不同具体如下//Scala 批处理导入隐式转换使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.api.scala._//Scala 流处理导入隐式转换使用Scala API 时需要隐式转换来推断函数操作后的类型import org.apache.flink.streaming.api.scala._关于Flink Java api 中的 returns 方法Flink Java api中可以使用Lambda表达式当涉及到使用泛型Java会擦除泛型类型信息需要最后调用returns方法指定类型明确声明类型告诉系统函数生成的数据集或者数据流的类型。批和流对数据进行分组方法不同批和流处理中都是通过readTextFile来读取数据文件对数据进行转换处理后Flink批处理过程中通过groupBy指定按照什么规则进行数据分组,groupBy中可以根据字段位置指定key例如groupBy(0)如果数据是POJO自定义类型也可以根据字段名称指定key(例如groupBy(name))对于复杂的数据类型也可以通过定义key的选择器KeySelector来实现分组的key。Flink流处理过程中通过keyBy指定按照什么规则进行数据分组keyBy中也有以上三种方式指定分组key建议使用通过KeySelector来选择key其他方式已经过时。关于DataSet Api (Legacy)软弃用Flink架构可以处理批和流Flink 批处理数据需要使用到Flink中的DataSet API此API 主要是支持Flink针对批数据进行操作本质上Flink处理批数据也是看成一种特殊的流处理有界流所以没有必要分成批和流两套API从Flink1.12版本往后Dataset API 已经标记为Legacy(已过时)已被官方软弃用官方建议使用Table API 或者SQL 来处理批数据我们也可以使用带有Batch执行模式的DataStream API来处理批数据在未来Flink版本中DataSet API 将会被删除。关于这些API 具体使用后续章节会进行讲解。DataStream BATCH模式除了在代码中设置处理模式外还可以在Flink配置文件(flink-conf.yaml)中设置execution.runtime-mode参数来指定对应的模式也可以在集群中提交Flink任务时指定execution.runtime-mode来指定Flink官方建议在提交Flink任务时指定执行模式这样减少了代码配置给Flink Application提供了更大的灵活性提交任务指定参数如下$FLINK_HOME/bin/flink run -Dexecution.runtime-modeBATCH -c xxx xxx.jar关于Flink集群提交任务及Flink flink-conf.yaml配置文件在下个章节集群搭建会进行介绍。

相关文章:

Apache Flink 快速入门

Flink开发环境准备学习一门新的编程语言时,往往会从"hello world"程序开始,而接触一套新的大数据计算框架时,则一般会从WordCount案例入手,下面以大数据中最经典入门案例WordCount为例,来编写Flink代码&…...

如何用免费纹理打包器优化游戏性能:5个实战技巧提升加载速度

如何用免费纹理打包器优化游戏性能:5个实战技巧提升加载速度 【免费下载链接】free-tex-packer Free texture packer 项目地址: https://gitcode.com/gh_mirrors/fr/free-tex-packer Free Texture Packer 是一款完全开源的精灵表生成工具,专门为游…...

基于 ComfyUI 本地部署 的「图像 + 音频 → 口型匹配 + 自动运镜」MV 全流程指南

基于 ComfyUI 本地部署 的「图像 + 音频 → 口型匹配 + 自动运镜」MV 全流程指南 适用人群:有一定电脑(Windows / macOS / Linux)操作经验、显卡(GPU)支持 CUDA/ROCm、能自行安装 Python 第三方库的技术爱好者。 目标:输入一张人像图片 + 一段伴奏/人声音频,自动生…...

基于OpenHarmony的智慧农业控制系统-硬件部分【1】

1.整体设备2.硬件清单:一、主控单元(边缘网关)硬件名称型号/规格数量备注小熊派开发板BearPi-HM Nano(搭载 Hi3861 芯片,支持 OpenHarmony LiteOS)1块核心控制单元,集成 Wi-Fi,负责数…...

手把手教你从零搭建 MCP Server:AI 连接万物的保姆级实战教程

为什么要学 MCP? 说实话,最近半年 AI 开发圈最火的协议就是 MCP(Model Context Protocol)了。你可能已经用上了各种 AI 助手,但有没有想过:这些 AI 怎么连接你的数据库?怎么读你的本地文件&…...

萨科微宋仕强“华强北山寨手机”研究

萨科微宋仕强“华强北山寨手机”研究(十六),手机的灰色产业链。华强北每个手机柜台背后都有灰色供应链支撑。如香港手机比华强北便宜,就通过各种渠道从香港走私过来。沙头角的中英街两边分属于香港和深圳,香港一侧的走…...

Vue大屏自适应解决方案:如何应对多分辨率设备下的数据可视化挑战

Vue大屏自适应解决方案:如何应对多分辨率设备下的数据可视化挑战 【免费下载链接】v-scale-screen Vue large screen adaptive component vue大屏自适应组件 项目地址: https://gitcode.com/gh_mirrors/vs/v-scale-screen 在数字化转型浪潮中,企业…...

毕业论文神器!2026年好用AI论文平台榜单,高质初稿轻松写

2026 年实测 10 款主流 AI 论文工具,千笔AI以全流程覆盖 语义级降重 免费查重领跑综合榜;ThouPen 稳坐留学生毕业全流程工具头把交椅;免费工具中DeepSeek Scholar、豆包学术版表现亮眼,30 分钟即可生成万字高质量初稿&#xff0…...

良心盘点!2026AI写作辅助软件榜单(覆盖 99% 毕业论文需求)

本文精选13 款2026 年实测 AI 论文工具,按全流程全能型、垂直领域专精型、润色降重专家、文献管理助手四大类别排序,覆盖从选题到定稿全链路,适配本科 / 硕博 / 期刊全场景,附选型速查表与避坑指南,帮你快速找到最佳拍…...

毕业论文难写?2026年AI论文平台排行榜权威发布,轻松定稿不是梦!

写论文效率低、熬夜赶稿、查重不过关?别慌!2026 年最新 AI 论文写作软件排行榜来了,覆盖选题、大纲、初稿、润色、降重、格式、文献引用全流程,帮你精准匹配最适合的学术助手,彻底告别论文内耗!&#x1f3c…...

传统开发VS低代码开发,谁更胜一筹?

低代码开发,让企业应用搭建像搭积木一样简单 在当今数字化时代,企业对于应用程序的需求日益增长。然而,传统的软件开发方式往往面临着开发周期长、成本高、技术门槛高等问题,这使得许多企业在数字化转型的道路上举步维艰。而低代…...

3PEAK思瑞浦 TP321-DF0R DFN1X1-4 运算放大器

特性 通用型,低成本: 增益带宽积:1MHz 低静态电流:45A/放大器 偏移电压:最大5.0毫伏 偏移电压温度漂移:2uV/C 输入偏置电流:10pA 共模抑制比/电源抑制比:90dB 单位增益稳定 轨到轨输入和输出 过驱动输入无相位反转 供电电压范围: TP321-DFOR: 2.1V 至 5.5V 其他部分…...

抖音下载神器:免费批量下载抖音视频、图集、音乐和直播回放完整指南

抖音下载神器:免费批量下载抖音视频、图集、音乐和直播回放完整指南 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser f…...

CUDA为什么能统治AI世界?NVIDIA真正可怕的并不是GPU

前言很多人第一次接触AI行业时,都会听到一个词:CUDA。而且你会发现一个非常奇怪的现象:很多AI框架、深度学习项目、GPU训练环境,几乎都默认要求:NVIDIA显卡CUDA环境甚至很多时候:没有CUDA,AI项目…...

快速开发AI客服原型时如何利用Taotoken分钟级接入多模型

🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 快速开发AI客服原型时如何利用Taotoken分钟级接入多模型 在探索和构建AI客服原型时,开发者常常面临一个核心矛盾&#…...

怎么区分储能PACK线源头工厂和中间商?

在储能 PACK 自动化产线行业深耕多年,我见过不少新能源企业踩了中间商的坑。有的客户花了高于市场价两成的预算,拿到的却是套用通用模板的产线,防静电、防爆设计不到位,投产没多久就频繁故障;还有的后期出问题&#xf…...

Python爬虫中如何正确配置住宅IP代理?新手避坑指南

很多人买完住宅IP,配置半天还是报错、被封。本文手把手教你用Python正确接入住宅代理,附代码和常见问题解决。一、为什么你的代理配置总失败?常见的几种错误:协议用错:服务商给的SOCKS5,你却按HTTP方式配认…...

蜂窝物联网设计的全能选手:NRF9151-LACA-R7开发全攻略

前言在蜂窝物联网技术飞速发展的今天,设备的小型化、低功耗和全球化部署已成为不可逆转的趋势。Nordic Semiconductor推出的nRF9151系统级封装(SiP)解决方案,正是响应这一趋势的旗舰级产品。作为nRF91系列的最新一代成员&#xff…...

Tokenizer与Embedding

Transformers 系列文章目录 第一章 Transformers 简介 第二章 Transformers 模型推理; 第三章 Tokenizer 与 Embedding 文章目录Transformers 系列文章目录前言Tokenizer与Embedding一、Tokenizer(分词器)和Embedding(词嵌入&a…...

书匠策AI:那个让你论文查重从“红色地狱“直接变“绿色天堂“的神器

各位正在跟论文死磕的同学们,先别划走。 今天咱们不聊怎么写开题报告,不聊怎么搭框架,咱们聊一个所有人写完初稿后都会遭遇的终极BOSS——查重。 你有没有经历过这种崩溃:熬夜写了一万字,信心满满提交查重&#xff0…...

微服务架构下的旺店通与畅捷通T+系统集成

旺店通与畅捷通T系统集成方案轻易云数据集成平台为企业提供高效、稳定的系统对接解决方案,实现旺店通企业奇门与畅捷通T系统的无缝数据流转。该方案充分发挥轻易云平台的智能化数据处理能力,确保业务数据在跨系统传输过程中的准确性和时效性。系统简介旺…...

SR全光谱反射式膜厚仪

作者:李志松Pioneer 翟天保Steven 田雨阳 版权声明:著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处注:本文所讲设备由李志松教授团队研发,属于商业产品矩阵内容,商业技术合…...

Unity+C#开发万人MMO服务器的实战架构与同步优化

1. 这不是“写个服务器”那么简单:先撕开“万人在线”的真实含义很多人看到“UnityC#开发万人MMO服务器”这个标题,第一反应是:“哦,用Unity做客户端,C#写个后端,Socket连一连,再加个数据库&…...

Unity+C#开发MMO服务端的务实架构与万人连接实战

1. 先泼一盆冷水:所谓“万人同时在线”的真实含义与常见误解 很多人看到“UnityC#开发万人MMO服务器”这个标题,第一反应是:哇,这得用多牛的分布式架构?是不是要上Kubernetes集群、分库分表、消息中间件全配齐&#xf…...

【Elasticsearch从入门到精通】第10篇:Elasticsearch REST API最佳实践——Content-Type、模糊性与访问控制

上一篇【第09篇】Elasticsearch API规范详解——多索引、日期数学与通用选项 下一篇【第11篇】Elasticsearch索引API详解——索引创建、删除与别名管理(明日更新,敬请期待) 摘要 掌握Elasticsearch REST API的使用规范不仅能避免常见错误&am…...

【Elasticsearch从入门到精通】第08篇:Elasticsearch集群扩展与运维——水平扩展与节点管理

上一篇【第07篇】Elasticsearch集群安全配置——TLS/SSL与密钥库管理 下一篇【第09篇】Elasticsearch API规范详解——多索引、日期数学与通用选项 摘要 Elasticsearch天生为分布式设计,其高扩展性和高可用性是核心优势。但在实际生产中,如何合理规划节…...

Unity游戏运行时自动翻译引擎原理与实战配置

1. 为什么Unity游戏翻译不能只靠“改文本”——XUnity.AutoTranslator不是插件,而是运行时翻译引擎 你有没有试过打开一个Unity游戏的Assets文件夹,用文本编辑器搜索中文字符串,然后手动替换成英文?我试过三次,每次都在…...

Unity本地化工作流:基于ULP的可维护多语言工程实践

1. 这不是“加个插件就完事”的翻译方案,而是Unity项目里真正能落地的本地化工作流 “Unity游戏自动翻译插件”——光看标题,很多人第一反应是:拖进Project窗口、点几下按钮、导出Excel、等AI吐出译文、再一键回填……然后就上线多语言了&…...

终极Windows激活解决方案:5步实现永久免费激活的完整指南

终极Windows激活解决方案:5步实现永久免费激活的完整指南 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows系统激活弹窗烦恼吗?是否经历过Office突然变成只读…...

为什么92%的团队误用Gemini做Java审查?资深架构师拆解3个致命配置陷阱及修复命令集

更多请点击: https://codechina.net 第一章:Gemini Java代码审查的真相与误区 Gemini 并非专为 Java 代码审查设计的工具,其底层模型(如 Gemini 1.5 Pro)虽具备强大的自然语言理解与代码生成能力,但缺乏静…...