AQS源码分析(2)——Condition条件队列

Condition 简介

Conditionawait()/signal()/signalAll()方法提供了Objectwait()/notify()/notifyAll()方法的替代品; 此外,还对其进行了增强:一个锁可以有多个Condition队列,以实现精准的唤醒。 本文主要通过源码来看看ConditionAQS里是怎么实现的。

AQS中,ConditionObject实现了Serializable接口,但是所有字段都是transient的,因此一旦反序列化,等待队列将会清空。 主要属性有四个:

1
2
3
4
5
6
7
// 等待队列头结点
private transient Node firstWaiter;
// 等待队列尾节点
private transient Node lastWaiter;
// 从等待中唤醒后对中断的两个处理策略(重新中断和抛出异常)
private static final int REINTERRUPT =  1;
private static final int THROW_IE    = -1;

代码分析

等待方法 await

等待方法有四个版本,分别是:

  1. await():无超时机制,可中断
  2. awaitNanos(long):有超时机制,可中断
  3. awaitUntil(Date):有超时机制,可中断
  4. await(long, TimeUnit):有超时机制,可中断
  5. awaitUninterruptibly():无超时机制,不可中断

下文以第1个无超时机制、可中断的等待方法为例来看一下代码,其他的方法后面再谈。

await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void await() throws InterruptedException {
    // 进方法先看看线程是否被中断了
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 将节点加到条件队列
    Node node = addConditionWaiter();
    
    // 完全释放锁,并用一个变量记录此时重入次数,在唤醒后可直接设置相应的次数
    int savedState = fullyRelease(node);
    
    int interruptMode = 0;
    // 两种情况离开循环
    // 1、已经在等待队列中了
    // 2、在等待的时候被中断了
    while (!isOnSyncQueue(node)) {
        // 在这里挂起,等待signal唤醒
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 唤醒后进入等待队列,等待获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter()

将当前线程包装进node,并移入条件队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 若条件队列的最后一个节点已取消,则将其清出队列
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    
    // node初始化的状态为CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    
    // 尾节点为null说明队列中还没有节点,直接将当前节点设置为头结点
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

unlinkCancelledWaiters()

清除条件队列中取消的节点,纯链表操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            // 尾节点为null说明目前从队头开始所有节点都是取消的
            // 头结点一直向后移动
            if (trail == null)
                firstWaiter = next;
            else
                // 头结点固定后,尾节点一直后移
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }

fullyRelease()

完全释放锁,并返回当前state值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 若锁释放失败(没持有锁却去释放了/await了),就将该节点状态更新为CANCELLED
        // 此时已经进入条件队列,但是会被后续的节点清出
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

isOnSyncQueue()

判断是否已在等待队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
final boolean isOnSyncQueue(Node node) {
    // 等待状态是CONDITION(进入等待队列会被置为0),或者没有前置节点,那么久还在条件队列
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    
    // 有后继节点了,那肯定在等待队列中了
    if (node.next != null)
        return true;

    // 前置节点不为空不代表就在队列中了,因为有可能在入队的时候设置了前驱节点后,
    // CAS将自己设置为尾节点失败了,导致还不在等待队列中。
    // 此时需要从尾部去遍历查找,这种情况下一般都是在接近尾部地方,所以也不会遍历太多节点
    return findNodeFromTail(node);
}

findNodeFromTail()

从尾部遍历等待队列,判断是否在队列中

1
2
3
4
5
6
7
8
9
10
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

中断检查及处理

被唤醒后先进行中断检查,如果没有中断且还没被移入等待队列(意外唤醒),就将继续挂起; 否则就退出循环,根据中断类型进行处理了。

1
2
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;

检查在条件队列中排队时的中断情况:

  • signal之前被中断,返回THROW_IE;
  • signal之后被中断,返回REINTERRUPT;
  • 未被中断,返回0
1
2
3
4
5
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
1
2
3
4
5
6
7
8
9
10
11
// 如果排队获取锁的过程中被中断了,并且在条件队列中的中断类型不是“抛出异常”(THROW_IE),
// 就将中断类型设置为重新中断(REINTERRUPT),其实就只是设置中断标记。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;

// 又见清理取消节点,刚才是在加入条件队列的时候清理了一次
if (node.nextWaiter != null) 
    unlinkCancelledWaiters();

if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

根据中断类型分别处理:

  • THROW_IE:抛出异常
  • REINTERRUPT:标记一下中断标记
1
2
3
4
5
6
7
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

通知方法 signal

通知方法有通知1个和全部通知中两种:

  1. signal()
  2. signalAll()

两者的区别就在于将条件队列队头节点移到等待队列,或者是将所有节点都移到等待队列。 下文以signal()为例,分析一下代码:

signal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final void signal() {
    // 如果没有持有锁就抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

// 若转移队列失败(节点已取消)
// 就从前往后在条件队列里找合适的节点唤醒
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

transferForSignal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final boolean transferForSignal(Node node) {
    // 若CAS设置失败,则说明已取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 入等待队列,返回的是前驱节点
    Node p = enq(node);
    
    // 设置前驱节点的等待状态为SIGNAL
    // 若前驱节点已取消,或设置失败,就唤醒当前节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

这个时候唤醒节点做什么?

AQS源码分析(1)——从ReentrantLock初窥AQS中的 2.1.6 AQS的acquireQueued()方法里其实提到过, 在排队中被唤醒,会尝试去获取锁,如果获取失败,就会去挂起,在挂起之前会将前驱节点的等待状态设置为SIGNAL的。