301.04 - Data structures for parallel computing

Implementing Combiners

A transformer operation is an operation that creates another collection rather than a single value, e.g. filter, map, flatMap, groupBy. By contrast, methods such as fold, sum or aggregate are NOT transformer operations.

Sequential transformer operations can be implemented generically using an abstraction called Builder:

trait Builder[T, Repr] {
  def += (elem: T): this.type
  def result: Repr
}

T is the type of the elements of the collection, Repr is the type of the collection. For instance T could be String and Repr could be a Seq[String]. The two operations:

  • += is used to add an element to the collection
  • result is used to get the collection after all the elements have been added

Let's take a transformer operation. Its implementation will:

  • instantiate a new builder
  • iterate over the original collection and for each element transform it and call the += operation of the builder
  • at the end, call result to fetch the resulting collection
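The three steps above can be sketched as a sequential map. This is a minimal illustration, not the standard library's implementation: VectorBuilder and mapWith are hypothetical names introduced here, and the Builder trait is repeated so that the block is self-contained.

```scala
object BuilderSketch {
  // Same shape as the Builder trait above.
  trait Builder[T, Repr] {
    def +=(elem: T): this.type
    def result: Repr
  }

  // Hypothetical builder that accumulates its elements into a Vector.
  final class VectorBuilder[T] extends Builder[T, Vector[T]] {
    private var acc = Vector.empty[T]
    def +=(elem: T): this.type = { acc = acc :+ elem; this }
    def result: Vector[T] = acc
  }

  // Sequential map: take a fresh builder, call += once per transformed
  // element, then call result to fetch the new collection.
  def mapWith[A, B, Repr](xs: Iterable[A], b: Builder[B, Repr])(f: A => B): Repr = {
    for (x <- xs) b += f(x)
    b.result
  }
}
```

For instance, BuilderSketch.mapWith(List(1, 2, 3), new BuilderSketch.VectorBuilder[Int])(_ * 2) yields Vector(2, 4, 6).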

Builders can only be used to implement sequential transformer operations. To implement parallel transformer operations we need an abstraction called a Combiner:

trait Combiner[T, Repr] extends Builder[T, Repr] {
  def combine(that: Combiner[T, Repr]): Combiner[T, Repr]
}

A Combiner is a Builder with one extra method combine that takes the elements from two input combiners and creates a new combiner. How can the combine operation be implemented efficiently?

  • when Repr is a Set or a Map, the combine operation represents a union
  • when Repr is a Seq, the combine operation represents concatenation

In both cases the combine operation must be efficient, i.e. execute in O(log n + log m) time, where n and m are the sizes of the two input combiners. This is because during a parallel operation, the combine method could be invoked multiple times.

Let's assume that we implement (a variant of) the combine operation as follows:

def combine(xs: Array[Int], ys: Array[Int]): Array[Int] = {
  val r = new Array[Int](xs.length + ys.length)       // n+m computational steps
  Array.copy(xs, 0, r, 0, xs.length)                  // n computational steps
  Array.copy(ys, 0, r, xs.length, ys.length)          // m computational steps
  r
}

This method is inefficient: it takes 2n + 2m computational steps in total, i.e. linear O(n + m) time, which is far from the O(log n + log m) target.

Arrays cannot be efficiently concatenated.

What about other data structures, like maps or sets?

Sets

The cost of lookup, insertion and deletion depends on the set implementation:

  • hash tables - O(1)
  • balanced trees - O(log n)
  • linked lists - O(n)

Most set implementations don't have an efficient union operation, which makes combiners tricky to implement.

Sequences

Operation complexity for sequences varies:

  • mutable linked lists - O(1) prepend and append, O(n) lookup or insertion
  • functional (cons) lists - O(1) prepend, O(n) for anything else, including append
  • array lists - amortized O(1) append, O(1) random access, O(n) everything else

Mutable linked lists can be concatenated in O(1); for all the other data structures, concatenation takes O(n) because the data needs to be copied.
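To illustrate the O(1) concatenation of mutable linked lists, here is a minimal sketch. MList and Node are hypothetical classes written for this example, not library types. The list keeps references to both its first and last node, so concatenation only needs to re-link one pointer. Note that the two lists share nodes afterwards.

```scala
object LinkedConcatSketch {
  final class Node[T](val elem: T) { var next: Node[T] = null }

  // Hypothetical mutable list that tracks both ends.
  final class MList[T] {
    private var head: Node[T] = null
    private var last: Node[T] = null

    // O(1) append: link a new node after the current last node.
    def +=(x: T): this.type = {
      val n = new Node(x)
      if (head == null) head = n else last.next = n
      last = n
      this
    }

    // O(1) concatenation: link our last node to the first node of that.
    def concat(that: MList[T]): this.type = {
      if (that.head != null) {
        if (head == null) head = that.head else last.next = that.head
        last = that.last
      }
      this
    }

    // O(n) traversal, used here only to inspect the contents.
    def toList: List[T] = {
      val b = List.newBuilder[T]
      var n = head
      while (n != null) { b += n.elem; n = n.next }
      b.result()
    }
  }
}
```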

Parallel Two-Phase Construction

This is a technique used to implement Combiners efficiently. Before, we assumed that the internal data structure of the combiner is the same as the data structure that it combines (for instance: that combiners for arrays internally use arrays during the combine operation). In two-phase construction, the combiner has an intermediary data structure used during the combine operation. The intermediary data structure:

  • has an efficient combine method - O(log n + log m) or better
  • has an efficient += method
  • can be converted into the resulting data structure in O(n/P) time, where n is the size of the data structure and P is the number of processors (which means that copying from the intermediary to the resulting data structure must be parallelizable)

This would allow building the resulting data structure in two phases:

1) Phase 1

  • different processors build chunks of the intermediate data structure using the += operation
  • the resulting chunks are combined in a parallel reduction tree until there's a single intermediate data structure at the root

2) Phase 2

  • the result method uses the intermediary data structure to build the resulting data structure in parallel

Example: implementing an Array combiner. For simplicity, the combiner will deal only with reference types. Internally, the combiner uses a numElements field to keep track of the total number of elements in the combiner. The internal storage of the combiner is an ArrayBuffer of ArrayBuffers of elements of type T. The inner ArrayBuffers store the actual elements. The outer ArrayBuffer is used for combining.

// requires scala.collection.mutable.ArrayBuffer and scala.reflect.ClassTag
class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
  private var numElements = 0
  private val buffers = new ArrayBuffer[ArrayBuffer[T]]
  buffers += new ArrayBuffer[T]   // start with a single inner buffer; combine
                                  // adds the inner buffers of other combiners

  // append the element to the last inner buffer
  def +=(x: T) = {
    buffers.last += x
    numElements += 1
    this
  }

  // only the references to the inner buffers are copied, never the elements
  def combine(that: ArrayCombiner[T]) = {
    buffers ++= that.buffers
    numElements += that.numElements
    this
  }

  // copy the elements into the resulting array in parallel, one task per chunk
  // (task and copyTo are helpers that are not shown here)
  def result: Array[T] = {
    val array = new Array[T](numElements)
    val step = math.max(1, numElements / parallelism)
    val starts = (0 until numElements by step) :+ numElements
    val chunks = starts.zip(starts.tail)
    val tasks = for ((from, end) <- chunks) yield task {
      copyTo(array, from, end)
    }
    tasks.foreach(_.join())
    array
  }
}

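The combiner can then be passed to aggregate, where newCombiner stands for a freshly created ArrayCombiner:

xs.par.aggregate(newCombiner)(_ += _, _ combine _).result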
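The array combiner relies on task and copyTo, which are not shown. The following self-contained sketch fills them in under stated assumptions: scala.concurrent.Future stands in for task, and copyTo is a hypothetical helper that walks the nested buffers. It is one possible way to complete the example, not necessarily the original implementation.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

object ArrayCombinerSketch {
  class ArrayCombiner[T <: AnyRef: ClassTag](val parallelism: Int) {
    private var numElems = 0
    private val buffers = new ArrayBuffer[ArrayBuffer[T]]
    buffers += new ArrayBuffer[T]

    def +=(x: T): this.type = { buffers.last += x; numElems += 1; this }

    def combine(that: ArrayCombiner[T]): this.type = {
      buffers ++= that.buffers      // copies buffer references, not elements
      numElems += that.numElems
      this
    }

    // Assumed helper: copy the elements with global indices [from, end)
    // out of the nested buffers into array. Requires from < numElems.
    private def copyTo(array: Array[T], from: Int, end: Int): Unit = {
      var b = 0                     // index of the current inner buffer
      var offset = from             // position inside buffers(b)
      while (offset >= buffers(b).length) { offset -= buffers(b).length; b += 1 }
      var i = from
      while (i < end) {
        if (offset == buffers(b).length) { b += 1; offset = 0 }
        else { array(i) = buffers(b)(offset); i += 1; offset += 1 }
      }
    }

    // Phase 2: each Future fills a disjoint slice of the result array.
    def result: Array[T] = {
      val array = new Array[T](numElems)
      val step = math.max(1, numElems / parallelism)
      val starts = (0 until numElems by step) :+ numElems
      val chunks = starts.zip(starts.tail)
      val tasks = for ((from, end) <- chunks) yield Future(copyTo(array, from, end))
      tasks.foreach(t => Await.result(t, Duration.Inf))
      array
    }
  }

  // Two combiners filled independently, then combined and converted.
  def demo(): Array[String] = {
    val left = new ArrayCombiner[String](2)
    val right = new ArrayCombiner[String](2)
    left += "a"
    left += "b"
    right += "c"
    left.combine(right).result
  }
}
```

Because every task writes to its own index range of the array, the copies do not need any synchronization beyond waiting for all of them to finish.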

Conc-Trees

The conc data type is a parallel counterpart to functional cons-lists.

Lists

As a reminder, here is what the functional cons-lists look like:

sealed trait List[+T] {
  def head: T
  def tail: List[T]
}

case class ::[T](head: T, tail: List[T]) extends List[T]

case object Nil extends List[Nothing] {
  def head = sys.error("empty list")
  def tail = sys.error("empty list")
}

So there are two implementations of List: :: (cons) and Nil.

Implementing a filter method on the list:

def filter[T](lst: List[T])(p: T => Boolean): List[T] = lst match {
  case x :: xs if (p(x)) => x :: filter(xs)(p)
  case x :: xs => filter(xs)(p)
  case Nil => Nil
}

Due to this recursive structure, lists are suitable for sequential computations. They are traversed from left to right.

Trees

Trees are suitable for parallel computations. Their subtrees can be traversed in parallel. Let's look at a Tree implementation:

sealed trait Tree[+T]

case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]

case class Leaf[T](elem: T) extends Tree[T]

case object Empty extends Tree[Nothing]

Implementing again the filter operation, this time using Tree data types:

def filter[T](t: Tree[T])(p: T => Boolean): Tree[T] = t match {
  case Node(left, right) =>
    // parallel evaluates both arguments in parallel and returns them as a pair
    val (l, r) = parallel(filter(left)(p), filter(right)(p))
    Node(l, r)
  case Leaf(elem) => if (p(elem)) t else Empty
  case Empty => Empty
}

Trees are only good for parallelism if they are balanced.
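To see why balance matters, the sketch below compares the depth of a balanced tree with the depth of a degenerate, list-like tree holding the same elements. The helpers depth, balanced and skewed are hypothetical names introduced for this illustration. The depth bounds the length of the longest chain of dependent steps in a parallel traversal.

```scala
object TreeDepthSketch {
  sealed trait Tree[+T]
  case class Node[T](left: Tree[T], right: Tree[T]) extends Tree[T]
  case class Leaf[T](elem: T) extends Tree[T]
  case object Empty extends Tree[Nothing]

  // Number of Node levels on the longest path from the root.
  def depth[T](t: Tree[T]): Int = t match {
    case Node(l, r) => 1 + math.max(depth(l), depth(r))
    case _ => 0
  }

  // Balanced tree: split the elements in two halves at every level.
  def balanced(xs: Vector[Int]): Tree[Int] =
    if (xs.isEmpty) Empty
    else if (xs.length == 1) Leaf(xs.head)
    else {
      val (l, r) = xs.splitAt(xs.length / 2)
      Node(balanced(l), balanced(r))
    }

  // Degenerate tree: every right subtree holds all remaining elements,
  // so the shape is the same as a cons-list.
  def skewed(xs: Vector[Int]): Tree[Int] =
    xs.foldRight(Empty: Tree[Int])((x, acc) => Node(Leaf(x), acc))
}
```

With 1024 elements, the balanced tree has depth 10 while the skewed tree has depth 1024, so a parallel traversal of the skewed tree is effectively sequential.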

Conc data type

The Conc data type represents balanced trees (suitable for parallelism).

sealed trait Conc[+T] {
  def level: Int          // longest path from the root to a leaf (tree height)
  def size: Int           // number of elements in the tree
  def left: Conc[T]
  def right: Conc[T]
}

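Here is a succinct implementation of the Conc data type:

case object Empty extends Conc[Nothing] {
  def level = 0
  def size = 0
  def left = sys.error("Empty.left")     // leaves have no subtrees
  def right = sys.error("Empty.right")
}

case class Single[T](x: T) extends Conc[T] {
  def level = 0
  def size = 1
  def left = sys.error("Single.left")    // leaves have no subtrees
  def right = sys.error("Single.right")
}

case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  def level = 1 + math.max(left.level, right.level)
  def size = left.size + right.size
}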

In addition, we define two invariants on these data types:

1) A <> node can never contain Empty as one of its subtrees

2) The level difference between the left and right subtrees of a <> node can only be 1 or 0

Using these invariants we can implement concatenation as follows:

def <>(that: Conc[T]): Conc[T] = {
  if (this == Empty) that        // first invariant
  else if (that == Empty) this   // first invariant
  else concat(this, that)
}

The concat method will re-organize the tree in order to satisfy the second invariant (i.e. balance the tree).

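def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
  val diff = ys.level - xs.level
  if ((diff >= -1) && (diff <= 1)) new <>(xs, ys)   // already balanced
  else if (diff < -1) {   // the left tree is higher than the right tree
    if (xs.left.level >= xs.right.level) {   // the left tree is left-leaning
      val nr = concat(xs.right, ys)
      new <>(xs.left, nr)
    }
    else {  // the left tree is right-leaning
      val nrr = concat(xs.right.right, ys)
      if (nrr.level == (xs.level - 3)) {
        val nl = xs.left
        val nr = new <>(xs.right.left, nrr)
        new <>(nl, nr)
      }
      else {
        val nl = new <>(xs.left, xs.right.left)
        val nr = nrr
        new <>(nl, nr)
      }
    }
  }
  else {   // symmetric case: the right tree is higher than the left tree
    if (ys.right.level >= ys.left.level) {   // the right tree is right-leaning
      val nl = concat(xs, ys.left)
      new <>(nl, ys.right)
    }
    else {  // the right tree is left-leaning
      val nll = concat(xs, ys.left.left)
      if (nll.level == (ys.level - 3)) {
        val nl = new <>(nll, ys.left.right)
        new <>(nl, ys.right)
      }
      else {
        val nr = new <>(ys.left.right, ys.right)
        new <>(nll, nr)
      }
    }
  }
}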

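When concatenating, assuming the left tree is higher than the right tree:

  • if the left tree is left-leaning, we recursively concatenate its right subtree with the right tree, and link the left subtree to the result
  • if the left tree is right-leaning, we recursively concatenate the right child of its right subtree (xs.right.right) with the right tree, and then re-link xs.left, xs.right.left and the result, choosing the grouping that keeps the tree balanced

The case where the right tree is higher is symmetric.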
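The following self-contained sketch puts the Conc data type and the concatenation together so that the balancing can be checked on a concrete tree. It makes a few assumptions for the sake of a runnable example: concatenation is exposed as a plain function named conc rather than as a <> method, the leaves throw on left and right, and isBalanced and toList are hypothetical helpers added only for checking the result.

```scala
object ConcSketch {
  sealed trait Conc[+T] {
    def level: Int
    def size: Int
    def left: Conc[T]
    def right: Conc[T]
  }
  case object Empty extends Conc[Nothing] {
    def level = 0
    def size = 0
    def left: Conc[Nothing] = sys.error("Empty.left")
    def right: Conc[Nothing] = sys.error("Empty.right")
  }
  case class Single[T](x: T) extends Conc[T] {
    def level = 0
    def size = 1
    def left: Conc[T] = sys.error("Single.left")
    def right: Conc[T] = sys.error("Single.right")
  }
  case class <>[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
    val level = 1 + math.max(left.level, right.level)
    val size = left.size + right.size
  }

  // Concatenation that preserves both invariants.
  def conc[T](xs: Conc[T], ys: Conc[T]): Conc[T] =
    if (xs == Empty) ys else if (ys == Empty) xs else concat(xs, ys)

  private def concat[T](xs: Conc[T], ys: Conc[T]): Conc[T] = {
    val diff = ys.level - xs.level
    if (diff >= -1 && diff <= 1) new <>(xs, ys)
    else if (diff < -1) {                        // left tree is higher
      if (xs.left.level >= xs.right.level) new <>(xs.left, concat(xs.right, ys))
      else {
        val nrr = concat(xs.right.right, ys)
        if (nrr.level == xs.level - 3) new <>(xs.left, new <>(xs.right.left, nrr))
        else new <>(new <>(xs.left, xs.right.left), nrr)
      }
    } else {                                     // right tree is higher
      if (ys.right.level >= ys.left.level) new <>(concat(xs, ys.left), ys.right)
      else {
        val nll = concat(xs, ys.left.left)
        if (nll.level == ys.level - 3) new <>(new <>(nll, ys.left.right), ys.right)
        else new <>(nll, new <>(ys.left.right, ys.right))
      }
    }
  }

  // Checks the second invariant on every <> node.
  def isBalanced[T](t: Conc[T]): Boolean = t match {
    case l <> r => math.abs(l.level - r.level) <= 1 && isBalanced(l) && isBalanced(r)
    case _ => true
  }

  // Elements from left to right.
  def toList[T](t: Conc[T]): List[T] = t match {
    case l <> r => toList(l) ::: toList(r)
    case Single(x) => List(x)
    case _ => Nil
  }
}
```

Appending single elements one by one with conc, which would produce a degenerate shape with a naive tree, keeps the tree balanced and preserves the element order.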

Amortized Conc-Tree appends

How to implement an append method for conc-trees with a constant running time? Such a method is crucial in implementing combiners efficiently. Here is a possible implementation of the += method in a combiner:

var xs: Conc[T] = Empty
def +=(elem: T) = {
  xs = xs <> Single(elem)
}

This method has an execution time of O(log n). This is not bad, but can we make this running time constant instead? This is possible by relaxing some of the previous invariants and introducing a new type of node in the conc-tree:

case class Append[T](left: Conc[T], right: Conc[T]) extends Conc[T] {
  val level = 1 + math.max(left.level, right.level)
  val size = left.size + right.size
}

We will relax the "balanced tree" invariant for this type of node.

With this new node type, this is a possible implementation of appendLeaf:

def appendLeaf[T](xs: Conc[T], y: T): Conc[T] = Append(xs, new Single(y))

This operation has an execution time of O(1). Unfortunately, this leads to imbalanced trees. Can we still achieve an O(log n) running time for the concatenation? In other words, when concatenating, can we eliminate the Append nodes in O(log n) time? This is NOT possible. This implementation breaks the O(log n) bound on concatenation.

In the next implementation we will make sure that if the total number of elements in the tree is n, there are not more than (log n) Append nodes in the data structure.

In order to understand how to achieve this, let's take the example of counting in the binary number system. Each time the value doubles, the number of digits grows by only 1: 4 = 100, 8 = 1000, 16 = 10000. In other words, the number of digits grows logarithmically with the value, so a number n needs about log n digits.
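The counting analogy can be made concrete with a small binary counter. In this sketch, increment and countTo are hypothetical helpers written for the illustration. Bits are stored least-significant first, and a carry plays roughly the role of linking two trees of the same level. Counting up to n never needs more than about log n digits, and the total number of carries stays below n, which is why a single increment costs O(1) amortized.

```scala
object BinaryCounterSketch {
  // Increment a little-endian list of bits.
  // Returns the new bits and the number of carries performed.
  def increment(bits: List[Int]): (List[Int], Int) = bits match {
    case Nil => (List(1), 0)
    case 0 :: rest => (1 :: rest, 0)
    case _ :: rest =>
      val (r, carries) = increment(rest)
      (0 :: r, carries + 1)
  }

  // Count from 0 to n, returning the final bits and the total number of carries.
  def countTo(n: Int): (List[Int], Int) =
    (1 to n).foldLeft((List.empty[Int], 0)) { case ((bits, total), _) =>
      val (next, c) = increment(bits)
      (next, total + c)
    }
}
```

For example, countTo(16) ends with the bits List(0, 0, 0, 0, 1), i.e. 10000 in binary, after 15 carries in total.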

We can use this observation in building an algorithm for adding Append nodes:

// ys is expected to be a leaf node
def appendLeaf[T](xs: Conc[T], ys: Conc[T]): Conc[T] = xs match {
  case Empty => ys
  case _ <> _ => new Append(xs, ys)
  case xs: Append[T] => append(xs, ys)
  case _ => new <>(xs, ys)             // xs is itself a leaf, e.g. a Single
}

@tailrec
private def append[T](xs: Append[T], ys: Conc[T]): Conc[T] = {
  if (xs.right.level > ys.level) new Append(xs, ys)
  else {
    val zs = new <>(xs.right, ys)
    xs.left match {
      case ws @ Append(_, _) => append(ws, zs)
      case ws if (ws.level <= zs.level) => ws <> zs
      case ws => new Append(ws, zs)
    }
  }
}

Conc-tree combiners

Let's use the conc-tree defined before to implement a Combiner. We will call this implementation a ConcBuffer. The ConcBuffer appends elements into an array of size k. When the array gets full, it is stored in a Chunk node and added into the conc-tree.

class ConcBuffer[T: ClassTag](val k: Int, private var conc: Conc[T]) {
  private var chunk: Array[T] = new Array(k)
  private var chunkSize: Int = 0
}

The += operation usually just adds an element to the chunk array:

final def +=(elem: T): this.type = {
  if (chunkSize >= k) expand()   // the current chunk is full
  chunk(chunkSize) = elem
  chunkSize += 1
  this                           // return the buffer, as Builder's += does
}

The expand method will push a filled array into the conc-tree, encapsulated in a new type of node:

class Chunk[T](val array: Array[T], val size: Int) extends Conc[T] {
  def level = 0
}

With this new node type, the expand method is very simple:

private def expand() = {
  conc = appendLeaf(conc, new Chunk(chunk, chunkSize))
  chunk = new Array(k)
  chunkSize = 0
}

Finally, the combine and result methods are equally simple:

final def combine(that: ConcBuffer[T]): ConcBuffer[T] = {
  val combinedConc = this.result <> that.result
  new ConcBuffer(k, combinedConc)
}

def result: Conc[T] = {
  conc = appendLeaf(conc, new Chunk(chunk, chunkSize))   // flush the last, partially filled chunk
  conc
}

The ConcBuffer is used as follows, where k is the chosen chunk size and the buffer starts from an Empty conc-tree:

xs.par.aggregate(new ConcBuffer[String](k, Empty))(_ += _, _ combine _).result