Condition详解
在线程间通信方式总结中有一个需求:轮流打印奇数和偶数,我们用wait和notify实现了一下,但是这种方式存在弊端,就是不能精确控制唤醒哪个线程,比如现在有一个需求是轮流打印ABC该怎么办呢?
首先准备三个线程,分别执行打印方法,是一个死循环,每次休息一秒。
一、wait/notify实现轮流打印ABC三个字母
如果是不加任何控制策略的话,必然是无法保证按照A
B
C
的顺序依次循环执行的,比如:
执行结果是:
1694620367
1 | A |
那么如何保证按照我们这个顺序执行呢?如果还是沿用这个方法,只能这样写:
思想也很简单,就是搞一个变量,规定只有0的时候才打印A
,只有1的时候才打印B
,只有2的时候才打印C
。那么,对于打印A
的线程,只要不是0就wait()
等待,一旦等于0就打印,并且加一;对于打印B
的线程,只要不是1就wait()
等待,一旦等于1就打印,并且加一。剩下同理。
这样,由于signal
是一个成员变量,初始值为0.那么三个线程中PrintB
和PrintC
都等待,只有PrintA
能执行打印,然后加为1,唤醒所有等待的线程来判断,此时打印A
的线程和打印C
的线程发现都不符合他们打印的条件,都进入了while
中等待了,只有打印B
的线程发现等于1,则不进入while
循环,打印再加一。依次反复,可以得到顺序打印的A
、B
、C
。
这种方式显然很不好,是靠notifyAll
来唤醒所有线程来实现的,那么我们能不能唤醒指定的线程呢?这样处理起来更加优雅效率也会更高!
二、Condition来实现
达到了上面一样的效果。此时,我们发现它的强大之处在于我们可以指定哪个线程唤醒了,这看起来是一点点进步,但是我们学习多线程那么长时间了,在我看来,是很大的一个进步,因为之前用notify
是随机唤醒一个,notifyAll
是唤醒全部,总是不能受我们的完全控制,虽然说线程的执行本身就是不确定的,因此不确定性是他们的天生属性,但是在某些场景下我们确实需要一个高效并且优雅的实现可控的方式,所以是很重要的。
它这种功能可以给我们带来什么呢?下面用它实现一个有界队列。(关于生产者消费者模式,当然也可以用了,写法非常简单,就是对照上面的例子改一下即可。)
三、Condition实现有界队列
我们已经接触了线程池,它里面涉及队列,有很多种队列,最常见的是ArrayBlockingQueue
以及LinkedBlockingQueue
,他们的源码中其实也是靠Condition
来实现阻塞塞值和阻塞取值的。现在我们也来实现一个比较简单的ArrayBlockingQueue
吧!
首先明确一下队列是FIFO
的,即先进先出,那么我们要保证先插入的要先弹出。其次要注意的是当没有元素的时候要阻塞,即等待有元素了才能获取;放入元素也是同理,要等有空位的才能重新放入。
如何实现以上这种数据结构呢?关于先进的先出来,我们可以用两个指针来实现,一个叫做addIndex
,一个叫做removeIndex
,初始都是指向第一个元素处。当塞进来一个元素,那么addIndex
就自增,当自增到最后一个位置,这个时候数组不一定是满的,因为有可能前面的值已经被取出去了,所以还需要一个变量count
来标志是否已经塞满,如果满了就阻塞,否则如果addIndex
到最后一个位置,就重新置0.
对于removeIndex
也是相同方向移除,假设最简单的情况,就是长度为5的数组,那么第一个元素放在0位置,第二个元素放在1位置,第三个元素放在2位置,此时要移除,那么第一个元素就是我们要的最先进来的元素,我们将其移除,并且removeIndex
加一指向第二个元素。如此反复执行。
代码:
1 | public class MyQueue { |
四、Condition原理概述
我们在上面的学习中看到,对于一个线程,我们就要准备一个Condition
对象,并且还要用可重入锁ReentrantLock
来实现加锁:
1 | public Lock lock = new ReentrantLock(); |
它的原理是什么呢?
我们看到,创建一个condition
对象是通过lock.newCondition()
,而这个方法实际上是会new
出一个ConditionObject
对象,该类是AQS
的一个内部类.
我们知道在锁机制的实现上,AQS
内部维护了一个同步队列,如果是独占式锁的话,所有获取锁失败的线程的尾插入到同步队列,同样的,condition
内部也是使用同样的方式,内部维护了一个 等待队列,所有调用condition.await
方法的线程会加入到等待队列中,并且线程状态转换为等待状态。另外注意到ConditionObject
中有两个成员变量:
1 | /** First node of condition queue. */ |
这样我们就可以看出来ConditionObject
通过持有等待队列的头尾指针来管理等待队列。主要注意的是Node
类复用了在AQS
中的Node
类。所以理解了AQS
就简单了。但是这个队列有一点不同,他是一个单向链表,而AQS
中的同步队列式一个双向链表。
同时还有一点需要注意的是:我们可以多次调用lock.newCondition()
方法创建多个condition
对象,也就是一个lock
可以持有多个等待队列。而在之前利用Object
的方式实际上是指在对象Object
对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock
拥有一个同步队列和多个等待队列。示意图如下:
如图所示,ConditionObject
是AQS
的内部类,因此每个ConditionObject
能够访问到AQS
提供的方法,相当于每个Condition
都拥有所属同步器的引用。
好了,至此我们已经知道多次调用lock.newCondition()
方法创建多个condition
对象,并且实际上这个对象就是ConditionObject
。AQS维护的同步队列是一个双向链表结构,而这个Condition
对象维护的是一个单项链表结构。
五、await实现原理
当调用condition.await()
方法后会使得当前获取lock
的线程进入到等待队列,如果该线程能够从await()
方法返回的话一定是该线程获取了与condition
相关联的lock
。await()
方法源码为:
在当前线程调用condition.await()
方法后,会使得当前线程释放lock
然后加入到等待队列中,直至被signal
/signalAll
后会使得当前线程从等待队列中移至到同步队列中去,直到获得了lock
后才会从await
方法返回(跳出while
循环那就不用继续等待了呗),或者在等待时被中断会做中断处理。
所以对于await()
方法来说,它实现的功能为:将要等待的线程封装成节点尾插入到等待队列中,然后跟wait
一样释放这个等待线程的锁。这些做完了之后还需要while
循环判断是否已经在同步队列中,这个isOnsyncQueue
是由下面说到的signal
方法触发的,由于此时还没有signal
所以陷在死循环中出不来,就调用lockSupport.park
方法使他进入等待状态;当有signal
或者有中断发生的时候,就跳出循环,继续执行,此时如果是signal
触发的,就会进入下一个if
,那就调用acquireQueue
方法,这个方法在我们之前说的AQS
中是提及的,主要思想是如果这个节点的前驱节点是head
那么就自旋获取锁,否则可能会阻塞。这里已经从大体上说明了这个方法的整体思路,下面继续详细分析分析。
在这段代码中,我们将知道:
- 是怎样将当前线程添加到等待队列中去的?
- 释放锁的过程?
- 怎样才能从
await
方法退出?
第一个问题:是怎样将当前线程添加到等待队列中去的?
这段代码就很容易理解了,将当前节点包装成Node
,如果等待队列的firstWaiter
为null
的话(等待队列为空队列),则将firstWaiter
指向当前的Node
,否则,更新lastWaiter
(尾节点)即可。就是通过尾插入的方式将当前线程封装的Node
插入到等待队列中即可,同时可以看出等待队列是一个不带头结点的链式队列,之前我们学习AQS
时知道同步队列是一个带头结点的链式队列,这是两者的一个区别。将当前节点插入到等待队列之后,会使当前线程释放lock
,由fullyRelease
方法实现,fullyRelease
源码为:
调用AQS
的模板方法release
方法释放AQS
的同步状态(这样也说明了退出await
方法必须是已经获得了condition
引用(关联)的lock
)并且唤醒在同步队列中头结点的后继节点引用的线程,如果释放成功则正常返回,若失败的话就抛出异常。到目前为止,这两段代码已经解决了前面的两个问题的答案了,还剩下第三个问题,怎样从await
方法退出?现在回过头再来看await
方法有这样一段逻辑:
很显然,当线程第一次调用condition.await()
方法时,会进入到这个while()
循环中,因为判断的条件是这个线程是否在同步队列中,我们这个刚进等待队列,咋可能在同步队列。
然后通过LockSupport.park(this)
方法使得当前线程进入等待状态,那么要想退出这个await
方法第一个前提条件自然而然的是要先退出这个while
循环,出口就只剩下两个地方:
- 逻辑走到
break
退出while
循环; while
循环中的逻辑判断为false
。
再看代码出现第1种情况的条件是当前等待的线程被中断后代码会走到break
退出,第二种情况是当前节点被移动到了同步队列中(即另外线程调用的condition
的signal
或者signalAll
方法),while
中逻辑判断为false
后结束while
循环。
总结下,就是当前线程被中断或者调用condition.signal
/condition.signalAll
方法当前节点移动到了同步队列后 ,这是当前线程退出await
方法的前提条件。
当退出while
循环后就会调用acquireQueued(node, savedState)
,这个方法在介绍AQS的底层实现时说过了,该方法的作用是在自旋过程中线程不断尝试获取同步状态,直至成功(线程获取到lock
)。
await
方法示意图如下图:
如图,调用condition.await
方法的线程必须是已经获得了lock
,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node
尾插入到等待队列中。
此外,await
也支持超时等待和不响应中断,这里不再赘述。
六、signal/signalAll实现原理
调用condition
的signal
或者signalAll
方法可以将等待队列中等待时间最长的节点移动到同步队列中,使得该节点能够有机会获得lock
。按照等待队列是先进先出(FIFO
)的,所以等待队列的头节点必然会是等待时间最长的节点,也就是每次调用condition
的signal
方法是将头节点移动到同步队列中。
signal
方法首先会检测当前线程是否已经获取lock
,如果没有获取lock
会直接抛出异常,如果获取的话再得到等待队列的头指针引用的节点,之后的操作的doSignal
方法也是基于该节点。下面我们来看看doSignal
方法做了些什么事情,doSignal
方法源码为:
具体逻辑请看注释,真正对头节点做处理的逻辑在transferForSignal
中,该方法源码为:
关键逻辑请看注释,这段代码主要做了两件事情1.将头结点的状态更改为CONDITION
;2.调用enq
方法,将该节点尾插入到同步队列中,关于enq
方法请看AQS
的底层实现这篇文章。现在我们可以得出结论:调用condition
的signal
的前提条件是当前线程已经获取了lock
,该方法会使得等待队列中的头节点即等待时间最长的那个节点移入到同步队列,而移入到同步队列后才有机会使得等待线程被唤醒,即从await
方法中的LockSupport.park(this)
方法中返回,从而才有机会使得调用await
方法的线程成功退出,此时就要回过头去再看看await
方法的后续处理流程了。signal
执行示意图如下图:
sigllAll
与sigal
方法的区别体现在doSignalAll
方法上,前面我们已经知道doSignal
方法只会对等待队列的头节点进行操作,而doSignalAll
只不过时间等待队列中的每一个节点都移入到同步队列中,即“通知”当前调用condition.await()
方法的每一个线程。