Widely used when views need to react to changes in a model. Variants:
Example: Publisher and Subscriber traits:
trait Publisher {
private var subscribers: Set[Subscriber] = Set()
def subscribe(subscriber: Subscriber): Unit =
subscribers += subscriber
def unsubscribe(subscriber: Subscriber): Unit =
subscribers -= subscriber
def publish(): Unit =
subscribers.forEach(_.handler(this))
}
trait Subscriber {
def handler(pub: Publisher)
}
Making the BankAccount a Publisher:
class BankAccount extends Publisher {
private var balance = 0
def currentBalance: Int = balance
def deposit(amount: Int): Unit = {
if (amount > 0) {
balance = balance + amount
publish()
}
}
def withdraw(amount: Int): Unit = {
if ((amount > 0) && (amount <= balance)) {
balance = balance - amount
publish()
}
else {
throw new Error("Insufficient funds")
}
}
}
And here is a class that that maintains the total balance for a list of bank accounts:
class Consolidator(observed: List[BankAccount]) extends Subscriber {
observed.forEach(_.subscribe(this)
private var total: Int = _ // uninitialized
compute()
def totalBalance = total
private def compute() =
total = observed.map(_.currentBalance).sum()
def handler(pub: Publisher) =
compute()
}
Good:
Bad:
Example: Adobe's desktop applications (2008) - 1/3 of the code is in event handlers, 1/2 of the bugs are in this code.
Reactive programming is about reacting to sequences of events that happen over time. A sequence of events gets aggregated into a signal.
A signal is:
Example - mouse positions:
Event-based: whenever the mouse moves, a callback is called:
MouseMoved(toPos: Position)
FRP: a signal which at any point gives the current mouse position:
mousePosition: Signal[Position]
Signals have two fundamental operations:
get the value of the signal at the current time:
mousePosition() // the current mouse position
def inRectangle(LL: Position, UR: Position): Signal[Boolean] = {
Signal {
val pos = mousePosition()
LL <= pos && pos >= UR
}
}
Signals can also describe constant values:
val sig = Signal(3) // a constant valueA signal that varies in time can be described:
Values of type Signal are immutable. But we can define a variable signal by using a subclass of Signal called Var:
val sig = Var(3)
sig.update(5) // from now on, the signal value is 5
sig() = 7 // another update using Scala abbreviated syntax
Important note: There is a big difference between updating a signal and declaring the signal as a mutable variable, because we can map over signals and therefore all "composed" signals are automatically updated. This is simply not possible by using mutable variables, unless manually propagating the updates.
Repeating the BankAccount example with signals:
class BankAccount extends Publisher {
val balance = Var(0)
def currentBalance: Int = balance
def deposit(amount: Int): Unit = {
if (amount > 0) {
val b = balance()
balance() = b + amount
}
}
def withdraw(amount: Int): Unit = {
if ((amount > 0) && (amount <= balance())) {
val b = balance()
balance() = b - amount
}
else {
throw new Error("Insufficient funds")
}
}
}
def consolidated(accts: List[BankAccount]): Signal[Int] =
Signal(accts.map(_.balance()).sum)
A simple implementation of Signal and Var:
class Signal[T](expr: => T) {
def apply(): T = ???
}
object Signal {
def apply[T](expr: => T) = new Signal(expr)
}
class Var[T](expr: => T) extends Signal[T](expr) {
def update(expr: => T): Unit = ???
}
object Var{
def apply[T](expr: => T) = new Var(expr)
}
A signal maintains:
If the signal changes, all observers need to be re-evaluated.
How do we record dependencies in observers?
How do we find on whose behalf a signal expression is evaluated? One simple way is to maintain a global structure referring to the current caller. That data structure is stack-like because one evaluation of a signal might trigger others.
Here's a class for stackable variables:
class StackableVariable[T](init: T) = {
private var values: List[T] = List(init)
def value: T = values.head
def withValue[R](newValue: T)(op: => R): R = {
values = newValue :: values
try op finally values = values.tail
}
}
val caller = new StackableVariable(initialSig)
caller.withValue(otherSig) { ... }
... caller.value ...
We can then add a StackableVariable to the Signal object:
object NoSignal extends Signal[Nothing](???) { ... } // "sentinel" object
object Signal {
private var caller = new StackableVariable[Signal[_]](NoSignal)
def apply[T](expr: => T) = new Signal(expr)
}
And the Signal class:
class Signal[T](expr: => T) {
import Signal._
private var myExpr: () => T = _
private var myValue: T = _
private var observers: Set[Signal[_]] = Set()
update(expr)
protected def update(expr: => T): Unit = {
myExpr = () => expr
computeValue()
}
protected def computeValue(): Unit = {
val newValue = caller.withValue(this)(myExpr())
if (newValue != myValue) {
myValue = newValue
val obs = observers
observers = Set()
obs.forEach(_.computeValue())
}
}
def apply(): T = {
observers += caller.value
assert(!caller.value.observers.contains(this), "cyclic signal definition")
myValue
}
}
We also have to disable the evaluation for NoSignal:
object NoSignal extends Signal[Nothing](???) { // "sentinel" object
override def computeValue() = ()
}
Finally, we have to expose the update in subclass Var:
class Var[T](expr: => T) extends Signal[T](expr) {
def update(expr: => T): Unit = super.update(expr)
}
object Var{
def apply[T](expr: => T) = new Var(expr)
}
This implementation of Signal is not thread-safe because of the use of global callers stack. We can get around this in one of the following ways:
Implicit parameters - the callers are passed down in each call as an implicit parameter - more boiler-plate
The four essential effects in programming:
Simple Adventure game:
trait Adventure {
def collectCoins: List[Coin]
def buyTreasure(coins: List[Coin]): Treasure
}
val adventure = Adventure()
val coins = adventure.collectCoins()
val treasure = adventure.buyTreasure(coins)
Morphing the Adventure game into a network Socket:
trait Socket {
def readFromMemory: Array[Byte]
def sendToEurope(packet: Array[Byte]): Array[Byte]
}
val socket = Socket()
val packet = socket.readFromMemory()
val confirmation = socket.sendToEurope(packet)
This is not as easy as it looks:
This should be rewritten as:
val socket = Socket()
val packet = socket.readFromMemory()
// block for 50,000 ns
// only continue if there is no exception
val confirmation = socket.sendToEurope(packet)
// block for 150,000 ns
// only continue if there is no exception
Instead of blocking/waiting, we could register a callback to be invoked when the computation is complete.
Future[T] is a monad that handles both exceptions and latency.
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
trait Future[T] {
def onComplete(callback: Try[T] => Unit)
(implicit executor: ExecutionContext): Unit
}
Callbacks need to use pattern matching:
ts match {
case Success(t) => onNext(t)
case Failure(e) => onError(e)
}
Alternative design for Futures, using continuations:
trait Future[T] {
def onComplete(success: T => Unit, failed: Throwable => Unit): Unit
}
or Observables:
trait Future[T] {
def onComplete(callback: Observer[T]): Unit
}
trait Observer[T] {
def onNext(value: T): Unit
def onError(error: Throwable): Unit
}
Going back to the original example:
trait Future[T] {
def onComplete(callback: Try[T] => Unit)
(implicit executor: ExecutionContext): Unit
}
trait Socket {
def readFromMemory: Future[Array[Byte]]
def sendToEurope(packet: Array[Byte]): Future[Array[Byte]]
}
Using the Future to send the packet (version 1):
val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
val confirmation: Future[Array[Byte]] = {
packet.onComplete {
case Success(p) => socket.sendToEurope(p)
case Failure(t) => ...
}
}
Doesn't work. onComplete returns a Unit, while we need it to be a Future[Array[Byte]]. Trying another way (version 2):
val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
packet.onComplete {
case Success(p) => {
val confirmation: Future[Array[Byte]] = socket.sendToEurope(p)
}
case Failure(t) => ...
}
This is messy. Let's dive deeper and see how the Futures are constructed:
// Starts an asynchronous computation
// and returns a future object to which you
// can subscribe to be notified when the future
// is complete
object Future {
def apply(body: => T)
(implicit context ExecutionContext): Future[T]
}
Example of creating a Future:
import scala.concurrent.ExecutionContext.Implicits.global
import akka.serializer._
val queue = Queue[EmailMessage](
EmailMessage(from = "Erik", to = "Roland"),
EmailMessage(from = "Martin", to = "Erik"),
EmailMessage(from = "Roland", to = "Martin")
)
def readFromMemory: Future[Array[Byte]] = Future {
val email = queue.dequeue()
val serializer = serialization.findSerializerFor(email)
serializer.toBinary(email)
}
Recap:
trait Awaitable[T] extends AnyRef {
abstract def ready(atMost: Duration): Unit
abstract def result(atMost: Duration): T
}
trait Future[T] extends Awaitable[T] {
def filter(p: T => Boolean): Future[T]
def flatMap[S](f: T => Future[S]): Future[U]
def map[S](f: T => S): Future[S]
def recoverWith(f: PartialFunction[Throwable, Future[T]]): Future[T]
}
object Future {
def apply[T](body: => T): Future[T]
}
Redoing the Socket example, this time using flatMap (version 3):
val socket = Socket()
val packet: Future[Array[Byte]] = socket.readFromMemory()
val confirmation: Future[Array[Byte]] =
packet.flatMap(p => socket.sendToEurope(p))
Another (fictional) example:
import scala.imaginary.Http._
import scala.concurrent.ExecutionContext.Implicits.global
object Http {
def apply(url: URL, req: Request): Future[Response] =
{ ... runs the HTTP request asynchronously ... }
def sendToEurope(packet: Array[Byte]): Future[Array[Byte]] =
Http(URL("mail.server.eu", Request(packet))
.filter(response => response.isOk)
.map(response => response.toByteArray)
}