高级主题

显式锁

在Java 5.0之前,在协调对共享对象的访问时可以使用的机制只有synchronized和volatile。Java 5.0增加了一种新的机制:ReentrantLock。与之前提到过的机制相反,ReentrantLock并不是一种替代内置加锁的方法,而是当内置加锁机制不适用时,作为一种可选择的高级功能。

LockReentrantLock

Lock接口中定义了一组抽象的加锁操作。与内置加锁机制不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显式的。在Lock的实现中必须提供与内部锁相同的内存可见性语义,但在加锁语义、调度算法、顺序保证以及性能特性等方面可以有所不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {

void lock();

void lockInterruptibly() throws InterruptedException;

boolean tryLock();

boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

Condition newCondition();
}

ReentrantLock实现了Lock接口,并提供了与synchronized相同的互斥性和内存可见性。此外,与synchronized一样,ReentrantLock还提供了可重入的加锁语义。ReentrantLock支持在Lock接口中定义的所有获取锁模式,并且与synchronized相比,它还为处理锁的不可用性问题提供了更高的灵活性。

内置锁在功能上存在局限性:例如,无法中断一个正在等待获取锁的线程,或者无法在请求获取一个锁时无限地等待下去,但却无法实现非阻塞结构的加锁规则。

下面的例子是Lock接口的标准使用形式。这种形式比使用内置锁复杂一些:必须在finally块中释放锁。否则,如果在被保护的代码中抛出了异常,那么这个锁永远都无法释放。当使用加锁时,还必须考虑在try块中抛出异常的情况,如果可能使对象处于某种不一致的状态,那么就需要更多的try-catch或try-finally代码块。(当使用某种形式的加锁时,包括内置锁,都应该考虑在出现异常时的情况。)

1
2
3
4
5
6
7
8
Lock lock=new ReentrantLock();
lock.lock();
try{
//更新对象状态
//捕获异常,并在必要时恢复不变性条件
}finally{
lock.unlock();
}

轮询锁与定时锁

可定时的与可轮询的锁获取模式是由tryLock方法实现的,与无条件的锁获取模式相比,它具有更完善的错误恢复机制。善的错误恢复机制。在内置锁中,死锁是一个严重的问题,恢复程序的唯一方法是重新启动程序,而防止死锁的唯一方法就是在构造程序时避免出现不一致的锁顺序。可定时的与可轮询的锁提供了另一种选择:避免死锁的发生。

如果不能获得所有需要的锁,那么可以使用可定时的或可轮询的锁获取方式,从而使你重新获得控制权,它会释放已经获得的锁,然后重新尝试获取所有锁(或者至少会将这个失败记录到日志,并采取其他措施)。

示例 DeadlockAvoidance 通过tryLock来避免锁顺序死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class DeadlockAvoidance {

private static Random rnd = new Random();

public boolean transferMoney(
Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit)
throws InsufficientFundsException, InterruptedException {
long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
long randMod = getRandomDelayModulusNanos(timeout, unit);
long stopTime = System.nanoTime() + unit.toNanos(timeout);

while (true) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException();
} else {
fromAcct.debit(amount);
toAcct.credit(amount);
return true;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime) {
return false;
}
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}

private static final int DELAY_FIXED = 1;
private static final int DELAY_RANDOM = 2;

static long getFixedDelayComponentNanos(long timeout, TimeUnit unit) {
return DELAY_FIXED;
}

static long getRandomDelayModulusNanos(long timeout, TimeUnit unit) {
return DELAY_RANDOM;
}

static class DollarAmount implements Comparable<DollarAmount> {
@Override
public int compareTo(DollarAmount other) {
return 0;
}

DollarAmount(int dollars) {}
}

class Account {
public Lock lock;

void debit(DollarAmount d) {}

void credit(DollarAmount d) {}

DollarAmount getBalance() {
return null;
}
}

class InsufficientFundsException extends Exception {}
}

在实现具有时间限制的操作时,定时锁同样非常有用。当在带有时间限制的操作中调用了一个阻塞方法时,它能根据剩余时间来提供一个时限。如果操作不能在指定的时间内给出结果,那么就会使程序提前结束。当使用内置锁时,在开始请求锁后,这个操作将无法取消,因此内置锁很难实现带有时间限制的操作。

示例 TimedLocking 演示了带有时间限制的加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TimedLocking {

private Lock lock = new ReentrantLock();

public boolean trySendOnSharedLine(String message, long timeout, TimeUnit unit)
throws InterruptedException {
long nanosToLock = unit.toNanos(timeout) - estimatedNanosToSend(message);
if (!lock.tryLock(nanosToLock, NANOSECONDS)) {
return false;
}
try {
return sendOnSharedLine(message);
} finally {
lock.unlock();
}
}

private boolean sendOnSharedLine(String message) {
/* send something */
return true;
}

long estimatedNanosToSend(String message) {
return message.length();
}
}

可中断的锁获取操作

正如定时的锁获取操作能在带有时间限制的操作中使用独占锁,可中断的锁获取操作同样能在可取消的操作中使用加锁。lockInterruptibly方法能够在获得锁的同时保持对中断的响应,并且由于它包含在Lock中,因此无须创建其他类型的不可中断阻塞机制。

可中断的锁获取操作的标准结构比普通的锁获取操作略微复杂一些,因为需要两个try块。(如果在可中断的锁获取操作中抛出了InterruptedException,那么可以使用标准的try-finally加锁模式。)定时的tryLock同样能响应中断,因此当需要实现一个定时的和可中断的锁获取操作时,可以使用tryLock方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class InterruptibleLocking {

private Lock lock = new ReentrantLock();

public boolean trySendOnSharedLine(String message) throws InterruptedException {
lock.lockInterruptibly();
try {
return cancellableSendOnSharedLine(message);
} finally {
lock.unlock();
}
}

private boolean cancellableSendOnSharedLine(String message) throws InterruptedException {
/* send something */
return true;
}
}

非块结构的加锁

在内置锁中,锁的获取和释放等操作都是基于代码块的——释放锁的操作总是与获取锁的操作处于同一个代码块,而不考虑控制权如何退出该代码块。自动的锁释放操作简化了对程序的分析,避免了可能的编码错误,但有时侯需要更灵活的加锁规则。

锁分段技术在基于散列的容器中实现了不同的散列链,以便使用不同的锁。我们可以通过采用类似的原则来降低链表中锁的粒度,即为每个链表节点使用一个独立的锁,使不同的线程能独立地对链表的不同部分进行操作。每个节点的锁将保护链接指针以及在该节点中存储的数据,因此当遍历或修改链表时,我们必须持有该节点上的这个锁,直到获得了下一个节点的锁,只有这样,才能释放前一个节点上的锁。在[CPJ 2.5.1.4]中介绍了使用这项技术的一个示例,并称之为连锁式加锁(Hand-Over-Hand Locking)或者锁耦合(Lock Coupling)。

性能考虑因素

当把ReentrantLock添加到Java 5.0时,它能比内置锁提供更好的竞争性能。

在Java 5.0中,当从单线程(无竞争)变化到多线程时,内置锁的性能将急剧下降,而ReentrantLock的性能下降则更为平缓,因而它具有更好的可伸缩性。但在Java 6中,情况就完全不同了,内置锁的性能不会由于竞争而急剧下降,并且两者的可伸缩性也基本相当。

性能和可伸缩性对于具体平台等因素都较为敏感,例如CPU、处理器数量、缓存大小以及JVM特性等,所有这些因素都可能会随着时间而发生变化。

公平性

在ReentrantLock的构造函数中提供了两种公平性选择:创建一个非公平的锁(默认)或者一个公平的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}

/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

在公平的锁上,线程将按照它们发出请求的顺序来获得锁,但在非公平的锁上,则允许“插队”:当一个线程请求非公平的锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有的等待线程并获得这个锁。非公平的ReentrantLock并不提倡“插队”行为,但无法防止某个线程在合适的时候进行“插队”。在公平的锁中,如果有另一个线程持有这个锁或者有其他线程在队列中等待这个锁,那么新发出请求的线程将被放入队列中。在非公平的锁中,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中。即使对于公平锁而言,可轮询的tryLock仍然会“插队”。

在激烈竞争的情况下,非公平锁的性能高于公平锁的性能的一个原因是:在恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于这个锁已被线程A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此会再次尝试获取锁。与此同时,如果C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样的情况是一种“双赢”的局面:B获得锁的时刻并没有推迟,C更早地获得了锁,并且吞吐量也获得了提高。

当持有锁的时间相对较长,或者请求锁的平均时间间隔较长,那么应该使用公平锁。在这些情况下,“插队”带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)则可能不会出现。

内置加锁并不会提供确定的公平性保证,但在大多数情况下,在锁实现上实现统计上的公平性保证已经足够了。Java语言规范并没有要求JVM以公平的方式来实现内置锁,而在各种JVM中也没有这样做。ReentrantLock并没有进一步降低锁的公平性,而只是使一些已经存在的内容更明显。

synchronizedReentrantLock之间进行选择

ReentrantLock在加锁和内存上提供的语义与与内置锁相同,此外它还提供了一些其他功能,包括定时的锁等待、可中断的锁等待、公平性,以及实现非块结构的加锁。ReentrantLock在性能上似乎优于内置锁,其中在Java 6中略有胜出,而在Java 5.0中则是远远胜出。那么为什么不放弃synchronized,并在所有新的并发代码中都使用ReentrantLock?

与显式锁相比,内置锁仍然具有很大的优势:

  • 内置锁为许多开发人员所熟悉
  • 简洁紧凑
  • 在许多现有的程序中都已经使用了内置锁
  • ReentrantLock的危险性比同步机制要高(finally块中调用unlock)
  • 未来更可能会提升synchronized而不是ReentrantLock的性能,因为synchronizedJVM的内置属性,它能执行一些优化,例如对线程封闭的锁对象的锁消除优化,通过增加锁的粒度来消除内置锁的同步。

仅当内置锁不能满足需求时,才可以考虑使用ReentrantLock。

在一些内置锁无法满足需求的情况下,ReentrantLock可以作为一种高级工具,当需要一些高级功能时才应该使用ReentrantLock:

  • 可定时的、可轮询的与可中断的锁获取操作
  • 公平队列
  • 非块结构的锁

