flink的安装与使用(ubuntu)
组件版本
虚拟机:ubuntu-20.04.6-live-server-amd64.iso
flink:flink-1.18.0-bin-scala_2.12.tgz
jdk:jdk-8u291-linux-x64.tar
flink 下载
1、官网:https://flink.apache.org/downloads/
2、清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/
flink 安装
1、上传文件至服务器指定路径
/usr/local/myapp/flink
2、解压文件
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz -C /usr/local/myapp/flink
jdk 安装
1、ubuntu 中自带了 jdk,先将其卸载
sudo apt-get remove *openjdk*
sudo apt-get autoremove
2、上传文件至服务器指定路径
/usr/local/myapp/jdk
3、解压文件
tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk
4、配置环境变量
vim /etc/profile
在文末增加配置(路径根据自身情况进行调整)
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JRE_HOME/lib
5、测试 jdk
root@vm1:/usr/local/myapp/jdk# java -version
java version "1.8.0_291"
Java(TM) SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291
测试 flink
1、进入到 flink 的安装路径下
cd /usr/local/myapp/flink/flink-1.18.0/
2、修改配置文件
vim conf/flink-conf.yaml
内容
jobmanager.bind-host: 0.0.0.0
3、关闭/禁用防火墙
systemctl stop ufw.service
systemctl disable ufw.service
4、启动 flink
./bin/start-cluster.sh
5、浏览器访问:http://ip:8081/
能看到内容说明正常
设置 flink 的 Standalone 模式集群并上传任务执行
1、机器规划
| 类型 | 主机名 | IP |
|---|---|---|
| JobManager | vm1 | 192.168.141.120 |
| TaskManager | vm2 | 192.168.141.121 |
| TaskManager | vm3 | 192.168.141.122 |
2、设置每个服务器的机器名
vim /etc/hostname
3、设置每个服务器的 hosts 文件
vim /etc/hosts
增加三台服务器的机器名对照
192.168.141.120 vm1
192.168.141.121 vm2
192.168.141.122 vm3
使其立即生效(建议到这一步后,都重新启动下)
source /etc/hosts
4、设置服务器间的免密登录
4.1、自身免密
vm1 执行(vm2/vm3 同理)
ssh-keygen -t rsa
之后的内容全部回车即可
生成后,可在 /root/.ssh/ 中看到 id_rsa.pub 文件
通过命令设置到认证文件中
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
重启服务器,通过命令测试是否可以免密登录自身
ssh vm1
通过 exit 命令可以退出当前的 ssh 登录
4.2、设置相互免密(以 vm1 为演示,其余服务器同理)
在 vm1 服务器中,将生成的自身密钥传输到其余两台服务器上
scp /root/.ssh/id_rsa.pub root@vm2:/root
scp /root/.ssh/id_rsa.pub root@vm3:/root
在 vm2/vm3 服务器中,将传输过来的密钥,通过命令设置到认证文件中
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
vm1 设置完成,通过命令来测试能不能直接登录到 vm2/vm3 中
ssh vm2
ssh vm3
vm2/vm3 同理,都需要执行这些步骤:
A、生成自身密钥,添加到自身的认证文件中
B、将自身密钥传输到其余的服务器中,并在该服务器中通过命令设置自身密钥到其余服务器的认证文件中
注意:vm2 和 vm3 执行时,一个服务器完全执行结束/测试后,再进行下一个,不然会有密钥文件存在被覆盖的风险
5、设置主机时间同步
安装工具
apt-get install -y ntpdate
执行同步
ntpdate -u ntp.sjtu.edu.cn
6、配置 flink
以下以 vm1 为例,其他服务器的配置可将配置好的配置文件同步过去
6.1、masters 文件
vim masters
内容
vm1:8081
6.2、workers 文件
vim workers
内容
vm2
vm3
6.3、flink-conf.yaml 文件
vim flink-conf.yaml
内容(篇幅问题,去掉了注释)
env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMEDjobmanager.rpc.address: vm1jobmanager.rpc.port: 6123jobmanager.bind-host: 0.0.0.0jobmanager.memory.process.size: 1600mtaskmanager.bind-host: 0.0.0.0taskmanager.memory.process.size: 1728mtaskmanager.numberOfTaskSlots: 3parallelism.default: 1jobmanager.execution.failover-strategy: regionrest.port: 8081rest.address: vm1rest.bind-address: vm1blob.server.port: 45579
7、启动集群
只需在 vm1 上启动集群模式即可
root@vm1:/usr/local/myapp/flink/flink-1.18.0# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.
可以看到 vm2/vm3 的也会被启动,不需要手动去 vm2/vm3 再启动一次了
可以通过 java 的 jps 命令查看程序是否启动成功了
vm1 上

vm2 上

vm3 上

从图上可以分析出是以 Standalone 的集群模式启动了,其中 vm1 是 JobManager,vm2/vm3 是 TaskManager
8、页面查看状态
浏览器输入地址:http://192.168.141.120:8081/
可看到主页面

