第一部分 基础知识

[TOC]

第一部分(第2章~第5章)重点介绍了并发性和线程安全性的基本概念,以及如何使用类库提供的基本并发构建块来构建线程安全类。在第一部分给出了一个清单,其中总结了这一部分中介绍的最重要的规则。

第2章与第3章构成了本书的基础。在这两章中给出了几乎所有用于避免并发危险、构造线程安全的类以及验证线程安全的规则。如果读者重“实践”而轻“理论”,那么可能会直接跳到第二部分,但在开始编写任何并发代码之前,一定要回来读一读这两章!

第4章介绍了如何将一些小的线程安全类组合成更大的线程安全类。

第5章介绍了在平台库中提供的一些基础的并发构建模块,包括线程安全的容器类和同步工具类。

线程安全性

什么是线程安全性

当多个线程访问某个类时,这个类始终都能表现出正确的行为,那么就称这个类是线程安全的。

原子性

竞态条件 Race Condition

当某个计算的正确性取决于多个线程的交替执行时序时,那么就会发生竞态条件
大多是竞态条件的本质:基于一种可能失效的观察结果来做出判断或者执行某个计算

先检查后执行是最常见的竞态条件
读取-修改-写入 是另一种竞态条件

原子操作

要避免竞态条件问题,就必须在某个线程修改变量时,通过某种方式防止其他线程使用这个变量,从而确保其他线程只能在修改操作完成之前或之后读取和修改状态,而不是在修改状态的过程中

原子操作是指,对于访问同一个状态的所有操作(包含该操作本身)来说,这个操作是一个以原子方式执行的操作
原子性:一组操作,要么全部执行,要么全部不执行

复合操作:包含了一组必须以原子方式执行的操作

加锁机制

要保持状态的一致性,就需要在单个原子操作中更新所有相关的状态变量

内置锁 synchronized

同步代码块包括两个部分:一个作为锁的对象引用,一个作为由这个锁保护的代码块

1
2
3
synchronized (lock) {
// 访问或者修改由锁保护的共享状态
}

每个 Java 对象都可以作为一个实现同步的锁,这个锁统称为内置锁(Intrinsic Lock)或者监视器锁(Monitor Lock)
Java 的内置锁是一种互斥锁

可重入

Java 内置锁是可重入的

如果不可重入,可能会发生死锁

性能

通常,在简单性与性能之间存在相互制约因素。当实现某个同步策略时,一定不要盲目地为了性能二牺牲简单性

当执行时间较长的计算或者可能无法快速完成的操作时,一定不要持有锁

对象的共享

同步还有另外一个重要的方面:内存可见性。我们不仅希望防止某个线程正在使用对象状态而另一个线程同时修改该状态,而且希望确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化

可见性

可见性时一种复杂的属性,因为可见性中的错误总是会违背我们的直觉。

在没有同步的情况下,编译器、处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整。即指令重排序

加锁的含义不仅仅局限于互斥行为,还包括内存可见性。

Volatile 变量

  • Volatile 修饰的变量禁止指令重排序
  • 保证可见性

仅当 volatile变量能简化代码的实现以及对同步策略的验证时,才使用

volatile 变量的正确使用方式:

  • 确保它们自身状态的可见性
  • 确保它们所引用对象的状态的可见性
  • 标识一些重要的程序生命周期事件的发生

volatile 变量的局限性

  • 不足以确保递增操作的原子性

何时使用 volatile 变量

  • 对变量的写入操作不依赖变量的当前值,或者确保只有单个线程更新变量的值。
  • 该变量不会与其他状态一起纳入不变性条件中
  • 在访问变量时不需要加锁

线程封闭

当某个对象封闭在一个线程中时,这种用法将自动实现线程安全,即使被封闭的对象本身线程不安全

Ad-hoc 线程封闭

维护线程封闭性的职责完全由程序徐实现来承担。非常脆弱

栈封闭

局部变量的固有属性之一就是封闭在执行线程中

ThreadLocal 类

ThreadLocal 使线程中的某个值与保存值的对象关联起来

不变性

不可变对象一定是线程安全的

不可变对象需要满足的条件:

  • 对象创建之后其状态就不能修改
  • 对象的所有域都是 final
  • 对象是正确创建的

总结

在并发程序中使用和共享对象时,可以采用一些实用的策略:

  • 线程封闭
  • 只读共享
  • 线程安全共享
  • 保护对象

对象的组合

将现有的线程安全组件组合成更大规模的组件或程序

设计线程安全的类

在设计线程安全的类的过程中,需要包含一下三个基本要素:

  • 找出构成对象状态的所有变量
  • 找出约束状态变量的不变性条件
  • 建立对象状态的并发访问管理策略

收集同步需求

确保类的线程安全性,就需要确保它的不变性不会在并发访问的情况下被破坏。对象与变量都有一个状态空间,即所有可能的取值。状态空间越小,就越容易判断线程的状态。final 类型的域使用的越多,就越能简化对象可能状态的分析过程。在极端状态下,不可变对象只有唯一的状态。

许多类都定义了一些不变性条件,用于判断状态是有效的还是无效的。

在操作中还会包含一些后验条件来判断状态迁移是否是有效的。当下一个状态需要依赖当前状态时,这个操作就必须是一个复合操作。

由于不变性条件以及后验条件在状态及状态转换上施加了各种约束,因此就需要额外的同步与封装。

在类中也可以包含同时约束多个状态变量的不变性条件。这些相关的变量必须在单个原子操作中进行读取或更新。

如果不了解对象的不变性条件与后验条件,那么就不能确保线程安全性。要满足在状态变量的有效值或状态转换上的各种约束条件,就需要借助于原子性与封装性。

依赖状态的操作

如果在某个操作中包含有基于状态的先验条件,那么此操作被称为依赖状态的操作。

先验条件(Precondition),例如不能从空队列中移除元素,在删除元素之前,队列必须处于非空状态

