结构化并发应用程序

[TOC]

任务执行

大多数并发应用程序都是围绕“任务执行(Task Execution)”来构造的:任务通常是一些抽象的且离散的工作单元。

在线程中执行任务

当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。

目标:在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序支持尽可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽快的响应。而且,当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。

串行的执行任务

在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜在的并发性。最简单的策略就是在单个线程中串行地执行各项任务。如 SingleThreadWebServer 所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SingleThreadWebServer {

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}

private static void handleRequest(Socket connection) {
// handle request
}
}

在Web请求的处理中包含了一组不同的运算与I/O操作。服务器必须处理套接字I/O以读取请求和写回响应,这些操作通常会由于网络拥塞或连通性问题而被阻塞。此外,服务器还可能处理文件I/O或者数据库请求,这些操作同样会阻塞。在单线程的服务器中,阻塞不仅会推迟当前请求的完成时间,而且还将彻底阻止等待中的请求被处理。如果请求阻塞的时间过长,用户将认为服务器是不可用的,因为服务器看似失去了响应。同时,服务器的资源利用率非常低,因为当单线程在等待I/O操作完成时,CPU将处于空闲状态。

在服务器应用程序中,串行处理机制通常都无法提供高吞吐率或快速响应性。也有一些例外,例如,当任务数量很少且执行时间很长时,或者当服务器只为单个用户提供服务,并且该客户每次只发出一个请求时——但大多数服务器应用程序并不是按照这种方式来工作的。

显示的为任务创建线程

通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性,如ThreadPerTaskWebServer所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadPerTaskWebServer {

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
new Thread(() -> handleRequest(connection)).start();
}
}

private static void handleRequest(Socket connection) {
// handle request
}
}
  • 任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
  • 任务可以并行处理,从而能同时服务多个请求。如果有多个处理器,或者任务由于某种原因被阻塞,例如等待I/O完成、获取锁或者资源可用性等,程序的吞吐量将得到提高。
  • 任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。

在正常负载情况下,“为每个任务分配一个线程”的方法能提升串行执行的性能。只要请求的到达速率不超出服务器的请求处理能力,那么这种方法可以同时带来更快的响应性和更高的吞吐率。

无限制创建线程的不足

在生产环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量的线程时:

  • 线程生命周期的开销非常高。线程的创建与销毁并不是没有代价的。根据平台的不同,实际的开销也有所不同,但线程的创建过程都会需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助操作。
  • 资源消耗。
    • 活跃的线程会消耗系统资源,尤其是内存。
    • 如果可运行的线程数量多于可用处理器的数量,那么有些线程将闲置。
    • 大量空闲的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销。
    • 如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。
  • 稳定性。在可创建线程的数量上存在一个限制。这个限制值将随着平台的不同而不同,并且受多个因素制约,包括JVM的启动参数、Thread构造函数中请求的栈大小,以及底层操作系统对线程的限制等。如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常。

在一定的范围内,增加线程可以提高系统的吞吐率,但如果超出了这个范围,再创建更多的线程只会降低程序的执行速度,并且如果过多地创建一个线程,那么整个应用程序将崩溃。要想避免这种危险,就应该对应用程序可以创建的线程数量进行限制,并且全面地测试应用程序,从而确保在线程数量达到限制时,程序也不会耗尽资源。

Executor 框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。

线程池简化了线程的管理工作,并且 JUC 提供了一种灵活的线程池实现作为 Executor 框架的一部分。

在 Java 类库中,任务执行的主要抽象不是 Thread ,而是 Executor。

1
2
3
public interface Executor {
void execute(Runnable command);
}

虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。

示例:基于 Executor 的 Web 服务器

TaskExecutionWebServer 使用了一种标准的Executor实现,即一个固定长度的线程池,可以容纳100个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class TaskExecutionWebServer {

private static final int NTHREADS = 100;

private static final Executor executor = Executors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
executor.execute(() -> handleRequest(connection));
}
}

private static void handleRequest(Socket connection) {
// handle request
}
}

只需采用另一种不同的Executor实现,就可以改变服务器的行为。改变Executor实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。

我们还可以自定义 Executor 接口来实现不同的策略。

执行策略

通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。

在执行策略中定义了任务执行的“What、Where、When、How”等方面,包括:

  • 在什么(What)线程中执行任务?
  • 任务按照什么(What)顺序执行(FIFO,LIFO,优先级)?
  • 有多少个(How Many)任务能够并发执行?
  • 在队列中有多少个(How Many)任务正在等待执行?
  • 如果系统由于过载而需要拒绝一个任务,那么应选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝?
  • 在执行一个任务之前或之后,应该进行哪些(What)动作?

各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。

线程池

线程池,从定义上来看,是管理一组同构工作线程的资源池。线程池是与工作队列(Work Queue)密切相关的,其中在工作队列中保存了所有等待执行的任务。工作者线程(Worker Thread)的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

“在线程池中执行任务”比“为每个任务分配一个线程”优势更多:

  • 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
  • 当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
  • 通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态。
  • 还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPoolnewFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)。
  • newCachedThreadPoolnewCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutornewSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor 能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)。单线程的Executor还提供了大量的内部同步机制,从而确保了任务执行的任何内存写入操作对于后续任务来说都是可见的。这意味着,即使这个线程会不时地被另一个线程替代,但对象总是可以安全地封闭在“任务线程”中。
  • newScheduledThreadPoolnewScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
  • newFixedThreadPoolnewCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor。

Executor还可以实现各种调优、管理、监视、记录日志、错误报告和其他功能。

Executor 的生命周期

由于Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。

关闭形式:

  • 最平缓的关闭形式(完成所有已经启动的任务,并且不再接受任何新的任务)
  • 最粗暴的关闭形式(直接关掉机房的电源)
  • 以及其他各种可能的形式

Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法):

1
2
3
4
5
6
7
8
public interface ExecutorService extends Executor{
void shutdown();
List<Runnable>shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
//……其他用于任务提交的便利方法
}

ExecutorService的生命周期有3种状态:运行、关闭和已终止。

ExecutorService在初始创建时处于运行状态。

shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

ExecutorService关闭后提交的任务将由“拒绝执行处理器(RejectedExecutionHandler)”来处理。它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException

等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。

以下示例是一个执行关闭的 web 服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class LifecycleWebServer {

private static final int NTHREADS = 100;

private static final ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!executor.isShutdown()) {
try {
Socket connection = socket.accept();
executor.execute(() -> handleRequest(connection));
} catch (RejectedExecutionException e) {
if (!executor.isShutdown()) {
log("task submission rejected", e);
}
}
}
}

public static void stop() {
executor.shutdown();
}

private static void log(String msg, Exception e) {
Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
}

private static void handleRequest(Socket connection) {
// handle request
}
}

