flink中使用外部定时器实现定时刷新
背景:
我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢?
外部定时器定时加载实现
1.在open函数中进行定时器的创建和定时加载,这个方法对于所有的RichFunction富函数都适用,包括RichMap,RichFilter,RichSink等,代码如下所示
package wikiedits.schedule;import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class ScheduleRichMapFunction extends RichFlatMapFunction<String, String> {// 定时任务执行器private transient ScheduledExecutorService scheduledExecutorService;// 本地变量private int threshold;@Overridepublic void open(Configuration parameters) throws Exception {// 1.从db查询数据初始化本地变量
// threshold = DBManager.SELECTSQL.getConfig("threshold");// 2.使用定时任务更新本地内存的配置信息以及更新本地变量threshold的值scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();
// for(ConfigEntity entity : configList){ConfigEntityLocalCache.getInstance().update("key", "value");
// }// 2.2 更新本地变量threshold的值
// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {}@Overridepublic void close() throws Exception {ExecutorUtils.gracefulShutdown(100, TimeUnit.SECONDS, scheduledExecutorService);}}//本地缓存实现
package wikiedits.schedule;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 保存Config信息的本地缓存 ---定时同步DB配置表的数据*/
public class ConfigEntityLocalCache {private static volatile ConfigEntityLocalCache instance = new ConfigEntityLocalCache();/*** 获取本地缓存实例*/public static ConfigEntityLocalCache getInstance() {return instance;}/** 缓存内存配置项 */private static Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();/*** 更新本地缓存数据*/public boolean update(String key, String value){configCache.put(key, value);return true;}/*** 更新本地缓存数据*/public String getByKey(String key){return configCache.getIfPresent(key);}}
2.在静态类中通过static语句块创建定时器并定时加载,代码如下
package wikiedits.schedule;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;/*** 静态类定时加载DB配置表到本地内存中*/
public class StaticLoadUtil {// 定时任务执行器private static transient ScheduledExecutorService scheduledExecutorService;public static final Cache<String, String> configCache =CacheBuilder.newBuilder().initialCapacity(50).maximumSize(500).build();// 通过定时执行器定时同步本地缓存和DB配置表static {scheduledExecutorService = Executors.newScheduledThreadPool(10);scheduledExecutorService.scheduleWithFixedDelay(() -> {// 2.1 定时任务更新本地内存配置项// List<ConfigEntity> configList = DBManager.SELECTSQL.getConfigs();// for(ConfigEntity entity : configList){configCache.put("key", "value");// }// 2.2 更新本地变量threshold的值// threshold = DBManager.SELECTSQL.getConfig("threshold");}, 0, 100, TimeUnit.SECONDS);}/*** 获取本地缓存*/public static Cache<String, String> getConfigCache() {return configCache;}}
总结:
1.外部定时器可以通过在富函数的open中进行初始化并开始定时执行
2.外部定时器也可以通过创建一个单独的静态类,然后在static模块中进行初始化并开始定时执行
相关文章:
flink中使用外部定时器实现定时刷新
背景: 我们经常会使用到比如数据库中的配置表信息,而我们不希望每次都去查询db,那么我们就想定时把db配置表的数据定时加载到flink的本地内存中,那么如何实现呢? 外部定时器定时加载实现 1.在open函数中进行定时器的…...
Spring Cloud Pipelines 入门实践
文章目录 1. 前言2. Spring Cloud Pipelines 是做什么的2.1. 预定义的流程2.2. 集成测试和契约测试2.3.部署策略 4. Spring Cloud Pipelines的使用示例4.1. 创建一个Spring Boot应用4.2. 将代码托管到GitHub仓库4.3. 添加Spring Cloud Pipelines依赖4.4. 配置Spring Cloud Pipe…...
G1 GC详解及设置
一、概述 G1 GC,全称Garbage-First Garbage Collector,在JDK1.7中引入了G1 GC,从JAVA 9开始,G1 GC是默认的GC算法。通过-XX:UseG1GC参数来启用。G1收集器是工作在堆内不同分区上的收集器,分区既可以是年轻代也可以是老…...
GitHub详细教程
将代码推送到GitHub仓库涉及一系列的步骤。以下是详细的步骤说明: 创建一个新的仓库(如果还没有的话): 访问 GitHub。登录您的帐户。点击页面右上角的图标,然后选择“New repository”。填写仓库名称、描述等信息&…...
【小沐学Python】Python实现Web图表功能(Dash)
文章目录 1、简介2、安装3、功能示例3.1 Hello World3.2 连接到数据3.3 可视化数据3.4 控件和回调3.5 设置应用的样式3.5.1 HTML and CSS3.5.2 Dash Design Kit (DDK)3.5.3 Dash Bootstrap Components3.5.4 Dash Mantine Components 4、更多示例4.1 Basic Dashboard4.2 Using C…...
【RabbitMQ】docker rabbitmq集群 docker搭建rabbitmq集群
docker rabbitmq集群 docker搭建rabbitmq集群 RabbitMQ提供了两种常用的集群模式 1.普通集群模式 2.镜像集群模式 普通集群模式只能同步主节点上的交换机和队列信息,但对于队列中的消息不做同步,主节点宕机也不能进行切换(故障转移ÿ…...
Linux 网络驱动实验
本文章对Linux 网络驱动实验中的设备树进行介绍,Linux网络驱动程序比较复杂,只要学会应用。 1、I.MX6ULL 网络外设设备树 I.MX6ULL 有两个 10/100M 的网络 MAC 外设,因此 I.MX6ULL 网络驱动主要就是这两个网络 MAC 外设的驱动。 fec1…...
访问Apache Tomcat的虚拟主机管理页面
介绍 通过Tomcat Host Manager应用可以创建、删除、管理Tomcat内的虚拟主机(virtual hosts)。该应用是Tomcat安装的一部分,默认在<Tomcat安装目录>/webapps/host-manager: 配置用户名、密码、角色 要访问Host Manager应…...
【算法】排序——归并排序和计数排序
主页点击直达:个人主页 我的小仓库:代码仓库 C语言偷着笑:C语言专栏 数据结构挨打小记:初阶数据结构专栏 Linux被操作记:Linux专栏 LeetCode刷题掉发记:LeetCode刷题 算法头疼记:算法专栏…...
discuz封面设置失败的解决办法(centos系统+windows系统)
discuz封面设置失败的解决办法(centos系统windows系统) centos系统:1、开启/var/www/html 这个目录的读写权限chmod -R 777 /var/www/html然后重启httpd:service httpd restart如果discuz论坛发布帖子,还是显示封面设置失败的话…...
AI绘画-Stable Diffusion笔记
软件:Stable Diffusion 视频教程来自 https://www.bilibili.com/video/BV1As4y127HW/?spm_id_from333.337.search-card.all.click 提示词 提示词类别 内容型提示词 人物主题特征: 服饰穿搭:white dress 发型发色:blonde hair,l…...
中值滤波算法及例程
中值滤波是一种常用的非线性图像滤波算法,它能够有效去除图像中的椒盐噪声(即孤立的亮或暗像素点),同时保持图像边缘和细节的清晰度。中值滤波的主要思想是使用一个滑动窗口,在窗口内对像素值进行排序,并将…...
SpringBoot 如何使用 Ehcache 作为缓存
使用Spring Boot Sleuth进行分布式跟踪 在现代分布式应用程序中,跟踪请求和了解应用程序的性能是至关重要的。Spring Boot Sleuth是一个分布式跟踪解决方案,它可以帮助您在分布式系统中跟踪请求并分析性能问题。本文将介绍如何在Spring Boot应用程序中使…...
Stable Diffusion 图片换脸插件Roop保姆教程 附错误解决办法和API使用
换脸技术已经不是新鲜事物,但如何实现简单、快速、高效的换脸操作呢?Roop插件正是为解决这一问题而生的。 sd-webui-roop 插件适用于已经本地部署了SD的用户。相较于传统的换脸技术,Roop插件几乎不需要训练,只需一张照片,即可在10秒内完成换脸。 但是要注意到是必须注意…...
华为OD机试 - 组成最大数(Java 2023 B卷 100分)
目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出 华为OD机试 2023B卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试(JAVA)真题(A卷B卷)》…...
十一、2023.10.5.计算机网络(end).11
文章目录 17、说说 TCP 可靠性保证?18、简述 TCP 滑动窗口以及重传机制?19、说说滑动窗口过小怎么办?20、说说如果三次握手时候每次握手信息对方没收到会怎么样,分情况介绍?21、简述 TCP 的 TIME_WAIT,为什么需要有这个状态&…...
基于SpringBoot的网上摄影工作室
目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 作品分类管理 轮播图管理 摄影作品管理 摄影作品收藏 摄影圈 摄影作品发布 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用,管理信息系统…...
Spring源码解析——IOC之bean 的初始化
正文 一个 bean 经历了 createBeanInstance() 被创建出来,然后又经过一番属性注入,依赖处理,历经千辛万苦,千锤百炼,终于有点儿 bean 实例的样子,能堪大任了,只需要经历最后一步就破茧成蝶了。…...
互联网摸鱼日报(2023-10-07)
互联网摸鱼日报(2023-10-07) 36氪新闻 小米汽车将研发增程式电动车,产品已有规划;LG新能源和丰田汽车北美公司签署电动汽车电池供应协议|36氪新能源日报1005 详解企业数字化转型建设过程中所需的七种能力 电商平台,如何让丰收「…...
深入理解RBAC
RBAC是一种基于角色实现访问控制的权限管理机制,通过定义角色和权限、用户和角色、角色和角色之间的关系,实现多层次、细粒度、可复用的权限管理系统。原文: Role-based Access Control (RBAC) Model[1] Bernard HermantUnsplash Avery Pennarun写的&quo…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
C# 类和继承(抽象类)
抽象类 抽象类是指设计为被继承的类。抽象类只能被用作其他类的基类。 不能创建抽象类的实例。抽象类使用abstract修饰符声明。 抽象类可以包含抽象成员或普通的非抽象成员。抽象类的成员可以是抽象成员和普通带 实现的成员的任意组合。抽象类自己可以派生自另一个抽象类。例…...
智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
力扣-35.搜索插入位置
题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...
在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?
uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件,用于在原生应用中加载 HTML 页面: 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...
sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
GitHub 趋势日报 (2025年06月06日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...
