Netty—FuturePromise
Netty—Future&Promise
在异步处理时,经常用到这两个接口
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
isCancellable | - | 是否可以取消执行 | |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
removeListener | - | 删除回调,异步接收结果 | |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
一、JDK原生 Future
关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()
和 get()
方法。
// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
接下来,演示一下使用jdk原生Future获取执行结果~
@Slf4j
public class JdkFutureTest01 {public static void main(String[] args) {// 线程池ExecutorService service = newFixedThreadPool(2);// 提交任务Future<Object> future = service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 50;}});try {System.out.println(future.get());service.shutdownNow();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}
二、Netty包下的 Future
原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:
// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();
通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()
和 await()
方法用于阻塞等待,还提供了 addListener()
方法用于添加回调方法,异步接收结果。
sync()
方法内部会先调用await()
方法,等待await()
方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()
方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而sync()
方法是直接抛出异常!@Override public Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this; } private void rethrowIfFailed() {Throwable cause = cause();if (cause == null) {return;}PlatformDependent.throwException(cause); }
接下来,演示一下使用Netty包下的Future获取执行结果~
@Slf4j
public class NettyFutureTest01 {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 66;}});// 阻塞等待future.sync();log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
}
又或者使用 addListener()
方法用于添加回调方法,异步接收结果。
future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
});
三、Promise
Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。
// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();
可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise
的继承关系:
设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。
// result 字段的原子更新器 private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result"); // 缓存执行结果的字段 private volatile Object result; // Promise所在的线程 private final EventExecutor executor; // 一个或多个回调方法 private Object listeners; // 阻塞线程数量计数器 private short waiters;@Override public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {return this;}throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) {// 原子修改result字段为 objResultif (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();}return true;}return false; } private synchronized boolean checkNotifyWaiters() {if (waiters > 0) {// 唤醒其他等待线程notifyAll();}return listeners != null; }
1、使用Promise同步获取结果
@Slf4j
public class PromiseDemo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info("set success");promise.setSuccess(10);});log.info("start...");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}
2、使用Promise异步获取结果
@Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}
.3、使用Promise同步获取异常 - sync & get
Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}
4、使用Promise同步获取异常 - await
@Slf4j
public class PromiseDemo04 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());promise.await();if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}}
}
5、使用Promise异步获取异常
@Slf4j
public class PromiseDemo05 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);promise.addListener(future -> {if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}});eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());}
}
相关文章:

Netty—FuturePromise
Netty—Future&Promise 一、JDK原生 Future二、Netty包下的 Future三、Promise1、使用Promise同步获取结果2、使用Promise异步获取结果.3、使用Promise同步获取异常 - sync & get4、使用Promise同步获取异常 - await5、使用Promise异步获取异常 在异步处理时࿰…...

固定资产卡片乱怎么管理
固定资产卡片是记录公司固定资产信息的重要工具,如果管理不善,容易造成卡片混乱、数据错误等问题。 为了避免这种情况的发生,可以采取以下措施: 建立完善的资产管理制度,明确固定资产的分类、标准和使用情况&#x…...