在Java 5.0中,内置锁与ReentrantLock相比还有另一个优点:在线程转储中能给出在哪些调用帧中获得了哪些锁,并能够检测和识别发生死锁的线程。JVM并不知道哪些线程持有ReentrantLock,因此在调试使用ReentrantLock的线程的问题时,将起不到帮助作用。

Java 6解决了这个问题,它提供了一个管理和调试接口,锁可以通过该接口进行注册,从而与ReentrantLocks相关的加锁信息就能出现在线程转储中,并通过其他的管理接口和调试接口来访问。与synchronized相比,这些调试消息是一种重要的优势,即便它们大部分都是临时性消息,线程转储中的加锁能给很多程序员带来帮助。ReentrantLock的非块结构特性仍然意味着,获取锁的操作不能与特定的栈帧关联起来,而内置锁却可以。

读-写锁

ReentrantLock实现了一种标准的互斥锁:每次最多只有一个线程能持有ReentrantLock。互斥是一种保守的加锁策略,虽然可以避免“写/写”冲突和“写/读”冲突,但同样也避免了“读/读”冲突。

在许多情况下,数据结构上的操作都是“读操作”——虽然它们也是可变的并且在某些情况下被修改,但其中大多数访问操作都是读操作。此时,如果能够放宽加锁需求,允许多个执行读操作的线程同时访问数据结构,那么将提升程序的性能。只要每个线程都能确保读取到最新的数据,并且在读取数据时不会有其他的线程修改数据,那么就不会发生问题。在这种情况下就可以使用读/写锁:一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。

JUC 提供了 ReadWriteLock 接口:

ReadWriteLock中暴露了两个Lock对象,其中一个用于读操作,而另一个用于写操作。要读取由ReadWriteLock保护的数据,必须首先获得读取锁,当需要修改ReadWriteLock保护的数据时,必须首先获得写入锁。尽管这两个锁看上去是彼此独立的,但读取锁和写入锁只是读-写锁对象的不同视图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

在读-写锁实现的加锁策略中,允许多个读操作同时进行,但每次只允许一个写操作。与Lock一样,ReadWriteLock可以采用多种不同的实现方式,这些方式在性能、调度保证、获取优先性、公平性以及加锁语义等方面可能有所不同。

读-写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发性。在实际情况中,对于在多处理器系统上被频繁读取的数据结构,读-写锁能够提高性能。而在其他情况下,读-写锁的性能比独占锁的性能要略差一些,这是因为它们的复杂性更高。

在读取锁和写入锁之间的交互可以采用多种实现方式。ReadWriteLock中的一些可选实现包括:

  • 释放优先。当一个写入操作释放写入锁时,并且队列中同时存在读线程和写线程,那么应该优先选择读线程,写线程,还是最先发出请求的线程?

  • 读线程插队。如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但却可能造成写线程发生饥饿问题。

  • 重入性。读取锁和写入锁是否是可重入的?

  • 降级。如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读取锁?这可能会使得写入锁被“降级”为读取锁,同时不允许其他写线程修改被保护的资源。

  • 升级。读取锁能否优先于其他正在等待的读线程和写线程而升级为一个写入锁?在大多数的读-写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。(如果两个读线程试图同时升级为写入锁,那么二者都不会释放读取锁。)

ReentrantReadWriteLock为这两种锁都提供了可重入的加锁语义。与ReentrantLock类似,ReentrantReadWriteLock在构造时也可以选择是一个非公平的锁(默认)还是一个公平的锁。在公平的锁中,等待时间最长的线程将优先获得锁。如果这个锁由读线程持有,而另一个线程请求写入锁,那么其他读线程都不能获得读取锁,直到写线程使用完并且释放了写入锁。在非公平的锁中,线程获得访问许可的顺序是不确定的。写线程降级为读线程是可以的,但从读线程升级为写线程则是不可以的(这样做会导致死锁)。

ReentrantLock类似的是,ReentrantReadWriteLock中的写入锁只能有唯一的所有者,并且只能由获得该锁的线程来释放。在Java 5.0中,读取锁的行为更类似于一个Semaphore而不是锁,它只维护活跃的读线程的数量,而不考虑它们的标识。在Java 6中修改了这个行为:记录哪些线程已经获得了读者锁。

当锁的持有时间较长并且大部分操作都不会修改被守护的资源时,那么读-写锁能提高并发性。ReadWriteMap中使用了ReentrantReadWriteLock来包装Map,从而使它能在多个读线程之间被安全地共享,并且仍然能避免“读-写”或“写-写”冲突。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReadWriteMap<K, V> {

private final Map<K, V> map;

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();

public ReadWriteMap(Map<K, V> map) {
this.map = map;
}

public V put(K k, V v) {
writeLock.lock();
try {
return map.put(k, v);
} finally {
writeLock.unlock();
}
}

public V get(K k) {
readLock.lock();
try {
return map.get(k);
} finally {
readLock.unlock();
}
}
}

在现实中,ConcurrentHashMap的性能已经很好了,因此如果只需要一个并发的基于散列的映射,那么就可以使用ConcurrentHashMap来代替这种方法,但如果需要对另一种Map实现(例如LinkedHashMap)提供并发性更高的访问,那么可以使用这项技术。

小结

与内置锁相比,显式的Lock提供了一些扩展功能,在处理锁的不可用性方面有着更高的灵活性,并且对队列行有着更好的控制。但ReentrantLock不能完全替代synchronized,只有在synchronized无法满足需求时,才应该使用它。

读-写锁允许多个读线程并发地访问被保护的对象,当访问以读取操作为主的数据结构时,它能提高程序的可伸缩性。

构建自定义的同步工具

创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造。但如果类库没有提供你需要的功能,那么还可以使用Java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列显式的Condition对象以及**AbstractQueuedSynchronizer框架**。

本章将介绍实现状态依赖性的各种选择,以及在使用平台提供的状态依赖性机制时需要遵守的各项规则。

状态依赖性的管理

在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须非空”),那么这个条件将永远无法成真。因此,在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。于并发对象上依赖状态的方法,虽然有时候在前提条件不满足的情况下不会失败,但通常有一种更好的选择,即等待前提条件变为真。

依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来要更为方便且更不易出错。

内置的条件队列可以使线程一直阻塞,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒它们。为了突出高效的条件等待机制的价值,我们将首先介绍如何通过轮询与休眠等方式来(勉强地)解决状态依赖性问题。

可阻塞的状态依赖操作的形式如下例所示。这种加锁模式有些不同寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁,以便其他线程可以修改对象的状态,否则,前提条件就永远无法变成真。在再次测试前提条件之前,必须重新获得锁。

1
2
3
4
5
6
7
8
9
acquire lock on object state
while (precondition does not hold) {
release lock
wait until precondition might hold
optionally fail if interrupted or timeout expires
reqcquire lock
}
perform action
release lock

在生产者-消费者的设计中经常会使用像ArrayBlockingQueue这样的有界缓存。在有界缓存提供的put和take操作中都包含有一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。

接下来介绍有界缓存的几种实现,其中将采用不同的方法来处理前提条件失败的问题。在每种实现中都扩展下面例子中的BaseBoundedBuffer,在这个类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf、head、tail和count)均由缓存的内置锁来保护。它还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层的状态将对子类隐藏。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@ThreadSafe
public abstract class BaseBoundedBuffer<V> {

@GuardedBy("this")
private final V[] buf;

@GuardedBy("this")
private int tail;

@GuardedBy("this")
private int head;

@GuardedBy("this")
private int count;

@SuppressWarnings("unchecked")
protected BaseBoundedBuffer(int capacity) {
buf = (V[]) new Object[capacity];
}

protected final synchronized void doPut(V v) {
buf[tail] = v;
if (++tail == buf.length) {
tail = 0;
}
++count;
}

protected final synchronized V doTake() {
V v = buf[head];
buf[head] = null;
if (++head == buf.length) {
head = 0;
}
--count;
return v;
}

public final synchronized boolean isFull() {
return count == buf.length;
}

public final synchronized boolean isEmpty() {
return count == 0;
}
}

示例:将前提条件的失败传递给调用者

GrumpyBoundedBuffer是第一个简单的有界缓存实现。put和take方法都进行了同步以确保实现对缓存状态的独占访问,因为这两个方法在访问缓存时都采用“先检查再运行”的逻辑策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//当不满足前提条件时,有界缓存不会执行相应的操作
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {

protected GrumpyBoundedBuffer(int capacity) {
super(capacity);
}

public synchronized void put(V v) {
if (isFull()) {
throw new BufferFullException();
}
doPut(v);
}

public synchronized V take() {
if (isEmpty()) {
throw new BufferEmptyException();
}
return doTake();
}
}

尽管这种方法实现起来很简单,但使用起来却并非如此。异常应该用于发生异常条件的情况中[EJ Item 39]。“

缓存已满”并不是有界缓存的一个异常条件,就像“红灯”并不表示交通信号灯出现了异常。在实现缓存时得到的简化(使调用者管理状态依赖性)并不能抵消在使用时存在的复杂性,因为现在调用者必须做好捕获异常的准备,并且在每次缓存操作时都需要重试。这种方法的一种变化形式是,当缓存处于某种错误的状态时返回一个错误值。这是一种改进,因为并没有放弃异常机制,抛出的异常意味着“对不起,请再试一次”,但这种方法并没有解决根本问题:调用者必须自行处理前提条件失败的情况。

下面的例子给出了 take 的调用:

1
2
3
4
5
6
7
8
9
while (true){
try {
V item = buffer.take()
// 对 item 执行一些操作
break;
} catch (BufferEmptyException e) {
Thread.sleep(SLEEP_GRAUNLARITY)
}
}

上例中的客户代码不是实现重试的唯一方式。调用者可以不进入休眠状态,而直接重新调用take方法,这种方法被称为忙等待或自旋等待。如果缓存的状态在很长一段时间内都不会发生变化,那么使用这种方法就会消耗大量的CPU时间。但是,调用者也可以进入休眠状态来避免消耗过多的CPU时间,但如果缓存的状态在刚调用完sleep就立即发生变化,那么将不必要地休眠一段时间。因此,客户代码必须要在二者之间进行选择:要么容忍自旋导致的CPU时钟周期浪费,要么容忍由于休眠而导致的低响应性。(除了忙等待与休眠之外,还有一种选择就是调用Thread.yield,这相当于给调度器一个提示:现在需要让出一定的时间使另一个线程运行。假如正在等待另一个线程执行工作,那么如果选择让出处理器而不是消耗完整个CPU调度时间片,那么可以使整体的执行过程变快。)

示例:通过轮询与休眠来实现简单的阻塞性

SleepyBoundedBuffer尝试通过put和take方法来实现一种简单的“轮询与休眠”重试机制,从而使调用者无须在每次调用时都实现重试逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {

private static final long SLEEP_GRANULARITY = 1000;

public SleepyBoundedBuffer(int capacity) {
super(capacity);
}

public void put(V v) throws InterruptedException {
while (true) {
synchronized (this) {
if (!isFull()) {
doPut(v);
return;
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}

public V take() throws InterruptedException {
while (true) {
synchronized (this) {
if (!isEmpty()) {
return doTake();
}
}
Thread.sleep(SLEEP_GRANULARITY);
}
}
}

如果缓存为空,那么take将休眠并直到另一个线程在缓存中放入一些数据;如果缓存是满的,那么put将休眠并直到另一个线程从缓存中移除一些数据,以便有空间容纳新的数据。这种方法将前提条件的管理操作封装起来,并简化了对缓存的使用——这正是朝着正确的改进方向迈出了一步。

SleepyBoundedBuffer的实现远比之前的实现复杂。缓存代码必须在持有缓存锁的时候才能测试相应的状态条件,因为表示状态条件的变量是由缓存锁保护的。如果测试失败,那么当前执行的线程将首先释放锁并休眠一段时间,从而使其他线程能够访问缓存。(通常,如果线程在休眠或者被阻塞时持有一个锁,那么这通常是一种不好的做法,因为只要线程不释放这个锁,有些条件(缓存为满/空)就永远无法为真。)当线程醒来时,它将重新请求锁并再次尝试执行操作,因而线程将反复地在休眠以及测试状态条件等过程之间进行切换,直到可以执行操作为止。

这种通过轮询与休眠来实现阻塞操作的过程需要付出大量的努力。如果存在某种挂起线程的方法,并且这种方法能够确保当某个条件成真时线程立即醒来,那么将极大地简化实现工作。这正是条件队列实现的功能。

条件队列

“条件队列”这个名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。传统队列的元素是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。

正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互关联的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。

Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。

BoundedBuffer中使用了wait和notifyAll来实现一个有界缓存。这比使用“休眠”的有界缓存更简单,并且更高效(当缓存状态没有发生变化时,线程醒来的次数将更少),响应性也更高(当发生特定状态变化时将立即醒来)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {

protected BoundedBuffer(int capacity) {
super(capacity);
}

public synchronized void put(V v) throws InterruptedException {
while (isFull()) {
wait();
}
doPut(v);
notifyAll();
}

public synchronized V take() throws InterruptedException {
while (isEmpty()) {
wait();
}
V v = doTake();
notifyAll();
return v;
}
}

与使用“休眠”的有界缓存相比,条件队列并没有改变原来的语义。它只是在多个方面进行了优化:CPU效率、上下文切换开销和响应性等。如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现,(这并非完全正确;一个公平的条件队列可以确保线程按照顺序从等待集合中释放。与内置锁相同,内置条件队列并不提供公平的排队操作,而在显式的Condition却可以提供公平或非公平的排队操作。)但条件队列使得在表达和管理状态依赖性时更加简单和高效。

使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确地使用。虽然许多规则都能确保正确地使用条件队列,但在编译器或系统平台上却并没有强制要求遵循这些规则。(这也是为什么要尽量基于LinkedBlockingQueue、Latch、Semaphore和FutureTask等类来构造程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多。)

条件谓词

要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。

条件谓词将在等待与通知等过程中导致许多困惑,因为在API中没有对条件谓词进行实例化的方法,并且在Java语言规范或JVM实现中也没有任何信息可以确保正确地使用它们。事实上,在Java语言规范或Javadoc中根本就没有直接提到它。但如果没有条件谓词,条件等待机制将无法发挥作用。

条件谓词是使某个操作成为状态依赖操作的前提条件。

在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。同样,put方法的条件谓词是“缓存不满”。

条件谓词是由类中各个状态变量构成的表达式。

BaseBoundedBuffer在测试“缓存不为空”时将把count与0进行比较,在测试“缓存不满”时将把count与缓存的大小进行比较。

在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。

如果条件谓词不为真(缓存为空),那么take必须等待并直到另一个线程在缓存中放入一个对象。take将在缓存的内置条件队列上调用wait方法,这需要持有条件队列对象上的锁。这是一种谨慎的设计,因为take方法已经持有在测试条件谓词时(并且如果条件谓词为真,那么在同一个原子操作中修改缓存的状态)需要的锁。wait方法将释放锁,阻塞当前线程,并等待直到超时,然后线程被中断或者通过一个通知被唤醒。在唤醒进程后,wait在返回前还要重新获取锁。当线程从wait方法中被唤醒时,它在重新请求锁时不具有任何特殊的优先级,而要与任何其他尝试进入同步代码块的线程一起正常地在锁上进行竞争。

每一次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。

过早唤醒

虽然在锁、条件谓词和条件队列之间的三元关系并不复杂,但wait方法的返回并不一定意味着线程正在等待的条件谓词已经变成真了。

内置条件队列可以与多个条件谓词一起使用。当一个线程由于调用notifyAll而醒来时,并不意味该线程正在等待的条件谓词已经变成真了。另外,wait方法还可以“假装”返回,而不是由于某个线程调用了notify。

当执行控制重新进入调用wait的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了?或许。在发出通知的线程调用notifyAll时,条件谓词可能已经变成真,但在重新获取锁时将再次变为假。在线程被唤醒到wait重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状志。或者,条件谓词从调用wait起根本就没有变成真。你并不知道另一个线程为什么调用notify或notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。

基于所有这些原因,每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。

下个例子给出了条件等待的标准形式:

1
2
3
4
5
6
7
8
9
void stateDependentMethod() throws InterruptedException{
//必须通过一个锁来保护条件谓词
synchronized(lock){
while(!conditionPredicate() ){
lock.wait();
}
//现在对象处于合适的状态
}
}

当使用条件等待时(例如Object.wait或Condition.await):

  • 通常都有一个条件谓词——包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
  • 在一个循环中调用 wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
  • 当调用wait、notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

丢失的信号

丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发过的事件。

好比在启动了烤面包机后出去拿报纸,当你还在屋外时烤面包机的铃声响了,但你没有听到,因此还会坐在厨房的桌子前等着烤面包机的铃声。你可能会等待很长的时间。为了摆脱等待,其他人也不得不开始烤面包,从而使情况变得更糟,当铃声响起时,还要与别人争论这个面包是属于谁的。

如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它。

通知

每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。

在条件队列API中有两个发出通知的方法,即notify和notifyAll。无论调用哪一个,都必须持有与条件队列对象相关联的锁。在调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll则会唤醒所有在这个条件队列上等待的线程。由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。

由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用notify而不是notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号丢失的问题。

假设线程A在条件队列上等待条件谓词PA,同时线程B在同一个条件队列上等待条件谓词PB。现在,假设PB变成真,并且线程C执行一个notify:JVM将从它拥有的众多线程中选择一个并唤醒。如果选择了线程A,那么它被唤醒,并且看到PA尚未变成真,因此将继续等待。同时,线程B本可以开始执行,却没有被唤醒。这并不是严格意义上的“丢失信号”,而更像一种“被劫持的”信号,但导致的问题是相同的:线程正在等待一个已经(或者本应该)发生过的信号。

只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作。
  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。

由于大多数类并不满足这些需求,因此普遍认可的做法是优先使用notifyAll而不是notify。虽然notifyAll可能比notify更低效,但却更容易确保类的行为是正确的。

在BoundedBuffer的put和take方法中采用的通知机制是保守的:每当将一个对象放入缓存或者从缓存中移走一个对象时,就执行一次通知。我们可以对其进行优化:首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当put或take影响到这些状态转换时,才发出通知。这也被称为“条件通知(Conditional Notification)。虽然“条件通知”可以提升性能,但却很难正确地实现(而且还会使子类的实现变得复杂),因此在使用时应该谨慎。下面的示例使用了条件通知:

1
2
3
4
5
6
7
8
9
10
public synchronized void put(V v)throws InterruptedException{
while(isFull()){
wait();
}
boolean wasEmpty=isEmpty();
doPut(v);
if(wasEmpty){
notifyAll();
}
}

单次通知和条件通知都属于优化措施。

通常,在使用这些优化措施时,应该遵循“首选使程序正确地执行,然后才使其运行得更快”这个原则。如果不正确地使用这些优化措施,那么很容易在程序中引入奇怪的活跃性故障。

示例:阀门类

通过使用条件等待,可以很容易地开发一个可重新关闭的ThreadGate类,如程序所示。程序清单14-9所示。ThreadGate可以打开和关闭阀门,并提供一个await方法,该方法能一直阻塞直到阀门被打开。在open方法中使用了notifyAll,这是因为这个类的语义不满足单次通知的“单进单出”测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@ThreadSafe
public class ThreadGate {

@GuardedBy("this")
private boolean isOpen;

@GuardedBy("this")
private int generation;

public synchronized void close() {
++generation;
isOpen = false;
notifyAll();
}

public synchronized void await() throws InterruptedException {
int arrivalGeneration = generation;
while (!isOpen && arrivalGeneration == generation) {
wait();
}
}
}

在await中使用的条件谓词比测试isOpen复杂得多。这种条件谓词是必需的,因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放:当所有线程收到通知时,将重新请求锁并退出wait,而此时的阀门可能已经再次关闭了。因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门关闭时,递增一个“Generation”计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。

子类的安全问题

在使用条件通知或单次通知时,一些约束条件使得子类化过程变得更加复杂[CPJ 3.3.3.3]。要想支持子类化,那么在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。

对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中。(这是对“要么围绕着继承来设计和文档化,要么禁止使用继承”这条规则的一种扩展[EJ Item15]。)

当设计一个可被继承的状态依赖类时,至少需要公开条件队列和锁,并且将条件谓词和同步策略都写入文档。此外,还可能需要公开一些底层的状态变量。另外一种选择就是完全禁止子类化,例如将类声明为final类型,或者将条件队列、锁和状态变量等隐藏起来,使子类看不见它们。

封装条件队列

通常,我们应该把条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方访问它。否则,调用者会自以为理解了在等待和通知上使用的协议,并且采用一种违背设计的方式来使用条件队列。

不幸的是,这条建议——将条件队列对象封装起来,与线程安全类的最常见设计模式并不一致,在这种模式中建议使用对象的内置锁来保护对象自身的状态。

入口协议与出口协议

Wellings(Wellings,2004)通过“入口协议和出口协议(Entry and ExitProtocols)”来描述wait和notify方法的正确使用。对于每个依赖状态的操作,以及每个修改其他操作依赖状态的操作,都应该定义一个入口协议和出口协议。入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。

在AbstractQueuedSynchronizer(java.util.concurrent包中大多数依赖状态的类都是基于这个类构建的)中使用出口协议。这个类并不是由同步器类执行自己的通知,而是要求同步器方法返回一个值来表示该类的操作是否已经解除了一个或多个等待线程的阻塞。这种明确的API调用需求使得更难以“忘记”在某些状态转换发生时进行通知。

显式的 Condition 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Condition {

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();
}

内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,因而在像BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。

一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。正如Lock比内置加锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作。

与内置条件队列不同的是,对于每个Lock,可以有任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放。

特别注意:在Condition对象中,与wait、notify和notifyAll方法对应的分别是await、signal和signalAll。但是,Condition对Object进行了扩展,因而它也包含wait和notify方法。一定要确保使用正确的版本——await和signal。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@ThreadSafe
public class ConditionBoundedBuffer<T> {

protected final Lock lock = new ReentrantLock();

private final Condition notFull = lock.newCondition();

private final Condition notEmpty = lock.newCondition();

private final T[] items = (T[]) new Object[1024];

@GuardedBy("lock")
private int tail, head, count;

public void put(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[tail] = t;
if (++tail == items.length) {
tail = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}

public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T t = items[head];
items[head] = null;
if (++head == items.length) {
head = 0;
}
--count;
notFull.signal();
return t;
} finally {
lock.unlock();
}
}
}

ConditionBoundedBuffer的行为和BoundedBuffer相同,但它对条件队列的使用方式更容易理解——在分析使用多个Condition的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。通过将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单次通知的需求。signal比signalAll更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。

与内置锁和条件队列一样,当使用显式的Lock和Condition时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象。(ReentrantLock要求在调用signal或signalAll时应该持有Lock,但在Lock的具体实现中,在构造Condition时也可以不满足这个需求。)

在使用显式的Condition和内置条件队列之间进行选择时,与在ReentrantLock和synchronized之间进行选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用Condition而不是内置条件队列。(如果需要ReentrantLock的高级功能,并且已经使用了它,那么就已经做出了选择。)

Synchronizer剖析

在ReentrantLock和Semaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLock和Semaphore是基于AQS构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。

AQS解决了在实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。

基于AQS来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题(这是在没有使用AQS来构建同步器时的情况)。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。在设计AQS时充分考虑了可伸缩性,因此java.util.concurrent中所有基于AQS构建的同步器都能获得这个优势。

AbstractQueuedSynchronizer

在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。

获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用CountDownLatch时,“获取”操作意味着“等待并直到闭锁到达结束状态”,而在使用FutureTask时,则意味着“等待并直到任务已经完成”。

释放”并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。

如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState, setState以及compareAndSetState等protected类型方法来进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如,ReentrantLock保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。

下例给出了AQS中的获取操作与释放操作的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean acquire() throws InterruptedException{
while(当前状态不允许获取操作) {
if( 需要阻塞获取请求 ) {
如果当前线程不在队列中,则将其插入队列阻塞当前线程
} else {
返回失败
}
}
可能更新同步器的状态
如果线程位于队列中,则将其移出队列
返回成功
}

void release() {
更新同步器的状态
if( 新的状态允许某个被阻塞的线程获取成功 ) {
解除队列中一个或多个线程的阻塞状态
}
}

根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是一个非独占操作(例如Semaphore和CountDownLatch)。

一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的。例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。

如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquiretryReleaseisHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireSharedtryReleaseShared等方法。AQS中的accuire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState、setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类“获取”或“释放”同步器的操作是否成功。例如,如果tryAcquireShared返回一个负值,那么表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值则表示同步器通过非独占方式被获取。对于tryRelease和tryReleaseShared方法来说,如果释放操作使得所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回true。

为了使支持条件队列的锁(例如ReentrantLock)实现起来更简单,AQS还提供了一些机制来构造与同步器相关联的条件变量。

示例:一个简单的闭锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class OneShotLatch {

private final Sync sync = new Sync();

public void signal() {
sync.releaseShared(0);
}

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(0);
}

static class Sync extends AbstractQueuedSynchronizer {

@Override
protected int tryAcquireShared(int ignored) {
// 如果闭锁是开的(state==1),那么这个操作将成功,否则将失败
return (getState() == 1) ? 1 : -1;
}

@Override
protected boolean tryReleaseShared(int ignored) {
// 现在打开闭锁
setState(1);
// 现在其他的线程可以获取该闭锁
return true;
}
}
}

在OneShotLatch中,AQS状态用来表示闭锁状态——关闭(0)或者打开(1)。await方法调用AQS的acquireSharedInterruptibly,然后接着调用OneShotLatch中的tryAcquireShared方法。在tryAcquireShared的实现中必须返回一个值来表示该获取操作能否执行。如果之前已经打开了闭锁,那么tryAcquireShared将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。acquireSharedInterruptibly方法在处理失败的方式,是把这个线程放入等待线程队列中。类似地,signal将调用releaseShared,接下来又会调用tryReleaseShared。在tryReleaseShared中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全被释放的状态。因而AQS让所有等待中的线程都尝试重新请求该同步器,并且由于tryAcquireShared将返回成功,因此现在的请求操作将成功。

OneShotLatch是一个功能全面的、可用的、性能较好的同步器,并且仅使用了大约20多行代码就实现了。当然,它缺少了一些有用的特性,例如限时的请求操作以及检查闭锁的状态,但这些功能实现起来同样很容易,因为AQS提供了限时版本的获取方法,以及一些在常见检查中使用的辅助方法。

oneShotLatch也可以通过扩展AQS来实现,而不是将一些功能委托给AQS,但这种做法并不合理[EJItem 14],原因有很多。这样做将破坏OneShotLatch接口(只有两个方法)的简洁性,并且虽然AQS的公共方法不允许调用者破坏闭锁的状态,但调用者仍可以很容易地误用它们。java.util.concurrent中的所有同步器类都没有直接扩展AQS,而是都将它们的相应功能委托给私有的AQS子类来实现。

java.util.concurrent同步器类中的AQS

ReentrantLock

ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquiretryReleaseisHeldExclusively

ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。

当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。(请参见15.3节中对compareAndSet的描述)。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。

ReentrantLock还利用了AQS对多个条件变量和多个等待线程集的内置支持。Lock.newCondition将返回一个新的ConditionObject实例,这是AQS的一个内部类。

示例:基于非公平的ReentrantLock实现tryAcquire:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryAcquire(int ignored){
final Thread current=Thread.currentThread();
int c = getState();
if( c== 0) {
if(compareAndSetState(01)){
owner =current;
return true
}
} else if( current == owner){
setState(c+1);
return true
}
return false;
}

Semaphore与CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量。

tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数自从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 代码拷贝自 jdk17 源码
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//当没有足够的许可,或者可以通过原子方式来更新许可的计数以响应获取操作时,循环将终止。
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while循环将终止。虽然对compareAndSetState的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样,tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared的返回值表示在这次释放操作中解除了其他线程的阻塞。

CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。

FutureTask

初看上去,FutureTask甚至不像一个同步器,但Future.get的语义非常类似于闭锁的语义——如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。

在FutureTask中,AQS同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。

AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。

小结

要实现一个依赖状态的类——如果没有满足依赖状态的前提条件,那么这个类的方法必须阻塞,那么最好的方式是基于现有的库类来构建。然而,有时候现有的库类不能提供足够的功能,在这种情况下,可以使用内置的条件队列、显式的Condition对象或者AbstractQueuedSynchronizer来构建自己的同步器。

内置条件队列与内置锁是紧密绑定在一起的,这是因为管理状态依赖性的机制必须与确保状态一致性的机制关联起来。同样,显式的Condition与显式的Lock也是紧密地绑定到一起的,并且与内置条件队列相比,还提供了一个扩展的功能集,包括每个锁对应于多个等待线程集,可中断或不可中断的条件等待,公平或非公平的队列操作,以及基于时限的等待。

原子变量与非阻塞同步机制

近年来,在并发算法领域的大多数研究都侧重于非阻塞算法,这种算法用底层的原子机器指令(例如比较并交换指令)代替锁来确保数据在并发访问中的一致性。非阻塞算法被广泛地用于在操作系统和JVM中实现线程/进程调度机制、垃圾回收机制以及锁和其他并发数据结构。

与基于锁的方案相比,非阻塞算法在设计和实现上都要复杂得多,但它们在可伸缩性和活跃性上却拥有巨大的优势。由于非阻塞算法可以使多个线程在竞争相同的数据时不会发生阻塞,因此它能在粒度更细的层次上进行协调,并且极大地减少调度开销。而且,在非阻塞算法中不存在死锁和其他活跃性问题。从Java 5.0开始,可以使用原子变量类(例如AtomicIntegerAtomicReference)来构建高效的非阻塞算法。

即使原子变量没有用于非阻塞算法的开发,它们也可以用做一种“更好的volatile类型变量”。原子变量提供了与volatile类型变量相同的内存语义,此外还支持原子的更新操作,从而使它们更加适用于实现计数器、序列发生器和统计数据收集等,同时还能比基于锁的方法提供更高的可伸缩性。

锁的劣势

现代的许多JVM都对非竞争锁获取和锁释放等操作进行了极大的优化,但如果有多个线程同时请求锁,那么JVM就需要借助操作系统的功能。如果出现了这种情况,那么一些线程将被挂起并且在稍后恢复运行。(当线程在锁上发生竞争时,智能的JVM不一定会挂起线程,而是根据之前获取操作中对锁的持有时间长短来判断是使此线程挂起还是自旋等待。)当线程恢复执行时,必须等待其他线程执行完它们的时间片以后,才能被调度执行。在挂起和恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断。如果在基于锁的类中包含有细粒度的操作(例如同步容器类,在其大多数方法中只包含了少量操作),那么当在锁上存在着激烈的竞争时,调度开销与工作开销的比值会非常高。

与锁相比,volatile变量是一种更轻量级的同步机制,因为在使用这些变量时不会发生上下文切换或线程调度等操作。然而,volatile变量同样存在一些局限:虽然它们提供了相似的可见性保证,但不能用于构建原子的复合操作。因此,当一个变量依赖其他的变量时,或者当变量的新值依赖于旧值时,就不能使用volatile变量。

例如,虽然自增操作(++i)看起来像一个原子操作,但事实上它包含了3个独立的操作——获取变量的当前值,将这个值加1,然后再写入新值。为了确保更新操作不被丢失,整个的读-改-写操作必须是原子的。

锁定还存在其他一些缺点。当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行(例如发生了缺页错误、调度延迟,或者其他类似情况),那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,那么这将是一个严重的问题——也被称为优先级反转(Priority Inversion)。即使高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别。如果持有锁的线程被永久地阻塞(例如由于出现了无限循环,死锁,活锁或者其他的活跃性故障),所有等待这个锁的线程就永远无法执行下去。

即使忽略这些风险,锁定方式对于细粒度的操作(例如递增计数器)来说仍然是一种高开销的机制。

在管理线程之间的竞争时应该有一种粒度更细的技术,类似于volatile变量的机制,同时还要支持原子的更新操作。

硬件对并发的支持

独占锁是一项悲观技术——它假设最坏的情况(如果你不锁门,那么捣蛋鬼就会闯入并搞得一团糟),并且只有在确保其他线程不会造成干扰(通过获取正确的锁)的情况下才能执行下去。(又称悲观锁)。

对于细粒度的操作,还有另外一种更高效的方法,也是一种乐观的方法,通过这种方法可以在不发生干扰的情况下完成更新操作。这种方法需要借助冲突检查机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,这个操作将失败,并且可以重试(也可以不重试)。(即乐观锁)

现在,几乎所有的现代处理器中都包含了某种形式的原子读-改-写指令,例如比较并交换(Compare-and-Swap)或者关联加载/条件存储(Load-Linked/Store-Conditional)。操作系统和JVM使用这些指令来实现锁和并发的数据结构,但在Java 5.0之前,在Java类中还不能直接使用这些指令。

比较并交换

在大多数处理器架构(包括IA32和Sparc)中采用的方法是实现一个比较并交换(CAS)指令。CAS包含了3个操作数——需要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。(这种变化形式被称为比较并设置,无论操作是否成功都会返回。)CAS的含义是:“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”。

当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会被挂起(这与获取锁的情况不同:当获取锁失败时,线程将被挂起),而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争CAS时失败不会阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。这种灵活性就大大减少了与锁相关的活跃性风险。

下例模拟了CAS操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@ThreadSafe
public class SimulatedCAS {

@GuardedBy("this")
private int value;

public synchronized int get() {
return value;
}

public synchronized int compareAndSwap(int expectedValue, int newValue) {
int oldValue = value;
if (oldValue == expectedValue) {
value = newValue;
}
return oldValue;
}

public synchronized boolean compareAndSet(int expectedValue, int newValue) {
return expectedValue == compareAndSwap(expectedValue, newValue);
}
}

CAS的典型使用模式是:首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子方式将V中的值由A变成B(只要在这期间没有任何线程将V的值修改为其他值)。由于CAS能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读-改-写操作序列。

非阻塞的计数器

下例使用 CAS 实现了一个线程安全的计算器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class CasCounter {

private final SimulatedCAS value;

public CasCounter(SimulatedCAS value) {
this.value = value;
}

public int get() {
return value.get();
}

public int increment() {
int v;
do {
v = value.get();
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}

递增操作采用了标准形式——读取旧的值,根据它计算出新值(加1),并使用CAS来设置这个新值。如果CAS失败,那么该操作将立即重试。通常,反复地重试是一种合理的策略,但在一些竞争很激烈的情况下,更好的方式是在重试之前首先等待一段时间或者回退,从而避免造成活锁问题。

初看起来,基于CAS的计数器似乎比基于锁的计数器在性能上更差一些,因为它需要执行更多的操作和更复杂的控制流,并且还依赖看似复杂的CAS操作。但实际上,当竞争程度不高时,基于CAS的计数器在性能上远远超过了基于锁的计数器,而在没有竞争时甚至更高。如果要快速获取无竞争的锁,那么至少需要一次CAS操作再加上与其他锁相关的操作,因此基于锁的计数器即使在最好的情况下也会比基于CAS的计数器在一般情况下能执行更多的操作。由于CAS在大多数情况下都能成功执行(假设竞争程度不高),因此硬件能够正确地预测while循环中的分支,从而把复杂控制逻辑的开销降至最低。

CAS的主要缺点是,它将使调用者处理竞争问题(通过重试、回退、放弃),而在锁中能自动处理竞争问题(线程在获得锁之前将一直阻塞)。事实上,CAS最大的缺陷在于难以围绕着CAS正确地构建外部算法。

CAS的性能会随着处理器数量的不同而变化很大。CAS的执行性能不仅在不同的体系架构之间变化很大,甚至在相同处理器的不同版本之间也会发生改变。生产厂商迫于竞争的压力,在接下来的几年内还会继续提高CAS的性能。一个很管用的经验法则是:在大多数处理器上,在无竞争的锁获取和释放的“快速代码路径”上的开销,大约是CAS开销的两倍。

JVM 对 CAS 的支持

Java代码如何确保处理器执行CAS操作?

在Java 5.0之前,如果不编写明确的代码,那么就无法执行CAS。在Java 5.0中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS操作,并且JVM把它们编译为底层硬件提供的最有效方法。

在支持CAS的平台上,运行时把它们编译为相应的(多条)机器指令。在最坏的情况下,如果不支持CAS指令,那么JVM将使用自旋锁。

在原子变量类(例如java.util.concurrent.atomic中的AtomicXxx)中使用了这些底层的JVM支持为数字类型和引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时则直接或间接地使用了这些原子变量类。

原子变量类

  • 原子变量比锁的粒度更细,量级更轻,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。
  • 原子变量将发生竞争的范围缩小到单个变量上,这是你获得的粒度最细的情况(假设算法能够基于这种细粒度来实现)。
  • 更新原子变量的快速(非竞争)路径不会比获取锁的快速路径慢,并且通常会更快,而它的慢速路径肯定比锁的慢速路径快,因为它不需要挂起或重新调度线程。
  • 在使用基于原子变量而非锁的算法中,线程在执行时更不易出现延迟,并且如果遇到竞争,也更容易恢复过来。

原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。

AtomicInteger表示一个int类型的值,并提供了get和set方法,这些Volatile类型的int变量在读取和写入上有着相同的内存语义。它还提供了一个原子的compareAndSet方法(如果该方法成功执行,那么将实现与读取/写入一个volatile变量相同的内存效果),以及原子的添加、递增和递减等方法。

共有12个原子变量类,可分为4组:标量类(Scalar)、更新器类、数组类以及复合变量类。

  • 最常用的原子变量就是标量类:AtomicInteger、AtomicLong、AtomicBoolean以及AtomicReference。
  • 原子数组类(只支持Integer、Long和Reference版本)中的元素可以实现原子更新。原子数组类为数组的元素提供了volatile类型的访问语义,这是普通数组所不具备的特性——volatile类型的数组仅在数组引用上具有volatile语义,而在其元素上则没有。
  • 其它类型的原子变量将于下节讨论

原子变量是一种更好的 “volatile”

示例 通过CAS来维持包含多个变量的不变性条件 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class CasNumberRange {

private static class IntPair {
final int lower;
final int upper;

private IntPair(int lower, int upper) {
this.lower = lower;
this.upper = upper;
}
}

private final AtomicReference<IntPair> values = new AtomicReference<>(new IntPair(0, 0));

public int getLower() {
return values.get().lower;
}

public int getUpper() {
return values.get().upper;
}

public void setLower(int i) {
while (true) {
IntPair oldv = values.get();
if (i > oldv.upper) {
throw new IllegalArgumentException("Can't set lower to " + i + " > upper");
}
IntPair newv = new IntPair(i, oldv.upper);
if (values.compareAndSet(oldv, newv)) {
return;
}
}
}

public void setUpper(int i) {
while (true) {
IntPair oldv = values.get();
if (i < oldv.lower) {
throw new IllegalArgumentException("Can't set upper to " + i + " < lower");
}
IntPair newv = new IntPair(oldv.lower, i);
if (values.compareAndSet(oldv, newv)) {
return;
}
}
}
}

性能比较:锁与原子变量

如果线程本地的计算量较少,那么在锁和原子变量上的竞争将非常激烈。如果线程本地的计算量较多,那么在锁和原子变量上的竞争会降低,因为在线程中访问锁和原子变量的频率将降低。

在高度竞争的情况下,锁的性能将超过原子变量的性能,但在更真实的竞争情况下,原子变量的性能将超过锁的性能。这是因为锁在发生竞争时会挂起线程,从而降低了CPU的使用率和共享内存总线上的同步通信量。(这类似于在生产者-消费者设计中的可阻塞生产者,它能降低消费者上的工作负载,使消费者的处理速度赶上生产者的处理速度。)

在实际情况中,原子变量在可伸缩性上要高于锁,因为在应对常见的竞争程度时,原子变量的效率会更高。

锁与原子变量在不同竞争程度上的性能差异很好地说明了各自的优势和劣势。在中低程度的竞争下,原子变量能提供更高的可伸缩性,而在高强度的竞争下,锁能够更有效地避免竞争。(在单CPU的系统上,基于CAS的算法在性能上同样会超过基于锁的算法,因为CAS在单CPU的系统上通常能执行成功,只有在偶然情况下,线程才会在执行读-改-写的操作过程中被其他线程抢占执行。)

如果能够避免使用共享状态,那么开销将会更小。我们可以通过提高处理竞争的效率来提高可伸缩性,但只有完全消除竞争,才能实现真正的可伸缩性。

非阻塞算法

在基于锁的算法中可能会发生各种活跃性故障。如果线程在持有锁时由于阻塞I/O,内存页缺失或其他延迟而导致推迟执行,那么很可能所有线程都不能继续执行下去。如果在某种算法中,一个线程的失败或挂起不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁(Lock-Free)算法。如果在算法中仅将CAS用于协调线程之间的操作,并且能正确地实现,那么它既是一种无阻塞算法,又是一种无锁算法。

在非阻塞算法中通常不会出现死锁和优先级反转问题(但可能会出现饥饿和活锁问题,因为在算法中会反复地重试)。

在许多常见的数据结构中都可以使用非阻塞算法,包括栈、队列、优先队列以及散列表等,而要设计一些新的这种数据结构,最好还是由专家们来完成。

非阻塞的栈

在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更为复杂。

创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。

栈是最简单的链式数据结构:每个元素仅指向一个元素,并且每个元素也只被一个元素引用。

示例 ConcurrentStack 中给出了如何通过原子引用来构建栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ConcurrentStack<E> {

private final AtomicReference<Node<E>> top = new AtomicReference<>();

public void push(E item) {
Node<E> newNode = new Node<>(item);
Node<E> oldNode;
do {
oldNode = top.get();
newNode.next = oldNode;
} while (!top.compareAndSet(oldNode, newNode));
}

public E pop() {

Node<E> oldHead;
Node<E> newHead;

do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));

return oldHead.item;
}

private static class Node<E> {
private final E item;
private Node<E> next;

private Node(E item) {
this.item = item;
}
}
}

栈是由Node元素构成的一个链表,其中栈顶作为根节点,并且在每个元素中都包含了一个值以及指向下一个元素的链接。push方法创建一个新的节点,该节点的next域指向当前的栈顶,然后使用CAS把这个新节点放入栈顶。如果在开始插入节点时,位于栈顶的节点没有发生变化,那么CAS就会成功,如果栈顶节点发生了变化(例如由于其他线程在本线程开始之前插入或移除了元素),那么CAS将会失败,而push方法会根据栈的当前状态来更新节点,并且再次尝试。无论哪种情况,在CAS执行完成后,后栈仍会处于一致的状态。

在像ConcurrentStack这样的非阻塞算法中都能确保线程安全性,因为compareAndSet像锁定机制一样,既能提供原子性,又能提供可见性。当一个线程需要改变栈的状态时,将调用compareAndSet,这个方法与写入volaitle类型的变量有着相同的内存效果。当线程检查栈的状态时,将在同一个AtomicReference上调用get方法,这个方法与读取volaitle类型的变量有着相同的内存效果。因此,一个线程执行的任何修改结构都可以安全地发布给其他正在查看状态的线程。并且,这个栈是通过compareAndSet来修改的,因此将采用原子操作来更新top的引用,或者在发现存在其他线程干扰的情况下,修改操作将失败。

非阻塞的链表

链接队列比栈更为复杂,因为它必须支持对头节点和尾结点的快速访问。因此,它需要单独维护的头指针和尾指针。有两个指针指向位于尾部的节点:当前最后一个元素的next指针,以及尾节点。当成功地插入一个新元素时,这两个指针都需要采用原子操作来更新。

初看起来,这个操作无法通过原子变量来实现。在更新这两个指针时需要不同的CAS操作,并且如果第一个CAS成功,但第二个CAS失败,那么队列将处于不一致的状态。而且,即使这两个CAS都成功了,那么在执行这两个CAS之间,仍可能有另一个线程会访问这个队列。因此,在为链接队列构建非阻塞算法时,需要考虑到这两种情况。

我们需要使用一些技巧。第一个技巧是,即使在一个包含多个步骤的更新操作中,也要确保数据结构总是处于一致的状态。这样,当线程B到达时,如果发现线程A正在执行更新,那么线程B就可以知道有一个操作已部分完成,并且不能立即开始执行自己的更新操作。然后,B可以等待(通过反复检查队列的状态)并直到A完成更新,从而使两个线程不会相互干扰。

虽然这种方法能够使不同的线程“轮流”访问数据结构,并且不会造成破坏,但如果一个线程在更新操作中失败了,那么其他的线程都无法再访问队列。要使得该算法成为一个非阻塞算法,必须确保当一个线程失败时不会妨碍其他线程继续执行下去。因此,第二个技巧是,如果当B到达时发现A正在修改数据结构,那么在数据结构中应该有足够多的信息,使得B能完成A的更新操作。如果B“帮助”A完成了更新操作,那么B可以执行自己的操作,而不用等待A的操作完成。当A恢复后再试图完成其操作时,会发现B已经替它完成了。

LinkedQueue中给出了Michael-Scott提出的非阻塞链接队列算法中的插入部分(Michael and Scott,1996),在ConcurrentLinkedQueue中使用的正是该算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class LinkedQueue<E> {

private final Node<E> dummy = new Node<>(null, null);
private final AtomicReference<Node<E>> head = new AtomicReference<>(dummy);
private final AtomicReference<Node<E>> tail = new AtomicReference<>(dummy);

public boolean put(E item) {
Node<E> newNode = new Node<>(item, null);
while (true) {
Node<E> curTail = tail.get();
Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null) { // A
// 队列处于中间状态,推进尾节点
tail.compareAndSet(curTail, tailNext); // B
} else {
// 队列处于稳定状态,尝试插入新节点
if (curTail.next.compareAndSet(null, newNode)) { // C
// 插入操作成功,尝试推进尾节点
tail.compareAndSet(curTail, newNode); // D
return true;
}
}
}
}
}

