最近学很多并发,是时候该开始做一些笔记了,跟随《图解 Java 多线程设计模式》一书,把所学的东西在设计模式的角度下统筹一下。我疑心这些模式究竟能否称为设计模式,它们所涉及的其实并非架构,而是一些更特定,细节,具体的东西,或许叫编程模式更为适当?但管他呢。
这篇笔记学习它的 Single Threaded Execution,Guarded Suspension 模式,Immutable 模式已近乎本能,不需要特地学习。
Single Threaded Execution 模式 Single Threaded Execution,其实就是单线程执行。这通常出现在需要把对某些资源的操作原子化 时,即需要让其它线程要么看到该操作未开始,要么看到该操作完成,不能看到其中的中间状态 ;一个非常典型的例子是条件失效(Slipped Conditions) ,它是说代码赖以执行的条件在进行检查时满足,其后执行时却不满足 了,这在单线程环境下是不可能的(下面这个例子仍是可能的,考虑其它进程也在操作文件)。
条件失效是 Race Condition 竞态条件的一种实例。
锁,资源,原子化 考虑这样一个函数,它尝试创建一个文件,倘若文件存在,就先删除该文件,再创建和返回文件:
static  File createFile (String path)  {File  file  =  new  File (path);if  (file.exists()) { boolean  res  =  file.delete(); if  (!res) {throw  new  IllegalStateException (String.format("文件 %s 删除失败" , path));try  {catch  (IOException e) { throw  new  RuntimeException (e); }return  file;
这个例子中的文件虽然是局部变量,但是其是从文件系统中获取的,这个文件系统对所有线程都是可见、共享的
 
对这个函数,只要有相应权限,无论文件存在与否,对同一个路径连续调用两次 getFile 也是没问题的,但在多线程环境下呢?file.delete可能会返回 false,因此抛出异常,这表示,file.delete赖以执行的条件file.exists失效了——在试图删除这个文件时,这个文件已经不存在了 。为此,就需要保证检查文件是否存在和删除文件这个过程是原子的 以避免这个问题。
想要保证这个过程是原子的,仅对它本身加锁是不够的,在执行锁中的过程时,文件的状态仍旧可能被改变,也就是说file.createNewFile会被执行;我们需要保证对文件进行的每一个操作都是原子的,才能保证这个过程是原子的 (当然,大可直接把锁加到整个方法上)。
或者说,锁保护的是资源 ,而非是过程,总是去明确锁保护哪些资源,是否在某些地方遗漏了保护(这等于没有保护),是重要的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static  File getFile (String path)  {File  file  =  new  File (path);synchronized  (path.intern()) { if  (file.exists()) {boolean  res  =  file.delete(); if  (!res) {throw  new  IllegalStateException (String.format("文件 %s 删除失败" , path));synchronized  (path.intern()) {try  {catch  (IOException e) { throw  new  RuntimeException (e); }return  file;
共享资源,临界区 这便是 Single Threaded Execution 模式,其只有一个角色,即 Shared Resource 共享资源,其用于供多线程去并发访问。例子中的文件便是共享资源。对共享资源的读写必须得到正确同步,否则共享资源的状态可能被损坏。Single Threaded Execution 保证对共享资源的单线程访问。
只允许单个线程执行的程序范围 称为临界区。
下列情况下,需要使用 Single Threaded Execution 模式:
多个线程同时访问共享资源 
共享资源的状态会发生变化,若使用 Immutable 模式,就能避免这一条 
安全性需要被确保;如果有时候安全性是可以违背的,比如允许某计数器有一定偏差,不进行同步也不是不行。 
 
并且,即使共享资源的所有方法都是线程安全的,也不保证对其进行组合操作是线程安全的,比如考虑一个可变的 Point(不要这么干!):
public  final  class  Point  {private  int  x;private  int  y;public  synchronized  void  setX (int  x)  {this .x = x;public  synchronized  void  setY (int  y)  {this .y = y;
倘若某个线程要将某 point 置为(1, 1),某线程要将该 point 置为(-1, -1),最后根据线程调度顺序的不同,该 point 最后的值可能为(1, -1), (-1, 1);若要同步这两个线程,需要为元组(x, y)加锁,亦即对 point 本身加锁:
void  thread1 (Point point)  {synchronized (point) {1 )1 )void  thread2 (Point point)  {synchronized (point) {1 )1 )
与其这样操作,不如把 Point 变成不可变对象,每次去创建新 Point,然后修改引用。
Guarded Suspension 模式 Guarded Suspension 保护性暂挂模式,是利用 wait/notify 机制去让某线程去等待直到满足再去执行操作,其既是为了保证安全性,也是一种线程之间协同的机制 。
Guarded Suspension 模式应用在某些操作需要满足条件才能去执行的情况,其中若条件满足则直接执行,若条件不满足则等待条件满足 后再执行。
考虑一个无限长度的阻塞队列,提供一个 offer 和 take 方法去向队列中放入元素和取出元素,其中要求 take 方法在队列为空时等待直到非空才去获取。
这种 take 在单线程的时候是没意义的——如果当前队列为空,则在之后它不可能自动变为非空,因为没有其它线程去操作它。
 
如果不知道 wait/notify 机制的存在的话,可能会想到使用轮询去检查队列是否非空:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public  class  BlockingQueue <T> {private  final  Queue<T> queue = new  LinkedList <T>();public  synchronized  void  offer (T t)  {public  T take ()  {while  (true ) {synchronized  (queue) {if  (!queue.isEmpty()) {return  queue.remove();50 );
这能满足需求,但并不优雅且性能可能极差,idea 也会警告说可能 busy-waiting。解决方法是使用 java 所提供的线程协同机制 wait/notify 去进行同步,而非用轮询去进行同步。
Java 的线程协同机制:wait/notify wait/notify 是一种通知机制,当某线程持有锁并 wait 时,其将释放锁 并挂起,直到被中断,或其它线程持有锁时调用 notify/notifyAll 方法;某线程持有锁并执行 notify 时,其会将 wait 的一个或所有线程重新加入锁的等待队列(此时并不释放锁 ),并继续执行。执行 wait 和 notify 时必须持有锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public  static  class  WaitNotifyTest  {static  final  Object  lock  =  new  Object ();static  void  Thread1 ()  {synchronized  (lock) {"t1: before wait" );try  {"t1: after await" );catch  (InterruptedException e) { }static  void  Thread2 ()  {synchronized  (lock) {"t2: before notify" );try  {10 ); catch  (InterruptedException e) { }"t2: after notify" );public  static  void  main (String[] args)  throws  InterruptedException {new  Thread (WaitNotifyTest::Thread1).start();10 ); new  Thread (WaitNotifyTest::Thread2).start();
最后输出结果必定如下:
t1:  before  wait      -- 首先是 t1  开始执行t2:  before  notify   -- t1  遇到 wait,释放锁并挂起(变为  waiting),t2  拿到锁,开始执行t2:  after notify    -- t2  执行 notifyAll 之后,释放锁之前,这一行必定在 t1 : after await 之前输出t1:  after await     -- t2  释放锁,t1  拿到锁,执行之后的工作
使用 wait/notify 实现互斥锁 这个例子介绍了 wait/notify 的性质,下面是一个更有趣的实例——用 wait/notify 机制实现 Mutex,即不可重入的互斥锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public  static  class  Mutex  {private  Long  currentId  =  null ;public  synchronized  void  lock ()  {while (currentId != null ) {try  {this .wait();catch  (InterruptedException e) { }public  synchronized  void  unlock ()  {if  (currentId == null ) {throw  new  IllegalStateException ("锁未被持有" );if  (!currentId.equals(Thread.currentThread().getId())) {throw  new  IllegalStateException ("当前线程未持有锁" );null ;this .notify();
locked 字段表示该锁是否被持有。其机制是,当某线程调用 lock 方法时,检查锁是否被持有,若已持有,则等待并释放锁,否则持有锁,标记当前 id 为自身;解锁时,检查是否是自己持有锁,并释放锁,通知所有 wait 的线程继续执行。
这里有一个问题,为什么要在 while 中进行 wait 而非 if?假设我们把这里改成 if,假设锁已被占有时,两个线程同时调用了 lock 方法,并在 wait 处挂起;这时,当持有锁的线程解锁时,两个线程会先后从 wait 处开始执行,而他们都会修改 currentId,这意味着锁会被后来者抢走,出现了 bug。而若使用 while 做判断时,先执行的线程设置完 currentId 后,后执行的线程会在 while 中再次检查 currentId(记住,这两个线程执行的过程都是在 synchronized 块里的,不会有冲突),它发现 currentId 仍旧不是 null,因此会继续 wait。
总而言之,不使用 while 做判断的话,会存在这样的情况,即线程被唤醒了,但它等待的条件却并未满足 (这种情况似乎称为虚假唤醒),while 负责让这样的线程再次检查条件是否满足。
使用 wait/notify 实现无界阻塞队列 使用 wait/notify 机制,上面的阻塞队列就可以更优雅地实现了:
public  class  BlockingQueue <T> {private  final  Queue<T> queue = new  LinkedList <T>();public  synchronized  void  offer (T t)  {public  synchronized  T take ()  {while (queue.isEmpty()) {try  {catch  (InterruptedException e) { throw  new  RuntimeException (e); }return  queue.remove();
使用 ReentrantLock 和 Condition 实现有界阻塞队列 ReentrantLock 为可重入锁,可以用于实现 synchronized 代码块,ReentrantLock::newCondition 用于创建对应该锁的条件变量,其可用于实现 wait/notify 机制,其相较于synchronized更加灵活,包括但不限于:
ReentrantLock使用lock和unlock方法显式地加锁解锁,并有tryLock,lockInterruptibly等方法,允许可中断,可超时地获取锁,功能更强大(bug也更多) 
ReentrantLock允许创建多个信号变量对应同一把锁 
 
使用ReentrantLock,我们可以实现有界的阻塞队列,其包含两个信号变量notEmpty和notFull,用于在队列非空时通知消费者,队列非满时通知生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public  class  BlockingQueue <T> {private  final  ReentrantLock  lock  =  new  ReentrantLock ();private  final  Condition  notEmpty  =  lock.newCondition();private  final  Condition  notFull  =  lock.newCondition();private  final  int  maxQueueSize;private  final  Queue<T> queue;public  BlockingQueue (int  maxQueueSize)  {assert  maxQueueSize > 0 ;this .maxQueueSize = maxQueueSize;new  LinkedList <>();public  void  put (T t)  {try  {while (queue.size() == maxQueueSize) {catch  (InterruptedException e) {throw  new  RuntimeException (e);finally  {public  T remove ()  {try  {while (queue.size() == 0 ) {T  res  =  queue.remove();return  res;catch  (InterruptedException e) {throw  new  RuntimeException (e);finally  {