当前位置: 首页 > news >正文

【基础篇】四、本地部署Flink

文章目录

  • 1、本地独立部署会话模式的Flink
  • 2、本地独立部署会话模式的Flink集群
  • 3、向Flink集群提交作业
  • 4、Standalone方式部署单作业模式
  • 5、Standalone方式部署应用模式的Flink

Flink的常见三种部署方式:

  • 独立部署(Standalone部署)
  • 基于K8S部署
  • 基于Yarn部署

1、本地独立部署会话模式的Flink

独立部署就是独立运行,即Flink自己管理Flink资源,不依靠任何外部的资源管理平台,比如K8S或者Hadoop的Yarn,当然,独立部署的代价就是:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理,生产环境或者作业量大的场景下不建议采用独立部署。

  • 下载安装包
# 下载地址:
https://archive.apache.org/dist/flink/flink-1.17.0/
flink-1.17.0-bin-scala_2.12.tgz
  • 上传安装包到Linux节点服务器上的某目录:
/opt/moudle/flink-1.17.0-bin-scala_2.12.tgz
  • 解压
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
  • 启动,进入安装目录执行start-cluster.sh
[code9527@node01 flink-1.17.0]  bin/start-cluster.sh

在这里插入图片描述

  • 访问WebUI,对Flink集群进行监控管理
http://IP:8081

在这里插入图片描述

独立安装会话模式的Flink成功,控制台中,可以看到,TaskManager的数量为1(本来就一台机器,一个节点),由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为1。最后,可停止集群:

[code9527@node01 flink-1.17.0]  /bin/stop-cluster.sh

可能遇到的坑:

坑1:

start-cluster.sh执行报错:Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.

原因:未安装Java环境

yum install -y java-1.8.0-openjdk.x86_64

坑2:

http://IP:8081访问不通

处理下防火墙:

firewall-cmd --add-port 8081/tcp --permanentfirewall-cmd --reload

2、本地独立部署会话模式的Flink集群

上面部署的单机Flink,当你有多台服务器,要部署一个集群时,大体流程和上面一样。假设有三台服务器,角色分配规划如下:

在这里插入图片描述

节点服务器node-01node-02node-03
角色JobManager+TaskManagerTaskManagerTaskManager

主节点上的操作:

  • 上传安装包到Linux节点服务器上的某目录:
/opt/moudle/flink-1.17.0-bin-scala_2.12.tgz
  • 解压
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
  • 进入解压目录/conf目录,修改flink-conf.yaml文件
vi flink-conf.yaml# 修改内容如下:
# JobManager节点地址,我写了IP,这里IP或者hostname都行
jobmanager.rpc.address: node-01  
jobmanager.bind-host: 0.0.0.0
rest.address: node-01
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0    
taskmanager.host: node-01
  • 其他可选配置:在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置
- jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。- taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。- taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。- parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
  • 修改workers文件,指定干活儿的节点TaskManager的信息,这里是node01和另外两台主机02、03
[code9527@node01 conf] vi workers# 修改如下内容:
node-01
node-02
node-03# 用IP也行,这就是上面单机我也用用IP,而不用默认localhost的原因,多节点下看着乱得很
  • 修改masters文件
vi masters# 修改内容,hostname也行
node-01:8081

至于两个Task的从节点,直接把上面改好的Flink安装目录拷贝或分发给另外两个节点服务器:

# node02、node03上建好/opt/moudle/flink-1.17.0/目录后,01节点执行
scp /opt/moudle/flink-1.17.0 root@node02:/opt/moudle/flink-1.17.0/
scp /opt/moudle/flink-1.17.0 root@node03:/opt/moudle/flink-1.17.0/

再修改node02的taskmanager.host:

# conf目录下
vim flink-conf.yaml# 改为:
taskmanager.host: node02  # IP或hostname

再修改node03的taskmanager.host:

# conf目录下
vim flink-conf.yaml# 改为:
taskmanager.host: node03  # IP或hostname

回node01启动,执行start-cluster.sh

[code9527@node01 flink-1.17.0]  bin/start-cluster.sh

此时,在控制台,应该可以看到当前集群的TaskManager数量为3,总Slot数和可用Slot数都为3

3、向Flink集群提交作业

上一篇,写了读取socket发送的单词并统计单词个数的程序,这里演示将它提交到集群中年去执行,首先将程序打包,在pom.xml中添加打包插件:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