延迟任务与周期任务

Timer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“每l0ms执行一次该任务”)。然而,Timer存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor来代替它。

  • Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。
  • Timer的另一个问题是,如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当TimerTask抛出未检查的异常时将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。这个问题称之为“线程泄漏[Thread Leakage]”

找出可利用的并行性

Executor框架帮助指定执行策略,但如果要使用Executor,必须将任务表述为一个Runnable。

在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的,例如在很多桌面应用程序中。即使是服务器应用程序,在单个客户请求中仍可能存在可发掘的并行性,例如数据库服务器。

示例组件实现浏览器程序中的页面渲染(Page-Rendering)功能,它的作用是将HTML页面绘制到图像缓存中。为了简便,假设HTML页面只包含标签文本,以及预定义大小的图片和URL。

示例:串行的页面渲染器

最简单的方法就是对HTML文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中。这很容易实现,程序只需将输入中的每个元素处理一次(甚至不需要缓存文档),但这种方法可能会令用户感到烦恼,他们必须等待很长时间,直到显示所有的文本。另一种串行执行方法更好一些,它先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class SingleThreadRenderer {

void renderPage(CharSequence source) {
rendText(source);

List<ImageData> imageDataList = new ArrayList<>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageDataList.add(imageInfo.downloadImage());
}

for (ImageData imageData : imageDataList) {
renderImage(imageData);
}
}

abstract void rendText(CharSequence source);

abstract List<ImageInfo> scanForImageInfo(CharSequence source);

abstract void renderImage(ImageData imageData);

interface ImageData {}

interface ImageInfo {
ImageData downloadImage();
}
}

图像下载过程的大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作。因此,这种串行执行方法没有充分地利用CPU,使得用户在看到最终页面之前要等待过长的时间。通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和响应灵敏度。

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。

许多任务实际上都是存在延迟的计算——执行数据库查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。要使用Callable来表示无返回值的任务,可使用Callable<Void>。在Executor中包含了一些辅助方法能将其他类型的任务封装为一个Callable,例如Runnable和java.security.PrivilegedAction。

Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长的时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它就永远停留在“完成”状态上。

get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