private static class Node<E> {
final E item;
final AtomicReference<Node<E>> next;

private Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<>(next);
}
}
}

在许多队列算法中,空队列通常都包含一个“哨兵(Sentinel)节点”或者“哑(Dummy)节点”,并且头节点和尾节点在初始化时都指向该哨兵节点。尾节点通常要么指向哨兵节点(如果队列为空),即队列的最后一个元素,要么(当有操作正在执行更新时)指向倒数第二个元素。

当插入一个新的元素时,需要更新两个指针。首先更新当前最后一个元素的next指针,将新节点链接到列表队尾,然后更新尾节点,将其指向这个新元素。在这两个操作之间,队列处于一种中间状态,如图所示。在第二次更新完成后,队列将再次处于稳定状态,如图所示。

image-20211222153303204

实现这两个技巧时的关键点在于:当队列处于稳定状态时,尾节点的next域将为空,如果队列处于中间状态,那么tail.next将为非空。因此,任何线程都能够通过检查tail.next来获取队列当前的状态。而且,当队列处于中间状态时,可以通过将尾节点向前移动一个节点,从而结束其他线程正在执行的插入元素操作,并使得队列恢复为稳定状态。

LinkedQueue.put方法在插入新元素之前,将首先检查队列是否处于中间状态(步骤A)。如果是,那么有另一个线程正在插入元素(在步骤C和D之间)。此时当前线程不会等待其他线程执行完成,而是帮助它完成操作,并将尾节点向前推进一个节点(步骤B)。然后,它将重复执行这种检查,以免另一个线程已经开始插入新元素,并继续推进尾节点,直到它发现队列处于稳定状态之后,才会开始执行自己的插入操作。

