AQS源码分析(1)——从RenentrantLock初窥AQS

AQS 简介

AbstractQueuedSynchronizer,抽象队列同步器,为JUC包中的各阻塞锁和同步器 提供了一个框架,其基于一个FIFO队列(CLH队列的一种变体),以模板设计模式,为大 多数同步器提供了一个基本的实现。

两个内部类

Node:队列节点类

Node类是AQS的FIFO等待队列的节点类,其主要有以下字段属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    // 共享模式下等待的节点标记
    static final Node SHARED = new Node();
    // 独占模式下等待的节点标记
    static final Node EXCLUSIVE = null;
    
    // 等待状态值:当前节点的线程已取消等待(可能是因为超时、中断等原因,一旦标记则不再变化)
    static final int CANCELLED =  1;
    // 等待状态值:当前节点的后继节点需要唤醒(也可以理解成后继节点的状态,在当前节点取消或释放锁时进行唤醒操作)
    static final int SIGNAL    = -1;
    // 等待状态值:当前节点在条件队列中排队等待(被移入等待队列后会设置成0)
    static final int CONDITION = -2;
    // 等待状态值:下一个共享锁的获取操作将无条件扩散(没有理解)
    static final int PROPAGATE = -3;
    // 等待状态,取值为上4种,以及默认值0/
    volatile int waitStatus;

    // 前驱节点
    volatile Node prev;
    // 后置节点
    volatile Node next;

    // 将当前节点入队的线程,即当前线程
    volatile Thread thread;

    // 在条件队列中使用
    Node nextWaiter;

ConditionObject:条件类

ConditionObject实现了Condition接口,用于实现Objectwait/notify/notifyAll方法。

相比ObjectCondition更加灵活,最大的优势就是一个锁对象可以有多个条件队列,实现精准唤醒。

主要属性有四个:

1
2
3
4
5
6
7
    // 等待队列头结点
    private transient Node firstWaiter;
    // 等待队列尾节点
    private transient Node lastWaiter;
    // 从等待中唤醒后对中断的两个处理策略(重新中断和抛出异常)
    private static final int REINTERRUPT =  1;
    private static final int THROW_IE    = -1;

关于Condition的代码分析将在下一篇中进行。

AQS自身的几个属性

1
2
3
4
5
6
7
8
9
10
    // 等待队列的头结点,其实并不在队列中。可以理解成当前占有锁的线程所在的节点(并不一定,
    // 看后面进一步说明)等待队列实际上是CLH队列锁的一种变种实现,有个特点就是需要一个虚
    // 头节点作为开始,但不需要在一开始就创建,因为如果从不产生竞争的话就会造成浪费。只有
    // 在第一次竞争发生时,才去创建。(看不懂的看下面代码分析)
    private transient volatile Node head;
    // 等待队列的尾节点
    private transient volatile Node tail;
    // 同步状态,获取/释放锁都要更新这个状态,在不同的实现里有不同含义
    // 如在ReentrantLock中代表重入的次数
    private volatile int state;

AQS子类的实现方式

AQS的子类一般被实现为内部辅助类,如ReentrantLock中的Sync抽象类 (有公平锁FairSync和非公平锁NonfairSync两种内部实现类)。

子类需要实现的几个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}    

可以看出,这几个方法分为了共享和独占两种情况,默认会抛出不支持操作异常。

子类可根据要实现的锁的类型,选择部分(如ReentrantLock)或全部(ReentrantReadWriteLock)进行实现。

显然,AQS的实现采用了模板设计模式

下文将从ReentrantLock为入口,来一窥AQS的精要~

代码分析

获取锁

ReentrantLocklock()方法

1
2
3
public void lock() {
    sync.lock();
}

Synclock()方法

在这里 公平锁和非公平锁出现了第一个不同:非公平锁会立刻去抢一次锁,失败了才走正常流程

1
2
3
4
5
6
7
8
9
10
11
12
13
// FairSync实现的 
final void lock() {
    acquire(1);
}