在单线程程序中,如果某个操作无法满足先验条件,那么就只能失败。但在并发程序中,先验条件可能会由于其他线程执行的操作而变成真。在并发程序中要一直等到先验条件为真,然后再执行该操作。

在Java中,等待某个条件为真的各种内置机制(包括等待和通知等机制)都与内置加锁机制紧密关联,要想正确地使用它们并不容易。要想实现某个等待先验条件为真时才执行的操作,一种更简单的方法是通过现有库中的类(例如阻塞队列[Blocking Queue]或信号量[Semaphore])来实现依赖状态的行为。

对象的所有权

在定义哪些变量将构成对象的状态时,只考虑对象拥有的数据。所有权(Ownership)在Java中并没有得到充分的体现,而是属于类设计中的一个要素。

如果分配并填充了一个HashMap对象,那么就相当于创建了多个对象:HashMap对象,在HashMap对象中包含的多个对象,以及在Map.Entry中可能包含的内部对象。

无论如何,垃圾回收机制使我们避免了如何处理所有权的问题。

许多情况下,所有权与封装性总是相互关联的:对象封装它拥有的状态,反之也成立,即对它封装的状态拥有所有权。状态变量的所有者将决定采用何种加锁协议来维持变量状态的完整性。所有权意味着控制权。然而,如果发布了某个可变对象的引用,那么就不再拥有独占的控制权,最多是“共享控制权”。

容器类通常表现出一种“所有权分离”的形式,其中容器类拥有其自身的状态,而客户代码则拥有容器中各个对象的状态。

实例封闭

确保可以该对象只能由单个线程访问(线程封闭),或者通过一个锁来保护对该对象的所有访问。确保该对象只能由单个线程访问(线程封闭),或者通过一个锁来保护对该对象的所有访问

封装简化了线程安全类的实现过程,它提供了一种实例封闭机制(InstanceConfinement),通常也简称为“封闭”[CPJ 2.3.3]。

将数据封装在对象内部,可以将数据的访问限制在对象的方法上,从而更容易确保线程在访问数据时总能持有正确的锁。

被封闭对象一定不能超出它们既定的作用域。对象可以封闭在类的一个实例(例如作为类的一个私有成员)中,或者封闭在某个作用域内(例如作为一个局部变量),再或者封闭在线程内(例如在某个线程中将对象从一个方法传递到另一个方法,而不是在多个线程之间共享该对象)。当然,对象本身不会逸出——出现逸出情况的原因通常是由于开发人员在发布对象时超出了对象既定的作用域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@ThreadSafe
public class PersonSet{

@GuardedBy"this"
private final Set<Person>mySet=new HashSet<Person>();

public synchronized void addPerson(Person p){
mySet.add(p);
}

public synchronized boolean containsPerson(Person p){
return mySet.contains(p);
}
}

PersonSet说明了如何通过封闭与加锁等机制使一个类成为线程安全的(即使这个类的状态变量并不是线程安全的)。

PersonSet的状态由HashSet来管理的,而HashSet并非线程安全的。但由于mySet是私有的并且不会逸出,因此HashSet被封闭在PersonSet中。

唯一能访问mySet的代码路径是addPerson与containsPerson,在执行它们时都要获得PersonSet上的锁。PersonSet的状态完全由它的内置锁保护,因而PersonSet是一个线程安全的类。

实例封闭是构建线程安全类的一个最简单方式,它还使得在锁策略的选择上拥有了更多的灵活性。

当然,如果将一个本该被封闭的对象发布出去,那么也能破坏封闭性。如果一个对象本应该封闭在特定的作用域内,那么让该对象逸出作用域就是一个错误。当发布其他对象时,例如迭代器或内部的类实例,可能会间接地发布被封闭对象,同样会使被封闭对象逸出。

封闭机制更易于构造线程安全的类,因为当封闭类的状态时,在分析类的线程安全性时就无须检查整个程序。

Java 监视器模式

从线程封闭原则及其逻辑推论可以得出Java监视器模式.

虽然Java监视器模式来自于Hoare对监视器机制的研究工作(Hoare,1974),但这种模式与真正的监视器类之间存在一些重要的差异。进入和退出同步代码块的字节指令也称为monitorenter和monitorexit,而Java的内置锁也称为监视器锁或监视器。

在某些情况下,程序需要一种更复杂的同步策略。Java监视器模式的主要优势就在于它的简单性。

Java监视器模式仅仅是一种编写代码的约定,对于任何一种锁对象,只要自始至终都使用该锁对象,都可以用来保护对象的状态。

使用私有的锁对象而不是对象的内置锁(或任何其他可通过公有方式访问的锁),有许多优点。

  • 私有的锁对象可以将锁封装起来,使客户代码无法得到锁,但客户代码可以通过公有方法来访问锁,以便(正确或者不正确地)参与到它的同步策略中。
  • 要想验证某个公有访问的锁在程序中是否被正确地使用,则需要检查整个程序,而不是单个的类。

示例:车辆追踪

一个用于调度车辆的“车辆追踪器”,例如出租车、警车、货车等。首先使用监视器模式来构建车辆追踪器,然后再尝试放宽某些封装性需求同时又保持线程安全性。

每台车都由一个String对象来标识,并且拥有一个相应的位置坐标(x, y)。在VehicleTracker类中封装了车辆的标识和位置,因而它非常适合作为基于MVC(Model-View-Controller,模型-视图-控制器)模式的GUI应用程序中的数据模型,并且该模型将由一个视图线程和多个执行更新操作的线程共享。视图线程会读取车辆的名字和位置,并将它们显示在界面上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@ThreadSafe
public class MonitorVehicleTracker {

private final Map<String, MutablePoint> locations;

public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}

public synchronized Map<String, MutablePoint> getLocations() {
return deepCopy(locations);
}

public synchronized MutablePoint getLocation(String id) {
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}

