Skip to main content

81. 并发工具优于 wait 和 notify

  本书第 1 版中专门用了一个条目来说明如何正确地使用 waitnotify ( Bloch01,详见第 50 条) 。它提出的建议仍然有效,并且在本条目的最后也对此做了概述,但是这条建议现在远远没有之前那么重要了。这是因为几乎没有理由再使用 waitnotify 了。自从 Java 5 发行版本开始, Java 平台就提供了更高级的并发工具,它们可以完成以前必须在 waitnotify 上手写代码来完成的各项工作。 既然正确地使用 wait 和 notify 比较困难,就应该用更高级的并发工具来代替。

  java.util.concurrent 中更高级的工具分成三类: Executor Framework 、并发集合(Concurrent Collection)以及同步器(Synchronizer),Executor Framework 只在第 80 条中简单地提到过,并发集合和同步器将在本条目中进行简单的阐述。

  并发集合为标准的集合接口(如 ListQueueMap )提供了高性能的并发实现。为了提供高并发性,这些实现在内部自己管理同步(详见第 79 条) 。因此, 并发集合中不可能排除并发活动;将它锁定没有什么作用,只会使程序的速度变慢。

  因为无法排除并发集合中的并发活动,这意味着也无法自动地在并发集合中组成方法调用。因此,有些并发集合接口已经通过依赖状态的修改操作(state-dependent modify operation)进行了扩展,它将几个基本操作合并到了单个原子操作中。事实证明,这些操作在并发集合中已经够用,它们通过缺省方法(详见第 21 条)被加到了 Java 8 对应的集合接口中。

  例如, MapputIfAbsent(key, value) 方法,当键没有映射时会替它插入一个映射,并返回与键关联的前一个值,如果没有这样的值,则返回 null 。这样就能很容易地实现线程安全的标准 Map 了。例如,下面这个方法模拟了 String.intern 的行为:

// Concurrent canonicalizing map atop ConcurrentMap - not optimal
private static final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
public static String intern(String s) {
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}

  事实上,你还可以做得更好。ConcurrentHashMap 对获取操作(如 get)进行了优化。因此,只有当 get 表明有必要的时候,才值得先调用 get ,再调用 putIfAbsent :

// Concurrent canonicalizing map atop ConcurrentMap - faster!
public static String intern(String s) {
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null)
            result = s;
    }
    return result;
}

  ConcurrentHashMap 除了提供卓越的并发性之外,速度也非常快。在我的机器上,上面这个优化过的 intern 方法比 String.intern 快了不止 6 倍(但是记住, String.intern 必须使用某种弱引用,避免随着时间的推移而发生内存泄漏)。并发集合导致同步的集合大多被废弃了。比如, 应该优先使用 ConcurrentHashMap ,而不是使用 Collections.synchronizedMap 只要用并发 Map 替换同步 Map ,就可以极大地提升并发应用程序的性能。

  有些集合接口已经通过阻塞操作(blocking operation )进行了扩展,它们会一直等待(或者阻塞)到可以成功执行为止。例如, BlockingQueue 扩展了 Queue 接口,并添加了包括 take 在内的几个方法,它从队列中删除并返回了头元素,如果队列为空,就等待。这样就允许将阻塞队列用于工作队列(work queue),也称作生产者一消费者队列 (producer-consumer queue),一个或者多个生产者线程(producer thread)在工作队列中添加工作项目,并且当工作项目可用时,一个或者多个消费者线程(consumer thread )则从工作队列中取出队列并处理工作项目。不出所料,大多数 ExecutorService 实现(包括 ThreadPoolExecutor)都使用了一个 BlockingQueue(详见第 80 条) 。

  同步器(Synchronizer)是使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是 CountDownLatchSemaphore 。较不常用的是 CyclicBarrierExchanger 。功能最强大的同步器是 Phaser

  倒计数锁存器(Countdown Latch)是一次性的障碍,允许一个或者多个线程等待一个或者多个其他线程来做某些事情。CountDownLatch 的唯一构造器带有一个 int 类型的参数,这个 int 参数是指允许所有在等待的线程被处理之前,必须在锁存器上调用 countDown 方法的次数。

  要在这个简单的基本类型之上构建一些有用的东西,做起来是相当容易。例如,假设想要构建一个简单的框架,用来给一个动作的并发执行定时。这个框架中只包含单个方法,该方法带有一个执行该动作的 executor ,一个并发级别(表示要并发执行该动作的次数),以及表示该动作的 runnable 。所有的工作线程( worker thread )自身都准备好,要在 timer 线程启动时钟之前运行该动作。当最后一个工作线程准备好运行该动作时, timer 线程就「发射发令枪(fires the starting gun)」,同时允许工作线程执行该动作。一旦最后一个工作线程执行完该动作,timer 线程就立即停止计时。直接在 wait 和 notify 之上实现这个逻辑会很棍乱,而在 CountDownLatch 之上实现则相当简单:

