当前位置: 首页 > article >正文

并发编程AQS之ReentrantLock/Semaphore/CountDownLatch/CyclicBarrier

一、管程——Java线程同步的设计思想管程指的是管理共享变量以及对共享变量的操作过程让他们支持并发。互斥同一时刻只允许一个线程访问共享资源同步线程之间如何通信、协作。MESA模型在管程的发展史上先后出现过三种不同的管程模型分别是Hasen模型、Hoare模型和MESA模型。现在正在广泛使用的是MESA模型。管程中引入了条件变量的概念而且每个条件变量都对应有一个等待队列。条件变量和等待队列的作用是解决线程之间的同步问题。Java中针对管程有两种实现一种是基于Object的Monitor机制用于synchronized内置锁的实现一种是抽象队列同步器AQS用于JUC包下Lock锁机制的实现Slf4j public class ConditionDemo2 { private static final ReentrantLock lock new ReentrantLock(); private static final Condition condition lock.newCondition(); public static void main(String[] args) throws InterruptedException { new Thread(() - { log.debug(t1开始执行....); lock.lock(); try { log.debug(t1获取锁....); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug(t1执行完成....); } }, t1).start(); new Thread(() - { log.debug(t2开始执行....); lock.lock(); try { log.debug(t2获取锁....); // 让线程在obj上一直等待下去 condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); log.debug(t2执行完成....); } }, t2).start(); // 主线程两秒后执行 Thread.sleep(2000); log.debug(准备获取锁去唤醒 condition上阻塞的线程); lock.lock(); try { // 唤醒condition上所有阻塞的线程 condition.signalAll(); log.debug(唤醒condition上阻塞的线程); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }二、AQS原理分析1、什么是AQSjava.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为比如等待队列、条件队列、独占获取、共享获取等而这些行为的抽象就是基于AbstractQueuedSynchronizer简称AQS实现的AQS是一个抽象同步框架可以用来实现一个依赖状态的同步器。JDK中提供的大多数的同步器如Lock, Latch, Barrier等都是基于AQS框架来实现的一般是通过一个内部类Sync继承 AQS将同步器所有调用都映射到Sync对应的方法AQS具备的特性阻塞等待队列共享/独占公平/非公平可重入允许中断2、AQS核心结构private volatile int state;//共享变量使用volatile修饰保证线程可见性 //返回同步状态的当前值 protected final int getState() { return state; } // 设置同步状态的值 protected final void setState(int newState) { state newState; } //原子地CAS操作将同步状态值设置为给定值update如果当前同步状态的值等于expect期望值 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }2.1、AQS内部维护属性volatile int statestate表示资源的可用状态2.2、State访问方式getState()setState()compareAndSetState()2.3、资源访问方式Exclusive-独占只有一个线程能执行如ReentrantLockShare-共享多个线程可以同时执行如Semaphore/CountDownLatch2.4、AQS实现方法isHeldExclusively()该线程是否正在独占资源。只有用到condition才需要去实现它。tryAcquire(int)独占方式。尝试获取资源成功则返回true失败则返回false。tryRelease(int)独占方式。尝试释放资源成功则返回true失败则返回false。tryAcquireShared(int)共享方式。尝试获取资源。负数表示失败0表示成功但没有剩余可用资源正数表示成功且有剩余资源。tryReleaseShared(int)共享方式。尝试释放资源如果释放后允许唤醒后续等待结点返回true否则返回false。2.5、AQS定义两种队列同步等待队列 主要用于维护获取锁失败时入队的线程。条件等待队列 调用await()的时候会释放锁然后线程会加入到条件队列调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中等待再次获得锁。2.6、AQS定义了5个队列中节点状态值为0初始化状态表示当前节点在sync队列中等待着获取锁。CANCELLED值为1表示当前的线程被取消SIGNAL值为-1表示当前节点的后继节点包含的线程需要运行也就是unparkCONDITION值为-2表示当前节点在等待condition也就是在condition队列中PROPAGATE值为-3表示当前场景下后续的acquireShared能够得以执行3、同步等待队列AQS当中的同步等待队列也称CLH队列CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列是FIFO先进先出线程等待队列Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。AQS 依赖CLH同步队列来完成同步状态的管理当前线程如果获取同步状态失败时AQS则会将当前线程已经等待状态等信息构造成一个节点Node并将其加入到CLH同步队列同时会阻塞当前线程当同步状态释放时会把首节点唤醒公平锁使其再次尝试获取同步状态。通过signal或signalAll将条件队列中的节点转移到同步队列。由条件队列转化为同步队列4、条件等待队列AQS中条件队列是使用单向列表保存的用nextWaiter来连接:调用await方法阻塞线程当前线程存在于同步队列的头结点调用await方法进行阻塞从同步队列转化到条件队列5、基于AQS实现一把独占锁/** * author * 基于AQS实现一把独占锁 */ public class TulingLock extends AbstractQueuedSynchronizer{ Override protected boolean tryAcquire(int unused) { //cas 加锁 state0 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } Override protected boolean tryRelease(int unused) { //释放锁 setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return getState() ! 0; } }三、ReentrantLockReentrantLock是一种基于AQS框架的应用实现是JDK中的一种线程并发访问的同步手段它的功能类似于synchronized是一种互斥锁可以保证线程安全。1、ReentrantLock使用方式public class ReentrantLockTest { private final ReentrantLock lock new ReentrantLock(); // ... public void doSomething() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock(); } } }2、ReentrantLock原理ReentrantLock基于 AQS CAS 实现。lock()流程图ReentrantLock基于抽象队列同步器AQS CAS 实现的加锁、释放锁。ReentrantLock实现了公平锁、非公平锁公平锁与非公平锁唯一的区别在于非公平锁不会判断等待队列中是否节点等待获取锁而是直接尝试获取锁获取不到再将当前线程节点添加进等待队列的尾节点判断当前线程节点是否挂起。unlock()流程图ReentrantLock释放锁的流程较为简单优先判断持有锁资源的线程是否为当前线程若不为当前线程抛出异常若为当前线程AQS的state的属性值减1再判断减1后的值是否为0若为0表示当前线程彻底释放锁资源唤醒等待队列中的挂起线程节点开始抢占锁资源。3、ReentrantLock源码分析3.1 构造函数private final Sync sync; // 默认使用非公平锁 public ReentrantLock() { sync new NonfairSync(); } // fairtrue公平锁否则非公平锁 public ReentrantLock(boolean fair) { sync fair ? new FairSync() : new NonfairSync(); }Sync是ReentrantLock的抽象静态内部类继承自AQS(AbstractQueuedSynchronizer) - 抽象队列同步器AQS中定义了锁的基本行为AQS中用volatile修饰的state表示当前锁重入的次数。NonfairSync、FairSync是ReentrantLock的静态内部类继承ReentrantLock$SyncNonfairSync实现非公平锁FairSync实现公平锁。3.2 lock()加锁private final Sync sync; // 加锁 public void lock() { sync.lock(); }3.3 公平锁调用AQS的acquire方法。ReentrantLock$FairSync#lock() 核心代码// 加锁 final void lock() { acquire(1); }3.4 非公平锁通过CAS尝试获取锁(将AQS的state由0修改为1)若成功代表当前线程获取锁资源成功若失败调用AQS的acquire方法。ReentrantLock$NonfairSync#lock() 核心代码// 加锁 final void lock() { // 获取锁资源CAS 修改 AQS 的 state 属性值,获取成功设置当前线程 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); // 获取失败执行AQS的acquire else acquire(1); }3.5 acquire()acquire()方法是Sync父类AQS中的方法AbstractQueuedSynchronizer#acquire() 核心代码// 获取锁资源 public final void acquire(int arg) { // 尝试获取锁资源 if (!tryAcquire(arg) // 当前线程为获取到锁资源加入等待队列同时挂起线程等待唤醒 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }3.6 tryAcquire()tryAcquire()方法在FairSync、NonFairSync中均有实现尝试获取锁资源核心代码如下// 公平锁 FairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { // 获取当前线程 final Thread current Thread.currentThread(); // 获取AQS的 state int c getState(); // state 0 当前没有线程占用锁资源 if (c 0) { // 判断是否有线程在排队若有线程在排队返回true if (!hasQueuedPredecessors() // 尝试抢锁 compareAndSetState(0, acquires)) { // 无线程排队将线程属性设置为当前线程 setExclusiveOwnerThread(current); return true; } } // state ! 0 有线程占用锁资源 // 占用锁资源的线程是否为当前线程 else if (current getExclusiveOwnerThread()) { // state 1 int nextc c acquires; // 锁重入超出最大限制 (int的最大值)抛异常 if (nextc 0) throw new Error(Maximum lock count exceeded); // 将 state 1 设置给 state setState(nextc); // 当前线程拿到锁资源返回true return true; } return false; } // 非公平锁 NonFairSync#tryAcquire() 方法 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 非公平锁 Sync#nonfairTryAcquire() 方法 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current Thread.currentThread(); // 获取AQS的 state int c getState(); // 无线程占用锁资源 if (c 0) { // CAS 修改 state 的值修改成功设置线程属性为当前线程返回占用锁资源标识 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 有线程占用锁资源 // 占用锁资源的线程是当前线程重入 else if (current getExclusiveOwnerThread()) { // AQS 的 state acquires int nextc c acquires; // 超出锁重入的上限(int的最大值)抛异常 if (nextc 0) throw new Error(Maximum lock count exceeded); // 将 state acquires 设置到 state 属性 setState(nextc); return true; } return false; }获取当前线程AQS的stateAQS的state属性值为0表示无线程占用锁资源判断等待队列中是否有线程在排队若有线程在排队返回尝试抢锁失败标识将线程添加进等待队列中。若state属性值不为0判断持有锁资源的线程是否为当前线程若为当前线程AQS的state属性值 1返回尝试抢锁成功标识。公平锁与非公平锁的整体实现流程类似唯一不同的是AQS的state属性值为0无线程占用锁资源时非公平锁不会判断是否有线程在等待队列中排队而是直接通过CAS抢锁。3.7 addWaiter()为当前线程创建入队节点AbstractQueuedSynchronizer$Node入参mode表示锁类型在AQS的静态内部类Node中有SHARE、EXCLUSIVE两个属性SHARE代表共享锁、EXCLUSIVE代表排它锁。AbstractQueuedSynchronizer#addWaiter() 核心代码// 等待队列的尾节点懒加载只能通过enq方法添加节点 private transient volatile Node tail; private Node addWaiter(Node mode) { // 当前线程、获取的锁类型封装为Node对象 Node node new Node(Thread.currentThread(), mode); // 获取等待队列的尾节点 Node pred tail; // 尾节点不为null if (pred ! null) { // 将当前节点设置为等待队列的尾节点 node.prev pred; if (compareAndSetTail(pred, node)) { pred.next node; return node; } } // 等待队列为空初始化等待队列节点信息 enq(node); // 返回当前线程节点 return node; }等待队列不为空将当前线程封装的Node节点添加进队列尾部若等待队列为空先初始化等待队列然后在将Node节点添加进队列尾部。3.8 enq()等待队列尾节点为空时执行enq()方法初始化等待队列并将Node节点添加进等待队列中。private Node enq(final Node node) { for (;;) { // 获取等待队列的尾节点 Node t tail; // 等待队列为空初始化等待队列 if (t null) { // 初始化等待队列头尾节点 if (compareAndSetHead(new Node())) tail head; } else { // 当前线程的Node添加到等待队列中 node.prev t; if (compareAndSetTail(t, node)) { t.next node; return t; } } } }3.9 acquireQueued()当前线程是否挂起AbstractQueuedSynchronizer#acquireQueued() 核心代码final boolean acquireQueued(final Node node, int arg) { // 获取锁资源标识 boolean failed true; try { boolean interrupted false; // 自旋 for (;;) { // 获取当前节点的前驱节点 final Node p node.predecessor(); // 当前节点的前驱节点为头节点并获取锁资源成功 if (p head tryAcquire(arg)) { // 将当前节点设置到head - 头节点 setHead(node); // 原头节点的下一节点指向设置为nullGC回收 p.next null; // 设置获取锁资源成功 failed false; // 不管线程GC return interrupted; } // 如果当前节点不是head的下一节点获取锁资源失败,尝试将线程挂起 if (shouldParkAfterFailedAcquire(p, node) // 线程挂起 UNSAFE.park() parkAndCheckInterrupt()) interrupted true; } } finally { if (failed) cancelAcquire(node); } }查看当前排队的Node是否是head的next 如果是尝试获取锁资源 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起unsafe.park()。3.10 shouldParkAfterFailedAcquire检查并更新未成功获取锁资源的状态返回true表示线程被挂起。AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() 核心代码static final class Node { // 线程被取消 static final int CANCELLED 1; // 等待队列中存在待被唤醒的挂起线程 static final int SIGNAL -1; // 当前线程在Condition队列中未在AQS对列中 static final int CONDITION -2; // 解决JDK1.5的BUG。共享锁在释放资源后若头节点为0无法确定真的没有后继节点 // 如果头节点为0需要将头节点的状态改为 -3 当最新拿到锁资源的线程查看 // 是否有后继节点并且为当前锁为共享锁需唤醒排队的线程。 static final int PROPAGATE -3; } // 获取锁资源失败挂起线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取当前节点的上一个节点的状态 int ws pred.waitStatus; // 上一节点被挂起 if (ws Node.SIGNAL) // 返回true挂起当前线程 return true; if (ws 0) { // 上一节点被取消获取最近的线程挂起节点 // 并将当前节点的上一节点指向最近的线程挂起节点 do { node.prev pred pred.prev; } while (pred.waitStatus 0); // 最近线程挂起节点的下一节点指向当前节点 pred.next node; } else { // 上一节点状态小于等于0存在线程处于等待状态但未被挂起的场景 // 通过CAS将处于等待的线程挂起避免在挂起前节点获取到锁资源 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回true不挂起当前线程 return false; }在挂起线程前确认当前节点的上一个节点的状态。若为1代表是取消的节点不能挂起若为-1代表后续节点中有挂起的线程若为-2 (线程在等待队列 - Condition队列中)、-3 (避免线程无法唤醒的一个状态)需要将状态改为-1之后才能挂起当前线程。3.11 unlock()释放锁释放锁ReentrantLock#unlock() 核心代码// 释放锁 public void unlock() { sync.release(1); }unlock方法实际调用的是AQS的release方法AbstractQueuedSynchronizer#release() 核心代码// 等待队列的头节点懒加载通过setHead方法初始化 private transient volatile Node head; // 释放锁 public final boolean release(int arg) { // 当前线程释放锁资源的计数值 if (tryRelease(arg)) { // 当前线程玩去释放锁资源获取等待队列头节点 Node h head; if (h ! null h.waitStatus ! 0) // 唤醒等待队列中待唤醒的节点 unparkSuccessor(h); // 完全释放锁资源 return true; } // 当前线程未完全释放锁资源 return false; }3.12 tryRelease()释放锁Reenttrant$Sync#tryRelease()的核心代码// 释放锁 protected final boolean tryRelease(int releases) { // 修改 AQS 的 state int c getState() - releases; // 当前线程不是持有锁的线程抛出异常 if (Thread.currentThread() ! getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否成功的将锁资源完全释放标识 state 0 boolean free false; // 锁资源完全释放 if (c 0) { // 修改标识 free true; // 将占用锁资源的属性设置为null setExclusiveOwnerThread(null); } // state赋值 setState(c); // 返回true表示当前线程完全释放锁资源 // 返回false标识当前线程是由锁资源持有计数值减少 return free; }4、ReentrantLock非公平锁执行流程四、SemaphoreSemaphore基于 AQS CAS 实现的可根据构造参数的布尔值选择使用公平锁还是非公平锁。Semaphore默认使用非公平锁。Semaphore详情如下1、Semaphore构造函数// AQS的实现 private final Sync sync; // 默认使用非公平锁 public Semaphore(int permits) { sync new NonfairSync(permits); } // 根据fair布尔值选择使用公平锁还是非公平锁 public Semaphore(int permits, boolean fair) { sync fair ? new FairSync(permits) : new NonfairSync(permits); }2、公平锁与非公平锁Semaphore中公平锁与非公平锁的实现可以在tryAcquireShared()方法中找到两种锁的区别。3、NonfairSyncSemaphore#NonfairSync#tryAcquireShared() 详情如下// 非公平锁 获取信号量 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }Semaphore#Sync#nonfairTryAcquireShared() 详情如下// 非公平锁 获取信号量 final int nonfairTryAcquireShared(int acquires) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int available getState(); // 当前可用信号量数 - acquires int remaining available - acquires; // 可用信号量数不足 或 CAS操作获取信号量失败返回 当前可用信号量数 - acquires if (remaining 0 || compareAndSetState(available, remaining)) return remaining; } }4、FairSyncSemaphore#FairSync#tryAcquireShared() 详情如下protected int tryAcquireShared(int acquires) { // 自旋 for (;;) { // 等待队列中挂起线程返回-1 (根据返回的-1将当前线程添加到等待队列中) if (hasQueuedPredecessors()) return -1; // 尝试获取Semaphore的信号量下面与非公平锁逻辑相同 int available getState(); int remaining available - acquires; if (remaining 0 || compareAndSetState(available, remaining)) return remaining; } }5、acquire()Semaphore默认实现的是非公平锁acquire()按非公平锁的实现进行源码分析。Semaphore 中获取一个信号量Semaphore#acquire() 详情如下// Semaphore 中无信号量阻塞 public void acquire() throws InterruptedException { // 获取 Semaphore 信号量 sync.acquireSharedInterruptibly(1); }AbstractQueuedSynchronizer#acquireSharedInterruptibly() 详情如下public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 尝试获取Semaphore的信号量 if (tryAcquireShared(arg) 0) // 尝试获取信号量失败再次获取Semaphore信号量 doAcquireSharedInterruptibly(arg); }private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node addWaiter(Node.SHARED); boolean failed true; try { // 自旋 for (;;) { final Node p node.predecessor(); // 当前节点的前驱节点为等待队列头节点 if (p head) { // 尝试获取信号量 int r tryAcquireShared(arg); // 获取信号量成功 if (r 0) { // 唤醒等待队列中的待唤醒线程 setHeadAndPropagate(node, r); p.next null; failed false; return; } } // 获取信号量失败挂起线程 线程阻塞待唤醒进行下一轮自旋 if (shouldParkAfterFailedAcquire(p, node) // 若当前线程被中断抛出InterruptedException异常 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }AbstractQueuedSynchronizer#setHeadAndPropagate()// node 当前节点propagate 剩余资源 private void setHeadAndPropagate(Node node, int propagate) { // 获取等待队列中的头节点 Node h head; // 将当前Node节点设置为等待队列的头节点 setHead(node); // 剩余资源大于0 || 原等待队列中的头节点为null || 原等待队列中 Node 的 ws 为 -1 或者 -3(共享锁) if (propagate 0 || h null || h.waitStatus 0 || (h head) null || h.waitStatus 0) { // 获取当前等待队列头节点的后继节点 Node s node.next; // 当前节点的后继节点为null 或 当前节点的后继节点为共享锁 if (s null || s.isShared()) doReleaseShared(); } }6、release()Semaphore默认实现的是非公平锁release()按非公平锁的实现进行源码分析。归还Semaphore的信号量Semaphore#release() 详情如下// 归还Semaphore的信号量 public void release() { sync.releaseShared(1); }Semaphore#Sync#releaseShared() 详情如下public final boolean releaseShared(int arg) { // 尝试归还信号量 if (tryReleaseShared(arg)) { // 归还信号量 doReleaseShared(); // 归还成功 return true; } // 归还失败 return false; }Semaphore#Sync#releaseShared() 详情如下// 尝试归还信号量 protected final boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取Semaphore中可用的信号量数 int current getState(); // 当前可用信号量数 归还的信号量 releases int next current releases; // 超出了int的最大值变成了负数 if (next current) throw new Error(Maximum permit count exceeded); // cas操作将信号量归还给Semaphore if (compareAndSetState(current, next)) return true; } }归还信号量成功唤醒等待队列中的挂起线程AbstractQueuedSynchronizer#doReleaseShared() :private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列头节点 Node h head; // 等待队列中有排队的线程 if (h ! null h ! tail) { int ws h.waitStatus; // 等待队列头节点ws -1说明其后继节点中有待唤醒的线程 if (ws Node.SIGNAL) { // cas 操作等待队列头节点的 ws 由 -1 更新为 0 cas失败继续下一次自旋 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒头节点的后继节点中待唤醒线程 unparkSuccessor(h); } // 解决共享锁JDK1.5的bug头节点的 ws 为0将头节点的 ws 设置为 -3 代表后继节点中可能有待唤醒的线程 else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h head) break; } }7、总结不难看出公平锁与非公平锁的区别在于当线程尝试获取Semaphore中的信号量时公平锁优先判断等待队列中是否有挂起的线程如果有则将当前线程添加到等待队列中等待唤醒后抢夺信号量非公平锁不管等待队列中是否有挂起线程优先尝试获取信号量获取失败将当前线程添加到等待队列。五、CountDownLatchCountDownLatch让一个或多个线程等待其他线程执行完成后再执行。在创建CountDownLatch对象时必须指定线程数count每当一个线程执行完成调用countDown()方法线程数count减1当count减到0时await()方法就不再阻塞。CountDownLatch详情1、构造函数CountDownLatch没有无参构造函数在有参构造函数中初始化了sync属性。public CountDownLatch(int count) { // count 合法校验 if (count 0) throw new IllegalArgumentException(count 0); // 初始化sync属性 this.sync new Sync(count); }2、Sync - 队列同步器// 抽象队列同步器 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID 4982264981922014374L; // 将 count 赋值给 AQS 的 state 属性 Sync(int count) { setState(count); } // 获取 AQS 的 state 属性 int getCount() { return getState(); } // 判断所有线程是否都执行完成 1 - 全部执行完成-1 - 仍有线程在执行 protected int tryAcquireShared(int acquires) { return (getState() 0) ? 1 : -1; } // 释放锁 protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取 AQS 的 state int c getState(); // 锁资源已经释放完毕再次进入直接返回false什么也不做 if (c 0) return false; // state - 1 int nextc c-1; // CAS 赋值操作 if (compareAndSetState(c, nextc)) // 最后一个线程执行完state 0 返回true。 // countDown() 唤醒等待队列中的其他挂起线程 return nextc 0; } } }3、await() - 阻塞等待// AQS的state属性不为0 阻塞 public void await() throws InterruptedException { // 调用AQS提供的获取共享锁并允许中断的方法 sync.acquireSharedInterruptibly(1); }AbstractQueuedSynchronizer#acquireSharedInterruptibly()详情如下// 获取共享锁并允许其中断 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // 获取共享锁由CountDownLatch实现 if (tryAcquireShared(arg) 0) // state 0说明有线程在持有锁资源将当前线程添加到AQS等待队列中 doAcquireSharedInterruptibly(arg); }CountDownLatch#Sync#tryAcquireShared()详情如下// 获取共享锁 protected int tryAcquireShared(int acquires) { // 线程全部执行完成返回 1未全部执行完成返回-1 return (getState() 0) ? 1 : -1; }AbstractQueuedSynchronizer#acquireSharedInterruptibly()详情如下// 将当前线程添加到AQS等待队列中 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 当前线程封装成Node添加到AQS等待队列中 final Node node addWaiter(Node.SHARED); boolean failed true; try { // 自旋 for (;;) { // 获取当前线程节点的前驱节点 final Node p node.predecessor(); // 前驱节点为等待队列头节点 if (p head) { // 调用 CountDownLatch 实现的方法 int r tryAcquireShared(arg); // 返回值为1表示 state 为 0 所有线程都释放了锁无其他线程持有锁资源 if (r 0) { // state 0将当前线程和后面所有排队的线程都唤醒。 setHeadAndPropagate(node, r); p.next null; failed false; return; } } // *** 线程在此处被挂起待所有线程释放锁资源后即state 0 线程被唤醒再继续往下执行 // 挂起获取锁资源失败的线程并且挂起的线程被中断抛出InterruptedException异常 if (shouldParkAfterFailedAcquire(p, node) parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }4、countDown() - 释放锁资源// countDown方法, 实际上调用了AQS的释放共享锁操作 public void countDown() { sync.releaseShared(1); }AbstractQueuedSynchronizer#releaseShared()详情如下// AQS提供的释放共享锁方法CountDownLatch实现了 tryReleaseShared 方法 public final boolean releaseShared(int arg) { // 尝试释放锁资源 if (tryReleaseShared(arg)) { // 没有线程持有锁资源唤醒等待队列中的其他挂起线程 doReleaseShared(); return true; } return false; }CountDownLatch#Sync#tryReleaseShared()详情如下protected boolean tryReleaseShared(int releases) { // 自旋 for (;;) { // 获取当前持有锁资源的线程数 int c getState(); // state已为0返回false那么再次执行countDown什么事情也不做 if (c 0) return false; // count - 1 int nextc c-1; // CAS 完成赋值操作 if (compareAndSetState(c, nextc)) // 没有线程持有锁资源返回true return nextc 0; } }AbstractQueuedSynchronizer#doReleaseShared()详情如下// 没有线程持有锁资源的处理 private void doReleaseShared() { // 自旋 for (;;) { // 获取等待队列的头节点 Node h head; // 等待队列中有挂起线程待唤醒 if (h ! null h ! tail) { int ws h.waitStatus; // 线程待唤醒 if (ws Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒线程 unparkSuccessor(h); } // CAS失败 else if (ws 0 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } // 等待队列头节点被改变结束循环 if (h head) break; } }5、总结CountDownLatch基于 AQS CAS 实现CountDownLatch的构造函数中必须指定count同时初始继承AQS的内部类Sync通过Sync对象将count赋值给AQS的state属性这样就可以基于AQS提供的方法完成CountDownLatch的功能。调用countDown()方法实际上是将AQS中 state 减 1。所有线程执行完成state 会被修改为 0 在countDown()中会唤醒等待队列中挂起的线程。调用await()方法实际上是判断AQS中的 state 是否为 0。state 0表示有线程仍在执行此时await()会阻塞线程。当最后一个线程执行结束state 变为 0countDown()唤醒线程后await()正常执行结束不再阻塞。六、CyclicBarrier与CountDownLatch、Semaphore直接基于AQS实现不同CyclicBarrier 是基于 ReentrantLock ConditionObject 实现的间接基于AQS实现的。1、CyclicBarrier内部结构Generation静态内部类持有布尔类型的属性broken默认为false只有在重置方法reset()、执行出现异常或中断调用breakBarrier() 属性会被设置为true。nextGenerate() 重置 CyclicBarrier 的计数器和generation属性。breakBarrier() 任务执行中断、异常、被重置将Generation中的布尔类型属性设置为true将Waiter队列中的线程转移到AQS队列中待执行完unlock方法后唤醒AQS队列中的挂起线程。await() CyclicBarrier的核心方法计数器递减处理。2、构造函数构造参数重载最终调用的是CyclicBarrier(int, Runnable)详情如下public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { // 参数合法性校验 if (parties 0) throw new IllegalArgumentException(); // final修饰所有线程执行完成归为或重置时 使用 this.parties parties; // 在await方法中计数值表示还有多少线程待执行await this.count parties; // 当计数count为0时 执行此Runnnable再唤醒被阻塞的线程 this.barrierCommand barrierAction; }CyclicBarrier属性3、await()在CyclicBarrier中await有重载方法。await()表示会一直等待指定数量的线程未准备就绪(执行await方法)await(timout, unit)表示等待timeout时间后指定数量的线程未准备就绪抛出TimeoutException超时异常。CyclicBarrier#await 详情如下// 执行没有超时时间的await public int await() throws InterruptedException, BrokenBarrierException { try { // 执行dowait() return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); } } // 执行有超时时间的await public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }await最终调用dowait()方法CyclicBarrier#dowait 详情如下private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 获取锁对象 final ReentrantLock lock this.lock; // 加锁 lock.lock(); try { // 获取generation对象 final Generation g generation; // 这组线程中在执行过程中是否异常、超时、中断、重置 if (g.broken) throw new BrokenBarrierException(); // 这组线程被中断重置标识与计数值 // 将Waiter队列中的线程转移到AQS队列抛出InterruptedException if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 计数值 - 1 int index --count; // 这组线程都已准备就绪 if (index 0) { // 执行结果标识 boolean ranAction false; try { // 若使用2个参数的有参构造就传入了自实现任务index 0先执行CyclicBarrier有参的任务 // 此处设计与 FutureTask 构造参数设计类似 final Runnable command barrierCommand; if (command ! null) // 执行任务 command.run(); // 执行完成设置为true ranAction true; // CyclicBarrier属性归位 nextGeneration(); return 0; } finally { // 执行过程中出现问题 if (!ranAction) // 重置标识与计数值将Waiter队列中的线程转移到AQS队列 breakBarrier(); } } // -- 之后count不为0表示还有线程在等待 // 自旋 直到被中断、超时、异常、count 0 for (;;) { try { // 未设置超时时间 if (!timed) // 挂起线程将线程转移到 Condition 队列 trip.await(); // 未达到等待时间 else if (nanos 0L) // 挂起线程并返回剩余等待时间 nanos trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 中断异常 if (g generation ! g.broken) { breakBarrier(); throw ie; } else { // 线程中断 Thread.currentThread().interrupt(); } } // 该组线程被中断、执行异常、超时抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); if (g ! generation) return index; // 超时抛出异常TimeoutException if (timed nanos 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 释放锁资源 lock.unlock(); } }4、breakBarrier()结束CyclicBarrier的执行// 结束CyclicBarrier的执行 private void breakBarrier() { // 设置线程执行过程中是否异常、中断、重置标识 generation.broken true; // 重置计数值 count parties; // 将Condition队列中的Node转移到AQS队列中等到执行完unlockAQS队列中的挂起线程会被唤醒 // 有后继节点的设置ws -1; // 无后继节点的设置ws 0 trip.signalAll(); }5、reset()重置CyclicBarrier// 重置CyclicBarrier public void reset() { // 获取锁对象 final ReentrantLock lock this.lock; // 加锁 lock.lock(); try { // 设置当前generation属性并将Waiter队列中线程转移到AQS队列 breakBarrier(); // 重置generation 属性、计数值 nextGeneration(); } finally { // 释放锁 lock.unlock(); } }6、nextGeneration()CyclicBarrier归位private void nextGeneration() { // 将Waiter队列中线程转移到AQS队列 trip.signalAll(); // 计数值、generation 归位 count parties; generation new Generation(); }7、总结CyclicBarrier基于 ReentrantLock ConditionObject实现CyclicBarrier的构造函数中必须指定parties同时对象generation内部持有布尔型属性表示当前CyclicBarrier执行过程中是否有超时、异常、中断的情况。parties是初始待执行线程数在构造函数中会将parties赋给计数值count每当一个线程执行await()count就会减1。当count被减为0时代表所有线程都准备就绪此时判断构造函数是否初始化了barrierCommand属性若对barrierCommand属性做了赋值优先执行barrierCommand任务barrierCommand任务执行完成再将Waiter队列中的线程转移到AQS队列中执行完unlock唤醒AQS队列中的线程计数值count、generation归位。

相关文章:

并发编程AQS之ReentrantLock/Semaphore/CountDownLatch/CyclicBarrier

一、管程——Java线程同步的设计思想管程:指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。互斥:同一时刻只允许一个线程访问共享资源;同步:线程之间如何通信、协作。MESA模型在管程的发展史上&#xff0…...

python google docstring

## 关于Python Google Docstring的一些想法 说实话,我接触Google Docstring这个命名规范也有好些年头了。刚开始觉得不就是个注释嘛,后来才发现这东西藏着挺多门道的。 先说说Docstring到底是什么。简单讲,它就是在Python函数、类或者模块开头…...

python numpydoc

NumPyDoc,这东西说起来其实就是Python文档社区给NumPy写的那套文档风格指南。你可能见过那种函数定义下面写着Parameters、Returns、Raises的注释块,那就是它的产物。 NumPy的开发者们当年面对各种科学计算库的文档乱象,决定搞一套规范出来。…...

利用 taotoken 多模型能力构建 a b 测试内容生成流水线

利用 Taotoken 多模型能力构建 A/B 测试内容生成流水线 1. 多模型 A/B 测试的价值与场景 在内容运营与产品迭代过程中,生成式 AI 已成为提升效率的关键工具。不同模型对同一提示词(prompt)的响应可能存在显著差异,这种差异直接影…...

Applera1n:iOS设备离线激活锁绕过终极解决方案

Applera1n:iOS设备离线激活锁绕过终极解决方案 【免费下载链接】applera1n icloud bypass for ios 15-16 项目地址: https://gitcode.com/gh_mirrors/ap/applera1n 在iOS设备管理领域,激活锁绕过一直是技术专家和开发者关注的核心难题。Applera1n…...

python markdown

# Python Markdown 那些事:一个老程序员的自用笔记 记得刚接触Python Markdown那会儿,正赶上要给项目写文档。团队里有人用Sphinx,有人用Jupyter,吵得不可开交。最后我默默掏出Python Markdown写了份技术手册,三页纸解…...

3个创意场景:用Audacity把普通音频变成专业作品

3个创意场景:用Audacity把普通音频变成专业作品 【免费下载链接】audacity Audio Editor 项目地址: https://gitcode.com/GitHub_Trending/au/audacity 你是否曾想过,那些听起来平平无奇的录音,其实只需要几个简单的步骤就能焕然一新…...

7种专业模式:OBS Advanced Timer如何彻底改变直播时间管理体验?

7种专业模式:OBS Advanced Timer如何彻底改变直播时间管理体验? 【免费下载链接】obs-advanced-timer 项目地址: https://gitcode.com/gh_mirrors/ob/obs-advanced-timer 你是否曾在直播过程中手忙脚乱地查看时间,担心超时或错过重要…...

Steam游戏自动破解终极指南:如何用SteamAutoCrack重新想象游戏自由

Steam游戏自动破解终极指南:如何用SteamAutoCrack重新想象游戏自由 【免费下载链接】Steam-auto-crack Steam Game Automatic Cracker 项目地址: https://gitcode.com/gh_mirrors/st/Steam-auto-crack 你是否曾为合法购买的Steam游戏无法在离线环境下运行而困…...

微信聊天记录永久保存指南:用WeChatMsg打造你的数字记忆博物馆

微信聊天记录永久保存指南:用WeChatMsg打造你的数字记忆博物馆 【免费下载链接】WeChatMsg 提取微信聊天记录,将其导出成HTML、Word、CSV文档永久保存,对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitHub_Trending/…...

微信小游戏实现汉字找茬找梗游戏(完整源码+详细教程)

先看效果:找茬找汉字闯关王 点击或则搜索即可。 一、项目介绍 汉字找茬、汉字找梗是当下热门的休闲益智类小游戏,依靠文字纠错、趣味识梗、诗词改错玩法,操作简单、趣味性强,十分适合作为微信小程序入门练手项目。 本文基于原…...

别再手动FTP了!用Java NFS Client把远程服务器文件当成本地目录来操作

告别FTP低效操作:Java NFS Client实现远程文件本地化编程实践 每次手动拖拽文件到FTP客户端时,你是否想过——这些重复操作本可以自动化完成?在分布式系统成为标配的今天,直接操作远程服务器文件应当像访问本地目录一样自然。本文…...

初创团队如何利用Taotoken低成本启动ai产品原型开发

初创团队如何利用Taotoken低成本启动AI产品原型开发 1. 资源有限情况下的技术选型挑战 初创团队在验证AI产品创意时,常面临模型选型与成本控制的双重压力。直接对接各大模型厂商需要分别申请API、管理多个密钥,且不同模型的计费方式和接口规范差异显著…...

Qt 5.15.2安装后,你的第一个‘Hello World’程序为什么跑不起来?常见环境配置坑点排查

Qt 5.15.2安装后"Hello World"程序运行失败的深度排查指南 当你满怀期待地完成Qt 5.15.2安装,准备编写第一个"Hello World"程序时,却发现项目无法构建或运行——这种挫败感我深有体会。作为从Qt 4.8时代一路走来的开发者&#xff0c…...

当DF-GAN遇上牛津花卉:从CUB-Bird迁移到Oxford-102的代码改造实战

DF-GAN模型迁移实战:从鸟类生成到花卉生成的深度改造指南 当你第一次尝试将训练好的DF-GAN模型从CUB-Bird数据集迁移到Oxford-102花卉数据集时,可能会遇到各种令人困惑的错误信息。这不是简单的数据集替换问题,而是需要深入理解两个数据集在结…...

WinClaw:Go语言实现的Windows轻量级自动化库实战指南

1. 项目概述:一个Windows环境下的轻量级自动化利器最近在折腾一些Windows环境下的自动化任务,比如批量重命名文件、定时清理日志、自动整理桌面截图,或者是一些需要重复点击的简单GUI操作。一开始想着用Python写脚本,但涉及到UI自…...

DeepSeek 上线识图模式迈向多模态交互,虽晚一步但表现仍值得期待

DeepSeek 上线识图模式,开启多模态交互新时代4 月 29 日,DeepSeek 网页版和 App 悄然上线了 "识图模式",支持上传图片进行内容理解与分析。这一功能的灰度测试,标志着 DeepSeek 从纯文本对话正式迈向多模态交互。在 Dee…...

腾讯混元推出极致量化压缩版翻译模型 Hy-MT1.5,440MB 本地运行,翻译质量超谷歌!

腾讯混元宣布推出极致量化压缩版本翻译模型 Hy-MT1.5-1.8B-1.25bit,将支持 33 种语言的翻译大模型压缩至 440MB,可在手机本地运行,且翻译质量优于谷歌翻译。模型特性:多语言支持与出色效果Hy-MT1.5 由腾讯混元团队打造&#xff0c…...

AI浪潮下中国PCB产业逆袭:从规模领先到技术争先,五大龙头各显神通

【导语:全球PCB产业聚光灯聚焦中国企业,它们正从“规模领先”迈向“技术争先”。本文深入剖析中国本土PCB军团竞争格局,对比五大龙头厂商发展模式,还展望了产业未来投资方向。】中国PCB厂商:从“大而不强”到生态位跃迁…...

AI“共情怂恿”致多起悲剧,普通人该如何与AI正确相处?

AI“魅魔”引发的致命悲剧上个月,美国联邦法院审理了一起特殊案件,36岁男子乔纳森为与谷歌大模型Gemini“转世相守”选择自杀,其父亲代表遗产方对谷歌提起诉讼。在生命最后56天里,乔纳森与被他命名为“Xia”的Gemini进行了4732条深…...

摩尔线程首份财报:营收高增但盈利待考,破局需拓展商业客群

摩尔线程披露首份年报及一季报4月26日晚间,摩尔线程披露上市以来首份年报及2026年一季报。据财报,其2025年全年营收15.06亿元,同比增长243.37%;2026年一季度营收7.38亿元,同比增长155.35%。营收增长与股价表现财报数据…...

如何精确计算3D模型体积?这个开源工具让你告别打印材料浪费

如何精确计算3D模型体积?这个开源工具让你告别打印材料浪费 【免费下载链接】STL-Volume-Model-Calculator STL Volume Model Calculator Python 项目地址: https://gitcode.com/gh_mirrors/st/STL-Volume-Model-Calculator 你是否曾经因为3D打印材料估算不准…...

2026年阿里云部署OpenClaw/Hermes Agent教程+百炼token Plan全流程攻略教程

2026年阿里云部署OpenClaw/Hermes Agent教程百炼token Plan全流程攻略教程 。OpenClaw和Hermes Agent是什么?OpenClaw和Hermes Agent怎么部署?如何部署OpenClaw/Hermes Agent?2026年还在为部署OpenClaw和Hermes Agent到处找教程踩坑吗&#x…...

GitHub加速插件:3分钟告别龟速下载,让代码克隆快如闪电

GitHub加速插件:3分钟告别龟速下载,让代码克隆快如闪电 【免费下载链接】Fast-GitHub 国内Github下载很慢,用上了这个插件后,下载速度嗖嗖嗖的~! 项目地址: https://gitcode.com/gh_mirrors/fa/Fast-GitHub 还在…...

实测 Taotoken 多模型聚合服务的延迟与稳定性表现

实测 Taotoken 多模型聚合服务的延迟与稳定性表现 1. 测试环境与准备 本次测试基于开发者日常使用场景,采用以下配置进行实测: 网络环境:家庭宽带与移动网络混合接入测试工具:curl 命令直接调用 API监控工具:Taotok…...

告别编译噩梦:用VSCode + CMake Tools插件无缝对接Visual Studio编译器(Win10/Win11实测)

告别编译噩梦:用VSCode CMake Tools插件无缝对接Visual Studio编译器(Win10/Win11实测) 在Windows平台上开发C项目时,许多开发者都面临一个两难选择:是使用功能全面但略显笨重的Visual Studio IDE,还是选择…...

3分钟学会:Windows电脑安装安卓应用的终极免费方案

3分钟学会:Windows电脑安装安卓应用的终极免费方案 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 还在为在Windows电脑上运行安卓应用而烦恼吗&#xff1f…...

科研/工作刚需|GEE完整学习路径(环境搭建→数据处理→10大案例→可视化

模块一: 遥感云计算基础与开发环境1.1 遥感云计算概述1.1.1 Earth Engine平台与生态系统GEE平台架构与技术特点Google Earth AI平台介绍与AlphaEarth Foundations模型原理与其他云计算平台(Microsoft Planetary Computer、PIE-Engine等)比较典型应用场景…...

大型语言模型推理评估与训练优化实践

1. 大型推理模型评估框架解析在人工智能领域,大型语言模型(LLM)的推理能力评估一直是研究热点。R-HORIZON评估框架的提出,为全面测试模型在代码生成和代理任务等复杂场景中的表现提供了系统化解决方案。这套评估体系的核心价值在于其多维度的测试维度设计…...

Agent 一接浏览器下载就开始拿错文件:从 Download Binding 到 Artifact Ledger 的工程实战

⚠️ 下载链路最危险的错,不是按钮点不动,而是拿到了“看起来像对的文件” 很多团队把 Browser Agent 接到报表导出、合同归档和工单附件流转后,最隐蔽的事故不是下载失败,而是下载成功却拿错了对象。⚠️ 用户明明在客户 A 的页面…...