public synchronized void setLocation(String id, int x, int y) {
MutablePoint loc = locations.get(id);
if (loc == null) {
throw new IllegalArgumentException("No Such ID:" + id);
}
loc.x = x;
loc.y = y;
}

private static Map<String, MutablePoint> deepCopy(Map<String, MutablePoint> m) {
Map<String, MutablePoint> result = new HashMap<>();
for (String id : m.keySet()) {
result.put(id, m.get(id));
}
return Collections.synchronizedMap(result);
}
}

@Terrible
@NotThreadSafe
public class MutablePoint {

int x;

int y;

public MutablePoint() {
x = 0;
y = 0;
}

public MutablePoint(MutablePoint p) {
x = p.x;
y = p.y;
}
}

线程安全性的委托

大多数对象都是组合对象。

当从头开始构建一个类,或者将多个非线程安全的类组合为一个类时,Java监视器模式是非常有用的。但是,如果类中的各个组件都已经是线程安全的,会是什么情况呢?我们是否需要再增加一个额外的线程安全层?

答案是“视情况而定”。在某些情况下,通过多个线程安全类组合而成的类是线程安全的,而在某些情况下,这仅仅是一个好的开端。

示例:基于委托的车辆追踪器

面将介绍一个更实际的委托示例,构造一个委托给线程安全类的车辆追踪器。我们将车辆的位置保存到一个Map对象中,因此首先要实现一个线程安全的Map类,ConcurrentHashMap。我们还可以用一个不可变的 Point 类来代替 MutablePoint 以保存位置

