1. 简介
Condition
是一个接口,AbstractQueuedSynchronizer 中的ConditionObject
内部类实现了这个接口。Condition
声明了一组等待/通知
的方法,这些方法的功能与Object
中的wait/notify/notifyAll
等方法相似。这两者相同的地方在于,它们所提供的等待/通知
方法均是为了协同线程的运行秩序。只不过,Object 中的方法需要配合 synchronized 关键字使用,而 Condition 中的方法则要配合锁对象使用,并通过newCondition
方法获取实现类对象。除此之外,Condition 接口中声明的方法功能上更为丰富一些。比如,Condition 声明了具有不响应中断和超时功能的等待接口,这些都是 Object wait 方法所不具备的。
本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 独占/共享模式的续篇,在学习 Condition 的原理前,建议大家先去了解 AbstractQueuedSynchronizer 同步队列相关原理。本篇文章会涉及到同步队列相关知识,这些知识在上一篇文章分析过。
关于Condition
的简介这里先说到这,接下来分析一下Condition
实现类ConditionObject
的原理。
2. 实现原理
ConditionObject
是通过基于单链表的条件队列来管理等待线程的。线程在调用await
方法进行等待时,会释放同步状态。同时线程将会被封装到一个等待节点中,并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的情况下调用signal
或singalAll
方法时,队列中的等待线程将会被唤醒,重新竞争锁。另外,需要说明的是,一个锁对象可同时创建多个 ConditionObject 对象,这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时,可唤醒指定条件队列中的线程。其大致示意图如下:
以上就是 ConditionObject 所实现的等待/通知机制的大致原理,并不是很难理解。当然,在具体的实现中,则考虑的更为细致一些。相关细节将会在接下来一章中进行说明,继续往下看吧。
3. 源码解析
3.1 等待
ConditionObject 中实现了几种不同的等待方法,每种方法均有它自己的特点。比如await()
会响应中断,而awaitUninterruptibly()
则不响应中断。await(long, TimeUnit)
则会在响应中断的基础上,新增了超时功能。除此之外,还有一些等待方法,这里就不一一列举了。
在本节中,我将主要分析await()
的方法实现。其他的等待方法大同小异,就不一一分析了,有兴趣的朋友可以自己看一下。好了,接下来进入源码分析阶段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
|
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0;
while (!isOnSyncQueue(node)) { LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0)
reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null;
if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
if (node.next != null) return true; return findNodeFromTail(node); }
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; }
while (!isOnSyncQueue(node)) Thread.yield(); } return false; }
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
static void selfInterrupt() { Thread.currentThread().interrupt(); }
|
3.2 通知
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do {
if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
看完了 signal 方法的分析,下面再来看看 signalAll 的源码分析,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); }
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null;
do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
|
4. 其他
在我阅读 ConditionObject 源码时发现了一个问题 - await 方法竟然没有做同步控制。而在 signal 和 signalAll 方法开头都会调用 isHeldExclusively 检测线程是否已经获取了独占锁,未获取独占锁调用这两个方法会抛出异常。但在 await 方法中,却没有进行相关的检测。如果在正确的使用方式下调用 await 方法是不会出现问题的,所谓正确的使用方式指的是在获取锁的情况下调用 await 方法。但如果没获取锁就调用该方法,就会产生线程竞争的情况,这将会对条件队列的结构造成破坏。这里再来看一下新增节点的方法源码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
假如现在有线程 t1 和 t2,对应节点 node1 和 node2。线程 t1 获取了锁,而 t2 未获取锁,此时条件队列为空,即 firstWaiter = lastWaiter = null。演绎一下会导致条件队列被破坏的场景,如下:
- 时刻1:线程 t1 和 t2 同时执行到
if (t == null)
,两个线程都认为 if 条件满足 - 时刻2:线程 t1 初始化 firstWaiter,即将 firstWaiter 指向 node1
- 时刻3:线程 t2 再次修改 firstWaiter 的指向,此时 firstWaiter 指向 node2
如上,如果线程是按照上面的顺序执行,这会导致队列被破坏。firstWaiter 本应该指向 node1,但结果却指向了 node2,node1 被排挤出了队列。这样会导致什么问题呢?这样可能会导致线程 t1 一直阻塞下去。因为 signal/signalAll 是从条件队列头部转移节点的,但 node1 不在队列中,所以 node1 无法被转移到同步队列上。在不出现中断的情况下,node1 对应的线程 t1 会被永久阻塞住。
这里未对 await 方法进行同步控制,导致条件队列出现问题,应该算 ConditionObject 实现上的一个缺陷了。关于这个缺陷,博客园博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源码解读–续篇之Condition 中也提到了。并向 JDK 开发者提了一个 BUG,BUG 链接为 JDK-8187408,有兴趣的同学可以去看看。
5. 总结
到这里,Condition 的原理就分析完了。分析完 Condition 原理,关于 AbstractQueuedSynchronizer 的分析也就结束了。总体来说,通过分析 AQS 并写成博客,使我对 AQS 的原理有了更深刻的认识。AQS 是 JDK 中锁和其他并发组件实现的基础,弄懂 AQS 原理对后续在分析各种锁和其他同步组件大有裨益。
AQS 本身实现比较复杂,要处理各种各样的情况。作为类库,AQS 要考虑和处理各种可能的情况,实现起来可谓非常复杂。不仅如此,AQS 还很好的封装了同步队列的管理,线程的阻塞与唤醒等基础操作,大大降低了继承类实现同步控制功能的复杂度。所以,在本文的最后,再次向 AQS 的作者,Java 大师Doug Lea
致敬。
好了,本文到此结束,谢谢大家阅读。
参考