打包,指令或者IDEA页面上操作:

mvn clean
mvn package

打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,一个原始包,一个带依赖的包(类似SpringBoot打包插件),因为集群中已经具备任务运行所需的所有依赖,所以建议使用原始包original-xxx.jar。下面打开Flink的控制台,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包:

在这里插入图片描述

点击该JAR包,出现任务配置页面,进行相应配置:

在这里插入图片描述
点Submit提交作业(点Submit没反应参考【这篇】),导航栏的Running Jobs可查看程序运行列表情况

在这里插入图片描述

在Flink程序里写的Linux主机里开启端口监听,并在socket端口中输入一些字符串:

在这里插入图片描述

先点击Task Manager侧边栏,再切StdOut的tab页,点刷新,可以看到运行成功:

在这里插入图片描述

先取消任务,接下来用命令行提交任务:

在这里插入图片描述
使用命令行提交,会话模式下还是先启动集群:

bin/start-cluster.sh

进入flink安装目录/opt/module/flink-1.17.0,把前面的jar包上传到该目录下,执行flink run指令提交作业

bin/flink run -m 10.4.95.27:8081 -c com.plat.count.SocketStreamWordCount ./FlinkService-1.0-SNAPSHOT.jar#  -m指定了提交到的JobManager
# -c指定了入口类

提交成功:

在这里插入图片描述
此时web控制台还是可以看到同样的效果,且/opt/module/flink-1.17.0/log路径中,也可以查看TaskManager的输出:

[root@node-105-69 log] cat flink-atguigu-standalonesession-0-node-105-69.out(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)

4、Standalone方式部署单作业模式

部署不了单作业模式,前面说了,Standalone方式下,Flink并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台,比如K8S

5、Standalone方式部署应用模式的Flink

应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。需要使用同样在bin目录下的standalone-job.sh来创建一个JobManager

# 先停掉会话模式
[root@node-105-69 flink-1.17.0] bin/stop-cluster.sh
# 继续开启对应的Linux主机的netcat
nc -lk 9527

将上面的安装包放到flink的lib目录下

[root@node-105-69 flink-1.17.0] mv FlinkService-1.0-SNAPSHOT.jar lib/

启动JobManager,这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包

[root@node-105-69 flink-1.17.0] bin/standalone-job.sh start --job-classname com.plat.count.SocketStreamWordCount

启动TaskManager:(独立部署,这个时候干活的Task是手动起的)

[root@node-105-69 flink-1.17.0] bin/taskmanager.sh start

发送数据到9527端口:

在这里插入图片描述

查看控制台:

在这里插入图片描述

停掉集群:

bin/taskmanager.sh stop
bin/standalone-job.sh stop

相关文章:

【基础篇】四、本地部署Flink

文章目录 1、本地独立部署会话模式的Flink2、本地独立部署会话模式的Flink集群3、向Flink集群提交作业4、Standalone方式部署单作业模式5、Standalone方式部署应用模式的Flink Flink的常见三种部署方式&#xff1a; 独立部署&#xff08;Standalone部署&#xff09;基于K8S部署…...

简述什么是迭代器(Iterator)?

迭代器(Iterator)是一种设计模式,Java 中的迭代器是集合框架中的一个接口,它可以让程序员遍历集合中的元素而无需暴露集合的内部结构。使用迭代器可以遍历任何类型的集合,例如 List、Set 和 Map 等。 通过调用集合类的 iterator() 方法可以获取一个迭代器,并使用 hasNext…...

DarkGate恶意软件通过消息服务传播

导语 近日&#xff0c;一种名为DarkGate的恶意软件通过消息服务平台如Skype和Microsoft Teams进行传播。它冒充PDF文件&#xff0c;利用用户的好奇心诱使其打开&#xff0c;进而下载并执行恶意代码。这种攻击手段使用了Visual Basic for Applications&#xff08;VBA&#xff0…...

LeetCode——动态规划篇(六)

刷题顺序及思路来源于代码随想录&#xff0c;网站地址&#xff1a;https://programmercarl.com 目录 300. 最长递增子序列 - 力扣&#xff08;LeetCode&#xff09; 674. 最长连续递增序列 - 力扣&#xff08;LeetCode&#xff09; 718. 最长重复子数组 - 力扣&#xff08…...

