当前位置: 首页 > news >正文

Netty—FuturePromise

Netty—Future&Promise

  • 一、JDK原生 Future
  • 二、Netty包下的 Future
  • 三、Promise
    • 1、使用Promise同步获取结果
    • 2、使用Promise异步获取结果
    • .3、使用Promise同步获取异常 - sync & get
    • 4、使用Promise同步获取异常 - await
    • 5、使用Promise异步获取异常

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

在这里插入图片描述

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
isCancellable-是否可以取消执行
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
removeListener-删除回调,异步接收结果
setSuccess--设置成功结果
setFailure--设置失败结果

一、JDK原生 Future

关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

接下来,演示一下使用jdk原生Future获取执行结果~

@Slf4j
public class JdkFutureTest01 {public static void main(String[] args) {// 线程池ExecutorService service = newFixedThreadPool(2);// 提交任务Future<Object> future = service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 50;}});try {System.out.println(future.get());service.shutdownNow();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}

二、Netty包下的 Future

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();

通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()await() 方法用于阻塞等待,还提供了 addListener() 方法用于添加回调方法,异步接收结果。

sync() 方法内部会先调用 await() 方法,等待 await() 方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常!

@Override
public Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;
}
private void rethrowIfFailed() {Throwable cause = cause();if (cause == null) {return;}PlatformDependent.throwException(cause);
}

接下来,演示一下使用Netty包下的Future获取执行结果~

@Slf4j
public class NettyFutureTest01 {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 66;}});// 阻塞等待future.sync();log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
}

又或者使用 addListener() 方法用于添加回调方法,异步接收结果。

future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
});

三、Promise

Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise的继承关系:
在这里插入图片描述

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

// result 字段的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;@Override
public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {return this;}throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {// 原子修改result字段为 objResultif (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();}return true;}return false;
}
private synchronized boolean checkNotifyWaiters() {if (waiters > 0) {// 唤醒其他等待线程notifyAll();}return listeners != null;
}

1、使用Promise同步获取结果

@Slf4j
public class PromiseDemo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info("set success");promise.setSuccess(10);});log.info("start...");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

2、使用Promise异步获取结果

@Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

.3、使用Promise同步获取异常 - sync & get

Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

4、使用Promise同步获取异常 - await

@Slf4j
public class PromiseDemo04 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());promise.await();if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}}
}

5、使用Promise异步获取异常

@Slf4j
public class PromiseDemo05 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);promise.addListener(future -> {if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}});eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());}
}

相关文章:

Netty—FuturePromise

Netty—Future&Promise 一、JDK原生 Future二、Netty包下的 Future三、Promise1、使用Promise同步获取结果2、使用Promise异步获取结果.3、使用Promise同步获取异常 - sync & get4、使用Promise同步获取异常 - await5、使用Promise异步获取异常 在异步处理时&#xff0…...

固定资产卡片乱怎么管理

固定资产卡片是记录公司固定资产信息的重要工具&#xff0c;如果管理不善&#xff0c;容易造成卡片混乱、数据错误等问题。 为了避免这种情况的发生&#xff0c;可以采取以下措施&#xff1a;  建立完善的资产管理制度&#xff0c;明确固定资产的分类、标准和使用情况&#x…...

AutoHotkey(AHK)脚本,自动截图当前屏幕并发送给微信窗口

前提先安装好 AutoHotkey &#xff0c;本脚本依赖AutoHotkey 环境 首先 &#xff0c;设置微信的快捷键 执行代码如下&#xff1a; Loop {SendInput, {Alt down}s{Alt up}Sleep, 2000 ; 等待2秒; 双击鼠标左键Click, 2Sleep, 1000 ; 等待1秒SendInput, {Alt down}a{Alt up}Sl…...

Golang - go build打包文件

Go编译打包文件 1、简单打包 程序 main1.go&#xff1a; package mainimport "fmt"func main() {fmt.Println("Hello World!") } 打包&#xff1a; # 在linux服务上执行下面的3个命令 # linux平台,生成main1可执行程序 CGO_ENABLED0 GOOSlinux GOARCHam…...

Java的归并排序

不爱生姜不吃醋⭐️⭐️⭐️ 如果本文有什么错误的话欢迎在评论区中指正 与其明天开始&#xff0c;不如现在行动&#xff01; 文章目录 &#x1f334;前言&#x1f334;一.归并排序1.概念2.时间复杂度3.代码实现 &#x1f334;二、小和问题1.概念2.举例3.代码实现 &#x1f334…...

B. The Walkway Codeforces Round 893 (Div. 2)

Problem - B - Codeforces 题目大意&#xff1a;小明在数轴上要从1走到n&#xff0c;其中某些坐标上有一些饼干店&#xff0c;共m个&#xff0c;小明身上也有无限多的饼干&#xff0c;它首先一定会在1的位置吃一个饼干&#xff0c;在每个饼干店的位置会吃一个&#xff0c;在前…...