9、自定义一个任务
idea 创建一个 maven 项目
9.1、依赖及插件
<properties><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>
9.2、程序内容
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;// 无界流
public class UnboundStreamJob {public static void main(String[] args) throws Exception {//1 获取flink运行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//2.加载数据源为dataStream ,绑定客户机的9999端口,将这个网络端口发送的数据加载为dataStreamDataStreamSource<String> dataStream = environment.socketTextStream("192.168.141.122", 9999, "\n");//3.执行多个转换算子 ,SingleOutputStreamOperator是DataStreamSource子类SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, String>() {@Override//value:表示一个待处理的数据,在这里就是一行字符串//out: 用于输出结果的工具对象public void flatMap(String value, Collector<String> out) throws Exception {//拆分value,通过out输出结果String[] words = value.split("//s+"); //去除一个或多个空格for (String word : words) {out.collect(word);}}}) //执行一行字符串拆分为多个单词.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}}) //将多个单词转换为(单词,1) 这种tuple2对象.keyBy(0) //根据单词为key分组,0表示tuple2中的第一个属性,也就是单词.sum(1);//统计每组单词的个数, 1表示tuple2中第2个属性,也就是次数//4.通过sink算子输出结果result.print();//5.发布执行environment.execute("flinkWordCount"); //为任务起别名}}
9.3、程序说明
与 vm3 所在的 IP 为 192.168.141.122 在 9999 端口上进行 socket 通信,程序接收到消息后,进行计算并输出到控制台中
10、在 vm3 上开启一个 socket 通信(这一步一定要在上传任务之前进行)
netcat -lk 9999
11、提交任务(WebUI 方式)
11.1、打包刚才的程序,将打包好的 jar 包复制到某个好找的路径
11.2、打开网页中的 Submit New Job 选项,并点击 Add New

11.3、选择刚才打包的 jar 包进行上传,之后点击该 jar 包,填写启动类的路径,之后点击 Submit 提交按钮

11.4、正常情况下,任务就发布完成了,可以在 Task Managers 查看哪个节点的 Free Slots 相比 All Slots 减少了一个,那么这个节点的服务器就是执行该任务的服务器

12、提交任务(命令方式)
12.1、上传 jar 包到服务器中(任意一个服务器都行)
root@vm1:/usr/local/myapp/flink/task# ls
demo01-1.0-SNAPSHOT.jar
12.2、添加到任务中
../flink-1.18.0/bin/flink run -d -c xx.xx.xx.UnboundStreamJob demo01-1.0-SNAPSHOT.jar
说明:需要指定启动类
12.3、看到下面的信息,说明提交任务完成
Job has been submitted with JobID a893314f5efbb93bf3e6edefa578fd35
13、测试
13.1、点击该服务器,其中的 Stdout 就是控制台输出的地方
我们在 vm3 中开启的 socket 通信中,发送一条消息

13.2、回到页面中,刷新下控制台输出,会发现多了一个输出信息

13.3、至此,测试就完成了
相关文章:
flink的安装与使用(ubuntu)
组件版本 虚拟机:ubuntu-20.04.6-live-server-amd64.iso flink:flink-1.18.0-bin-scala_2.12.tgz jdk:jdk-8u291-linux-x64.tar flink 下载 1、官网:https://flink.apache.org/downloads/ 2、清华镜像:https://mirr…...
容器:软件性能测试的最佳环境
容器总体上提供了一种经济的和可扩展的方法来测试产品在实际情况下的性能,同时还能保持较低的资源成本和开销成本。 软件性能和可伸缩性是我们谈论应用程序开发时经常遇到的话题。一个很大的原因是应用程序的性能和可伸缩性直接影响其在市场上的成功。一个应用程序…...
【Qt控件之QMovie】详解
Qt控件之QMovies 概述公共类型属性公共函数公共槽函数信号静态公共成员示例使用场景 概述 QMovie类是一个方便的类,用于播放具有QImageReader的动画。此类用于显示没有声音的简单动画。如果您想显示视频和媒体内容,请改用Qt多媒体框架Qt Multimedia mul…...
Star History 九月开源精选 |开源 GitHub Copilot 替代
虽然大火了近一年,但是截至目前 AI 唯一破圈的场景是帮助写代码(谷歌云旗下的 DORA 年度报告也给 AI 泼了盆冷水)。不过对于软件开发来说,生成式人工智能绝对已经是新的标配。 本期 Star History 收集了一些开源 GitHub Copilot …...
【Rabbit MQ】Rabbit MQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制
文章目录 前言:消息的可靠性问题一、生产者消息的确认1.1 生产者确认机制1.2 实现生产者消息的确认1.3 验证生产者消息的确认 二、消息的持久化2.1 演示消息的丢失2.2 声明持久化的交换机和队列2.3 发送持久化的消息 三、消费者消息的确认3.1 配置消费者消息确认3.2…...
C++设计模式_25_Interpreter 解析器
Interpreter 解析器被归为“领域规则”模式。Interpreter模式比较适合简单的文法表示,应用场景是比较有限的,解决问题的思路和场景都是一样的。 文章目录 1. “领域规则”模式1.1 典型模式2. 动机( Motivation)3. 代码演示Interpreter 解析器模式4. 模式定义5. 结构( Structu…...
能源化工过程-故障诊断数据集初探-田纳西-伊斯曼过程数据集
1. 田纳西-伊斯曼过程(TE)数据集简介 整个TE数据集由训练集和测试集构成,TE集中的数据由22次不同的仿真运行数据构成,TE集中每个样本都有52个观测变量。d00.dat至d21.dat为训练集样本,d00_te.dat至d21_te.dat为测试集样本。d00.dat和d00_te.dat为正常工况下的样本。d00.d…...
【Linux】安装配置解决CentosMobaXterm的使用及Linux常用命令以及命令模式
目录 Centos的介绍 centos安装配置&MobaXterm 创建 安装 编辑 配置 编辑 MobaXterm使用 Linux常用命令&模式 常用命令 vi或vim编辑器 三种模式 命令模式 编辑模式 末行模式 拍照备份 Centos的介绍 CentOS(Community Enterprise Op…...
一台服务器安装两个mysql、重置数据库用于测试使用
文章目录 一、切数据库数据存储文件夹已经存在数据库数据文件夹新建数据库数据文件夹 二、安装第二个mysql安装新数据库初始化数据库数据启动数据库关闭数据库 三、mysqld_multi单机多实例部署参考文档 一、切数据库数据存储文件夹 这个方法可以让你不用安装新的数据库&#x…...
JS动态转盘可手动设置份数与概率(详细介绍)
这个案例是我老师布置的一项作业,老师已详细讲解,本人分享给大家,详细为你们介绍如何实现。 我们转盘使用线段来实现 <!DOCTYPE html> <html> <head><meta charset"utf-8"><title></title>&l…...
在k8s中,etcd有什么作用?
在Kubernetes(K8s)中,etcd 是一个关键的组件,它扮演着集群状态存储的角色,具有以下作用: 分布式键值存储:etcd 是一个分布式键值存储系统,用于存储整个 Kubernetes 集群的配置信息、…...
conda配置虚拟环境相关记录
#教程 创建虚拟环境 创建 conda create --name yourEnv python3.7.5--name:也可以缩写为-n,【yourEnv】是新创建的虚拟环境的名字,创建完,可以装anaconda的目录下找到envs/yourEnv 目录python3.7.5:是python的版本号…...
数据库的本质永远都不会改变基础语句(第二十二课)
JAVA与Mysql._java数据库和mysql_真正的醒悟的博客-CSDN博客...
Object转List<>,转List<Map<>>
这样就不会局限在转换到List<Map<String,Object>>这一种类型上了.可以转换成List<Map<String,V>>上等,进行泛型转换虽然多了一个参数,但是可以重载啊注: 感觉field.get(key) 这里处理的不是很好,如果有更好的办法可以留言 public static <K, V> …...
React使用富文本CKEditor 5,上传图片并可设置大小
上传图片 基础使用(标题、粗体、斜体、超链接、缩进段落、有序无序、上传图片) 官网查看:https://ckeditor.com/docs/ckeditor5/latest/installation/integrations/react.html 安装依赖 npm install --save ckeditor/ckeditor5-react cked…...
【工具使用】批量修改文件夹的时间操作
一,简介 在工作过程中,有时需要修改文件夹的时间,本文分别介绍如何使用PowerShell修改文件夹的时间为指定时间或者当前时间。 二,操作步骤 请注意,在运行任何更改文件和文件夹时间的命令之前,请确保你有…...
Android Snackbar
1.Snackbar Snackbar是Material Design中的一个控件,用来代替Toast。Snackbar是一个类似Toast的快速弹出消息提示的控件。Snackbar在显示上比Toast丰富,而且提供了用户交互的接口。 ①默认情况下,Snackbar显示在屏幕底部,它出现…...
详解API接口如何安全的传输数据(内附商品详情API接口接入方式)
概述 API接口的安全传输是确保数据在API请求和响应之间的传输过程中不被截获、篡改或泄露的重要步骤。以下是一些用于增强API接口安全传输的常见技术和最佳实践: 使用HTTPS:使用HTTPS协议而不是HTTP,以确保数据在传输过程中的安全性。HTTPS使…...
网工内推 | 大专以上,福利待遇好,IE认证优先(云厂商)
01 主动脉科技有限公司 招聘岗位:网络工程师 职责描述: 1.负责云计算,IDC,BGP网络,通过团队协作,构建云业务后台技术支持服务体系。 2.通过工单、其他通讯工具等线上方式完成对客户的实施售后支持&#x…...
Python time strptime()和strftime()
1 strptime()方法 根据指定的格式把一个时间字符串解析为时间元组 重要的时间日期格式化符号 %y 两位数的年份表示(00-99) %Y 四位数的年份表示(000-9999) %m 月份(01-12) %d 月内中的一天(0-…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
C++实现分布式网络通信框架RPC(2)——rpc发布端
有了上篇文章的项目的基本知识的了解,现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...
Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...
篇章二 论坛系统——系统设计
目录 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 1. 数据库设计 1.1 数据库名: forum db 1.2 表的设计 1.3 编写SQL 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 通过需求分析获得概念类并结合业务实现过程中的技术需要&#x…...
