14.构建自定义的同步工具
类库中包含了许多存在状态依赖性的类,例如FutureTask,Semaphore和BlockingQueue等。在这些类中的一些操作中有着基于状态的前提条件,例如,不能从一个空的队列中删除元素,或者获取一个尚未结束的任务的计算结果,在这些操作可以执行之前,必须等到队列进入“非空”状态,或者任务进入“已完成”状态。
创建状态依赖类的最简单方法通常是在类库中现有的状态依赖类的基础上进行构造。本章将介绍实现状态依赖性的各种选择,以及在使用平台提供的状态依赖性机制时需要遵守的各项规则。
14.1 状态依赖性的管理
1.1 状态依赖的管理
对单线程的程序而言,如果基于状态的前提条件未得到满足,那么这个条件将永远无法成真,此时失败即可。
但是在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池在前几条指令之前还是空的,但现在却变为非空的,因为另外一个线程往里面添加了元素。对于并发对象,依赖于状态的方法,虽然有时可以在不满足前提条件的情况下选择失败, 不过更好的选择是等待前提条件为真。
状态依赖的操作可以一直阻塞直到可以继续执行,这比使他们先失败在实现起来要更为方便且更不容易出错。内置的条件队列可以使线程一直阻塞,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒他们。为了突出搞笑的条件等待机制的价值。我们先介绍如何通过轮训与休眠等方式来解决状态依赖性的问题。
程序清单 14-1 可阻塞的状态依赖操作的结构
acquire lock object state while (precondition does not hold) { release lock wait until precondition might hold optionally fail if interrupted or timeout expires reacquire lock } perform action release lock
程序清单 14-1 的这种加锁模式有些不同寻常,因为锁是在操作执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁保护起来,这样它们能在测试前提条件的过程中保持不变。如果前提条件尚未满足,就必须释放锁,让其他线程可以修改对象的状态。否则,前提条件就永远无发成真了。再次测试前提条件之前,必须要重新获得锁.
接下来以有届缓存的实现为例,介绍一下采用不同方式来处理前提条件失败。在每种实现中都扩展了程序清单14-2中的BaseBoundedBuffer,在该类中实现了一个基于数组的循环缓存,其中各个缓存状态变量(buf、head、tail、count)均有缓存的内置锁来保护。同时还提供了同步的doPut和doTake方法,并在子类中通过这些方法来实现put和take操作,底层状态对子类隐藏。
程序清单 14-2 有届缓存实现的基类
public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count; protected BaseBoundedBuffer(int capacity) { this.buf = (V[]) new Object[capacity]; } protected synchronized final void doPut(V v){ buf[tail] = v; if(++tail == buf.length) tail = 0; ++count; } protected synchronized final V doTake(){ V v = buf[head]; buf[head] = null; if (++head == buf.length) head = 0; -- count; return v; } public synchronized final boolean isFull(){ return count == buf.length; } public synchronized final boolean isEmpty(){ return count == 0 ; } }
1.2 通过"轮询加休眠"实现拙劣的阻塞
程序清单 14-3 使用简单阻塞实现的有届缓存
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> { protected SleepyBoundedBuffer(int capacity) { super(capacity); } public void put(V v) throws InterruptedException { //无限尝试将v添加入集合 while (true) { //获得锁 synchronized (this) { //如果不空,就添加进集合,退出循环 if (!isFull()) { doPut(v); return; } } //否则释放锁,休眠一段时间,给其他线程一些修改的机会. Thread.sleep(1000); } } public V take() throws InterruptedException { while (true) { synchronized (this) { if (!isEmpty()) return doTake(); } Thread.sleep(1000); } } }
另外,除了阻塞休眠等待的方式,还可以将前提条件的失败传递给调用者,由调用者控制是否进入休眠。如果调用者不进入休眠而直接重新调用的方式成为忙等待或者自旋等待。
如果缓存的状态在很长一段时间内不会发生变化,那么使用这个方式就会消耗大量的CPU时间。但是,在进入休眠的情况下,如果缓存的状态在刚调完sleep后就立即发生变化,那么将不必要地休眠一段时间。因此我们必须要在这两者中做出选择:要么容忍自旋导致的CPU始终周期浪费,要么容忍由于休眠而导致的低响应性。
1.3 条件队列
通过轮询与休眠来实现阻塞操作的过程需要付出大量的努力。如果存在某中挂起线程的方法,并且这种方法能够确保当某个条件成真时线程立即醒来,那么将极大地简化实现工作。这正是条件队列的功能。
“条件队列”这个名字的来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。传统队列的元素是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。
正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifuAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互联系的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。
Object.wait会自动释放锁,并请求操作系统挂起当前的线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,他将在返回之前重新获取锁。程序清单14-6中使用了wait和notifyAll实现了一个有届缓存。
程序清单 14-6 使用条件队列实现的有届缓存
public class BoundedBuffer<V> extends BaseBoundedBuffer <V>{ // 条件谓词:not-full (!isFull) // 条件谓词:not-empty (!isEmpty) protected BoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws InterruptedException { while(isFull()) wait(); doPut(v); notifyAll(); } public synchronized V take() throws InterruptedException { while (isEmpty()) wait(); V v = doTake(); notifyAll(); return v; } }
最终这比使用休眠的有届缓存更加简单,并且更加高效(线程醒来的次数更少),响应性也更高(当发生特定状态变化时将立即醒来)。
注意:与使用休眠的有届缓存相比,条件多列并没有改变原来的语义。他只是在多个方面进行了优化:CPU效率,上下文切换开销和响应性等。如果某个功能无法通过“轮询+休眠”来实现,那么使用条件队列也无法实现,但条件队列是的在表达和管理状态依赖时更加的简单和高效。
14.2 使用条件队列
条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确的使用。
虽然许多规则都能确保正确地使用条件队列,但在编译器或系统平台上却并没有强制要求遵循这些规则。(这也是为什么要尽量基于 LinkedBlockingQueue、Latch、Semaphore 和 FutureTask 等类来构造程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多)。
2.1 条件谓词
要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。如果没有条件谓词,条件等待机制将无法发挥作用。条件谓词是使某个操作成为状态依赖操作的前提条件。对 take 方法来说,它的条件谓词就是 "缓存不为空",take 方法在执行之前必须首先测试该条件谓词。
在条件等待中存在一种重要的三元关系,包括:
- 加锁
- wait 方法
- 条件谓词
在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用 wait 和 notify 等方法所在的对象)必须是同一个对象。
每一次 wait 调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的 wait 时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。
注意,当线程从wait方法中被唤醒时,他在重新请求锁时不具有任何特殊的优先级,而要与任何其他尝试进入同步代码块的线程一起正常的在锁上竞争。
2.2 过早唤醒
wait的返回并不一定意味着线程正在等待的条件谓词已经变成真了。内置条件队列可以与多个条件谓词一起使用,这是一种常见的情况,如在 BoundedBuffer 中使用的条件队列与 "非满" 和 "非空"两个条件谓词相关。
当一个线程由于调用 notifyAll 而醒来时,并不意味该线程正在等待的条件谓词已经变成真了。这就像烤面包机和咖啡机共用一个铃声,当响铃后,你必须查看是哪个设备发出的铃声。另外,wait 方法还可以 "假装" 返回,而不是由于某个线程调用了 notify。
当执行控制重新进入调用 wait 的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了?或许。在发出通知的线程调用 notifyAll 时,条件谓词可能已经变成真,但在重新获取锁时将再次变为假。在线程被唤醒到 wait 重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状态。或者,条件谓词从调用 wait 起根本就没有变成真。你并不知道另一个线程为什么调用 notify 或 notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。
基于所有这些原因,每当线程从 wait 中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用 wait,并在每次迭代中都测试条件谓词。
程序清单 14-7 状态依赖方法的标准形式
void stateDependentMethod() throws InterruptedException { // 必须通过一个锁来保护条件谓词 synchronized (lock) { while (!conditionPredicate()) lock.wait(); // 现在对象处于合适的状态 } }
当使用条件等待时(Object.wait或者Condition.await):
- 永远设置一个条件谓词---一些对状态的测试,线程执行前必须满足它;
- 永远在调用wait前测试条件谓词,并且从wait中返回后再次测试;
- 永远在循环中调用wait;
- 确保构成条件谓词的状态变量被锁保护,而这个锁正是与条件队列相关联的;
- 当调用wait、notify或者notifyAll时,要持有与条件队列相关联的锁;并且,
- 在检查条件谓词之后、开始执行被保护的逻辑之前,不要释放锁.
2.3 丢失信号
之前我们讨论活跃性故障,有死锁和活锁。另一种形式的活跃性故障是丢失信号。
丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发生过的事件。这就好比在启动了烤面包机出去拿报纸,当你还在屋外时烤面包机的铃声响了,但你没有听到,因此还会坐在厨房的桌子前等着烤面包机的铃声。你可能会等待很长的时间(为了摆脱等待,其他人也不得不开始烤面包,从而使得情况变得糟糕,当铃声响起时,还要与别人争论这个面包是属于谁的。)如果线程 A 通知了一个条件队列,而线程 B 随后在这个条件队列上等待,那么线程 B 将不会立即醒来,而是需要另一个通知来唤醒它。编码错误(例如没有在调用 wait 之前检测条件谓词)会导致信号的丢失。如果按照程序清单 14-7 的方式来设计条件等待,则不会发生信号丢失的问题。
注意:保证notify一定在wait之后!!!
2.4 通知
到目前为止,我们介绍了条件等待的前一半内容:等待。另一半内容是:通知。在有界缓存中,如果缓存为空,在调用 take 时将阻塞。在缓存变为非空时,为了使 take 解除阻塞,必须确保在每条使缓存变为非空的代码路径中都发出一个通知。
在条件队列 API 中有两个发出通知的方法,即:
- 单次通知: notify
- 全部通知: notifyAll
无论调用哪个,都必须持有与条件队列对象相关联的锁。在调用 notify 时,JVM 会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用 notifyAll 则会唤醒所有在这个条件队列上等待的线程。由于在调用 notify 或 notifyAll 时必须持有条件队列对象的锁,因此发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。
在 BoundedBuffer 中很好的说明了为什么在大多数情况下应该优先选择notifyAll。 这里的条件队列用于两个不同的条件谓词:"非空" 和 "非满"。假设线程 A 在条件队列上等待条件谓词 PA,同时线程 B 在同一个条件队列上等待条件谓词 PB。现在,假设 PB 变成真,并且线程 C 执行一个 notify:JVM 将从它拥有的众多线程中选择一个并唤醒。如果选择了线程 A,那么它被唤醒,并且看到 PA 尚未变成真,因此将继续等待。同时,线程 B 本可以开始执行,却没有被唤醒。这并不是严格意义上的 "丢失信号",更像一种 "被劫持的" 信号,但导致的问题时相同的:线程正在等待一个已经(或本应该)发生过的信号。
只有同时满足以下两个条件时,才能用单一的 notify 而不是 notifyAll:
- 所有等待线程的类型都相同:只有一个条件谓词与条件队列相关,并且每个线程在从 wait 返回后将执行相同的操作。
- 单进单出:在条件变量上的每次通知,最多只能唤醒一个线程来执行。
由于大多数类并不满足这些需求,因此普遍认可的做法是优先使用 notifyAll 而不是 notify。虽然 notifyAll 可能比 notify 更低效,但却更容易确保类的行为是正确的。这种低效情况带来的影响有时候很小,但有时候却非常大。当只有一个线程可以执行时,如果有 10 个线程在一个条件队列上等待,那么调用 notifyAll 将唤醒每个线程,并使得它们在锁上发生竞争。然后,它们中的大多数或者全部又都回到休眠状态。因而,在每个线程执行一个事件的同时,将出现大量的上下文切换操作以及发生竞争的锁获取操作。(最坏的情况是,在使用 notifyAll 时将导致 O(n^2) 次唤醒操作,而实际上只需要 n 次唤醒操作就足够了)这是 "性能考虑因素与安全性考虑因素相互矛盾" 的另一种情况。
在 BoundedBuffer 的 put 和 take 方法中采用的通知机制是保守的,可以对其进行优化:首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当 put 或 take 影响到这些状态转换时,才发出通知。这也被称为"条件通知"。虽然可以提升性能,但却很难正确地实现(而且还会使子类的实现变得复杂),因此在使用时需谨慎。
程序清单 14-8 使用条件通知
public synchronized void put(V v) throws InterruptedException { while (isFull()) wait(); boolean wasEmpty = isEmpty(); doPut(v); if (wasEmpty) notifyAll(); }
单次通知和条件通知都属于优化措施。通常,在使用这些优化措施时,应该遵循“首先使程序正确地执行,然后才使其运行的更快”这个原则。如果不正确地使用这些优化措施,很容易在程序中引入奇怪的活跃性故障。
2.5 示例
通过使用条件等待,ThreadGate实现了一个可打开和重新关闭的阀门,并提供了一个await方法,该方法能一直阻塞知道阀门被打开。这open中使用了notifyAll,因为这个类的语义不满足单次通知的条件。
程序清单 14-9 实现一个可重新关闭的阀门
@ThreadSafepublic class ThreadGate { // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n) @GuardedBy("this") private boolean isOpen; @GuardedBy("this") private int generation; public synchronized void close() { isOpen = false; } public synchronized void open() { ++generation; isOpen = true; notifyAll(); } // BLOCKS-UNTIL: opened-since(generation on entry) public synchronized void await() throws InterruptedException { int arrivalGeneration = generation; while (!isOpen && arrivalGeneration == generation) wait(); } }
await中的使用的条件谓词比较复杂,这种条件谓词时必需的,因为如果当阀门打开时有 N 个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且 await 方法只检查 isOpen,那么所有线程都可能无法释放:当所有线程收到通知时,将重新请求锁并退出 wait,而此时的阀门可能已经再次关闭了。因此,在 ThreadGate 中使用了一个更复杂的条件谓词:每次阀门关闭时,递增一个 Generation 计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过 await。
由于 ThreadGate 只支持等待打开阀门,因此它只在 open 中执行通知。要想既支持等待打开又支持等待关闭,那么必须在 open 和 close 中都进行通知。这很好地说明了为什么在维护状态依赖的类时是非常困难的——当增加一个新的状态依赖操作时,可能需要对多条修改对象的代码路径进行改动,才能正确地执行通知。
可见在使用条件队列时,除了使用起来比较复杂且易出错,还有面临诸如子类安全问题以及条件队列封装问题等。
14.3 显示的Condition对象
正如在某些情况下,当内置锁过于不灵活时,可以使用显式锁。在内置条件队列不满足需求时,可以使用显示条件队列Condition。注意:Lock 是一种广义的内置锁,Condition 也是一种广义的内置条件队列。
程序清单 14-10 Condition接口
public interface Condition { void await() throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeout) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
内置条件队列存在一些缺陷,每个内置锁都只能有一个相关联的条件队列,因而在像 BoundedBuffer 这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用 notifyAll 时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的 Lock 和 Condition 而不是内置锁和条件队列,这是一种更灵活的选择。
一个 Condition 和一个 Lock 关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个 Condition,可以在相关联的 Lock 上调用 Lock.newCondition 方法。Condition 同样比内置条件队列提供更丰富的功能:在每个锁上可存在多个等待、条件等待可以使可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作。
与内置条件队列不同的是,对于每个 Lock,可以有任意数量的 Condition 对象。Condition 对象继承了相关的 Lock 对象的公平性,对于公平的锁,线程会依照 FIFO 顺序从 Condition.await 中释放。
有界缓存的另一种实现,即使用两个 Condition,分别为 notFull 和 notEmpty,用于表示 "非满" 与 "非空" 两个条件谓词。当缓存为空时,take 将阻塞并等待 notEmpty,此时 put 向 notEmpty 发送信号,可以解除任何在 take 中阻塞的线程。
程序清单 14-11 使用显示条件队列的有届缓存
public class ConditionBoundedBuffer<T> { protected final Lock lock = new ReentrantLock(); // 条件谓词:not-full (count < items.length) private final Condition notFull = lock.newCondition(); // 条件谓词:not-empty (count > 0) private final Condition notEmpty = lock.newCondition(); @GuardedBy("lock") private final T[] items = (T[]) new Object[100]; @GuardedBy("lock") private int tail, head, count; // 阻塞并指导not-full public void put(T x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[tail] = x; if (++tail == items.length) tail = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // 阻塞并直到not-empty public T take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); T x = items[head]; items[head] = null; if (++head == items.length) head = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
注意:在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、signal 和 signalAll。但是,Condition 对 Object 进行了扩展,因而它也包含 wait 和 notify 方法。一定要确保使用正确的版本—— await 和 signal。
ConditionBoundedBuffer的行为与BoundedBuffer相同,但它对条件队列的使用方式更容易理解——在分析使用多个 Condition 的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。通过将两个条件谓词分开并放到两个等待线程集中,Condition 使其更容易满足单次通知的需求。signal 比 signalAll 更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求次数。
与内置锁和条件队列一样,当使用显式的 Lock 和 Condition 时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由 Lock 来保护,并且在检查条件谓词以及调用 await 和 signal 时,必须持有 Lock 对象。
在显示condition与内置条件队列进行选择时,与在ReentrantLock和synchronized之间选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用 Condition。但如果需要 ReentrantLock 的高级功能,并且已经使用了它,那么就应该选择 Condition。
14.4 同步器Synchronizer刨析
在 ReentrantLock 和 Semaphore 这两个接口之间存在许多的共同点。这两个类都可以用做一个 "阀门" 类,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用 lock 或 acquire 时成功返回),也可以等待(在调用 lock 或 acquire 时阻塞),还可以取消(在调用 tryLock 或 tryAcquire 时返回 false,表示在指定时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。
列出了这么多共性以后,你或许会认为 Semaphore 是基于 ReentrantLock 实现的,或者认为 ReentrantLock 实际上是带有一个许可的Semaphore,这些实现方式都是可行的,可以通过锁来实现计数信号量,以及可以通过计数信号量来实现锁。
程序清单 14-12 使用Lock来实现信号量
// 并非j.u.c的真实实现方式 @ThreadSafepublic class ConditionBoundedBuffer<T> { protected final Lock lock = new ReentrantLock(); // CONDITION PREDICATE: notFull (count < items.length) private final Condition notFull = lock.newCondition(); // CONDITION PREDICATE: notEmpty (count > 0) private final Condition notEmpty = lock.newCondition(); private static final int BUFFER_SIZE = 100; @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE]; @GuardedBy("lock") private int tail, head, count; // BLOCKS-UNTIL: notFull public void put(T x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[tail] = x; if (++tail == items.length) tail = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } // BLOCKS-UNTIL: notEmpty public T take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); T x = items[head]; items[head] = null; if (++head == items.length) head = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
事实上,以上代码并非j.u.c的真实实现方式,而是在实现时使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS 是一个用于构建锁和同步器的框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。不仅 ReentrantLock 和 Semaphore 时基于 AQS 构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue 和 FutureTask。
AQS 解决了在实现同步器时涉及的大量细节问题,例如等待线程采用 FIFO 队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程时应该通过还是需要等待。
基于 AQS 来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。在 SemaphoreOnLock中,获取许可的操作可能在两个时刻阻塞:当锁保护信号量状态时,以及当许可不可用时。而在基于AQS构建的同步器中,只有一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。所有基于AQS构建的同步器都具有这个优势。
14.5 AbstractQueuedSychronizer (AQS)
5.1 AQS获取和释放操作的标准形式
大多数开发者都不会直接使用AQS,标准的同步器类的集合更够满足绝大多数情况的需求。但如果能了解标准同步器类的实现方式,那么对于理解它们的工作原理是非常有帮助的。
在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的获取和释放操作,获取操作时一种依赖状态的操作,并且通常会阻塞。释放并不是一个可阻塞的操作,当执行释放操作时,所有在请求时被阻塞线程都会开始执行。
当使用锁或信号量时,获取操作的含义就很直观,即获取的是锁或许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用 CountDownLatch 时,获取操作意味着 "等待并直到闭锁到达结束状态",而在使用 FutureTask 时,则意味着 "等待并直到任务已经完成"。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过 getState,setState 以及 compareAndSetState 等 protected类型方法来进行操作。
这个整数可以用于表示任意状态。例如,ReentrantLock 用它来表示所有线程已经重复获取该锁的次数,Semaphore 用它来表示剩余的许可数量,FutureTask 用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如 ReentrantLock 保存了锁的当前所有者信息,以区分某个获取操作时重入还是竞争的。
程序清单 14-13给出了程序清单 14-13 AQS中获取和释放操作的标准形式。根据同步器的不同,获取操作可以是一种独占操作(ReentrantLock),也可以是一种非独占操作(Semaphore 或 CountDownLatch)。一个获取操作包括两部分,首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的。例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。
程序清单 14-13 AQS中获取和释放操作的标准形式
boolean acquire() throws InterruptedException { while (state does not permit acquire) { if (blocking acquisition requested) { enqueue current thread if not already queued block current thread } else return failure } possibly update synchronization state dequeue thread if it was queued return success } void release() { update synchronization state if (new state may permit a blocked thread to acquire) unblock one or more queued threads }
其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。例如,当获取一个锁后,锁的状态将从 "未被持有" 变成 "已被持有",而从 Semaphore 中获取一个许可后,将把剩余许可的数量减 1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取闭锁的操作不会改变闭锁的状态。
如果某个同步器支持独占的获取操作,那么就需要实现一些保护方法,包括 tryAcquire、tryRelease 和 isHeldExclusively 等,而对于支持共享获取的同步器,则应该实现 tryAcquireShared 和 tryReleaseShared 等方法。AQS 中的 acquire、acquireShared、release 和 releaseShared 等方法都将调用这些方法在子类中带有前缀 try 的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用 getState、setState 以及 compareAndSetState 来检查和更新状态,并通过返回的状态值来告知基类 "获取" 或 "释放" 同步器的操作是否成功。例如,如果 tryAcquireShared 返回一个负值,那么表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值则表示同步器通过非独占方式被获取。对于 tryRelease 和 tryReleaseShared 方法来说,如果释放操作使得所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回 true。
5.2 例子:一个简单的闭锁
程序清单 14-14 中的OneShotLatch 是一个使用AQS实现的二元闭锁。它包含两个公有方法:await 和 signal,分别对应获取操作和释放操作。起初,闭锁是关闭的,任何调用 await 的线程都将阻塞并直到闭锁被打开。当通过调用 signal 打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。
程序清单 14-14 使用AQS实现的二元闭锁
public class OneShotLatch { private final Sync sync = new Sync(); public void signal() { sync.releaseShared(0); } public void await() throws InterruptedException { //此方法会请求tryAcquireShared() sync.acquireSharedInterruptibly(0); } private class Sync extends AbstractQueuedSynchronizer { @Override protected int tryAcquireShared(int ignored) { // 如果闭锁打开则成功(state == 1),否则失败 return (getState() == 1) ? 1 : -1; } @Override protected boolean tryReleaseShared(int ignored) { //闭锁现在已打开 setState(1); //现在其他线程可以获得比索 return true; } } }
在 OneShotLatch 中,AQS 状态用来表示闭锁状态——关闭(0)或者打开(1)。await 方法调用 AQS 的 acquireSharedInterruptibly,然后接着调用 OneShotLatch 中的 tryAcquireShared 方法。在 tryAcquireShared 的实现中必须返回一个值来表示该获取操作能够执行。如果之前已经打开了闭锁,那么 tryAcquireShared 将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。acquireSharedInterruptibly 方法在处理失败的方式,是把这个线程放入等待线程队列中。signal 将调用 releaseShared,接下来又会调用 tryReleaseShared。在 tryReleaseShared 中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全被释放的状态。因而 AQS 让所有等待中的线程都尝试重新请求该同步器,并且由于 tryAcquireShared 将返回成功,因而现在的请求操作将成功。
OneShotLatch 可以通过拓展 AQS 来实现,而不是将一些功能委托给 AQS,但这种做法并不合理,这样做将破坏该接口(只有两个方法)的简洁性,并且虽然 AQS 的公共方法不允许调用者破坏闭锁的状态,但调用者仍可以容易地误用它们。java.util.concurrent 中的所有同步器都没有直接拓展 AQS,而都是将它们的相应功能委托给私有的 AQS 子类来实现。
14.6 j.u.c同步容器类中AQS
java.util.concurrent 中的许多可阻塞类,例如 ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue 和 FutureTask 等,都是基于 AQS 构建的。接下来简单看一下每个类是如果使用AQS的,如果需要深入了解细节可阅读源码。
6.1 ReentrantLock
只支持独占方式的获取操作,因此实现了 tryAcquire、tryRelease 和 IsHeldExclusively。程序清单 14-15 给出了非公平版本的tryAcquire。ReentrantLock 将同步状态由于保存锁获取操作的次数,并且还维护了一个 owner 变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量(由于受保护的状态操作方法具有 volatile 类型的内存读写语义,同时 ReentrantLock 只是在调用 getState 之后才会读取 owner 域,并且只有在调用 setState 之前才会写入 owner,因此 ReentrantLock 可以拥有同步状态的内存语义,因此避免了进一步的同步)。在 tryRelease 中检查 owner 域,从而确保当前线程在执行 unlock 操作之前已经获取了锁:在 tryAcquire 中将使用这个域来区分获取操作是重入的还是竞争的。
程序清单 14-15 基于非公平ReentrantLock实现tryAcquire
protected boolean tryAcquire(int ignored) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, 1)) { owner = current; return true; } } else if (current == owner) { setState(c + 1); return true; } return false; }
当一个线程尝试获取锁时,tryAcquire 将首先检查锁的状态。如果锁未被持有,它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此 tryAcquire 使用 compareAndSetState 来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。
ReentrantLock 还利用 AQS 对多个条件变量和多个等待线程集的内置支持。Lock.newCondition 将返回一个新的 ConditionObject 实例,这是 AQS 的一个内部类。
6.2 Semaphore 与 CountDownLatch
Semaphore 将 AQS 的同步状态用于保存当前可用许可的数量。tryAcquireShared 方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么 tryAcquireShared 会通过 compareAndSetState 以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。
当没有足够的许可,或者当 tryAcquireShared 可以通过原子方式来更新许可的计数以响应获取操作时,while 循环将终止。虽然对 compareAndSetState 的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样,tryReleaseShared 将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared 的返回值表示在这次释放操作中解除了其他线程的阻塞。
程序清单 14-16 Semaphore中的tryAcquireShared和tryReleaseShared
protected int tryAcquireShared(int acquires) { while (true) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) { return remaining; } } } protected boolean tryReleaseShared(int releases) { while (true) { int p = getState(); if (compareAndSetState(p, p + releases)) { return true; } } }
CountDownLatch 使用 AQS 的方式与 Semaphore 相似:在同步状态中保存的是当前的计数值,countDown 方法调用 release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await 调用 acquire,当计数器为零时,acquire 将立即返回,否则将阻塞。
6.3 FutureTask
FutureTask 不像一个同步器,但 Future.get 的语义非常类似于闭锁的语义——如果发生了某个事件(由 FutureTask 表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件完成。
在 FutureTask 中,AQS 同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask 还维护一些额外的状态变量,用来保存计算结果或抛出的异常。此外,还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。
6.4 ReentrantReadWriteLock
ReadWriteLock 接口表示存在两个锁:一个读取锁和一个写入锁,但在基于 AQS 实现的 ReentrantReadWriteLock 中,单个 AQS 子类将同时管理读取加锁和写入加锁。ReentrantReadWriteLock 使用一个 16 位的状态来表示写入锁的计数,并且使用另一个 16 位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS 在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在 ReentrantReadWriteLock 中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。(这种机制并不允许选择读取线程优先或写入线程优先等策略,在某些读写锁实现中也采用这种方式。因此,要么 AQS 的等待队列不能是一个 FIFO 队列,要么使用两个队列。然而,在实际中很少需要这么严格的排序策略。如果非公平版本中 ReentrantReadWriteLock 无法提供足够的活跃性,那么公平版本的 ReentrantReadWriteLock 通常会提供令人满意的排序保证,并且能确保读取线程和写入线程不会发生饥饿问题。)
了解更多知识,关注我。 ???