当前位置: 首页 > news >正文

利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、Flink 代码优化
    • 2.0 问题发现
    • 2.1 原有代码
    • 2.2 CompletableFuture 优化
  • 三、avatorscript 使用的简单介绍
    • 3.1 自定义函数
    • 3.2 从 Map 中取值
    • 3.3 使用 Java 的工具类
    • 3.4 AviatorScript 函数
  • 四、总结


一、前言

目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

二、Flink 代码优化

2.0 问题发现

图片.png
通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

2.1 原有代码

xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx

经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

2.2 CompletableFuture 优化

xxx
List<CompletableFuture> executeFutures=new ArrayList<>();CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);});
executeFutures.add(executeFuture);for (int i = 0; i < executeFutures.size(); i++) {executeFutures.get(i).get()xxxx
}

修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

三、avatorscript 使用的简单介绍

为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

3.1 自定义函数

class AddFunction extends AbstractFunction {@Overridepublic AviatorObject call(Map<String, Object> env,AviatorObject arg1, AviatorObject arg2) {Number left = FunctionUtils.getNumberValue(arg1, env);Number right = FunctionUtils.getNumberValue(arg2, env);return new AviatorDouble(left.intValue() + right.intValue());}public String getName() {return "add" ;}
}public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}

3.2 从 Map 中取值

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注册函数AviatorEvaluator.addFunction(new AddFunction());HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "testId1" , 1);stringObjectHashMap.put( "testId2" , 2);Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);

3.3 使用 Java 的工具类

public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "ip" , "a1111" );// stringObjectHashMap.put("result", "a&B&C&d");stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );stringObjectHashMap.put( "testId1" , "sku" );stringObjectHashMap.put( "a" , "123" );stringObjectHashMap.put( "a1" , "null" );stringObjectHashMap.put( "b1" , 123);AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);System.out.println(execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);System.out.println( "###" + execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);

3.4 AviatorScript 函数

