Flink java 工具类
flink 环境构建工具类
public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** @param args* @return org.apache.flink.api.java.utils.ParameterTool* @date 2023/8/4 - 10:05 AM*/public static ParameterTool createParameterTool(final String[] args) throws Exception {return ParameterTool.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME)).mergeWith(ParameterTool.fromArgs(args)).mergeWith(ParameterTool.fromSystemProperties());}/*** flink 环境配置** @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 11:10 AM*/public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));env.getConfig();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {CheckPointUtil.setCheckpointConfig(env,parameterTool);// 取消作业时保留外部化 Checkpoint 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);}env.getConfig().setGlobalJobParameters(parameterTool);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);return env;}
}
checkpoint 工具类
public class CheckPointUtil {private static final String CHECKPOINT_MEMORY = "memory";private static final String CHECKPOINT_FS = "fs";private static final String CHECKPOINT_ROCKETSDB = "rocksdb";/*** 默认的checkpoint 存储地址*/private static final String CHECKPOINT_DEFAULT = "default";/*** 设置flink check point** @param env* @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 10:49 AM*/public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{// 根据类型,设置合适的状态后端String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {//1、state 存放在内存中,默认是 5MStateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);env.setStateBackend(stateBackend);}else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);env.setStateBackend(stateBackend);}else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);env.setStateBackend(rocksDBStateBackend);}//设置 checkpoint 周期时间env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));//高级设置(这些配置也建议写成配置文件中去读取,优先环境变量)// 设置 exactly-once 模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置 checkpoint 最小间隔 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);// 设置 checkpoint 必须在n分钟内完成,否则会被丢弃env.getCheckpointConfig().setCheckpointTimeout(15*60000);// 设置 checkpoint 失败时,任务不会 fail,可容忍3次连续失败env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置 checkpoint 的并发度为 1env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);return env;}
}
构建kafak source 、sink
/*** 构建 source kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>* @date 2023/8/4 - 2:41 PM*/private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);// 正则表达式消费FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),new SimpleStringSchema(),props);kafkaConsumer.setCommitOffsetsOnCheckpoints(true);// 从最开始的位置开始消费if(parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)){kafkaConsumer.setStartFromEarliest();}else{kafkaConsumer.setStartFromGroupOffsets();}return kafkaConsumer;}/*** 构建 sink kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>* @date 2023/8/16 - 11:38 AM*/private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC), (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);}
kafka 工具类
public class KafkaConfigUtil {/*** 设置 kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/4 - 2:39 PM*/public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));props.put("flink.partition-discovery.interval-millis", "10000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");//0817 - 消费kafka数据超时时间和尝试次数props.put("request.timeout.ms", "30000");props.put("retries", 5);return props;}/*** 构建 sink kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/14 - 5:54 PM*/public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");return props;}}
jdbc 工具类
public class JdbcDatasourceUtils {public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();/*** 获取hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:23 PM*/public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);if (!DATASOURCES.containsKey(md5Key)) {synchronized (JdbcDatasourceUtils.class) {if (!DATASOURCES.containsKey(md5Key)) {DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));}}}return DATASOURCES.get(md5Key);}/*** 构建hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:14 PM*/private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {HikariConfig config = new HikariConfig();config.setJdbcUrl(jdbcUrl);config.setUsername(dsUname);config.setPassword(dsPwd);config.setDriverClassName(dsDriver);// 从池返回的连接的默认自动提交,默认值:trueconfig.setAutoCommit(true);//只读config.setReadOnly(true);// 连接超时时间:毫秒,默认值30秒config.setConnectionTimeout(10000);// 最大连接数config.setMaximumPoolSize(32);// 最小空闲连接config.setMinimumIdle(16);// 空闲连接超时时间config.setIdleTimeout(600000);// 连接最大存活时间config.setMaxLifetime(540000);// 连接测试查询config.setConnectionTestQuery("SELECT 1");return new HikariDataSource(config);}/*** 按列加载数据** @param dataSource* @param sql* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>* @date 2023/8/15 - 6:03 PM*/public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {return loadSql(dataSource, sql, resultSet -> {List<Map<String, Object>> datas = new ArrayList<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();//组装返回值Map<String, Object> entry;while (resultSet.next()) {entry = new LinkedHashMap<>();// getColumnLabel 取重命名,getColumnName 原始字段名for (int i = 1; i <= metaData.getColumnCount(); i++) {entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));}datas.add(entry);}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 加载数据遍历放入set集合** @param dataSource* @param sql* @param function* @return java.util.Set<R>* @date 2023/8/15 - 6:03 PM*/public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {return loadSql(dataSource, sql, resultSet -> {Set<R> datas = new LinkedHashSet<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {for (int i = 1; i <= metaData.getColumnCount(); i++) {datas.add(function.apply(resultSet.getObject(i)));}}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 执行查询sql** @param dataSource* @param sql* @param function* @return R* @date 2023/8/15 - 6:03 PM*/private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSource.getConnection();preparedStatement = connection.prepareStatement(sql);resultSet = preparedStatement.executeQuery();return function.apply(resultSet);} catch (Exception e){e.printStackTrace();} finally {if (connection != null){try {connection.close();} catch (SQLException e) {e.printStackTrace();}}if (preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}if (resultSet != null){try {resultSet.close();} catch (SQLException e) {e.printStackTrace();}}}return function.apply(null);}
}
相关文章:
Flink java 工具类
flink 环境构建工具类 public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** param args* return org.apache.flink.api.java.utils.ParameterTool* date 2023/8/4 - 10:05 AM*/public static …...
2023年你需要知道的最佳预算Wi-Fi路由器清单
买新路由器?让我们帮助你挑选一些既有很多功能和性能,又经济实惠的产品。 购买Wi-Fi路由器并不一定要倾家荡产,尤其是如果你不需要一个提供数百Mbps速度的路由器。廉价路由器是一个很好的选择,它包含了许多功能,不会对钱包造成影响。 一、2023年在廉价Wi-Fi路由器中寻找…...
Go语言基础之流程控制
流程控制是每种编程语言控制逻辑走向和执行次序的重要部分,流程控制可以说是一门语言的“经脉”。 Go语言中最常用的流程控制有if和for,而switch和goto主要是为了简化代码、降低重复代码而生的结构,属于扩展类的流程控制。 if else(分支结构…...
Git 安装、配置并把项目托管到码云 Gitee
错误聚集篇: 由于我 git 碰见大量错误,所以集合了一下: git 把项目托管到 码云出现的错误集合_打不着的大喇叭的博客-CSDN博客https://blog.csdn.net/weixin_49931650/article/details/132460492 1、安装 git 1.1 安装步骤 1.1.1 下载对应…...
C++信息学奥赛1147:最高分数的学生姓名
#include <iostream> #include <string> using namespace std; int main() {int n;// 输入一个整数ncin>>n;cin.ignore();string arr;string str;int max0;int fen;// 循环读取n个评分和对应的字符串for(int i0;i<n;i){cin>>fen>>arr;if(fen&…...
STM32使用PID调速
STM32使用PID调速 PID原理 PID算法是一种闭环控制系统中常用的算法,它结合了比例(P)、积分(I)和微分(D)三个环节,以实现对系统的控制。它的目的是使 控制系统的输出值尽可能接近预…...
【UE5:CesiumForUnreal】——3DTiles数据属性查询和单体高亮
目录 0.1 效果展示 0.2 实现步骤 1 数据准备 2 属性查询 2.1 射线检测 2.2 获取FeatureID 2.3 属性查询 2.4 属性显示 3 单体高亮 3.1 构建材质参数集 3.2 材质参数设置 3.3 添加Cesium Encode Metadata插件 3.4 从纹理中取出特定FeatureId属性信息 3.5 创建…...
无涯教程-PHP - 返回类型声明
在PHP 7中,引入了一个新函数返回类型声明,返回类型声明指定函数应返回的值的类型,可以声明返回类型的以下类型。 intfloatbooleanstringinterfacesarraycallable 有效返回类型 <?phpdeclare(strict_types1);function returnIntValue(i…...
DOS常见命令
DOS常见命令 DOS是什么如何打开DOScmd常见的命令集合 DOS是什么 DOC命令是我们浏览器中的终端 ,但不同的是我们打开软件的方式 使用的是点击文件图标,点击图标的同时 我们也相当于使用一个命令 只是我们看不见而已 在电脑上操作的时候 通常都是使用命令…...
Qt应用开发(拓展篇)——示波器/图表 QCustomPlot
一、介绍 QCustomPlot是一个用于绘图和数据可视化的Qt C小部件。它没有进一步的依赖关系,提供友好的文档帮助。这个绘图库专注于制作好看的,出版质量的2D绘图,图形和图表,以及为实时可视化应用程序提供高性能。 QCustomPl…...
【精度丢失】后端接口返回的Long类型参数,不同浏览器解析出的结果不一样
1、业务背景 有个同事找我帮他看一个问题,他给前端提供了一个接口。 这个接口是用来反查id的,他这里这个参数正常的返回值应该是 283232039247028226。 但前端反馈他,前端在浏览器(火狐)获取的值是 283232039247028…...
2023年国赛 高教社杯数学建模思路 - 案例:感知机原理剖析及实现
文章目录 1 感知机的直观理解2 感知机的数学角度3 代码实现 4 建模资料 # 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 感知机的直观理解 感知机应该属于机器学习算法中最简单的一种算法,其…...
java-红黑树
节点内部存储 红黑树规则 或者: 红黑树添加节点规则: 添加节点默认是红色的(效率高) 红黑树示例 注:红黑树增删改查性能都很好...
vue2 vue中的常用指令
一、为什么要学习Vue 1.前端必备技能 2.岗位多,绝大互联网公司都在使用Vue 3.提高开发效率 4.高薪必备技能(Vue2Vue3) 二、什么是Vue 概念:Vue (读音 /vjuː/,类似于 view) 是一套 **构建用户界面 ** 的 渐进式 …...
AI驱动下的智能制造:工业自动化的新纪元
随着人工智能(AI)技术的持续进步,其在工业自动化领域的影响日益显著。作为现代科技的代表,AI不仅为各行业带来了前所未有的商机和技术思路,更在工业自动化领域中引发了一场深刻的变革。本文将深入探讨AI对智能制造的影…...
docker 命令
一、docker命令 1、镜像保存 docker save imageid -o modelzoozl.tar #把镜像保存到本地 docker load -i dockername #把tar包load下来,load成镜像 docker export CONTAINERID/CONTAINERNAME -o modelzoozl.tar #把启动着的镜像导出 docker import modelzo…...
2023年高教社杯数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
生成式人工智能的潜在有害影响与未来之路(二)
利润高于隐私:不透明数据收集增加 背景和风险 生成型人工智能工具建立在各种大型、复杂的机器学习模型之上,这些模型需要大量的训练数据才能发挥作用。对于像ChatGPT这样的工具,数据包括从互联网上抓取的文本。对于像Lensa或Stable Diffusi…...
如何自己实现一个丝滑的流程图绘制工具(三)自定义挂载vue组件
背景 bpmn-js是个流程图绘制的工具,但是现在我希望实现的是,绘制的不是节点而是一个vue组件。 保留线的拖拽和连接。 方案 那就说明不是依赖于节点的样式,找到了他有个属性,就是类似覆盖节点的操作。 思路就是用vue组件做遮罩&…...
UNIAPP调用API接口
API:开发者可以通过这些接口与其它程序进行交互,获取所需数据或者执行指定操作。 网络请求 API: UniApp 中内置了网络请求 API,方便调用 uni.request uni.uploadFile uni.request 接口主要用于实现网络请求。GET 和 POST 是使用最普遍的两种…...
突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
恶补电源:1.电桥
一、元器件的选择 搜索并选择电桥,再multisim中选择FWB,就有各种型号的电桥: 电桥是用来干嘛的呢? 它是一个由四个二极管搭成的“桥梁”形状的电路,用来把交流电(AC)变成直流电(DC)。…...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
