【JavaEE】多线程案例-阻塞队列
1. 前言
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空
- 当队列满时,存储元素的线程会等待队列可用
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2. 什么是生产者-消费者模型
生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
3. 生产者-消费者模型的意义
- 解耦合
- 削峰填谷
3.1 解耦合
两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。
不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。
相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。
3.2 削峰填谷
当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。
如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
4. 如何使用Java标准库提供的阻塞队列
当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。
我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>();queue.put("123");queue.put("234");queue.put("345");queue.put("456");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。
5. 自己实现一个阻塞队列
阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。
5.1 实现出循环队列
队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。
当 tail == head 的时候有两种情况:
- 队列中没有数据的时候
- 队列满了的时候
为了区分这两种情况,我们可以使用两种方法:
- 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
- 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {//当添加数据的时候需要判断队列的容量是否已经满了if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}public String take() {//读取数据的时候需要判断队列是否为空if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}
}
5.2 实现阻塞队列
阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。
5.2.1 加锁
当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {synchronized (this) {if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}}public String take() {synchronized (this) {if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}}
}
5.2.2 进行阻塞操作
当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {if(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {if(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题
当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。
- 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
- 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。
为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
5.2.4 解决因指令重排序造成的问题
因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。
class MyBlockingQueue {private final String[] data = new String[1000];private volatile int size;private volatile int head = 0;private volatile int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
测试实现的阻塞队列
public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
让生产速度较慢,使得读取操作阻塞等待插入数据才执行。
public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
让生产 put 操作进入阻塞等待状态。
相关文章:

【JavaEE】多线程案例-阻塞队列
1. 前言 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空当队列满时,存储元素的线程会等待队列可用 阻塞队列常用于生产者和消费者的场…...

【物联网】简要介绍最小二乘法—C语言实现
最小二乘法是一种常用的数学方法,用于拟合数据和寻找最佳拟合曲线。它的目标是找到一个函数,使其在数据点上的误差平方和最小化。 文章目录 基本原理最小二乘法的求解应用举例使用C语言实现最小二乘法总结 基本原理 假设我们有一组数据点 ( x 1 , y 1 …...

慢查询SQL如何优化
一.什么是慢SQL? 慢SQL指的是Mysql中执行比较慢的SQL,排查慢SQL最常用的方法是通过慢查询日志来查找慢SQL。Mysql的慢查询日志是Mysql提供的一种日志记录,它用来记录Mysql中响应时间超过long_query_time值的sql,long_query_time的默认时间为10s. 二.查看慢SQL是否…...
UART 通信-使用VIO进行板级验证
串口系列知识分享: (1)串口通信实现-串口发送 (2)串口通信发送多字节数据 (3)串口通信实现-串口接收 (4)UART 通信-使用VIO进行板级验证 (5)串口接收-控制LED闪烁 (6)使用串口发送实现ACX720开发板时钟显示 (7)串口发送+RAM+VGA传图 文章目录 前言一、uart串口协…...
linux 查看可支持的shell
查看可支持的shell linux中支持多种shell类型,所以在shell文件的第一行需要指定所使用的shell #!/bin/bash 指定该脚本使用的是/bin/bash,这样的机制使得我们可以轻松地引用任何的解释器 查看该linux系统支持的shell cat /etc/shells/bin/sh/bin/bash/us…...

微服务简介
微服务简介 微服务架构是一种软件架构模式,它将一个大型应用程序拆分为一组小型、独立的服务,每个服务都有自己的业务逻辑和数据存储。这些服务可以独立开发、部署和扩展,通常使用HTTP或其他轻量级通信协议进行通信。 以下是微服务架构的一…...

PHP自己的框架2.0设置常量并绑定容器(重构篇三)
目录 1、设置常量并绑定容器 2、容器增加设置当前容器的实例和绑定一个类实例当容器 3、将常量绑定到容器中 4、运行效果 1、设置常量并绑定容器 2、容器增加设置当前容器的实例和绑定一个类实例当容器 //设置当前容器的实例public static function setInstance($instance){…...

重建大师提交空三后引擎状态是等待,怎么开启?
答:图片中这是在自由网空三阶段,整个AT都是等待中,可以修改任务目录和监控目录看一下,先设置引擎,再提交空三。...

【数据结构】堆的向上调整和向下调整以及相关方法
💐 🌸 🌷 🍀 🌹 🌻 🌺 🍁 🍃 🍂 🌿 🍄🍝 🍛 🍤 📃 文章目录 一、堆的概念二、堆的性质…...

【蓝桥杯选拔赛真题60】Scratch旋转风车 少儿编程scratch图形化编程 蓝桥杯选拔赛真题解析
目录 scratch旋转风车 一、题目要求 编程实现 二、案例分析 1、角色分析...

JavaSE、JavaEE与Spring的概念和异同点剖析以及规范13 个分析
JavaSE、JavaEE与Spring的概念和异同点剖析以及规范13 个分析 目录概述需求: 设计思路实现思路分析1.什么是JavaSE2.是JavaEE3.什么是Spring 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&…...

微信小程序的图书馆图书借阅 座位预约系统 读者端设计与实现
该系统基于B/S即所谓浏览器/服务器模式,应用springboot框架,选择MySQL作为后台数据库。系统主要包括系统图书信息、图书借阅、图书归还、自习室信息、自习室预约等功能模块。 关键词 微信小程序的图书馆读者端;微信小程序;java语…...

在阿里云 linux 服务器上查看当前服务器的Nginx配置信息
我们可以通过命令 sudo nginx -t查看到nginx.conf的路径 可以通过 sudo nginx -T查看 nginx 详细配置信息,包括加载的配置文件和配置块的内容 其中也会包括配置文件的内容...

专业招投标书翻译怎样做比较好
在全球经济贸易一体化不断深入的时代,招投标作为国际通用的新型贸易方式,受到了大量中外企业的青睐。根据国际惯例,与招标采购活动有关的一切文件资料,均须使用英文编制。即使允许使用非英文语言编制,也必须随附一份英…...

算法总结10 线段树
算法总结10 线段树 线段树2569. 更新数组后处理求和查询 线段树 有一个数组,我们要: 更新数组的值(例如:都加上一个数,把子数组内的元素取反)查询一个子数组的值(例如:求和&#x…...

518抽奖软件,支持按人像照片抽奖
518抽奖软件简介 518抽奖软件,518我要发,超好用的年会抽奖软件,简约设计风格。 包含文字号码抽奖、照片抽奖两种模式,支持姓名抽奖、号码抽奖、数字抽奖、照片抽奖。(www.518cj.net) 照片抽奖模式 圆角边框 照片抽奖模式下&am…...

数字IC笔试面试题之--时钟偏斜(skew)与抖动(jitter)
1 时钟偏斜(clock skew) 时钟偏斜(偏移)是因为布线长度和负载不同,导致同一时钟上升沿到不同触发器的时间不同。这一时间差,即为时钟偏移。 时钟偏斜可能导致时序违例(本文直接粘贴了参考博客…...
免费api接口:物流api,企业工商查询api,游戏api。。。
免费api接口,物流api,企业工商查询api,游戏api。。。都有。 Facebook Games Services - Facebook Games Services 为游戏开发者提供了各种服务, 包括(但不限于) 成就 API, 分数 API, 应用通知, 请求, 游戏养成和 Facebook SDK for Unity.Google Play Games Service…...
第二十八章 Classes - 引用其他类的方法
文章目录 第二十八章 Classes - 引用其他类的方法引用其他类的方法对当前实例的引用 第二十八章 Classes - 引用其他类的方法 引用其他类的方法 在方法(或例程)中,使用下面的语法来引用其他类中的方法: 要调用类方法并访问其返回值,请使用如下表达式:…...

Android 中集成 TensorFlow Lite图片识别
在上图通过手机的相机拍摄到的物体识别出具体的名称,这个需要通过TensorFlow 训练的模型引用到项目中;以下就是详细地集成 TensorFlow步骤,请按照以下步骤进行操作: 在项目的根目录下的 build.gradle 文件中添加 TensorFlow 的 Ma…...

【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
Frozen-Flask :将 Flask 应用“冻结”为静态文件
Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是:将一个 Flask Web 应用生成成纯静态 HTML 文件,从而可以部署到静态网站托管服务上,如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...
BLEU评分:机器翻译质量评估的黄金标准
BLEU评分:机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域,衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标,自2002年由IBM的Kishore Papineni等人提出以来,…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...

【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL
ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...