由于步骤C中的CAS将把新节点链接到队列尾部,因此如果两个线程同时插入元素,那么这个CAS将失败。在这样的情况下,并不会造成破坏:不会发生任何变化,并且当前的线程只需重新读取尾节点并再次重试。如果步骤C成功了,那么插入操作将生效,第二个CAS(步骤D)被认为是一个“清理操作”,因为它既可以由执行插入操作的线程来执行,也可以由其他任何线程来执行。如果步骤D失败,那么执行插入操作的线程将返回,而不是重新执行CAS,因为不再需要重试——另一个线程已经在步骤B中完成了这个工作。这种方式能够工作,因为在任何线程尝试将一个新节点插入到队列之前,都会首先通过检查tail.next是否非空来判断是否需要清理队列。如果是,它首先会推进尾节点(可能需要执行多次),直到队列处于稳定状态。

原子的域更新器

在ConcurrentLinkedQueue中没有使用原子引用来表示每个Node,而是使用普通的volatile类型引用,并通过基于反射的AtomicReferenceFieldUpdater来进行更新,jdk17 中已被替换为 VarHander。

在ConcurrentLinkedQueue中,使用nextUpdater的compareAndSet方法来更新Node的next域。这个方法有点繁琐,但完全是为了提升性能。对于一些频繁分配并且生命周期短暂的对象,例如队列的链接节点,如果能去掉每个Node的AtomicReference创建过程,那么将极大地降低插入操作的开销。然而,几乎在所有情况下,普通原子变量的性能都很不错,只有在很少的情况下才需要使用原子的域更新器。(如果在执行原子更新的同时还需要维持现有类的串行化形式,那么原子的域更新器将非常有用。)