1
2
3
4
5
6
7
8
9
10
11
@Immutable
public class Point {

final int x;
final int y;

public Point(int x, int y) {
this.x = x;
this.y = y;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@ThreadSafe
public class DelegatingVehicleTracker {

private final ConcurrentMap<String, Point> locations;
private final Map<String, Point> unmodifiableMap;

public DelegatingVehicleTracker(Map<String, Point> points) {
locations = new ConcurrentHashMap<>(points);
unmodifiableMap = Collections.unmodifiableMap(locations);
}

public Map<String, Point> getLocations() {
return unmodifiableMap;
}

public Point getLocation(String id) {
return locations.get(id);
}

public void setLocation(String id, int x, int y) {
if (locations.replace(id, new Point(x, y)) == null) {
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
}
}

需要注意的是,我们稍微改变了车辆追踪器类的行为。在使用监视器模式的车辆追踪器中返回的是车辆位置的快照,而在使用委托的车辆追踪器中返回的是一个不可修改但却实时的车辆位置视图。这意味着,如果线程A调用getLocations,而线程B在随后修改了某些点的位置,那么在返回给线程A的Map中将反映出这些变化。在前面提到过,这可能是一种优点(更新的数据),也可能是一种缺点(可能导致不一致的车辆位置视图),具体情况取决于你的需求。

如果需要一个不发生变化的车辆视图,那么getLocations可以返回对locations这个Map对象的一个浅拷贝(Shallow Copy)。由于Map的内容是不可变的,因此只需复制Map的结构,而不用复制它的内容,如程序清单4-8所示(其中只返回一个HashMap,因为getLocations并不能保证返回一个线程安全的Map)。

1
2
3
public Map<String, Point>getLocations(){
return Collections.unmodifiableMap(new HashMap<String, Point>(locations));
}

独立的状态变量

到目前为止,这些委托示例都仅仅委托给了单个线程安全的状态变量。

我们还可以将线程安全性委托给多个状态变量,只要这些变量是彼此独立的,即组合而成的类并不会在其包含的多个状态变量上增加任何不变性条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class VisualComponent {

private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();

private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();

public void addKeyListener(KeyListener listener) {
keyListeners.add(listener);
}

public void addMouseListener(MouseListener listener) {
mouseListeners.add(listener);
}

public void removeKeyListener(KeyListener listener) {
keyListeners.remove(listener);
}

public void removeMouseListener(MouseListener listener) {
mouseListeners.remove(listener);
}
}

VisualComponent使用CopyOnWriteArrayList来保存各个监听器列表。它是一个线程安全的链表,特别适用于管理监听器列表(参见5.2.3节)。每个链表都是线程安全的,此外,由于各个状态之间不存在耦合关系,因此VisualComponent可以将它的线程安全性委托给mouseListenerskeyListeners等对象。

当委托失效时

大多数组合对象都不会像 VisualComponent 这样简单:在它们的状态变量之间存在着某些不变性条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NumberRange {

// 不变性条件:lower<=upper
private final AtomicInteger lower = new AtomicInteger(0);
private final AtomicInteger upper = new AtomicInteger(0);

public void setLower(int i) {
// 注意--不安全的 “先检查后执行”
if (i > upper.get()) {
throw new IllegalArgumentException("can't set lower to" + i + " > upper");
}
lower.set(i);
}

public void setUpper(int i) {
if (i < lower.get()) {
throw new IllegalArgumentException("can't set upper to " + i + " < lower");
}
}

public boolean isInRange(int i) {
return i >= lower.get() && i <= upper.get();
}
}

NumberRange不是线程安全的,没有维持对下界和上界进行约束的不变性条件。setLower和setUpper等方法都尝试维持不变性条件,但却无法做到。

setLower和setUpper都是“先检查后执行”的操作,但它们没有使用足够的加锁机制来保证这些操作的原子性。假设取值范围为(0,10),如果一个线程调用setLower(5),而另一个线程调用setUpper(4),那么在一些错误的执行时序中,这两个调用都将通过检查,并且都能设置成功。结果得到的取值范围就是(5,4),那么这是一个无效的状态。

因此,虽然AtomicInteger是线程安全的,但经过组合得到的类却不是。由于状态变量lower和upper不是彼此独立的,因此NumberRange不能将线程安全性委托给它的线程安全状态变量。

NumberRange可以通过加锁机制来维护不变性条件以确保其线程安全性,例如使用一个锁来保护lower和upper。此外,它还必须避免发布lower和upper,从而防止客户代码破坏其不变性条件。

  • 如果某个类含有复合操作,那么仅靠委托并不足以实现线程安全性。在这种情况下,这个类必须提供自己的加锁机制以保证这些复合操作都是原子操作,除非整个复合操作都可以委托给状态变量。

  • 如果一个类是由多个独立且线程安全的状态变量组成,并且在所有的操作中都不包含无效状态转换,那么可以将线程安全性委托给底层的状态变量。

  • 即使类的各个状态组成部分都是线程安全的,也不能确保类的线程安全性。

发布底层的状态变量

当把线程安全性委托给某个对象的底层状态变量时,在什么条件下才可以发布这些变量从而使其他类能修改它们?

答案仍然取决于在类中对这些变量施加了哪些不变性条件。

如果一个状态变量是线程安全的,并且没有任何不变性条件来约束它的值,在变量的操作上也不存在任何不允许的状态转换,那么就可以安全地发布这个变量。

示例:发布状态的车辆追踪器

我们来构造车辆追踪器的另一个版本,并在这个版本中发布底层的可变状态。我们需要修改接口以适应这种变化,即使用可变且线程安全的Point类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@ThreadSafe
public class SafePoint {

@GuardedBy("this")
private int x;

@GuardedBy("this")
private int y;

private SafePoint(int[] a) {
this(a[0], a[1]);
}

public SafePoint(int x, int y) {
this.x = x;
this.y = y;
}

public SafePoint(SafePoint p) {
this(p.get());
}

public synchronized int[] get() {
return new int[] {x, y};
}

public synchronized void set(int x, int y) {
this.x = x;
this.y = y;
}
}

SafePoint提供的get方法同时获得x和y的值,并将二者放在一个数组中返回[^4.3.5.1]

[^4.3.5.1]: 如果将拷贝构造函数实现为this(p.x, p.y),那么会产生竞态条件,而私有构造函数则可以避免这种竞态条件。这是私有构造函数捕获模式(PrivateConstructor Capture Idiom, Bloch and Gafter,2005)的一个实例。

。如果为x和y分别提供get方法,那么在获得这两个不同坐标的操作之间,x和y的值发生变化,从而导致调用者看到不一致的值:车辆从来没有到达过位置(x, y)。通过使用SafePoint,可以构造一个发布其底层可变状态的车辆追踪器,还能确保其线程安全性不被破坏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@ThreadSafe
public class PublishingVehicleTracker {

private final Map<String, SafePoint> locations;
private final Map<String, SafePoint> unmodifiableMap;

public PublishingVehicleTracker(Map<String, SafePoint> locations) {
this.locations = new ConcurrentHashMap<>(locations);
unmodifiableMap = Collections.unmodifiableMap(this.locations);
}

public Map<String, SafePoint> getLocations() {
return unmodifiableMap;
}

public SafePoint getLocation(String id) {
return locations.get(id);
}

public void setLocation(String id, int x, int y) {
if (!locations.containsKey(id)) {
throw new IllegalArgumentException("invalid vehicle name: " + id);
}
locations.get(id).set(x, y);
}
}

PublishingVehicleTracker将其线程安全性委托给底层的ConcurrentHashMap,只是Map中的元素是线程安全的且可变的Point,而并非不可变的。

getLocation方法返回底层Map对象的一个不可变副本。调用者不能增加或删除车辆,但却可以通过修改返回Map中的SafePoint值来改变车辆的位置。

再次指出,Map的这种“实时”特性究竟是带来好处还是坏处,仍然取决于实际的需求。

PublishingVehicleTracker是线程安全的,但如果它在车辆位置的有效值上施加了任何约束,那么就不再是线程安全的。

如果需要对车辆位置的变化进行判断或者当位置变化时执行一些操作,那么PublishingVehicleTracker中采用的方法并不合适。

在现有的线程安全类中添加功能

Java类库包含许多有用的“基础模块”类。通常,我们应该优先选择重用这些现有的类而不是创建新的类:重用能降低开发工作量、开发风险(因为现有的类都已经通过测试)以及维护成本。有时候,某个现有的线程安全类能支持我们需要的所有操作,但更多时候,现有的类只能支持大部分的操作,此时就需要在不破坏线程安全性的情况下添加一个新的操作。

修改原始的类

这通常无法做到,因为你可能无法访问或修改类的源代码。

要想修改原始的类,就需要理解代码中的同步策略,这样增加的功能才能与原有的设计保持一致。如果直接将新方法添加到类中,那么意味着实现同步策略的所有代码仍然处于一个源代码文件中,从而更容易理解与维护。

拓展这个类

BetterVector对Vector进行了扩展,并添加了一个新方法putIfAbsent。扩展Vector很简单,但并非所有的类都像Vector那样将状态向子类公开,因此也就不适合采用这种方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
@ThreadSafe
public class BetterVector<E> extends Vector<E> {

@Serial private static final long serialVersionUID = -688925674924800348L;

public synchronized boolean putIfAbsent(E e) {
boolean absent = !contains(e);
if (absent) {
add(e);
}
return absent;
}
}

“扩展”方法比直接将代码添加到类中更加脆弱,因为现在的同步策略实现被分布到多个单独维护的源代码文件中。如果底层的类改变了同步策略并选择了不同的锁来保护它的状态变量,那么子类会被破坏,因为在同步策略改变后它无法再使用正确的锁来控制对基类状态的并发访问。(在Vector的规范中定义了它的同步策略,因此BetterVector不存在这个问题。)

客户端加锁机制

第三种策略是扩展类的功能,但并不是扩展类本身,而是将扩展代码放入一个“辅助类”中。

客户端加锁是指,对于使用某个对象X的客户端代码,使用X本身用于保护其状态的锁来保护这段客户代码。要使用客户端加锁,你必须知道对象X使用的是哪一个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@ThreadSafe
public class ListHelper<E> {

private final List<E> list = Collections.synchronizedList(new ArrayList<>());

public boolean putIfAbsent(E e) {
synchronized (list) {
boolean absent = !list.contains(e);
if (absent) {
list.add(e);
}
return absent;
}
}
}

客户端加锁却更加脆弱,因为它将类C的加锁代码放到与C完全无关的其他类中。当在那些并不承诺遵循加锁策略的类上使用客户端加锁时,要特别小心。

客户端加锁机制与扩展类机制有许多共同点,二者都是将派生类的行为与基类的实现耦合在一起。正如扩展会破坏实现的封装性,客户端加锁同样会破坏同步策略的封装性。

组合

当为现有的类添加一个原子操作时,有一种更好的方法:组合(Composition)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@ThreadSafe
public class ImprovedList<E> implements List<E> {

private final List<E> list;

public ImprovedList(List<E> list) {
this.list = list;
}

public synchronized boolean putIfAbsent(E e) {
boolean absent = !list.contains(e);
if (absent) {
list.add(e);
}
return absent;
}

// ....
}

ImprovedList通过自身的内置锁增加了一层额外的加锁。它并不关心底层的List是否是线程安全的,即使List不是线程安全的或者修改了它的加锁实现,ImprovedList也会提供一致的加锁机制来实现线程安全性。

虽然额外的同步层可能导致轻微的性能损失,但与模拟另一个对象的加锁策略相比,ImprovedList更为健壮。

事实上,我们使用了Java监视器模式来封装现有的List,并且只要在类中拥有指向底层List的唯一外部引用,就能确保线程安全性。

将同步策略文档化

在维护线程安全性时,文档是最强大的(同时也是最未被充分利用的)工具之一。用户可以通过查阅文档来判断某个类是否是线程安全的,而维护人员也可以通过查阅文档来理解其中的实现策略,避免在维护过程中破坏安全性。

在文档中说明客户代码需要了解的线程安全性保证,以及代码维护人员需要了解的同步策略。

synchronized、volatile或者任何一个线程安全类都对应于某种同步策略,用于在并发访问时确保数据的完整性。这种策略是程序设计的要素之一,因此应该将其文档化。当然,设计阶段是编写设计决策文档的最佳时间。

在设计同步策略时需要考虑多个方面,例如,将哪些变量声明为volatile类型,哪些变量用锁来保护,哪些锁保护哪些变量,哪些变量必须是不可变的或者被封闭在线程中的,哪些操作必须是原子操作等。其中某些方面是严格的实现细节,应该将它们文档化以便于日后的维护。还有一些方面会影响类中加锁行为的外在表现,也应该将其作为规范的一部分写入文档。最起码,应该保证将类中的线程安全性文档化。它是否是线程安全的?在执行回调时是否持有一个锁?是否有某些特定的锁会影响其行为?不要让客户冒着风险去猜测。如果你不想支持客户端加锁也是可以的,但一定要明确地指出来。

如果某个类没有明确地声明是线程安全的,那么就不要假设它是线程安全的。

许多Java技术规范都没有(或者至少不愿意)说明接口的线程安全性,例如ServletContext、HttpSession或DataSource。

基础构建模块

Java 平台类库提供了丰富的并发基础构建模块

同步容器类

  • VectorHashTable

  • 在JDK l.2中添加的一些功能相似的类,这些同步的封装器类是由Collections.synchronizedXxx等工厂方法创建的

这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公共方法进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。

容器上常见的复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算,例如“若没有则添加”(检查在Map中是否存在键值K,如果没有,就加入二元组(K, V))。

迭代器与ConcurrentModificationException

对容器类进行迭代的标准方式都是使用Iterator。

在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为是“及时失败”(fail-fast)的。这意味着,当它们发现容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。

这种“及时失败”的迭代器并不是一种完备的处理机制,而只是“善意地”捕获并发错误,因此只能作为并发问题的预警指示器。它们采用的实现方式是,将计数器的变化与容器关联起来:如果在迭代期间计数器被修改,那么hasNextnext 将抛出ConcurrentModificationException。然而,这种检查是在没有同步的情况下进行的,因此可能会看到失效的计数值,而迭代器可能并没有意识到已经发生了修改。这是一种设计上的权衡,从而降低并发修改操作的检测代码[插图]对程序性能带来的影响。

要避免这个问题可以在迭代过程中持有容器的锁。然而,有时候开发人员并不希望在迭代期间对容器加锁。如果容器的规模很大,或者在每个元素上执行操作的时间很长,那么这些线程将长时间等待。此种方式可能会产生饥饿或者死锁,降低程序的伸缩性,极大的降低吞吐量以及CPU的利用率。

替代方法就是“克隆”容器,并在副本上进行迭代。这种方式的好坏取决于多个因素,包括容器的大小,在每个元素上执行的工作,迭代操作相对于容器其他操作的调用频率,以及在响应时间和吞吐量等方面的需求。

隐藏迭代器

虽然加锁可以防止迭代器抛出ConcurrentModificationException,但你必须要记住在所有对共享容器进行迭代的地方都需要加锁。实际情况要更加复杂,因为在某些情况下,迭代器会隐藏起来。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Terrible
public class HiddenIterator {

private final Set<Integer> set = new HashSet<>();

public synchronized void add(Integer i) {
set.add(i);
}

public synchronized void remove(Integer i) {
set.remove(i);
}

public void addTenThings() {
Random random = new Random();
for (int i = 0; i < 10; i++) {
add(random.nextInt());
// set.toString 会隐式调用迭代器
System.out.println("DEBUG:added ten elements to" + set);
}
}
}

编译器将字符串的连接操作转换为调用StringBuilder.append(Object),而这个方法又会调用容器的toString方法,标准容器的toString方法将迭代容器,并在每个元素上调用toString来生成容器内容的格式化表示。

addTenThings方法可能会抛出ConcurrentModificationException,因为在生成调试消息的过程中,toString对容器进行迭代。

在使用println中的set之前必须首先获取HiddenIterator的锁,但在调试代码和日志代码中通常会忽视这个要求。

如果状态与保护它的同步代码之间相隔太远,那么开发人员就越容易忘记在访问状态时使用正确的同步。正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。

容器的hashCodeequals等方法也会间接地执行迭代操作,当容器作为另一个容器的元素或键值时,就会出现这种情况。同样,containsAllremoveAllretainAll等方法,以及把容器作为参数的构造函数,都会对容器进行迭代。所有这些间接的迭代操作都可能抛出ConcurrentModificationException

并发容器

同步容器将所有对容器状态的访问都串行化,以实现线程安全。这样做的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。

并发容器是针对多个线程并发访问设计的。通过并发容器来替代同步容器,可以极大的提高伸缩性并降低风险。

Java 5.0增加了两种新的容器类型:QueueBlockingQueueQueue用来临时保存一组等待处理的元素。它提供了几种实现,包括:ConcurrentLinkedQueue,这是一个传统的先进先出队列,以及PriorityQueue,这是一个(非并发的)优先队列。Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空值。虽然可以用List来模拟Queue的行为——事实上,正是通过LinkedList来实现Queue的,但还需要一个Queue的类,因为它能去掉List的随机访问需求,从而实现更高效的并发。BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作。如果队列为空,那么获取元素的操作将一直阻塞,直到队列中出现一个可用的元素。如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,直到队列中出现可用的空间。在“生产者-消费者”这种设计模式中,阻塞队列是非常有用。

正如ConcurrentHashMap用于代替基于散列的同步Map, Java 6也引入了ConcurrentSkipListMapConcurrentSkipListSet,分别作为同步的SortedMapSortedSet的并发替代品(例如用synchronizedMap包装的TreeMapTreeSet)。

ConcurrentHashMap

HashMap一样,ConcurrentHashMap也是一个基于散列的Map,但它使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。

ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁(Lock Striping)。ConcurrentHashMap带来的结果是,在并发访问环境下将实现更高的吞吐量,而在单线程环境中只损失非常小的性能。

尽管有这些改进,但仍然有一些需要权衡的因素。对于一些需要在整个Map上进行计算的方法,例如sizeisEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。

HashtablesynchronizedMap相比,ConcurrentHashMap有着更多的优势以及更少的劣势,因此在大多数情况下,用ConcurrentHashMap来代替同步Map能进一步提高代码的可伸缩性。只有当应用程序需要加锁Map以进行独占访问时,才应该放弃使用ConcurrentHashMap

额外的 Map 操作

由于ConcurrentHashMap不能被加锁来执行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。

但是,一些常见的复合操作,例如“若没有则添加”、“若相等则移除(Remove-If-Equal)”和“若相等则替换(Replace-If-Equal)”等,都已经实现为原子操作并且在ConcurrentMap的接口中声明,如果你需要在现有的同步Map中添加这样的功能,那么很可能就意味着应该考虑使用ConcurrentMap了。

1
2
3
4
5
6
7
8
9
10
public interface ConcurrentMapK, Vextends MapK, V{
//仅当K没有相应的映射值时才插入
V putIfAbsent(K key, V value);
//仅当K被映射到V时才移除
boolean remove(K key, V value);
//仅当K被映射到oldValue时才替换为newValue
boolean replace(K key, V oldValue, V newValue);
//仅当K被映射到某个值时才替换为newValue
V replace(K key, V newValue);
}

CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。(类似地,CopyOnWriteArraySet的作用是替代同步Set。)

“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。

每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。

阻塞队列与生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法也永远不会阻塞。

阻塞队列支持生产者-消费者这种设计模式。该模式将“找出需要完成的工作”与“执行工作”这两个过程分离开来,并把工作项放入一个“待完成”列表中以便在随后处理,而不是找出后立即处理。生产者-消费者模式能简化开发过程,因为它消除了生产者类和消费者类之间的代码依赖性,此外,该模式还将生产数据的过程与使用数据的过程解耦开来以简化工作负载的管理,因为这两个过程在处理数据的速率上有所不同。

在基于阻塞队列构建的生产者-消费者设计中,BlockingQueue简化了生产者-消费者设计的实现过程,它支持任意数量的生产者和消费者。一种最常见的生产者-消费者设计模式就是线程池与工作队列的组合,在Executor任务执行框架中就体现了这种模式。

“生产者”和“消费者”的角色是相对的,某种环境中的消费者在另一种不同的环境中可能会成为生产者。

虽然生产者-消费者模式能够将生产者和消费者的代码彼此解耦开来,但它们的行为仍然会通过共享工作队列间接地耦合在一起。

Java类库中 BlockQueue 的实现:

  • LinkedBlockingQueueArrayBlockingQueue是FIFO队列,二者分别与LinkedListArrayList类似,但比同步List拥有更好的并发性能。
  • PriorityBlockingQueue是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。正如其他有序的容器一样,PriorityBlockingQueue既可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较。
  • SynchronousQueue。实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。这种实现队列的方式看似很奇怪,但由于可以直接交付工作,从而降低了将数据从生产者移动到消费者的延迟。直接交付方式还会将更多关于任务状态的信息反馈给生产者。当交付被接受时,它就知道消费者已经得到了任务,而不是简单地把任务放入一个队列。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用同步队列。

示例:桌面搜索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class ProducerConsumer {

static class FileCrawler implements Runnable {

private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;

public FileCrawler(BlockingQueue<File> fileQueue, FileFilter fileFilter, File root) {
this.fileQueue = fileQueue;
this.fileFilter = file -> file.isDirectory() || fileFilter.accept(file);
this.root = root;
}

@Override
public void run() {}

private boolean alreadyIndexed(File f) {
return false;
}

private void crawl(File root) throws InterruptedException {
File[] files = root.listFiles(fileFilter);
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
crawl(file);
} else if (!alreadyIndexed(file)) {
fileQueue.put(file);
}
}
}
}
}

static class Indexer implements Runnable {

private final BlockingQueue<File> queue;

Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
while (true) {
indexFile(queue.take());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public void indexFile(File file) {
// Index the file...
}
}

private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();

public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<>(BOUND);
FileFilter filter = file -> true;

for (File root : roots) {
new Thread(new FileCrawler(queue, filter, root)).start();
}

for (int i = 0; i < N_CONSUMERS; i++) {
new Thread(new Indexer(queue)).start();
}
}
}