public interface Future<V> {

/**
* Attempts to cancel execution of this task. This method has no
* effect if the task is already completed or cancelled, or could
* not be cancelled for some other reason. Otherwise, if this
* task has not started when {@code cancel} is called, this task
* should never run. If the task has already started, then the
* {@code mayInterruptIfRunning} parameter determines whether the
* thread executing this task (when known by the implementation)
* is interrupted in an attempt to stop the task.
*
* <p>The return value from this method does not necessarily
* indicate whether the task is now cancelled; use {@link
* #isCancelled}.
*
* @param mayInterruptIfRunning {@code true} if the thread
* executing this task should be interrupted (if the thread is
* known to the implementation); otherwise, in-progress tasks are
* allowed to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed; {@code true}
* otherwise. If two or more threads cause a task to be cancelled,
* then at least one of them returns {@code true}. Implementations
* may provide stronger guarantees.
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

可以通过许多种方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个RunnableCallable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的RunnableCallable实例化一个FutureTask。(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法。)

示例:使用 Future 实现页面渲染器

为了使页面渲染器实现更高的并发性,首先将渲染过程分解为两个任务,

  • 一个是渲染所有的文本,
  • 另一个是下载所有的图像。

因为其中一个任务是CPU密集型,而另一个任务是I/O密集型,因此这种方法即使在单CPU系统上也能提升性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public abstract class FutureRenderer {

private final ExecutorService executor = Executors.newFixedThreadPool(100);

void renderPage(CharSequence source) {

List<ImageInfo> imageInfos = scanForImageInfo(source);

Future<List<ImageData>> future =
executor.submit(
() -> {
List<ImageData> imageDataList = new ArrayList<>();
for (ImageInfo imageInfo : imageInfos) {
imageDataList.add(imageInfo.downloadImage());
}
return imageDataList;
});

rendText(source);

try {
List<ImageData> imageDataList = future.get();
for (ImageData imageData : imageDataList) {
renderImage(imageData);
}
} catch (InterruptedException e) {
// 重新设置线程的中断状态
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消任务
future.cancel(true);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e);
}
}

abstract void rendText(CharSequence source);

abstract List<ImageInfo> scanForImageInfo(CharSequence source);

abstract void renderImage(ImageData imageData);

interface ImageData {}

interface ImageInfo {
ImageData downloadImage();
}
}

FutureRenderer使得渲染文本任务与下载图像数据的任务并发地执行。当所有图像下载完后,会显示到页面上。这将提升用户体验,不仅使用户更快地看到结果,还有效利用了并行性,但我们还可以做得更好。用户不必等到所有的图像都下载完成,而希望看到每当下载完一幅图像时就立即显示出来。

在异构任务并行化中存在的缺陷

两个人可以很好地分担洗碗的工作:其中一个人负责清洗,而另一个人负责烘干。然而,要将不同类型的任务平均分配给每个工人却并不容易。当人数增加时,如何确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事情。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法带来的好处将减少。

当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。

只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

CompletionServiceExecutorBlockingQueue

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。

CompletionServiceExecutorBlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的takepoll等方法来获得已完成的结果,而这些结果会在完成时将被封装为FutureExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor

ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用FutureTask中的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中,take和poll方法委托给了BlockingQueue,这些方法会在得出结果之前阻塞。

1
2
3
4
5
6
7
8
9
10
11
private static class QueueingFuture<V> extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task,
BlockingQueue<Future<V>> completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
private final Future<V> task;
private final BlockingQueue<Future<V>> completionQueue;
protected void done() { completionQueue.add(task); }
}

示例:使用 completionService 实现页面渲染器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public abstract class Render {

private final ExecutorService executor;

public Render(ExecutorService executor) {
this.executor = executor;
}

void renderPage(CharSequence source) {
List<ImageInfo> imageInfos = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);

for (ImageInfo imageInfo : imageInfos) {
completionService.submit(imageInfo::downloadImage);
}

renderText(source);

int size = imageInfos.size();
try {
for (int i = 0; i < size; i++) {
Future<ImageData> imageDataFuture = completionService.take();
ImageData imageData = imageDataFuture.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e);
}
}

protected abstract List<ImageInfo> scanForImageInfo(CharSequence source);

protected abstract void renderText(CharSequence source);

protected abstract void renderImage(ImageData imageData);

interface ImageData {}

interface ImageInfo {
ImageData downloadImage();
}
}

多个ExecutorCompletionService可以共享一个Executor,因此可以创建一个对于特定计算私有,又能共享一个公共Executor的ExecutorCompletionService。因此,CompletionService的作用就相当于一组计算的句柄,这与Future作为单个计算的句柄是非常类似的。通过记录提交给CompletionService的任务数量,并计算出已经获得的已完成结果的数量,即使使用一个共享的Executor,也能知道已经获得了所有任务结果的时间。

为任务设置时限

有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。

在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。

在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class RenderWithTimeBudget {

private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();

Page renderPageWithAd() throws InterruptedException {
long end = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(Ad::new);
Page page = renderPageBody();

Ad ad;
try {
long timeLeft = end - System.nanoTime();
ad = f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}

page.setAd(ad);
return page;
}

private Page renderPageBody() {
return new Page();
}

static class Ad {}

static class Page {
public void setAd(Ad ad) {}
}
}

示例中生成的页面中包括响应用户请求的内容以及从广告服务器上获得的广告。它将获取广告的任务提交给一个Executor,然后计算剩余的文本页面内容,最后等待广告信息,直到超出指定的时间。如果get超时,那么将取消[插图]广告获取任务,并转而使用默认的广告信息。

示例:旅行预订门户网站

预定时间”方法可以很容易地扩展到任意数量的任务上。考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线、旅店或汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用Web服务、访问数据库、执行一个EDI事务或其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。

从一个公司获得报价的过程与从其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但还有一个更简单的方法——invokeAll。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class TimeBudget {

private static ExecutorService executor = Executors.newCachedThreadPool();

public List<TravelQuote> getRankedTravelQuotes(
TravelInfo travelInfo,
Set<TravelCompany> companies,
Comparator<TravelQuote> ranking,
long time,
TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<>();
for (TravelCompany company : companies) {
tasks.add(new QuoteTask(company, travelInfo));
}

List<Future<TravelQuote>> futures = executor.invokeAll(tasks, time, unit);

List<TravelQuote> quotes = new ArrayList<>(tasks.size());
for (Future<TravelQuote> future : futures) {
for (QuoteTask task : tasks) {
try {
quotes.add(future.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
}

quotes.sort(ranking);

return quotes;
}

static class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;

public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
this.company = company;
this.travelInfo = travelInfo;
}

TravelQuote getFailureQuote(Throwable t) {
return null;
}

TravelQuote getTimeoutQuote(CancellationException e) {
return null;
}

@Override
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
}

interface TravelCompany {
TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}

interface TravelQuote {}

interface TravelInfo {}

小结

  • 通过围绕任务执行来设计应用程序,可以简化开发过程,并有助于实现并发。
  • Executor框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。
  • 当需要创建线程来执行任务时,可以考虑使用 executor 。
  • 要想在将应用程序分解为不同的任务时获得最大的好处,必须定义清晰的任务边界。
  • 某些应用程序中存在着比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。

取消与关闭

任务和线程的启动很容易。在大多数时候,我们都会让它们运行直到结束,或者让它们自行停止。然而,有时候我们希望提前结束任务或线程,或许是因为用户取消了操作,或者应用程序需要被快速关闭。

要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程。但它提供了中断(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。

生命周期结束(End-of-Lifecycle)的问题会使任务、服务以及程序的设计和实现等过程变得复杂,而这个在程序设计中非常重要的要素却经常被忽略。一个在行为良好的软件与勉强运行的软件之间的最主要区别就是,行为良好的软件能很完善地处理失败、关闭和取消等过程。本章将给出各种实现取消和中断的机制,以及如何编写任务和服务,使它们能对取消请求做出响应。

任务取消

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的(Cancellable)。

取消某个操作的原因很多:

  • 用户请求取消。
  • 有时间限制的操作。
  • 应用程序事件
  • 错误
  • 关闭

在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

其中一种协作机制能设置某个“已请求取消(Cancellation Requested)”标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。 如示例 PrimeGenerator 所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class PrimeGenerator implements Runnable {

@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<>();

private volatile boolean cancelled;

@Override
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}

public void cancel() {
cancelled = true;
}

public synchronized List<BigInteger> get() {
return new ArrayList<>(primes);
}

static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}
}

PrimeGenerator持续地枚举素数,直到它被取消。cancel方法将设置cancelled标志,并且主循环在搜索下一个素数之前会首先检查这个标志。(为了使这个过程能可靠地工作,标志cancelled必须为volatile类型。)

PrimeGenerator使用了一种简单的取消策略:客户代码通过调用cancel来请求取消,PrimeGenerator在每次搜索素数前首先检查是否存在取消请求,如果存在则退出。

一个可取消的任务必须拥有取消策略(Cancellation Policy),在这个策略中将详细地定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消该任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消请求时应该执行哪些(What)操作。

中断

PrimeGenerator中的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。如示例 BrokenPrimeProducer 所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Terrible
public class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;

public BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

@Override
public void run() {
BigInteger p = BigInteger.ONE;
try {
while (!cancelled) {
queue.put(p.nextProbablePrime());
}
} catch (InterruptedException consumed) {
}
}

public void cancel() {
cancelled = true;
}

void consumePrimes() throws InterruptedException {
BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<>(100);
BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
producer.start();
try {
while (needMorePrimes()) {
consume(primes.take());
}
} finally {
producer.cancel();
}
}

private boolean needMorePrimes() {
return true;
}

private void consume(BigInteger take) {}
}

生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用cancel方法来设置cancelled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。

一些特殊的阻塞库的方法支持中断。线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

在Java的API或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread中包含了中断线程以及查询线程中断状态的方法,interrupt方法能中断目标线程,而isInterrupted方法能返回目标线程的中断状态。静态的interrupted方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

1
2
3
4
5
6
public class Thread {
public void interrupt(){……}
public boolean isInterrupted(){……}
public static boolean interrupted(){……}
……
}

阻塞库方法,例如Thread.sleepObject.wait等,都会检查线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:**清除中断状态,抛出InterruptedException**,表示阻塞操作由于中断而提前结束。JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

当线程在非阻塞状态下中断时,它的中断状态将被设置,然后根据将被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得“有黏性”——如果不触发InterruptedException,那么中断状态将一直保持,直到明确地清除中断状态。调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

对中断操作的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。有些方法,例如wait、sleep和join等,将严格地处理这种请求,当它们收到中断请求或者在开始执行时发现某个已被设置好的中断状态时,将抛出一个异常。设计良好的方法可以完全忽略这种请求,只要它们能使调用代码对中断请求进行某种处理。设计糟糕的方法可能会屏蔽中断请求,从而导致调用栈中的其他代码无法对中断请求作出响应。

在使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出InterruptedException,或者通过再次调用interrupt来恢复中断状态。

如果任务代码能够响应中断,那么可以使用中断作为取消机制,并且利用许多库类中提供的中断支持。通常,中断是实现取消的最合理方式。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

public PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException ignored) {
// allow thread exit
}
}

public void cancel() {
interrupt();
}
}

中断策略

正如任务中应该包含取消策略一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。

区分任务和线程对中断的反应是很重要的。一个中断请求可以有一个或多个接收者——中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。

任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。这就是为什么大多数可阻塞的库函数都只是抛出InterruptedException作为中断响应。它们永远不会在某个由自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将InterruptedException传递给调用者外还需要执行其他操作,那么应该在捕获InterruptedException之后恢复中断状态:

1
Thread.currentThread().interrupt();

正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信息封装到某个合适的取消机制中,例如关闭(shutdown)方法。

由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

响应中断

在前面的章节中,有两种实用策略可用于处理InterruptedException

  • 传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。
  • 恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

只有实现了线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是在捕获InterruptedException时恢复状态。如示例所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class NonCancelableTask {
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

interface Task {}
}

如果代码不会调用可中断的阻塞方法,那么仍然可以通过在任务代码中轮询当前线程的中断状态来响应中断。要选择合适的轮询频率,就需要在效率和响应性之间进行权衡。如果响应性要求较高,那么不应该调用那些执行时间较长并且不响应中断的方法,从而对可调用的库代码进行一些限制。

在取消过程中可能涉及除了中断状态之外的其他状态。中断可以用来获得线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的指示。(当访问这些信息时,要确保使用同步。)

示例:计时运行

许多问题永远也无法解决(例如,枚举所有的素数),而某些问题,能很快得到答案,也可能永远得不到答案。在这些情况下,如果能够指定“最多花10分钟搜索答案”或者“枚举出在10分钟内能找到的答案”,那么将是非常有用的。

下面的示例在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为该异常会被timedRun的调用者捕获。

1
2
3
4
5
6
7
8
9
10
public class TimedRun1 {

private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

public static void timedRun(Runnable r, long timeout, TimeUnit unit) {
Thread taskThread = Thread.currentThread();
cancelExec.schedule(taskThread::interrupt, timeout, unit);
r.run();
}
}

这是一种非常简单的方法,但却破坏了以下规则:在中断线程之前,应该了解它的中断策略。由于timedRun可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。如果任务在超时之前完成,那么中断timedRun所在线程的取消任务将在timedRun返回到调用者之后启动。我们不知道在这种情况下将运行什么代码,但结果一定是不好的。(可以使用schedule返回的ScheduledFuture来取消这个取消任务以避免这种风险,这种做法虽然可行,但却非常复杂。)而且,如果任务不响应中断,那么timedRun会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限)。如果某个限时运行的服务没有在指定的时间内返回,那么将对调用者带来负面影响。

下个示例解决了之前解决方案中的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class TimedRun2 {

private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;

@Override
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}

void rethrow() {
if (t != null) {
throw launderThrowable(t);
}
}
}

RethrowableTask task = new RethrowableTask();
Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(
new Runnable() {
@Override
public void run() {
taskThread.interrupt();
}
},
timeout,
unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}

在这个示例的代码中解决了前面示例中的问题,但由于它依赖于一个限时的join,因此存在着join的不足:无法知道执行控制是因为线程正常退出而返回还是因为join超时而返回。

这是Thread API的一个缺陷,因为无论join是否成功地完成,在Java内存模型中都会有内存可见性结果,但join本身不会返回某个状态来表明它是否成功。

通过 Future 来实现取消

我们已经使用了一种抽象机制来管理任务的生命周期,处理异常,以及实现取消,即Future。

ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning,表示取消操作是否成功。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断。)如果mayInterruptIfRunningtrue并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。

除非你清楚线程的中断策略,否则不要中断线程,那么在什么情况下调用cancel可以将参数指定为true?执行任务的线程是由标准的Executor创建的,它实现了一种中断策略使得任务可以通过中断被取消,所以如果任务在标准Executor中运行,并通过它们的Future来取消任务,那么可以设置mayInterruptIfRunning

下面的示例给出了另一个版本的timedRun:将任务提交给一个ExecutorService,并通过一个定时的Future.get来获得结果。如果get在返回时抛出了一个TimeoutException,那么任务将通过它的Future来取消。还给出了另一种良好的编程习惯:取消那些不再需要结果的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TimedRun {

private static final ExecutorService executor = Executors.newCachedThreadPool();

public static void timeRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
Future<?> future = executor.submit(r);

try {
future.get(timeout, unit);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e);
} catch (TimeoutException e) {
// task will be cancelled below
} finally {
// Harmless if task already completed
future.cancel(true);
}
}
}

处理不可中断的阻塞

在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。

然而,并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。

  • Java.io包中的同步Socket I/O。在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStreamOutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException

  • Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel

  • Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用closewakeup方法会使线程抛出ClosedSelectorException并提前返回。

  • 获取某个锁。如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ReaderThread extends Thread {

private static final int BUFSZ = 1024;

private final Socket socket;
private final InputStream is;

public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
is = socket.getInputStream();
}

@Override
public void run() {
try {
byte[] bytes = new byte[BUFSZ];
while (true) {
int count = is.read(bytes);
if (count < 0) {
break;
} else if (count > 0) {
processBuffer(bytes, count);
}
}
} catch (IOException ignored) {
/*允许线程退出*/
}
}