// NonfairSync实现的 
final void lock() {
    // 先抢一下,失败了才走正常流程
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

AQSacquire()方法

1
2
3
4
5
1
2
3
4
5
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

ReentrantLocktryAcquire()方法

tryAcquire()方法需要实现类自己去实现,在公平锁中,是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 当前锁是可以用的,但是得先看看队列中是否已有其他等待者的waiter了
        if (!hasQueuedPredecessors() &&
            // 试着CAS抢一下锁,抢不到说明被其他线程抢先了(不是队列里的,队列里没有线程)
            // 注意这里抢到了就只是设置了一下state,并没有进行head初始化
            compareAndSetState(0, acquires)) {
            // 抢到了就标记当前锁的占有者是本线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 进到这个分支就代表是重入了,当前独占锁的线程就是本次要来获取的线程
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

hasQueuedPredecessors方法判断队列中是否有其他等待者:

1
2
3
4
5
6
7
8
9
10
11
// 有两种情况会导致返回false:
// 1. 头节点 == 尾节点,说明队列还是空的,头节点还没被初始化;
// 2. 等待队列中没有其他线程等待
public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 头节点和尾节点
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

再来看看非公平锁的实现:这里和公平锁有第二个不同,锁可用时不管队列里有没有排队的,先抢一波试试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 发现锁可用,就先抢一波
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

AQSaddWaiter()方法

这里回到AQSacquire方法,tryAcquire失败后,就要乖乖排队了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若为null,说明头节点还没有初始化
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 失败的原因有2:
    // 1. 头节点还没有初始化
    // 2. 其他线程抢先了
    enq(node);
    return node;
}

enq方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 这是失败原因1:头节点未初始化
        // 初始化完了进入下个循环,再入队
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 这里是和addWaiter里一样的,只不过是自旋去入队
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

AQSacquireQueued()方法

正常情况下,到这里就已经成功入队了,然后回到AQSacquire方法:

1
2
3
4
5
1
2
3
4
5
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquireQueued方法中,将完成排队获取锁的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 若前置节点是head,可以去尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 要么前置节点不是head,要么获取失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 挂起线程
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire方法判断是否要挂起当前线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pred为node的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 若前置节点的等待状态时SIGNAL(-1),则可以安全挂起
    if (ws == Node.SIGNAL)
        return true;
    // 前置节点的等待状态大于0(只有CANCEL大于0),则向前找,直到找到一个不大于0的
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 前置节点等待状态为0或者PROPAGATE,这时CAS去设置为SIGNAL,但是不立刻就挂起
        // PROPAGATE先不谈,为何为0呢?
        // 所以每个新的node入队时,waitStatu都是0
        // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
        // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 为何不挂起?有可能经过刚才的向前找合适的前驱节点后,当前节点已经是head的直接后继节点了,就不需要挂起了
    return false;
}

简单以一个流程图总结一下:
这里有图

释放锁

ReentrantLockunlock()方法

1
2
3
public void unlock() {
    sync.release(1);
}

AQSrelease()方法

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLocktryRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 若非当前持有锁的线程,就抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // c 等于 0,则代表重入次数都清空了,可以释放锁了
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

AQSunparkSuccessor方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void unparkSuccessor(Node node) {
    // 成不成功无所谓,反正锁也用完了,在下面都要唤醒后继节点了
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 唤醒后继节点,一般就是后面的一个,但若已取消或为null,
    // 就从尾节点开始向前找到最前面的一个没有取消且不为null
    // 的节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

最后来看一下唤醒后的线程怎么运行吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 若前置节点是head,可以去尝试获取锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 要么前置节点不是head,要么获取失败
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 挂起线程
                parkAndCheckInterrupt())
                interrupted = true;
            // 如果没有发生过中断,那就不会进条件
            // 在下一个循环中就会去获取锁了
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

–未完待续–

参考文章:

  1. 一行一行源码分析清楚AbstractQueuedSynchronizer