最近学很多并发,是时候该开始做一些笔记了,跟随《图解 Java 多线程设计模式》一书,把所学的东西在设计模式的角度下统筹一下。我疑心这些模式究竟能否称为设计模式,它们所涉及的其实并非架构,而是一些更特定,细节,具体的东西,或许叫编程模式更为适当?但管他呢。
这篇笔记学习它的 Single Threaded Execution,Guarded Suspension 模式,Immutable 模式已近乎本能,不需要特地学习。
Single Threaded Execution 模式 Single Threaded Execution,其实就是单线程执行。这通常出现在需要把对某些资源的操作原子化 时,即需要让其它线程要么看到该操作未开始,要么看到该操作完成,不能看到其中的中间状态 ;一个非常典型的例子是条件失效(Slipped Conditions) ,它是说代码赖以执行的条件在进行检查时满足,其后执行时却不满足 了,这在单线程环境下是不可能的(下面这个例子仍是可能的,考虑其它进程也在操作文件)。
条件失效是 Race Condition 竞态条件的一种实例。
锁,资源,原子化 考虑这样一个函数,它尝试创建一个文件,倘若文件存在,就先删除该文件,再创建和返回文件:
static File createFile (String path) { File file = new File (path); if (file.exists()) { boolean res = file.delete(); if (!res) { throw new IllegalStateException (String.format("文件 %s 删除失败" , path)); } } try { file.createNewFile(); } catch (IOException e) { throw new RuntimeException (e); } return file; }
这个例子中的文件虽然是局部变量,但是其是从文件系统中获取的,这个文件系统对所有线程都是可见、共享的
对这个函数,只要有相应权限,无论文件存在与否,对同一个路径连续调用两次 getFile 也是没问题的,但在多线程环境下呢?file.delete
可能会返回 false,因此抛出异常,这表示,file.delete
赖以执行的条件file.exists
失效了——在试图删除这个文件时,这个文件已经不存在了 。为此,就需要保证检查文件是否存在和删除文件这个过程是原子的 以避免这个问题。
想要保证这个过程是原子的,仅对它本身加锁是不够的,在执行锁中的过程时,文件的状态仍旧可能被改变,也就是说file.createNewFile
会被执行;我们需要保证对文件进行的每一个操作都是原子的,才能保证这个过程是原子的 (当然,大可直接把锁加到整个方法上)。
或者说,锁保护的是资源 ,而非是过程,总是去明确锁保护哪些资源,是否在某些地方遗漏了保护(这等于没有保护),是重要的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static File getFile (String path) { File file = new File (path); synchronized (path.intern()) { if (file.exists()) { boolean res = file.delete(); if (!res) { throw new IllegalStateException (String.format("文件 %s 删除失败" , path)); } } } synchronized (path.intern()) { try { file.createNewFile(); } catch (IOException e) { throw new RuntimeException (e); } return file; } }
共享资源,临界区 这便是 Single Threaded Execution 模式,其只有一个角色,即 Shared Resource 共享资源,其用于供多线程去并发访问。例子中的文件便是共享资源。对共享资源的读写必须得到正确同步,否则共享资源的状态可能被损坏。Single Threaded Execution 保证对共享资源的单线程访问。
只允许单个线程执行的程序范围 称为临界区。
下列情况下,需要使用 Single Threaded Execution 模式:
多个线程同时访问共享资源
共享资源的状态会发生变化,若使用 Immutable 模式,就能避免这一条
安全性需要被确保;如果有时候安全性是可以违背的,比如允许某计数器有一定偏差,不进行同步也不是不行。
并且,即使共享资源的所有方法都是线程安全的,也不保证对其进行组合操作是线程安全的,比如考虑一个可变的 Point(不要这么干!):
public final class Point { private int x; private int y; public synchronized void setX (int x) { this .x = x; } public synchronized void setY (int y) { this .y = y; } }
倘若某个线程要将某 point 置为(1, 1)
,某线程要将该 point 置为(-1, -1)
,最后根据线程调度顺序的不同,该 point 最后的值可能为(1, -1)
, (-1, 1)
;若要同步这两个线程,需要为元组(x, y)
加锁,亦即对 point 本身加锁:
void thread1 (Point point) { synchronized (point) { point.setX(1 ) point.setY(1 ) } }void thread2 (Point point) { synchronized (point) { point.setX(-1 ) point.setY(-1 ) } }
与其这样操作,不如把 Point 变成不可变对象,每次去创建新 Point,然后修改引用。
Guarded Suspension 模式 Guarded Suspension 保护性暂挂模式,是利用 wait/notify 机制去让某线程去等待直到满足再去执行操作,其既是为了保证安全性,也是一种线程之间协同的机制 。
Guarded Suspension 模式应用在某些操作需要满足条件才能去执行的情况,其中若条件满足则直接执行,若条件不满足则等待条件满足 后再执行。
考虑一个无限长度的阻塞队列,提供一个 offer 和 take 方法去向队列中放入元素和取出元素,其中要求 take 方法在队列为空时等待直到非空才去获取。
这种 take 在单线程的时候是没意义的——如果当前队列为空,则在之后它不可能自动变为非空,因为没有其它线程去操作它。
如果不知道 wait/notify 机制的存在的话,可能会想到使用轮询去检查队列是否非空:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class BlockingQueue <T> { private final Queue<T> queue = new LinkedList <T>(); public synchronized void offer (T t) { queue.offer(t); } public T take () { while (true ) { synchronized (queue) { if (!queue.isEmpty()) { return queue.remove(); } } Thread.sleep(50 ); } } }
这能满足需求,但并不优雅且性能可能极差,idea 也会警告说可能 busy-waiting。解决方法是使用 java 所提供的线程协同机制 wait/notify 去进行同步,而非用轮询去进行同步。
Java 的线程协同机制:wait/notify wait/notify 是一种通知机制,当某线程持有锁并 wait 时,其将释放锁 并挂起,直到被中断,或其它线程持有锁时调用 notify/notifyAll 方法;某线程持有锁并执行 notify 时,其会将 wait 的一个或所有线程重新加入锁的等待队列(此时并不释放锁 ),并继续执行。执行 wait 和 notify 时必须持有锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public static class WaitNotifyTest { static final Object lock = new Object (); static void Thread1 () { synchronized (lock) { System.out.println("t1: before wait" ); try { lock.wait(); System.out.println("t1: after await" ); } catch (InterruptedException e) { } } } static void Thread2 () { synchronized (lock) { System.out.println("t2: before notify" ); lock.notify(); try { Thread.sleep(10 ); } catch (InterruptedException e) { } System.out.println("t2: after notify" ); } } public static void main (String[] args) throws InterruptedException { new Thread (WaitNotifyTest::Thread1).start(); Thread.sleep(10 ); new Thread (WaitNotifyTest::Thread2).start(); } }
最后输出结果必定如下:
t1: before wait -- 首先是 t1 开始执行t2: before notify -- t1 遇到 wait,释放锁并挂起(变为 waiting),t2 拿到锁,开始执行t2: after notify -- t2 执行 notifyAll 之后,释放锁之前,这一行必定在 t1 : after await 之前输出t1: after await -- t2 释放锁,t1 拿到锁,执行之后的工作
使用 wait/notify 实现互斥锁 这个例子介绍了 wait/notify 的性质,下面是一个更有趣的实例——用 wait/notify 机制实现 Mutex,即不可重入的互斥锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static class Mutex { private Long currentId = null ; public synchronized void lock () { while (currentId != null ) { try { this .wait(); } catch (InterruptedException e) { } } currentId = Thread.currentThread().getId(); } public synchronized void unlock () { if (currentId == null ) { throw new IllegalStateException ("锁未被持有" ); } if (!currentId.equals(Thread.currentThread().getId())) { throw new IllegalStateException ("当前线程未持有锁" ); } currentId = null ; this .notify(); } }
locked 字段表示该锁是否被持有。其机制是,当某线程调用 lock 方法时,检查锁是否被持有,若已持有,则等待并释放锁,否则持有锁,标记当前 id 为自身;解锁时,检查是否是自己持有锁,并释放锁,通知所有 wait 的线程继续执行。
这里有一个问题,为什么要在 while 中进行 wait 而非 if?假设我们把这里改成 if,假设锁已被占有时,两个线程同时调用了 lock 方法,并在 wait 处挂起;这时,当持有锁的线程解锁时,两个线程会先后从 wait 处开始执行,而他们都会修改 currentId,这意味着锁会被后来者抢走,出现了 bug。而若使用 while 做判断时,先执行的线程设置完 currentId 后,后执行的线程会在 while 中再次检查 currentId(记住,这两个线程执行的过程都是在 synchronized 块里的,不会有冲突),它发现 currentId 仍旧不是 null,因此会继续 wait。
总而言之,不使用 while 做判断的话,会存在这样的情况,即线程被唤醒了,但它等待的条件却并未满足 (这种情况似乎称为虚假唤醒),while 负责让这样的线程再次检查条件是否满足。
使用 wait/notify 实现无界阻塞队列 使用 wait/notify 机制,上面的阻塞队列就可以更优雅地实现了:
public class BlockingQueue <T> { private final Queue<T> queue = new LinkedList <T>(); public synchronized void offer (T t) { queue.offer(t); notify(); } public synchronized T take () { while (queue.isEmpty()) { try { wait(); } catch (InterruptedException e) { throw new RuntimeException (e); } } return queue.remove(); } }
使用 ReentrantLock 和 Condition 实现有界阻塞队列 ReentrantLock 为可重入锁,可以用于实现 synchronized 代码块,ReentrantLock::newCondition 用于创建对应该锁的条件变量,其可用于实现 wait/notify 机制,其相较于synchronized更加灵活,包括但不限于:
ReentrantLock使用lock和unlock方法显式地加锁解锁,并有tryLock,lockInterruptibly等方法,允许可中断,可超时地获取锁,功能更强大(bug也更多)
ReentrantLock允许创建多个信号变量对应同一把锁
使用ReentrantLock,我们可以实现有界的阻塞队列,其包含两个信号变量notEmpty和notFull,用于在队列非空时通知消费者,队列非满时通知生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class BlockingQueue <T> { private final ReentrantLock lock = new ReentrantLock (); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); private final int maxQueueSize; private final Queue<T> queue; public BlockingQueue (int maxQueueSize) { assert maxQueueSize > 0 ; this .maxQueueSize = maxQueueSize; queue = new LinkedList <>(); } public void put (T t) { lock.lock(); try { while (queue.size() == maxQueueSize) { notFull.await(); } queue.offer(t); notEmpty.signalAll(); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } } public T remove () { lock.lock(); try { while (queue.size() == 0 ) { notEmpty.await(); } T res = queue.remove(); notFull.signalAll(); return res; } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } } }