当前位置: 首页 > news >正文

【Java多线程案例】实现阻塞队列

1. 阻塞队列简介

1.1 阻塞队列概念

阻塞队列:是一种特殊的队列,具有队列"先进先出"的特性,同时相较于普通队列,阻塞队列是线程安全的,并且带有阻塞功能,表现形式如下:

  • 当队列满时,继续入队列就会阻塞,直到有其他线程从队列中取出元素
  • 当队列空时,继续出队列就会阻塞,直到有其他线程往队列中插入元素

基于阻塞队列我们可以实现生产者消费者模型,这在后端开发场景中是相当重要的!

1.2 生产者-消费者模型优势

基于阻塞队列实现的 生产者消费者模型 具有以下两大优势:

  1. 解耦合:

image.png
以搜狗搜索的服务器举例,用户输入搜索关键字 **美容,**客户端的请求到达搜狗的"入口服务器"时,会将请求转发到 广告服务器大搜索服务器,此时广告服务器返回相关广告内容,大搜索服务器根据搜索算法匹配对应结果返回,如果按照这种方式通信,那么入口服务器需要编写两套代码分别同广告服务器和大搜索服务器进行交互,并且一个严重问题是如果其中广告服务器宕机了,会导致入口服务器无法正常工作进而影响大搜索服务器也无法正常工作!!
image.png
而引入阻塞队列后,入口服务器不需要知晓广告服务器和大搜索服务器的存在,只需要往阻塞队列中发送请求即可,而广告服务器和大搜索服务器也不需要知道入口服务器的存在,只需要从阻塞队列中取出请求处理完毕返回给阻塞队列即可,并且当其中大搜索服务器宕机时,不影响其他服务器以及入口服务器的正常运作!

  1. 削峰填谷:

image.png
如果没有阻塞队列,当遇到一些突发场景例如"双十一"大促等客户请求量激增的时候,入口服务器转发的请求量增多,压力就会变大,同理广告服务器和大搜索服务器处理过程复杂繁多,消耗的硬件资源就会激增,达到硬件瓶颈之后服务器就宕机了(直观现象就是客户端发送请求,服务器不会响应了)
image.png
而引入阻塞队列/消息队列之后,由于阻塞队列只负责存储相应的请求或者响应,无需额外的业务处理,因此抗压能力比广告服务器和大搜索服务器更强,当客户请求量激增的时候交由阻塞队列承受,而广告服务器和大搜索服务器只需要按照特定的速率进行读取并返回处理结果即可,就起到了 削峰填谷 的作用!

注意:此处的阻塞队列在现实场景中并不是一个单纯的数据结构,往往是一个基于阻塞队列的服务器程序,例如消息队列(MQ)

2. 标准库中的阻塞队列

2.1 基本介绍

Java标准库提供了现成的阻塞队列数据结构供开发者使用,即BlockingQueue接口
BlockingQueue:该接口具有以下实现类:

  1. ArrayBlockingQueue:基于数组实现的阻塞队列
  2. LinkedBlockingQueue:基于链表实现的阻塞队列
  3. PriorityBlockingQueue:带有优先级的阻塞队列

BlockingQueue方法:该接口具有以下常用方法

  1. 带有阻塞功能:
  • put:向队列中入元素,队列满则阻塞等待
  • take:向队列中取出元素,队列空则阻塞等待
  1. 不带有阻塞功能:
  • peek:返回队头元素(不取出)
  • poll:返回队头元素(取出)
  • offer:向队列中插入元素

2.2 代码示例

/*** 测试Java标准库提供的阻塞队列实现*/
public class TestStandardBlockingQueue {private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {// 生产者Thread t1 = new Thread(() -> {int i = 0;while (true) {try {queue.put(i);System.out.println("生产数据:" + i);i++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});// 消费者Thread t2 = new Thread(() -> {while (true) {try {Thread.sleep(1000);int ele = queue.take();System.out.println("消费数据:" + ele);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}

运行效果
image.png
我们在主线程中创建了两个线程,其中t1线程作为生产者不断循环生产元素,而线程t2作为消费者每隔1s消费一个数据,所以我们很快看到当生产数据个数达到容量capacity时就会继续生产就会阻塞等待,直到消费者线程消费数据后才可以继续入队列,这样就实现了一个 生产者-消费者模型

3. 自定义实现阻塞队列

首先我们需要明确实现一个阻塞队列需要哪些步骤?

