flink中使用异步函数的几个注意事项
背景
在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项
异步函数的使用
首先看一下官方的例子:
/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等
相关文章:
flink中使用异步函数的几个注意事项
背景 在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈…...
QML之Repeater 控件使用
Repeater 控件是 重复作用 根据 model中的index 数量进行重复 废话不说 直接看如何用 当model 为数字时 Rectangle{height: 1200width: 500visible: trueanchors.fill: parentColumn{spacing: 20Repeater{model: 10delegate: Rectangle{width: 60height: 20color: index%2 …...
哈希树讲解
哈希树(HashTree)是哈希(Hash)算法的一种延续。传统数据结构中对如何避免哈希冲突都有一定的描述和解释,但是这些描述和解释都是泛泛而谈,并没有提出比较好的解决方案。这里所提到的哈希树(HashTree)算法就是要提供一种在理论上和实际应用中均能有效地处…...
vue 项目启动后一直不断的刷新停不下来
新建的vue 项目,配置了代理后项目一直刷新,停不下来,各种查找最后发现是vue.config.js 中的热更新配置项目开启的原因 const {defineConfig } require(vue/cli-service) const AutoImport require(unplugin-auto-import/webpack) const Co…...
makesense在线yolov5标注
文章目录 一、创建图片文件夹和label.txt二、在线标注数据 参考文章博主:风吹落叶花飘荡 一、创建图片文件夹和label.txt 创建一个放置图片的文件夹images,存放需要标注的图片(图片最好重命名为1,2,3…避免后面混淆) 创建label.t…...
python 之 矩阵相关操作
文章目录 1. **创建矩阵**:2. **矩阵加法**:3. **矩阵乘法**:4. **矩阵转置**:5. **元素级操作**:6. **汇总统计**:7. **逻辑操作**: 理解你的需求,我将为每个功能写一个单独的代码块…...
敢问路在何方
从2022年进入软件行业到现在,二十二年眨眼之间过去了,依然奋斗在编码第一线,挣扎在第一线,借此程序员佳节之际回顾一下这许多年的历程。 由于本人混得实在不好,工作过的地方都用某单位某公司来代替实际的,到…...
复习mysql中的事务
一个事务的开始和结尾必须是 start transaction | commit; rollback 事务特性 1.原子性:多个操作打包成一个整体,要么全部执行,要么一个都不执行。 不过这里的“一个都不执行”并不是真正的全不执行,只是看起来与没执行一样。…...
力扣刷题 day52:10-22
1.数组拆分 给定长度为 2n 的整数数组 nums ,你的任务是将这些数分成 n 对, 例如 (a1, b1), (a2, b2), ..., (an, bn) ,使得从 1 到 n 的 min(ai, bi) 总和最大。 返回该 最大总和 。 方法一:排序 #方法一:排序 def arrayPai…...
ELK概述部署和Filebeat 分布式日志管理平台部署
ELK概述部署、Filebeat 分布式日志管理平台部署 一、ELK 简介二、ELK部署2.1、部署准备2.2、优化elasticsearch用户拥有的内存权限2.3、启动elasticsearch是否成功开启2.4、浏览器查看节点信息2.5、安装 Elasticsearch-head 插件2.6、ELK Logstash 部署(在 Apache 节…...
分享一下我家网络机柜,家庭网络设备推荐
家里网络机柜搞了几天终于搞好了,非专业的,走线有点乱,勿喷。 从上到下的设备分别是: 无线路由器(当ap用):TL-XDR6088 插排:德木pdu机柜插排 硬盘录像机:TL-NVR6108-L8P 第二排左边…...
uboot移植之mx6ull_alientek_nand.h文件详解三
一. 简介 mx6ull_alientek_nand.h文件是 开发板的 uboot的一个配置文件。每个开发板都有一个 .h的配置文件。 mx6ull_alientek_nand.h 文件其实是 之前针对正点原子ALPHA开发板移植的 Uboot配置文件。 本文继上一篇文章的学习,地址如下:uboot移植之m…...
[Docker]一.Docker 简介与安装
一、Docker简介与为什么要用 Docker 1.1、Docker 介绍 Docker 是一个跨平台的开源的 应用容器引擎 ,诞生于 2013 年初,基于 Go语言 并遵从 Apache2.0 协议开源, Docker 可以把它理解成虚拟机,但是 Docker 和传统虚拟化方式 有所不同 …...
计算机网络-计算机网络体系结构-传输层
目录 一、UDP 二、TCP 特点 首部格式 连接管理 可靠传输 流量控制(点对点) 拥塞控制(全局) 三、拥塞控制算法 慢开始&拥塞避免 快重传&快恢复 功能一:提供进程与进程之间的逻辑通信 功能二:复用和分用 功能三:对收到的报…...
buuctf[HCTF 2018]WarmUp 1
题目环境: 发现除了表情包,再无其他F12试试发现source.php文件访问这个文件,格式如下:url/source.php回显如下:PHP代码审计: <?php highlight_file(__FILE__); class emmm {public static function ch…...
开源博客项目Blog .NET Core源码学习(4:生成验证码)
开源博客项目Blog中的后台管理登录界面中支持输入验证码(如下图所示),本文学习并记录项目中验证码的生成及调用方式。 博客项目中调用VerifyCode类生成验证码,该类位于App.Framwork项目中,命名空间为App.Framwork…...
gin框架39--重构 BasicAuth 中间件
gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候,会自动弹出一个认证界面,要求我们输入用户名和密码,这种BasicAuth是最基础、最常见的认证方式࿰…...
编译pycaffe过程中遇到的问题及解决
pycaffe是python调用caffe的方式,编译它就是要得到一个so库_pycaffe.so。 如题,在caffe的源码目录下,执行make pycaffe,跳出来一个错误: $ make pycaffe CXX/LD -o python/caffe/_caffe.so python/caffe/_caffe.cpp /usr/bin/ld…...
自然语言处理---Transformer机制详解之Transformer优势
1 Transformer的并行计算 对于Transformer比传统序列模型RNN/LSTM具备优势的第一大原因就是强大的并行计算能力. 对于RNN来说,任意时刻t的输入是时刻t的输入x(t)和上一时刻的隐藏层输出h(t-1),经过运算后得到当前时刻隐藏层的输出h(t),这个…...
改进YOLO系列 | YOLOv5/v7 引入 Dynamic Snake Convolution | 动态蛇形卷积
准确分割拓扑管状结构,如血管和道路,在各个领域中至关重要,可以确保下游任务的准确性和效率。然而,许多因素使任务复杂化,包括细小的局部结构和可变的全局形态。在这项工作中,我们注意到管状结构的特殊性,并利用这一知识来引导我们的DSCNet,以在三个阶段同时增强感知:…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...
Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...
Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...
云原生安全实战:API网关Envoy的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关 作为微服务架构的统一入口,负责路由转发、安全控制、流量管理等核心功能。 2. Envoy 由Lyft开源的高性能云原生…...
el-amap-bezier-curve运用及线弧度设置
文章目录 简介示例线弧度属性主要弧度相关属性其他相关样式属性完整示例链接简介 el-amap-bezier-curve 是 Vue-Amap 组件库中的一个组件,用于在 高德地图 上绘制贝塞尔曲线。 基本用法属性path定义曲线的路径,可以是多个弧线段的组合。stroke-weight线条的宽度。stroke…...
