深入理解AQS - ReentrantLock实现

本节我们看下Lock实现类之一ReentrantLock。我们先浏览下Lock类中的声明的接口:

lock方法用来获取锁,如果锁不可用,会阻塞当前线程。lock支持可中断方式lockInterruptibly,在未获取到锁的情况下,可以通过Thread.interrupt来中断锁的获取,同样提供tryLock以及变体,尝试获取锁。

在使用完毕后可以使用unlock释放锁。

如果需要线程等待可以使用newCondition创建一个Condition实例,调用相应的方法。

我们主要看下关于锁相关的实现原理。

ReentrantLock当调用lock发生了什么?

ReentrantLock是一种可重入互斥锁,与synchronized有相同的作用,同时提供了更多的灵活性。在其内部实现了公平的和非公平两种同步机制,默认为非公平的。我们先看下非公平的实现代码:

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

在非公平模式下获取锁,会先使用CAS方式设置AQSstate = 1。如果修改成功,将当前线程变为独占线程,则获取锁成功。如果当前有其他线程持有锁会,会进入acquire(1)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

上面是调用acquire(1)的流程,方法内部首先进行尝试获取锁操作tryAcquirenonfairTryAcquire方法中首先获取state的值,如果为0表示之前线程已经释放了锁,跟首次获取锁流程一致。

我们假设锁被占用情况下,会继续判断请求锁的线程是当前占有锁的线程,如果是会将status进行+1操作,这个操作出现在,使用同一个锁的两个方法A和B,假设线程1调用A方法获取到锁,A方法内调用了B,这个就标识锁的可重入。

假设当前线程非获取到锁的线程,流程会进入到acquireQueued方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

在队列的尾部添加了一个独占的Node,进行锁的排队,在排队之前如果他的上一个Node是head,再次尝试请求能否获取到独占。如果不能进入下面流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

head的waitStatus默认为0,会进入else逻辑,修改head的waitStatusSIGNAL并返回false,则再次循环后下次进入会返回true,同样会走到parkAndCheckInterrupt方法:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

调用LockSupport.park让线程阻塞。等待unpark调用,后续线程会重复这个过程。

此时我们第一个线程使用完之后,调用unlock,流程进入到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

tryRelease中把status减去原有的值,如果当前线程多次进入lock,则当所有方法调用完成后,如果state变为0,则进入unparkSuccessor操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

会调用LockSupport.unpark方法,将之前在acquireQueued阻塞第一个线程唤起,获取锁后执行后续逻辑。

至此一个独占锁的获取释放流程就完成了,这是非公平的锁机制。

如果是公平流程大致相同,不同的是它不像非公平首先先尝试获取锁,而是先看是否有别的线程排队,如果没有则获取锁,如果有线程排队则,请求锁的线程也进入队列等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

两种模式的主要区别还是,在非公平模式下会先尝试获取获取锁,如果未获取到再进入队列。公平锁如果线程获取锁时,如果存在队列则必须进行排队,保证公平性。

坚持原创技术分享,更多深度分析、实践代码,您的支持将鼓励我继续创作!