ABA 问题

ABA问题是一种异常现象:如果在算法中的节点可以被循环使用,那么在使用“比较并交换”指令时就可能出现这种问题(主要在没有垃圾回收机制的环境中)。在CAS操作中将判断“V的值是否仍然为A?”,并且如果是的话就继续执行更新操作。在大多数情况下,包括本章给出的示例,这种判断是完全足够的。然而,有时候还需要知道“自从上次看到V的值为A以来,这个值是否发生了变化?”在某些算法中,如果V的值首先由A变成B,再由B变成A,那么仍然被认为是发生了变化,并需要重新执行算法中的某些步骤。

如果在算法中采用自己的方式来管理节点对象的内存,那么可能出现ABA问题。在这种情况下,即使链表的头节点仍然指向之前观察到的节点,那么也不足以说明链表的内容没有发生改变。如果通过垃圾回收器来管理链表节点仍然无法避免ABA问题,那么还有一个相对简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号。即使这个值由A变为B,然后又变为A,版本号也将是不同的。

AtomicStampedReference(以及AtomicMarkableReference)支持在两个变量上执行原子的条件更新。AtomicStampedReference将更新一个“对象-引用”二元组,通过在引用上加上“版本号”,从而避免ABA问题。类似地,AtomicMarkableReference将更新一个“对象引用-布尔值”二元组,在某些算法中将通过这种二元组使节点保存在链表中同时又将其标记为“已删除的节点”。

小结

非阻塞算法通过底层的并发原语(例如比较并交换而不是锁)来维持线程的安全性。这些底层的原语通过原子变量类向外公开,这些类也用做一种“更好的volatile变量”,从而为整数和对象引用提供原子的更新操作。

非阻塞算法在设计和实现时非常困难,但通常能够提供更高的可伸缩性,并能更好地防止活跃性故障的发生。在JVM从一个版本升级到下一个版本的过程中,并发性能的主要提升都来自于(在JVM内部以及平台类库中)对非阻塞算法的使用。

Java 内存模型

本章将介绍Java内存模型的底层需求以及所提供的保证,此外还将介绍在本书给出的一些高层设计原则背后的原理。

什么是内存模型,为什么需要它

假设一个线程为变量aVariable赋值:

1
aVariable = 3;

