关于 foldRight 为何能操作无穷列表

从我在第一次接触到 Haskell 中的 foldRight(foldr)起,我就对它十分好奇:foldRight 是如何能够处理无穷列表的?这个问题让我魂牵梦绕,我必须要得到一个合情合理的结果。

foldRight 的推导

首先摆出我们的惰性列表的 ADT 以及 foldRight 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import scala.{Stream => _} // 必须排除标准库的 Stream,避免重名
// 为图方便,两个值构造器放到前面
case object Empty extends Stream[Nothing]
final case class Cons[+A](h: () => A, t: () => Stream[A]) extends Stream[A] // 注意 final 需要在 case 之前
sealed trait Stream[+A] {
// toList 方便展示
def toList: List[A] = this match {
case Empty => Nil
case Cons(h, t) => h() :: t().toList
}
def foldRight[B](z: => B)(fn: (A, => B) => B): B = this match {
case Empty => z
case Cons(h, t) => fn(h(), t().foldRight(z)(fn)) // 也可理解为 h() >=> t().foldRight(z)(fn),其中 >=> 即为 fn 的中缀表示形式,右结合且对第二个参数非严格求值
}
}

object Stream {
def apply[A](xs: A*): Stream[A] =
xs.foldRight(Empty: Stream[A])((x, acc) => Cons(() => x, () => acc))

// smart constructor
def cons[A](x: A, xs: => Stream[A]): Stream[A] = {
lazy val head = x
lazy val tail = xs
Cons(() => head, () => tail)
}
}

Scala 的 foldRight 远远比 Haskell 中的要明显:考虑 fn 的签名(A, => B) => B,只要我们在 fn 中不对第二个参数求值,即我们不求值t().foldRight(z)(fn),即我们不求值t,则 t 就根本不会被计算!t 若不被计算,则流中从该处的 h 之后的元素都不会被计算了。

这是惰性列表和普通列表的 foldRight 的最大的不同:对于普通列表,它的每一次 fn 的调用之前,都会先对左右参数求值,而每次对右边参数求值都是对子列表的 foldRight,这种递归会无限进行,直到到达列表尾部或爆栈;而对于惰性列表,如果不对右边参数求值,则这次 foldRight 就算弄完了,后续列表的求值就当作不存在。

为此,我们可以干脆地实现一个 headOption 方法:

1
2
3
// Stream method
def headOption: Option[A] =
this.foldRight(None: Option[A])((x, _) => Some(x))

虽然这代码是我自己写的,但我还是觉得这玩意有点性感,我们去推导一下Stream,省略一切不必要的东西并使用我的形式:

1
2
3
4
5
Stream(1, 2, 3).headOption
Stream(1, 2, 3).foldRight(None)(fn) // 应用 headOption
Cons(1, Stream(2, 3)).foldRight(None)(fn) // 解构 Stream
fn(1, Stream(2, 3).foldRight(None)(fn)) // 应用 foldRight
Some(1) // 应用 fn

再考虑一个无穷列表,比如自然数列表,求他的 head:

1
2
3
4
5
Stream(0, 1, 2, 3, ...).headOption
Stream(0, 1, 2, 3, ...).foldRight(None)(fn)
Cons(0, Stream(1, 2, 3, ...)).foldRight(None)(fn)
fn(0, Stream(1, 2, 3, ...).foldRight(None)(fn))
Some(0)

从这里已经能看出来东西了:只要在某次 fn 调用中不对 acc 求值,我们就能终止后续的 foldRight 方法的调用,此时 fn 的返回值就是最后一次调用的 foldRight 方法的结果,而 z 值实际上只有在列表为空时才被使用

该方法对普通列表仍旧适用,但性能是极差的——无法终止后续的 foldRight 方法的调用——我取个 head 为啥要把整个列表都过一遍?

为此,我们再实现一个更加显然的例子——实现能处理无穷列表的 takeWhile:

1
2
3
// Stream method
def takeWhile(f: A => Boolean): Stream[A] =
this.foldRight(Empty: Stream[A])((x, acc) => if (f(x)) Stream.cons(x, acc) else Empty)

对一个自然数列表进行推导,获取所有小于等于 2 的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Stream(0, 1, 2, 3, ...).takeWhile(_ <= 2)
Stream(0, 1, 2, 3, ...).foldRight(Empty)(fn) // 应用 takeWhile
Cons(0, Stream(1, 2, 3, ...)).foldRight(Empty)(fn) // 解构 Stream
fn(0, Stream(1, 2, 3, ...).foldRight(Empty)(fn)) // 应用 foldRight
Cons(0, Stream(1, 2, 3, ...).foldRight(Empty)(fn)) // 应用 fn
Cons(0, Cons(1, Stream(2, 3, ...)).foldRight(Empty)(fn)) // 解构 Stream
Cons(0, fn(1, Stream(2, 3, ...).foldRight(Empty)(fn))) // 应用 foldRight
Cons(0, Cons(1, Stream(2, 3, ...).foldRight(Empty)(fn))) // 应用 fn
Cons(0, Cons(1, Cons(2, Stream(3, ...)).foldRight(Empty)(fn))) // 解构 Stream
Cons(0, Cons(1, fn(2, Stream(3, ...).foldRight(Empty)(fn)))) // 应用 foldRight
Cons(0, Cons(1, Cons(2, Stream(3, ...).foldRight(Empty)(fn)))) // 应用 fn
Cons(0, Cons(1, Cons(2, Cons(3, Stream(...)).foldRight(Empty)(fn)))) // 解构 Stream
Cons(0, Cons(1, Cons(2, fn(3, Stream(...).foldRight(Empty)(fn))))) // 应用 foldRight
Cons(0, Cons(1, Cons(2, Empty))) // 应用 fn
Stream(0, 1, 2) // 使用 Stream 表示