FileCrawler中给出了一个生产者任务,即在某个文件层次结构中搜索符合索引标准的文件,并将它们的名称放入工作队列。而且,在Indexer中还给出了一个消费者任务,即从队列中取出文件名称并对它们建立索引。

生产者-消费者模式提供了一种适合线程的方法将桌面搜索问题分解为更简单的组件。将文件遍历与建立索引等功能分解为独立的操作,比将所有功能都放到一个操作中实现有着更高的代码可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰。

生产者-消费者模式提供了一种适合线程的方法将桌面搜索问题分解为更简单的组件。将文件遍历与建立索引等功能分解为独立的操作,比将所有功能都放到一个操作中实现有着更高的代码可读性和可重用性:每个操作只需完成一个任务,并且阻塞队列将负责所有的控制流,因此每个功能的代码都更加简单和清晰。

串行线程封闭

对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。线程封闭对象只能由单个线程拥有,但可以通过安全地发布该对象来“转移”所有权。

对象池利用了串行线程封闭,将对象“借给”一个请求线程。只要对象池包含足够的内部同步来安全地发布池中的对象,并且只要客户代码本身不会发布池中的对象,或者在将对象返回给对象池后就不再使用它,那么就可以安全地在线程之间传递所有权。

我们也可以使用其他发布机制来传递可变对象的所有权,但必须确保只有一个线程能接受被转移的对象。阻塞队列简化了这项工作。除此之外,还可以通过ConcurrentMap的原子方法remove或者AtomicReference的原子方法compareAndSet来完成这项工作。

