【Netty】Promise 源码分析(十七)
文章目录
- 前言
- 一、Promise 接口
- 二、Netty 的 DefaultPromise
- 2.1、设置任务的成功或失败
- 2.2、获取 Future 任务执行结果和添加监听事件
- 三、Netty 的 DefaultChannelPromise
- 总结
前言
回顾Netty系列文章:
- Netty 概述(一)
- Netty 架构设计(二)
- Netty Channel 概述(三)
- Netty ChannelHandler(四)
- ChannelPipeline源码分析(五)
- 字节缓冲区 ByteBuf (六)(上)
- 字节缓冲区 ByteBuf(七)(下)
- Netty 如何实现零拷贝(八)
- Netty 程序引导类(九)
- Reactor 模型(十)
- 工作原理详解(十一)
- Netty 解码器(十二)
- Netty 编码器(十三)
- Netty 编解码器(十四)
- 自定义解码器、编码器、编解码器(十五)
- Future 源码分析(十六)
本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。
一、Promise 接口
在 Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:
public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V var1);boolean trySuccess(V var1);Promise<V> setFailure(Throwable var1);boolean tryFailure(Throwable var1);boolean setUncancellable();Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);Promise<V> await() throws InterruptedException;Promise<V> awaitUninterruptibly();Promise<V> sync() throws InterruptedException;Promise<V> syncUninterruptibly();
}
从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。
//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {Promise promise = new PromiseImpl();Object result = null;return =search() //业务逻辑if (sucess) {promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果} else if (failed) {promise.setFailure(reason);//通知 promise 当前异步任务失败了} else if (error) {promise.setFailure(error);//通知 promise 当前异步任务发生了异常}
}//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();
在 Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。
二、Netty 的 DefaultPromise
Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。
//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;
}//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");if (AbstractChannel.this.isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));} else if (!AbstractChannel.this.isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));} else {AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {this.register0(promise);} else {try {eventLoop.execute(new Runnable() {public void run() {AbstractUnsafe.this.register0(promise);}});} catch (Throwable var4) {AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);this.closeForcibly();AbstractChannel.this.closeFuture.setClosed();this.safeSetFailure(promise, var4);}}}
}
DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。
2.1、设置任务的成功或失败
DefaultPromise 核心源码如下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {public Promise<V> setSuccess(V result) {if (this.setSuccess0(result)) {return this;} else {throw new IllegalStateException("complete already: " + this);}}public boolean trySuccess(V result) {return this.setSuccess0(result);}public Promise<V> setFailure(Throwable cause) {if (this.setFailure0(cause)) {return this;} else {throw new IllegalStateException("complete already: " + this, cause);}}public boolean tryFailure(Throwable cause) {return this.setFailure0(cause);}public boolean setUncancellable() {if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {return true;} else {Object result = this.result;return !isDone0(result) || !isCancelled0(result);}}public boolean isSuccess() {Object result = this.result;return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);}public boolean isCancellable() {return this.result == null;}//...}
2.2、获取 Future 任务执行结果和添加监听事件
DefaultPromise 的get方法有 3 个。
无参数的get会阻塞等待;
有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。getNow()方法则会立马返回结果。
源码如下:
public V getNow() {Object result = this.result;return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}public V get() throws InterruptedException, ExecutionException {Object result = this.result;if (!isDone0(result)) {this.await();result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {Object result = this.result;if (!isDone0(result)) {if (!this.await(timeout, unit)) {throw new TimeoutException();}result = this.result;}if (result != SUCCESS && result != UNCANCELLABLE) {Throwable cause = this.cause0(result);if (cause == null) {return result;} else if (cause instanceof CancellationException) {throw (CancellationException)cause;} else {throw new ExecutionException(cause);}} else {return null;}
}
await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:
public Promise<V> await() throws InterruptedException { if (this.isDone()) {return this;} else if (Thread.interrupted()) {throw new InterruptedException(this.toString());} else {this.checkDeadLock();synchronized(this) {while(!this.isDone()) {this.incWaiters();try {this.wait();} finally {this.decWaiters();}}return this;}}//...
}
addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.addListener0(listener);}if (this.isDone()) {this.notifyListeners();}return this;
} public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;int var5 = 0;while(var5 < var4) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener != null) {this.addListener0(listener);++var5;continue;}}}if (this.isDone()) {this.notifyListeners();}return this;
}public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {ObjectUtil.checkNotNull(listener, "listener");synchronized(this) {this.removeListener0(listener);return this;}
}public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {ObjectUtil.checkNotNull(listeners, "listeners");synchronized(this) {GenericFutureListener[] var3 = listeners;int var4 = listeners.length;for(int var5 = 0; var5 < var4; ++var5) {GenericFutureListener<? extends Future<? super V>> listener = var3[var5];if (listener == null) {break;}this.removeListener0(listener);}return this;}
}
在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。
三、Netty 的 DefaultChannelPromise
DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {private final Channel channel;private long checkpoint;//...public Channel channel() {return this.channel;}public ChannelPromise setSuccess() {return this.setSuccess((Void)null);}public ChannelPromise setSuccess(Void result) {super.setSuccess(result);return this;}public boolean trySuccess() {return this.trySuccess((Object)null);}public ChannelPromise setFailure(Throwable cause) {super.setFailure(cause);return this;}//...public ChannelPromise promise() {return this;}protected void checkDeadLock() {if (this.channel().isRegistered()) {super.checkDeadLock();}}public ChannelPromise unvoid() {return this;}public boolean isVoid() {return false;}
}
总结
以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。
相关文章:
【Netty】Promise 源码分析(十七)
文章目录 前言一、Promise 接口二、Netty 的 DefaultPromise2.1、设置任务的成功或失败2.2、获取 Future 任务执行结果和添加监听事件 三、Netty 的 DefaultChannelPromise总结 前言 回顾Netty系列文章: Netty 概述(一)Netty 架构设计&…...
测牛学堂:2023最新自动化软件测试教程之python基础(字符串常用api总结)
python字符串常用API总结 1 count 查找某个字符在整个字符串中出现的次数 2 capitalize 将字符串的第一个字符转换为大写 3 center(width,fillchar) 返回一个指定宽度的字符串,fillchar为填充的字符,默认是空格,常用* str1 分隔线 print(st…...

【信号变化检测】使用新颖的短时间条件局部峰值速率特征进行信号变化/事件/异常检测(Matlab代码实现)
、 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭…...

MQTT GUI 客户端 可视化管理工具
MQTT GUI 客户端 可视化管理工具 介绍 多标签页管理,同时打开多个连接提供原生性能,并且比使用 Electron 等 Web 技术开发的同等应用程序消耗的资源少得多支持 MQTT v5.0 以及 MQTT v3.1.1 协议,支持通过 WebSocket 连接至 MQTT 服务器以树…...

计算机硬件系统 — 冯诺依曼体系结构运行原理解析
目录 文章目录 目录计算机系统计算机硬件系统(冯诺依曼体系结构)PC 主机硬件CPU(中央处理器)CPU 的组成部分CPU 总线控制器单元运算器单元寄存器组超线程与多核架构三级高速缓存为什么需要缓存三级缓存结构 CPU 的指令集指令集的类…...
10.Linux查看文件内容
在 Linux 中,可以使用多种命令来查看文件内容。以下是几个常用的命令及其用法: cat 命令:以行为单位显示整个文件内容。 cat file.txt # 显示名为 file.txt 的文件内容less 命令:分页显示文件内容,可向前/后翻页、搜索…...

API接口测试—详情版(拼多多根据ID取商品详情)
一、为什么要做接口测试 做接口测试的原因主要有以下几个方面: 1. 确保接口功能正确性:接口是不同软件系统或者不同模块之间的数据传输和交互的通道,通过接口测试能够确保不同系统或者模块之间传递的信息准确无误,从而保证了整个…...

【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)
【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤) 文章目录 【论文阅读】23_SIGIR_Disentangled Contrastive Collaborative Filtering(分离对比协同过滤)1. 来源2. 介绍3. 模型方法3.1…...
目前的网络情况与特点
现有网络无法进展安全管理与控制,缺乏可管理与安全性,一旦 网络出现病毒与网络攻击现象,将会涉与到个别部门部数据丢失与影 响相关的业务运作。 1 1.1 采用普通傻瓜式交换机 目前全所各部门采用的交换机根本上为 TP-LINK、D-LINK 10/100M 傻瓜…...