容易发现,我们实际上是在对列表中的每个元素 x 调用 fn,并在 fn 中根据 x 的值来确定操作——我是继续迭代下去,还是在这里直接终止?从这方面说来,fn 实际上是x => 行为的映射。使用这种思路,我们就能够实现 take。

为啥上面使用 takeWhile 做例子呢?因为 take 方法无法使用单纯的 foldRight 实现:它的迭代过程中需要维持一个“内部”的状态,即当前还需要 take 的元素数量。我们先编写一个显式的递归的 take 方法:

1
2
3
4
5
def take(n: Int): Stream[A] = this match {
case _ if n <= 0 => Empty
case Empty => Empty
case Cons(h, t) => Stream.cons(h(), t().take(n - 1))
}

但若强迫要求使用 foldRight,最显然的方式是在 fn 中使用一个闭包去捕获该变量并在每次更新时维护该变量,这性能可能是最好的,但并不优雅,且无法推导:

1
2
3
4
5
6
7
8
9
// Bad!
// Stream method
def badTake(n: Int): Stream[A] = {
var i = 0
this.foldRight(Empty: Stream[A])((x, acc) => if (i < n) {
i += 1
Stream.cons(x, acc)
} else Empty)
}

根据上面的认识,即fn = x => 行为,我们可以将状态包含在 x 中,即当前元素的下标,我们可以先实现一个mapWithIndex的工具方法,利用其将 index 包含在 x 中,并在折叠操作中进行使用,事后再通过 map 丢掉 index,而zipWithIndex可以使用zipNat去定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Stream function
def zip[A, B](la: Stream[A], lb: Stream[B]): Stream[(A, B)] = (la, lb) match {
case ((Cons(x, xs), Cons(y, ys))) => Stream.cons((x(), y()), zip(xs(), ys()))
case _ => Empty
}
// Stream function,自然数
def Nat: Stream[Int] = {
def go(n: Int): Stream[Int] =
cons(n, go(n + 1))
go(0)
}

// Stream method
def map[B](f: A => B): Stream[B] =
this.foldRight(Empty: Stream[B])((x, acc) => Stream.cons(f(x), acc))
// Stream method
def zipWithIndex: Stream[(A, Int)] = Stream.zip(this, Stream.Nat)
// Stream method
def take(n: Int): Stream[A] =
this.zipWithIndex.foldRight(Empty: Stream[(A, Int)]){ (x, acc) => // 不要在这里省代码!
val (_, index) = x
if (index < n) Stream.cons(x, acc)
else Empty
}.map(_._1)

万分注意!这里定义 take 的 foldRight 的 fn 的时候不能用偏函数的形式去解构 x(形如{ case (x@(_, index), acc) => ... }),该操作本质上是进行了一次模式匹配,因而会对 acc 进行求值,从而导致爆栈

根据之前对惰性列表中的学习,该操作虽然看起来有点吓人,但性能并不差——实际上只是对每一个元素进行包装和解包装了而已,性能是没有多少损失的,但仍旧有爆栈的风险,毕竟调用对每个元素递归调用 fn 是必须的。我觉得解决方案是使用命令式的方式去重新定义 foldRight,使用真正的栈去维护调用栈,然后所有操作都使用 foldRight 或 foldRight 编写的方法或使用命令式的代码来实现。我相信 Scala 标准库中的惰性列表也是这样做的,否则就只能在编译器上打孔了。毕竟爆栈的风险总是存在的,对于列表这种线性结构尤甚。

关于 >=>

对于无穷列表,我之前所使用的解释折叠的方法就很怪了——如何在一个无穷的序列后面加东西?为此该方法需要被改变,而改变后的方法和实际的计算过程是非常相像的——动态地去构造>=>所连接起来的序列

比如,求一个自然数序列的小于 3 的部分的和,代码我们会这么写:

1
Stream.Nat.foldRight(0)((x, acc) => if (x < 3) x + acc else 0)

为此,按照新的推导方法,过程是这样:

1
2
3
4
5
6
7
Stream(0, 1, 2, 3, 4, ...).fold
0 >=> Stream(1, 2, 3, 4, ...).fold
0 >=> 1 >=> Stream(2, 3, 4, ...).fold
0 >=> 1 >=> 2 >=> Stream(3, 4, ...).fold
0 >=> 1 >=> 2 >=> 0 // 在这里,fn(3, Stream(4, ...).fold) 的求值直接返回 0
0 + (1 + (2 + 0))
3

该过程就相当于是从前往后对每一个元素都进行“检查”,找到要求停止的元素,然后把 fn 的返回值作为初始值,在这里是当 x 为 3 时停止,返回一个 0。

那么问题又来了,对于惰性列表,链式调用(实际上是折叠操作的链式调用)有没有什么更加舒服的描述方式呢?

另一种考虑惰性列表的方式是将其看作无限嵌套的盒子,每次打开一个盒子,盒子里都会有一个值(在 Haskell 里,这个值也是盒子,所谓一切皆 thunk),和一个更小的盒子。

如何看待惰性求值

惰性求值不会改变代码的形式(即能操作普通列表的方法不需要改变结构 / 源代码就能够去操作惰性列表),但有着更高的性能并减少中间生成的临时数据结构,因此它可以认为是函数式编程中的一个优化方式

但这也带来了代价:对方法的时空复杂度的分析将变得困难,错误编写的代码会导致内存泄漏。因此,对于惰性求值的看法应当就同对于优化的看法一致:不要提前优化。但若是惰性列表会简化代码的情况,我认为主动去使用它也是合理的。