当前位置: 首页 > article >正文

【Flink快速入门-1.Flink 简介与环境配置】

Flink 简介与环境配置

实验介绍

在学习一门新的技术之前,我们首先要了解它的历史渊源,也就是说它为什么会出现,它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景,并运行第一个 Flink 程序,对它有一个初步的印象。

知识点
  • 流处理概述
  • Flink 简介
  • Flink 批处理 WordCount
  • Flink 流处理 WordCount

流处理简介

流处理并不是一个新概念,但是要做好并不是一件容易的事情。提到流处理,我们最先想到的可能是金融交易、信号检测以及地图导航等领域的应用。但是近年来随着信息技术的发展,除了前面提到的三个领域,其它方向对数据时效性的要求也越来越高。随着 Hadoop 生态的崛起,Storm、Spark Streaming、Samza、MillWheel 等一众流处理技术开始走入大众视野,但是我们最熟悉的应该还是 Storm 和 Spark Streaming。

我们知道,“高吞吐”、“低延迟”和”exactly-once“是衡量一个流处理框架的重要指标。 Storm 虽然提供了低延迟的流处理,但是在高吞吐方面的表现并不算佳,可以说基本满足不了日益暴涨的数据量,而且也没办法保证精准一次消费。Spark Streaming 中通过微批次的批处理来模拟流处理,只要将批处理的批次分的足够小,那么从宏观上来看就是流处理,这也是 Spark Streaming 的核心思想。通过微观批处理的方式,Spark Streaming 也实现了高吞吐和 exactly-once 语义,时效性也有了大幅提升,在很长一段时间里占据流处理榜首。但是受限于其实现方式,依然存在几秒的延迟,对于那些实时性要求较高的领域来说依然不够完美。
在这样的背景下,Flink 应运而生,接下来我们正式进入 Flink 的学习。

Flink 简介

Apache Flink 是为分布式、高性能、随时可用以及准确 的流处理应用程序打造的开源流处理框架,用于对无界和有界数据流进行有状态计算。Flink 最早起源于在 2010 ~ 2014 年,由 3 所地处柏林的大学和欧洲的一些其它大学共同进行研究的名为 Stratosphere 的项目。2014 年 4 月 Stratosphere 将其捐赠给 Apache 软件基 金会, 初始成员是 Stratosphere 系统的核心开发人员,2014 年 12 月,Flink 一跃成为 Apache 软件基金会的顶级项目。在 2015 年,阿里也加入到了 Flink 的开发工作中,并贡献了至少 150 万行代码。

Flink 一词在德语中有着“灵巧”、“快速”的意思,它的 logo 原型也是柏林常见的一种松鼠,以身材娇小、灵活著称,为该项目取这样的名字和选定这样的 logo 也正好符合 Flink 的特点和愿景。

在这里插入图片描述

注意,虽然我们说 Flink 是一个流处理框架,但是它同样可以进行批处理。因为在 Flink 的世界观里,批处理是流处理的一种特殊形式,这和 Spark 不同,在 Spark 中,流处理是通过大批量的微批处理实现的。

运行第一个 Flink 程序

接下来我们运行第一个 Flink 程序,感受一下它的魅力,从而对它有一个初步的印象。

搭建开发环境

本课程使用的是本地环境。后续实验不再提示。

首先我们需要在环境中搭建 Flink 运行环境,总共可以分为下面这几步:

  • 安装 jdk 并配置环境变量 【jdk 1.8】
  • 安装 scala 并配置环境变量 【Scala 2.11.12】
  • 安装 maven 并修改中心仓库为阿里云地址
  • 安装 IDEA 开发工具 【IDEA 2022.2.1或更新版】

我们的实验环境已经为大家安装了 jdk、scala、maven 和 IDEA,只需要在 IDEA 里配置使用即可。

双击桌面的 IDEA 程序,启动之后,点击 File -> New -> Project 创建一个新的 Maven 工程 FlinkLearning

【或者New Project->Maven Archetype】
在这里插入图片描述