sql 注入(2), 文件读写 木马植入 远程控制

sql 注入 文件读写 木马植入 远程控制 一, 检测读写权限 查看mysql全局变量 SHOW GLOBAL VARIABLES LIKE %secure%secure_file_priv 空, 则任意读写secure_file_priv 路径, 则只能读写该路径下的文件secure_file_priv NULL, 则禁止读写二, 读取文件, 使用 load_file() 函数…...

求直角三角形第三点的坐标

文章目录 求直角三角形第三点的坐标1. 原理2. 数学公式3. 推导过程 求直角三角形第三点的坐标 1. 原理 已知内容有&#xff1a; P1、P2 两点的坐标&#xff1b; dis1 为 P1与P2两点之间的距离&#xff1b; dis2 为 P2与P3两点之间的距离&#xff1b; 求解&#xff1a; …...

【Kotlin精简】第3章 类与接口

1 简介 Kotlin类的声明和Java没有什么区别&#xff0c;Kotlin中&#xff0c;类的声明也使用class关键字&#xff0c;如果只是声明一个空类&#xff0c;Kotlin和Java没有任何区别&#xff0c;不过定义类的其他成员会有一些区别。实例化类不用写new&#xff0c;类被继承或者重写…...

关于面试以及小白入职后的一些建议

面试的本质 面试的过程是一个互相选择的过程&#xff1b;面试官的诉求是&#xff0c;了解应聘者的个人基本信息、工作态度、专业能力及其他综合能力是否与公司招聘岗位匹配&#xff1b;面试者的诉求是&#xff0c;拿下招聘岗位offer&#xff0c;获得工作报酬&#xff1b; 面试…...

Excel 从网站获取表格

文章目录 导入网站数据导入股票实时行情 用 Excel 获取网站数据的缺点&#xff1a;只能获取表格类的数据&#xff0c;不能获取非结构化的数据。 导入网站数据 转到地址之后&#xff1a; 实测该功能经常导致 Excel 卡死。 导入股票实时行情...

rsync 备份工具(附rsync+inotify 实时同步部署实例)

