当前位置: 首页 > news >正文

flink中使用异步函数的几个注意事项

背景

在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项

异步函数的使用

首先看一下官方的例子:

/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等

相关文章:

flink中使用异步函数的几个注意事项

背景 在flink系统中&#xff0c;我们为了补充某个流事件成一个完整的记录&#xff0c;经常需要调用外部接口获取一些配置数据&#xff0c;流事件结合这些配置数据就可以组合成一条完整的记录&#xff0c;然而如果同步调用外部系统接口来实现&#xff0c;那么会有很大的性能瓶颈…...

QML之Repeater 控件使用

Repeater 控件是 重复作用 根据 model中的index 数量进行重复 废话不说 直接看如何用 当model 为数字时 Rectangle{height: 1200width: 500visible: trueanchors.fill: parentColumn{spacing: 20Repeater{model: 10delegate: Rectangle{width: 60height: 20color: index%2 …...

哈希树讲解

哈希树(HashTree)是哈希(Hash)算法的一种延续。传统数据结构中对如何避免哈希冲突都有一定的描述和解释&#xff0c;但是这些描述和解释都是泛泛而谈&#xff0c;并没有提出比较好的解决方案。这里所提到的哈希树(HashTree)算法就是要提供一种在理论上和实际应用中均能有效地处…...

vue 项目启动后一直不断的刷新停不下来

新建的vue 项目&#xff0c;配置了代理后项目一直刷新&#xff0c;停不下来&#xff0c;各种查找最后发现是vue.config.js 中的热更新配置项目开启的原因 const {defineConfig } require(vue/cli-service) const AutoImport require(unplugin-auto-import/webpack) const Co…...

makesense在线yolov5标注

文章目录 一、创建图片文件夹和label.txt二、在线标注数据 参考文章博主&#xff1a;风吹落叶花飘荡 一、创建图片文件夹和label.txt 创建一个放置图片的文件夹images&#xff0c;存放需要标注的图片&#xff08;图片最好重命名为1,2,3…避免后面混淆&#xff09; 创建label.t…...

python 之 矩阵相关操作

文章目录 1. **创建矩阵**&#xff1a;2. **矩阵加法**&#xff1a;3. **矩阵乘法**&#xff1a;4. **矩阵转置**&#xff1a;5. **元素级操作**&#xff1a;6. **汇总统计**&#xff1a;7. **逻辑操作**&#xff1a; 理解你的需求&#xff0c;我将为每个功能写一个单独的代码块…...

敢问路在何方

从2022年进入软件行业到现在&#xff0c;二十二年眨眼之间过去了&#xff0c;依然奋斗在编码第一线&#xff0c;挣扎在第一线&#xff0c;借此程序员佳节之际回顾一下这许多年的历程。 由于本人混得实在不好&#xff0c;工作过的地方都用某单位某公司来代替实际的&#xff0c;到…...

复习mysql中的事务

一个事务的开始和结尾必须是 start transaction | commit; rollback 事务特性 1.原子性&#xff1a;多个操作打包成一个整体&#xff0c;要么全部执行&#xff0c;要么一个都不执行。 不过这里的“一个都不执行”并不是真正的全不执行&#xff0c;只是看起来与没执行一样。…...

力扣刷题 day52:10-22

1.数组拆分 给定长度为 2n 的整数数组 nums &#xff0c;你的任务是将这些数分成 n 对, 例如 (a1, b1), (a2, b2), ..., (an, bn) &#xff0c;使得从 1 到 n 的 min(ai, bi) 总和最大。 返回该 最大总和 。 方法一&#xff1a;排序 #方法一&#xff1a;排序 def arrayPai…...

ELK概述部署和Filebeat 分布式日志管理平台部署

ELK概述部署、Filebeat 分布式日志管理平台部署 一、ELK 简介二、ELK部署2.1、部署准备2.2、优化elasticsearch用户拥有的内存权限2.3、启动elasticsearch是否成功开启2.4、浏览器查看节点信息2.5、安装 Elasticsearch-head 插件2.6、ELK Logstash 部署&#xff08;在 Apache 节…...

分享一下我家网络机柜,家庭网络设备推荐

家里网络机柜搞了几天终于搞好了&#xff0c;非专业的&#xff0c;走线有点乱&#xff0c;勿喷。 从上到下的设备分别是&#xff1a; 无线路由器&#xff08;当ap用&#xff09;:TL-XDR6088 插排&#xff1a;德木pdu机柜插排 硬盘录像机&#xff1a;TL-NVR6108-L8P 第二排左边…...

uboot移植之mx6ull_alientek_nand.h文件详解三

一. 简介 mx6ull_alientek_nand.h文件是 开发板的 uboot的一个配置文件。每个开发板都有一个 .h的配置文件。 mx6ull_alientek_nand.h 文件其实是 之前针对正点原子ALPHA开发板移植的 Uboot配置文件。 本文继上一篇文章的学习&#xff0c;地址如下&#xff1a;uboot移植之m…...

[Docker]一.Docker 简介与安装

一、Docker简介与为什么要用 Docker 1.1、Docker 介绍 Docker 是一个跨平台的开源的 应用容器引擎 &#xff0c;诞生于 2013 年初&#xff0c;基于 Go语言 并遵从 Apache2.0 协议开源, Docker 可以把它理解成虚拟机&#xff0c;但是 Docker 和传统虚拟化方式 有所不同 …...

计算机网络-计算机网络体系结构-传输层

目录 一、UDP 二、TCP 特点 首部格式 连接管理 可靠传输 流量控制(点对点) 拥塞控制(全局) 三、拥塞控制算法 慢开始&拥塞避免 快重传&快恢复 功能一&#xff1a;提供进程与进程之间的逻辑通信 功能二&#xff1a;复用和分用 功能三&#xff1a;对收到的报…...

buuctf[HCTF 2018]WarmUp 1

题目环境&#xff1a; 发现除了表情包&#xff0c;再无其他F12试试发现source.php文件访问这个文件&#xff0c;格式如下&#xff1a;url/source.php回显如下&#xff1a;PHP代码审计&#xff1a; <?php highlight_file(__FILE__); class emmm {public static function ch…...

开源博客项目Blog .NET Core源码学习(4:生成验证码)

开源博客项目Blog中的后台管理登录界面中支持输入验证码&#xff08;如下图所示&#xff09;&#xff0c;本文学习并记录项目中验证码的生成及调用方式。   博客项目中调用VerifyCode类生成验证码&#xff0c;该类位于App.Framwork项目中&#xff0c;命名空间为App.Framwork…...

gin框架39--重构 BasicAuth 中间件

gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候&#xff0c;会自动弹出一个认证界面&#xff0c;要求我们输入用户名和密码&#xff0c;这种BasicAuth是最基础、最常见的认证方式&#xff0…...

编译pycaffe过程中遇到的问题及解决

pycaffe是python调用caffe的方式&#xff0c;编译它就是要得到一个so库_pycaffe.so。 如题&#xff0c;在caffe的源码目录下&#xff0c;执行make pycaffe&#xff0c;跳出来一个错误: $ make pycaffe CXX/LD -o python/caffe/_caffe.so python/caffe/_caffe.cpp /usr/bin/ld…...

自然语言处理---Transformer机制详解之Transformer优势

1 Transformer的并行计算 对于Transformer比传统序列模型RNN/LSTM具备优势的第一大原因就是强大的并行计算能力. 对于RNN来说&#xff0c;任意时刻t的输入是时刻t的输入x(t)和上一时刻的隐藏层输出h(t-1)&#xff0c;经过运算后得到当前时刻隐藏层的输出h(t)&#xff0c;这个…...

改进YOLO系列 | YOLOv5/v7 引入 Dynamic Snake Convolution | 动态蛇形卷积

准确分割拓扑管状结构,如血管和道路,在各个领域中至关重要,可以确保下游任务的准确性和效率。然而,许多因素使任务复杂化,包括细小的局部结构和可变的全局形态。在这项工作中,我们注意到管状结构的特殊性,并利用这一知识来引导我们的DSCNet,以在三个阶段同时增强感知:…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

OpenLayers 可视化之热力图

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 热力图&#xff08;Heatmap&#xff09;又叫热点图&#xff0c;是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

【入坑系列】TiDB 强制索引在不同库下不生效问题

文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅

目录 前言 操作系统与驱动程序 是什么&#xff0c;为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中&#xff0c;我们在使用电子设备时&#xff0c;我们所输入执行的每一条指令最终大多都会作用到硬件上&#xff0c;比如下载一款软件最终会下载到硬盘上&am…...

Pydantic + Function Calling的结合

1、Pydantic Pydantic 是一个 Python 库&#xff0c;用于数据验证和设置管理&#xff0c;通过 Python 类型注解强制执行数据类型。它广泛用于 API 开发&#xff08;如 FastAPI&#xff09;、配置管理和数据解析&#xff0c;核心功能包括&#xff1a; 数据验证&#xff1a;通过…...

QT开发技术【ffmpeg + QAudioOutput】音乐播放器

一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下&#xff0c;音视频内容犹如璀璨繁星&#xff0c;点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频&#xff0c;到在线课堂中知识渊博的专家授课&#xff0c;再到影视平台上扣人心弦的高清大片&#xff0c;音…...