AbstractQueuedSynchronizer
AQS重要性:
Java => JVM
JUC => AQS
4.11.1 前置知识
- 公平锁和非公平锁
- 可重入锁
- 自旋思想
- LockSupport
- 双向链表数据结构
- 模板设计模式
4.11.2 AQS入门级理论知识
- AQS定义
抽象的队列同步器
- 是用来实现锁或者其它同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个JUC体系的基石,主要用于解决锁分配给"谁"的问题
- 整体就是一个抽象的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量来表示持有锁的状态。
- 为什么AQS是JUC内容的基石
- 与AQS有关的类
- ReentrentLock
- CountDownLatch
- ReentrantReadWriteLock
- Semaphere
- ....
- 锁和同步器的关系
- 锁:面向锁的使用者:隐藏实现细节,调用即可
- 同步器:面向锁的实现者:提出了统一规范并简化了锁的实现,将其抽象出来屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等。是一切锁和同步组件实现的基石--- 公共基础部分。
- 锁:面向锁的使用者:隐藏实现细节,调用即可
- 与AQS有关的类
- 作用
- 加锁会导致阻塞: 有阻塞就需要排队,实现排队就必然需要队列
- 基本结构
- 同步状态
state
:volatile int state
及相应的CAS操作 - FIFO等待队列:CLH锁队列的变体,是一个虚拟的双向链表(Node节点)
- 模板方法:由子类实现的
tryAcquire
,tryRelease
等方法
- 同步状态
抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了。暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH (FIFO)
队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node
),通过CAS、自旋以及LockSuppor.park()的方式,维护state变量的状态,使并发达到同步的效果。
public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable {//CLH Nodesabstract static class Node {volatile Node prev; // initially attached via casTailvolatile Node next; // visibly nonnull when signallableThread waiter; // visibly nonnull when enqueuedvolatile int status; // written by owner, atomic bit ops by others...}// The synchronization state.private volatile int state;...
}
AQS使用一个volatie的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改
4.11.3 AQS源码分析前置知识储备
AQS内部体系架构:
- 同步状态
state
:volatile int state
及相应的CAS操作 - FIFO等待队列:CLH锁队列的变体,是一个虚拟的双向链表(Node节点)
- 模板方法:由子类实现的
tryAcquire
,tryRelease
等方法
4.11.4 AQS源码深度讲解和分析
(1)Lock接口的实现类,基本都是通过聚合
了一个队列同步器
的子类完成线程访问控制的
public static class ReadLock implements Lock, java.io.Serializable {private final Sync sync;...
}
(2)ReentrantLock的原理
从ReentranLock解读AQS源码
// 构造器
public ReentrantLock() {sync = new NonfairSync(); // 默认非公平锁
}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync(); // 根据fair来创建
}
// Sync extends AbstractQueuedSynchronizer@ReservedStackAccessprotected final boolean tryRelease(int releases) { // AQS 抽象方法int c = getState() - releases;if (getExclusiveOwnerThread() != Thread.currentThread())throw new IllegalMonitorStateException();boolean free = (c == 0);if (free)setExclusiveOwnerThread(null);setState(c);return free;}
对比公平锁和非公平锁的 tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 !hasQueuedPredecessors()
hasQueuedPredecessors()
中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
-
公平锁: 公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
-
非公平锁: 不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。
(3)从最简单的lock方法开始看公平锁和非公平锁
// ReentrantLock类
public void lock() {sync.lock(); // FairSync extends Sync 非公平锁; NonfairSync extends Sync 公平锁
}// Sync类@ReservedStackAccessfinal void lock() {if (!initialTryLock())acquire(1);}// NonfairSync extends Syncfinal boolean initialTryLock() { // 定义首次尝试获取锁的方法。该方法被标记为 final,防止子类重写。// Sync 抽象方法Thread current = Thread.currentThread();if (compareAndSetState(0, 1)) { // first attempt is unguarded // 尝试CAS将锁状态从0改为1(无锁->锁定)setExclusiveOwnerThread(current); // 成功获取锁:将当前线程设置为独占所有者return true; // 返回成功获取锁} else if (getExclusiveOwnerThread() == current) { // 检查当前线程是否已是锁的持有者(可重入性检查)int c = getState() + 1; // 是持有者:计算新的重入计数(当前状态值+1)if (c < 0) // overflow // 检查是否超过最大重入次数(int溢出)throw new Error("Maximum lock count exceeded"); // 抛出错误:重入次数超过上限setState(c); // 安全更新锁状态(增加重入计数)return true; // 返回重入成功} elsereturn false; // 获取失败:锁被其他线程持有}/** FairSync extends Sync * Acquires only if reentrant or queue is empty.*/final boolean initialTryLock() {Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedThreads() && compareAndSetState(0, 1)) {setExclusiveOwnerThread(current);return true;}} else if (getExclusiveOwnerThread() == current) {if (++c < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(c);return true;}return false;}// AQS类
public final void acquire(int arg) {if (!tryAcquire(arg))acquire(null, arg, false, false, false, 0L);
}// NonfairSync extends Sync 非公平锁类/*** Acquire for non-reentrant cases after initialTryLock prescreen*/protected final boolean tryAcquire(int acquires) { // AQS 抽象方法if (getState() == 0 && compareAndSetState(0, acquires)) { // 锁空闲时直接CAS抢锁,不检查等待队列,体现“插队”非公平性setExclusiveOwnerThread(Thread.currentThread()); // 立即记录当前线程为独占所有者,支持可重入return true;}return false; // 获取锁失败(锁被占用或CAS竞争失败)}// FairSync extends Sync 公平锁类//Acquires only if thread is first waiter or empty protected final boolean tryAcquire(int acquires) {if (getState() == 0 && !hasQueuedPredecessors() && // 锁空闲时,先检查等待队列是否有更早的线程(hasQueuedPredecessors)compareAndSetState(0, acquires)) { // 保证FIFO公平性再进行CAS枪锁setExclusiveOwnerThread(Thread.currentThread()); // 抢锁成功,记录当前线程为独占所有者return true;}return false; // 获取锁失败(锁被占用、有更早等待线程或CAS失败)}// AQS的aquire方法: 入队node.waiter = current;Node t = tail;node.setPrevRelaxed(t); // avoid unnecessary fenceif (t == null)tryInitializeHead();else if (!casTail(t, node))node.setPrevRelaxed(null); // back outelset.next = node;
(4)unlock
// ReentrantLock
public void unlock() {sync.release(1);
}@ReservedStackAccess
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (getExclusiveOwnerThread() != Thread.currentThread())throw new IllegalMonitorStateException();boolean free = (c == 0);if (free)setExclusiveOwnerThread(null);setState(c);return free;
}// AQS 类
public final boolean release(int arg) {if (tryRelease(arg)) {signalNext(head);return true;}return false;
}
(5)总结
- 获取锁流程
- 非公平锁:
- CAS尝试加锁
- 加锁成功返回true,失败返回false
- 失败进入队列挂起等待通知
- 公平锁
- 尝试加锁
- 前面有线程排队,加锁失败,将排在队列尾部
- 前面没有线程排队,CAS尝试加锁
- 加锁成功,返回true
- 加锁失败,返回false
- 失败进入队列挂起等待通知
- 非公平锁:
- 释放锁流程
- 当前线程不是加锁线程,抛出异常
int c = getStatus() - 1;
- c 等于0,则将当前owner置null
- 如果
c != 0
, 则status(重入次数) - 1
, 并且返回false