flink中使用外部定时器实现定时刷新
背景:
我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
外部定时器定时加载实现
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
// threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
// }// 2.2 更新本地变量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public String getByKey(String key){return configCache.getIfPresent(key);}}
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}
总结:
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行
相关文章:
flink中使用外部定时器实现定时刷新
背景: 我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢? 外部定时器定时加载实现 1.在open函数中进行定时器的…...
Spring Cloud Pipelines 入门实践
文章目录 1. 前言2. Spring Cloud Pipelines 是做什么的2.1. 预定义的流程2.2. 集成测试和契约测试2.3.部署策略 4. Spring Cloud Pipelines的使用示例4.1. 创建一个Spring Boot应用4.2. 将代码托管到GitHub仓库4.3. 添加Spring Cloud Pipelines依赖4.4. 配置Spring Cloud Pipe…...
G1 GC详解及设置
一、概述 G1 GC,全称Garbage-First Garbage Collector,在JDK1.7中引入了G1 GC,从JAVA 9开始,G1 GC是默认的GC算法。通过-XX:UseG1GC参数来启用。G1收集器是工作在堆内不同分区上的收集器,分区既可以是年轻代也可以是老…...
GitHub详细教程
将代码推送到GitHub仓库涉及一系列的步骤。以下是详细的步骤说明: 创建一个新的仓库(如果还没有的话): 访问 GitHub。登录您的帐户。点击页面右上角的图标,然后选择“New repository”。填写仓库名称、描述等信息&…...
【小沐学Python】Python实现Web图表功能(Dash)
文章目录 1、简介2、安装3、功能示例3.1 Hello World3.2 连接到数据3.3 可视化数据3.4 控件和回调3.5 设置应用的样式3.5.1 HTML and CSS3.5.2 Dash Design Kit (DDK)3.5.3 Dash Bootstrap Components3.5.4 Dash Mantine Components 4、更多示例4.1 Basic Dashboard4.2 Using C…...
【RabbitMQ】docker rabbitmq集群 docker搭建rabbitmq集群
docker rabbitmq集群 docker搭建rabbitmq集群 RabbitMQ提供了两种常用的集群模式 1.普通集群模式 2.镜像集群模式 普通集群模式只能同步主节点上的交换机和队列信息,但对于队列中的消息不做同步,主节点宕机也不能进行切换(故障转移ÿ…...
Linux 网络驱动实验
本文章对Linux 网络驱动实验中的设备树进行介绍,Linux网络驱动程序比较复杂,只要学会应用。 1、I.MX6ULL 网络外设设备树 I.MX6ULL 有两个 10/100M 的网络 MAC 外设,因此 I.MX6ULL 网络驱动主要就是这两个网络 MAC 外设的驱动。 fec1…...
访问Apache Tomcat的虚拟主机管理页面
介绍 通过Tomcat Host Manager应用可以创建、删除、管理Tomcat内的虚拟主机(virtual hosts)。该应用是Tomcat安装的一部分,默认在<Tomcat安装目录>/webapps/host-manager: 配置用户名、密码、角色 要访问Host Manager应…...
【算法】排序——归并排序和计数排序
主页点击直达:个人主页 我的小仓库:代码仓库 C语言偷着笑:C语言专栏 数据结构挨打小记:初阶数据结构专栏 Linux被操作记:Linux专栏 LeetCode刷题掉发记:LeetCode刷题 算法头疼记:算法专栏…...
discuz封面设置失败的解决办法(centos系统+windows系统)
discuz封面设置失败的解决办法(centos系统windows系统) centos系统:1、开启/var/www/html 这个目录的读写权限chmod -R 777 /var/www/html然后重启httpd:service httpd restart如果discuz论坛发布帖子,还是显示封面设置失败的话…...
AI绘画-Stable Diffusion笔记
软件:Stable Diffusion 视频教程来自 https://www.bilibili.com/video/BV1As4y127HW/?spm_id_from333.337.search-card.all.click 提示词 提示词类别 内容型提示词 人物主题特征: 服饰穿搭:white dress 发型发色:blonde hair,l…...
中值滤波算法及例程
中值滤波是一种常用的非线性图像滤波算法,它能够有效去除图像中的椒盐噪声(即孤立的亮或暗像素点),同时保持图像边缘和细节的清晰度。中值滤波的主要思想是使用一个滑动窗口,在窗口内对像素值进行排序,并将…...
SpringBoot 如何使用 Ehcache 作为缓存
使用Spring Boot Sleuth进行分布式跟踪 在现代分布式应用程序中,跟踪请求和了解应用程序的性能是至关重要的。Spring Boot Sleuth是一个分布式跟踪解决方案,它可以帮助您在分布式系统中跟踪请求并分析性能问题。本文将介绍如何在Spring Boot应用程序中使…...
Stable Diffusion 图片换脸插件Roop保姆教程 附错误解决办法和API使用
换脸技术已经不是新鲜事物,但如何实现简单、快速、高效的换脸操作呢?Roop插件正是为解决这一问题而生的。 sd-webui-roop 插件适用于已经本地部署了SD的用户。相较于传统的换脸技术,Roop插件几乎不需要训练,只需一张照片,即可在10秒内完成换脸。 但是要注意到是必须注意…...
华为OD机试 - 组成最大数(Java 2023 B卷 100分)
目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷)》…...
十一、2023.10.5.计算机网络(end).11
文章目录 17、说说 TCP 可靠性保证?18、简述 TCP 滑动窗口以及重传机制?19、说说滑动窗口过小怎么办?20、说说如果三次握手时候每次握手信息对方没收到会怎么样,分情况介绍?21、简述 TCP 的 TIME_WAIT,为什么需要有这个状态&…...
基于SpringBoot的网上摄影工作室
目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 作品分类管理 轮播图管理 摄影作品管理 摄影作品收藏 摄影圈 摄影作品发布 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统…...
Spring源码解析——IOC之bean 的初始化
正文 一个 bean 经历了 createBeanInstance() 被创建出来,然后又经过一番属性注入,依赖处理,历经千辛万苦,千锤百炼,终于有点儿 bean 实例的样子,能堪大任了,只需要经历最后一步就破茧成蝶了。…...
互联网摸鱼日报(2023-10-07)
互联网摸鱼日报(2023-10-07) 36氪新闻 小米汽车将研发增程式电动车,产品已有规划;LG新能源和丰田汽车北美公司签署电动汽车电池供应协议|36氪新能源日报1005 详解企业数字化转型建设过程中所需的七种能力 电商平台,如何让丰收「…...
深入理解RBAC
RBAC是一种基于角色实现访问控制的权限管理机制,通过定义角色和权限、用户和角色、角色和角色之间的关系,实现多层次、细粒度、可复用的权限管理系统。原文: Role-based Access Control (RBAC) Model[1] Bernard HermantUnsplash Avery Pennarun写的&quo…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
【Java学习笔记】BigInteger 和 BigDecimal 类
BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点:传参类型必须是类对象 一、BigInteger 1. 作用:适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...
JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