创建好之后点击左上角 File > Settings 中,将 Maven 的配置文件修改为 D:\programs\apahe-maven-3.6.3\conf\settings.xml,配置之后的 Maven 中心仓库为阿里云,加载依赖会快很多:

在这里插入图片描述

项目中点击File-》Project Structure -> Libraries-》加号-》添加Scala SDK-》选择所需要的scala版本-》ok进行下载
在这里插入图片描述

在工程 src/main 目录中创建 scala 文件夹,然后右键,选择 Mark Directory as,并将其标记为 Sources Root。

在这里插入图片描述

在 scala 目录里创建 com.vlab.wc 包,并分别创建 BatchWordCountStreamWordCount 两个 Scala Object,分别代表 Flink 批处理和 Flink 流处理。
在这里插入图片描述

至此,我们的准备工作已经完成,接下来正式进入编码阶段。

Flink 批处理 WordCount

修改 pom.xml 文件中的代码为如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>pblh123.lh</groupId><artifactId>FlinkLearning</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><!--    配置国内 Maven 依赖库的镜像源镜--><repositories><repository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></repository></repositories>
<!--配置插件的镜像源--><pluginRepositories><pluginRepository><id>aliyun</id><url>https://maven.aliyun.com/repository/public</url></pluginRepository></pluginRepositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.17.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.2</version></dependency></dependencies><build><plugins><!-- 该插件用于将 Scala 代码编译成 class 文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.6.0</version><configuration><archive><manifest><mainClass></mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

修改完成后注意点击加载图标重新 Load。
在这里插入图片描述

BatchWordCount.scala 中的代码如下:

