多线程编程模式——单线程执行,保护性暂挂

最近学很多并发,是时候该开始做一些笔记了,跟随《图解 Java 多线程设计模式》一书,把所学的东西在设计模式的角度下统筹一下。我疑心这些模式究竟能否称为设计模式,它们所涉及的其实并非架构,而是一些更特定,细节,具体的东西,或许叫编程模式更为适当?但管他呢。

这篇笔记学习它的 Single Threaded Execution,Guarded Suspension 模式,Immutable 模式已近乎本能,不需要特地学习。

Single Threaded Execution 模式

Single Threaded Execution,其实就是单线程执行。这通常出现在需要把对某些资源的操作原子化时,即需要让其它线程要么看到该操作未开始,要么看到该操作完成,不能看到其中的中间状态;一个非常典型的例子是条件失效(Slipped Conditions),它是说代码赖以执行的条件在进行检查时满足,其后执行时却不满足了,这在单线程环境下是不可能的(下面这个例子仍是可能的,考虑其它进程也在操作文件)。

条件失效是 Race Condition 竞态条件的一种实例。

锁,资源,原子化

考虑这样一个函数,它尝试创建一个文件,倘若文件存在,就先删除该文件,再创建和返回文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
static File createFile(String path) {
File file = new File(path);
if (file.exists()) { // check and act
boolean res = file.delete();
if (!res) {
throw new IllegalStateException(String.format("文件 %s 删除失败", path));
}
}
try {
file.createNewFile();
} catch (IOException e) { throw new RuntimeException(e); }
return file;
}

这个例子中的文件虽然是局部变量,但是其是从文件系统中获取的,这个文件系统对所有线程都是可见、共享的

对这个函数,只要有相应权限,无论文件存在与否,对同一个路径连续调用两次 getFile 也是没问题的,但在多线程环境下呢?file.delete可能会返回 false,因此抛出异常,这表示,file.delete赖以执行的条件file.exists失效了——在试图删除这个文件时,这个文件已经不存在了。为此,就需要保证检查文件是否存在和删除文件这个过程是原子的以避免这个问题。

想要保证这个过程是原子的,仅对它本身加锁是不够的,在执行锁中的过程时,文件的状态仍旧可能被改变,也就是说file.createNewFile会被执行;我们需要保证对文件进行的每一个操作都是原子的,才能保证这个过程是原子的(当然,大可直接把锁加到整个方法上)。

或者说,锁保护的是资源,而非是过程,总是去明确锁保护哪些资源,是否在某些地方遗漏了保护(这等于没有保护),是重要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static File getFile(String path) {
File file = new File(path);
synchronized (path.intern()) { // 这里可以换成双重检查锁以提高性能
if (file.exists()) {
boolean res = file.delete();
if (!res) {
throw new IllegalStateException(String.format("文件 %s 删除失败", path));
}
}
}
synchronized (path.intern()) {
try {
file.createNewFile();
} catch (IOException e) { throw new RuntimeException(e); }
return file;
}
}

共享资源,临界区

这便是 Single Threaded Execution 模式,其只有一个角色,即 Shared Resource 共享资源,其用于供多线程去并发访问。例子中的文件便是共享资源。对共享资源的读写必须得到正确同步,否则共享资源的状态可能被损坏。Single Threaded Execution 保证对共享资源的单线程访问。

只允许单个线程执行的程序范围称为临界区。

下列情况下,需要使用 Single Threaded Execution 模式:

  1. 多个线程同时访问共享资源
  2. 共享资源的状态会发生变化,若使用 Immutable 模式,就能避免这一条
  3. 安全性需要被确保;如果有时候安全性是可以违背的,比如允许某计数器有一定偏差,不进行同步也不是不行。

并且,即使共享资源的所有方法都是线程安全的,也不保证对其进行组合操作是线程安全的,比如考虑一个可变的 Point(不要这么干!):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class Point {
private int x;
private int y;

// 这个锁没意义,因为 32 位赋值本来就是原子的
public synchronized void setX(int x) {
this.x = x;
}

public synchronized void setY(int y) {
this.y = y;
}
// getters...
}