private void processBuffer(byte[] bytes, int count) {}

@Override
public void interrupt() {

try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
}

ReaderThread给出了如何封装非标准的取消操作。ReaderThread管理了一个套接字连接,它采用同步方式从该套接字中读取数据,并将接收到的数据传递给processBuffer。为了结束某个用户的连接或者关闭服务器,ReaderThread改写了interrupt方法,使其既能处理标准的中断,也能关闭底层的套接字。因此,无论ReaderThread线程是在read方法中阻塞还是在某个可中断的阻塞方法中阻塞,都可以被中断并停止执行当前的工作。

采用newTaskFor来封装非标准的取消

我们可以通过newTaskFor方法来进一步优化ReaderThread中封装非标准取消的技术,这是Java 6ThreadPoolExecutor中的新增功能。当把一个Callable提交给ExecutorService时,submit方法会返回一个Future,我们可以通过这个Future来取消任务。newTaskFor是一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口扩展了FutureRunnable(并由FutureTask实现)。

通过定制表示任务的Future可以改变Future.cancel的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。通过改写interrupt方法,ReaderThread可以取消基于套接字的线程。同样,通过改写任务的Future.cancel方法也可以实现类似的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class SocketUsingTask<T> implements CancellableTask<T> {

@GuardedBy("this")
private Socket socket;

public SocketUsingTask<T> setSocket(Socket socket) {
this.socket = socket;
return this;
}

@Override
public void cancel() {
try {
if (socket != null) {
socket.close();
}
} catch (IOException ignored) {
// ignored
}
}

@Override
public RunnableFuture<T> newTask() {
return new FutureTask<>(this) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}

@Override
public T call() throws Exception {
// 此处读取 Socket
return null;
}
}

interface CancellableTask<T> extends Callable<T> {
void cancel();

RunnableFuture<T> newTask();
}

@ThreadSafe
class CancellingExecutor extends ThreadPoolExecutor {
//....构造器

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask) {
return ((CancellableTask<T>) callable).newTask();
} else {
return super.newTaskFor(callable);
}
}
}

CancellableTask中定义了一个CancellableTask接口,该接口扩展了Callable,并增加了一个cancel方法和一个newTask工厂方法来构造RunnableFuture。CancellingExecutor扩展了ThreadPoolExecutor,并通过改写newTaskFor使得CancellableTask可以创建自己的Future。

SocketUsingTask实现了CancellableTask,并定义了Future.cancel来关闭套接字和调用super.cancel。如果SocketUsingTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断。因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻调的套接字I/O方法。

停止基于线程的服务

应用程序通常会创建拥有多个线程的服务,例如线程池,并且这些服务的生命周期通常比创建它们的方法的生命周期更长。如果应用程序准备退出,那么这些服务所拥有的线程也需要结束。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。

正确的封装原则是:除非拥有某个线程,否则不能对该线程进行操控。在线程API中,并没有对线程所有权给出正式的定义:线程由Thread对象表示,并且像其他对象一样可以被自由共享。然而,线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作者线程的所有者,如果要中断这些线程,那么应该使用线程池。

与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法(LifecycleMethod)来关闭它自己以及它所拥有的线程。这样,当应用程序关闭该服务时,服务就可以关闭所有的线程了。在ExecutorService中提供了shutdownshutdownNow等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

示例:日志服务

LogWriter 给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接写入输出流,而是由LogWriter通过BlockingQueue将消息提交给日志线程,并由日志线程写入。这是一种多生产者单消费者(Multiple-Producer, Single-Consumer)的设计方式:每个调用log的操作都相当于一个生产者,而后台的日志线程则相当于消费者。如果消费者的处理速度低于生产者的生成速度,那么BlockingQueue将阻塞生产者,直到日志线程有能力处理新的日志消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//不支持关闭的生产者-消费者日志服务
public class LogWriter {

private final BlockingQueue<String> queue;
private final LoggerThread logger;

public LogWriter(BlockingQueue<String> queue, LoggerThread logger) {
this.queue = queue;
this.logger = logger;
}

public void start() {
logger.start();
}

public void log(String msg) throws InterruptedException {
queue.put(msg);
}

class LoggerThread extends Thread {
private final PrintWriter writer;

LoggerThread(PrintWriter writer) {
this.writer = writer;
}

@Override
public void run() {
try {
while (true) {
writer.println(queue.take());
}
} catch (InterruptedException ignored) {
// ignored
} finally {
writer.close();
}
}
}
}

为了使像LogWriter这样的服务在软件产品中能发挥实际的作用,还需要实现一种终止日志线程的方法,从而避免使JVM无法正常关闭。要停止日志线程是很容易的,因为它会反复调用take,而take能响应中断。如果将日志线程修改为当捕获到InterruptedException时退出,那么只需中断日志线程就能停止服务。

然而,如果只是使日志线程退出,那么还不是一种完备的关闭机制。这种直接关闭的做法会丢失那些正在等待被写入到日志的信息,不仅如此,其他线程将在调用log时被阻塞,因为日志消息队列是满的,因此这些线程将无法解除阻塞状态。当取消一个生产者-消费者操作时,需要同时取消生产者和消费者。在中断日志线程时会处理消费者,但在这个示例中,由于生产者并不是专门的线程,因此要取消它们将非常困难。

为LogWriter提供可靠关闭操作的方法是解决竞态条件问题,因而要使日志消息的提交操作成为原子操作。然而,我们不希望在消息加入队列时去持有一个锁,因为put方法本身就可以阻塞。我们采用的方法是:通过原子方式来检查关闭请求,并且有条件地递增一个计数器来“保持”提交消息的权利,如LogService所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class LogService {

private final BlockingQueue<String> queue;
private final LoggerThread logger;

@GuardedBy("this")
private boolean isShutdown;

@GuardedBy("this")
private int reservations;

public LogService(BlockingQueue<String> queue, LoggerThread logger) {
this.queue = queue;
this.logger = logger;
}

public void start() {
logger.start();
}

public void stop() {
synchronized (this) {
isShutdown = true;
}
logger.interrupt();
}

public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown) {
throw new IllegalStateException("");
}
++reservations;
}
queue.put(msg);
}

class LoggerThread extends Thread {
private final PrintWriter writer;

LoggerThread(PrintWriter writer) {
this.writer = writer;
}

@Override
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0) {
break;
}
}

String msg = queue.take();

synchronized (this) {
--reservations;
}

writer.println(msg);
} catch (InterruptedException ignored) {
// retry
}
}
} finally {
writer.close();
}
}
}
}

关闭 ExecutorService

ExecutorService提供了两种关闭方法:使用shutdown正常关闭,以及使用shutdownNow强行关闭。这两种关闭方式的差别在于各自的安全性和响应性:强行关闭的速度更快,但风险也更大,因为任务很可能在执行到一半时被结束;而正常关闭虽然速度慢,但却更安全,因为ExecutorService会一直等到队列中的所有任务都执行完成后才关闭。在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。

在复杂程序中,通常会将ExecutorService封装在某个更高级别的服务中,并且该服务能提供其自己的生命周期方法。

LogServiceUseExecutorService 将管理线程的工作委托给一个ExecutorService,而不是由其自行管理。通过封装ExecutorService,可以将所有权链(OwnershipChain)从应用程序扩展到服务以及线程,所有权链上的各个成员都将管理它所拥有的服务或线程的生命周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class LogServiceUseExecutorService {

private final ExecutorService executor = Executors.newCachedThreadPool();
private final PrintWriter writer;

private final long timeout;
private final TimeUnit unit;

public LogServiceUseExecutorService(PrintWriter writer, long timeout, TimeUnit unit) {
this.writer = writer;
this.timeout = timeout;
this.unit = unit;
}

public void stop() throws InterruptedException {
try {
executor.shutdown();
executor.awaitTermination(timeout, unit);
} finally {
writer.close();
}
}

public void log(String msg) {
executor.execute(() -> writer.println(msg));
}
}

“毒丸”对象

另一种关闭生产者-消费者服务的方式就是使用“毒丸(Poison Pill)”对象:“毒丸”是指一个放在队列上的对象,其含义是:“当得到这个对象时,立即停止。” 在FIFO(先进先出)队列中,“毒丸”对象将确保消费者在关闭之前首先完成队列中的所有工作,在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作。