双端队列与工作密取

Java 6增加了两种容器类型,Deque(发音为“deck”)和BlockingDeque,它们分别对QueueBlockingQueue进行了扩展。Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDequeLinkedBlockingDeque

正如阻塞队列适用于生产者-消费者模式,双端队列同样适用于另一种相关模式,即工作密取(Work Stealing) (也称工作窃取)。在生产者-消费者设计中,所有消费者有一个共享的工作队列,而在工作密取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列末尾秘密地获取工作。

阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep方法中醒来,或是等待另一个线程的计算结果。当线程阻塞时,它通常被挂起,并处于某种阻塞状态(BLOCKED、WAITING或TIMED_WAITING)。阻塞操作与执行时间很长的普通操作的差别在于,被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待I/O操作完成,等待某个锁变成可用,或者等待外部计算的结束。当某个外部事件发生时,线程被置回RUNNABLE状态,并可以再次被调度执行。

Thread提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,表示线程的中断状态,当中断线程时将设置这个状态。

中断是一种协作机制。一个线程不能强制其他线程停止正在执行的操作而去执行其他的操作。当线程A中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作——前提是如果线程B愿意停止下来。虽然在API或者语言规范中并没有为中断定义任何特定应用级别的语义,但最常使用中断的情况就是取消某个操作。方法对中断请求的响应度越高,就越容易及时取消那些执行时间很长的操作。