rsync 备份工具(附rsyncinotify 实时同步部署实例&#xff09; 1、rsync概述1.1关于rsync1.2rsync 的特点1.3工作原理 2、rsync相关命令2.1基本格式和常用选项2.2启动和关闭rsync服务2.3下行同步基本格式2.4上行同步基本格式2.5免交互2.5.1指定密码文件2.5.2rsync-daemon方式2.…...

Java架构师缓存性能优化

目录 1 缓存的负载策略2 缓存的序列化问题3 缓存命中率低4 缓存对数据库高并发访问5 缓存数据刷新的策略5.1. 实时策略5.2. 异步策略5.3. 定时策略6 何时写缓存7 批量数据来更新缓存8 缓存数据过期的策略9 缓存数据如何恢复10 缓存数据如何迁移11 缓存冷启动和缓存预热想学习架…...

探索服务器潜能:创意项目、在线社区与其他应用

目录 一、部署自己的创意项目 优势&#xff1a; 劣势&#xff1a; 结论&#xff1a; 二、打造一款全新的在线社区 优势&#xff1a; 劣势&#xff1a; 结论&#xff1a; 三、其他用途 总结&#xff1a; 随着互联网的发展&#xff0c;越来越多的人开始拥有自己的服务器…...

「网络编程」网络层协议_ IP协议学习_及深入理解

「前言」文章内容是网络层的IP协议讲解。 「归属专栏」网络编程 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、IP协议简介二、IP协议报头三、IP网段划分&#xff08;子网划分&#xff09;四、特殊的IP地址五、IP地址的数量限制六、私有IP地址和公网IP地址七、路由八、分…...

Go 1.21 新内置函数:min、max 和 clear

max 函数 func max[T cmp.Ordered](x T, y …T) T 这是一个泛型函数&#xff0c;用于从一组值中寻找并返回 最大值&#xff0c;该函数至少要传递一个参数。在上述函数签名中&#xff0c;T 表示类型参数&#xff0c;它必须满足 cmp.Ordered 接口中定义的数据类型要求&#xff0…...

家居行业如何打破获客困局?2023重庆建博会现场,智哪儿AI营销第一课给出了答案

10月12日-14日&#xff0c;2023中国&#xff08;重庆&#xff09;建筑及装饰材料博览会&#xff08;简称&#xff1a;2023中国重庆建博会&#xff09;正在重庆国际博览中心如火如荼地进行。「智哪儿」携手2023中国重庆建博会主办方共同主办的《2023家居行业AI营销第一课&#x…...

Spring framework Day11:策略模式中注入所有实现类

前言 什么是策略模式&#xff1f; 策略模式&#xff08;Strategy Pattern&#xff09;是一种面向对象设计模式&#xff0c;它定义了算法族&#xff08;一组相似的算法&#xff09;&#xff0c;并且将每个算法都封装起来&#xff0c;使得它们可以互相替换。策略模式让算法的变…...

MBBF展示的奇迹绿洲:5G的过去、此刻与未来

如果你来迪拜&#xff0c;一定不会错过全世界面积最大的人工岛项目&#xff0c;这是被称为世界第八大奇迹的棕榈岛。多年以来&#xff0c;这座岛从一片砂石、一棵棕榈树开始&#xff0c;逐步建成了整个波斯湾地区的地标&#xff0c;吸引着全世界游人的脚步。 纵观整个移动通信发…...

加持智慧医疗,美格智能5G数传+智能模组让就医触手可及

智慧医疗将云计算、物联网、大数据、AI等新兴技术融合赋能医疗健康领域&#xff0c;是提高医疗健康服务的资源利用效率&#xff0c;创造高质量健康医疗的新途径。《健康中国2030规划纲要》把医疗健康提升到了国家战略层面&#xff0c;之后《“十四五”全面医疗保障规划》等一系…...

Stm32_标准库_14_串口蓝牙模块_手机与蓝牙模块通信_实现模块读取并修改信息

由手机向蓝牙模块传输时间信息&#xff0c;Stm32获取信息并将已存在信息修改为传入信息 测试代码&#xff1a; #include "stm32f10x.h" // Device header #include "Delay.h" #include "OLED.h" #include "Serial.h"uint16_t num…...

UDP 的报文结构

UDP的报文结构&#xff1a; 其中前面的源端口号和目的端口号&#xff0c;UDP长度和UDP检验和&#xff0c;它们都是2个字节。 那么什么是UDP长度呢&#xff0c;它指的是后面的数据的长度&#xff0c;换算单位也就是64kb&#xff0c;因此一个数据报&#xff08;数据&#xff09;最…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节&#xff0c;供应链协同管理在供应链上下游企业之间建立紧密的合作关系&#xff0c;通过信息共享、资源整合、业务协同等方式&#xff0c;实现供应链的全面管理和优化&#xff0c;提高供应链的效率和透明度&#xff0c;降低供应链的成…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

【JVM面试篇】高频八股汇总——类加载和类加载器

目录 1. 讲一下类加载过程&#xff1f; 2. Java创建对象的过程&#xff1f; 3. 对象的生命周期&#xff1f; 4. 类加载器有哪些&#xff1f; 5. 双亲委派模型的作用&#xff08;好处&#xff09;&#xff1f; 6. 讲一下类的加载和双亲委派原则&#xff1f; 7. 双亲委派模…...

LLMs 系列实操科普(1)

写在前面&#xff1a; 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容&#xff0c;原视频时长 ~130 分钟&#xff0c;以实操演示主流的一些 LLMs 的使用&#xff0c;由于涉及到实操&#xff0c;实际上并不适合以文字整理&#xff0c;但还是决定尽量整理一份笔…...

【C++】纯虚函数类外可以写实现吗?

1. 答案 先说答案&#xff0c;可以。 2.代码测试 .h头文件 #include <iostream> #include <string>// 抽象基类 class AbstractBase { public:AbstractBase() default;virtual ~AbstractBase() default; // 默认析构函数public:virtual int PureVirtualFunct…...

Modbus RTU与Modbus TCP详解指南

目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...

图解JavaScript原型:原型链及其分析 | JavaScript图解

​​ 忽略该图的细节&#xff08;如内存地址值没有用二进制&#xff09; 以下是对该图进一步的理解和总结 1. JS 对象概念的辨析 对象是什么&#xff1a;保存在堆中一块区域&#xff0c;同时在栈中有一块区域保存其在堆中的地址&#xff08;也就是我们通常说的该变量指向谁&…...