下面的示例使用了毒丸对象来关闭服务,一个单生产者-单消费者的桌面搜索示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class IndexingService {

private static final File POISON = new File("");
private final CrawlerThread crawlerThread = new CrawlerThread();
private final IndexerThread indexerThread = new IndexerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;

public IndexingService(BlockingQueue<File> queue, FileFilter fileFilter, File root) {
this.queue = queue;
this.fileFilter = fileFilter;
this.root = root;
}

public void start() {
crawlerThread.start();
indexerThread.start();
}

class CrawlerThread extends Thread {

@Override
public void run() {
try {
crawl(root);
} catch (InterruptedException ignored) {
// retry
} finally {
while (true) {
try {
queue.put(POISON);
} catch (InterruptedException ignored) {
// retry
}
}
}
}

private void crawl(File root) throws InterruptedException {
//
}
}

class IndexerThread extends Thread {

@Override
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON) {
break;
}
indexFile(file);
}
} catch (InterruptedException ignored) {
// ignored
}
}

private void indexFile(File file) {
//
}
}
}

只有在生产者和消费者的数量都已知的情况下,才可以使用“毒丸”对象。只有在无界队列中,“毒丸”对象才能可靠地工作。

示例:只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。(在这种情况下,invokeAll和invokeAny等方法通常会起较大的作用。)

示例checkMail方法能在多台主机上并行地检查新邮件。它创建一个私有的Executor,并向每台主机提交一个任务。然后,当所有邮件检查任务都执行完成后,关闭Executor并等待结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class CheckForEmail {

public boolean checkEmail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException {

ExecutorService executor = Executors.newCachedThreadPool();
AtomicBoolean hasNewEmail = new AtomicBoolean(false);

try {
for (String host : hosts) {
executor.execute(
() -> {
if (checkMail(host)) {
hasNewEmail.set(true);
}
});
}
} finally {
executor.shutdown();
executor.awaitTermination(timeout, unit);
}

return hasNewEmail.get();
}

private boolean checkMail(String host) {
return false;
}
}

shutdownNow 的局限性

当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务,并返回所有已提交但尚未开始的任务,从而将这些任务写入日志或者保存起来以便之后进行处理。然而,我们无法通过常规方法来找出哪些任务已经开始但尚未结束。这意味着我们无法在关闭过程中知道正在执行的任务的状态,除非任务本身会执行某种检查。要知道哪些任务还没有完成,你不仅需要知道哪些任务还没有开始,而且还需要知道当Executor关闭时哪些任务正在执行。

TrackingExecutor中给出了如何在关闭过程中判断正在执行的任务。通过封装ExecutorService并使得execute(类似地还有submit,在这里没有给出)记录哪些任务是在关闭后取消的,TrackingExecutor可以找出哪些任务已经开始但还没有正常完成。在Executor结束后,getCancelledTasks返回被取消的任务清单。要使这项技术能发挥作用,任务在返回时必须维持线程的中断状态,在所有设计良好的任务中都会实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class TrackingExecutor extends AbstractExecutorService {

private final ExecutorService executor;
private final Set<Runnable> taskCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<>());

public TrackingExecutor(ExecutorService executor) {
this.executor = executor;
}

@Override
public void shutdown() {
executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
return executor.isShutdown();
}

@Override
public boolean isTerminated() {
return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
executor.execute(
() -> {
try {
command.run();
} finally {
if (isShutdown() && Thread.currentThread().isInterrupted()) {
taskCancelledAtShutdown.add(command);
}
}
});
}

public List<Runnable> getCancelledTasks() {
return new ArrayList<>(taskCancelledAtShutdown);
}
}

WebCrawler 是 TracingExecutor 的一个使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public abstract class WebCrawler {

private static final long TIMEOUT = 1000;
private static final TimeUnit UNIT = TimeUnit.MILLISECONDS;
private volatile TrackingExecutor executor;
private final Set<URL> urlsToCrawl = new HashSet<>();

public synchronized void start() {
executor = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) {
submitCrawTask(url);
}
urlsToCrawl.clear();
}

public synchronized void stop() throws InterruptedException {
try {
saveUncrawled(executor.shutdownNow());
if (executor.awaitTermination(TIMEOUT, UNIT)) {
saveUncrawled(executor.getCancelledTasks());
}
} finally {
executor = null;
}
}

private void saveUncrawled(List<Runnable> uncrawled) {
for (Runnable runnable : uncrawled) {
urlsToCrawl.add(((CrawlTask) runnable).getPage());
}
}

protected void submitCrawTask(URL url) {
executor.execute(new CrawlTask(url));
}

protected abstract List<URL> processPage(URL url);

private class CrawlTask implements Runnable {

private final URL url;

private CrawlTask(URL url) {
this.url = url;
}

@Override
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted()) {
return;
}
submitCrawTask(link);
}
}

public URL getPage() {
return url;
}
}
}

处理非正常的线程终止

导致线程提前死亡的最主要原因就是RuntimeException。由于这些异常表示出现了某种编程错误或者其他不可修复的错误,因此它们通常不会被捕获。它们不会在调用栈中逐层传递,而是默认地在控制台中输出栈追踪信息,并终止线程。

线程非正常退出的后果可能是良性的,也可能是恶性的,这要取决于线程在应用程序中的作用。

任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常。对调用的代码越不熟悉,就越应该对其代码行为保持怀疑。

在任务处理线程(例如线程池中的工作者线程或者Swing的事件派发线程等)的生命周期中,将通过某种抽象机制(例如Runnable)来调用许多未知的代码,我们应该对在这些线程中执行的代码能否表现出正确的行为保持怀疑。像Swing事件线程这样的服务可能只是因为某个编写不当的事件处理器抛出NullPointerException而失败,这种情况是非常糟糕的。因此,这些线程应该在try-catch代码块中调用这些任务,这样就能捕获那些未检查的异常了,或者也可以使用try-finally代码块来确保框架能够知道线程非正常退出的情况,并做出正确的响应。在这种情况下,你或许会考虑捕获RuntimeException,即当通过Runnable这样的抽象机制来调用未知的和不可信的代码时。

下面的片段给出了如何在线程池内部构建一个工作者线程。如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者当前已有足够多的线程能满足需要。ThreadPoolExecutor和Swing都通过这项技术来确保行为糟糕的任务不会影响到后续任务的执行。当编写一个向线程池提交任务的工作者线程类时,或者调用不可信的外部代码时(例如动态加载的插件),使用这些方法中的某一种可以避免某个编写得糟糕的任务或插件不会影响调用它的整个线程。

1
2
3
4
5
6
7
8
9
10
11
public void run(){
Throwable thrown=null
try{
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
} catch(Throwable e){
thrown = e;
} finally {
threadExited(this, thrown);
}
}
未捕获异常的处理

在Thread API中同样提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。这两种方法是互补的,通过将二者结合在一起,就能有效地防止线程泄漏问题。

