A transformer operation is an operation that creates another collection rather than a single value, e.g. filter, map, flatMap, groupBy. By contrast, methods such as fold, sum or aggregate are NOT transformer operations.
Sequential transformer operations can be implemented generically using an abstraction called Builder:
trait Builder[T, Repr] {
def += (elem: T): this.type
def result: Repr
}
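T is the type of the elements of the collection and Repr is the type of the collection. For instance, T could be String and Repr could be Seq[String]. The two operations are +=, which adds an element to the builder, and result, which returns the collection built from the elements added so far.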
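Let's take a transformer operation. Its implementation will create a new builder, add elements to it using +=, and finally call result to obtain the new collection.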
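A sequential filter written against the Builder abstraction could look like the sketch below. VectorBuilder and filterWith are hypothetical names introduced only for this illustration, and the Builder trait is the simplified one from these notes, not the one in the Scala standard library.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified Builder, as defined in these notes.
trait Builder[T, Repr] {
  def +=(elem: T): this.type
  def result: Repr
}

// Hypothetical builder that produces a Vector[T].
class VectorBuilder[T] extends Builder[T, Vector[T]] {
  private val buffer = ArrayBuffer.empty[T]
  def +=(elem: T): this.type = { buffer += elem; this }
  def result: Vector[T] = buffer.toVector
}

// Generic sequential filter: add the matching elements, then ask for the result.
def filterWith[T, Repr](xs: Iterable[T], b: Builder[T, Repr])(p: T => Boolean): Repr = {
  for (x <- xs if p(x)) b += x
  b.result
}
```

The same filterWith works for any target collection, because only the builder knows how Repr is constructed.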
Builders can only be used to implement sequential transformer operations. To implement parallel transformer operations we need an abstraction called a Combiner:
trait Combiner[T, Repr] extends Builder[T, Repr] {
def combine(that: Combiner[T, Repr]): Combiner[T, Repr]
}
A Combiner is a Builder with one extra method combine that takes the elements from two input combiners and creates a new combiner. How can the combine operation be implemented efficiently?
Whether combine represents a union (for sets and maps) or a concatenation (for sequences), it must be efficient, i.e. execute in O(log n + log m) time, where n and m are the sizes of the two input combiners. This is because the combine method can be invoked many times during a parallel operation.
Let's assume that we implement (a variant of) the combine operation as follows:
def combine(xs: Array[Int], ys: Array[Int]): Array[Int] = {
val r = new Array[Int](xs.length + ys.length) // n+m computational steps
Array.copy(xs, 0, r, 0, xs.length) // n computational steps
Array.copy(ys, 0, r, xs.length, ys.length) // m computational steps
r
}
This method is very inefficient. Its total execution takes 2n + 2m computational steps, so this is a linear, O(n+m) execution time.
Arrays cannot be concatenated efficiently.
What about other data structures, like maps or sets?
Most set implementations have efficient lookup, read and write operations, for example expected O(1) for hash tables and O(log n) for balanced trees.
Most set implementations don't have an efficient union operation, which makes combiners tricky to implement.
Operation complexity for sequences varies: mutable linked lists can support concatenation in O(1), but for most other sequences concatenation takes O(n) because the data needs to be copied.
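To make the O(1) claim for mutable linked lists concrete, here is a minimal sketch. Cell and MutableList are hypothetical names used only for this illustration. Concatenation relinks two pointers and copies nothing, at the price that the argument list must not be used afterwards.

```scala
// One cell of a singly linked list.
final class Cell[T](val elem: T) { var next: Cell[T] = null }

final class MutableList[T] {
  private var head: Cell[T] = null
  private var last: Cell[T] = null

  // O(1) append: link the new cell after the current last cell.
  def +=(elem: T): this.type = {
    val c = new Cell(elem)
    if (head == null) head = c else last.next = c
    last = c
    this
  }

  // O(1) concatenation: the cells of `that` are shared, not copied.
  def concat(that: MutableList[T]): this.type = {
    if (that.head != null) {
      if (head == null) head = that.head else last.next = that.head
      last = that.last
    }
    this
  }

  // O(n) traversal, only used to inspect the contents.
  def toList: List[T] = {
    val b = List.newBuilder[T]
    var c = head
    while (c != null) { b += c.elem; c = c.next }
    b.result()
  }
}
```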
Two-phase construction is a technique used to implement combiners efficiently. Before, we assumed that the internal data structure of the combiner is the same as the data structure that it combines (for instance, that combiners for arrays internally use arrays during the combine operation). In two-phase construction, the combiner has an intermediary data structure used during the combine operation. The intermediary data structure must support efficient += and combine operations, and it must be possible to convert it into the resulting data structure efficiently, ideally in parallel.
This would allow building the resulting data structure in two phases:
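1) Phase 1: the processors build intermediary data structures in parallel using +=, and these are merged using combine
2) Phase 2: the result method converts the intermediary data structure into the resulting data structure, in parallel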
Example: implementing an Array combiner. For simplicity, the combiner only deals with reference types. Internally, the combiner uses a numElements field to keep track of the total number of elements in the combiner. The internal storage of the combiner is an ArrayBuffer of ArrayBuffers of elements of type T. The inner ArrayBuffers store the actual elements. The outer ArrayBuffer is used for combining.
class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
private var numElements = 0
private val buffers = new ArrayBuffer[ArrayBuffer[T]]
buffers += new ArrayBuffer[T] // += writes to the last inner buffer; combine
// appends the inner buffers of the other combiner
}
def +=(x: T) = {
buffers.last += x
numElements += 1
this
}
def combine(that: ArrayCombiner[T]) = {
buffers ++= that.buffers
numElements += that.numElements
this
}
def result: Array[T] = {
val array = new Array[T](numElements)
val step = math.max(1, numElements/parallelism)
val starts = (0 until numElements by step) :+ numElements
val chunks = starts.zip(starts.tail)
val tasks = for ((from, end) <- chunks) yield task {
copyTo(array, from, end)
}
tasks.foreach(_.join())
array
}
xs.par.aggregate(newCombiner)(_ += _, _ combine _).result
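The snippets above rely on a task construct and a copyTo helper that are not defined in these notes. The following self-contained sketch fills them in under two assumptions: scala.concurrent.Future stands in for task, and copyTo is a hypothetical implementation that walks the nested buffers. It is one possible way to complete the example, not necessarily the original one.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
  private var numElements = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]

  def +=(x: T): this.type = { buffers.last += x; numElements += 1; this }

  // Only the references to the inner buffers are copied, not the elements.
  def combine(that: ArrayCombiner[T]): this.type = {
    buffers ++= that.buffers
    numElements += that.numElements
    this
  }

  // Hypothetical helper: copies the elements at global positions [from, end).
  private def copyTo(array: Array[T], from: Int, end: Int): Unit = {
    var j = 0    // index of the current inner buffer
    var i = from // offset inside the current inner buffer
    while (i >= buffers(j).length) { i -= buffers(j).length; j += 1 }
    var k = from
    while (k < end) {
      if (i < buffers(j).length) { array(k) = buffers(j)(i); i += 1; k += 1 }
      else { j += 1; i = 0 }
    }
  }

  // Phase 2: each Future fills a disjoint slice of the result array.
  def result: Array[T] = {
    val array = new Array[T](numElements)
    val step = math.max(1, numElements / parallelism)
    val starts = (0 until numElements by step) :+ numElements
    val tasks = starts.zip(starts.tail).map { case (from, end) =>
      Future { copyTo(array, from, end) }
    }
    tasks.foreach(t => Await.result(t, Duration.Inf))
    array
  }
}
```

Because the slices are disjoint, the copying tasks need no synchronization beyond waiting for all of them to finish.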
The conc data type is a parallel counterpart to functional cons-lists.
As a reminder, here is what the functional cons-lists look like:
sealed trait List[+T] {
def head: T
def tail: List[T]
}
case class ::[T](head: T, tail: List[T]) extends List[T]
case object Nil extends List[Nothing] {
def head = sys.error("empty list")
def tail = sys.error("empty list")
}
So there are two implementations of List: :: (cons) and Nil.
Implementing a filter method on the list:
def filter[T](lst: List[T])(p: T => Boolean): List[T] = lst match {
case x :: xs if (p(x)) => x :: filter(xs)(p)
case x :: xs => filter(xs)(p)
case Nil => Nil
}
Due to this recursive structure, lists are suitable for sequential computations. They are traversed from left to right.
Trees are suitable for parallel computations. Their subtrees can be traversed in parallel. Let's look at a Tree implementation:
sealed trait Tree[+T]
case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](elem: T) extends Tree[T]
case object Empty extends Tree[Nothing]
Implementing again the filter operation, this time using Tree data types:
def filter[T](t: Tree[T])(p: T => Boolean): Tree[T] = t match {
case Node(left, right) =>
val (l, r) = parallel(filter(left)(p), filter(right)(p)) // parallel returns a pair
Node(l, r)
case Leaf(elem) => if (p(elem)) t else Empty
case Empty => Empty
}
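The parallel construct used by the tree filter is not defined in these notes. A runnable sketch, assuming a hypothetical parallel helper built on scala.concurrent.Future that evaluates its second argument asynchronously, could look like this:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

sealed trait Tree[+T]
case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]
case class Leaf[T](elem: T) extends Tree[T]
case object Empty extends Tree[Nothing]

// Hypothetical helper: runs b asynchronously while a runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val fb = Future(b)
  val ra = a
  (ra, Await.result(fb, Duration.Inf))
}

def filter[T](t: Tree[T])(p: T => Boolean): Tree[T] = t match {
  case Node(left, right) =>
    // The two subtrees are independent, so they can be filtered in parallel.
    val (l, r) = parallel(filter(left)(p), filter(right)(p))
    Node(l, r)
  case Leaf(elem) => if (p(elem)) t else Empty
  case Empty      => Empty
}
```

Note that this filter keeps Empty placeholders where leaves were removed, so the shape of the tree is preserved.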
Trees are only good for parallelism if they are balanced.
The Conc data type represents balanced trees (suitable for parallelism).
sealed trait Conc[+T] {
def level: Int // longest path from the root to a leaf (tree height)
def size: Int // number of elements in the tree
def left: Conc[T]
def right: Conc[T]
}
Here is a succinct implementation of the Conc list:
case object Empty extends Conc[Nothing] {
def level = 0
def size = 0
}
case class Single[T](x: T) extends Conc[T] {
def level = 0
def size = 1
}
case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
def level = 1 + math.max(left.level, right.level)
def size = left.size + right.size
}
In addition, we define two invariants on these data types:
1) A <> node can never contain Empty as one of its subtrees
2) The level difference between the left and right subtrees of a <> node can only be 1 or 0
Using these invariants we can implement concatenation as follows:
def <>(that: Conc[T]): Conc[T] = {
if (this == Empty) that // first invariant
else if (that == Empty) this // first invariant
else concat(this, that)
}
The concat method will re-organize the tree in order to satisfy the second invariant (i.e. balance the tree).
def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
val diff = ys.level - xs.level
if ((diff >= -1) && (diff <= 1)) new <>(xs, ys)
else if (diff < -1) { // the left tree is higher than the right tree
if (xs.left.level >= xs.right.level) { // the left tree is left-leaning
val nr = concat(xs.right, ys)
new <>(xs.left, nr)
}
else { // the left tree is right-leaning
val nrr = concat(xs.right.right, ys)
if (nrr.level == (xs.level - 3)) {
val nl = xs.left
val nr = new <>(xs.right.left, nrr)
new <>(nl, nr)
}
else {
val nl = new <>(xs.left, xs.right.left)
val nr = nrr
new <>(nl, nr)
}
}
} else { // the right tree is higher than the left tree: symmetric case, omitted here
???
}
}
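When concatenating, assuming the left tree is higher than the right tree, there are two cases. If the left tree is left-leaning, its right subtree is recursively concatenated with the right tree, and the result is linked with the left subtree. If the left tree is right-leaning, its right-right subtree is recursively concatenated with the right tree, and the pieces are then regrouped depending on the level of that intermediate result.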
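For reference, here is a self-contained sketch of the balanced concatenation that also covers the case where the right tree is higher. That branch is an assumption: it is written as the mirror image of the left-higher branch shown above. The conc and toList helpers are hypothetical names added for the illustration, and the node classes are declared covariant only to keep pattern matching simple.

```scala
sealed trait Conc[+T] {
  def level: Int
  def size: Int
  def left: Conc[T]
  def right: Conc[T]
}
case object Empty extends Conc[Nothing] {
  def level = 0
  def size = 0
  def left = sys.error("Empty.left")
  def right = sys.error("Empty.right")
}
case class Single[+T](x: T) extends Conc[T] {
  def level = 0
  def size = 1
  def left = sys.error("Single.left")
  def right = sys.error("Single.right")
}
case class <>[+T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}

def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
  val diff = ys.level - xs.level
  if (diff >= -1 && diff <= 1) new <>(xs, ys)
  else if (diff < -1) { // the left tree is higher
    if (xs.left.level >= xs.right.level) new <>(xs.left, concat(xs.right, ys))
    else {
      val nrr = concat(xs.right.right, ys)
      if (nrr.level == xs.level - 3) new <>(xs.left, new <>(xs.right.left, nrr))
      else new <>(new <>(xs.left, xs.right.left), nrr)
    }
  } else { // the right tree is higher (assumed mirror image of the case above)
    if (ys.right.level >= ys.left.level) new <>(concat(xs, ys.left), ys.right)
    else {
      val nll = concat(xs, ys.left.left)
      if (nll.level == ys.level - 3) new <>(new <>(nll, ys.left.right), ys.right)
      else new <>(nll, new <>(ys.left.right, ys.right))
    }
  }
}

// Hypothetical entry point that enforces the first invariant (no Empty subtrees).
def conc[T](xs: Conc[T], ys: Conc[T]): Conc[T] =
  if (xs == Empty) ys else if (ys == Empty) xs else concat(xs, ys)

// In-order traversal, used only to check the element order.
def toList[T](c: Conc[T]): List[T] = c match {
  case Empty     => Nil
  case Single(x) => List(x)
  case l <> r    => toList(l) ::: toList(r)
}
```

Each recursive call descends one level of the higher tree, so the running time is proportional to the difference between the two levels.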
How can we implement an append method for conc-trees that runs in constant time? Such a method is crucial for implementing combiners efficiently. Here is a possible implementation of the += method in a combiner:
var xs: Conc[T] = Empty
def +=(elem: T) = {
xs = xs <> Single(elem)
}
This method has an execution time of O(log n). This is not bad, but can we make this running time constant instead? This is possible by relaxing some of the previous invariants and introducing a new type of node in the conc-tree:
case class Append[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
val level = 1 + math.max(left.level, right.level)
val size = left.size + right.size
}
We will relax the "balanced tree" invariant for this type of node.
With this new node type, this is a possible implementation of appendLeaf:
def appendLeaf[T](xs: Conc[T], y: T): Conc[T] = Append(xs, new Single(y))
This operation has an execution time of O(1). Unfortunately, it leads to imbalanced trees. Can we still achieve an O(log n) running time for concatenation? In other words, when concatenating, can we eliminate the Append nodes in O(log n) time? Not with this implementation: appending n elements creates n Append nodes, so eliminating them takes linear time, which breaks the O(log n) bound on concatenation.
In the next implementation we will make sure that if the total number of elements in the tree is n, there are no more than O(log n) Append nodes in the data structure.
To understand how to achieve this, consider counting in the binary number system. Each time the value doubles, the number of digits grows by only 1: 4 = 100, 8 = 1000, 16 = 10000. In other words, the number of digits grows logarithmically with the value, so a number n needs O(log n) digits.
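The counting analogy can be checked with a few lines of code. The helper names below are hypothetical. The sketch shows that a number n needs about log n binary digits, and that counting from 0 up to n changes fewer than 2n digits in total, i.e. a constant number per increment on average.

```scala
// Number of binary digits needed to write n (for n >= 1).
def binaryDigits(n: Int): Int = Integer.toBinaryString(n).length

// Number of digits that change when n is incremented by one:
// every trailing 1 becomes 0, and the first 0 becomes 1.
def flipsToIncrement(n: Int): Int = Integer.numberOfTrailingZeros(~n) + 1

// Total number of digit changes while counting from 0 up to n.
def totalFlips(n: Int): Int = (0 until n).map(flipsToIncrement).sum
```

Appending to the conc-tree plays the role of incrementing the counter: most appends touch a single node, and the occasional longer chain of merges corresponds to a carry.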
We can use this observation in building an algorithm for adding Append nodes:
def appendLeaf[T](xs: Conc[T], ys: Single[T]): Conc[T] = xs match {
case Empty => ys
case xs: Single[T] => new <>(xs, ys)
case _ <> _ => new Append(xs, ys)
case xs: Append[T] => append(xs, ys)
}
@tailrec
private def append[T](xs: Append[T], ys: Conc[T]): Conc[T] = {
if (xs.right.level > ys.level) new Append(xs, ys)
else {
val zs = new <>(xs.right, ys)
xs.left match {
case ws @ Append(_, _) => append(ws, zs)
case ws if (ws.level <= zs.level) => ws <> zs
case ws => new Append(ws, zs)
}
}
}
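The following self-contained sketch lets the append algorithm run on its own. It makes one simplifying assumption, marked in the code: the tree is built only by appendLeaf starting from Empty, in which case the two trees being merged always have the same level, so a plain <> link replaces the balanced concatenation. The appendNodes and toList helpers are hypothetical, and the node classes are declared covariant to keep pattern matching simple.

```scala
import scala.annotation.tailrec

sealed trait Conc[+T] { def level: Int; def size: Int }
case object Empty extends Conc[Nothing] { val level = 0; val size = 0 }
case class Single[+T](x: T) extends Conc[T] { val level = 0; val size = 1 }
case class <>[+T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}
case class Append[+T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}

def appendLeaf[T](xs: Conc[T], ys: Single[T]): Conc[T] = xs match {
  case Empty             => ys
  case Single(_)         => new <>(xs, ys)
  case _ <> _            => new Append(xs, ys)
  case as @ Append(_, _) => append(as, ys)
}

@tailrec
private def append[T](xs: Append[T], ys: Conc[T]): Conc[T] =
  if (xs.right.level > ys.level) new Append(xs, ys)
  else {
    val zs = new <>(xs.right, ys) // like adding 1 + 1 and producing a carry
    xs.left match {
      case ws @ Append(_, _) => append(ws, zs) // propagate the carry
      // Assumption: with appendLeaf-only construction ws and zs have equal
      // levels here, so a plain link is balanced. The general version uses
      // the balanced concatenation instead.
      case ws if ws.level <= zs.level => new <>(ws, zs)
      case ws => new Append(ws, zs)
    }
  }

// Counts the Append nodes in a tree.
def appendNodes[T](c: Conc[T]): Int = c match {
  case Append(l, r) => 1 + appendNodes(l) + appendNodes(r)
  case l <> r       => appendNodes(l) + appendNodes(r)
  case _            => 0
}

// In-order traversal, used only to check the element order.
def toList[T](c: Conc[T]): List[T] = c match {
  case Empty        => Nil
  case Single(x)    => List(x)
  case l <> r       => toList(l) ::: toList(r)
  case Append(l, r) => toList(l) ::: toList(r)
}
```

After n appends, the trees hanging under the Append nodes mirror the 1 digits in the binary representation of n, which is why their number stays logarithmic.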
Let's use the conc-tree defined before to implement a Combiner. We will call this implementation a ConcBuffer. The ConcBuffer appends elements into an array of size k. When the array gets full, it is stored in a Chunk node and added into the conc-tree.
class ConcBuffer[T: ClassTag](val k: Int, private var conc: Conc[T]) {
private var chunk: Array[T] = new Array(k)
private var chunkSize: Int = 0
}
The += operation usually just adds an element to the chunk array:
final def +=(elem: T): Unit = {
if (chunkSize >= k) expand()
chunk(chunkSize) = elem
chunkSize += 1
}
The expand method will push a filled array into the conc-tree, encapsulated in a new type of node:
class Chunk[T](val array: Array[T], val size: Int) extends Conc[T] {
def level = 0
}
With this new node type, the expand method is very simple:
private def expand() = {
conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
chunk = new Array(k)
chunkSize = 0
}
Finally, the combine and result methods are equally simple:
final def combine(that: ConcBuffer[T]): ConcBuffer[T] = {
val combinedConc = this.result <> that.result
new ConcBuffer(k, combinedConc)
}
def result(): Conc[T] = {
conc = appendLeaf(conc, new Chunk(chunk, chunkSize)) // push the last, partially filled chunk
conc
}
The ConcBuffer is invoked as follows:
xs.par.aggregate(new ConcBuffer[String]) (_ += _, _ combine _).result