AQS源码分析(3)——超时与中断

本文概要

AQS(1)中,从ReentrantLock入手,分析了获取和释放锁的源代码;在AQS(2)中,走了一遍Condition条件队列的等待与唤醒。 在两篇文章中,都只是找了典型的几个方法,如ReentrantLocklock()/unlock(),以及 Conditionawait()/signal()。事实上,还有带超时机制,和中断机制的方法,本文就来查漏补缺,大概过一遍 这几个方法。

ReentrantLocklock

ReentrantLock的加锁方法有二:

  1. lock()
  2. lockInterruptibly()

顾名思义,一个是可被中断,一个是不可被中断。先来回顾一下不可被中断的:

不可被中断的获取锁方法 lock()

详细分析见AQS1,这里只贴最关键的部分:

1
2
3
4
5
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

什么时候回进入到if分支呢?就是在排队的时候,如果被唤醒,会去检查一下有没有被中断过,若有的话在获取锁后返回进入该分支。

看一下acquireQueued方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 即使中断唤醒后,也只是检测一下中断标记并返回,不会有什么额外的举动
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

其中selfInterrupt()方法如下:

1
2
3
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

就是标记一下中断状态。此时方法的调用者可以去检测一下该中断标记,进行相应处理,也可以啥也不做。

也就是说,中断线程不会影响该线程抢锁

可被中断的获取锁方法 lockInterruptibly()

二话不说,上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 既然是可被中断的,进来就先判断一下
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            // 区别在这里:如果唤醒后发现中断过,就抛出异常了
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 如果是抛出异常,fail就是true了,进入该分支
        if (failed)
            cancelAcquire(node);
    }
}

可中断和不可被中断的区别就是:

  • 不可被中断:发生中断,仅仅标记中断状态;
  • 可中断:发生中断,抛出异常,停止获取锁

再来看一下这个 cancelAcquire方法吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 找到一个没有取消的前驱节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // 若当前节点为尾节点,则将尾节点设为前驱节点,并将其后继节点设为null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 若前驱节点已经是头结点了,或前驱节点不是SIGNAL状态且无法将其设置成SIGNAL状态
        // 就唤醒后置节点,让其自行处理
        // 否则只是将后置节点设置为前驱节点的next节点
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

顺便一提的 tryLock()tryLock(long, TimeUnit)

tryLock() 只在锁没有被其他线程持有的时候可以获取,获取不到不排队,返回false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

tryLock(long, TimeUnit) 在超时后会抛出异常来停止获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 不一定每次获取失败都挂起,有个自旋时间阈值,小于该值就自旋,默认为1000纳秒
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

Conditionawait

等待方法有四个版本,分别是:

  1. await():无超时机制,可中断
  2. awaitNanos(long):有超时机制,可中断
  3. awaitUntil(Date):有超时机制,可中断
  4. await(long, TimeUnit):有超时机制,可中断
  5. awaitUninterruptibly():无超时机制,不可中断

awaitNanos(long) 为例来看看带超时机制的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 当前时间 + 等待时间 = 超时时间
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        
        // 时辰已到,需要转移
        if (nanosTimeout <= 0L) {
            // 自己入队,或已经被signal唤醒入队了
            transferAfterCancelledWait(node);
            break;
        }
        // 时间还大于自旋阈值的话,就挂起,否则自旋
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

不抛出异常的awaitUninterruptibly()方法:

1
2
3
4
5
6
7
8
9
10
11
12
public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

小结

本文主要讲了AQS超时中断两种情况下的不同方法,简单来说有以下几点:

  • 超时机制下,会有一个自旋阈值时间的变量来控制是否要挂起线程,当小于阈值时,自旋性能要更好,也就没必要挂起了;
  • 可否中断主要取决于方法对中断信号的处理方式,可以抛出异常来中断在队列中的等待,也可忽略不管