// Simple framework for timing concurrent execution
public static long time(Executor executor, int concurrency,
                        Runnable action) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(concurrency);
    CountDownLatch start = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(concurrency);
    for (int i = 0; i < concurrency; i++) {
        executor.execute(() -> {
            ready.countDown();
            // Tell timer we're ready
            try {
                start.await();
                // Wait till peers are ready
                action.run();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finally {
                done.countDown();
                // Tell timer we're done
            }
        });
    }
    ready.await();
    // Wait for all workers to be ready
    long startNanos = System.nanoTime();
    start.countDown();
    // And they're off!
    done.await();
    // Wait for all workers to finish
    return System.nanoTime() - startNanos;
}

  注意这个方法使用了三个倒计数锁存器。第一个是 ready ,工作线程用它来告诉 timer 线程它们已经准备好了。然后工作线程在第二个锁存器 start 上等待。当最后一个工作线程调用 ready.countDown 时, timer 线程记录下起始时间,并调用 start.countDown 允许所有的工作线程继续进行。然后 timer 线程在第三个锁存器 done 上等待,直到最后一个工作线程运行完该动作,并调用 done.countDown 。一旦调用这个, timer 线程就会苏醒过来,并记录下结束的时间。

  还有一些细节值得注意。传递给 time 方法的 executor 必须允许创建至少与指定并发级别一样多的线程,否则这个测试就永远不会结束。这就是线程饥饿死锁(thread starvationdeadlock) [Goetz06, 8.1.1] 。如果工作线程捕捉到 InterruptedException ,就会利用习惯用法 Thread.currentThread().interrupt() 重新断言中断,并从它的 run 方法中返回。这样就允许 executor 在必要的时候处理中断,事实上也理应如此。注意,我们利用了 System.nanoTime 来给活动定时。对于间歇式的定时,始终应该优先使用 System.nanoTime ,而不是使用 System.currentTimeMillis 。因为 System.nanoTime 更准确,也更精确,它不受系统的实时时钟的调整所影响。最后,注意本例中的代码并不能进行准确的定时,除非 action 能完成一定量的工作,比如一秒或者一秒以上。众所周知,准确的微基准测试十分困难,最好在专门的框架如 jmh 的协助下进行[JMH] 。

  本条目仅仅触及了并发工具的一些皮毛。例如,前一个例子中的那三个倒计数锁存器其实可以用一个 CyclicBarrier 或者 Phaser 实例代替。这样得到的代码更加简洁,但是理解起来比较困难。虽然你始终应该优先使用并发工具,而不是使用 wait 方法和 notify 方法,但可能必须维护使用了 wait 方法和 notify 方法的遗留代码。wait 方法被用来使线程等待某个条件。它必须在同步区域内部被调用,这个同步区域将对象锁定在了调用 wait 方法的对象上。下面是使用 wait 方法的标准模式:

// The standard idiom for using the wait method
synchronized (obj) {
    while (<condition does not hold>)
    obj.wait();
    // (Releases lock, and reacquires on wakeup)
    ... // Perform action appropriate to condition
}

  始终应该使用 wait 循环模式来调用 wait 方法;永远不要在循环之外调用 wait 方法。循环会在等待之前和之后对条件进行测试。

  在等待之前测试条件,当条件已经成立时就跳过等待,这对于确保活性是必要的。如果条件已经成立,并且在线程等待之前, notify (或者 notifyAll)方法已经被调用, 则无法保证该线程总会从等待中苏醒过来。

  在等待之前测试条件,如果条件不成立的话继续等待,这对于确保安全性是必要的。当条件不成立的时候,如果线程继续执行,则可能会破坏被锁保护的约束关系。当条件不成立时,有以下理由可使一个线程苏醒过来:

  • 另一个线程可能已经得到了锁,并且从一个线程调用 notify 方法那一刻起,到等待线程苏醒过来的这段时间中,得到锁的线程已经改变了受保护的状态。
  • 条件并不成立,但是另一个线程可能意外地或恶意地调用了 notify 方法。在公有可访问的对象上等待,这些类实际上把自己暴露在了这种危险的境地中。公有可访问对象的同步方法中包含的 wait 方法都会出现这样的问题。
  • 通知线程( notifying thread )在唤醒等待线程时可能会过于「慷慨」 。例如,即使只有某些等待线程的条件已经被满足,但是通知线程可能仍然调用 notifyAll 方法。
  • 在没有通知的情况下,等待线程也可能(但很少)会苏醒过来。这被称为“伪唤醒”(spurious wakeup) [POSIX, 11.4.3.6.1; Java9-api] 。

  一个相关的话题是,为了唤醒正在等待的线程,你应该使用 notify 方法还是 notifyAll 方法(回忆一下, notify 方法唤醒的是单个正在等待的线程,假设有这样的线程存在,而 notifyAll 方法唤醒的则是所有正在等待的线程) 。一种常见的说法是,应该始终使用 notifyAll 方法。这是合理而保守的建议。它总会产生正确的结果,因为它可以保证你将会唤醒所有需要被唤醒的线程。你可能也会唤醒其他一些线程,但是这不会影响程序的正确性。这些线程醒来之后,会检查它们正在等待的条件,如果发现条件并不满足,就会继续等待。

  从优化的角度来看,如果处于等待状态的所有线程都在等待同一个条件,而每次只有一个线程可以从这个条件中被唤醒,那么你就应该选择调用 notify 方法,而不是 notifyAll 方法。

  即使这些前提条件都满足,也许还是有理由使用 notifyAll 方法而不是 notify 方法。就好像把 wait 方法调用放在一个循环中,以避免在公有可访问对象上的意外或恶意的通知一样,与此类似,使用 notifyAll 方法代替 notify 方法可以避免来自不相关线程的意外或恶意的等待。否则,这样的等待会“吞掉”一个关键的通知,使真正的接收线程无限地等待下去。

  简而言之,直接使用 wait 方法和 notify 方法就像用“并发汇编语言”进行编程一样,而 java.util.concurrent 则提供了更高级的语言。 没有理由在新代码中使用 wait 方法和 notify 方法,即使有,也是极少的。 如果你在维护使用 wait 方法和 notify 方法的代码,务必确保始终是利用标准的模式从 while 循环内部调用 wait 方法。一般情况下,应该优先使用 notifyAll 方法,而不是使用 notify 方法。如果使用 notify 方法,请一定要小心,以确保程序的活性。