第四篇 DirectShow 采集调用结构关系

第一篇: DirectShow视频采集_会头痛的可达鸭的博客-CSDN博客 一、GraphBuilder 1、IFilterGraph2、IGraphBuilder、ICaptureGraphBuiler2 (1)、CLSID IFilterGraph CLSID_FilterGraphIFilterGraph2 CLSID_CaptureGraphBuilderIGraphBuilder CL…...

2605. 从两个数字数组里生成最小数字

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;枚举比较法方法二&#xff1a;集合的位运算表示法 写在最后 Tag 【贪心】【位运算】【数组】 题目来源 2605. 从两个数字数组里生成最小数字 题目解读 给定两个各自只包含数字 1 到 9 的两个数组&#xff0c;每个数组…...

服务器发送事件Server-sent events详解与示例

Server-sent events 服务端进行数据推送除了WebSocket之外&#xff0c;还可以使用Server-Send-Event方案。 与 WebSocket不同的是&#xff0c;服务器发送事件是单向的。数据消息只能从服务端到发送到客户端&#xff08;如用户的浏览器&#xff09;。这使其成为不需要从客户端…...

SOLIDWORKS 多实体的建模方式

SOLIDWORKS多实体是SOLIDWORKS中一个非常有用的功能。在SOLIDWORKS中&#xff0c;对于模型的设定通常被大家所熟知的有以下几种类型&#xff1a;零件、装配体以及工程图。 其实还有一种划分&#xff0c;就是多实体。严格意义上来说&#xff0c;多实体既不属于零件也不属于装配体…...

NSSCTF web 刷题记录1

