【极数系列】Flink配置参数如何获取?(06)
文章目录
- gitee码云地址
- 简介概述
- 01 配置值来自.properties文件
- 1.通过路径读取
- 2.通过文件流读取
- 3.通过IO流读取
- 02 配置值来自命令行
- 03 配置来自系统属性
- 04 注册以及使用全局变量
- 05 Flink获取参数值Demo
- 1.项目结构
- 2.pom.xml文件如下
- 3.配置文件
- 4.项目主类
- 5.运行查看相关日志
gitee码云地址
直接下载解压可用 https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:GetParamsStreamingJob
简介概述
1.几乎所有的批和流的 Flink 应用程序,都依赖于外部配置参数。这些配置参数可以用于指定输入和输出源(如路径或地址)、系统参数(并行度,运行时配置)和特定的应用程序参数(通常使用在用户自定义函数)。
2.为解决以上问题,Flink 提供一个名为 Parametertool
的简单公共类,其中包含了一些基本的工具。请注意,这里说的 Parametertool
并不是必须使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。
3.**ParameterTool**定义了一组静态方法,用于读取配置信息。该工具类内部使用了
Map` 类型,这样使得它可以很容易地与你的配置集成在一起。
01 配置值来自.properties文件
1.通过路径读取
//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式一:直接使用内置工具类
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);
2.通过文件流读取
//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);
3.通过IO流读取
//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);
02 配置值来自命令行
tips:在idea的命令行传参,格式:–jobName program_job_aurora
ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行传参key值={}",jobName_04);
03 配置来自系统属性
tips:在idea的的jvm系统参数设置,格式:-Dinput=hdfs:///mydata
//方式五:获取jvm参数值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:获取jvm参数key值={}",jobName_05);
04 注册以及使用全局变量
注意:Flink全局变量仅支持在富函数中使用,即Rich开头的类使用
//定义文件路径
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//直接使用内置工具类获取参数
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);//方式六:注册全局参数final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用//1.创建富函数RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {//获取运行环境ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName = parameters.getRequired("jobName");logger.info("方式六:获取全局注册参数key值={}",jobName_05);}};//2.创建数据集ArrayList<String> list = new ArrayList<>();list.add("001");list.add("002");list.add("003");//3.把有限数据集转换为数据源DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();
05 Flink获取参数值Demo
1.项目结构
2.pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>
3.配置文件
(1)application.properties
jobName=job_aurora
jobMemory=1024
taskName=task_aurora
(2)log4j2.properties
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp
4.项目主类
package com.aurora;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;/*** @description flink获取外部参数作业** @author 浅夏的猫* @datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {private static final Logger logger = LoggerFactory.getLogger(GetParamsStreamingJob.class);public static void main(String[] args) throws Exception {//定义文件路径String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";//方式一:直接使用内置工具类ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);String jobName_01 = parameter_01.get("jobName");logger.info("方式一:读取配置文件中指定的key值={}",jobName_01);//方式二:使用文件File propertiesFile = new File(propertiesFilePath);ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);String jobName_02 = parameter_02.get("jobName");logger.info("方式二:读取配置文件中指定的key值={}",jobName_02);//方式三:使用IO流InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);String jobName_03 = parameter_03.get("jobName");logger.info("方式三:读取配置文件中指定的key值={}",jobName_03);//方式四:命令行传参格式:--jobName program_job_auroraParameterTool parameter_04 = ParameterTool.fromArgs(args);String jobName_04 = parameter_04.get("jobName");logger.info("方式四:命令行传参key值={}",jobName_04);//方式五:获取jvm参数值ParameterTool parameter_05 = ParameterTool.fromSystemProperties();String jobName_05 = parameter_05.get("input");logger.info("方式五:获取jvm参数key值={}",jobName_05);//方式六:注册全局参数final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(parameter_01);//在任意富函数中均可以获取,注意!注意!注意!只有富文本函数才可以使用//1.创建富函数RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {@Overridepublic void flatMap(String s, Collector<String> collector) throws Exception {//获取运行环境ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();//获取对应的值String jobName = parameters.getRequired("jobName");logger.info("方式六:获取全局注册参数key值={}",jobName_05);}};//2.创建数据集ArrayList<String> list = new ArrayList<>();list.add("001");list.add("002");list.add("003");//3.把有限数据集转换为数据源DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);//4.执行富文本处理dataStreamSource.flatMap(richFlatMap);//5.启动程序env.execute();}}
5.运行查看相关日志
相关文章:

【极数系列】Flink配置参数如何获取?(06)
文章目录 gitee码云地址简介概述01 配置值来自.properties文件1.通过路径读取2.通过文件流读取3.通过IO流读取 02 配置值来自命令行03 配置来自系统属性04 注册以及使用全局变量05 Flink获取参数值Demo1.项目结构2.pom.xml文件如下3.配置文件4.项目主类5.运行查看相关日志 gite…...

【docker】linux系统docker的安装及使用
一、docker应用的安装 1.1 安装方式 Docker的自动化安装,即使用提供的一键安装的脚本,进行安装。 官方的一键安装方式:curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun 国内 daocloud一键安装命令:curl -s…...

【C++】一题掌握空指针
今天看见一道面试题,比较有意思,这一分享出来: 1.下面程序能编译通过吗? 2.下面程序会崩溃吗?在哪里崩溃 class A {public:void PrintA(){cout<<_a<<endl;}void Show(){cout<<"Show()"&…...

初识HarmonyOS
一、HarmonyOS VS Android 相信很多关注鸿蒙的⼈,都会关注的⼀个焦点话题,那就是HarmonyOS是不是Android的套壳,对于这个话题,我只想阐明以下⼏个观点: HarmonyOS并不是Android的替代品,HarmonyOS与Android并⾮同⼀个赛道。HarmonyOS⽬前缺乏⽣态⽀持这⼀点远远⽐不上An…...

备战蓝桥杯---二分(入门)
话不多说,先来个模板题来回顾一下上次讲的: 下面是AC代码: 下面进入正题: 本题对1,2行与3,4行组合,再用二分查找即可实现n^2logn的复杂度。 下面是AC代码: 接题: 让我们…...

开发 Chrome 浏览器插件时进行 Vue3+Vite 多页面多入口配置
使用 Vite 开发 Chrome 插件时,构建多页面以及多 js 文件 因为发现 Vite 多页面构建有很多分歧以及问题点,所以我把我在 Chrome 插件开发上面使用到的 Vite 多页面以及多入口文件构建配置单独拿出来 开发 Chrome 插件是,一般会需要一个 popup…...

MacOS X 中 OpenGL 环境搭建 Makefile的方式
1,预备环境 安装 brew: /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" 安装glfw: brew install glfw 安装glew: brew install glew 2.编译 下载源代码…...

前端工程化之:webpack1-6(编译过程)
一、webpack编译过程 webpack 的作用是将源代码编译(构建、打包)成最终代码。 整个过程大致分为三个步骤: 初始化编译输出 1.初始化 初始化时我们运行的命令 webpack 为核心包, webpack-cli 提供了 webpack 命令,通过…...

javaweb学习问题集
1 创建一个Javaweb项目 因为项目要放在tomcat10里运行,在添加tomcat10的依赖时,右键模块没有add frameworks support ,解决方法:按两下shift键,直接搜索 add frameworks support index.jsp文件我们已经不用了 我们在ideal上开发…...

java—AWT
AWT 课程:1、GUI编程简介_哔哩哔哩_bilibili 一.介绍 包含了很多类和接口!GUI!元素:窗口、按钮、文本框java.awt 二.窗口 1.构造 2.方法 // 实例化frame类Frame frame new Frame("这个一个框");// 设置可见性frame.…...

SQL注入-sqli-labs-master第一关
实验环境: Nginx.1.15.11 MySQL:5.7.26 实验步骤: 1.第一步: 在id1后加入一个闭合符号,如果报错,再在后面加上 -- 将后面注释掉,如果不报错,则证明为字符型。 http://127.0.0.1/…...

简述云原生基础定义及关键技术
云原生是什么 云原生是面向“云”而设计的应用,因此技术部分依赖于传统云计算的 3 层概念,基础设施即服务(IaaS)、平台即服务(PaaS)和软件即服务(SaaS)。 例如,敏捷的不可变基础设施交付类似于 IaaS,用来提供计算网络存储等基础资源,这些资源是可编程且不可变的,直…...

游戏中排行榜的后台实现
游戏中经常会有排行榜需求需要实现,例如常见的战力排行榜、积分排行榜等等。 排行榜一般会用到 Redis 来实现,原因是: Redis 基于内存操作,速度快Redis 提供了高效的有序集合 zset 例如创建一个名为 rank 的排行榜 # 为用户use…...

《动手学深度学习(PyTorch版)》笔记3.1
Chapter3 Linear Neural Networks 3.1 Linear Regression 3.1.1 Basic Concepts 我们通常使用 n n n来表示数据集中的样本数。对索引为 i i i的样本,其输入表示为 x ( i ) [ x 1 ( i ) , x 2 ( i ) , . . . , x n ( i ) ] ⊤ \mathbf{x}^{(i)} [x_1^{(i)}, x_2…...

【贪吃蛇:C语言实现】
文章目录 前言1.了解Win32API相关知识1.1什么是Win32API1.2设置控制台的大小、名称1.3控制台上的光标1.4 GetStdHandle(获得控制台信息)1.5 SetConsoleCursorPosition(设置光标位置)1.6 GetConsoleCursorInfo(获得光标…...

01.领域驱动设计:微服务设计为什么要选择DDD学习总结
目录 1、前言 2、软件架构模式的演进 3、微服务设计和拆分的困境 4、为什么 DDD适合微服务 5、DDD与微服务的关系 6、总结 1、前言 我们知道,微服务设计过程中往往会面临边界如何划定的问题,不同的人会根据自己对微服务的理 解而拆分出不同的微服…...

写静态页面——魅族导航_前端页面练习
0、效果: 1、html代码:: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…...

Go 命令行解析 flag 包之快速上手
本篇文章是 Go 标准库 flag 包的快速上手篇。 概述 开发一个命令行工具,视复杂程度,一般要选择一个合适的命令行解析库,简单的需求用 Go 标准库 flag 就够了,flag 的使用非常简单。 当然,除了标准库 flag 外&#x…...

React16源码: React中commitAllHostEffects内部的commitDeletion的源码实现
commitDeletion 1 )概述 在 react commit 阶段的 commitRoot 第二个while循环中调用了 commitAllHostEffects,这个函数不仅仅处理了新增节点,更新节点最后一个操作,就是删除节点,就需要调用 commitDeletion࿰…...

[机器学习]简单线性回归——梯度下降法
一.梯度下降法概念 2.代码实现 # 0. 引入依赖 import numpy as np import matplotlib.pyplot as plt# 1. 导入数据(data.csv) points np.genfromtxt(data.csv, delimiter,) points[0,0]# 提取points中的两列数据,分别作为x,y …...

2024年搭建幻兽帕鲁服务器价格多少?如何自建Palworld?
自建幻兽帕鲁服务器租用价格表,2024阿里云推出专属幻兽帕鲁Palworld游戏优惠服务器,配置分为4核16G和4核32G服务器,4核16G配置32.25元/1个月、3M带宽96.75元/1个月、8核32G配置10M带宽90.60元/1个月,8核32G配置3个月271.80元。ECS…...

『OpenCV-Python|鼠标作画笔』
Opencv-Python教程链接:https://opencv-python-tutorials.readthedocs.io/ 本文主要介绍OpenCV-Python如何将鼠标作画笔绘制圆或者矩形。 示例一:图片上双击的位置绘制一个圆圈 首先创建一个鼠标事件回调函数,鼠标事件发生时就会被执行。鼠标…...

关于如何利用ChatGPT提高编程效率的
自从去年ChatGPT3.5推出以后,这一年时间在编程过程中我也在慢慢熟悉人工智能的使用,目前来看即使是免费的ChatGPT3.5对于编程效率的提升也是有很大帮助的,虽然在使用过程中确实出现了一些问题,本文记录下我的一些心得体会和用法。…...

Excel VBA ——从MySQL数据库中导出一个报表-笔记
本文主要涉及: VBA中数据库连接参数改成从配置文件获取 VBA连接MySQL数据库 VBA读MySQL数据库 演示两种写入工作簿的代码实现系统环境: Windows 10 64bit Excel 365 64bit WAMP(3.2.2.2 64bit)集成的MariaDB版本为10.4.10&#…...

金融OCR领域实习日志(一)——OCR技术从0到1全面调研
一、OCR基础 任务要求: 工作原理 OCR(Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相)检查纸上打印的字符,经过检测暗、亮的模式肯定其形状,而后用…...

ELK日志解决方案
ELK日志解决方案 ELK套件日志系统应该是Elasticsearch使用最广泛的场景之一了,Elasticsearch支持海量数据的存储和查询,特别适合日志搜索场景。广泛使用的ELK套件(Elasticsearch、Logstash、Kibana)是日志系统最经典的案例,使用Logstash和Be…...

嵌入式学习-驱动
嵌入式的一些基本概念 CPU与MCU的区别 CPU(中央处理器,central processing unit) 指集成了运算器、控制器、寄存器、高速缓存等功能模块的芯片,负责执行计算机程序指令的处理器。MCU(单片微型计算机或单片机,microco…...

系统架构17 - 软件工程(5)
软件工程 软件测试测试原则测试方法静态测试动态测试黑盒测试白盒测试灰盒测试自动化测试 测试阶段单元测试集成测试系统测试性能测试验收测试其它测试AB测试Web测试链接测试表单测试 测试用例设计黑盒测试用例白盒测试用例 调试 系统维护遗留系统系统转换转换方式数据转换与迁…...

空气质量预测 | Python实现基于线性回归、Lasso回归、岭回归、决策树回归的空气质量预测模型
文章目录 效果一览文章概述源码设计参考资料效果一览 文章概述 政府机构使用空气质量指数 (AQI) 向公众传达当前空气污染程度或预测空气污染程度。 随着 AQI 的上升,公共卫生风险也会增加。 不同国家有自己的空气质量指数,对应不同国家的空气质量标准。 对于空气质量预测,…...

MYSQL数据库基本操作-DQL-基本查询
一.概念 数据库管理系统一个重要功能就是数据查询。数据查询不应是简单返回数据库中存储的数据,还应该根据需要对数据进行筛选以及确定数据以什么样的格式显示。 MySQL提供了功能强大,灵活的语句来实现这些操作。 MySQL数据库使用select语句来查询数据…...