当前位置: 首页 > news >正文

flink中使用异步函数的几个注意事项

背景

在flink系统中,我们为了补充某个流事件成一个完整的记录,经常需要调用外部接口获取一些配置数据,流事件结合这些配置数据就可以组合成一条完整的记录,然而如果同步调用外部系统接口来实现,那么会有很大的性能瓶颈,这种情况下我们一般会使用异步函数提高性能,本文就来记录下使用异步函数的几个注意事项

异步函数的使用

首先看一下官方的例子:

/*** 实现 'AsyncFunction' 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 发送异步请求,接收 future 结果final Future<String> result = client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});}
}// 创建初始 DataStream
DataStream<String> stream = ...;// 应用异步 I/O 转换操作
DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

注意事项如下:
1.在asyncinvoke方法中不能有阻塞的操作,比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑,而不会使用Future.get方法进行阻塞操作
2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间,也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越
3.异步函数可以和检查点机制进行集成,也就是那些正在等待响应结果的记录会被写入检查点中,当故障恢复后,可以重新发送请求
4.如果服务端没有提供异步的客户端,我们可以用多线程进行模拟,只要多线程返回future对象即可
5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等

相关文章:

flink中使用异步函数的几个注意事项

背景 在flink系统中&#xff0c;我们为了补充某个流事件成一个完整的记录&#xff0c;经常需要调用外部接口获取一些配置数据&#xff0c;流事件结合这些配置数据就可以组合成一条完整的记录&#xff0c;然而如果同步调用外部系统接口来实现&#xff0c;那么会有很大的性能瓶颈…...

QML之Repeater 控件使用

Repeater 控件是 重复作用 根据 model中的index 数量进行重复 废话不说 直接看如何用 当model 为数字时 Rectangle{height: 1200width: 500visible: trueanchors.fill: parentColumn{spacing: 20Repeater{model: 10delegate: Rectangle{width: 60height: 20color: index%2 …...

哈希树讲解

哈希树(HashTree)是哈希(Hash)算法的一种延续。传统数据结构中对如何避免哈希冲突都有一定的描述和解释&#xff0c;但是这些描述和解释都是泛泛而谈&#xff0c;并没有提出比较好的解决方案。这里所提到的哈希树(HashTree)算法就是要提供一种在理论上和实际应用中均能有效地处…...

vue 项目启动后一直不断的刷新停不下来

新建的vue 项目&#xff0c;配置了代理后项目一直刷新&#xff0c;停不下来&#xff0c;各种查找最后发现是vue.config.js 中的热更新配置项目开启的原因 const {defineConfig } require(vue/cli-service) const AutoImport require(unplugin-auto-import/webpack) const Co…...

makesense在线yolov5标注

文章目录 一、创建图片文件夹和label.txt二、在线标注数据 参考文章博主&#xff1a;风吹落叶花飘荡 一、创建图片文件夹和label.txt 创建一个放置图片的文件夹images&#xff0c;存放需要标注的图片&#xff08;图片最好重命名为1,2,3…避免后面混淆&#xff09; 创建label.t…...

python 之 矩阵相关操作

文章目录 1. **创建矩阵**&#xff1a;2. **矩阵加法**&#xff1a;3. **矩阵乘法**&#xff1a;4. **矩阵转置**&#xff1a;5. **元素级操作**&#xff1a;6. **汇总统计**&#xff1a;7. **逻辑操作**&#xff1a; 理解你的需求&#xff0c;我将为每个功能写一个单独的代码块…...

敢问路在何方

从2022年进入软件行业到现在&#xff0c;二十二年眨眼之间过去了&#xff0c;依然奋斗在编码第一线&#xff0c;挣扎在第一线&#xff0c;借此程序员佳节之际回顾一下这许多年的历程。 由于本人混得实在不好&#xff0c;工作过的地方都用某单位某公司来代替实际的&#xff0c;到…...

复习mysql中的事务

一个事务的开始和结尾必须是 start transaction | commit; rollback 事务特性 1.原子性&#xff1a;多个操作打包成一个整体&#xff0c;要么全部执行&#xff0c;要么一个都不执行。 不过这里的“一个都不执行”并不是真正的全不执行&#xff0c;只是看起来与没执行一样。…...

力扣刷题 day52:10-22

1.数组拆分 给定长度为 2n 的整数数组 nums &#xff0c;你的任务是将这些数分成 n 对, 例如 (a1, b1), (a2, b2), ..., (an, bn) &#xff0c;使得从 1 到 n 的 min(ai, bi) 总和最大。 返回该 最大总和 。 方法一&#xff1a;排序 #方法一&#xff1a;排序 def arrayPai…...

ELK概述部署和Filebeat 分布式日志管理平台部署

ELK概述部署、Filebeat 分布式日志管理平台部署 一、ELK 简介二、ELK部署2.1、部署准备2.2、优化elasticsearch用户拥有的内存权限2.3、启动elasticsearch是否成功开启2.4、浏览器查看节点信息2.5、安装 Elasticsearch-head 插件2.6、ELK Logstash 部署&#xff08;在 Apache 节…...

分享一下我家网络机柜,家庭网络设备推荐

家里网络机柜搞了几天终于搞好了&#xff0c;非专业的&#xff0c;走线有点乱&#xff0c;勿喷。 从上到下的设备分别是&#xff1a; 无线路由器&#xff08;当ap用&#xff09;:TL-XDR6088 插排&#xff1a;德木pdu机柜插排 硬盘录像机&#xff1a;TL-NVR6108-L8P 第二排左边…...

uboot移植之mx6ull_alientek_nand.h文件详解三

一. 简介 mx6ull_alientek_nand.h文件是 开发板的 uboot的一个配置文件。每个开发板都有一个 .h的配置文件。 mx6ull_alientek_nand.h 文件其实是 之前针对正点原子ALPHA开发板移植的 Uboot配置文件。 本文继上一篇文章的学习&#xff0c;地址如下&#xff1a;uboot移植之m…...

[Docker]一.Docker 简介与安装

一、Docker简介与为什么要用 Docker 1.1、Docker 介绍 Docker 是一个跨平台的开源的 应用容器引擎 &#xff0c;诞生于 2013 年初&#xff0c;基于 Go语言 并遵从 Apache2.0 协议开源, Docker 可以把它理解成虚拟机&#xff0c;但是 Docker 和传统虚拟化方式 有所不同 …...

计算机网络-计算机网络体系结构-传输层

目录 一、UDP 二、TCP 特点 首部格式 连接管理 可靠传输 流量控制(点对点) 拥塞控制(全局) 三、拥塞控制算法 慢开始&拥塞避免 快重传&快恢复 功能一&#xff1a;提供进程与进程之间的逻辑通信 功能二&#xff1a;复用和分用 功能三&#xff1a;对收到的报…...

buuctf[HCTF 2018]WarmUp 1

题目环境&#xff1a; 发现除了表情包&#xff0c;再无其他F12试试发现source.php文件访问这个文件&#xff0c;格式如下&#xff1a;url/source.php回显如下&#xff1a;PHP代码审计&#xff1a; <?php highlight_file(__FILE__); class emmm {public static function ch…...

开源博客项目Blog .NET Core源码学习(4:生成验证码)

开源博客项目Blog中的后台管理登录界面中支持输入验证码&#xff08;如下图所示&#xff09;&#xff0c;本文学习并记录项目中验证码的生成及调用方式。   博客项目中调用VerifyCode类生成验证码&#xff0c;该类位于App.Framwork项目中&#xff0c;命名空间为App.Framwork…...

gin框架39--重构 BasicAuth 中间件

gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候&#xff0c;会自动弹出一个认证界面&#xff0c;要求我们输入用户名和密码&#xff0c;这种BasicAuth是最基础、最常见的认证方式&#xff0…...

编译pycaffe过程中遇到的问题及解决

pycaffe是python调用caffe的方式&#xff0c;编译它就是要得到一个so库_pycaffe.so。 如题&#xff0c;在caffe的源码目录下&#xff0c;执行make pycaffe&#xff0c;跳出来一个错误: $ make pycaffe CXX/LD -o python/caffe/_caffe.so python/caffe/_caffe.cpp /usr/bin/ld…...

自然语言处理---Transformer机制详解之Transformer优势

1 Transformer的并行计算 对于Transformer比传统序列模型RNN/LSTM具备优势的第一大原因就是强大的并行计算能力. 对于RNN来说&#xff0c;任意时刻t的输入是时刻t的输入x(t)和上一时刻的隐藏层输出h(t-1)&#xff0c;经过运算后得到当前时刻隐藏层的输出h(t)&#xff0c;这个…...

改进YOLO系列 | YOLOv5/v7 引入 Dynamic Snake Convolution | 动态蛇形卷积

准确分割拓扑管状结构,如血管和道路,在各个领域中至关重要,可以确保下游任务的准确性和效率。然而,许多因素使任务复杂化,包括细小的局部结构和可变的全局形态。在这项工作中,我们注意到管状结构的特殊性,并利用这一知识来引导我们的DSCNet,以在三个阶段同时增强感知:…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

3403. 从盒子中找出字典序最大的字符串 I

3403. 从盒子中找出字典序最大的字符串 I 题目链接&#xff1a;3403. 从盒子中找出字典序最大的字符串 I 代码如下&#xff1a; class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南

&#x1f680; C extern 关键字深度解析&#xff1a;跨文件编程的终极指南 &#x1f4c5; 更新时间&#xff1a;2025年6月5日 &#x1f3f7;️ 标签&#xff1a;C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言&#x1f525;一、extern 是什么&#xff1f;&…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

NPOI Excel用OLE对象的形式插入文件附件以及插入图片

static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...

Unity VR/MR开发-VR开发与传统3D开发的差异

视频讲解链接&#xff1a;【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...

shell脚本质数判断

shell脚本质数判断 shell输入一个正整数,判断是否为质数(素数&#xff09;shell求1-100内的质数shell求给定数组输出其中的质数 shell输入一个正整数,判断是否为质数(素数&#xff09; 思路&#xff1a; 1:1 2:1 2 3:1 2 3 4:1 2 3 4 5:1 2 3 4 5-------> 3:2 4:2 3 5:2 3…...

react更新页面数据,操作页面,双向数据绑定

// 路由不是组件的直接跳转use client&#xff0c;useEffect&#xff0c;useRouter&#xff0c;需3个结合&#xff0c; use client表示客户端 use client; import { Button,Card, Space,Tag,Table,message,Input } from antd; import { useEffect,useState } from react; impor…...