倘若某个线程要将某 point 置为(1, 1),某线程要将该 point 置为(-1, -1),最后根据线程调度顺序的不同,该 point 最后的值可能为(1, -1), (-1, 1);若要同步这两个线程,需要为元组(x, y)加锁,亦即对 point 本身加锁:

1
2
3
4
5
6
7
8
9
10
11
12
void thread1(Point point) {
synchronized(point) {
point.setX(1)
point.setY(1)
}
}
void thread2(Point point) {
synchronized(point) {
point.setX(-1)
point.setY(-1)
}
}

与其这样操作,不如把 Point 变成不可变对象,每次去创建新 Point,然后修改引用。

Guarded Suspension 模式

Guarded Suspension 保护性暂挂模式,是利用 wait/notify 机制去让某线程去等待直到满足再去执行操作,其既是为了保证安全性,也是一种线程之间协同的机制

Guarded Suspension 模式应用在某些操作需要满足条件才能去执行的情况,其中若条件满足则直接执行,若条件不满足则等待条件满足后再执行。

考虑一个无限长度的阻塞队列,提供一个 offer 和 take 方法去向队列中放入元素和取出元素,其中要求 take 方法在队列为空时等待直到非空才去获取。

这种 take 在单线程的时候是没意义的——如果当前队列为空,则在之后它不可能自动变为非空,因为没有其它线程去操作它。

如果不知道 wait/notify 机制的存在的话,可能会想到使用轮询去检查队列是否非空:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class BlockingQueue<T> {
private final Queue<T> queue = new LinkedList<T>();
// 需要加锁以同步对 queue 的访问
public synchronized void offer(T t) {
queue.offer(t);
}

// 不能给方法外部加锁,这会导致无法 offer,必然死锁
public T take() {
while (true) {
synchronized (queue) {
if (!queue.isEmpty()) {
return queue.remove();
}
}
Thread.sleep(50);
}
}
}

这能满足需求,但并不优雅且性能可能极差,idea 也会警告说可能 busy-waiting。解决方法是使用 java 所提供的线程协同机制 wait/notify 去进行同步,而非用轮询去进行同步。

Java 的线程协同机制:wait/notify

wait/notify 是一种通知机制,当某线程持有锁并 wait 时,其将释放锁并挂起,直到被中断,或其它线程持有锁时调用 notify/notifyAll 方法;某线程持有锁并执行 notify 时,其会将 wait 的一个或所有线程重新加入锁的等待队列(此时并不释放锁),并继续执行。执行 wait 和 notify 时必须持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static class WaitNotifyTest {
static final Object lock = new Object();

static void Thread1() {
synchronized (lock) {
System.out.println("t1: before wait");
try {
lock.wait();
System.out.println("t1: after await");
} catch (InterruptedException e) { }
}
}
static void Thread2() {
synchronized (lock) {
System.out.println("t2: before notify");
lock.notify();
try {
Thread.sleep(10); // 即使 sleep,也不会有改变
} catch (InterruptedException e) { }
System.out.println("t2: after notify");
}
}

public static void main(String[] args) throws InterruptedException {
new Thread(WaitNotifyTest::Thread1).start();
Thread.sleep(10); // notify 必须在 wait 之后执行才有效果
new Thread(WaitNotifyTest::Thread2).start();
}
}

最后输出结果必定如下:

1
2
3
4
t1: before wait     -- 首先是 t1 开始执行
t2: before notify -- t1 遇到 wait,释放锁并挂起(变为 waiting),t2 拿到锁,开始执行
t2: after notify -- t2 执行 notifyAll 之后,释放锁之前,这一行必定在 t1: after await 之前输出
t1: after await -- t2 释放锁,t1 拿到锁,执行之后的工作

使用 wait/notify 实现互斥锁