文章目录 前言题目[GXYCTF 2019]禁止套娃方法一方法二 [NCTF 2019]Fake XML cookbook[NSSRound#7 Team]ec_RCE[NCTF 2018]Flask PLUS 前言 今天是2023.9.3&#xff0c;大二开学前的最后一天。老实说ctf的功力还是不太够做的题目太少&#xff0c;新学期新气象。不可急于求成&am…...

遥感指数数据库

目前遥感指数多种多样&#xff0c;那怎么针对不同的应用领域选择合适的植被指数&#xff1f;不同卫星又有哪些可以反演的指数&#xff1f; Henrich等人开发了Index Database(网址&#xff1a;https://www.indexdatabase.de/)&#xff0c;总结了目前主流的遥感指数&#xff0c;…...

如何让insert程序速度快,可以试试联合SQL(insert 和 select 一起使用)?

查询添加可选择SQL执行&#xff0c;速度远超程序执行 insert 和 select案例 insert into 表1(列1,列2,列3,...) select 列1,列2,列3,...from表2(GROUP BY 列)116511 条数据 耗时45秒&#xff0c; 如果是程序查询然后再insert&#xff0c;则需要30分钟左右&#xff01;&#x…...

IP地址、网关、网络/主机号、子网掩码关系

一、IP地址 IP地址组成 IP地址分为两个部分&#xff1a;网络号和主机号 &#xff08;1&#xff09;网络号:标识网段&#xff0c;保证相互连接的两个网段具有不同的标识。 &#xff08;2&#xff09;主机号:标识主机&#xff0c;同一网段内&#xff0c;主机之间具有相同的网…...

使用skvideo.io.vread读取avi视频,报错“No way to determine width or height from video...”

问题描述&#xff1a; 一开始安装sk-video&#xff0c;在使用skvideo.io.vread读取avi视频&#xff0c;报错“No way to determine width or height from video. Need -s in inputdict. Consult documentation on I/O.” 解决方案&#xff1a; 1. 卸载sk-video pip uninsta…...

Nomad 系列-安装

系列文章 Nomad 系列文章 Nomad 简介 开新坑&#xff01;近期算是把自己的家庭实验室环境初步搞好了&#xff0c;终于可以开始进入正题研究了。 首先开始的是 HashiCorp Nomad 系列&#xff0c;欢迎阅读。 关于 Nomad 的简介&#xff0c;之前在 大规模 IoT 边缘容器集群管…...

网络版五子棋C++实现

目录 1.项目介绍 2.开发环境 3.核心技术 4.环境搭建 5.WebSocketpp介绍 5.1WebSocketpp是什么 5.2为什么使用WebSocketpp 5.3原理解析&#xff1a; 5.4WebSocketpp主要特性 6.WebSocketpp使用 7.JsonCpp使用 8.MySQL API 9.项目模块设计以及流程图 10.封装日志宏…...

项目招标投标公众号小程序开源版开发

项目招标投标公众号小程序开源版开发 以下是一个招标投标公众号小程序的功能列表&#xff1a; 用户注册与登录&#xff1a;用户可以注册账号并登录公众号小程序。项目发布&#xff1a;用户可以发布招标项目的详细信息&#xff0c;包括项目名称、招标单位、项目描述、招标要求…...

华为OD机试-机器人走迷宫

题目描述 机器人走一个迷宫,给出迷宫的x和y(x*y的迷宫)并且迷宫中有障碍物,输入k表示障碍物有k个,并且会将障碍物的坐标挨个输入. 机器人从0,0的位置走到x,y的位置并且只能向x,y增加的方向走,不能回退. 如代码类注释展示的样子,#表示可以走的方格,0代表障碍,机器人从0,0的位置…...

Jenkins搭建步骤Linux环境

1、进入目标目录开始准备环境 安装jdk 安装maven 安装tomcat 安装node 下载Jenkins.war并且拷贝进tomcat的webapp的文件夹下。 环境变量配置如下自行更改&#xff1a; #--------------For JDK---------------- export JAVA_HOME/usr/local/java/jdk1.8.0_192 export PATH/usr…...

2023 AZ900备考

文章目录 如何学习最近准备考AZ900考试&#xff0c;找了一圈文档&#xff0c;结果发现看那么多文档&#xff0c;不如直接看官方的教程https://learn.microsoft.com/zh-cn/certifications/exams/az-900/ &#xff0c;简单直接&#xff0c;突然想到纳瓦尔宝典中提到多花时间进行思…...

青翼科技基于VITA57.1的16路数据收发处理平台产品手册

FMC211是一款基于VITA57.1标准规范的实现16路LVDS数据采集、1路光纤数据收发处理FMC子卡模块。 该板卡支持2路CVBS&#xff08;复合视频&#xff09;视频输入&#xff0c;能够自动检测标准的模拟基带电视信号&#xff0c;并将其转变为8位ITU-R.656接口信号或者4:2:2分量视频信…...

Ansible_自动化运维实战(一)

1.DELL的一款服务器的价格、型号、配置&#xff08;CPU&#xff0c;内存、硬盘、支持的RAID功能&#xff09; DELL 服务器的定价、型号和配置因型号而异&#xff0c;可以通过访问 DELL 官方网站或联系 DELL 客户服务获取具体信息。一种示例是 DELL PowerEdge R740&#xff0c;其…...

说说Flink中的State

分析&回答 基本类型划分 在Flink中&#xff0c;按照基本类型&#xff0c;对State做了以下两类的划分&#xff1a; Keyed State&#xff0c;和Key有关的状态类型&#xff0c;它只能被基于KeyedStream之上的操作&#xff0c;方法所使用。我们可以从逻辑上理解这种状态是一…...

适合心理法律在线咨询预约含视频图文电话咨询功能的小程序开发

目前智能手机普及&#xff0c;很多以前需要线下咨询的场景都被搬到了线上&#xff0c;这样既可以使咨询者更方便&#xff0c;也可以使被咨询者接待效率更高&#xff0c;服务更多咨询者。基于此我们开发了专门的一款具有线上咨询功能的小程序&#xff0c;同时为了方便被咨询者服…...

Redis-Cluster集群操作--添加节点、删除节点

一、环境部署 部署好Redis-Cluster集群&#xff0c;参考上个本人的博客&#xff1a;Redis-Cluster集群的部署&#xff08;详细步骤&#xff09;_是胡也是福的博客-CSDN博客 新准备一台机器&#xff0c;修改主机名&#xff0c;关闭防火墙和selinux&#xff0c;参考&#xff1a…...

ModaHub魔搭社区:星环科技向量数据库Hippo社区版来啦

大语言模型正在与企业应用迅速结合,并深刻改变企业的各个产业环节。而大模型训练所使用的数据包含了如文档、图片、音视频等各种类型的非结构化数据,传统关系型数据库能力有限。通过将这些非结构化数据转换为多维向量,可以结构化地在向量数据库中进行管理,实现高效的数据存…...

gitHub添加ssh

gitHub添加ssh 首先你需要有一个github的账户 第一步&#xff1a; 打开终端&#xff0c;输入以下命令&#xff0c;注意“your email”处你自己的邮箱&#xff0c;创建新的ssh ssh-keygen -t ed25519 -C “your email” 第二步&#xff1a;使用ssh登录ssh-agent&#xff0c;终端…...

sql:SQL优化知识点记录(十)

&#xff08;1&#xff09;慢查询日志 Group by的优化跟Order by趋同&#xff0c;只是多了一个having 开启慢查询日志&#xff1a; 演示一下慢sql&#xff1a;4秒之后才会出结果 查看一下&#xff1a;下方显示慢查询的sql &#xff08;2&#xff09;批量插入数据脚本 函数和存…...

STM32 RTC实验

RTC时钟简介 STM32F103的实时时钟&#xff08;RTC&#xff09;是一个独立的定时器。 STM32的RTC模块拥有一组连续计数的计数器&#xff0c;在相对应的软件配置下&#xff0c;可提供时钟日历的功能。 修改计数器的值可以重新设置系统的当前时间和日期。 RTC模块和时钟配置系统…...