当一个线程由于未捕获异常而退出时,JVM会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器。如果没有提供任何异常处理器,那么默认的行为是将栈追踪信息输出到System.err。

1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}

异常处理器如何处理未捕获异常,取决于对服务质量的需求。最常见的响应方式是将一个错误信息以及相应的栈追踪信息写入应用程序日志中。异常处理器还可以采取更直接的响应,例如尝试重新启动线程,关闭应用程序,或者执行其他修复或诊断等操作。

在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

要为线程池中的所有线程设置一个UncaughtExceptionHandler,需要为ThreadPool-Executor的构造函数提供一个ThreadFactory。(与所有的线程操控一样,只有线程的所有者能够改变线程的UncaughtExceptionHandler。)标准线程池允许当发生未捕获异常时结束线程,但由于使用了一个try-finally代码块来接收通知,因此当线程结束时,将有新的线程来代替它。如果没有提供捕获异常处理器或者其他的故障通知机制,那么任务会悄悄失败,从而导致极大的混乱。如果你希望在任务由于发生异常而失败时获得通知,并且执行一些特定于任务的恢复操作,那么可以将任务封装在能捕获异常的Runnable或Callable中,或者改写ThreadPoolExecutor的afterExecute方法。

令人困惑的是,只有通过execute提交的任务,才能将它抛出的异常交给未捕获异常处理器,而通过submit提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由submit提交的任务由于抛出了异常而结束,那么这个异常将被Future.get封装在ExecutionException中重新抛出。

JVM 关闭

JVM既可以正常关闭,也可以强行关闭。正常关闭的触发方式有多种,包括:当最后一个“正常(非守护)”线程结束时,或者当调用了System.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-C)。虽然可以通过这些标准方法来正常关闭JVM,但也可以通过调用Runtime.halt或者在操作系统中“杀死”JVM进程(例如发送SIGKILL)来强行关闭JVM。

关闭钩子

在正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。JVM并不能保证关闭钩子的调用顺序。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。当JVM最终结束时,这些线程将被强行结束。如果关闭钩子或终结器没有执行完成,那么正常关闭进程“挂起”并且JVM必须被强行关闭。当被强行关闭时,只是关闭JVM,而不会运行关闭钩子。

关闭钩子应该是线程安全的:它们在访问共享数据时必须使用同步机制,并且小心地避免发生死锁,这与其他并发代码的要求相同。而且,关闭钩子不应该对应用程序的状态(例如,其他服务是否已经关闭,或者所有的正常线程是否已经执行完成)或者JVM的关闭原因做出任何假设,因此在编写关闭钩子的代码时必须考虑周全。最后,关闭钩子必须尽快退出,因为它们会延迟JVM的结束时间,而用户可能希望JVM能尽快终止。

关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。

由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。为了避免这种情况,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。实现这种功能的一种方式是对所有服务使用同一个关闭钩子(而不是每个服务使用一个不同的关闭钩子),并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了在关闭操作之间出现竞态条件或死锁等问题。无论是否使用关闭钩子,都可以使用这项技术,通过将各个关闭操作串行执行而不是并行执行,可以消除许多潜在的故障。当应用程序需要维护多个服务之间的显式依赖信息时,这项技术可以确保关闭操作按照正确的顺序执行。

守护线程

守护线程的作用:希望创建一个线程来执行一些辅助工作,但又不希望这个线程阻碍JVM的关闭。

线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。

普通线程与守护线程之间的差异仅在于当线程退出时发生的操作。当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作。当JVM停止时,所有仍然存在的守护线程都将被抛弃——既不会执行finally代码块,也不会执行回卷栈,而JVM只是直接退出。

我们应尽可能少地使用守护线程——很少有操作能够在不进行清理的情况下被安全地抛弃。

守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期。

终结器

当不再需要内存资源时,可以通过垃圾回收器来回收它们,但对于其他一些资源,例如文件句柄或套接字句柄,当不再需要它们时,必须显式地交还给操作系统。为了实现这个功能,垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而保证一些持久化的资源被释放。

由于终结器可以在某个由JVM管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的。在大多数情况下,通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。基于这些原因以及其他一些原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)

线程池的使用

  • 介绍对线程池进行配置和调优的高级选项
  • 在使用任务执行框架时需要注意的各种危险
  • 一些使用 Executor 的高级示例

在任务与执行策略之间的隐形耦合

Executor 框架可以将任务提交与任务的执行策略解耦开来,这种说法多少有些言过其实。

并非所有的任务都能适用所有的执行策略,有些类型的任务需要明确的指定任务的执行策略:

  • 依赖性任务。大多数的任务都是独立的,它们不依赖于其他任务的执行顺序、执行结果或其他效果。如果提交给线程池的任务依赖其他任务,那么就隐含的给执行策略带来约束,此时必须小心的维持这些执行策略以避免产生活跃性问题。
  • 使用线程封闭机制的任务。这种情形将在任务与执行策略之间形成隐式的耦合——任务要求其执行所在的Executor是单线程的。如果将Executor从单线程环境改为线程池环境,那么将会失去线程安全性。
  • 对响应时间敏感的任务。如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor管理的服务的响应性。
  • 使用 ThreadLocal 的任务。使用ThreadLocal的任务。ThreadLocal使每个线程都可以拥有某个变量的一个私有“版本”。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值。

只有当任务都是同类型的并且相互独立时,线程池的性能才能达到最佳。如果将运行时间较长的与运行时间较短的任务混合在一起,那么除非线程池很大,否则将可能造成“拥塞”。如果提交的任务依赖于其他任务,那么除非线程池无限大,否则将可能造成死锁。

在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行策略而破坏安全性或活跃性。

线程饥饿死锁

在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题。这种现象被称为线程饥饿死锁(Thread Starvation Deadlock),只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。

示例 ThreadDeadLock 展示了线程饥饿死锁的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Terrible
public class ThreadDeadLock {

static ExecutorService executor = Executors.newSingleThreadExecutor();

static class LoadFileTask implements Callable<String> {

private final String fileName;

public LoadFileTask(String fileName) {
this.fileName = fileName;
}

@Override
public String call() throws Exception {
return "";
}
}

static class RenderPageTask implements Callable<String> {

@Override
public String call() throws Exception {

Future<String> header = executor.submit(new LoadFileTask("header.html"));
Future<String> footer = executor.submit(new LoadFileTask("footer.html"));

String page = renderBody();

// 将发生死锁,因为任务在等待自任务的结果
return header.get() + page + footer.get();
}

private String renderBody() {
return "";
}
}

@Test
@DisplayName("test_dead_lock")
void test_dead_lock() throws Exception {
RenderPageTask renderPageTask = new RenderPageTask();
renderPageTask.call();
}
}

