Flink java 工具类
flink 环境构建工具类
public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** @param args* @return org.apache.flink.api.java.utils.ParameterTool* @date 2023/8/4 - 10:05 AM*/public static ParameterTool createParameterTool(final String[] args) throws Exception {return ParameterTool.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME)).mergeWith(ParameterTool.fromArgs(args)).mergeWith(ParameterTool.fromSystemProperties());}/*** flink 环境配置** @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 11:10 AM*/public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));env.getConfig();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {CheckPointUtil.setCheckpointConfig(env,parameterTool);// 取消作业时保留外部化 Checkpoint 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);}env.getConfig().setGlobalJobParameters(parameterTool);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);return env;}
}
checkpoint 工具类
public class CheckPointUtil {private static final String CHECKPOINT_MEMORY = "memory";private static final String CHECKPOINT_FS = "fs";private static final String CHECKPOINT_ROCKETSDB = "rocksdb";/*** 默认的checkpoint 存储地址*/private static final String CHECKPOINT_DEFAULT = "default";/*** 设置flink check point** @param env* @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 10:49 AM*/public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{// 根据类型,设置合适的状态后端String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {//1、state 存放在内存中,默认是 5MStateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);env.setStateBackend(stateBackend);}else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);env.setStateBackend(stateBackend);}else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);env.setStateBackend(rocksDBStateBackend);}//设置 checkpoint 周期时间env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));//高级设置(这些配置也建议写成配置文件中去读取,优先环境变量)// 设置 exactly-once 模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置 checkpoint 最小间隔 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);// 设置 checkpoint 必须在n分钟内完成,否则会被丢弃env.getCheckpointConfig().setCheckpointTimeout(15*60000);// 设置 checkpoint 失败时,任务不会 fail,可容忍3次连续失败env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置 checkpoint 的并发度为 1env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);return env;}
}
构建kafak source 、sink
/*** 构建 source kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>* @date 2023/8/4 - 2:41 PM*/private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);// 正则表达式消费FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),new SimpleStringSchema(),props);kafkaConsumer.setCommitOffsetsOnCheckpoints(true);// 从最开始的位置开始消费if(parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)){kafkaConsumer.setStartFromEarliest();}else{kafkaConsumer.setStartFromGroupOffsets();}return kafkaConsumer;}/*** 构建 sink kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>* @date 2023/8/16 - 11:38 AM*/private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC), (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);}
kafka 工具类
public class KafkaConfigUtil {/*** 设置 kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/4 - 2:39 PM*/public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));props.put("flink.partition-discovery.interval-millis", "10000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");//0817 - 消费kafka数据超时时间和尝试次数props.put("request.timeout.ms", "30000");props.put("retries", 5);return props;}/*** 构建 sink kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/14 - 5:54 PM*/public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");return props;}}
jdbc 工具类
public class JdbcDatasourceUtils {public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();/*** 获取hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:23 PM*/public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);if (!DATASOURCES.containsKey(md5Key)) {synchronized (JdbcDatasourceUtils.class) {if (!DATASOURCES.containsKey(md5Key)) {DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));}}}return DATASOURCES.get(md5Key);}/*** 构建hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:14 PM*/private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {HikariConfig config = new HikariConfig();config.setJdbcUrl(jdbcUrl);config.setUsername(dsUname);config.setPassword(dsPwd);config.setDriverClassName(dsDriver);// 从池返回的连接的默认自动提交,默认值:trueconfig.setAutoCommit(true);//只读config.setReadOnly(true);// 连接超时时间:毫秒,默认值30秒config.setConnectionTimeout(10000);// 最大连接数config.setMaximumPoolSize(32);// 最小空闲连接config.setMinimumIdle(16);// 空闲连接超时时间config.setIdleTimeout(600000);// 连接最大存活时间config.setMaxLifetime(540000);// 连接测试查询config.setConnectionTestQuery("SELECT 1");return new HikariDataSource(config);}/*** 按列加载数据** @param dataSource* @param sql* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>* @date 2023/8/15 - 6:03 PM*/public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {return loadSql(dataSource, sql, resultSet -> {List<Map<String, Object>> datas = new ArrayList<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();//组装返回值Map<String, Object> entry;while (resultSet.next()) {entry = new LinkedHashMap<>();// getColumnLabel 取重命名,getColumnName 原始字段名for (int i = 1; i <= metaData.getColumnCount(); i++) {entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));}datas.add(entry);}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 加载数据遍历放入set集合** @param dataSource* @param sql* @param function* @return java.util.Set<R>* @date 2023/8/15 - 6:03 PM*/public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {return loadSql(dataSource, sql, resultSet -> {Set<R> datas = new LinkedHashSet<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {for (int i = 1; i <= metaData.getColumnCount(); i++) {datas.add(function.apply(resultSet.getObject(i)));}}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 执行查询sql** @param dataSource* @param sql* @param function* @return R* @date 2023/8/15 - 6:03 PM*/private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSource.getConnection();preparedStatement = connection.prepareStatement(sql);resultSet = preparedStatement.executeQuery();return function.apply(resultSet);} catch (Exception e){e.printStackTrace();} finally {if (connection != null){try {connection.close();} catch (SQLException e) {e.printStackTrace();}}if (preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}if (resultSet != null){try {resultSet.close();} catch (SQLException e) {e.printStackTrace();}}}return function.apply(null);}
}
相关文章:
Flink java 工具类
flink 环境构建工具类 public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** param args* return org.apache.flink.api.java.utils.ParameterTool* date 2023/8/4 - 10:05 AM*/public static …...
2023年你需要知道的最佳预算Wi-Fi路由器清单
买新路由器?让我们帮助你挑选一些既有很多功能和性能,又经济实惠的产品。 购买Wi-Fi路由器并不一定要倾家荡产,尤其是如果你不需要一个提供数百Mbps速度的路由器。廉价路由器是一个很好的选择,它包含了许多功能,不会对钱包造成影响。 一、2023年在廉价Wi-Fi路由器中寻找…...
Go语言基础之流程控制
流程控制是每种编程语言控制逻辑走向和执行次序的重要部分,流程控制可以说是一门语言的“经脉”。 Go语言中最常用的流程控制有if和for,而switch和goto主要是为了简化代码、降低重复代码而生的结构,属于扩展类的流程控制。 if else(分支结构…...
Git 安装、配置并把项目托管到码云 Gitee
错误聚集篇: 由于我 git 碰见大量错误,所以集合了一下: git 把项目托管到 码云出现的错误集合_打不着的大喇叭的博客-CSDN博客https://blog.csdn.net/weixin_49931650/article/details/132460492 1、安装 git 1.1 安装步骤 1.1.1 下载对应…...
C++信息学奥赛1147:最高分数的学生姓名
#include <iostream> #include <string> using namespace std; int main() {int n;// 输入一个整数ncin>>n;cin.ignore();string arr;string str;int max0;int fen;// 循环读取n个评分和对应的字符串for(int i0;i<n;i){cin>>fen>>arr;if(fen&…...
STM32使用PID调速
STM32使用PID调速 PID原理 PID算法是一种闭环控制系统中常用的算法,它结合了比例(P)、积分(I)和微分(D)三个环节,以实现对系统的控制。它的目的是使 控制系统的输出值尽可能接近预…...
【UE5:CesiumForUnreal】——3DTiles数据属性查询和单体高亮
目录 0.1 效果展示 0.2 实现步骤 1 数据准备 2 属性查询 2.1 射线检测 2.2 获取FeatureID 2.3 属性查询 2.4 属性显示 3 单体高亮 3.1 构建材质参数集 3.2 材质参数设置 3.3 添加Cesium Encode Metadata插件 3.4 从纹理中取出特定FeatureId属性信息 3.5 创建…...
无涯教程-PHP - 返回类型声明
在PHP 7中,引入了一个新函数返回类型声明,返回类型声明指定函数应返回的值的类型,可以声明返回类型的以下类型。 intfloatbooleanstringinterfacesarraycallable 有效返回类型 <?phpdeclare(strict_types1);function returnIntValue(i…...
DOS常见命令
DOS常见命令 DOS是什么如何打开DOScmd常见的命令集合 DOS是什么 DOC命令是我们浏览器中的终端 ,但不同的是我们打开软件的方式 使用的是点击文件图标,点击图标的同时 我们也相当于使用一个命令 只是我们看不见而已 在电脑上操作的时候 通常都是使用命令…...
Qt应用开发(拓展篇)——示波器/图表 QCustomPlot
一、介绍 QCustomPlot是一个用于绘图和数据可视化的Qt C小部件。它没有进一步的依赖关系,提供友好的文档帮助。这个绘图库专注于制作好看的,出版质量的2D绘图,图形和图表,以及为实时可视化应用程序提供高性能。 QCustomPl…...
【精度丢失】后端接口返回的Long类型参数,不同浏览器解析出的结果不一样
1、业务背景 有个同事找我帮他看一个问题,他给前端提供了一个接口。 这个接口是用来反查id的,他这里这个参数正常的返回值应该是 283232039247028226。 但前端反馈他,前端在浏览器(火狐)获取的值是 283232039247028…...
2023年国赛 高教社杯数学建模思路 - 案例:感知机原理剖析及实现
文章目录 1 感知机的直观理解2 感知机的数学角度3 代码实现 4 建模资料 # 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 感知机的直观理解 感知机应该属于机器学习算法中最简单的一种算法,其…...
java-红黑树
节点内部存储 红黑树规则 或者: 红黑树添加节点规则: 添加节点默认是红色的(效率高) 红黑树示例 注:红黑树增删改查性能都很好...
vue2 vue中的常用指令
一、为什么要学习Vue 1.前端必备技能 2.岗位多,绝大互联网公司都在使用Vue 3.提高开发效率 4.高薪必备技能(Vue2Vue3) 二、什么是Vue 概念:Vue (读音 /vjuː/,类似于 view) 是一套 **构建用户界面 ** 的 渐进式 …...
AI驱动下的智能制造:工业自动化的新纪元
随着人工智能(AI)技术的持续进步,其在工业自动化领域的影响日益显著。作为现代科技的代表,AI不仅为各行业带来了前所未有的商机和技术思路,更在工业自动化领域中引发了一场深刻的变革。本文将深入探讨AI对智能制造的影…...
docker 命令
一、docker命令 1、镜像保存 docker save imageid -o modelzoozl.tar #把镜像保存到本地 docker load -i dockername #把tar包load下来,load成镜像 docker export CONTAINERID/CONTAINERNAME -o modelzoozl.tar #把启动着的镜像导出 docker import modelzo…...
2023年高教社杯数学建模思路 - 复盘:光照强度计算的优化模型
文章目录 0 赛题思路1 问题要求2 假设约定3 符号约定4 建立模型5 模型求解6 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 问题要求 现在已知一个教室长为15米,宽为12米&…...
生成式人工智能的潜在有害影响与未来之路(二)
利润高于隐私:不透明数据收集增加 背景和风险 生成型人工智能工具建立在各种大型、复杂的机器学习模型之上,这些模型需要大量的训练数据才能发挥作用。对于像ChatGPT这样的工具,数据包括从互联网上抓取的文本。对于像Lensa或Stable Diffusi…...
如何自己实现一个丝滑的流程图绘制工具(三)自定义挂载vue组件
背景 bpmn-js是个流程图绘制的工具,但是现在我希望实现的是,绘制的不是节点而是一个vue组件。 保留线的拖拽和连接。 方案 那就说明不是依赖于节点的样式,找到了他有个属性,就是类似覆盖节点的操作。 思路就是用vue组件做遮罩&…...
UNIAPP调用API接口
API:开发者可以通过这些接口与其它程序进行交互,获取所需数据或者执行指定操作。 网络请求 API: UniApp 中内置了网络请求 API,方便调用 uni.request uni.uploadFile uni.request 接口主要用于实现网络请求。GET 和 POST 是使用最普遍的两种…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
家政维修平台实战20:权限设计
目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习
禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...
代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
比较数据迁移后MySQL数据库和OceanBase数据仓库中的表
设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...
