通过ReentrantLock , 类来分析JUC包的基石 AbstractQueuedSynchronizer(后续简称 AQS)
- ReentrantLock skeleton
- AQS skeleton
- Lock 、unlock
- Condition
可重入锁(ReentrantLock skeleton)
类关系图(Class Diagrams)
从下图的类图来看,ReentrantLock
类实现了Lock
接口, 非阻塞式,超时支持.
更多细节参考 JDK-The-Concurrent-Framework
框架细节(skeleton)
锁的实现(Lock implement)
看ReentrantLock的源码我们可以发现,实现Lock接口的类 都由Sync 处理, Sync 类是extends AbstractQueuedSynchronizer. Sync 有两个实现,分别是公平和非公平锁
扩展功能(extended capabilities)
除了实现Lock 的方法外, 还具有扩展功能。获取等待队列,当前线程是否获得锁等监控
源码:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
static final class NonfairSync extends Sync{}
static final class FairSync extends Sync{}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
...
}
抽象同步队列(AQS )
Class Diagrams
我们需要关注的是蓝色圈圈里的, AQS 其实底层用的是volatile 的Node节点 和 CAS 的操作, 实现阻塞队列的语义.
AQS skeleton
简单来说, AQS的实现可以从 双向节点、以及维护这些节点的方法 这两个方法去归类 ( 这里先不考虑Condition)
队列节点(Node)
数据结构(struct)
是一个双向链表,注意nextWaiter 是用于condition的Node
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
模式(mode)
共享锁( 例如: 读锁 )还是排斥锁
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
状态(waitStatus)
状态相关细节如下
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
队列节点的相关操作(Node operation)
CAS
对节点的cas操作
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(Node node,int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(Node node,Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
state
队列状态, 例如, ReentrantLock
中用于记录重入次数
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* The synchronization state.
*/
private volatile int state;
UnsupportedOperationException method
子类需要实现的方法, UnsupportedOperationException method
To use this class as the basis of a synchronizer, redefine the following methods, as applicable, by inspecting and/or modifying the synchronization state using getState()
, setState(int)
and/or compareAndSetState(int, int)
:
Each of these methods by default throws UnsupportedOperationException
onlyfinal
You may also find the inherited methods from AbstractOwnableSynchronizer
useful to keep track of the thread owning an exclusive synchronizer. You are encouraged to use them – this enables monitoring and diagnostic tools to assist users in determining which threads hold locks.
Condition
条件队列相关,下文分析
AQS impl example
ReentrantLock中的实现, 重点看看tryAcquire tryRelease的实现, 这里以Sync的实现NonfairSync (默认的非公平锁,注意非公平体现 不能保证FIFO,会造成线程饥饿, 但是好处是减少上下文切换)分析
Sync:
abstract static class Sync extends AbstractQueuedSynchronizer {
//抽象方法 留给子类实现
abstract void lock();
// 尝试加锁,初始化或者重入可以加锁成功
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//当前线程是否持有锁
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
...
}
NonfairSync :
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
加锁/解锁分析 (lock /unlock)
我们从java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
入手 , 来看看语言层面怎么实现同步语义( 注意,实现过程没有用到同步语义,所以我们要重点关注volidate,cas 以及顺序等)
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//CAS设置state,成功则获取锁,并设置当前占有锁的线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//否则进行获取锁的操作(阻塞、入队等)
acquire(1);
}
如果CAS获取失败, 调用java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
//获取失败 , 并且返回acquireQueued 返回true(阻塞状态被中断)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//回放中断
selfInterrupt();
}
下面, 依次分析下面三个方法:
- tryAcquire
- addWaiter
- acquireQueued
tryAcquire
在阻塞之前, 再次(注意在一开始lock方法中就尝试过一次, 可能是作者有意为之,或是…)尝试获取
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果当前state为0, 再次尝试获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断是否重入线程
else if (current == getExclusiveOwnerThread()) {
//这里操作无需加锁, 是线程安全的,其他线程是进不到这里
int nextc = c + acquires;
//最大可重入次数是Integer.MAX
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter
因为第一次CAS尝试逻辑和enq方法中一致, 所以我们只需要关注enq中的操作即可
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//注意这里入参mode是EXCLUSIVE , 即上面Node中的mode
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//当tail队列不为空的时候,说明已经初始化过了, 直接进行一次CAS操作
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//否则进入enq
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//初始化队列
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//非原子操作, 会出现 "队列在prev方向一致,next方向不一致"
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
注意这里, 在插入过程中,会出现“node.prev指向旧的尾节点,但旧的尾节点.next为null未指向node(尽管,尾指针指向node)”的状态,即“队列在prev方向一致,next方向不一致”
acquireQueued
经过上述操作, 只是插入了队列, 状态并未变更, 所以这里要做的就是状态的维护
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果当前节点的前继节点是头节点,才开始尝试获取锁,获取成功则返回
if (p == head && tryAcquire(arg)) {
//Head 一开始为空节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//否则,阻塞或处理中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果获取失败, AQS.shouldParkAfterFailedAcquire()判断是否需要阻塞等待,如果需要,则通过AQS#parkAndCheckInterrupt()阻塞等待,直到被唤醒或被中断。
shouldParkAfterFailedAcquire
请求获取锁失败后处理, 是否需要阻塞判断,以及队列状态的维护
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果前继节点的状态大于0,则代表是取消状态,需要维护队列节点的状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
//因为当前node为尾节点,遍历队列, 找到未被取消的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//非原子操作, 会出现 "队列在prev方向一致,next方向不一致"
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//CAS修改ws的状态,直到ws的状态修改成功,才返回true需要阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
注意这里, 在插入过程中,会出现“node.prev指向非CACELLED的node,非CACELLED的node.next指向node(CACELLED)”的状态,即“队列在prev方向一致,next方向不一致”
parkAndCheckInterrupt
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
总结
队列状态
- 除了头节点,剩余节点都被阻塞,线程处于
WAITING
状态。 - 除了尾节点,剩余节点都满足
waitStatus==SIGNAL
,表示释放后需要唤醒后继节点。
tips:unlock的可以按照以上分析
一致性问题
为什么要从尾节点向前遍历,而不能从node向后遍历?这是因为,AQS中的等待队列基于一个弱一致性双向链表实现,允许某些时刻下,队列在prev方向一致,next方向不一致。 (上述已经分析了,这里做个总结)
理想情况下,队列每时每刻都处于一致的状态(强一致性模型),从node向后遍历找第一个未取消节点是更高效的做法。然而,维护一致性通常需要牺牲部分性能,为了进一步的提升性能,脑洞大开的神牛们想出了各种高性能的弱一致性模型。尽管模型允许了更多弱一致状态,但所有弱一致状态都在控制之下,不会出现一致性问题。
可见性问题
我们以tryRelease为例子, 看volidate的用法(对照tryAcquire分析tryRelease)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//先写exclusiveOwnerThread C
setExclusiveOwnerThread(null);
}
//再写state D
setState(c);
return free;
}
//
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//先读state A
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//再读exclusiveOwnerThread B
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
核心是三条Happens-Before规则:
程序顺序规则
:如果程序中操作A在操作B之前,那么在线程中操作A将在操作B之前执行。传递性
:如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。volatile变量规则
:对volatile变量的写入操作必须在对该变量的读操作之前执行。
下面分析为什么C和D要按照这样的顺序
根据h-b规则可见性分析:
- A->B
- C->D
- D->A
- C->B
推出:
C(写exclusiveOwnerThread)->D(写state)->A(读state)->B(读exclusiveOwnerThread)
保证所有写对所有读可见
否则, 假设C和D操作互换下
- A(读state)->B(读ex)
- C(写state)->D(写ex)
- C->A
- D->B
推出:
- C->D->B
- C->A->B
D(写ex)和A(读state)没有h-b关系
条件分析(Condition)
Lock接口对标内置锁,而Condition接口对标内置条件队列。Condition主要提供了await、signal两种语义,和两种语义的衍生品。
类关系图(Class Diagrams)
与AQS skeleton类似,也是维护了一个条件队列,下面以await和signal()分析
await
可以先简单理解没有中断版本的,只是设置中断标记,不会抛出异常
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
注意调用await,会释放当前线程持有的Lock,知道被通知或中断(signal)
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 创建等待节点并放入队尾
Node node = addConditionWaiter();
//释放锁并保存释放前的状态(释放的是Lock,否则外部无法获取)
int savedState = fullyRelease(node);
/***** 无锁状态开始 *****/
int interruptMode = 0;
//如果不在AQS的同步队列中(signal中入队), 阻塞,直到收到信号或被中断
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//现在node的是AQS条件队列上的节点,后续的操作 与#lock方法的类似
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//恢复申请锁状态,并更新中断状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/***** 无锁状态结束 *****/
//清理已被取消的节点,则表示在doSignal中没有完成 =null的操作
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果之前发生中断,则更具中断模式重放中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
ConditionObject#await()同ReentrantLock#lockInterruptibly()一样,都是可中断的:调用ConditionObject#await()后,当前线程将保持阻塞,直到收到信号或被中断。
addConditionWaiter
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#addConditionWaiter
创建节点, 并放入尾部
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
// 维护队列状态
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//队列是否为空,初始化头节点
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
需要提醒一下,尽管此处没有任何线程安全的保护,但实际使用时不会出现任何线程安全问题——因为条件队列的使用要求我们在调用await或signal时持有与该条件队列唯一相关的锁。(和AQS中的队列的操作相比,没有CAS操作)
到这里,得到两个ReentrantLock与ConditionObject在实现上的重要区别:
ReentrantLock创建的节点,初始状态为0;而ConditionObject创建的节点,初始状态为
CONDITION==-2
。ReentrantLock使用AQS内置的等待队列,由AQS维护;而每个ConditionObject都维护自己的等待队列。
fullyRelease(释放Lock)
java.util.concurrent.locks.AbstractQueuedSynchronizer#fullyRelease
释放锁并保存释放前的状态,重入次数 (释放的是Lock,否则外部无法获取)
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
//必须在Lock之后才能调用wait(),与unlock()同理
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
release可以看之前的tryrelease分析,AQS#fullyRelease()返回后,调用ConditionObject#await()的线程就释放了锁(这里必须释放锁, 否则就会出现其他调用Lock的地方无法获取锁,阻塞)
isOnSyncQueue(阻塞)
java.util.concurrent.locks.AbstractQueuedSynchronizer#isOnSyncQueue
阻塞,直到收到信号或被中断
// Internal support methods for Conditions
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
//如果是头节点 或者当前节点是CONDITION
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
先 跳转到 signal 的分析
AQS#isOnSyncQueue()判断节点node是否已经被放入了AQS内部的等待队列,是的话返回true,否则返回false。主要分几种情况:
- 如果
node.waitStatus == CONDITION
,则一定未放入。因为AQS#transferForSignal()6行还没来得及执行。 - 如果
node.prev == null
,则一定未放入。因为AQS#transferForSignal()6行执行完但9行未执行完。 - 如果
node.next != null
,则一定已放入。因为已经有了后继节点,则node本身肯定已经完成入队(ConditionObject内部的等待使用的后继指针为nextWaiter
)。 - 否则,说明满足
node.waitStatus != CONDITION && node.prev != null && node.next == null
,该状态无法确定node处于“未放入”还是“已放入”的状态。回忆AQS#enq()可知,node.prev != null
时,可能正在尝试CAS插入node,无法确定是在插入前还是插入后,也无法确定是否插入成功。AQS#findNodeFromTail()从尾节点开始遍历,如果能够遍历到node,则一定已放入(当然,next方向不一定满足一致性);否则,当前时刻还未插入或未插入成功,即一定未放入。
checkInterruptWhileWaiting (检查是否被中断)
判断是被中断(因为await是允许中断的,所以需要注意catch中断异常的处理)还是被唤醒
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#checkInterruptWhileWaiting
- 返回THROW_IE 如果中断发生在singnalled之前 ,抛出异常
- 返回REINTERRUPT 如果中断发生在signalled之后 , 只是简单设置状态
- 返回0 如果未发生过中断
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
//是否发生了中断,否则返回0
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//发生在中断之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
首先,根据Thread.interrupted()
判断是否发生了中断。如果未发生中断,则Thread.interrupted()
返回false,ConditionObject#checkInterruptWhileWaiting()最终返回0。
否则,继续调用AQS#transferAfterCancelledWait()判断发生中断的时机。如果是在收到信号前发生了中断,AQS#transferForSignal()6行还没来得及执行,必然满足node.waitStatus == CONDITION
,则CAS设置node.waitStatus并将node入队(等待后面调用AQS#acquireQueued()竞争锁),然后返回true,ConditionObject#checkInterruptWhileWaiting()最终返回THROW_IE == -1。
否则,一定是在收到信号后发生了中断,但可能AQS#transferForSignal()6行执行完但9行未执行完,即node未完成入队,因此,21-22行空等待至node完成入队,然后返回false,ConditionObject#checkInterruptWhileWaiting()最终返回REINTERRUPT == 1。
终于跳出了循环,要重新申请锁了。(与#lock方法类似的就不重复分析)
信号前中断的特殊情况
信号前中断会导致node同时处于AQS与ConditionObject两方的等待队列中(使用不同的指针连接节点):
而另外两种情况下,节点都被迁入了AQS内部等待队列。
因此,只要ConditionObject内部等待队列中的节点满足node.waitStatus == SIGNAL
或node.waitStatus == 0
,就可以判断其同时位于于AQS与ConditionObject两方的等待队列中,也就能断定该节点属于信号前中断唤醒。
信号前中断唤醒的节点是无效的,需要被清理,可以用该条件找出这部分节点。该结论将在分析ConditionObject#unlinkCancelledWaiters()时派上用场。
unlinkCancelledWaiters
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#unlinkCancelledWaiters
实际上,ConditionObject#unlinkCancelledWaiters()用于清理ConditionObject内部等待队列中的非CONDITION节点。具体来说:
- 如果ConditionObject#await()时(更确切的说,AQS#fullyRelease()完全释放锁时)失败,节点转为CANCELLED状态,需要被清理。
- 分析AQS#transferAfterCancelledWait()有一个结论,信号前中断会导致node同时处于AQS与ConditionObject两方的等待队列中(使用不同的指针连接节点)。这些节点将随着AQS#acquireQueued()的执行转为SIGNAL或0状态。但由于ConditionObject#await()在收到信号前(更确切的说,在AQS#fullyRelease()完全释放锁后、收到信号前)被中断,因此上述节点也是无效的,需要被清理。
而被信号唤醒或信号后中断唤醒的节点,将首先移出ConditionObject内部等待队列,再进行状态转换。
综上,只需要清理ConditionObject内部等待队列中的非CONDITION节点
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
//当前节点
Node t = firstWaiter;
//上一个节点
Node trail = null;
while (t != null) {
//下一个节点
Node next = t.nextWaiter;
//不是CONDITION,移除当前节点(就是把上个节点的nextWaiter执行下个节点)
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
//如果trail为空,说明是头节点,直接替换firstWaiter
if (trail == null)
firstWaiter = next;
else //否则把上一个节点指向下一个节点
trail.nextWaiter = next;
//设置尾节点
if (next == null)
lastWaiter = trail;
}
else //保留当前节点为上个节点
trail = t;
//移动当前节点,开始下一轮
t = next;
}
}
reportInterruptAfterWait
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#reportInterruptAfterWait
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
signal
唤醒一个等待在Conditon上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//表示有节点进入了条件队列内部的等待队列,需要被唤醒
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
//firstWaiter后移一位
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//试将条件队列内部的等待节点转换为AQS内部的等待节点,如果当前节点转换失败,就继续尝试下一节点
} while (!transferForSignal(first) &&(first = firstWaiter) != null);
}
transferForSignal
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//入队,返回旧的队尾节点,也就是新节点node的前继节点。
Node p = enq(node);
//只有正在执行AQS#acquireQueued()的线程T1能够与正在执行到这里的线程T2发生竞争。
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
回到ConditionObject#doSignal()与ConditionObject#signal(),成功结束。对于用户而言,线程此时已经被唤醒,尽管其大概率还在AQS内部阻塞排队,等待重新获得锁。
实际上,就算去掉10-12行也是满足正确性要求的。因为线程T2释放锁后,依然会将从队头开始的第一个非取消节点唤醒,该节点会继续ConditionObject#await()中的工作
总结
await可以简单认为是用于维护ConditionObject队列,可以分为阻塞前和阻塞后操作
阻塞前: 插入内部等待队列
阻塞后: 根据是否中断(一致性问题),来维护状态
signal可以简单的认为 将ConditionObject队列中的节点移动到AQS队列
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 951488791@qq.com