flink中使用异步函数的几个注意事项
背景
在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项
异步函数的使用
首先看一下官方的例子:
/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等
相关文章:
flink中使用异步函数的几个注意事项
背景 在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈…...
QML之Repeater 控件使用
Repeater 控件是 重复作用 根据 model中的index 数量进行重复 废话不说 直接看如何用 当model 为数字时 Rectangle{height: 1200width: 500visible: trueanchors.fill: parentColumn{spacing: 20Repeater{model: 10delegate: Rectangle{width: 60height: 20color: index%2 …...
哈希树讲解
哈希树(HashTree)是哈希(Hash)算法的一种延续。传统数据结构中对如何避免哈希冲突都有一定的描述和解释,但是这些描述和解释都是泛泛而谈,并没有提出比较好的解决方案。这里所提到的哈希树(HashTree)算法就是要提供一种在理论上和实际应用中均能有效地处…...
vue 项目启动后一直不断的刷新停不下来
新建的vue 项目,配置了代理后项目一直刷新,停不下来,各种查找最后发现是vue.config.js 中的热更新配置项目开启的原因 const {defineConfig } require(vue/cli-service) const AutoImport require(unplugin-auto-import/webpack) const Co…...
makesense在线yolov5标注
文章目录 一、创建图片文件夹和label.txt二、在线标注数据 参考文章博主:风吹落叶花飘荡 一、创建图片文件夹和label.txt 创建一个放置图片的文件夹images,存放需要标注的图片(图片最好重命名为1,2,3…避免后面混淆) 创建label.t…...
python 之 矩阵相关操作
文章目录 1. **创建矩阵**:2. **矩阵加法**:3. **矩阵乘法**:4. **矩阵转置**:5. **元素级操作**:6. **汇总统计**:7. **逻辑操作**: 理解你的需求,我将为每个功能写一个单独的代码块…...
敢问路在何方
从2022年进入软件行业到现在,二十二年眨眼之间过去了,依然奋斗在编码第一线,挣扎在第一线,借此程序员佳节之际回顾一下这许多年的历程。 由于本人混得实在不好,工作过的地方都用某单位某公司来代替实际的,到…...
复习mysql中的事务
一个事务的开始和结尾必须是 start transaction | commit; rollback 事务特性 1.原子性:多个操作打包成一个整体,要么全部执行,要么一个都不执行。 不过这里的“一个都不执行”并不是真正的全不执行,只是看起来与没执行一样。…...
力扣刷题 day52:10-22
1.数组拆分 给定长度为 2n 的整数数组 nums ,你的任务是将这些数分成 n 对, 例如 (a1, b1), (a2, b2), ..., (an, bn) ,使得从 1 到 n 的 min(ai, bi) 总和最大。 返回该 最大总和 。 方法一:排序 #方法一:排序 def arrayPai…...
ELK概述部署和Filebeat 分布式日志管理平台部署
ELK概述部署、Filebeat 分布式日志管理平台部署 一、ELK 简介二、ELK部署2.1、部署准备2.2、优化elasticsearch用户拥有的内存权限2.3、启动elasticsearch是否成功开启2.4、浏览器查看节点信息2.5、安装 Elasticsearch-head 插件2.6、ELK Logstash 部署(在 Apache 节…...
分享一下我家网络机柜,家庭网络设备推荐
家里网络机柜搞了几天终于搞好了,非专业的,走线有点乱,勿喷。 从上到下的设备分别是: 无线路由器(当ap用):TL-XDR6088 插排:德木pdu机柜插排 硬盘录像机:TL-NVR6108-L8P 第二排左边…...
uboot移植之mx6ull_alientek_nand.h文件详解三
一. 简介 mx6ull_alientek_nand.h文件是 开发板的 uboot的一个配置文件。每个开发板都有一个 .h的配置文件。 mx6ull_alientek_nand.h 文件其实是 之前针对正点原子ALPHA开发板移植的 Uboot配置文件。 本文继上一篇文章的学习,地址如下:uboot移植之m…...
[Docker]一.Docker 简介与安装
一、Docker简介与为什么要用 Docker 1.1、Docker 介绍 Docker 是一个跨平台的开源的 应用容器引擎 ,诞生于 2013 年初,基于 Go语言 并遵从 Apache2.0 协议开源, Docker 可以把它理解成虚拟机,但是 Docker 和传统虚拟化方式 有所不同 …...
计算机网络-计算机网络体系结构-传输层
目录 一、UDP 二、TCP 特点 首部格式 连接管理 可靠传输 流量控制(点对点) 拥塞控制(全局) 三、拥塞控制算法 慢开始&拥塞避免 快重传&快恢复 功能一:提供进程与进程之间的逻辑通信 功能二:复用和分用 功能三:对收到的报…...
buuctf[HCTF 2018]WarmUp 1
题目环境: 发现除了表情包,再无其他F12试试发现source.php文件访问这个文件,格式如下:url/source.php回显如下:PHP代码审计: <?php highlight_file(__FILE__); class emmm {public static function ch…...
开源博客项目Blog .NET Core源码学习(4:生成验证码)
开源博客项目Blog中的后台管理登录界面中支持输入验证码(如下图所示),本文学习并记录项目中验证码的生成及调用方式。 博客项目中调用VerifyCode类生成验证码,该类位于App.Framwork项目中,命名空间为App.Framwork…...
gin框架39--重构 BasicAuth 中间件
gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候,会自动弹出一个认证界面,要求我们输入用户名和密码,这种BasicAuth是最基础、最常见的认证方式࿰…...
编译pycaffe过程中遇到的问题及解决
pycaffe是python调用caffe的方式,编译它就是要得到一个so库_pycaffe.so。 如题,在caffe的源码目录下,执行make pycaffe,跳出来一个错误: $ make pycaffe CXX/LD -o python/caffe/_caffe.so python/caffe/_caffe.cpp /usr/bin/ld…...
自然语言处理---Transformer机制详解之Transformer优势
1 Transformer的并行计算 对于Transformer比传统序列模型RNN/LSTM具备优势的第一大原因就是强大的并行计算能力. 对于RNN来说,任意时刻t的输入是时刻t的输入x(t)和上一时刻的隐藏层输出h(t-1),经过运算后得到当前时刻隐藏层的输出h(t),这个…...
改进YOLO系列 | YOLOv5/v7 引入 Dynamic Snake Convolution | 动态蛇形卷积
准确分割拓扑管状结构,如血管和道路,在各个领域中至关重要,可以确保下游任务的准确性和效率。然而,许多因素使任务复杂化,包括细小的局部结构和可变的全局形态。在这项工作中,我们注意到管状结构的特殊性,并利用这一知识来引导我们的DSCNet,以在三个阶段同时增强感知:…...
Metashape空三优化:关键参数解析与实战调优指南
1. Metashape空三处理的核心参数解析 空三(空中三角测量)是摄影测量中的关键步骤,它直接决定了后续建模和测绘成果的精度。在Metashape中,有几个核心参数会显著影响空三的质量和效率。这些参数看起来可能有些复杂,但理…...
一文读懂JJF2132—2024:荧光紫外老化试验箱校准该关注什么?
2024年12月14日,发布的JJF2132—2024《荧光紫外灯人工气候老化试验装置校准规范》正式实施,成为荧光紫外老化试验箱(QUV类设备)辐射照度参数校准的新的计量技术规范。这一规程的更新并非简单的技术迭代,而是对当前材料…...
优峰技术:中心波长可调滤波器在光通信测试中的应用与选型
在1.6T光模块、CPO、DWDM系统快速发展的今天,中心波长可调滤波器已经成为光通信测试、光谱分析、信道筛选的关键器件。作为光通信测试领域深耕多年的企业,深圳优峰技术结合国际主流产品标准与自研技术,推出高性能中心波长可调滤波器及配套测试…...
2025届毕业生推荐的五大降重复率神器实际效果
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 为降低AIGC检测率,其核心要点在于消除生成式文本呈现出的规律性特征。其一&#…...
AI时代工程师的Superpowers进化论技术
核心主题:探讨AI技术如何重塑工程师的能力边界,分析工程师在AI时代需要掌握的新技能与思维模式。技术驱动的能力进化传统工程师能力模型核心技能:编程、算法、系统设计、调试局限性:依赖人工分析,效率天花板明显AI赋能…...
为什么你的搜索还在用纯文本?多模态大模型已成头部平台标配,错过这波升级将落后至少18个月
第一章:多模态大模型在搜索中的应用 2026奇点智能技术大会(https://ml-summit.org) 多模态大模型正深刻重构现代搜索引擎的核心能力,使搜索从传统的关键词匹配跃迁为跨文本、图像、音频与视频的语义理解与意图对齐。用户上传一张模糊的街景照片并提问“…...
Wan2.2-I2V Anaconda环境配置全指南
Wan2.2-I2V Anaconda环境配置全指南 1. 为什么选择Anaconda来跑Wan2.2-I2V 刚开始接触Wan2.2-I2V时,我试过直接在系统Python里装依赖,结果不到半小时就卡在了CUDA版本冲突上。后来发现用Anaconda管理环境简直是救命稻草——它能把不同项目的Python版本…...
从原理到实战:深度相机在机器人避障中的核心算法解析
1. 深度相机如何成为机器人的"火眼金睛" 第一次接触深度相机时,我被它输出的彩色点云图震撼到了——就像给机器人装上了孙悟空的火眼金睛,普通摄像头只能看到平面图像,而深度相机却能直接"看"到物体的远近。这种三维视觉…...
Win to Go实战:轻松在外接硬盘或移动硬盘上部署Windows系统
1. 为什么你需要Win to Go? 想象一下这样的场景:你正在咖啡馆用笔记本处理工作文档,突然接到通知要去客户现场演示。传统做法是带着笨重的笔记本,或者把文件拷到U盘——但前者太重,后者可能遇到软件不兼容、环境配置缺…...
3步找回消失的微信记忆:WechatDecrypt工具实战指南
3步找回消失的微信记忆:WechatDecrypt工具实战指南 【免费下载链接】WechatDecrypt 微信消息解密工具 项目地址: https://gitcode.com/gh_mirrors/we/WechatDecrypt 你是否曾因手机更换而痛失珍贵的聊天记录?或是急需找回某段重要对话却束手无策&…...