  1. 首先我们需要实现一个普通队列
  2. 使用锁机制将普通队列变成线程安全的
  3. 通过特殊机制让该队列能够带有"阻塞"功能

3.1 实现普通队列

相信大家如果学过 数据结构与算法 相关课程,应该对队列这种数据结构的实现并不陌生!实现队列有基于数组的也有基于链表的,我们此处采用基于数组实现的,基于数组实现的循环队列也有以下两种方式:

  1. 腾出一个空间用来判断队列空或者满
  2. 使用额外的变量size用来记录当前元素的个数

我们使用第二种方式实现,实现代码如下:

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {if (size == capacity) {// 队列已经满了return;}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;}/*** 出队列方法*/public String take() {// 判断队列是否为空if (size == 0) {return null;}String topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(3);queue.put("11");queue.put("22");queue.put("33");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

3.2 引入锁机制实现线程安全

引入synchronized关键字在原有队列实现的基础上实现线程安全,代码如下:

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量private Object locker = new Object(); // 锁对象public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {synchronized (locker) {if (size == capacity) {// 队列已经满了return;}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;}}/*** 出队列方法*/public String take() {String topElem = "";synchronized (locker) {// 判断队列是否为空if (size == 0) {return null;}topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;}return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(3);queue.put("11");queue.put("22");queue.put("33");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}

我们在puttake等关键方法上将 多个线程修改同一个变量 部分的操作进行加锁处理,实现线程安全!

3.3 加入阻塞功能

在普通队列的实现中,如果队列满或者空我们直接使用return关键字返回,但是在多线程环境下我们希望实现阻塞等待的功能,这就可以使用Object类提供的wait/notify这组方法实现阻塞与唤醒机制了!我们就需要考虑阻塞与唤醒的时机了!
何时阻塞:这个问题非常简单,当队列满时入队列操作就应该阻塞等待,而当队列为空时出队列操作就需要阻塞等待
何时唤醒:想必大家都可以想到,对于入队列操作来说,只要队列不满就可以被唤醒,而对于出队列操作来说,队列不为空就可以被唤醒,因此,只要有线程调用take操作出队列,那么入队列的线程就可以被唤醒,而只要有线程调用put操作入队列,那么出队列的线程就可以被唤醒

/*** 自定义实现阻塞队列*/
public class MyBlockingQueue {private int head = 0; // 头指针private int tail = 0; // 尾指针private int size = 0; // 当前元素个数private String[] array = null;private int capacity; // 容量private Object locker = new Object(); // 锁对象public MyBlockingQueue(int capacity) {this.capacity = capacity;this.array = new String[capacity];}/*** 入队列方法*/public void put(String elem) {synchronized (locker) {while (size == capacity) {// 队列已经满了(进行阻塞)try {locker.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}array[tail] = elem;tail++;if (tail >= capacity) {tail = 0;}size++;locker.notifyAll();}}/*** 出队列方法*/public String take() {String topElem = "";synchronized (locker) {// 判断队列是否为空while (size == 0) {try {locker.wait();} catch (InterruptedException e) {throw new RuntimeException(e);}}topElem = array[head];head++;if (head >= capacity) {head = 0;}size--;locker.notifyAll();}return topElem;}public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(10);// 生产者Thread producer = new Thread(() -> {int i = 0;while (true) {queue.put(i + "");System.out.println("生产元素:" + i);i++;}});// 消费者Thread consumer = new Thread(() -> {while (true) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}String elem = queue.take();System.out.println("消费元素" + elem);}});producer.start();consumer.start();}
}

我们使用wait/notify这组操作实现了阻塞/唤醒功能,并且满足必须使用在synchronized关键字内部的使用条件,这里有一个注意点

为什么我们将if判断条件改成了while循环呢???这是需要考虑清楚的!

image.png
如图所示:一开始由于队列满所以生产者1进入阻塞状态,释放锁,然后生产者2也进入阻塞状态释放锁,此时消费者消费一个元素后唤醒生产者1,然后生产者1生产一个元素后(记住此时队列已满)继续唤醒,但是此时唤醒的恰恰是 生产者2 ,生产者2继续执行生产元素,于是就出现问题,我们总结一下出现问题的原因:

  1. notifyAll是随机唤醒,无法指定唤醒线程,因此可能出现生产者唤醒生产者,消费者唤醒消费者的情况
  2. if判定条件一经执行就无法继续判定,所以生产者2被唤醒后没有再次判断当前队列是否满

于是我们的应对策略就是使用while循环,当线程被唤醒使重新判断,如果队列仍满,入队列操作继续阻塞,而队列仍空,出队列操作继续阻塞!Java标准也推荐我们使用 while 关键字和 wait 关键字一起使用!
image.png

4. 应用场景(实现生产者消费者模型)

我们继续基于我们自定义实现的阻塞队列再来实现 生产者-消费者模型
代码示例(主函数)

public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue(10);// 生产者Thread producer = new Thread(() -> {int i = 0;while (true) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}queue.put(i + "");System.out.println("生产元素:" + i);i++;}});// 消费者Thread consumer = new Thread(() -> {while (true) {String elem = queue.take();System.out.println("消费元素" + elem);}});producer.start();consumer.start();
}

运行效果
image.png
此时我们创建两个两个线程,producer作为生产者线程每隔1s生产一个元素,consumer作为消费者线程不断消费元素,此时我们看到的就是消费者消费很快,当阻塞队列空时就进入阻塞状态,直到生产者线程生产元素后才被唤醒继续执行!此时我们真正模拟实现了 阻塞队列 这样的数据结构!

相关文章:

【Java多线程案例】实现阻塞队列

1. 阻塞队列简介 1.1 阻塞队列概念 阻塞队列&#xff1a;是一种特殊的队列&#xff0c;具有队列"先进先出"的特性&#xff0c;同时相较于普通队列&#xff0c;阻塞队列是线程安全的&#xff0c;并且带有阻塞功能&#xff0c;表现形式如下&#xff1a; 当队列满时&…...

【制作100个unity游戏之24】unity制作一个3D动物AI生态系统游戏3(附项目源码)

最终效果 文章目录 最终效果系列目录前言随着地面法线旋转在地形上随机生成动物不同部位颜色不同最终效果源码完结系列目录 前言 欢迎来到【制作100个Unity游戏】系列!本系列将引导您一步步学习如何使用Unity开发各种类型的游戏。在这第24篇中,我们将探索如何用unity制作一…...

home work day5

第四章 堆与拷贝构造函数 一 、程序阅读题 1、给出下面程序输出结果。 #include <iostream.h> class example {int a; public: example(int b5){ab;} void print(){aa1;cout <<a<<"";} void print()const {cout<<a<<endl;} …...

c#安全-nativeAOT

文章目录 前记AOT测试反序列化Emit 前记 JIT\AOT JIT编译器&#xff08;Just-in-Time Complier&#xff09;,AOT编译器&#xff08;Ahead-of-Time Complier&#xff09;。 AOT测试 首先编译一段普通代码 using System; using System.Runtime.InteropServices; namespace co…...

【Java】案例:检测MySQL是否存在某数据库,没有则创建

1.代码 package hello; import java.sql.*;public class CeShi {//定义基本数据static final String JDBC_DRIVER "com.mysql.cj.jdbc.Driver";static final String DB_URL "jdbc:mysql://localhost/";static final String USER "your_username&q…...

内网渗透靶场02----Weblogic反序列化+域渗透

网络拓扑&#xff1a; 攻击机&#xff1a; Kali: 192.168.111.129 Win10: 192.168.111.128 靶场基本配置&#xff1a;web服务器双网卡机器&#xff1a; 192.168.111.80&#xff08;模拟外网&#xff09;10.10.10.80&#xff08;模拟内网&#xff09;域成员机器 WIN7PC192.168.…...

[嵌入式系统-9]:C语言程序调用汇编语言程序的三种方式

目录 1. 使用函数声明和函数调用&#xff1a; 2. 使用汇编内联&#xff08;Inline Assembly&#xff09;&#xff1a; 3. 使用汇编代码文件和链接器&#xff1a; C语言程序可以调用汇编程序的方式有多种&#xff0c;下面列举了几种常见的方式&#xff1a; 1. 使用函数声明和…...

备战蓝桥杯---搜索(完结篇)

再看一道不完全是搜索的题&#xff1a; 解法1&#xff1a;贪心并查集&#xff1a; 把冲突事件从大到小排&#xff0c;判断是否两个在同一集合&#xff0c;在的话就返回&#xff0c;不在的话就合并。 下面是AC代码&#xff1a; #include<bits/stdc.h> using namespace …...

深入浅出:Golang的Crypto/SHA256库实战指南

深入浅出&#xff1a;Golang的Crypto/SHA256库实战指南 介绍crypto/sha256库概览主要功能应用场景库结构和接口实例 基础使用教程字符串哈希化文件哈希化处理大型数据 进阶使用方法增量哈希计算使用Salt增强安全性多线程哈希计算 实际案例分析案例一&#xff1a;安全用户认证系…...

Unity_ShaderGraph节点问题

Unity_ShaderGraph节点问题 Unity版本&#xff1a;Unity2023.1.19 为什么在Unity2023.1.19的Shader Graph中找不见PBR Master节点&#xff1f; 以下这个PBR Maste从何而来&#xff1f;...

Java集合 Collection接口

这里写目录标题 集合Collection接口创建一个性表增加元素删除元素修改元素判断元素遍历集合实例判断元素是否存在 集合 Java中的Collection接口是集合类的一个顶级接口&#xff0c;它定义了一些基本的操作&#xff0c;如添加、删除、查找等。Collection接口主要有以下几个常用…...

C# Task的使用

C#中的Task类是.NET框架中用于实现异步编程的核心组件之一&#xff0c;它在.NET Framework 4及更高版本以及.NET Core中广泛使用。Task对象代表一个异步操作&#xff0c;并提供了跟踪异步操作状态、获取结果和处理完成通知的方法。 Task 类提供了对异步操作的封装&#xff0c;…...

尚硅谷Ajax笔记

一天拿下 介绍二级目录三级目录 b站链接 介绍 ajax优缺点 http node.js下载配置好环境 express框架 切换到项目文件夹&#xff0c;执行下面两条命令 有报错,退出用管理员身份打开 或者再命令提示符用管理员身份打开 npm init --yes npm i express请求 <script>//引…...

【MATLAB源码-第138期】基于matlab的D2D蜂窝通信仿真,对比启发式算法,最优化算法和随机算法的性能。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 D2D蜂窝通信介绍 D2D蜂窝通信允许在同一蜂窝网络覆盖区域内的终端设备直接相互通信&#xff0c;而无需数据经过基站或网络核心部分转发。这种通信模式具有几个显著优点&#xff1a;首先&#xff0c;它可以显著降低通信延迟&…...

AcWing 第 142 场周赛 B.最有价值字符串(AcWing 5468) (Java)

AcWing 第 142 场周赛 B.最有价值字符串(AcWing 5468) (Java) 比赛链接&#xff1a;AcWing 第 142 场周赛 x题传送门&#xff1a;B.最有价值字符串 题目&#xff1a;不展示 分析&#xff1a; 题目不难&#xff0c;不过有坑&#x1f62d;。 我们可以定义一个数组记录每个字…...

滑块识别验证

滑块识别 1. 获取图片 测试网站&#xff1a;https://www.geetest.com/adaptive-captcha-demo 2. 点击滑块拼图并开始验证 # 1.打开首页 driver.get(https://www.geetest.com/adaptive-captcha-demo)# 2.点击【滑动拼图验证】 tag WebDriverWait(driver, 30, 0.5).until(la…...

每日五道java面试题之java基础篇(四)

第一题. 访问修饰符 public、private、protected、以及不写&#xff08;默认&#xff09;时的区别&#xff1f; Java 中&#xff0c;可以使⽤访问控制符来保护对类、变量、⽅法和构造⽅法的访问。Java ⽀持 4 种不同的访问权限。 default (即默认&#xff0c;什么也不写&…...

我的docker随笔43:问答平台answer部署

本文介绍开源问答社区平台Answer的容器化部署。 起因 笔者一直想搭建一个类似stack overflower这样的平台&#xff0c;自使用了Typora&#xff0c;就正式全面用MarkdownTyporagit来积累自己的个人知识库&#xff0c;但没有做到web化&#xff0c;现在也还在探索更好的方法。 无…...

17、ELK

17、ELK helm 安装 elkfk&#xff08;kafka 集群外可访问&#xff09; ES/Kibana <— Logstash <— Kafka <— Filebeat 部署顺序&#xff1a; 1、elasticsearch 2、kibana 3、kafka 4、logstash 5、filebeat kubectl create ns elkhelm3部署elkfk 1、elast…...

React+Antd+tree实现树多选功能(选中项受控+支持模糊检索)

1、先上效果 树型控件&#xff0c;选中项形成一棵新的树&#xff0c;若父选中&#xff0c;子自动选中&#xff0c;子取消&#xff0c;父不取消&#xff0c;子选中&#xff0c;所有的父节点自动取消。同时支持模糊检索&#xff0c;会检索出所有包含该内容的关联节点。 2、环境准…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

微服务商城-商品微服务

数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)

UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中&#xff0c;UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化&#xf…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

基于TurtleBot3在Gazebo地图实现机器人远程控制

1. TurtleBot3环境配置 # 下载TurtleBot3核心包 mkdir -p ~/catkin_ws/src cd ~/catkin_ws/src git clone -b noetic-devel https://github.com/ROBOTIS-GIT/turtlebot3.git git clone -b noetic https://github.com/ROBOTIS-GIT/turtlebot3_msgs.git git clone -b noetic-dev…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...