DelayQueue介绍
5.1 DelayQueue介绍&应用
DelayQueue就是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。
需要让消息具有延迟的特性。
DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构每次获取的是栈顶的数据,需要让DelayQueue中的数据,在比较时,跟根据延迟时间做比较,剩余时间最短的要放在栈顶。查看DelayQueue类信息:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 发现DelayQueue中的元素,需要继承Delayed接口。
}
// ==========================================
// 接口继承了Comparable,这样就具备了比较的能力。
public interface Delayed extends Comparable<Delayed> {
// 抽象方法,就是咱们需要设置的延迟时间
long getDelay(TimeUnit unit);
// Comparable接口提供的:public int compareTo(T o);
}
基于上述特点,声明一个可以写入DelayQueue的元素类
public class Task implements Delayed {
/** 任务的名称 */
private String name;
/** 什么时间点执行 */
private Long time;
/**
*
* @param name
* @param delay 单位毫秒。
*/
public Task(String name, Long delay) {
// 任务名称
this.name = name;this.time = System.currentTimeMillis() + delay;
}
/**
* 设置任务什么时候可以出延迟队列
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
// 单位是毫秒,视频里写错了,写成了纳秒,
return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
/**
* 两个任务在插入到延迟队列时的比较方式
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.time - ((Task)o).getTime());
}
}
在使用时,查看到DelayQueue底层用了PriorityQueue,在一定程度上,DelayQueue也是无界队列。
测试效果
public static void main(String[] args) throws InterruptedException {
// 声明元素
Task task1 = new Task("A",1000L);
Task task2 = new Task("B",5000L);
Task task3 = new Task("C",3000L);
Task task4 = new Task("D",2000L);
// 声明阻塞队列
DelayQueue<Task> queue = new DelayQueue<>();
// 将元素添加到延迟队列中
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
// 获取元素
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
// A,D,C,B
}
在应用时,外卖,15分钟商家需要节点,如果不节点,这个订单自动取消。
可以每下一个订单,就放到延迟队列中,如果规定时间内,商家没有节点,直接通过消费者获取元素,然后取消订单。
只要是有需要延迟一定时间后,再执行的任务,就可以通过延迟队列去实现。
5.2、DelayQueue核心属性
可以查看到DelayQueue就四个核心属性
// 因为DelayQueue依然属于阻塞队列,需要保证线程安全。看到只有一把锁,生产者和消费者使用的是一个lock
private final transient ReentrantLock lock = new ReentrantLock();
// 因为DelayQueue还是基于二叉堆结构实现的,没有必要重新搞一个二叉堆,直接使用的PriorityQueue
private final PriorityQueue<E> q = new PriorityQueue<E>();
// leader一般会存储等待栈顶数据的消费者,在整体写入和消费的过程中,会设置的leader的一些判断。
private Thread leader = null;
// 生产者在插入数据时,不会阻塞的。当前的Condition就是给消费者用的
// 比如消费者在获取数据时,发现栈顶的数据还又没到延迟时间。
// 这个时候,咱们就需要将消费者线程挂起,阻塞一会,阻塞到元素到了延迟时间,或者是,生产者插入的元素到了栈顶,此时生产者会唤醒消费者。
private final Condition available = lock.newCondition();
5.3、DelayQueue写入流程分析
Delay是无界的,数组可以动态的扩容,不需要关注生产者的阻塞问题,他就没有阻塞问题。
这里只需要查看offer方法即可。
public boolean offer(E e) {
// 直接获取lock,加锁。
final ReentrantLock lock = this.lock;lock.lock();
try {
// 直接调用PriorityQueue的插入方法,这里会根据之前重写Delayed接口中的compareTo方法做排序,然后调整上移和下移操作。
q.offer(e);
// 调用优先级队列的peek方法,拿到堆顶的数据
// 拿到堆顶数据后,判断是否是刚刚插入的元素
if (q.peek() == e) {
// leader赋值为null。在消费者的位置再提一嘴
leader = null;
// 唤醒消费者,避免刚刚插入的数据的延迟时间出现问题。
available.signal();
}
// 插入成功,
return true;
} finally {
// 释放锁
lock.unlock();
}
}
5.4、DelayQueue读取流程分析
消费者依然还是存在阻塞的情况,因为有两个情况
● 消费者要拿到栈顶数据,但是延迟时间还没到,此时消费者需要等待一会。
● 消费者要来拿数据,但是发现已经有消费者在等待栈顶数据了,这个后来的消费者也需要等待一会。
依然需要查看四个方法的实现
5.4.1 remove方法
// 依然是AbstractQueue提供的方法,有结果就返回,没结果扔异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
5.4.2 poll方法
// poll是浅尝一下,不会阻塞消费者,能拿就拿,拿不到就拉倒
public E poll() {
// 消费者和生产者是一把锁,先拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 拿到栈顶数据。
E first = q.peek();
// 如果元素为null,直接返回null
// 如果getDelay方法返回的结果是大于0的,那说明当前元素还每到延迟时间,元素无法返回,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)return null;
else
// 到这说明元素不为null,并且已经达到了延迟时间,直接调用优先级队列的poll方法
return q.poll();
} finally {
// 释放锁。
lock.unlock();
}
}
5.4.3 poll(time,unit)方法
这个是允许阻塞的,并且指定一定的时间
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 先将时间转为纳秒
long nanos = unit.toNanos(timeout);
// 拿锁,加锁。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 死循环。
for (;;) {
// 拿到堆顶数据
E first = q.peek();
// 如果元素为nullif (first == null) {
// 并且等待的时间小于等于0。不能等了,直接返回null
if (nanos <= 0)
return null;
// 说明当前线程还有可以阻塞的时间,阻塞指定时间即可。
else
// 这里挂起线程后,说明队列没有元素,在生产者添加数据之后,会唤醒
nanos = available.awaitNanos(nanos);
// 到这说明,有数据
} else {
// 有数据的话,先获取数据现在是否可以执行,延迟时间是否已经到了指定时间
long delay = first.getDelay(NANOSECONDS);
// 延迟时间是否已经到了,
if (delay <= 0)
// 时间到了,直接执行优先级队列的poll方法,返回元素
return q.poll();
// ==================延迟时间没到,消费者需要等一会===================
// 这个是查看消费者可以等待的时间,
if (nanos <= 0)
// 直接返回nulll
return null;
// ==================延迟时间没到,消费者可以等一会===================
// 把first赋值为null
first = null;
// 如果等待的时间,小于元素剩余的延迟时间,消费者直接挂起。反正暂时拿不到,但是不能保证后续是否有生产者添加一个新的数据,我是可以拿到的。
// 如果已经有一个消费者在等待堆顶数据了,我这边不做额外操作,直接挂起即可。
if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);
// 当前消费者的阻塞时间可以拿到数据,并且没有其他消费者在等待堆顶数据
else {
// 拿到当前消费者的线程对象
Thread thisThread = Thread.currentThread();
// 将leader设置为当前线程
leader = thisThread;
try {
// 会让当前消费者,阻塞这个元素的延迟时间
long timeLeft = available.awaitNanos(delay);
// 重新计算当前消费者剩余的可阻塞时间,。
nanos -= delay - timeLeft;
} finally {
// 到了时间,将leader设置为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有消费者在等待元素,队列中的元素不为null
if (leader == null && q.peek() != null)
// 只要当前没有leader在等,并且队列有元素,就需要再次唤醒消费者。、
// 避免队列有元素,但是没有消费者处理的问题
available.signal();
// 释放锁lock.unlock();
}
}
5.4.4 take方法
这个是允许阻塞的,但是可以一直等,要么等到元素,要么等到被中断。
public E take() throws InterruptedException {
// 正常加锁,并且允许中断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 拿到元素
E first = q.peek();
if (first == null)
// 没有元素挂起。
available.await();
else {
// 有元素,获取延迟时间。
long delay = first.getDelay(NANOSECONDS);
// 判断延迟时间是不是已经到了
if (delay <= 0)
// 基于优先级队列的poll方法返回
return q.poll();first = null;
// 如果有消费者在等,就正常await挂起
if (leader != null)
available.await();
// 如果没有消费者在等的堆顶数据,我来等
else {
// 获取当前线程
Thread thisThread = Thread.currentThread();
// 设置为leader,代表等待堆顶的数据
leader = thisThread;
try {
// 等待指定(堆顶元素的延迟时间)时长,
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
// leader赋值null
leader = null;
}
}
}
}
} finally {
// 避免消费者无线等,来一个唤醒消费者的方法,一般是其他消费者拿到元素走了之后,并且延迟队列还有元素,就执行if内部唤醒方法
if (leader == null && q.peek() != null)
available.signal();
// 释放锁
lock.unlock();}
}
相关文章:
DelayQueue介绍
5.1 DelayQueue介绍&应用 DelayQueue就是一个延迟队列,生产者写入一个消息,这个消息还有直接被消费的延迟时间。 需要让消息具有延迟的特性。 DelayQueue也是基于二叉堆结构实现的,甚至本事就是基于PriorityQueue实现的功能。二叉堆结构…...
centos8 redis 6.2.6源码安装+主从哨兵
文章目录 centos8 redis 6.2.6源码安装主从哨兵下载解压编译安装配置配置systemd服务启停及开机启动登录验证主从同步配置哨兵哨兵注册systemd centos8 redis 6.2.6源码安装主从哨兵 单机安装 下载解压 cd /data wget http://download.redis.io/releases/redis-6.2.6.tar.gz…...

机器学习之危险品车辆目标检测
危险品的运输涉及从离开仓库到由车辆运输到目的地的风险。监控事故、车辆运动动态以及车辆通过特定区域的频率对于监督车辆运输危险品的过程至关重要。 在线工具推荐: 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数…...

DHCP协议及实验omnipeek抓包工具分析 IPv4协议
一 抓包命令 adb shell tcpdump -i wlan0 -w /data/tcpdump.pcap 抓包后截图如下 二 DHCP是什么 2.1 DHCP定义 DHCP( Dynamic Host Configuration Protocol, 动态主机配置协议)定义: 存在于应用层(OSI) 前身是BOOTP(Bootstrap Protocol)协议 是一个使用UDP(User …...

考过了PMP,面试的时候应该怎么办?
近期喜番在后台收到了很多同学们的私信,表示自己已经过了8月份的PMP考试,开始着手往项目管理岗位转型,但是对于项目管理岗位的面试却一筹莫展。放轻松,大家的需求喜番都了解了,喜番给大家总结了一些项目经理在面试的时…...

技巧-PyTorch中num_works的作用和实验测试
简介 在 PyTorch 中,num_workers 是 DataLoader 中的一个参数,用于控制数据加载的并发线程数。它允许您在数据加载过程中使用多个线程,以提高数据加载的效率。 具体来说,num_workers 参数指定了 DataLoader 在加载数据时将创建的…...
Android:FragmentTransaction
上一篇Android:FragmentTransaction我们大概介绍了FragmentManager的大致工作流程,知道了每个动作都会添加到Op队列里,并由FragmentTransaction进行管理,那么我们就来看看FragmentTransaction的具体内容。 首先FragmentTransacti…...

5.golang字符串的拆解和拼接
字符串是 Go 中的字节切片。可以通过将一组字符括在双引号中来创建字符串" "。Go 中的字符串是兼容Unicode编码的,并且是UTF-8编码的。 访问字符串的单个字节或字符 由于字符串是字节切片,因此可以访问字符串的每个字节。 func printStr(s …...

配置 Mantis 在 Windows 上的步骤
配置 Mantis Bug Tracker 在 Windows 上的步骤 Mantis Bug Tracker 是一款开源的缺陷跟踪系统,用于管理软件开发中的问题和缺陷。在 Windows 环境下配置 Mantis 可以帮助开发者更方便地进行项目管理。以下是一个详细的教程,包含了 EasyPHP Devserver 和…...

Android 单元测试初体验(二)-断言
[TOC](Android 单元测试初体验(二)-断言) 前言 当初在学校学安卓的时候,老师敢教学进度,翻到单元测试这一章节的时候提了两句,没有把单元测试当重点讲,只是说我们工作中几乎不会用到,果真在之前的几年工作当中我真的没…...

通过ros系统中websocket中发送sensor_msgs::Image数据给web端显示
通过ros系统中websocket中发送sensor_msgs::Image数据给web端显示(一) 图片数据转成base64编码方式 #include <ros/ros.h> #include <signal.h> #include <sensor_msgs/Image.h> #include <message_filters/subscriber.h> #include <message_filt…...
【 Kubernetes 风云录 】- Istio 应用多版本流量控制
文章目录 原理实现DeploymentVirtualServiceDestinationRule 约束部署 目的: 根据不同的引擎版本,可以把请求发送到指定的引擎上。可以实现版本降级。 原理 Istio通过VirtualService和DestinationRule两个资源对象来实现流量管理,其中VirtualService用于…...

比尔盖茨:GPT-5不会比GPT-4好多少,生成式AI已达到极限
比尔盖茨一句爆料,成为机器学习社区热议焦点: “GPT-5不会比GPT-4好多少。” 虽然他已不再正式参与微软的日常运营,但仍在担任顾问,并且熟悉OpenAI领导团队的想法。 消息来自德国《商报》(Handelsblatt)对…...

let const 与var的区别
1、let可以形成块级作用域,在es6之前javascript只有函数作用域,没有块级作用域。在es6之前实现块级作用域: 2、可以看到通过一个立即执行函数表达式,我们实现了一个局部作用域或者块级作用域,但是有了let之后就不需要写这样的代…...
git 把项目托管到码云
码云: 把项目托管到码云 1.注册并微活码云账号(https://gitee.com/] 2.牛成井前博 SSH公钥 (运行 ssh -t gitgitee.com 构测 SSH 公明是否有开成功) 3.创建率户的码人伦;库 4.把本地项口上传到码云对应的空白仓库中 第一:上传个新项目 cd existing_git_…...

sCrypt 现已支持各类主流前端框架
sCrypt 现已支持各类主流前端框架,包括: ReactNext.jsAngularSvelteVue 3.x or 2.x bundled with Vite or Webpack 通过在这些支持的前端框架中集成sCrypt开发环境,你可以直接在前端项目里访问合约实例和调用合约,方便用户使用Se…...

leetcode:2549. 统计桌面上的不同数字(python3解法)
难度:简单 给你一个正整数 n ,开始时,它放在桌面上。在 109 天内,每天都要执行下述步骤: 对于出现在桌面上的每个数字 x ,找出符合 1 < i < n 且满足 x % i 1 的所有数字 i 。然后,将这些…...
数据结构 / day03作业
1.顺序表按元素删除 //main.c#include "head.h" int main(int argc, const char *argv[]) {sqlist *listcreate_space();// printf("&list%p\n", list);int n;int index;data_type element, key;printf("please input n;");scanf("%d&…...

异步爬虫提速实践-在Scrapy中使用Aiohttp/Trio
在构建爬虫系统时,提高爬虫速度是一个关键问题。而使用异步爬虫技术可以显著提升爬取效率。在本文中,我将与大家分享如何在Scrapy中利用Aiohttp或Trio库实现异步爬取,以加快爬虫的速度。让我们开始吧! 1. 安装所需的库 首先&…...
Python与设计模式--访问者模式
23种计模式之 前言 (5)单例模式、工厂模式、简单工厂模式、抽象工厂模式、建造者模式、原型模式、(7)代理模式、装饰器模式、适配器模式、门面模式、组合模式、享元模式、桥梁模式、(11)策略模式、责任链模式、命令模式、中介者模…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...

k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
Rapidio门铃消息FIFO溢出机制
关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系,以下是深入解析: 门铃FIFO溢出的本质 在RapidIO系统中,门铃消息FIFO是硬件控制器内部的缓冲区,用于临时存储接收到的门铃消息(Doorbell Message)。…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

day36-多路IO复用
一、基本概念 (服务器多客户端模型) 定义:单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力 作用:应用程序通常需要处理来自多条事件流中的事件,比如我现在用的电脑,需要同时处理键盘鼠标…...

windows系统MySQL安装文档
概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...

使用SSE解决获取状态不一致问题
使用SSE解决获取状态不一致问题 1. 问题描述2. SSE介绍2.1 SSE 的工作原理2.2 SSE 的事件格式规范2.3 SSE与其他技术对比2.4 SSE 的优缺点 3. 实战代码 1. 问题描述 目前做的一个功能是上传多个文件,这个上传文件是整体功能的一部分,文件在上传的过程中…...
node.js的初步学习
那什么是node.js呢? 和JavaScript又是什么关系呢? node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说, 需要在node.js的环境上进行当JavaScript作为前端开发语言来说,需要在浏览器的环境上进行 Node.js 可…...

工厂方法模式和抽象工厂方法模式的battle
1.案例直接上手 在这个案例里面,我们会实现这个普通的工厂方法,并且对比这个普通工厂方法和我们直接创建对象的差别在哪里,为什么需要一个工厂: 下面的这个是我们的这个案例里面涉及到的接口和对应的实现类: 两个发…...

轻量安全的密码管理工具Vaultwarden
一、Vaultwarden概述 Vaultwarden主要作用是提供一个自托管的密码管理器服务。它是Bitwarden密码管理器的第三方轻量版,由国外开发者在Bitwarden的基础上,采用Rust语言重写而成。 (一)Vaultwarden镜像的作用及特点 轻量级与高性…...