当前位置: 首页 > news >正文

利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、Flink 代码优化
    • 2.0 问题发现
    • 2.1 原有代码
    • 2.2 CompletableFuture 优化
  • 三、avatorscript 使用的简单介绍
    • 3.1 自定义函数
    • 3.2 从 Map 中取值
    • 3.3 使用 Java 的工具类
    • 3.4 AviatorScript 函数
  • 四、总结


一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

图片.png
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);});
executeFutures.add(executeFuture);for (int i = 0; i < executeFutures.size(); i++) {executeFutures.get(i).get()xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

class AddFunction extends AbstractFunction {@Overridepublic AviatorObject call(Map<String, Object> env,AviatorObject arg1, AviatorObject arg2) {Number left = FunctionUtils.getNumberValue(arg1, env);Number right = FunctionUtils.getNumberValue(arg2, env);return new AviatorDouble(left.intValue() + right.intValue());}public String getName() {return "add" ;}
}public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "testId1" , 1);stringObjectHashMap.put( "testId2" , 2);Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "ip" , "a1111" );// stringObjectHashMap.put("result", "a&B&C&d");stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );stringObjectHashMap.put( "testId1" , "sku" );stringObjectHashMap.put( "a" , "123" );stringObjectHashMap.put( "a1" , "null" );stringObjectHashMap.put( "b1" , 123);AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);System.out.println(execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);System.out.println( "###" + execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

