【JavaEE】多线程案例-阻塞队列
1. 前言
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
- 在队列为空时,获取元素的线程会等待队列变为非空
- 当队列满时,存储元素的线程会等待队列可用
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
2. 什么是生产者-消费者模型
生产者消费者模型是一种多线程并发协作的模型,由两类线程和一个缓冲区组成:生产者线程生产数据并把数据放在缓冲区,消费者线程从缓冲区取数据并消费。生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品。当存储空间为空时,消费者阻塞;当存储空间满时,生产者阻塞。
3. 生产者-消费者模型的意义
- 解耦合
- 削峰填谷
3.1 解耦合
两个模块的联系越紧密,耦合度就越高,耦合度越高就意味着两个模块的影响程度很大,当一个模块出现问题的时候,另一个模块也会受到影响而导致出现问题,特别是对分布式系统来说:解耦合是非常重要的。
假设上面是一个简单的分布式系统,服务器 A 和服务器 B 之间直接进行交互(服务器 A 向服务器 B 发送请求并接收服务器 B 返回的信息,服务器 B 向服务器 A 发送请求,以及接收服务器 A 返回的信息),服务器 A 和服务器 B 之间的耦合度比较高,当两个服务器之间的一个发生故障的时候就会导致两个服务器都无法使用。
不仅如此,当我们想要再添加一个服务器 C 与服务器 A 之间进行交互的时候,不仅需要对服务器 C 做出修改,还需要对服务器 A 作出修改。
相比上面的情况,如果我们使用生产者-消费者模型的话就可以解决上面的耦合度过高的问题。
服务器 A 接收到客户端发来的请求不是直接发送给服务器 B ,而是将接收到的请求加入到阻塞队列中,然后服务器 B 从阻塞队列中获取到请求,这样就避免了两个服务器之间进行直接的交互,降低了耦合性;不仅如此,当我们需要额外添加一个服务器 C 的时候,就不需要对服务器 A 做出修改,而是直接从阻塞队列获取请求信息。
3.2 削峰填谷
当客户端向服务器 A 短时间发出大量请求信息的话,那么当服务器 A 接收到客户端发来的请求的时候,就会立即将收到的所有信息都发送给服务器 B ,但是由于虽然服务器 A 能够接收的请求量可以很多,但是服务器 B 却不能一次接收这么多请求,就会导致服务器 B 会挂掉。
如果使用生产者-消费者莫模型的话,当客户端向服务器 A 短时间发送大量请求的话,服务器 A 不会将请求发送给 B ,而是发送给阻塞队列中,当阻塞队列满了的时候,服务器 A 就会停止向阻塞队列中发送请求,陷入阻塞状态,等服务器 B 向阻塞队列中受到请求使得阻塞队列容量减少的时候,服务器 A 才会继续向阻塞队列中发送收到的请求,这样就避免了服务器 B 短时间内受到大量的请求而挂掉的情况;如果阻塞队列中收到的请求信息被读取完的时候,服务器 B 就会停止从阻塞队列中读取请求,进入阻塞状态,直到服务器 A 向阻塞队列中发送请求。
4. 如何使用Java标准库提供的阻塞队列
当知道了生产者-消费者模型的意义之后,我们就来看看如何使用阻塞队列。在Java标准库中提供了阻塞队列 BlockingQueue 可以直接使用。
因为 BlockingQueu 是一个接口,无法实例化,所以需要创建出实现了 BlockingQueue 接口的类,而 ArrayBlockingQueue 和 LinkedBlockingQueue 实现了这个接口。
我们还可以观察到,BlockingQueue 还继承了 Queue ,也就是说我们也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞队列中不使用这两个方法,因为这两个方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 {public static void main(String[] args) throws InterruptedException {BlockingQueue<String> queue = new LinkedBlockingQueue<>();queue.put("123");queue.put("234");queue.put("345");queue.put("456");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());}
}
这里向阻塞队列中加入了四个数据,但是读取的时候读取了五次,所以看到线程进入了阻塞状态。
5. 自己实现一个阻塞队列
阻塞队列是建立在队列的基础上的,所以要想实现一个阻塞队列,首先需要实现出来一个队列,那么就先来看看如何实现出一个循环队列。
5.1 实现出循环队列
队列比较容易实现,但是循环队列该如何实现呢?当数据到达数组的最后的时候,将数组的下标修改为0,这样就可以达到循环的目的。
当 tail == head 的时候有两种情况:
- 队列中没有数据的时候
- 队列满了的时候
为了区分这两种情况,我们可以使用两种方法:
- 浪费一个空间,当tail++之后,如果tail + 1 == head,则表示队列满了,将 tail 修改为 0
- 定义一个size变量来表示队列中有效数据的个数,当size == queue.length的时候,表示队列满了
class MyQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {//当添加数据的时候需要判断队列的容量是否已经满了if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}public String take() {//读取数据的时候需要判断队列是否为空if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}
}
5.2 实现阻塞队列
阻塞队列就是在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。并且因为阻塞队列运用的环境是多线程,需要考虑到线程安全的问题。
5.2.1 加锁
当需要进行查询和修改的操作时,需要对该操作进行加锁。因为我们的 put 和 take 基本上都在查询和修改数据,所以可以将这两个操作直接进行加锁操作。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) {synchronized (this) {if(size == data.length) return;data[tail++] = str;size++;if(tail == data.length) tail = 0;}}public String take() {synchronized (this) {if(size == 0) return null;String ret = data[head++];size--;if(head == data.length) head = 0;return ret;}}
}
5.2.2 进行阻塞操作
当进行完加锁操作之后,我们还需要实现阻塞的作用,当添加数据的时候,如果队列中容量满了的时候就进入阻塞等待状态,直到进行了 take 读取数据操作删除数据的时候,才停止等待;当读取数据的时候,如果队列为空,那么该线程就进入阻塞等待状态,直到进行了 put 操作。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {if(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {if(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
5.2.3 解决因被 interrupt 唤醒 waiting 状态的问题
当使用了 wait 和 notify 对 put 和 take 操作进行了阻塞等待和唤醒操作之后,我们还需要注意,难道只有 notify 才会唤醒 WAITING 状态吗?前面我们学习了使用 interrupt 来终止线程,但是 interrup 还会唤醒处于 WAITING 状态的线程,也就是说这里的 WAITING 状态的线程不仅可以被 notify 唤醒,还可以被 interrupt 唤醒。
- 当线程是因为 put 操作队列满了的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 take 操作的 notify 唤醒的时候就意味着此时队列还是满的,当进行添加操作的时候,就会将有效的数据覆盖掉;
- 当线程是因为 take 操作队列为空的时候进入阻塞等待状态的时候,如果是因为被 interrupt 唤醒而不是 put 操作的 notify 唤醒的时候就意味着此时队列还是空的,如果进行删除操作,并没有意义。
为了解决 WAITING 状态被 interrupt 唤醒而造成的问题,当线程被唤醒的时候,需要进行判断 size 是否还等于 0 或者 queue.length,如果还等于,就继续进入 WAITING 状态,但是光一次判断是不够的,因为还可能是被 interrupt 唤醒的,所以需要进行多次判断,可以用 while 循环来解决。
class MyBlockingQueue {private final String[] data = new String[1000];private int size;private int head = 0;private int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
5.2.4 解决因指令重排序造成的问题
因为 put 和 take 操作要进行读和写的操作,可能会因为指令重排序的问题造成其他问题,这里就需要使用 volatile 解决指令重排序问题。
class MyBlockingQueue {private final String[] data = new String[1000];private volatile int size;private volatile int head = 0;private volatile int tail = 0;public void put(String str) throws InterruptedException {synchronized (this) {while(size == data.length) {this.wait();}data[tail++] = str;size++;if(tail == data.length) tail = 0;//这个 notify 用来唤醒 take 操作的等待this.notify();}}public String take() throws InterruptedException {synchronized (this) {while(size == 0) {this.wait();}String ret = data[head++];size--;if(head == data.length) head = 0;//这个 notify 用来唤醒 put 操作的等待this.notify();return ret;}}
}
测试实现的阻塞队列
public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
让生产速度较慢,使得读取操作阻塞等待插入数据才执行。
public class Demo4 {public static void main(String[] args) {MyBlockingQueue queue = new MyBlockingQueue();Thread t1 = new Thread(() -> {while(true) {try {System.out.println("消费元素" + queue.take());Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}}});Thread t2 = new Thread(() -> {int count = 1;while(true) {try {queue.put(count + "");System.out.println("生产元素" + count);count++;} catch (InterruptedException e) {throw new RuntimeException(e);}}});t1.start();t2.start();}
}
让生产 put 操作进入阻塞等待状态。
相关文章:

【JavaEE】多线程案例-阻塞队列
1. 前言 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空当队列满时,存储元素的线程会等待队列可用 阻塞队列常用于生产者和消费者的场…...

【物联网】简要介绍最小二乘法—C语言实现
最小二乘法是一种常用的数学方法,用于拟合数据和寻找最佳拟合曲线。它的目标是找到一个函数,使其在数据点上的误差平方和最小化。 文章目录 基本原理最小二乘法的求解应用举例使用C语言实现最小二乘法总结 基本原理 假设我们有一组数据点 ( x 1 , y 1 …...

慢查询SQL如何优化
一.什么是慢SQL? 慢SQL指的是Mysql中执行比较慢的SQL,排查慢SQL最常用的方法是通过慢查询日志来查找慢SQL。Mysql的慢查询日志是Mysql提供的一种日志记录,它用来记录Mysql中响应时间超过long_query_time值的sql,long_query_time的默认时间为10s. 二.查看慢SQL是否…...
UART 通信-使用VIO进行板级验证
串口系列知识分享: (1)串口通信实现-串口发送 (2)串口通信发送多字节数据 (3)串口通信实现-串口接收 (4)UART 通信-使用VIO进行板级验证 (5)串口接收-控制LED闪烁 (6)使用串口发送实现ACX720开发板时钟显示 (7)串口发送+RAM+VGA传图 文章目录 前言一、uart串口协…...
linux 查看可支持的shell
查看可支持的shell linux中支持多种shell类型,所以在shell文件的第一行需要指定所使用的shell #!/bin/bash 指定该脚本使用的是/bin/bash,这样的机制使得我们可以轻松地引用任何的解释器 查看该linux系统支持的shell cat /etc/shells/bin/sh/bin/bash/us…...

微服务简介
微服务简介 微服务架构是一种软件架构模式,它将一个大型应用程序拆分为一组小型、独立的服务,每个服务都有自己的业务逻辑和数据存储。这些服务可以独立开发、部署和扩展,通常使用HTTP或其他轻量级通信协议进行通信。 以下是微服务架构的一…...

PHP自己的框架2.0设置常量并绑定容器(重构篇三)
目录 1、设置常量并绑定容器 2、容器增加设置当前容器的实例和绑定一个类实例当容器 3、将常量绑定到容器中 4、运行效果 1、设置常量并绑定容器 2、容器增加设置当前容器的实例和绑定一个类实例当容器 //设置当前容器的实例public static function setInstance($instance){…...

重建大师提交空三后引擎状态是等待,怎么开启?
答:图片中这是在自由网空三阶段,整个AT都是等待中,可以修改任务目录和监控目录看一下,先设置引擎,再提交空三。...

【数据结构】堆的向上调整和向下调整以及相关方法
💐 🌸 🌷 🍀 🌹 🌻 🌺 🍁 🍃 🍂 🌿 🍄🍝 🍛 🍤 📃 文章目录 一、堆的概念二、堆的性质…...

【蓝桥杯选拔赛真题60】Scratch旋转风车 少儿编程scratch图形化编程 蓝桥杯选拔赛真题解析
目录 scratch旋转风车 一、题目要求 编程实现 二、案例分析 1、角色分析...

JavaSE、JavaEE与Spring的概念和异同点剖析以及规范13 个分析
JavaSE、JavaEE与Spring的概念和异同点剖析以及规范13 个分析 目录概述需求: 设计思路实现思路分析1.什么是JavaSE2.是JavaEE3.什么是Spring 参考资料和推荐阅读 Survive by day and develop by night. talk for import biz , show your perfect code,full busy&…...

微信小程序的图书馆图书借阅 座位预约系统 读者端设计与实现
该系统基于B/S即所谓浏览器/服务器模式,应用springboot框架,选择MySQL作为后台数据库。系统主要包括系统图书信息、图书借阅、图书归还、自习室信息、自习室预约等功能模块。 关键词 微信小程序的图书馆读者端;微信小程序;java语…...

在阿里云 linux 服务器上查看当前服务器的Nginx配置信息
我们可以通过命令 sudo nginx -t查看到nginx.conf的路径 可以通过 sudo nginx -T查看 nginx 详细配置信息,包括加载的配置文件和配置块的内容 其中也会包括配置文件的内容...

专业招投标书翻译怎样做比较好
在全球经济贸易一体化不断深入的时代,招投标作为国际通用的新型贸易方式,受到了大量中外企业的青睐。根据国际惯例,与招标采购活动有关的一切文件资料,均须使用英文编制。即使允许使用非英文语言编制,也必须随附一份英…...

算法总结10 线段树
算法总结10 线段树 线段树2569. 更新数组后处理求和查询 线段树 有一个数组,我们要: 更新数组的值(例如:都加上一个数,把子数组内的元素取反)查询一个子数组的值(例如:求和&#x…...

518抽奖软件,支持按人像照片抽奖
518抽奖软件简介 518抽奖软件,518我要发,超好用的年会抽奖软件,简约设计风格。 包含文字号码抽奖、照片抽奖两种模式,支持姓名抽奖、号码抽奖、数字抽奖、照片抽奖。(www.518cj.net) 照片抽奖模式 圆角边框 照片抽奖模式下&am…...

数字IC笔试面试题之--时钟偏斜(skew)与抖动(jitter)
1 时钟偏斜(clock skew) 时钟偏斜(偏移)是因为布线长度和负载不同,导致同一时钟上升沿到不同触发器的时间不同。这一时间差,即为时钟偏移。 时钟偏斜可能导致时序违例(本文直接粘贴了参考博客…...
免费api接口:物流api,企业工商查询api,游戏api。。。
免费api接口,物流api,企业工商查询api,游戏api。。。都有。 Facebook Games Services - Facebook Games Services 为游戏开发者提供了各种服务, 包括(但不限于) 成就 API, 分数 API, 应用通知, 请求, 游戏养成和 Facebook SDK for Unity.Google Play Games Service…...
第二十八章 Classes - 引用其他类的方法
文章目录 第二十八章 Classes - 引用其他类的方法引用其他类的方法对当前实例的引用 第二十八章 Classes - 引用其他类的方法 引用其他类的方法 在方法(或例程)中,使用下面的语法来引用其他类中的方法: 要调用类方法并访问其返回值,请使用如下表达式:…...

Android 中集成 TensorFlow Lite图片识别
在上图通过手机的相机拍摄到的物体识别出具体的名称,这个需要通过TensorFlow 训练的模型引用到项目中;以下就是详细地集成 TensorFlow步骤,请按照以下步骤进行操作: 在项目的根目录下的 build.gradle 文件中添加 TensorFlow 的 Ma…...

Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...

佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...