在前面的文章中已经介绍了wait和notify的基本知识,我们知道了他们都是Object这个基类中的方法,因此每个对象都天生拥有这两个方法,可见其重要性,在多线程的学习中,他们两兄弟可以实现线程之间的通信,当然了,还有许多其他的方式实现线程间通信,下面逐一击破。

一、前言

开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。

二、等待通知机制

即用wait+notify来实现。

案例目标:两个线程交替打印奇偶数,一共100个。注意,这也是阿里的一道面试编程题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class TwoThreadWaitNotify {
private int count = 1;
private boolean flag = false;


public static void main(String[] args) {
TwoThreadWaitNotify twoThreadWaitNotify = new TwoThreadWaitNotify();
new Thread(new PrintOdd(twoThreadWaitNotify)).start();
new Thread(new PrintEven(twoThreadWaitNotify)).start();
}

//打印奇数
static class PrintOdd implements Runnable{

private TwoThreadWaitNotify twoThreadWaitNotify;

public PrintOdd(TwoThreadWaitNotify twoThreadWaitNotify){
this.twoThreadWaitNotify = twoThreadWaitNotify;
}

@Override
public void run() {
while(twoThreadWaitNotify.count <= 100){
synchronized (TwoThreadWaitNotify.class){
if(!twoThreadWaitNotify.flag){
System.out.println("奇数线程开始执行,打印:"+twoThreadWaitNotify.count);
twoThreadWaitNotify.count++;
twoThreadWaitNotify.flag = true;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}
}
}

//打印偶数
static class PrintEven implements Runnable{

private TwoThreadWaitNotify twoThreadWaitNotify;

public PrintEven(TwoThreadWaitNotify twoThreadWaitNotify){
this.twoThreadWaitNotify = twoThreadWaitNotify;
}

@Override
public void run() {
while(twoThreadWaitNotify.count <= 100){
synchronized (TwoThreadWaitNotify.class){
if(twoThreadWaitNotify.flag){
System.out.println("偶数线程开始执行,打印:"+twoThreadWaitNotify.count);
twoThreadWaitNotify.count++;
twoThreadWaitNotify.flag = false;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}

运行结果部分截图:

image

这里的线程 A 和线程 B 都对同一个对象 TwoThreadWaitNotify.class 获取锁,A 线程调用了同步对象的 wait() 方法释放了锁并进入 WAITING 状态。

B 线程调用了 notify() 方法,这样 A 线程收到通知之后就可以从 wait() 方法中返回。

这里利用了 TwoThreadWaitNotify.class 对象完成了通信。

有一些需要注意:

  • wait()notify()notifyAll() 调用的前提都是获得了对象的锁(也可称为对象监视器)。
  • 调用 wait() 方法后线程会释放锁,进入WAITING 状态,该线程也会被移动到等待队列中。
  • 调用 notify() 方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为 BLOCKED
  • wait() 方法返回的前提是调用 notify() 方法的线程释放锁,wait() 方法的线程获得锁。

⭐等待通知有着一个经典范式:

线程 A 作为消费者:

  1. 获取对象的锁。
    2.进入 while(判断条件),并调用 wait() 方法。
  2. 当条件满足跳出循环执行具体处理逻辑。

线程 B 作为生产者:

  1. 获取对象锁。
  2. 更改与线程 A 共用的判断条件。
  3. 调用 notify() 方法。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//Thread A

synchronized(Object){
while(条件){
Object.wait();
}
//do something
}

//Thread B
synchronized(Object){
条件=false;//改变条件
Object.notify();
}

三、Join方式

image

这个之前也是提及过的,主要是可以打断主线程让子线程先执行完,但是缺点是粒度不够细腻,我不能控制子线程在某个点停一下让其他子线程执行。

image

核心逻辑:

1
2
3
while (isAlive()) {
wait(0);
}

join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。

四、volatile 共享内存

因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:

image

输出结果:

1
2
3
4
5
thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A执行完毕

这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。flag 采用 volatile 修饰主要是为了内存可见性。

五、CountDownLatch 并发工具

在前面的文章中我们基本知道它的使用,但是很遗憾,没有找一个比较实际的场景来描述它的功能,下面我将以一个实际场景来用CountDownLatch来解决这个问题。

假设小明和小红是一对夫妻,他们两准备烧一个菜,就叫做青椒炒肉丝。我们知道,要想炒出青椒炒肉丝,需要切好青椒,然后切好肉丝,如果想要肉的质感爽嫩,需要用淀粉揉一揉,加点醋,加点料酒去去腥,并且弄好了之后需要先炒一下肉。最后两样都准备好之后:即切好的青椒和预热好的肉丝,那么就可以合在一起炒一下出锅了。

假设切青椒需要3分钟,准备好肉需要5分钟,这两个同时准备好之后就可以进行烧菜了。如何最大程度上提高效率呢?

显然,就是小明切青椒,小红搞肉丝,两个人并行。这个时候,我们可以用CountDownLatch来模拟这个场景。

切青椒线程:

image

准备肉丝的线程:

image

测试:

image

结果为:

1
2
3
4
5
小明开始切青椒...
小红开始准备肉丝...
小明切好青椒了...
小红准备好肉丝了...
over,食材全部准备好了,一起下锅 cost:5010

这里突出的就是,主线程等待两个线程都执行完了才能继续执行。

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的.

  • 初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。
  • 该方法会将 AQS 内置的一个 state 状态 -1 。
  • 最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。

这个功能是不是很类似于上面的join,但是它比join灵活多了。

六、CyclicBarrier 并发工具

这个工具类从原理上来看与CountDownLatch非常类似,具体的使用可以看AQS实现的一些同步组件介绍。他们两是有区别的。该工具可以实现 CountDownLatch 同样的功能,但是要更加灵活。甚至可以调用 reset() 方法重置 CyclicBarrier (需要自行捕获 BrokenBarrierException 处理) 然后重新执行。就不再赘述了。

七、线程响应中断

这个我们之前也是提过的,就是interrupt来实现,线程方法里面用while不停地判断中断标志位从而达到自主中断的目的。

image

输出结果:

1
2
3
thread A运行中。。
thread A运行中。。
thread A退出。。

可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true

并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。

但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false

八、线程池 awaitTermination() 方法

这个玩意与我们知道shutdown方法组合使用,我们知道,调用了 shutdown() 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。

关于awaitTermination()方法,接收timeoutTimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false.因此,在shutdwon之后,我们可以用awaitTermination()不断地监测剩下的线程的执行状态,执行完毕就可以执行主线程了。

image

输出结果:

1
2
3
4
5
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO  c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over

最后再强调一下:

使用这个 awaitTermination() 方法的前提需要关闭线程池,如调用了 shutdown() 方法。

调用了 shutdown() 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。

九、管道通信

这个方式我见到的比较少,了解一下。

image

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
2018-03-16 19:56:43.014 [Thread-0] INFO  c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9

Java 虽说是基于内存通信的,但也可以使用管道通信。

需要注意的是,输入流和输出流需要首先建立连接。这样线程 B 就可以收到线程 A 发出的消息了。

十、总结

实际开发中可以灵活根据需求选择最适合的线程通信方式。

整理自:深入理解线程通信