## examples/function.av
fn add(x, y) {return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {String function = "## examples/function.av\n" +"\n" +"fn add(x, y) {\n" +"  return x + y;\n" +"}" ;AviatorEvaluator.defineFunction( "add" , function);System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

相关文章:

利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好&#xff0c;我是 shengjk1&#xff0c;多年大厂经验&#xff0c;努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注&#xff01;你会有如下收益&#xff1a; 了解大厂经验拥有和大厂相匹配的技术等 希望看什么&#xff0c;评论或者私信告诉我&#xff01; 文章目录 一…...

香橙派 AIpro综合体验及AI样例运行

香橙派 AIpro综合体验及AI样例运行 环境&#xff1a; 香橙派版本&#xff1a; AIpro(8TOPSINT8) OS : Ubuntu 22.04.3 LTS(GNU/Linux 5.10.0 aarch64) (2024-03-18) 远程服务端1&#xff1a;OpenSSH 8.9p1 远程服务端2&#xff1a;TightVNC Server 1.3.10 远程客户端&#xf…...

通过域名接口申请免费的ssl多域名证书

来此加密已顺利接入阿里云的域名接口&#xff0c;用户只需一键调用&#xff0c;便可轻松完成域名验证&#xff0c;从而更高效地申请证书。接下来&#xff0c;让我们详细解读一下整个操作过程。 来此加密官网 免费申请SSL证书 免费SSL多域名证书&#xff0c;泛域名证书。 首先&a…...

【JAVA WEB实用与优化技巧】如何自己封装一个自定义UI的Swagger组件,包含Swagger如何处理JWT无状态鉴权自动TOKEN获取

目录 一、Swagger 简介1. 什么是 Swagger&#xff1f;2. 如何使用 Swagger3. Springboot 中swagger的使用示例1. maven 引入安装2. java配置 二、Swagger UI存在的缺点1.不够方便直观2.请求的参数没有缓存3.不够美观4.如果是JWT 无状态登录&#xff0c;Swagger使用起来就没有那…...

理解大语言模型(二)——从零开始实现GPT-2

相关说明 这篇文章的大部分内容参考自我的新书《解构大语言模型&#xff1a;从线性回归到通用人工智能》&#xff0c;欢迎有兴趣的读者多多支持。 本文涉及到的代码链接如下&#xff1a;regression2chatgpt/ch11_llm/char_gpt.ipynb1 本文将讨论如何利用PyTorch从零开始搭建G…...

SSH远程登录时常见问题解决

SSH时出现WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! 问题解决——SSH时出现WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! 翻译过来就是 警告&#xff1a;远程主机标识已更改&#xff01; 此报错是由于远程的…...

工业级3D开发引擎HOOPS:创新与效率的融合!

在当今这个技术日新月异的时代&#xff0c;3D技术已成为推动各行各业发展的重要力量。从工程设计到游戏开发&#xff0c;从虚拟现实到增强现实&#xff0c;3D技术的应用无处不在&#xff0c;它极大地丰富了我们的生活和工作。而在这样的背景下&#xff0c;HOOPS作为一个强大的3…...

IDEA创建Spring Boot项目

1 打开新建项目界面 如图1&#xff0c;打开IDEA&#xff0c;点击菜单栏的File->New->Project&#xff0c;打开新建项目界面。 图1 新建项目 2 填写项目信息 在新建项目界面点击左侧工具栏的Spring Initializr选项&#xff0c;进行Spring Boot项目信息的填写&#xff…...

mysql实战——xtrabackup全量备份/增量备份及恢复

一、测试前准备 mysql数据库 端口3306数据文件目录 /data/mysql/3306/data 安装目录/usr/lcoal/mysql配置文件/etc/my.cnf 创建数据库 testXtra 创建备份目录 备份目录/data/backup/备份恢复数据文件目录/data/mysql/3307/data备份恢复配置文件/etc/my_3307.cnf 二、开始…...

探索演进:了解IPv4和IPv6之间的区别

探索演进&#xff1a;了解IPv4和IPv6之间的区别 在广阔的互联网领域中&#xff0c;设备之间的通信依赖于一组独特的协议来促进连接。前景协议中&#xff0c;IPv4&#xff08;Internet 协议版本 4&#xff09;和 IPv6&#xff08;Internet 协议版本 6&#xff09;是数字基础设施…...

Python 实现Word (DOC或DOCX)与TXT文本格式互转

目录 引言 安装Python库 使用Python将Word转换为TXT文本格式 使用Python将TXT文本格式转换为Word 引言 Word文档和TXT文本文件是日常工作和生活中两种常见的文件格式&#xff0c;各有其特点和优势。Word文档能够保留丰富的格式设置&#xff0c;如字体、段落、表格、图片等…...

anaconda install on CentOS 7

参考&#xff1a; CentOS 7安装conda并配置环境 CentOS 7安装conda并配置环境_centos conda-CSDN博客...

git管理Codeup云效平台

HTTPS方式实现Git命令 1.进入项目路径&#xff0c;如 cd demo&#xff0c;与此同时&#xff0c;在Codeup平台创建一个空仓库repo&#xff0c;获取空仓库的https协议地址&#xff0c;例如 https://codeup.aliyun.com/xxxx/xxxx/xxx.git。 2.在demo项目下执行 git init命令初始化…...

Pycharm最新安装教程(最新更新时间2024年5月27日)

ps&#xff1a;本教程Pycharm安装&#xff0c;最新更新时间&#xff1a;2024年5月27日&#xff0c;公众号持续更新关注公众号防失联哦 Pycharm 再次更新了一个小版本。又回到老话题&#xff0c;2023.3.2这个版本是否还能安装&#xff0c;笔者也亲测了一下。还是沿用本站之前的…...

医院门诊互联电子病历|基于SSM+vue的医院门诊互联电子病历管理信息系统的设计与实现(源码+数据库+文档)

医院门诊互联电子病历管理信息系统 目录 基于SSM&#xff0b;vue的医院门诊互联电子病历管理信息系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1系统功能模块 2后台登录模块 5.2.1管理员功能 5.2.2用户功能 5.2.3医生功能 四、数据库设计 五、核心代码…...

H3CNE-8-ARP工作原理

ARP&#xff1a;Address Resolution Protocol 通过目的IP地址请求对方的MAC地址的过程。 数据链路层在进行数据封装时&#xff0c;需要目的MAC地址。 arp -a 查看 arp -d * 清空 主机A发送一个数据包给主机C之前&#xff0c;首先要获取C的MAC地址 数据封装...

上交提出TrustGAIN,提出6G网络中可信AIGC新模式!

月16日至18日&#xff0c;2024全球6G技术大会在南京召开。会上&#xff0c;全球移动通信标准制定组织3GPP&#xff08;第三代合作伙伴计划&#xff09;的3位联席主席分享了3GPP6G标准时间表&#xff1a; 2024年9月&#xff0c;启动6G业务需求研究&#xff1b; 2025年6月&…...

内存泄漏案例分享2-Fragment的内存泄漏

案例2——hprof文件显示出Fragment内存泄漏 接下来我们来看fragment内存泄漏&#xff0c;老规矩查看fields和references&#xff0c;确保它符合内存泄漏的情形&#xff1b;我们点击jump to source查看泄漏的位置 Fragment#MZBannerView#内部类Runnbale /*** Banner 切换时间间…...

Selenium的百度高级搜索-自动化(未完成)

from selenium import webdriver from selenium.webdriver import ActionChainsdriver webdriver.Chrome() driver.implicitly_wait(10) driver.maximize_window() driver.get("https://www.baidu.com/")# 鼠标悬停(难点) setting driver.find_element_by_id("…...

cs与msf权限传递,以及mimikatz抓取win2012明文密码

在网络安全领域&#xff0c;权限提升和凭证盗窃是渗透测试和攻击中的关键环节。通过工具如CS和MSF&#xff0c;攻击者能够有效地在目标网络中进行权限传递。与此同时&#xff0c;Mimikatz作为一款强大的凭证盗窃工具&#xff0c;可以帮助攻击者从Windows Server 2012等系统中提…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包

文章目录 现象&#xff1a;mysql已经安装&#xff0c;但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时&#xff0c;可能是因为以下几个原因&#xff1a;1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

Java多线程实现之Thread类深度解析

Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...