内存模型需要解决这个问题:“在什么条件下,读取aVariable的线程将看到这个值为3?”,这听起来似乎是一个愚蠢的问题,但如果缺少同步,那么将会有许多因素使得线程无法立即甚至永远,看到另一个线程的操作结果。

以下因素都会使得一个线程无法看到变量的最新值,并且会导致其他线程中的内存操作似乎在乱序执行——如果没有使用正确的同步:

  • 在编译器中生成的指令顺序,可以与源代码中的顺序不同,
  • 此外编译器还会把变量保存在寄存器而不是内存中;
  • 处理器可以采用乱序或并行等方式来执行指令;
  • 缓存可能会改变将写入变量提交到主内存的次序;
  • 而且,保存在处理器本地缓存中的值,对于其他处理器是不可见的。

Java语言规范要求JVM在线程中维护一种类似串行的语义:只要程序的最终结果与在严格串行环境中执行的结果相同,那么上述所有操作都是允许的。

在最近几年中,计算性能的提升在很大程度上要归功于这些重新排序措施。当然,时钟频率的提供同样提升了性能,此外还有不断提升的并行性——采用流水线的超标量执行单元,动态指令调度,猜测执行以及完备的多级缓存。随着处理变得越来越强大,编译器也在不断地改进:通过对指令重新排序来实现优化执行,以及使用成熟的全局寄存器分配算法。由于时钟频率越来越难以提高,因此许多处理器制造厂商都开始转而生产多核处理器,因为能够提高的只有硬件并行性。

在多线程环境中,维护程序的串行性将导致很大的性能开销。对于并发应用程序中的线程来说,它们在大部分时间里都执行各自的任务,因此在线程之间的协调操作只会降低应用程序的运行速度,而不会带来任何好处。只有当多个线程要共享数据时,才必须协调它们之间的操作,并且JVM依赖程序通过同步操作来找出这些协调操作将在何时发生。

JMM规定了JVM必须遵循一组最小保证,这组保证规定了对变量的写入操作在何时将对于其他线程可见。JMM在设计时就在可预测性和程序的易于开发性之间进行了权衡,从而在各种主流的处理器体系架构上能实现高性能的JVM。

平台的内存模型

在共享内存的多处理器体系架构中,每个处理器都拥有自己的缓存,并且定期地与主内存进行协调。在不同的处理器架构中提供了不同级别的缓存一致性(Cache Coherence),其中一部分只提供最小的保证,即允许不同的处理器在任意时刻从同一个存储位置上看到不同的值。操作系统、编译器以及运行时(有时甚至包括应用程序)需要弥合这种在硬件能力与线程安全需求之间的差异。

要想确保每个处理器都能在任意时刻知道其他处理器正在进行的工作,将需要非常大的开销。在大多数时间里,这种信息是不必要的,因此处理器会适当放宽存储一致性保证,以换取性能的提升。在架构定义的内存模型中将告诉应用程序可以从内存系统中获得怎样的保证,此外还定义了一些特殊的指令(称为内存栅栏或栅栏),当需要共享数据时,这些指令就能实现额外的存储协调保证。为了使Java开发人员无须关心不同架构上内存模型之间的差异,Java还提供了自己的内存模型,并且JVM通过在适当的位置上插入内存栅栏来屏蔽在JMM与底层平台内存模型之间的差异。

程序执行一种简单假设:想象在程序中只存在唯一的操作执行顺序,而不考虑这些操作在何种处理器上执行,并且在每次读取变量时,都能获得在执行序列中(任何处理器)最近一次写入该变量的值。这种乐观的模型就被称为串行一致性。软件开发人员经常会错误地假设存在串行一致性,但在任何一款现代多处理器架构中都不会提供这种串行一致性,JMM也是如此

在现代支持共享内存的多处理器(和编译器)中,当跨线程共享数据时,会出现一些奇怪的情况,除非通过使用内存栅栏来防止这些情况的发生。幸运的是,Java程序不需要指定内存栅栏的位置,而只需通过正确地使用同步来找出何时将访问共享状态。

重排序

在没有充分同步的程序中,如果调度器采用不恰当的方式来交替执行不同线程的操作,那么将导致不正确的结果。更糟的是,JMM还使得不同线程看到的操作执行顺序是不同的,从而导致在缺乏同步的情况下,要推断操作的执行顺序将变得更加复杂。各种使操作延迟或者看似乱序执行的不同原因,都可以归为重排序。

PossibleReordering中说明了,在没有正确同步的情况下,即使要推断最简单的并发程序的行为也很困难。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class PossibleReordering {

int a;
int b = 0;
int x;
int y = 0;

@Test
void test() throws InterruptedException {
Thread one =
new Thread(
() -> {
a = 1;
x = b;
});

Thread two =
new Thread(
() -> {
b = 1;
y = a;
});

one.start();
two.start();

one.join();
two.join();

System.out.println("(" + x + "," + y + ")");
}
}

很容易想象PossibleReordering是如何输出(1,0)或(0,1)或(1,1)的:线程A可以在线程B开始之前就执行完成,线程B也可以在线程A开始之前执行完成,或者二者的操作交替执行。但奇怪的是,PossibleReordering还可以输出(0,0)。由于每个线程中的各个操作之间不存在数据流依赖性,因此这些操作可以乱序执行。(即使这些操作按照顺序执行,但在将缓存刷新到主内存的不同时序中也可能出现这种情况,从线程B的角度看,线程A中的赋值操作可能以相反的次序执行。)

下图给出了一种可能由重排序导致的交替执行方式,在这种情况中会输出(0,0)。

image-20211224112719963

内存级的重排序会使程序的行为变得不可预测。如果没有同步,那么推断出执行顺序将是非常困难的,而要确保在程序中正确地使用同步却是非常容易的。同步将限制编译器、运行时和硬件对内存操作重排序的方式,从而在实施重排序时不会破坏JMM提供的可见性保证。

在大多数主流的处理器架构中,内存模型都非常强大,使得读取volatile变量的性能与读取非volatile变量的性能大致相当。

Java 内存模型简介

Java内存模型是通过各种操作来定义的,包括对变量的读/写操作,监视器的加锁和释放操作,以及线程的启动和合并操作。

JMM为程序中所有的操作定义了一个偏序关系,称之为Happens-Before。要想保证执行操作B的线程看到操作A的结果(无论A和B是否在同一个线程中执行),那么在A和B之间必须满足Happens-Before关系。如果两个操作之间缺乏Happens-Before关系,那么JVM可以对它们任意地重排序。

偏序关系π是集合上的一种关系,具有反对称、自反和传递属性,但对于任意两个元素x, y来说,并不需要一定满足xπy或yπx的关系。我们每天都在使用偏序关系来表达喜好,例如我们可以更喜欢寿司而不是干酪三明治,可以更喜欢莫扎特而不是马勒,但我们不必在干酪三明治和莫扎特之间作出明确的喜好选择。

Happens-Before的规则包括:

  • 程序顺序规则。如果程序中操作A在操作B之前,那么在线程中A操作将在B操作之前执行。
  • 监视器锁规则。在监视器锁上的解锁操作必须在同一个监视器锁上的加锁操作之前执行。
    • 显示锁与内置锁在加锁和解锁等操作上有着相同的内存语义。
  • volatile变量规则。对volatile变量的写入操作必须在对该变量的读操作之前执行。
    • 原子变量与 volatile 变量在读操作和写操作上有着相同的语义。
  • 线程启动规则。在线程上对Thread.Start的调用必须在该线程中执行任何操作之前执行。
  • 线程结束规则。线程中的任何操作都必须在其他线程检测到该线程已经结束之前执行,或者从Thread.join中成功返回,或者在调用Thread.isAlive时返回false
  • 中断规则。当一个线程在另一个线程上调用interrupt时,必须在被中断线程检测到interrupt调用之前执行(通过抛出InterruptedException,或者调用isInterruptedinterrupted)。
  • 终结器规则。对象的构造函数必须在启动该对象的终结器之前执行完成。
  • 传递性。如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。

虽然这些操作只满足偏序关系,但同步操作,如锁的获取与释放等操作,以及volatile变量的读取与写入操作,都满足全序关系。因此,在描述Happens-Before关系时,就可以使用“后续的锁获取操作”和“后续的volatile变量读取操作”等表达术语。

下图给出了当两个线程使用同一个锁进行同步时,在它们之间的Happens-Before关系。在线程A内部的所有操作都按照它们在源程序中的先后顺序来排序,在线程B内部的操作也是如此。由于A释放了锁M,并且B随后获得了锁M,因此A中所有在释放锁之前的操作,也就位于B中请求锁之后的所有操作之前。如果这两个线程是在不同的锁上进行同步的,那么就不能推断它们之间的动作顺序,因为在这两个线程的操作之间并不存在Happens-Before关系。

image-20211224115155859

借助同步

由于Happens-Before的排序功能很强大,因此有时候可以“借助(Piggyback)”现有同步机制的可见性属性。这需要将Happens-Before的程序顺序规则与其他某个顺序规则(通常是监视器锁规则或者volatile变量规则)结合起来,从而对某个未被锁保护的变量的访问操作进行排序。这项技术由于对语句的顺序非常敏感,因此很容易出错。它是一项高级技术,并且只有当需要最大限度地提升某些类(例如ReentrantLock)的性能时,才应该使用这项技术。

类库中提供了一些Happens-Before排序,包括:

  • 将一个元素放入一个线程安全容器的操作将在另一个线程从该容器中获得这个元素的操作之前执行。
  • 在CountDownLatch上的倒数操作将在线程从闭锁上的await方法中返回之前执行。
  • 释放Semaphore许可的操作将在从该Semaphore上获得一个许可之前执行。
  • Future表示的任务的所有操作将在从Future.get中返回之前执行。
  • 向Executor提交一个Runnable或Callable的操作将在任务开始执行之前执行。
  • 一个线程到达CyclicBarrier或Exchanger的操作将在其他到达该栅栏或交换点的线程被释放之前执行。如果CyclicBarrier使用一个栅栏操作,那么到达栅栏的操作将在栅栏操作之前执行,而栅栏操作又会在线程从栅栏中释放之前执行。

发布

造成不正确发布的真正原因,就是在“发布一个共享对象”与“另一个线程访问该对象”之间缺少一种Happens-Before排序。

不安全的发布

