当前位置: 首页 > news >正文

Flink 维表关联

1、实时查询维表

实时查询维表是指用户在 Flink 算子中直接访问外部数据库,比如用 MySQL 来进行关联,这种方式是同步方式,数据保证是最新的。但是,当我们的流计算数据过大,会对外 部系统带来巨大的访问压力,一旦出现比如连接失败、线程池满等情况,由于我们是同步调用,所以一般会导致线程阻塞、Task 等待数据返回,影响整体任务的吞吐量。而且这种方案对外部系统的 QPS 要求较高,在大数据实时计算场景下,QPS 远远高于普通的后台系统,峰值高达十万到几十万,整体作业瓶颈转移到外部系统


public class DimSync  extends RichMapFunction<fplOverview, String> {private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);private Connection conn = null;public void open(Configuration parameters) throws Exception {super.open(parameters);conn = DriverManager.getConnection("jdbc:test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");//根据 dp_id 查询  上周的 fpl_amount,ywPreparedStatement pst = conn.prepareStatement("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = ? \n" +"group by dp_id");pst.setString(1,dp_id);ResultSet resultSet = pst.executeQuery();String fpl_amount = null;String yw = null ;while (resultSet.next()){fpl_amount = resultSet.getString(1);yw = resultSet.getString(2);}pst.close();jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw)return jsonObject.toString();}public void close() throws Exception {super.close();conn.close();}

2、LRU 缓存 (flink 异步Id)

利用 Flink 的 RichAsyncFunction 读取 mysql 的数据到缓存中,我们在关联维度表时先去查询缓存,如果缓存中不存在这条数据,就利用客户端去查询 mysql,然后插入到缓存中。


public class JDBCAsyncFunction  extends RichAsyncFunction<fplOverview, JsonObject> {private SQLClient client;@Overridepublic void open(Configuration parameters) throws Exception {Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(10).setEventLoopPoolSize(10));JsonObject config = new JsonObject().put("url", "jdbc:mysql://rm-bp161be65d56kbt4nzo.mysql.rds.aliyuncs.com:3306/mysqldb?characterEncoding=UTF-8;useSSL=false").put("driver_class", "com.mysql.cj.jdbc.Driver").put("max_pool_size", 10).put("user", "root").put("password", "");client = JDBCClient.createShared(vertx, config);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(fplOverview fplOverview, ResultFuture<JsonObject> resultFuture) throws Exception {client.getConnection(conn -> {if (conn.failed()) {return;}final SQLConnection connection = conn.result();// 执行sqlconnection.query("select  max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"and dp_id = '" + fplOverview.getDp_id() + " ' " +"group by dp_id ", res2 -> {ResultSet rs = new ResultSet();if (res2.succeeded()) {rs = res2.result();}else{System.out.println("查询数据库出错");}List<JsonObject> stores = new ArrayList<>();for (JsonObject json : rs.getRows()) {stores.add(json);}connection.close();resultFuture.complete(stores);});});}}

3、预加载全量mysql数据

预加载全量mysql数据 使用 ScheduledExecutorService 每隔 5 分钟拉取一次维表数据,这种方式适用于那些实时场景不是很高,维表数据较小的场景

public class WholeLoad extends RichMapFunction<fplOverview,String> {private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);// 定义map的结果,key为关联字段private static Map<String,String> cache ;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);cache = new HashMap<>();ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);executor.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {load();} catch (Exception e) {e.printStackTrace();}}},0,5, TimeUnit.MINUTES); //从现在开始每隔5分钟查询数据}@Overridepublic String map(fplOverview fplOverview) throws Exception {JSONObject jsonObject = JSONObject.parseObject(fplOverview.toJson());String dp_id = jsonObject.getString("dp_id");// 获取对应id的结果String rs = cache.get(dp_id);JSONObject rsObject = JSONObject.parseObject(rs);jsonObject.putAll(rsObject);return jsonObject.toString();}public   void  load() throws Exception {Class.forName("com.mysql.jdbc.Driver");Connection con = DriverManager.getConnection("jdbc:mysql://test:3306/mysqldb?characterEncoding=UTF-8", "root", "qyllt1314#");// 执行查询的SQLPreparedStatement statement = con.prepareStatement("select  dp_id,max(fpl_amount) as fpl_amount,max(yearweek(datatime)) as yw \n" +"from fpl_overview \n" +"where datatime >= date_sub(curdate(), interval weekday(curdate()) + 7 Day)  # 上周第一天\n" +"and datatime < date_sub(curdate(), interval weekday(curdate()) + 0 Day)  # 上周最后一天+1 \n" +"group by dp_id");ResultSet rs = statement.executeQuery();while (rs.next()) {// 查询结果放入缓存String dp_id = rs.getString("dp_id");String fpl_amount = rs.getString("fpl_amount");String yw = rs.getString("yw");JSONObject jsonObject = JSONObject.parseObject("{}");jsonObject.put("lastweek_fpl_amount",fpl_amount);jsonObject.put("lastweek_yw",yw);cache.put(dp_id,jsonObject.toString());}System.out.println("数据输出测试:"+cache.toString());con.close();}
}

相关文章:

Flink 维表关联

1、实时查询维表 实时查询维表是指用户在 Flink 算子中直接访问外部数据库&#xff0c;比如用 MySQL 来进行关联&#xff0c;这种方式是同步方式&#xff0c;数据保证是最新的。但是&#xff0c;当我们的流计算数据过大&#xff0c;会对外 部系统带来巨大的访问压力&#xff0…...

阳光蟹场小程序的盈利模式与思考深度

随着移动互联网的快速发展&#xff0c;小程序成为了各行各业进行数字化转型的重要工具之一。阳光蟹场小程序作为一款专为蟹场管理和销售提供支持的移动&#xff0c;其盈利模式也备受关注。本文将从阳光蟹场小程序的盈利途径、商业模式和对蟹场管理的影响等方面&#xff0c;深入…...

2-Java进阶知识总结-7-UDP-TCP

文章目录 网络编程概述网络编程三要素--IP地址IP地址--概念&#xff08;IP&#xff1a;Internet Protocol&#xff09;IP地址--分类IP地址--特殊的地址&#xff1a;127.0.0.1IP地址获取--DOS命令IP地址获取--InetAddress类 网络编程三要素--端口端口--概念端口号 网络编程三要素…...

C++数据结构X篇_19_排序基本概念及冒泡排序(重点是核心代码,冒泡是稳定的排序)

文章目录 1. 排序基本概念2. 冒泡排序2.1 核心代码2.2 冒泡排序代码2.3 查看冒泡排序的时间消耗2.4 冒泡排序改进版减小时间消耗 1. 排序基本概念 现实生活中排序很重要&#xff0c;例如:淘宝按条件搜索的结果展示等。 概念 排序是计算机内经常进行的一种操作&#xff0c;其目…...

工作:三菱伺服驱动器连接参数及其电机钢性参数配置与调整

工作&#xff1a;三菱伺服驱动器参数及电机钢性参数配置与调整 一、三菱PLC与伺服驱动器连接参数的设置 1. 伺服配置 单个JET伺服从站链接侧占用点数:Rx/Ry占用64点、RWw/RWr占用32点 图中配置了22个JET伺服从站&#xff0c;占用点数:Rx/Ry占用64222048‬点、RWw/RWr占用322…...

企事业单位/公司电脑文件透明加密保护 | 防泄密软件\系统!

推荐——「天锐绿盾电脑文件防泄密系统」 一款全面的企业/公司数据透明加密防泄密系统&#xff0c;旨在从源头上保障数据的安全和使用安全。 PC访问地址&#xff1a; https://isite.baidu.com/site/wjz012xr/2eae091d-1b97-4276-90bc-6757c5dfedee 它具有以下特点&#xff1a…...

[Leetcode] 0101. 对称二叉树

101. 对称二叉树 题目描述 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例 1&#xff1a; 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true示例 2&#xff1a; 输入&#xff1a;root [1,2,2,null,3,null,3] 输出&#xff1a;false提示&#…...

.NET、VUE利用RSA加密完成登录并且发放JWT令牌设置权限访问

后端生成公钥私钥 使用RSA.ToXmlString(Boolean) 方法生成公钥以及私钥。 RSACryptoServiceProvider rSA new(); string pubKey rSA.ToXmlString(false);//公钥 string priKey rSA.ToXmlString(true);//私钥 后端将生成的公钥发送给前端 创建一个get请求&#xff0c;将…...

go实现文件的读写

读文件 1.ioutil.ReadFile package mainimport ("fmt""io/ioutil" )func main() {filePath : "example.txt"data, err : ioutil.ReadFile(filePath)if err ! nil {fmt.Printf("无法读取文件&#xff1a;%v\n", err)return}fmt.Print…...

基于 nodejs+vue购物网站设计系统mysql

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…...

Mysql数据库 4.SQL语言 DQL数据操纵语言 查询

DQL数据查询语言 从数据表中提取满足特定条件的记录 1.单表查询 2.多表查询 查询基础语法 select 关键字后指定要查询到的记录的哪些列 语法&#xff1a;select 列名&#xff08;字段名&#xff09;/某几列/全部列 from 表名 [具体条件]&#xff1b; select colnumName…...

threejs(3)-详解材质与纹理

一、Matcap(MeshMatcapMaterial)材质原理与应用 Matcap是一张含有光照信息的贴图&#xff0c;通常是直接截取材质球截图来使用。因此Matcap可以很好的模拟静止光源下的光照效果。 最直接的方式就是直接使用在View空间下的模型法向量的xy分量去采样Matcap。 另外还有一种常见…...

10月最新H5自适应樱花导航网站源码SEO增强版

10月最新H5自适应樱花导航网源码SEO增强版。非常强大的导航网站亮点就是对SEO优化比较好。 开发时PHP版本&#xff1a;7.3开发时MySQL版本&#xff1a;5.7.26 懂前端和PHP技术想更改前端页面的可以看&#xff1a;网站的前端页面不好看&#xff0c;你可以查看index目录&#x…...

探索SOCKS5与SK5代理在现代网络环境中的应用

随着互联网技术的飞速发展&#xff0c;网络安全成为了不容忽视的重要议题。其中&#xff0c;网络代理技术作为一种重要的网络安全手段&#xff0c;以其独特的功能和优势在网络安全领域占据了重要的位置。本文将探讨两种常见的代理技术&#xff1a;SOCKS5代理和SK5代理&#xff…...

有六家机器视觉公司今年11月份初放假到明年春节后,除夕不放假看住企业不跑路,不倒闭,明年大家日子会越来越甜

不幸的消息一个接着一个&#xff0c;请大家注意下面的消息 我已经收到已经有6家机器视觉公司今年11月份初放假到明年春节后&#xff0c;他们真的没有订单了&#xff0c;其中4家宣布员工可以自行寻找工作&#xff0c;今年除夕不放假是经济下行经济考量吗&#xff1f;看住企业不…...

【Linux】MAC帧协议 + ARP协议

文章目录 &#x1f4d6; 前言1. 数据链路层2. MAC帧格式3. 再谈局域网4. ARP协议4.1 路由器的转发过程&#xff1a;4.2 ARP协议格式&#xff1a; 5. 如何获得目的MAC地址 &#x1f4d6; 前言 在学完网络层IP协议之后&#xff0c;本章我们将继续向下沉一层&#xff0c;进入到数…...

深入理解指针:【探索指针的高级概念和应用一】

目录 前言&#xff1a; 1. 字符指针 2. 指针数组 3.数组指针 3.1数组指针的定义 3.2 &数组名VS数组名 3.3数组指针的使用 前言&#xff1a; &#x1f342;在了解今天的内容之前我们先复习一下指针的基本概念&#xff1a; 1&#xff0c;内存单元是有编号的&#xff…...

Leetcode周赛365补题(3 / 3)

目录 1、2、有序三元组的最大值 - 预处理前后最大值 遍历 &#xff08;1&#xff09;预处理前后值遍历&#xff08;枚举j&#xff09; &#xff08;2&#xff09;枚举k 2、无限数组的最短子数组 - 前缀和 滑动窗口 1、2、有序三元组的最大值 - 预处理前后最大值 遍历 …...

Python基础入门例程13-NP13 格式化输出(三)

目录 描述 输入描述&#xff1a; 输出描述&#xff1a; 示例1 解答&#xff1a; 1&#xff09;第一种strip函数 2&#xff09;先删除左边&#xff0c;再删除右边的空格&#xff0c;使用.lstrip函数和 .rstrip函数 3) 使用replace函数 4)使用split和join函数&#xff0c…...

Vue快速入门

一、概述 1.是一套前端框架&#xff0c;可免除原生JavaScript中的DOM操作&#xff0c;基于MVVM思想&#xff0c;实现数据双向绑定。 实现由MVC——>MVVM的转换 二、入门 1.新建HTML页面&#xff0c;引入Vue.js文件 2.在JS代码区&#xff0c;创建Vue核心对象&#xff0c;进行…...

MySQL - 如何判断一行扫描数?

在MySQL中&#xff0c;一行扫描数是在执行查询操作时&#xff0c;需要扫描的行数&#xff0c;以找到与查询条件匹配的行。这个值反映了查询的效率。 MySQL 判断一行扫描数的方法&#xff1a; 索引的使用&#xff1a;MySQL首先会检查查询是否可以使用索引。如果可以&#xff0…...

3682: 【C3】【递推】台阶问题

题目描述 有N级的台阶&#xff0c;你一开始在底部&#xff0c;每次可以向上迈最多K级台阶&#xff08;最少1级&#xff09;&#xff0c;问到达第N级台阶有多少种不同方式。 输入 两个正整数N&#xff0c;K。(N≤100000,K≤100) 输出 一个正整数&#xff0c;为不同方式数&a…...

C++(Qt)软件调试---线程死锁调试(15)

C(Qt)软件调试—线程死锁调试&#xff08;15&#xff09; 文章目录 C(Qt)软件调试---线程死锁调试&#xff08;15&#xff09;1、前言2、常见死锁3、linux下gdb调试C死锁1.1 使用代码1.2 gdb调试 3、linux下gdb调试Qt死锁1.1 使用代码1.2 gdb调试 4、Windows下gdb调试C死锁5、W…...

HugeGraph Hubble 配置 https 协议的操作步骤

背景 HugeGraph 图数据库的 Server 端支持 https 配置&#xff0c;官方文档中有说明相对比较容易&#xff0c;而 Hubble 部署过程都是 http的。 我们有一个应用要嵌入 hubble 页面&#xff0c;而且部署为 https &#xff0c;那么 Hubble 是否支持配置 https 呢&#xff1f;网…...

大型应用的架构演进--spring家族在其中的作用

01 大型应用的架构演进 带来的挑战&#xff1a; 运维与监控 分布式带来的复杂性 接口的调整成本 测试成本 依赖管理成本 02 Spring家族 在我看来&#xff0c;springboot的3大特点(我常用的)&#xff1a;内置的web容器&#xff1b;开箱即用的starter模版&#xff1b;自动配置&…...

LinkedHashMap 简单实现LRU

要使用 LinkedHashMap 来实现LRU&#xff08;最近最少使用&#xff09;缓存&#xff0c;可以设置它的访问顺序为true&#xff0c;以便在每次访问一个元素时&#xff0c;将它移到最后&#xff0c;从而实现LRU的特性。以下是一个简单的Java示例&#xff1a; import java.util.Li…...

mysql字符串函数

函数名 描述 示例 ASCII(s) 返回字符串s的第一个字符的ASCII码 返回CustomerName字段第一个字母的ASCII码&#xff1a; SELECT ASCII(CustomerName) AS NumCodeOfFirstChar FROM Customers; CHAR_LENGTH(s) 返回字符串s的字符数 返回字符串RUNOOB的字符数&#xff1a; …...

【强烈推荐】视频转gif、图片拼gif,嘎嘎好用,免费免费真的免费,亲测有效,无效过来打我

问题描述 最近遇到一个需求是需要将视频生成gif&#xff0c;这个看上去不是很难&#xff0c;所以有了以下的解决办法 解决办法 首先想到的当然是自己写一个&#xff0c;用了两套代码&#xff1a; from moviepy.editor import *# 读取视频文件 video_clip VideoFileClip(&quo…...

C# Onnx Yolov8 Detect 印章 指纹捺印 检测

应用场景 检测文件中的印章和指纹捺印&#xff0c;用于判断文件是否合规&#xff08;是否盖章&#xff0c;是否按印&#xff09; 效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.…...

0034【Edabit ★☆☆☆☆☆】【修改Bug4】Buggy Code (Part 4)

0034【Edabit ★☆☆☆☆☆】【修改Bug4】Buggy Code (Part 4) bugs conditions strings Instructions Emmy has written a function that returns a greeting to users. However, she’s in love with Mubashir, and would like to greet him slightly differently. She add…...