从我在第一次接触到 Haskell 中的 foldRight(foldr)起,我就对它十分好奇:foldRight 是如何能够处理无穷列表的?这个问题让我魂牵梦绕,我必须要得到一个合情合理的结果。
foldRight 的推导 首先摆出我们的惰性列表的 ADT 以及 foldRight 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import scala.{Stream => _} case object Empty extends Stream [Nothing ]final case class Cons [+A ](h: ( ) => A , t : () => Stream [A ]) extends Stream [A ] sealed trait Stream [+A ] { def toList : List [A ] = this match { case Empty => Nil case Cons (h, t) => h() :: t().toList } def foldRight [B ](z: => B )(fn: (A , => B ) => B ): B = this match { case Empty => z case Cons (h, t) => fn(h(), t().foldRight(z)(fn)) } }object Stream { def apply [A ](xs: A *): Stream [A ] = xs.foldRight(Empty : Stream [A ])((x, acc) => Cons (() => x, () => acc)) def cons [A ](x: A , xs: => Stream [A ]): Stream [A ] = { lazy val head = x lazy val tail = xs Cons (() => head, () => tail) } }
Scala 的 foldRight 远远比 Haskell 中的要明显:考虑 fn 的签名(A, => B) => B
,只要我们在 fn 中不对第二个参数求值,即我们不求值t().foldRight(z)(fn)
,即我们不求值t
,则 t 就根本不会被计算!t 若不被计算,则流中从该处的 h 之后的元素都不会被计算了。
这是惰性列表和普通列表的 foldRight 的最大的不同:对于普通列表,它的每一次 fn 的调用之前,都会先对左右参数求值,而每次对右边参数求值都是对子列表的 foldRight,这种递归会无限进行,直到到达列表尾部或爆栈;而对于惰性列表,如果不对右边参数求值,则这次 foldRight 就算弄完了,后续列表的求值就当作不存在。
为此,我们可以干脆地实现一个 headOption 方法:
def headOption : Option [A ] = this .foldRight(None : Option [A ])((x, _) => Some (x))
虽然这代码是我自己写的,但我还是觉得这玩意有点性感,我们去推导一下Stream
,省略一切不必要的东西并使用我的形式:
Stream (1 , 2 , 3 ).headOptionStream (1 , 2 , 3 ).foldRight(None )(fn) Cons (1 , Stream (2 , 3 )).foldRight(None )(fn) fn(1 , Stream (2 , 3 ).foldRight(None )(fn)) Some (1 )
再考虑一个无穷列表,比如自然数列表,求他的 head:
Stream (0 , 1 , 2 , 3 , ...).headOptionStream (0 , 1 , 2 , 3 , ...).foldRight(None )(fn)Cons (0 , Stream (1 , 2 , 3 , ...)).foldRight(None )(fn) fn(0 , Stream (1 , 2 , 3 , ...).foldRight(None )(fn))Some (0 )
从这里已经能看出来东西了:只要在某次 fn 调用中不对 acc 求值,我们就能终止后续的 foldRight 方法的调用,此时 fn 的返回值就是最后一次调用的 foldRight 方法的结果,而 z 值实际上只有在列表为空时才被使用 。
该方法对普通列表仍旧适用,但性能是极差的——无法终止后续的 foldRight 方法的调用——我取个 head 为啥要把整个列表都过一遍?
为此,我们再实现一个更加显然的例子——实现能处理无穷列表的 takeWhile:
def takeWhile (f: A => Boolean ): Stream [A ] = this .foldRight(Empty : Stream [A ])((x, acc) => if (f(x)) Stream .cons(x, acc) else Empty )
对一个自然数列表进行推导,获取所有小于等于 2 的值:
Stream (0 , 1 , 2 , 3 , ...).takeWhile(_ <= 2 )Stream (0 , 1 , 2 , 3 , ...).foldRight(Empty )(fn) Cons (0 , Stream (1 , 2 , 3 , ...)).foldRight(Empty )(fn) fn(0 , Stream (1 , 2 , 3 , ...).foldRight(Empty )(fn)) Cons (0 , Stream (1 , 2 , 3 , ...).foldRight(Empty )(fn)) Cons (0 , Cons (1 , Stream (2 , 3 , ...)).foldRight(Empty )(fn)) Cons (0 , fn(1 , Stream (2 , 3 , ...).foldRight(Empty )(fn))) Cons (0 , Cons (1 , Stream (2 , 3 , ...).foldRight(Empty )(fn))) Cons (0 , Cons (1 , Cons (2 , Stream (3 , ...)).foldRight(Empty )(fn))) Cons (0 , Cons (1 , fn(2 , Stream (3 , ...).foldRight(Empty )(fn)))) Cons (0 , Cons (1 , Cons (2 , Stream (3 , ...).foldRight(Empty )(fn)))) Cons (0 , Cons (1 , Cons (2 , Cons (3 , Stream (...)).foldRight(Empty )(fn)))) Cons (0 , Cons (1 , Cons (2 , fn(3 , Stream (...).foldRight(Empty )(fn))))) Cons (0 , Cons (1 , Cons (2 , Empty ))) Stream (0 , 1 , 2 )
容易发现,我们实际上是在对列表中的每个元素 x 调用 fn,并在 fn 中根据 x 的值来确定操作——我是继续迭代下去,还是在这里直接终止? 从这方面说来,fn 实际上是x => 行为
的映射。使用这种思路,我们就能够实现 take。
为啥上面使用 takeWhile 做例子呢?因为 take 方法无法使用单纯的 foldRight 实现:它的迭代过程中需要维持一个“内部”的状态,即当前还需要 take 的元素数量。我们先编写一个显式的递归的 take 方法:
def take (n: Int ): Stream [A ] = this match { case _ if n <= 0 => Empty case Empty => Empty case Cons (h, t) => Stream .cons(h(), t().take(n - 1 )) }
但若强迫要求使用 foldRight,最显然的方式是在 fn 中使用一个闭包去捕获该变量并在每次更新时维护该变量,这性能可能是最好的,但并不优雅,且无法推导:
def badTake (n: Int ): Stream [A ] = { var i = 0 this .foldRight(Empty : Stream [A ])((x, acc) => if (i < n) { i += 1 Stream .cons(x, acc) } else Empty ) }
根据上面的认识,即fn = x => 行为
,我们可以将状态包含在 x 中,即当前元素的下标,我们可以先实现一个mapWithIndex
的工具方法,利用其将 index 包含在 x 中,并在折叠操作中进行使用,事后再通过 map 丢掉 index,而zipWithIndex
可以使用zip
和Nat
去定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def zip [A , B ](la: Stream [A ], lb: Stream [B ]): Stream [(A , B )] = (la, lb) match { case ((Cons (x, xs), Cons (y, ys))) => Stream .cons((x(), y()), zip(xs(), ys())) case _ => Empty }def Nat : Stream [Int ] = { def go (n: Int ): Stream [Int ] = cons(n, go(n + 1 )) go(0 ) }def map [B ](f: A => B ): Stream [B ] = this .foldRight(Empty : Stream [B ])((x, acc) => Stream .cons(f(x), acc))def zipWithIndex : Stream [(A , Int )] = Stream .zip(this , Stream .Nat )def take (n: Int ): Stream [A ] = this .zipWithIndex.foldRight(Empty : Stream [(A , Int )]){ (x, acc) => val (_, index) = x if (index < n) Stream .cons(x, acc) else Empty }.map(_._1)
万分注意!这里定义 take 的 foldRight 的 fn 的时候不能用偏函数的形式去解构 x(形如{ case (x@(_, index), acc) => ... }
),该操作本质上是进行了一次模式匹配,因而会对 acc 进行求值,从而导致爆栈 !
根据之前对惰性列表中的学习,该操作虽然看起来有点吓人,但性能并不差——实际上只是对每一个元素进行包装和解包装了而已,性能是没有多少损失的,但仍旧有爆栈的风险,毕竟调用对每个元素递归调用 fn 是必须的 。我觉得解决方案是使用命令式的方式去重新定义 foldRight,使用真正的栈去维护调用栈,然后所有操作都使用 foldRight 或 foldRight 编写的方法或使用命令式的代码来实现。我相信 Scala 标准库中的惰性列表也是这样做的,否则就只能在编译器上打孔了。毕竟爆栈的风险总是存在的,对于列表这种线性结构尤甚。
关于 >=> 对于无穷列表,我之前所使用的解释折叠的方法就很怪了——如何在一个无穷的序列后面加东西?为此该方法需要被改变,而改变后的方法和实际的计算过程是非常相像的——动态地去构造>=>
所连接起来的序列 。
比如,求一个自然数序列的小于 3 的部分的和,代码我们会这么写:
Stream .Nat .foldRight(0 )((x, acc) => if (x < 3 ) x + acc else 0 )
为此,按照新的推导方法,过程是这样:
Stream (0 , 1 , 2 , 3 , 4 , ...).fold0 >=> Stream (1 , 2 , 3 , 4 , ...).fold0 >=> 1 >=> Stream (2 , 3 , 4 , ...).fold0 >=> 1 >=> 2 >=> Stream (3 , 4 , ...).fold0 >=> 1 >=> 2 >=> 0 0 + (1 + (2 + 0 ))3
该过程就相当于是从前往后对每一个元素都进行“检查”,找到要求停止的元素,然后把 fn 的返回值作为初始值,在这里是当 x 为 3 时停止,返回一个 0。
那么问题又来了,对于惰性列表,链式调用(实际上是折叠操作的链式调用)有没有什么更加舒服的描述方式呢?
另一种考虑惰性列表的方式是将其看作无限嵌套的盒子,每次打开一个盒子,盒子里都会有一个值(在 Haskell 里,这个值也是盒子,所谓一切皆 thunk),和一个更小的盒子。
如何看待惰性求值 惰性求值不会改变代码的形式(即能操作普通列表的方法不需要改变结构 / 源代码就能够去操作惰性列表),但有着更高的性能并减少中间生成的临时数据结构,因此它可以认为是函数式编程中的一个优化方式 。
但这也带来了代价:对方法的时空复杂度的分析将变得困难,错误编写的代码会导致内存泄漏。因此,对于惰性求值的看法应当就同对于优化的看法一致:不要提前优化 。但若是惰性列表会简化代码的情况,我认为主动去使用它也是合理的。