package com.vlab.wcimport org.apache.flink.api.scala._/*** @projectName FlinkLearning* @package com.vlab.wc* @className com.vlab.wc.BatchWordCount* @description Flink Batch Word Count Example* @author pblh123* @date 2025/2/7 14:41* @version 1.17.2*/
object BatchWordCount {def main(args: Array[String]): Unit = {// 判断输入参数数量是否正确if (args.length != 1) {System.err.println("Usage: BatchWordCount <input path>")System.exit(5)}// 获取输入路径val inputPath = args(0)// 创建执行环境val env = ExecutionEnvironment.getExecutionEnvironment// 读取输入数据val inputDS: DataSet[String] = env.readTextFile(inputPath)// 计算词频val wordCountDS: DataSet[(String, Int)] = inputDS.flatMap(_.split("\\s+")) // 扁平化,并且处理多个空格作为分隔符.map((_, 1)) // 转换为 (word, 1).groupBy(0) // 按第一个字段(word)进行分组.sum(1) // 对第二个字段(计数)求和// 打印输出wordCountDS.print()}
}

datas/words.txt 路径下创建 words.txt 文件并在其中加入如下内容(注意每个单词之间使用空格分隔):

在这里插入图片描述

hello world
hello flink
hello spark
hello java

右键选中 BatchWordCount.scala,点击 Run 运行,将会看到如下输出:

如果出现一些其他的报错和警告可以忽略。

(java,1)
(world,1)
(flink,1)
(hello,4)
(spark,1)
Flink 流处理 WordCount

StreamWordCount.scala 中加入如下代码:

package com.vlab.wcimport org.apache.flink.streaming.api.scala._/*** @projectName FlinkLearning  * @package com.vlab.wc  * @className com.vlab.wc.StreamWordCount  * @description ${description}  * @author pblh123* @date 2025/2/7 14:41* @version 1.0**/object StreamWordCount {def main(args: Array[String]): Unit = {// 创建执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//  监控Socket数据val textDstream: DataStream[String] = env.socketTextStream("localhost", 9999)// 导入隐式转换import org.apache.flink.api.scala._// 计算逻辑val dataStream: DataStream[(String, Int)] = textDstream.flatMap(_.split(" ")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)// 设置并行度dataStream.print().setParallelism(1)// 执行env.execute("Socket stream word count")}
}

打开终端,并输入 nc -l -p 9999,然后输入以下内容:

hello world
hello flink
hello spark
hello java

运行 StreamWordCount.scala 将会看到如下输出:

(hello,1)
(flink,1)
(hello,2)
(spark,1)
(java,1)
(hello,3)
...

至此,我们的第一个 Flink 实验就已经完成了。

实验总结

本节实验中,我们介绍了 Flink 出现的背景,并和 Storm、Spark Streaming 做了简单对比,然后在实验环境下安装了 idea 开发工具并运行了第一个 Flink 程序。至此,相信大家已经对 Flink 已经有了一个初步的认识。

相关文章:

【Flink快速入门-1.Flink 简介与环境配置】

Flink 简介与环境配置 实验介绍 在学习一门新的技术之前&#xff0c;我们首先要了解它的历史渊源&#xff0c;也就是说它为什么会出现&#xff0c;它能够解决什么业务痛点。所以本节我们的学习目的是了解 Flink 的背景&#xff0c;并运行第一个 Flink 程序&#xff0c;对它有…...

WPF基础 | 初探 WPF:理解其核心架构与开发环境搭建

WPF基础 | 初探 WPF&#xff1a;理解其核心架构与开发环境搭建 一、前言二、WPF 核心架构2.1 核心组件2.2 布局系统2.3 数据绑定机制2.4 事件处理机制 三、WPF 开发环境搭建3.1 安装 Visual Studio3.2 创建第一个 WPF 应用程序 结束语优质源码分享 WPF基础 | 初探 WPF&#xff…...

深入学习MySQL 支持的事务隔离级别

MySQL 支持四种事务隔离级别&#xff0c;分别是读未提交&#xff08;Read Uncommitted&#xff09;、读已提交&#xff08;Read Committed&#xff09;、可重复读&#xff08;Repeatable Read&#xff09;和可串行化&#xff08;Serializable&#xff09;。不同的隔离级别对数据…...

JVM 四虚拟机栈

虚拟机栈出现的背景 由于跨平台性的设计&#xff0c;Java的指令都是根据栈来设计的。不同平台CPU架构不同&#xff0c;所以不能设计为基于寄存器的。优点是跨平台&#xff0c;指令集小&#xff0c;编译器容易实现&#xff0c;缺点是性能下降&#xff0c;实现同样的功能需要更多…...

深入理解小波变换:信号处理的强大工具

引言 在科学与工程领域&#xff0c;信号处理一直是关键环节&#xff0c;傅里叶变换与小波变换作为重要的分析工具&#xff0c;在其中发挥着重要作用。本文将深入探讨小波变换&#xff0c;阐述其原理、优势以及与傅里叶变换的对比&#xff0c;并通过具体案例展示其应用价值。 一…...

【大数据技术】搭建完全分布式高可用大数据集群(Kafka)

搭建完全分布式高可用大数据集群(Kafka) kafka_2.13-3.9.0.tgz注:请在阅读本篇文章前,将以上资源下载下来。 写在前面 本文主要介绍搭建完全分布式高可用集群 Kafka 的详细步骤。 注意: 统一约定将软件安装包存放于虚拟机的/software目录下,软件安装至/opt目录下。 安…...

使用git clone一个指定文件或者目录

1 创建一个空文件夹 mkdir -p test2 进入创建的文件夹 cd /test3 执行git init初始化git 在新建的文件夹下执行初始化动作是因为要保证本地是在git的环境下&#xff0c;为关联远程仓库提供前提。 git init4 与远程仓库进行关联 以我自己的gitcode仓库为例 git remote add…...

解决react中函数式组件usestate异步更新

问题&#xff1a;在点击modal组件确认后 调用后端接口&#xff0c;使用setstateone&#xff08;false&#xff09;使modal组件关闭&#xff0c;但是设置后关闭不了&#xff0c;在设置setstateone&#xff08;false&#xff09;前后打印出了对应的stateone都为true&#xff0c;但…...

关于ESP-IDF 5.4 中添加第三方组件esp32-camera找不到文件,编译错误解决办法(花了一天时间解决)

最近需要使用ESP32-S3-CAM 的OV2640摄像头采集图像&#xff0c;为了加速开发进度&#xff0c;于是选择了esp32-camera组件&#xff0c;该组件不是官方组件&#xff0c;需要自己git clone。但在为项目添加esp32-camera组件时&#xff0c;一直编译错误&#xff0c;找不到头文件&a…...

Android LifecycleOwner 闪退,java 继承、多态特性!

1. 闪退 同意隐私政策后&#xff0c;启动进入游戏 Activity 闪退 getLifecycle NullPointerException 空指针异常 FATAL EXCEPTION: main Process: com.primer.aa.gg, PID: 15722 java.lang.RuntimeException: Unable to instantiate activity ComponentInfo{com.primer.aa.…...

feign Api接口中注解问题:not annotated with HTTP method type (ex. GET, POST)

Bug Description 在调用Feign api时&#xff0c;出现如下异常&#xff1a; java.lang.IllegalStateException: Method PayFeignSentinelApi#getPayByOrderNo(String) not annotated with HTTPReproduciton Steps 1.启动nacos-pay-provider服务&#xff0c;并启动nacos-pay-c…...

Swipe横滑与SwipeItem自定义横滑相互影响

背景 vue项目&#xff0c;H5页面&#xff0c;使用vant的组件库轮播组件<Swipe>&#xff0c;UI交互要求&#xff0c;在每个SwipeItem中有内容&#xff0c;可自横滑&#xff0c;查看列表内容 核心代码 <template><Swipeclass"my_swipe":autoplay&quo…...

[LeetCode]day16 242.有效的字母异位词

242. 有效的字母异位词 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给定两个字符串 s 和 t &#xff0c;编写一个函数来判断 t 是否是 s 的 字母异位词 示例 1: 输入: s "anagram", t "nagaram" 输出: true示例 2: 输入: s "rat"…...

基于SpringBoot养老院平台系统功能实现五

一、前言介绍&#xff1a; 1.1 项目摘要 随着全球人口老龄化的不断加剧&#xff0c;养老服务需求日益增长。特别是在中国&#xff0c;随着经济的快速发展和人民生活水平的提高&#xff0c;老年人口数量不断增加&#xff0c;对养老服务的质量和效率提出了更高的要求。传统的养…...

基于HTML5 Canvas 和 JavaScript 实现的烟花动画效果

以下是一个使用 HTML5 Canvas 和 JavaScript 实现的烟花动画效果代码盒子: <!DOCTYPE html> <html> <head><title>烟花效果...

【3分钟极速部署】在本地快速部署deepseek

第一步&#xff0c;找到网站&#xff0c;下载&#xff1a; 首先找到Ollama &#xff0c; 根据自己的电脑下载对应的版本 。 我个人用的是Windows 我就先尝试用Windows版本了 &#xff0c;文件不是很大&#xff0c;下载也比较的快 第二部就是安装了 &#xff1a; 安装完成后提示…...

Linux ftrace 内核跟踪入门

文章目录 ftrace介绍开启ftraceftrace使用ftrace跟踪指定内核函数ftrace跟踪指定pid ftrace原理ftrace与stracetrace-cmd 工具KernelShark参考 ftrace介绍 Ftrace is an internal tracer designed to help out developers and designers of systems to find what is going on i…...

深入理解 Rust 模块中的路径与公开性:绝对路径、相对路径和 `pub` 的应用

1. 路径的两种形式&#xff1a;绝对路径与相对路径 在 Rust 中&#xff0c;路径类似于文件系统中的目录路径&#xff0c;用来告诉编译器去哪里查找某个项。路径主要有两种形式&#xff1a; 绝对路径 绝对路径从 crate 的根开始。对于当前 crate 的代码&#xff0c;绝对路径以关…...

基于钉钉API的连接器实现:企业数据集成与自动化管理

文章目录 概要背景与需求钉钉API概述连接器实现小结 概要 在当今数字化时代&#xff0c;企业面临着海量数据的管理与整合挑战。钉钉作为国内广泛使用的办公协作平台&#xff0c;提供了丰富的API接口&#xff0c;支持企业进行数据集成与自动化管理。本文将介绍如何通过钉钉API实…...

[Day 16]螺旋遍历二维数组

今天我们看一下力扣上的这个题目&#xff1a;146.螺旋遍历二维数组 题目描述&#xff1a; 给定一个二维数组 array&#xff0c;请返回「螺旋遍历」该数组的结果。 螺旋遍历&#xff1a;从左上角开始&#xff0c;按照 向右、向下、向左、向上 的顺序 依次 提取元素&#xff0c…...

【教程】docker升级镜像

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 目录 自动升级 手动升级 无论哪种方式&#xff0c;最重要的是一定要通过-v参数做数据的持久化&#xff01; 自动升级 使用watchtower&#xff0c;可…...

使用jmeter进行压力测试

使用jmeter进行压力测试 jmeter安装 官网安装包下载&#xff0c;选择二进制文件&#xff0c;解压。 tar -xzvf apache-jmeter-x.tgz依赖jdk安装。 yum install java-1.8.0-openjdk环境变量配置&#xff0c;修改/etc/profile文件&#xff0c;添加以下内容。 export JMETER/…...

链表和 list

一、单链表的模拟实现 1.实现方式 链表的实现方式分为动态实现和静态实现两种。 动态实现是通过 new 申请结点&#xff0c;然后通过 delete 释放结点的形式构造链表。这种实现方式最能体 现链表的特性&#xff1b; 静态实现是利用两个数组配合来模拟链表。一个表示数据域&am…...

【AI大模型】Ubuntu18.04安装deepseek-r1模型+服务器部署+内网访问

以下内容主要参考博文&#xff1a;DeepSeek火爆全网&#xff0c;官网宕机&#xff1f;本地部署一个随便玩「LLM探索」 - 程序设计实验室 - 博客园 安装 ollama Download Ollama on Linux curl -fsSL https://ollama.com/install.sh | sh 配置 ollama 监听地址 ollama 安装后…...

cmd执行mysql命令

安装mysql之后如果想使用cmd执行mysql命令&#xff0c;需要怎么操作呢&#xff0c;下面一起看一下。 安装mysql之后&#xff0c;如果直接去cmd窗口执行MySQL命令&#xff0c;窗口可能会提示mysql不是可执行命令。 需要配置系统的环境变量&#xff0c;将mysql的安装路径配置系…...

网络安全威胁框架与入侵分析模型概述

引言 “网络安全攻防的本质是人与人之间的对抗&#xff0c;每一次入侵背后都有一个实体&#xff08;个人或组织&#xff09;”。这一经典观点概括了网络攻防的深层本质。无论是APT&#xff08;高级持续性威胁&#xff09;攻击、零日漏洞利用&#xff0c;还是简单的钓鱼攻击&am…...

【算法】动态规划专题⑦ —— 多重背包问题 + 二进制分解优化 python

目录 前置知识进入正题优化方法&#xff1a;二进制分解实战演练 前置知识 【算法】动态规划专题⑤ —— 0-1背包问题 滚动数组优化 python 【算法】动态规划专题⑥ —— 完全背包问题 python 进入正题 多重背包问题I https://www.acwing.com/problem/content/4/ 题目描述 有…...

详细教程 | 如何使用DolphinScheduler调度Flink实时任务

Apache DolphinScheduler 非常适用于实时数据处理场景&#xff0c;尤其是与 Apache Flink 的集成。DolphinScheduler 提供了丰富的功能&#xff0c;包括任务依赖管理、动态调度、实时监控和日志管理&#xff0c;能够有效简化 Flink 实时任务的管理和部署。通过 DolphinSchedule…...

PHP之hyperf学习笔记

Hyperf Model,Dao&#xff0c;Service&#xff0c;Contronller 路由 使用文件来配置路由&#xff0c;就是和laravel一样的 Router::addGroup(["middleware" > ["web", "auth"],"namespace" > "Hyperf\HttpServer\Contr…...

react的antd表格数据回显在form表单中

1、首先为table添加编辑按钮 {title: 操作,align: center,render: (_: any, record: any) > (<div style{{ display: flex, alignItems: center, justifyContent: space-evenly }}><Buttonsize"small"onClick{() > deitor(record)} style{{ margin…...