这个例子介绍了 wait/notify 的性质,下面是一个更有趣的实例——用 wait/notify 机制实现 Mutex,即不可重入的互斥锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static class Mutex {
// 该字段存储持有锁的线程的 id,通过 Thread.currentThread().getId() 获取,为线程的唯一标识符
// 因为 currentId 只在 synchronized 块中被访问,所以不需要 volatile
private Long currentId = null;

public synchronized void lock() {
while(currentId != null) {
try {
this.wait();
} catch (InterruptedException e) { }
}
currentId = Thread.currentThread().getId();
}

public synchronized void unlock() {
if (currentId == null) {
throw new IllegalStateException("锁未被持有");
}
if (!currentId.equals(Thread.currentThread().getId())) {
throw new IllegalStateException("当前线程未持有锁");
}
currentId = null;
this.notify();
}
}

locked 字段表示该锁是否被持有。其机制是,当某线程调用 lock 方法时,检查锁是否被持有,若已持有,则等待并释放锁,否则持有锁,标记当前 id 为自身;解锁时,检查是否是自己持有锁,并释放锁,通知所有 wait 的线程继续执行。

这里有一个问题,为什么要在 while 中进行 wait 而非 if?假设我们把这里改成 if,假设锁已被占有时,两个线程同时调用了 lock 方法,并在 wait 处挂起;这时,当持有锁的线程解锁时,两个线程会先后从 wait 处开始执行,而他们都会修改 currentId,这意味着锁会被后来者抢走,出现了 bug。而若使用 while 做判断时,先执行的线程设置完 currentId 后,后执行的线程会在 while 中再次检查 currentId(记住,这两个线程执行的过程都是在 synchronized 块里的,不会有冲突),它发现 currentId 仍旧不是 null,因此会继续 wait。

总而言之,不使用 while 做判断的话,会存在这样的情况,即线程被唤醒了,但它等待的条件却并未满足(这种情况似乎称为虚假唤醒),while 负责让这样的线程再次检查条件是否满足。

使用 wait/notify 实现无界阻塞队列

使用 wait/notify 机制,上面的阻塞队列就可以更优雅地实现了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class BlockingQueue<T> {
private final Queue<T> queue = new LinkedList<T>();

public synchronized void offer(T t) {
queue.offer(t);
notify();
}
public synchronized T take() {
while(queue.isEmpty()) {
try {
wait();
} catch (InterruptedException e) { throw new RuntimeException(e); }
}
return queue.remove();
}
}

使用 ReentrantLock 和 Condition 实现有界阻塞队列

ReentrantLock 为可重入锁,可以用于实现 synchronized 代码块,ReentrantLock::newCondition 用于创建对应该锁的条件变量,其可用于实现 wait/notify 机制,其相较于synchronized更加灵活,包括但不限于:

  1. ReentrantLock使用lock和unlock方法显式地加锁解锁,并有tryLock,lockInterruptibly等方法,允许可中断,可超时地获取锁,功能更强大(bug也更多)
  2. ReentrantLock允许创建多个信号变量对应同一把锁

使用ReentrantLock,我们可以实现有界的阻塞队列,其包含两个信号变量notEmpty和notFull,用于在队列非空时通知消费者,队列非满时通知生产者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class BlockingQueue<T> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private final int maxQueueSize;
private final Queue<T> queue;

public BlockingQueue(int maxQueueSize) {
assert maxQueueSize > 0;
this.maxQueueSize = maxQueueSize;
queue = new LinkedList<>();
}

/**
* 尝试把值入队列,若满则阻塞
* @param t
*/
public void put(T t) {
lock.lock();
try {
while(queue.size() == maxQueueSize) {
notFull.await();
}
queue.offer(t);
notEmpty.signalAll();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}

/**
* 移除和返回队列头部,无值则阻塞
* @return
*/
public T remove() {
lock.lock();
try {
while(queue.size() == 0) {
notEmpty.await();
}
T res = queue.remove();
notFull.signalAll();
return res;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}