## examples/function.av
fn add(x, y) {return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {String function = "## examples/function.av\n" +"\n" +"fn add(x, y) {\n" +"  return x + y;\n" +"}" ;AviatorEvaluator.defineFunction( "add" , function);System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}

四、总结

本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

相关文章:

利用java8 的 CompletableFuture 优化 Flink 程序,性能提升 50%

你好&#xff0c;我是 shengjk1&#xff0c;多年大厂经验&#xff0c;努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注&#xff01;你会有如下收益&#xff1a; 了解大厂经验拥有和大厂相匹配的技术等 希望看什么&#xff0c;评论或者私信告诉我&#xff01; 文章目录 一…...

香橙派 AIpro综合体验及AI样例运行

香橙派 AIpro综合体验及AI样例运行 环境&#xff1a; 香橙派版本&#xff1a; AIpro(8TOPSINT8) OS : Ubuntu 22.04.3 LTS(GNU/Linux 5.10.0 aarch64) (2024-03-18) 远程服务端1&#xff1a;OpenSSH 8.9p1 远程服务端2&#xff1a;TightVNC Server 1.3.10 远程客户端&#xf…...

通过域名接口申请免费的ssl多域名证书

来此加密已顺利接入阿里云的域名接口&#xff0c;用户只需一键调用&#xff0c;便可轻松完成域名验证&#xff0c;从而更高效地申请证书。接下来&#xff0c;让我们详细解读一下整个操作过程。 来此加密官网 免费申请SSL证书 免费SSL多域名证书&#xff0c;泛域名证书。 首先&a…...

【JAVA WEB实用与优化技巧】如何自己封装一个自定义UI的Swagger组件,包含Swagger如何处理JWT无状态鉴权自动TOKEN获取

目录 一、Swagger 简介1. 什么是 Swagger&#xff1f;2. 如何使用 Swagger3. Springboot 中swagger的使用示例1. maven 引入安装2. java配置 二、Swagger UI存在的缺点1.不够方便直观2.请求的参数没有缓存3.不够美观4.如果是JWT 无状态登录&#xff0c;Swagger使用起来就没有那…...

理解大语言模型(二)——从零开始实现GPT-2

相关说明 这篇文章的大部分内容参考自我的新书《解构大语言模型&#xff1a;从线性回归到通用人工智能》&#xff0c;欢迎有兴趣的读者多多支持。 本文涉及到的代码链接如下&#xff1a;regression2chatgpt/ch11_llm/char_gpt.ipynb1 本文将讨论如何利用PyTorch从零开始搭建G…...

SSH远程登录时常见问题解决

SSH时出现WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! 问题解决——SSH时出现WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED! 翻译过来就是 警告&#xff1a;远程主机标识已更改&#xff01; 此报错是由于远程的…...

工业级3D开发引擎HOOPS:创新与效率的融合!

在当今这个技术日新月异的时代&#xff0c;3D技术已成为推动各行各业发展的重要力量。从工程设计到游戏开发&#xff0c;从虚拟现实到增强现实&#xff0c;3D技术的应用无处不在&#xff0c;它极大地丰富了我们的生活和工作。而在这样的背景下&#xff0c;HOOPS作为一个强大的3…...

IDEA创建Spring Boot项目

1 打开新建项目界面 如图1&#xff0c;打开IDEA&#xff0c;点击菜单栏的File->New->Project&#xff0c;打开新建项目界面。 图1 新建项目 2 填写项目信息 在新建项目界面点击左侧工具栏的Spring Initializr选项&#xff0c;进行Spring Boot项目信息的填写&#xff…...

mysql实战——xtrabackup全量备份/增量备份及恢复

一、测试前准备 mysql数据库 端口3306数据文件目录 /data/mysql/3306/data 安装目录/usr/lcoal/mysql配置文件/etc/my.cnf 创建数据库 testXtra 创建备份目录 备份目录/data/backup/备份恢复数据文件目录/data/mysql/3307/data备份恢复配置文件/etc/my_3307.cnf 二、开始…...

探索演进:了解IPv4和IPv6之间的区别

探索演进&#xff1a;了解IPv4和IPv6之间的区别 在广阔的互联网领域中&#xff0c;设备之间的通信依赖于一组独特的协议来促进连接。前景协议中&#xff0c;IPv4&#xff08;Internet 协议版本 4&#xff09;和 IPv6&#xff08;Internet 协议版本 6&#xff09;是数字基础设施…...

Python 实现Word (DOC或DOCX)与TXT文本格式互转

目录 引言 安装Python库 使用Python将Word转换为TXT文本格式 使用Python将TXT文本格式转换为Word 引言 Word文档和TXT文本文件是日常工作和生活中两种常见的文件格式&#xff0c;各有其特点和优势。Word文档能够保留丰富的格式设置&#xff0c;如字体、段落、表格、图片等…...

anaconda install on CentOS 7

参考&#xff1a; CentOS 7安装conda并配置环境 CentOS 7安装conda并配置环境_centos conda-CSDN博客...

git管理Codeup云效平台

HTTPS方式实现Git命令 1.进入项目路径&#xff0c;如 cd demo&#xff0c;与此同时&#xff0c;在Codeup平台创建一个空仓库repo&#xff0c;获取空仓库的https协议地址&#xff0c;例如 https://codeup.aliyun.com/xxxx/xxxx/xxx.git。 2.在demo项目下执行 git init命令初始化…...

Pycharm最新安装教程(最新更新时间2024年5月27日)

ps&#xff1a;本教程Pycharm安装&#xff0c;最新更新时间&#xff1a;2024年5月27日&#xff0c;公众号持续更新关注公众号防失联哦 Pycharm 再次更新了一个小版本。又回到老话题&#xff0c;2023.3.2这个版本是否还能安装&#xff0c;笔者也亲测了一下。还是沿用本站之前的…...

医院门诊互联电子病历|基于SSM+vue的医院门诊互联电子病历管理信息系统的设计与实现(源码+数据库+文档)

医院门诊互联电子病历管理信息系统 目录 基于SSM&#xff0b;vue的医院门诊互联电子病历管理信息系统的设计与实现 一、前言 二、系统设计 三、系统功能设计 1系统功能模块 2后台登录模块 5.2.1管理员功能 5.2.2用户功能 5.2.3医生功能 四、数据库设计 五、核心代码…...

H3CNE-8-ARP工作原理

ARP&#xff1a;Address Resolution Protocol 通过目的IP地址请求对方的MAC地址的过程。 数据链路层在进行数据封装时&#xff0c;需要目的MAC地址。 arp -a 查看 arp -d * 清空 主机A发送一个数据包给主机C之前&#xff0c;首先要获取C的MAC地址 数据封装...

上交提出TrustGAIN,提出6G网络中可信AIGC新模式!

月16日至18日&#xff0c;2024全球6G技术大会在南京召开。会上&#xff0c;全球移动通信标准制定组织3GPP&#xff08;第三代合作伙伴计划&#xff09;的3位联席主席分享了3GPP6G标准时间表&#xff1a; 2024年9月&#xff0c;启动6G业务需求研究&#xff1b; 2025年6月&…...

内存泄漏案例分享2-Fragment的内存泄漏

案例2——hprof文件显示出Fragment内存泄漏 接下来我们来看fragment内存泄漏&#xff0c;老规矩查看fields和references&#xff0c;确保它符合内存泄漏的情形&#xff1b;我们点击jump to source查看泄漏的位置 Fragment#MZBannerView#内部类Runnbale /*** Banner 切换时间间…...

Selenium的百度高级搜索-自动化(未完成)

from selenium import webdriver from selenium.webdriver import ActionChainsdriver webdriver.Chrome() driver.implicitly_wait(10) driver.maximize_window() driver.get("https://www.baidu.com/")# 鼠标悬停(难点) setting driver.find_element_by_id("…...

cs与msf权限传递,以及mimikatz抓取win2012明文密码

在网络安全领域&#xff0c;权限提升和凭证盗窃是渗透测试和攻击中的关键环节。通过工具如CS和MSF&#xff0c;攻击者能够有效地在目标网络中进行权限传递。与此同时&#xff0c;Mimikatz作为一款强大的凭证盗窃工具&#xff0c;可以帮助攻击者从Windows Server 2012等系统中提…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

大数据零基础学习day1之环境准备和大数据初步理解

学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 &#xff08;1&#xff09;设置网关 打开VMware虚拟机&#xff0c;点击编辑…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

django blank 与 null的区别

1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是&#xff0c;要注意以下几点&#xff1a; Django的表单验证与null无关&#xff1a;null参数控制的是数据库层面字段是否可以为NULL&#xff0c;而blank参数控制的是Django表单验证时字…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

WEB3全栈开发——面试专业技能点P7前端与链上集成

一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染&#xff08;SSR&#xff09;与静态网站生成&#xff08;SSG&#xff09; 框架&#xff0c;由 Vercel 开发。它简化了构建生产级 React 应用的过程&#xff0c;并内置了很多特性&#xff1a; ✅ 文件系…...

MLP实战二:MLP 实现图像数字多分类

任务 实战&#xff08;二&#xff09;&#xff1a;MLP 实现图像多分类 基于 mnist 数据集&#xff0c;建立 mlp 模型&#xff0c;实现 0-9 数字的十分类 task: 1、实现 mnist 数据载入&#xff0c;可视化图形数字&#xff1b; 2、完成数据预处理&#xff1a;图像数据维度转换与…...

分布式计算框架学习笔记

一、&#x1f310; 为什么需要分布式计算框架&#xff1f; 资源受限&#xff1a;单台机器 CPU/GPU 内存有限。 任务复杂&#xff1a;模型训练、数据处理、仿真并发等任务耗时严重。 并行优化&#xff1a;通过任务拆分和并行执行提升效率。 可扩展部署&#xff1a;适配从本地…...