除了在线程池大小上的显式限制外,还可能由于其他资源上的约束而存在一些隐式限制。

运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不仅会造成线程池堵塞,甚至还会增加执行时间较短任务的服务时间。如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。

有一项技术可以缓解执行时间较长任务造成的影响,即限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如Thread.join、BlockingQueue.put、CountDownLatch.await以及Selector.select等。如果等待超时,那么可以把任务标识为失败,然后中止任务或者将任务重新放回队列以便随后执行。这样,无论任务的最终结果是否成功,这种办法都能确保任务总能继续执行下去,并将线程释放出来以执行一些能更快完成的任务。

如果在线程池中总是充满了被阻塞的任务,那么也可能表明线程池的规模过小。

设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。

幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。

要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。

  • 在部署的系统中有多少个CPU?
  • 多大的内存?
  • 任务是计算密集型、I/O密集型还是二者皆可?
  • 它们是否需要像JDBC连接这样的稀缺资源?

如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。

  • 对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,通常能实现最优的利用率。(即使当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费。)

  • 对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值。要使处理器达到期望的使用率,线程池的最优大小等于:

$$
N_{cpu} = Number \ of \ CPUs
\
U_{cpu} = target \ CPU \ utilization, 0 <= U_{cpu} <= 1
\
\frac wc = ratio \ of \ wait \ time \ to \ comoute \ time
\


\
N_{threads} = N_{cpu} * U{cpu} * ( 1 + \frac wc )
$$

可以通过 Runtime 来获得 CPU 的数目

1
int cpus = Runtime.getRuntime().availableProcessors();

当然,CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算这些资源对线程池的约束条件是更容易的:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需求量,所得结果就是线程池大小的上限。

配置 ThreadPoolExecutor

ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。

如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。

ThreadPoolExecutor的通用构造函数:

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
....
}

线程的创建与销毁

线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。

  • 基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。
    • 在创建ThreadPoolExecutor初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads。
    • 开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。
  • 线程池的最大大小表示可同时活动的线程数量的上限。
  • 如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。(显然,这是一种折衷:回收空闲线程会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)

  • newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
  • newCachedThreadPool工厂方法将线程池的最大大小设置为Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。其他形式的线程池可以通过显式的ThreadPoolExecutor构造函数来构造。

管理队列任务

在有限的线程池中会限制可并发执行的任务数量。(单线程的Executor是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。)

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(SynchronousHandoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。

newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。

有界队列的问题:当队列填满后,新的任务该怎么办?饱和策略可以解决这个问题。

在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCachedThreadPool工厂方法中就使用了SynchronousQueue

当使用像LinkedBlockingQueueArrayBlockingQueue这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或Comparator(如果任务实现了Comparable)来定义的。

对于Executor, newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。

只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool

饱和策略

当有界队列被填满后,饱和策略开始发挥作用。

ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。

DK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy

中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。

1
2
3
4
5
6
7
8
9
10
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

“抛弃(Discard)”策略会悄悄抛弃该任务。

1
2
3
4
5
6
7
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。

1
2
3
4
5
6
7
8
9
10
11
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先级队列放在一起使用。

“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。

1
2
3
4
5
6
7
8
9
10
public static class CallerRunsPolicy implements RejectedExecutionHandler {

public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

当工作队列被填满后,没有预定义的饱和策略来阻塞execute。然而,通过使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能。如 BoundedExecutor 所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class BoundedExecutor {

private final ExecutorService executorService;
private final Semaphore semaphore;

public BoundedExecutor(ExecutorService executorService, Semaphore semaphore) {
this.executorService = executorService;
this.semaphore = semaphore;
}

public void submitTask(Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executorService.execute(
() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}

线程工厂

每当线程池需要创建一个新的线程时,都是通过线程工厂方法来完成的。在ThreadFactory中只定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
public interface ThreadFactory {

/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}

默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
@SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

通过指定一个线程工厂方法,可以定制线程池的配置信息。

在许多情况下都需要使用定制的线程工厂方法。

  • 希望为线程池中的线程指定一个UncaughtExceptionHandler,
  • 或者实例化一个定制的Thread类用于执行调试信息的记录。
  • 修改线程的优先级(这通常并不是一个好主意。)或者守护状态(同样,这也不是一个好主意。)。
  • 给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。

示例 MyThreadFactory 和 MyAppThread:

MyThreadFactory中给出了一个自定义的线程工厂。它创建了一个新的MyAppThread实例,并将一个特定于线程池的名字传递给MyAppThread的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的线程。在应用程序的其他地方也可以使用MyAppThread,以便所有线程都能使用它的调试功能。

MyAppThread中还可以定制其他行为,包括:为线程指定名字,设置自定义UncaughtExceptionHandler向Logger中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或者终止时把调试消息写入日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class MyThreadFactory implements ThreadFactory {

private final String poolName;

public MyThreadFactory(String poolName) {
this.poolName = poolName;
}

@Override
public Thread newThread(Runnable r) {
return new MyAppThread(r, poolName);
}
}

public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();

public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}

public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(
(t, e) -> log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e));
}

@Override
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) {
log.log(Level.FINE, "Created " + getName());
}
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) {
log.log(Level.FINE, "Exiting " + getName());
}
}
}

public static int getThreadsCreated() {
return created.get();
}

public static int getThreadsAlive() {
return alive.get();
}

public static boolean getDebug() {
return debugLifecycle;
}

public static void setDebug(boolean b) {
debugLifecycle = b;
}
}

在调用构造函数后再定制ThreadPoolExecutor

在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(RejectedExecution Handler))。

在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。

如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。

扩展ThreadPoolExecutor

ThreadPoolExecutor是可扩展的,它提供了几个可以在子类化中改写的方法:beforeExecuteafterExecuteterminated,这些方法可以用于扩展ThreadPoolExecutor的行为。

在执行任务的线程中将调用beforeExecuteafterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。

在线程池完成关闭操作时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知、记录日志或者收集finalize统计信息等操作。

示例:给线程池添加统计信息

TimingThreadPool中给出了一个自定义的线程池,它通过beforeExecute、afterExecute和terminated等方法来添加日志记录和统计信息收集。为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute可以访问的地方。因为这些方法将在执行任务的线程中调用,因此beforeExecute可以把值保存到一个ThreadLocal变量中,然后由afterExecute来读取。在TimingThreadPool中使用了两个AtomicLong变量,分别用于记录已处理的任务数和总的处理时间,并通过terminated来输出包含平均任务时间的日志消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class TimingThreadPool extends ThreadPoolExecutor {

private static final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s : end %s, time= %dns", t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}

@Override
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}

递归算法的并行化

如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成再继续执行,那么就可以使用Executor将串行循环转化为并行循环。

在一些递归设计中同样可以采用循环并行化的方法。一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。


, ,