当缺少Happens-Before关系时,就可能出现重排序问题,这就解释了为什么在没有充分同步的情况下发布一个对象会导致另一个线程看到一个只被部分构造的对象。在初始化一个新的对象时需要写入多个变量,即新对象中的各个域。同样,在发布一个引用时也需要写入一个变量,即新对象的引用。如果无法确保发布共享引用的操作在另一个线程加载该共享引用之前执行,那么对新对象引用的写入操作将与对象中各个域的写入操作重排序(从使用该对象的线程的角度来看)。在这种情况下,另一个线程可能看到对象引用的最新值,但同时也将看到对象的某些或全部状态中包含的是无效值,即一个被部分构造对象。

错误的延迟初始化将导致不正确的发布,如下例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Terrible
public class UnsafeLazyInitialization {

private static Resource resource;

public static Resource getInstance() {
if (resource == null) {
resource = new Resource();
}
return resource;
}

static class Resource {}
}

初看起来,在程序中存在的问题只有竞态条件问题。然而,即使不考虑竞态条件问题,UnsafeLazyInitialization仍然是不安全的,因为另一个线程可能看到对部分构造的Resource实例的引用。

假设线程A是第一个调用getInstance的线程。它将看到resource为null,并且初始化一个新的Resource,然后将resource设置为执行这个新实例。当线程B随后调用getInstance,它可能看到resource的值为非空,因此使用这个已经构造好的Resource。最初这看不出任何问题,但线程A写入resource的操作与线程B读取resource的操作之间不存在Happens-Before关系。在发布对象时存在数据竞争问题,因此B并不一定能看到Resource的正确状态。

当新分配一个Resource时,Resource的构造函数将把新实例中的各个域由默认值(由Object构造函数写入的)修改为它们的初始值。由于在两个线程中都没有使用同步,因此线程B看到的线程A中的操作顺序,可能与线程A执行这些操作时的顺序并不相同。因此,即使线程A初始化Resource实例之后再将resource设置为指向它,线程B仍可能看到对resource的写入操作将在对Resource各个域的写入操作之前发生。因此,线程B就可能看到一个被部分构造的Resource实例,该实例可能处于无效状态,并在随后该实例的状态可能出现无法预料的变化。

除了不可变对象以外,使用被另一个线程初始化的对象通常都是不安全的,除非对象的发布操作是在使用该对象的线程开始使用之前执行。

安全的发布

Happens-Before比安全发布提供了更强可见性与顺序保证。

如果将X从A安全地发布到B,那么这种安全发布可以保证X状态的可见性,但无法保证A访问的其他变量的状态可见性。然而,如果A将X置入队列的操作在线程B从队列中获取X的操作之前执行,那么B不仅能看到A留下的X状态(假设线程A或其他线程都没有对X再进行修改),而且还能看到A在移交X之前所做的任何操作(再次注意同样的警告)。

既然JMM已经提供了这种更强大的Happens-Before关系,那么为什么还要介绍@GuardedBy和安全发布呢?与内存写入操作的可见性相比,从转移对象的所有权以及对象公布等角度来看,它们更符合大多数的程序设计。Happens-Before排序是在内存访问级别上操作的,它是一种“并发级汇编语言”,而安全发布的运行级别更接近程序设计。

安全初始化模式

有时候,我们需要推迟一些高开销的对象初始化操作,并且只有当使用这些对象时才进行初始化,但我们也看到了在误用延迟初始化时导致的问题。

下例通过将getResource方法声明为synchronized,可以修复UnsafeLazyInitialization中的问题。由于getInstance的代码路径很短(只包括一个判断预见和一个预测分支),因此如果getInstance没有被多个线程频繁调用,那么在SafeLazyInitialization上不会存在激烈的竞争,从而能提供令人满意的性能。

1
2
3
4
5
6
7
8
9
10
11
12
@ThreadSafe
public class SafeLazyInitialization {

private static Resource resource;

public synchronized Resource getInstance() {
if (resource == null) {
resource = new Resource();
}
return resource;
}
}

在初始器中采用了特殊的方式来处理静态域(或者在静态初始化代码块中初始化的值[JPL 2.2.1和2.5.3]),并提供了额外的线程安全性保证。静态初始化器是由JVM在类的初始化阶段执行,即在类被加载后并且被线程使用之前。由于JVM将在初始化期间获得一个锁[JLS 12.4.2],并且每个线程都至少获取一次这个锁以确保这个类已经加载,因此在静态初始化期间,内存写入操作将自动对所有线程可见。因此无论是在被构造期间还是被引用时,静态初始化的对象都不需要显式的同步。然而,这个规则仅适用于在构造时的状态,如果对象是可变的,那么在读线程和写线程之间仍然需要通过同步来确保随后的修改操作是可见的,以及避免数据破坏。

通过使用提前初始化(Eager Initialization),避免了在每次调用SafeLazyInitialization中的getInstance时所产生的同步开销。如下例所示:

1
2
3
4
5
6
7
8
9
@ThreadSafe
public class EagerInitialization {

private static final Resource resource = new Resource();

public Resource getInstance() {
return resource;
}
}

通过将这项技术和JVM的延迟加载机制结合起来,可以形成一种延迟初始化技术,从而在常见的代码路径中不需要同步。下例 “延迟初始化占位(Holder)类模式”[EJ Item 48]中使用了一个专门的类来初始化Resource。

1
2
3
4
5
6
7
8
9
10
11
@ThreadSafe
public class ResourceFactory {

private static class ResourceHolder {
private static final Resource resource = new Resource();
}

public static Resource getInstance() {
return ResourceHolder.resource;
}
}

双重检查锁

在任何一本介绍并发的书中都会讨论声名狼藉的双重检查加锁(DCL),在早期的JVM中,同步(甚至是无竞争的同步)都存在着巨大的性能开销。因此,人们想出了许多“聪明的(或者至少看上去聪明)”技巧来降低同步的影响,有些技巧很好,但也有些技巧是不好的,甚至是糟糕的,DCL就属于“糟糕”的一类。如下例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@NotThreadSafe
@Terrible
public class DoubleCheckedLocking {

private static Resource resource;

public static Resource getInstance() {
if (resource == null) {
synchronized (DoubleCheckedLocking.class) {
if (resource == null) {
resource = new Resource();
}
}
}
return resource;
}
}

由于早期的JVM在性能上存在一些有待优化的地方,因此延迟初始化经常被用来避免不必要的高开销操作,或者降低程序的启动时间。在编写正确的延迟初始化方法中需要使用同步。但在当时,同步不仅执行速度很慢,并且更重要的是,开发人员还没有完全理解同步的含义:虽然人们能很好地理解了“独占性”的含义,但却没有很好地理解“可见性”的含义。

DCL声称能实现两全其美——在常见代码路径上的延迟初始化中不存在同步开销。它的工作原理是,首先检查是否在没有同步的情况下需要初始化,如果resource引用不为空,那么就直接使用它。否则,就进行同步并再次检查Resource是否被初始化,从而保证只有一个线程对共享的Resource执行初始化。在常见的代码路径中——获取一个已构造好的Resource引用,并没有使用同步。这就是问题所在:线程可能看到一个仅被部分构造的Resource

DCL的真正问题在于:当在没有同步的情况下读取一个共享对象时,可能发生的最糟糕事情只是看到一个失效值(在这种情况下是一个空值),此时DCL方法将通过在持有锁的情况下再次尝试来避免这种风险。然而,实际情况远比这种情况糟糕——线程可能看到引用的当前值,但对象的状态值却是失效的,这意味着线程可以看到对象处于无效或错误的状态。

在JMM的后续版本(Java 5.0以及更高的版本)中,如果把resource声明为volatile类型,那么就能启用DCL,并且这种方式对性能的影响很小,因为volatile变量读取操作的性能通常只是略高于非volatile变量读取操作的性能。

DCL的这种使用方法已经被广泛地废弃了——促使该模式出现的驱动力(无竞争同步的执行速度很慢,以及JVM启动时很慢)已经不复存在,因而它不是一种高效的优化措施。延迟初始化占位类模式能带来同样的优势,并且更容易理解。

初始化过程中的安全性

如果能确保初始化过程的安全性,那么就可以使得被正确构造的不可变对象在没有同步的情况下也能安全地在多个线程之间共享,而不管它们是如何发布的,甚至通过某种数据竞争来发布。如果不能确保初始化的安全性,那么当在发布或线程中没有使用同步时,一些本应为不可变对象(例如String)的值将会发生改变。

初始化安全性将确保,对于被正确构造的对象,所有线程都能看到由构造函数为对象给各个final域设置的正确值,而不管采用何种方式来发布对象。而且,对于可以通过被正确构造对象中某个final域到达的任意变量(例如某个final数组中的元素,或者由一个final域引用的HashMap的内容)将同样对于其他线程是可见的。

对于含有final域的对象,初始化安全性可以防止对对象的初始引用被重排序到构造过程之前。当构造函数完成时,构造函数对final域的所有写入操作,以及对通过这些域可以到达的任何变量的写入操作,都将被“冻结”,并且任何获得该对象引用的线程都至少能确保看到被冻结的值。对于通过final域可到达的初始变量的写入操作,将不会与构造过程后的操作一起被重排序。

初始化安全性意味着,下例中的SafeStates可以安全地发布,即便通过不安全的延迟初始化,或者在没有同步的情况下将SafeStates的引用放到一个公有的静态域,或者没有使用同步以及依赖于非线程安全的HashSet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@ThreadSafe
public class SafeStates {

private final Map<String, String> states;

public SafeStates() {
states = new HashMap<>();
states.put("11", "22");
}

public String getState(String key) {
return states.get(key);
}
}

然而,许多对SafaStates的细微修改都可能破坏它的线程安全性。如果states不是final类型,或者存在除构造函数以外的其他方法能修改states,那么初始化安全性将无法确保在缺少同步的情况下安全地访问SafeStates。如果在SafeStates中还有其他的非final域,那么其他线程仍然可能看到这些域上的不正确的值。这也导致了对象在构造过程中逸出,从而使初始化安全性的保证无效。

初始化安全性只能保证通过final域可达的值从构造过程完成时开始的可见性。对于通过非final域可达的值,或者在构成过程完成后可能改变的值,必须采用同步来确保可见性。

小结

Java内存模型说明了某个线程的内存操作在哪些情况下对于其他线程是可见的。其中包括确保这些操作是按照一种Happens-Before的偏序关系进行排序,而这种关系是基于内存操作和同步操作等级别来定义的。


, ,