当在代码中调用了一个将抛出InterruptedException异常的方法时,你自己的方法也就变成了一个阻塞方法,并且必须要处理对中断的响应。对于库代码来说,有两种基本选择:

  1. 传递InterruptedException。避开这个异常通常是最明智的策略——只需把InterruptedException传递给方法的调用者。传递InterruptedException的方法包括,根本不捕获该异常,或者捕获该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
  2. 恢复中断。有时候不能抛出InterruptedException,例如当代码是Runnable的一部分时。在这些情况下,必须捕获InterruptedException,并通过调用当前线程上的interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断.

在出现InterruptedException时不应该做的事情是,捕获它但不做出任何响应。

同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。在平台类库中还包含其他一些同步工具类的类,如果这些类还无法满足需要,那么可以创建自己的同步工具类。

所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态[CPJ3.4.2]。

闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。

闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,闭锁的使用场景举例如下:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示“资源R已经被初始化”,而所有需要R的操作都必须先在这个闭锁上等待。
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖S的服务才能继续执行。
  • 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestHarness {

public long timeTasks(int nThread, Runnable task) throws InterruptedException {
CountDownLatch startGate = new CountDownLatch(1);
CountDownLatch endGate = new CountDownLatch(nThread);
for (int i = 0; i < nThread; i++) {
new Thread(
() -> {
try {
startGate.await();

try {
task.run();
} finally {
endGate.countDown();
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})
.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();

return end - start;
}
}

FutureTask

FutureTask也可以用做闭锁。(FutureTask实现了Future语义,表示一种抽象的可生成结果的计算[CPJ 4.3.3])。

FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行(Waiting to run),正在运行(Running)和运行完成(Completed)。

Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递过程能实现结果的安全发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class PreLoader {

private final FutureTask<ProductInfo> future = new FutureTask<>(PreLoader.this::loadProductInfo);

private ProductInfo loadProductInfo() {
return null;
}

private final Thread thread = new Thread(future);

public void start() {
thread.start();
}

public ProductInfo get() throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException) {
throw (DataLoadException) cause;
} else {
throw LaunderThrowable.launderThrowable(cause);
}
}
}

static class DataLoadException extends Exception {

@Serial private static final long serialVersionUID = 3992718468942845928L;
}

interface ProductInfo {}
}