css选择器及其权重
1. 类型选择器 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-wid…...

RK3588平台开发系列讲解(项目篇)RKNN-Toolkit2 的使用
平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、RKNN-Toolkit2安装二、模型转换和模型推理三、性能和内存评估沉淀、分享、成长,让自己和他人都能有所收获!😄 📢 NPU 是专门用于神经网络的处理单元。它旨在加速人工智能领域的神经网络算法,如机器视觉和自…...
C/C++基础讲解(九十九)之经典篇(第几天/排序)
C/C++基础讲解(九十九)之经典篇(第几天/排序) 程序之美 前言 很多时候,特别是刚步入大学的学子们,对于刚刚开展的计算机课程基本上是一团迷雾,想要弄明白其中的奥秘,真的要花费一些功夫,我和大家一样都是这么啃过来的,从不知到知知,懵懂到入门,每一步都走的很艰辛,…...

quickstart Guide快速入门
本文档参考backtrader官方文档,是官方文档的完整中文翻译,可作为backtrader中文教程、backtrader中文参考手册、backtrader中文开发手册、backtrader入门资料使用。 快速入门章节目录 快速入门使用平台从0到100:一步一步的演示基本设置设置现…...

Kubernetes 证书详解
K8S 证书介绍 在 Kube-apiserver 中提供了很多认证方式,其中最常用的就是 TLS 认证,当然也有 BootstrapToken,BasicAuth 认证等,只要有一个认证通过,那么 Kube-apiserver 即认为认证通过。下面就主要讲解 TLS 认证。 …...

