201.04 - Timely effects

Imperative event handling - the Observer pattern

Widely used when views need to react to changes in a model. Variants:

  • Publish/subscribe
  • MVC

Example: Publisher and Subscriber traits:

trait Publisher {
  private var subscribers: Set[Subscriber] = Set()

  def subscribe(subscriber: Subscriber): Unit = 
    subscribers += subscriber

  def unsubscribe(subscriber: Subscriber): Unit = 
    subscribers -= subscriber

  def publish(): Unit = 
    subscribers.foreach(_.handler(this))
}

trait Subscriber {
  def handler(pub: Publisher): Unit
}

Making the BankAccount a Publisher:

class BankAccount extends Publisher {
  private var balance = 0

  def currentBalance: Int = balance

  def deposit(amount: Int): Unit = {
    if (amount > 0) {
      balance = balance + amount
      publish()
    }
  }

  def withdraw(amount: Int): Unit = {
    if ((amount > 0) && (amount <= balance)) {
      balance = balance - amount
      publish()
    }
    else {
      throw new Error("Insufficient funds")
    }
  }
}

And here is a class that maintains the total balance for a list of bank accounts:

class Consolidator(observed: List[BankAccount]) extends Subscriber {

  observed.foreach(_.subscribe(this))

  private var total: Int = _            // uninitialized
  compute()

  def totalBalance = total

  private def compute() = 
    total = observed.map(_.currentBalance).sum

  def handler(pub: Publisher) = 
    compute()

}
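
A short usage sketch of the pattern. The traits and classes are repeated in compact form so that the block runs on its own; the driver object ObserverDemo and the account names a and b are made up for illustration and are not part of the original notes.

```scala
// Compact copies of the traits and classes from the notes.
trait Subscriber {
  def handler(pub: Publisher): Unit
}

trait Publisher {
  private var subscribers: Set[Subscriber] = Set()
  def subscribe(subscriber: Subscriber): Unit = subscribers += subscriber
  def unsubscribe(subscriber: Subscriber): Unit = subscribers -= subscriber
  def publish(): Unit = subscribers.foreach(_.handler(this))
}

class BankAccount extends Publisher {
  private var balance = 0
  def currentBalance: Int = balance
  def deposit(amount: Int): Unit =
    if (amount > 0) { balance += amount; publish() }
  def withdraw(amount: Int): Unit =
    if (amount > 0 && amount <= balance) { balance -= amount; publish() }
    else throw new Error("Insufficient funds")
}

class Consolidator(observed: List[BankAccount]) extends Subscriber {
  observed.foreach(_.subscribe(this))
  private var total: Int = observed.map(_.currentBalance).sum
  def totalBalance: Int = total
  // Called by every observed account after each successful change.
  def handler(pub: Publisher): Unit =
    total = observed.map(_.currentBalance).sum
}

// Hypothetical driver, only here to exercise the classes.
object ObserverDemo {
  val a = new BankAccount
  val b = new BankAccount
  val c = new Consolidator(List(a, b))

  val initial: Int = c.totalBalance        // 0: both accounts start empty
  a.deposit(20)
  val afterFirst: Int = c.totalBalance     // 20
  b.deposit(30)
  val afterSecond: Int = c.totalBalance    // 50
  a.withdraw(5)
  val afterWithdraw: Int = c.totalBalance  // 45
}
```

The consolidator never polls the accounts: each deposit or withdraw calls publish(), which in turn calls handler on every subscriber.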

Good:

  • Decouples views from state
  • Allows multiple views for a given state
  • Simple to set up

Bad:

  • Forces imperative style of programming (handlers are Unit-typed)
  • Many moving parts that need to be co-ordinated
  • Concurrency makes things more complicated
  • Views are still tightly bound to one state; every view update happens immediately when that state changes

Example: Adobe's desktop applications (2008) - 1/3 of the code is in event handlers, 1/2 of the bugs are in this code.

Functional reactive programming

Reactive programming is about reacting to sequences of events that happen over time. A sequence of events gets aggregated into a signal.

A signal is:

  • a value that changes over time;
  • represented as a function from time to the value domain.

Instead of propagating updates to mutable state, new signals are defined in terms of existing ones.

Example - mouse positions:

  • Event-based: whenever the mouse moves, a callback is called:

    MouseMoved(toPos: Position)

  • FRP: a signal which at any point gives the current mouse position:

    mousePosition: Signal[Position]

Signals have two fundamental operations:

  • get the value of the signal at the current time:

    mousePosition()            // the current mouse position

  • define a signal in terms of other signals (here, using the Signal constructor):

    def inRectangle(LL: Position, UR: Position): Signal[Boolean] = {
      Signal {
        val pos = mousePosition()
        LL <= pos && pos <= UR
      }
    }

Signals can also describe constant values:

val sig = Signal(3)        // a constant value

A signal that varies in time can be described:

  • by using externally defined signals, such as mousePosition and mapping over them;
  • by using a Var

Values of type Signal are immutable. But we can define a variable signal by using a subclass of Signal called Var:

val sig = Var(3)
sig.update(5)              // from now on, the signal value is 5
sig() = 7                  // another update using Scala abbreviated syntax

Important note: updating a signal is very different from assigning to a mutable variable. Because signals can be mapped over, every signal derived from an updated one is recomputed automatically. With plain mutable variables, the same effect requires propagating each update by hand.
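
To make the contrast concrete, here is a small sketch of what manual propagation looks like with plain vars. It deliberately uses no signals; the object ManualPropagationDemo and the helper setA are illustrative names, not part of the notes.

```scala
object ManualPropagationDemo {
  // Plain mutable state: `total` is computed once and never revisited.
  var a = 1
  var b = 2
  var total = a + b             // 3

  a = 10
  val staleTotal: Int = total   // still 3: nothing recomputed it

  // Manual propagation: every writer has to refresh the dependants itself.
  def setA(value: Int): Unit = {
    a = value
    total = a + b               // easy to forget as dependencies grow
  }

  setA(10)
  val freshTotal: Int = total   // 12
}
```

With signals, the equivalent of total would be defined once in terms of a and b and would follow their updates without a helper like setA.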

Repeating the BankAccount example with signals:

class BankAccount {
  val balance = Var(0)

  def currentBalance: Int = balance()

  def deposit(amount: Int): Unit = {
    if (amount > 0) {
      val b = balance()
      balance() = b + amount
    }
  }

  def withdraw(amount: Int): Unit = {
    if ((amount > 0) && (amount <= balance())) {
      val b = balance()
      balance() = b - amount
    }
    else {
      throw new Error("Insufficient funds")
    }
  }
}

def consolidated(accts: List[BankAccount]): Signal[Int] = 
  Signal(accts.map(_.balance()).sum)

Simple FRP implementation

A simple implementation of Signal and Var:

class Signal[T](expr: => T) {
  def apply(): T = ???
}

object Signal {
  def apply[T](expr: => T) = new Signal(expr) 
}

class Var[T](expr: => T) extends Signal[T](expr) {
  def update(expr: => T): Unit = ???
}

object Var {
  def apply[T](expr: => T) = new Var(expr) 
}

A signal maintains:

  • its current value;
  • the expression that determines the signal value;
  • a set of observers: other signals that depend on its value;

If the signal changes, all observers need to be re-evaluated.

How do we record dependencies in observers?

  • when evaluating a signal-valued expression, we need to know which signal (the caller) is being defined or updated by that expression;
  • if we know that, then evaluating sig() means adding the caller to the observers of sig;
  • when sig's value changes, all signals currently observing it are re-evaluated and its set of observers is cleared;
  • re-evaluation re-enters the caller into the observers of sig, as long as the caller's value still depends on sig.

How do we find on whose behalf a signal expression is evaluated? One simple way is to maintain a global structure referring to the current caller. That data structure is stack-like because one evaluation of a signal might trigger others.

Here's a class for stackable variables:

class StackableVariable[T](init: T) {

  private var values: List[T] = List(init)

  def value: T = values.head

  def withValue[R](newValue: T)(op: => R): R = {
    values = newValue :: values
    try op finally values = values.tail
  }
}

val caller = new StackableVariable(initialSig)
caller.withValue(otherSig) { ... }
... caller.value ...
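
The stack discipline can be checked with a small runnable sketch. It uses String values instead of signals to stay self-contained; the object StackDemo and the labels "a", "b" and "boom" are assumptions made for the example.

```scala
class StackableVariable[T](init: T) {
  private var values: List[T] = List(init)
  def value: T = values.head
  def withValue[R](newValue: T)(op: => R): R = {
    values = newValue :: values
    try op finally values = values.tail
  }
}

object StackDemo {
  val caller = new StackableVariable[String]("none")

  // Records the value visible at each nesting level.
  val seen: List[String] = {
    val outerBefore = caller.value
    val (inner, innermost) = caller.withValue("a") {
      val atA = caller.value
      val atB = caller.withValue("b")(caller.value)
      (atA, atB)
    }
    List(outerBefore, inner, innermost, caller.value)
  }
  // seen == List("none", "a", "b", "none")

  // The finally clause pops the stack even when op throws.
  val afterFailure: String = {
    try caller.withValue("boom")(throw new RuntimeException("fail"))
    catch { case _: RuntimeException => () }
    caller.value
  }
  // afterFailure == "none"
}
```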

We can then add a StackableVariable to the Signal object:

object NoSignal extends Signal[Nothing](???) { ... }     // "sentinel" object

object Signal {
  private var caller = new StackableVariable[Signal[_]](NoSignal)
  def apply[T](expr: => T) = new Signal(expr) 
}

And the Signal class:

class Signal[T](expr: => T) {
  import Signal._
  private var myExpr: () => T = _
  private var myValue: T = _
  private var observers: Set[Signal[_]] = Set()
  update(expr)

  protected def update(expr: => T): Unit = {
    myExpr = () => expr
    computeValue()
  }

  protected def computeValue(): Unit = {
    val newValue = caller.withValue(this)(myExpr())
    if (newValue != myValue) {
      myValue = newValue
      val obs = observers
      observers = Set()
      obs.foreach(_.computeValue())
    }
  }

  def apply(): T = {
    observers += caller.value
    assert(!caller.value.observers.contains(this), "cyclic signal definition")
    myValue
  }
}

We also have to disable the evaluation for NoSignal:

object NoSignal extends Signal[Nothing](???) {         // "sentinel" object
  override def computeValue() = ()
}

Finally, we have to expose the update in subclass Var:

class Var[T](expr: => T) extends Signal[T](expr) {
  override def update(expr: => T): Unit = super.update(expr)
}

object Var {
  def apply[T](expr: => T) = new Var(expr) 
}
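
Putting the pieces together, the following sketch assembles the classes from this section into one block and drives them with a cut-down BankAccount (deposit only). The driver object SignalDemo is hypothetical; everything else follows the code above, with foreach and override spelled as Scala expects.

```scala
class StackableVariable[T](init: T) {
  private var values: List[T] = List(init)
  def value: T = values.head
  def withValue[R](newValue: T)(op: => R): R = {
    values = newValue :: values
    try op finally values = values.tail
  }
}

class Signal[T](expr: => T) {
  import Signal._
  private var myExpr: () => T = _
  private var myValue: T = _
  private var observers: Set[Signal[_]] = Set()
  update(expr)

  protected def update(expr: => T): Unit = {
    myExpr = () => expr
    computeValue()
  }

  protected def computeValue(): Unit = {
    // Evaluate with `this` as the current caller so that reads register us.
    val newValue = caller.withValue(this)(myExpr())
    if (myValue != newValue) {
      myValue = newValue
      val obs = observers
      observers = Set()
      obs.foreach(_.computeValue())
    }
  }

  def apply(): T = {
    observers += caller.value
    assert(!caller.value.observers.contains(this), "cyclic signal definition")
    myValue
  }
}

object Signal {
  private val caller = new StackableVariable[Signal[_]](NoSignal)
  def apply[T](expr: => T) = new Signal(expr)
}

// Sentinel caller for reads made outside of any signal expression.
object NoSignal extends Signal[Nothing](???) {
  override def computeValue(): Unit = ()
}

class Var[T](expr: => T) extends Signal[T](expr) {
  override def update(expr: => T): Unit = super.update(expr)
}

object Var {
  def apply[T](expr: => T) = new Var(expr)
}

class BankAccount {
  val balance = Var(0)
  def deposit(amount: Int): Unit =
    if (amount > 0) {
      // Read first, then update: reading balance() inside the update
      // expression would make the signal depend on itself.
      val b = balance()
      balance() = b + amount
    }
}

// Hypothetical driver, only here to exercise the classes.
object SignalDemo {
  val a = new BankAccount
  val b = new BankAccount
  val total: Signal[Int] = Signal(a.balance() + b.balance())

  val initial: Int = total()      // 0
  a.deposit(20)
  val afterFirst: Int = total()   // 20
  b.deposit(30)
  val afterSecond: Int = total()  // 50
}
```

Note that total is defined once and never touched again; the deposits reach it only through the observer sets maintained by apply and computeValue.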

This implementation of Signal is not thread-safe, because it relies on a single global stack of callers. We can get around this in one of the following ways:

  • Use thread synchronization - slow, blocking
  • Use thread-local state (through scala.util.DynamicVariable) - imperative, slow (global hash-table lookup to access the thread-local space), thread-multiplexing problem
  • Implicit parameters - the callers are passed down in each call as an implicit parameter - more boiler-plate
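
As a rough illustration of the thread-local option, the sketch below swaps StackableVariable for scala.util.DynamicVariable, which offers the same value/withValue interface. String labels stand in for signals, and the object DynamicVariableDemo and its onThread helper are hypothetical names introduced for the example.

```scala
import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Same interface as StackableVariable, but bindings are kept per thread.
  val caller = new DynamicVariable[String]("none")

  // Runs `body` on a fresh thread and hands back its result.
  private def onThread[A](body: => A): A = {
    var result: Option[A] = None
    val worker = new Thread(() => { result = Some(body) })
    worker.start()
    worker.join()
    result.get
  }

  def run(): List[String] = {
    val inside = caller.withValue("main") {
      val seenByMain = caller.value
      // The worker binds its own value without disturbing the main thread.
      val seenByWorker = onThread(caller.withValue("worker")(caller.value))
      List(seenByMain, seenByWorker, caller.value)
    }
    inside :+ caller.value
  }
}
```

Calling DynamicVariableDemo.run() yields List("main", "worker", "main", "none"): the worker's binding is invisible to the main thread, and the initial value is restored once the outer block ends.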

Latency as an effect

The four essential effects in programming:

  • Synchronous:
    • One: T/Try[T]
    • Many: Iterable[T]
  • Asynchronous:
    • One: Future[T] - monad that captures the fact that computations take time
    • Many: Observable[T]

Simple Adventure game:

trait Adventure {
  def collectCoins(): List[Coin]
  def buyTreasure(coins: List[Coin]): Treasure
}

val adventure = Adventure()
val coins = adventure.collectCoins()
val treasure = adventure.buyTreasure(coins)

Morphing the Adventure game into a network Socket:

trait Socket {
  def readFromMemory(): Array[Byte]
  def sendToEurope(packet: Array[Byte]): Array[Byte]
}

val socket = Socket()
val packet = socket.readFromMemory()
val confirmation = socket.sendToEurope(packet)

This is not as easy as it looks:

  • reading from memory takes time
  • sending a packet over the network takes even more time

Annotated with the latencies involved, the code really reads as:

val socket = Socket()
val packet = socket.readFromMemory()
// block for 50,000 ns
// only continue if there is no exception
val confirmation = socket.sendToEurope(packet)
// block for 150,000 ns
// only continue if there is no exception

Instead of blocking/waiting, we could register a callback to be invoked when the computation is complete.

Future[T] is a monad that handles both exceptions and latency.

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

trait Future[T] {
  def onComplete(callback: Try[T] => Unit)
    (implicit executor: ExecutionContext): Unit
}

Callbacks need to use pattern matching:

ts match {
  case Success(t) => onNext(t)
  case Failure(e) => onError(e)
}
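
A runnable sketch of such a callback against the real scala.concurrent.Future. The object CallbackDemo and its helpers describe and run are hypothetical; the Promise and the Await are there only so that the demo can observe the callback's result, and would not appear in non-blocking code.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

object CallbackDemo {
  // Turns a completed Try into a message, one case per outcome.
  def describe(result: Try[Int]): String = result match {
    case Success(value) => s"got $value"
    case Failure(error) => s"failed: ${error.getMessage}"
  }

  // Registers a callback, then waits (for the demo only) until it has run.
  def run(work: => Int): String = {
    val done = Promise[String]()
    Future(work).onComplete(result => done.success(describe(result)))
    Await.result(done.future, 5.seconds)
  }
}
```

CallbackDemo.run(21 * 2) returns "got 42", while CallbackDemo.run(throw new IllegalStateException("boom")) returns "failed: boom": the exception is captured in the Future and delivered to the callback as a Failure.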

Alternative design for Futures, using continuations:

trait Future[T] {
  def onComplete(success: T => Unit, failed: Throwable => Unit): Unit
}

or Observers:

trait Future[T] {
  def onComplete(callback: Observer[T]): Unit
}

trait Observer[T] {
  def onNext(value: T): Unit
  def onError(error: Throwable): Unit
}

Going back to the original example:

trait Future[T] {
  def onComplete(callback: Try[T] => Unit)
    (implicit executor: ExecutionContext): Unit
}

trait Socket {
  def readFromMemory(): Future[Array[Byte]]
  def sendToEurope(packet: Array[Byte]): Future[Array[Byte]]
}

Using the Future to send the packet (version 1):

val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
val confirmation: Future[Array[Byte]] = {
  packet.onComplete {
    case Success(p) => socket.sendToEurope(p)
    case Failure(t) => ...
  }
}

This does not compile: onComplete returns Unit, while confirmation must be a Future[Array[Byte]]. Trying another way (version 2):

val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()

packet.onComplete {
  case Success(p) => {
    val confirmation: Future[Array[Byte]] = socket.sendToEurope(p)
  }
  case Failure(t) => ...
}

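This is messy: confirmation is only visible inside the callback, so every further step would have to be nested there too. Let's dive deeper and see how the Futures are constructed: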

// Starts an asynchronous computation
// and returns a future object to which you
// can subscribe to be notified when the future
// is complete

object Future {
  def apply[T](body: => T)
    (implicit context: ExecutionContext): Future[T]
}

Example of creating a Future:

import scala.concurrent.ExecutionContext.Implicits.global
import akka.serialization._

val queue = Queue[EmailMessage](
  EmailMessage(from = "Erik", to = "Roland"),
  EmailMessage(from = "Martin", to = "Erik"),
  EmailMessage(from = "Roland", to = "Martin")
)

def readFromMemory: Future[Array[Byte]] = Future {
  val email = queue.dequeue()
  val serializer = serialization.findSerializerFor(email)
  serializer.toBinary(email)
}
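
The same idea can be tried without Akka. In the sketch below a hand-written toBinary stands in for the serializer, encoding each message as a UTF-8 string; MailboxDemo, toBinary and firstPacket are assumptions made for the example, and the Await exists only to inspect the result.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object MailboxDemo {
  case class EmailMessage(from: String, to: String)

  private val queue = mutable.Queue(
    EmailMessage(from = "Erik", to = "Roland"),
    EmailMessage(from = "Martin", to = "Erik")
  )

  // Stand-in for a real serializer: encodes "from->to" as UTF-8 bytes.
  private def toBinary(email: EmailMessage): Array[Byte] =
    s"${email.from}->${email.to}".getBytes(UTF_8)

  // Future.apply schedules the block on the implicit ExecutionContext
  // and returns a Future without waiting for the block to finish.
  def readFromMemory(): Future[Array[Byte]] = Future {
    val email = queue.synchronized(queue.dequeue())
    toBinary(email)
  }

  // Blocks only so that the demo can look at the bytes.
  def firstPacket(): String =
    new String(Await.result(readFromMemory(), 5.seconds), UTF_8)
}
```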

Combinators on Futures

Recap:

trait Awaitable[T] extends AnyRef {
  def ready(atMost: Duration): Unit
  def result(atMost: Duration): T
}

trait Future[T] extends Awaitable[T] {
  def filter(p: T => Boolean): Future[T]
  def flatMap[S](f: T => Future[S]): Future[S]
  def map[S](f: T => S): Future[S]
  def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T]
}

object Future {
  def apply[T](body: => T): Future[T]
}

Redoing the Socket example, this time using flatMap (version 3):

val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()

val confirmation: Future[Array[Byte]] = 
    packet.flatMap(p => socket.sendToEurope(p))
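
A runnable sketch of version 3, assuming a made-up in-memory FakeSocket in place of a real one. It also shows what happens when the read fails: the failure flows through flatMap and the send is never attempted. FlatMapDemo, roundTrip and outcome are hypothetical helper names.

```scala
import java.io.IOException
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// Hypothetical in-memory socket; a real one would perform I/O.
object FakeSocket {
  def readFromMemory(): Future[Array[Byte]] =
    Future("hello".getBytes(UTF_8))

  def sendToEurope(packet: Array[Byte]): Future[Array[Byte]] =
    Future(("ack:" + new String(packet, UTF_8)).getBytes(UTF_8))
}

object FlatMapDemo {
  // Chains the send onto the read; a failed read skips the send entirely.
  def roundTrip(read: Future[Array[Byte]]): Future[Array[Byte]] =
    read.flatMap(p => FakeSocket.sendToEurope(p))

  // Blocks only so that the demo can inspect the outcome.
  def outcome(f: Future[Array[Byte]]): String =
    Try(Await.result(f, 5.seconds)) match {
      case Success(bytes) => new String(bytes, UTF_8)
      case Failure(error) => s"failed: ${error.getMessage}"
    }

  def happyPath(): String =
    outcome(roundTrip(FakeSocket.readFromMemory()))

  def brokenRead(): String =
    outcome(roundTrip(Future.failed(new IOException("disk error"))))
}
```

FlatMapDemo.happyPath() gives "ack:hello" and FlatMapDemo.brokenRead() gives "failed: disk error", with no pattern matching on Success or Failure needed at the call site of flatMap.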

Another (fictional) example:

import scala.imaginary.Http._
import scala.concurrent.ExecutionContext.Implicits.global

object Http {
  def apply(url: URL, req: Request): Future[Response] = 
     { ... runs the HTTP request asynchronously ... }

  def sendToEurope(packet: Array[Byte]): Future[Array[Byte]] =
    Http(URL("mail.server.eu"), Request(packet))
      .filter(response => response.isOk)
      .map(response => response.toByteArray)
}
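
Since the Http API above is fictional, here is a sketch of the same filter/map chain against the real Future, with a hypothetical Response case class. It also uses recoverWith from the recap to supply a fallback when filter rejects the response; CombinatorDemo, bodyOrFallback and the "fallback" value are assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CombinatorDemo {
  // Hypothetical response type standing in for the fictional Http API.
  case class Response(status: Int, body: String) {
    def isOk: Boolean = status == 200
  }

  // filter fails the future with NoSuchElementException when the predicate
  // is false; recoverWith then substitutes a fallback future.
  def bodyOrFallback(response: Future[Response]): Future[String] =
    response
      .filter(_.isOk)
      .map(_.body)
      .recoverWith {
        case _: NoSuchElementException => Future.successful("fallback")
      }

  // Blocks only so that the demo can inspect the outcome.
  def await(response: Response): String =
    Await.result(bodyOrFallback(Future(response)), 5.seconds)
}
```

CombinatorDemo.await(Response(200, "pong")) returns "pong", while CombinatorDemo.await(Response(500, "oops")) returns "fallback".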