AutoHotkey(AHK)脚本,自动截图当前屏幕并发送给微信窗口
前提先安装好 AutoHotkey ,本脚本依赖AutoHotkey 环境 首先 ,设置微信的快捷键 执行代码如下: Loop {SendInput, {Alt down}s{Alt up}Sleep, 2000 ; 等待2秒; 双击鼠标左键Click, 2Sleep, 1000 ; 等待1秒SendInput, {Alt down}a{Alt up}Sl…...

Golang - go build打包文件
Go编译打包文件 1、简单打包 程序 main1.go: package mainimport "fmt"func main() {fmt.Println("Hello World!") } 打包: # 在linux服务上执行下面的3个命令 # linux平台,生成main1可执行程序 CGO_ENABLED0 GOOSlinux GOARCHam…...

Java的归并排序
不爱生姜不吃醋⭐️⭐️⭐️ 如果本文有什么错误的话欢迎在评论区中指正 与其明天开始,不如现在行动! 文章目录 🌴前言🌴一.归并排序1.概念2.时间复杂度3.代码实现 🌴二、小和问题1.概念2.举例3.代码实现 🌴…...

B. The Walkway Codeforces Round 893 (Div. 2)
Problem - B - Codeforces 题目大意:小明在数轴上要从1走到n,其中某些坐标上有一些饼干店,共m个,小明身上也有无限多的饼干,它首先一定会在1的位置吃一个饼干,在每个饼干店的位置会吃一个,在前…...

第四篇 DirectShow 采集调用结构关系
第一篇: DirectShow视频采集_会头痛的可达鸭的博客-CSDN博客 一、GraphBuilder 1、IFilterGraph2、IGraphBuilder、ICaptureGraphBuiler2 (1)、CLSID IFilterGraph CLSID_FilterGraphIFilterGraph2 CLSID_CaptureGraphBuilderIGraphBuilder CL…...

2605. 从两个数字数组里生成最小数字
文章目录 Tag题目来源题目解读解题思路方法一:枚举比较法方法二:集合的位运算表示法 写在最后 Tag 【贪心】【位运算】【数组】 题目来源 2605. 从两个数字数组里生成最小数字 题目解读 给定两个各自只包含数字 1 到 9 的两个数组,每个数组…...

服务器发送事件Server-sent events详解与示例
Server-sent events 服务端进行数据推送除了WebSocket之外,还可以使用Server-Send-Event方案。 与 WebSocket不同的是,服务器发送事件是单向的。数据消息只能从服务端到发送到客户端(如用户的浏览器)。这使其成为不需要从客户端…...

SOLIDWORKS 多实体的建模方式
SOLIDWORKS多实体是SOLIDWORKS中一个非常有用的功能。在SOLIDWORKS中,对于模型的设定通常被大家所熟知的有以下几种类型:零件、装配体以及工程图。 其实还有一种划分,就是多实体。严格意义上来说,多实体既不属于零件也不属于装配体…...

NSSCTF web 刷题记录1
文章目录 前言题目[GXYCTF 2019]禁止套娃方法一方法二 [NCTF 2019]Fake XML cookbook[NSSRound#7 Team]ec_RCE[NCTF 2018]Flask PLUS 前言 今天是2023.9.3,大二开学前的最后一天。老实说ctf的功力还是不太够做的题目太少,新学期新气象。不可急于求成&am…...

遥感指数数据库
目前遥感指数多种多样,那怎么针对不同的应用领域选择合适的植被指数?不同卫星又有哪些可以反演的指数? Henrich等人开发了Index Database(网址:https://www.indexdatabase.de/),总结了目前主流的遥感指数,…...

如何让insert程序速度快,可以试试联合SQL(insert 和 select 一起使用)?
查询添加可选择SQL执行,速度远超程序执行 insert 和 select案例 insert into 表1(列1,列2,列3,...) select 列1,列2,列3,...from表2(GROUP BY 列)116511 条数据 耗时45秒, 如果是程序查询然后再insert,则需要30分钟左右!&#x…...

IP地址、网关、网络/主机号、子网掩码关系
一、IP地址 IP地址组成 IP地址分为两个部分:网络号和主机号 (1)网络号:标识网段,保证相互连接的两个网段具有不同的标识。 (2)主机号:标识主机,同一网段内,主机之间具有相同的网…...

使用skvideo.io.vread读取avi视频,报错“No way to determine width or height from video...”
问题描述: 一开始安装sk-video,在使用skvideo.io.vread读取avi视频,报错“No way to determine width or height from video. Need -s in inputdict. Consult documentation on I/O.” 解决方案: 1. 卸载sk-video pip uninsta…...

Nomad 系列-安装
系列文章 Nomad 系列文章 Nomad 简介 开新坑!近期算是把自己的家庭实验室环境初步搞好了,终于可以开始进入正题研究了。 首先开始的是 HashiCorp Nomad 系列,欢迎阅读。 关于 Nomad 的简介,之前在 大规模 IoT 边缘容器集群管…...

网络版五子棋C++实现
目录 1.项目介绍 2.开发环境 3.核心技术 4.环境搭建 5.WebSocketpp介绍 5.1WebSocketpp是什么 5.2为什么使用WebSocketpp 5.3原理解析: 5.4WebSocketpp主要特性 6.WebSocketpp使用 7.JsonCpp使用 8.MySQL API 9.项目模块设计以及流程图 10.封装日志宏…...

项目招标投标公众号小程序开源版开发
项目招标投标公众号小程序开源版开发 以下是一个招标投标公众号小程序的功能列表: 用户注册与登录:用户可以注册账号并登录公众号小程序。项目发布:用户可以发布招标项目的详细信息,包括项目名称、招标单位、项目描述、招标要求…...

华为OD机试-机器人走迷宫
题目描述 机器人走一个迷宫,给出迷宫的x和y(x*y的迷宫)并且迷宫中有障碍物,输入k表示障碍物有k个,并且会将障碍物的坐标挨个输入. 机器人从0,0的位置走到x,y的位置并且只能向x,y增加的方向走,不能回退. 如代码类注释展示的样子,#表示可以走的方格,0代表障碍,机器人从0,0的位置…...

Jenkins搭建步骤Linux环境
1、进入目标目录开始准备环境 安装jdk 安装maven 安装tomcat 安装node 下载Jenkins.war并且拷贝进tomcat的webapp的文件夹下。 环境变量配置如下自行更改: #--------------For JDK---------------- export JAVA_HOME/usr/local/java/jdk1.8.0_192 export PATH/usr…...

2023 AZ900备考
文章目录 如何学习最近准备考AZ900考试,找了一圈文档,结果发现看那么多文档,不如直接看官方的教程https://learn.microsoft.com/zh-cn/certifications/exams/az-900/ ,简单直接,突然想到纳瓦尔宝典中提到多花时间进行思…...

青翼科技基于VITA57.1的16路数据收发处理平台产品手册
FMC211是一款基于VITA57.1标准规范的实现16路LVDS数据采集、1路光纤数据收发处理FMC子卡模块。 该板卡支持2路CVBS(复合视频)视频输入,能够自动检测标准的模拟基带电视信号,并将其转变为8位ITU-R.656接口信号或者4:2:2分量视频信…...

Ansible_自动化运维实战(一)
1.DELL的一款服务器的价格、型号、配置(CPU,内存、硬盘、支持的RAID功能) DELL 服务器的定价、型号和配置因型号而异,可以通过访问 DELL 官方网站或联系 DELL 客户服务获取具体信息。一种示例是 DELL PowerEdge R740,其…...

说说Flink中的State
分析&回答 基本类型划分 在Flink中,按照基本类型,对State做了以下两类的划分: Keyed State,和Key有关的状态类型,它只能被基于KeyedStream之上的操作,方法所使用。我们可以从逻辑上理解这种状态是一…...

适合心理法律在线咨询预约含视频图文电话咨询功能的小程序开发
目前智能手机普及,很多以前需要线下咨询的场景都被搬到了线上,这样既可以使咨询者更方便,也可以使被咨询者接待效率更高,服务更多咨询者。基于此我们开发了专门的一款具有线上咨询功能的小程序,同时为了方便被咨询者服…...

Redis-Cluster集群操作--添加节点、删除节点
一、环境部署 部署好Redis-Cluster集群,参考上个本人的博客:Redis-Cluster集群的部署(详细步骤)_是胡也是福的博客-CSDN博客 新准备一台机器,修改主机名,关闭防火墙和selinux,参考:…...

ModaHub魔搭社区:星环科技向量数据库Hippo社区版来啦
大语言模型正在与企业应用迅速结合,并深刻改变企业的各个产业环节。而大模型训练所使用的数据包含了如文档、图片、音视频等各种类型的非结构化数据,传统关系型数据库能力有限。通过将这些非结构化数据转换为多维向量,可以结构化地在向量数据库中进行管理,实现高效的数据存…...

gitHub添加ssh
gitHub添加ssh 首先你需要有一个github的账户 第一步: 打开终端,输入以下命令,注意“your email”处你自己的邮箱,创建新的ssh ssh-keygen -t ed25519 -C “your email” 第二步:使用ssh登录ssh-agent,终端…...

sql:SQL优化知识点记录(十)
(1)慢查询日志 Group by的优化跟Order by趋同,只是多了一个having 开启慢查询日志: 演示一下慢sql:4秒之后才会出结果 查看一下:下方显示慢查询的sql (2)批量插入数据脚本 函数和存…...

STM32 RTC实验
RTC时钟简介 STM32F103的实时时钟(RTC)是一个独立的定时器。 STM32的RTC模块拥有一组连续计数的计数器,在相对应的软件配置下,可提供时钟日历的功能。 修改计数器的值可以重新设置系统的当前时间和日期。 RTC模块和时钟配置系统…...