Python常用数据结构
Python 提供了多种内置的数据结构,用于存储和组织数据。以下是一些常见的 Python 数据结构: 1.列表(List):列表是一个有序、可变的数据集合,可以包含任意类型的元素。列表使用方括号 [] 表示,元…...

CompletableFuture详解-初遇者-很细
目录 一、创建异步任务 1. supplyAsync 2. runAsync 3.获取任务结果的方法 二、异步回调处理 1.thenApply和thenApplyAsync 2.thenAccept和thenAcceptAsync 2.thenRun和thenRunAsync 3.whenComplete和whenCompleteAsync 4.handle和handleAsync 三、多任务组合处理 1…...

【iOS】—— iOS中的相关锁
文章目录 自旋锁1.OSSpinLock2.os_unfair_lock3.atomic 互斥锁pthread_mutexsynchronizedobjc_sync_enterobjc_sync_exit注意事项 NSLockNSRecursiveLock信号量条件锁NSConditionNSConditionLock 读写锁总结 锁作为一种非强制的机制,被用来保证线程安全。每一个线程…...

表单重复提交:
1. 表单重复提交原因 当用户提交完请求,浏览器会记录最后一次请求的全部信息。用户按下功能键F5,就会发起浏览器记录的最后一次请求。如果最后一次请求为添加操作,那么此时刷新按钮就会再次提交数据,造成表单重复提交。 2. 表单…...
【0197】共享内存管理结构(shmem)之创建共享内存分配机制(Shared Memory Allocation)(2 - 2)
文章目录 1. 概述2. 初始化事务管理器 ShmemVariableCache2.1 从共享内存分配 VariableCacheData 大小内存空间2.1.1 分配对齐块2.2 内存空间清零相关文章: 【0195】共享内存管理结构(shmem)之概念篇(1) 【0196】共享内存管理结构(shmem)之创建共享内存分配机制(Shared…...

ChatGPT国内免费使用方法有哪些?
目录 ChatGPT介绍:一、ChatGPT是什么?二、ChatGPT发展:三、ChatGPT 优点:四、国内使用ChatGPT方法五、结语: ChatGPT介绍: 一、ChatGPT是什么? ChatGPT 是一个基于语言模型 GPT-3.5 的聊天机器人,ChatGPT模型是Instruct GPT的姊妹模型(siblingmodel&a…...

Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...

微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...

使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...

Ubuntu系统复制(U盘-电脑硬盘)
所需环境 电脑自带硬盘:1块 (1T) U盘1:Ubuntu系统引导盘(用于“U盘2”复制到“电脑自带硬盘”) U盘2:Ubuntu系统盘(1T,用于被复制) !!!建议“电脑…...

高考志愿填报管理系统---开发介绍
高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发,采用现代化的Web技术,为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## 📋 系统概述 ### 🎯 系统定…...