public class LaunderThrowable {

/**
* Coerce an unchecked Throwable to a RuntimeException
*
* <p>If the Throwable is an Error, throw it; if it is a RuntimeException return it, otherwise
* throw IllegalStateException
*/
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new IllegalStateException("Not unchecked", t);
}
}
}

信号量

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量[CPJ 3.4.1]。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。(在这种实现中不包含真正的许可对象,并且Semaphore也不会将许可与线程关联起来,因此在一个线程中获得的许可可以在另一个线程中释放。可以将acquire操作视为是消费一个许可,而release操作是创建一个许可,Semaphore并不受限于它在创建时的初始许可数量。)

计算信号量的一种简化形式是二值信号量,即初始值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。

Semaphore可以用于实现资源池,例如数据库连接池。我们可以构造一个固定长度的资源池,当池为空时,请求资源将会失败,但你真正希望看到的行为是阻塞而不是失败,并且当池非空时解除阻塞。如果将Semaphore的计数值初始化为池的大小,并在从池中获取一个资源之前首先调用acquire方法获取一个许可,在将资源返回给池之后调用release释放许可,那么acquire将一直阻塞直到资源池不为空。

可以使用Semaphore将任何一种容器变成有界阻塞容器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 使用 Semaphore 为容器设置边界
public class BoundedHashSet<T> {

private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int bound) {
set = Collections.synchronizedSet(new HashSet<>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
sem.release();
}
}
}

public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
sem.release();
}
return wasRemoved;
}
}

信号量的计数值会初始化为容器容量的最大值。add操作在向底层容器中添加一个元素之前,首先要获取一个许可。如果add操作没有添加任何元素,那么会立刻释放许可。同样,remove操作释放一个许可,使更多的元素能够添加到容器中。底层的Set实现并不知道关于边界的任何信息,这是由BoundedHashSet来处理的。

栅栏

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常将一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class CellularAutomata {

private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;

public CellularAutomata(Board mainBoard) {
this.mainBoard = mainBoard;
int count = Runtime.getRuntime().availableProcessors();
barrier = new CyclicBarrier(count, mainBoard::commitNewValues);
workers = new Worker[count];
for (int i = 0; i < count; i++) {
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
}

public void start() {
for (int i = 0; i < workers.length; i++) {
new Thread(workers[i]).start();
}
mainBoard.waitForConvergence();
}

private class Worker implements Runnable {

private final Board board;

private Worker(Board board) {
this.board = board;
}

@Override
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++) {
for (int y = 0; y < board.getMaxY(); y++) {
board.setNewValue(x, y, computeValue(x, y));
}
}
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException ex) {
return;
}
}
}

private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}

interface Board {
int getMaxX();

int getMaxY();

int getValue(int x, int y);

int setNewValue(int x, int y, int value);

void commitNewValues();

boolean hasConverged();

void waitForConvergence();

Board getSubBoard(int numPartitions, int index);
}
}

CellularAutomata中给出了如何通过栅栏来计算细胞的自动化模拟,例如Conway的生命游戏(Gardner,1970)。在把模拟过程并行化时,为每个元素(在这个示例中相当于一个细胞)分配一个独立的线程是不现实的,因为这将产生过多的线程,而在协调这些线程上导致的开销将降低计算性能。合理的做法是,将问题分解成一定数量的子问题,为每个子问题分配一个线程来进行求解,之后再将所有的结果合并起来。CellularAutomata将问题分解为Ncpu个子问题,其中Ncpu等于可用CPU的数量,并将每个子问题分配给一个线程。[插图]在每个步骤中,工作线程都为各自子问题中的所有细胞计算新值。当所有工作线程都到达栅栏时,栅栏会把这些新值提交给数据模型。在栅栏的操作执行完以后,工作线程将开始下一步的计算,包括调用isDone方法来判断是否需要进行下一次迭代。

另一种形式的栅栏是Exchanger,它是一种两方(Two-Party)栅栏,各方在栅栏位置上交换数据[CPJ 3.4.3]。当两方执行不对称的操作时,Exchanger会非常有用,例如当一个线程向缓冲区写入数据,而另一个线程从缓冲区中读取数据。这些线程可以使用Exchanger来汇合,并将满的缓冲区与空的缓冲区交换。当两个线程通过Exchanger交换对象时,这种交换就把这两个对象安全地发布给另一方。

数据交换的时机取决于应用程序的响应需求。最简单的方案是,当缓冲区被填满时,由填充任务进行交换,当缓冲区为空时,由清空任务进行交换。这样会把需要交换的次数降至最低,但如果新数据的到达率不可预测,那么一些数据的处理过程就将延迟。另一个方法是,不仅当缓冲被填满时进行交换,并且当缓冲被填充到一定程度并保持一定时间后,也进行交换。

第一部分总结

  • 可变状态是至关重要的(It’s the mutable state, stupid)。

    • 所有的并发问题都可以归结为如何协调对并发状态的访问。可变状态越少,就越容易确保线程安全性。不可变对象一定是线程安全的。
  • 尽量将域声明为final类型,除非需要它们是可变的。

  • 不可变对象一定是线程安全的。

    • 不可变对象能极大地降低并发编程的复杂性。它们更为简单而且安全,可以任意共享而无须使用加锁或保护性复制等机制。
  • 封装有助于管理复杂性。

    • 在编写线程安全的程序时,虽然可以将所有数据都保存在全局变量中,但为什么要这样做?将数据封装在对象中,更易于维持不变性条件:将同步机制封装在对象中,更易于遵循同步策略。
  • 用锁来保护每个可变变量。

  • 当保护同一个不变性条件中的所有变量时,要使用同一个锁。

  • 在执行复合操作期间,要持有锁。

  • 如果从多个线程中访问同一个可变变量时没有同步机制,那么程序会出现问题。

  • 不要故作聪明地推断出不需要使用同步。

  • 在设计过程中考虑线程安全,或者在文档中明确地指出它不是线程安全的。

  • 将同步策略文档化。


, ,