Let's take a look at reactive programming concepts from a slightly different point of view: how do we glue reactive bits together? We'll contrast a few different approaches and see how they differ. Hopefully, this will shine a light on the subject from a different angle, giving you more to think about before starting your next project.
A reactive program (or any program, for that matter) is a set of bits of logic, or steps, which need to be executed in a more or less deterministic sequence in order to achieve a goal. The steps themselves don't really change: they tend to be elementary things, either single-threaded local blocks of code or calls to some remote service. What is interesting is how we glue them together. How do we tie these steps into a meaningful whole, and what are the benefits of each approach?
It will be quite a long journey, as we'll look at plain old threads, callbacks, futures, ladders, streams, actors, data flows, JMS and workflows.
The examples will be mostly in Scala, with some JavaScript sprinkled in for good measure. We'll look at a simple use case where we have to update an order (to closed) and the respective account (let's say we shipped the items), and finally notify the user via email.
The problem at hand, the one we will use to see how each technique applies, is processing order notifications. A notification will find and update the order, find and update the respective account, and then email the user.
Each of these steps takes time and has to wait for different resources, so the idea is to avoid holding up everyone else while the different steps are processed.
Let's look first at the simplest case, using threads: basically running the steps in sequence, blocking one thread while waiting for the result of each. There is no error handling yet (or rather, it is implicit, via exceptions).
def notify(orderId, notification) = {
  val order = db.findOrder(orderId)
  db.updateOrder(order)
  val account = db.findAccount(order.accountId)
  db.updateAccount(account)
  email.send(account.email, notification)
}
The compiler builds a context, keeping notification in scope for the last call. All finders are assumed to be synchronous and local. If they were not local, like an actual database call across a network, the thread would be suspended somewhere inside the database driver while waiting for the reply. It would then be resumed, with the entire context reinstated, when the result became available.
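To make the blocking version concrete, here is a minimal runnable sketch in Scala. BlockingDb and BlockingEmail are hypothetical in-memory stand-ins for the db and email used above. Thread.sleep simulates waiting on I/O. notify is renamed to notifyOrder so it doesn't clash with the notify() method that every JVM object already has.

```scala
import scala.collection.mutable

// Hypothetical domain types; the `var` fields let the in-memory "db" record updates.
final case class Order(id: String, accountId: String, var closed: Boolean = false)
final case class Account(id: String, email: String, var shipped: Int = 0)

// Hypothetical stand-in for the post's `db`. Thread.sleep simulates the
// calling thread being blocked while it waits on I/O.
object BlockingDb {
  val orders = mutable.Map("o1" -> Order("o1", "a1"))
  val accounts = mutable.Map("a1" -> Account("a1", "john@example.com"))

  def findOrder(id: String): Order = { Thread.sleep(10); orders(id) } // throws if missing
  def updateOrder(o: Order): Unit = { Thread.sleep(10); o.closed = true }
  def findAccount(id: String): Account = { Thread.sleep(10); accounts(id) }
  def updateAccount(a: Account): Unit = { Thread.sleep(10); a.shipped += 1 }
}

// Hypothetical stand-in for `email`: it just remembers what was sent.
object BlockingEmail {
  val sent = mutable.ListBuffer.empty[(String, String)]
  def send(to: String, body: String): Unit = { Thread.sleep(10); sent.append((to, body)) }
}

object Notifier {
  // One thread runs every step in order; `notification` simply lives in the
  // stack frame until the last call needs it.
  def notifyOrder(orderId: String, notification: String): Unit = {
    val order = BlockingDb.findOrder(orderId)
    BlockingDb.updateOrder(order)
    val account = BlockingDb.findAccount(order.accountId)
    BlockingDb.updateAccount(account)
    BlockingEmail.send(account.email, notification)
  }
}
```

A missing order surfaces as an exception thrown by the finder. This is the implicit error handling mentioned above: the caller sees it only if it wraps the call, for example in a Try.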
In the following variation, which uses Option, notification becomes available to the last step as part of a closure.
def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    db.updateOrder(order)
    db.findAccount(order.accountId).map { account =>
      db.updateAccount(account)
      email.send(account.email, notification)
    }
  }
}
This version is much closer to the others we will see below, and it's a good idea to make sure you understand how it works. The signatures are generally db.findOrder(id) : Option[Order]. Option has the methods map(f : Order => X) and flatMap(f : Order => Option[X]), which apply f only if there was indeed an order.
The individual db and email operations will still occur in sequence, as long as the finders actually find the respective entities (order or account). The caller holds the entire thread while waiting for the last operation to complete. The notify call therefore ends only at the end of it all, returning an Option that indicates whether everything was done. If any of the finders comes back empty, the remaining steps are skipped and an empty Option is returned right away.
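Here is a runnable sketch of the Option version. It again uses hypothetical in-memory stand-ins (OptionDb, Email), plus a log of side effects, so you can see exactly where the flow short-circuits.

```scala
// Hypothetical domain types and in-memory data.
final case class Order(id: String, accountId: String)
final case class Account(id: String, email: String)

// Hypothetical synchronous db whose finders return Option instead of throwing.
object OptionDb {
  val orders = Map("o1" -> Order("o1", "a1"), "o2" -> Order("o2", "nobody"))
  val accounts = Map("a1" -> Account("a1", "john@example.com"))
  var log = Vector.empty[String] // records side effects so we can see what ran

  def findOrder(id: String): Option[Order] = orders.get(id)
  def updateOrder(o: Order): Unit = log :+= s"closed ${o.id}"
  def findAccount(id: String): Option[Account] = accounts.get(id)
  def updateAccount(a: Account): Unit = log :+= s"shipped to ${a.id}"
}

// Hypothetical stand-in for `email`.
object Email {
  def send(to: String, body: String): String = s"sent '$body' to $to"
}

object Notifier {
  // flatMap/map only run their closure when the Option is non-empty, so a
  // missing order or account turns the rest of the flow into None.
  def notifyOrder(orderId: String, notification: String): Option[String] =
    OptionDb.findOrder(orderId).flatMap { order =>
      OptionDb.updateOrder(order)
      OptionDb.findAccount(order.accountId).map { account =>
        OptionDb.updateAccount(account)
        Email.send(account.email, notification)
      }
    }
}
```

Try notifyOrder("o1", ...), notifyOrder("missing", ...) and notifyOrder("o2", ...) in turn. The last case is worth noticing: order o2 gets closed even though its account is never found. Option only skips what comes after the empty result; it does not undo what already ran.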
An exception would unwind the stack (though not undo the operations already performed), while ThreadLocals holding transactions keep everything in check.
Many like to take potshots at the thread model nowadays, pointing out its drawbacks. A thread is blocked for the entire duration of notify, and threads are expensive: using too many threads consumes memory, while using just a few threads limits throughput.
The simplest way to glue together or bind asynchronous bits of a program, if you don't want to use threads, is using callbacks, something like this:
def notify(orderId, notification) = {
  db.findOrder(orderId) { order =>
    db.updateOrder(order) {
      db.findAccount(order.accountId) { account =>
        db.updateAccount(account) {
          email.send(account.email, notification)
        }
      }
    }
  }
}
Yes, yes, callback hell? Here it is, a little more terse, in JavaScript. I am sure most of you recognize it and have probably written it many times:
function notify(orderId, notification) {
  db.findOrder(orderId, function(order) {
    db.updateOrder(order, function() {
      db.findAccount(order.accountId, function(account) {
        db.updateAccount(account, function() {
          email.send(account.email, notification);
        });
      });
    });
  });
}
The db.findOrder(id)(next : Order => X) call gets the continuation, the next step packaged as a closure. It will call that closure if and when the order is found, possibly on a different thread.
You could certainly leave it up to the main thread to remember the continuation. In that case the db calls are synchronous, and we are basically in the same situation as above. However, this style of programming is mostly used with asynchronous I/O libraries, where each db call is asynchronous and releases the calling thread. node.js event loops are a typical example.
All of these steps are processed outside of a "main thread".
This and most of the reactive programming techniques we'll look at here rely on closures. A closure is a functional programming construct where the compiler captures the context of the program at that point and packages it together with the function itself (here, the callback). Strictly speaking, it captures not the whole context, just the part that is visible to, or in the scope of, the function being created.
Let's take an x-ray of what actually happens here. The steps of this process, or flow, are encapsulated in the callbacks, which call each other in sequence. Each callback is a closure that encapsulates the current state in the scope of this flow. It carries with it values like notification and objects like order and account.
Each step is an asynchronous call to the db, with a callback to be invoked when the db returns a result. This will occur asynchronously, in the same thread or another one, depending on the implementation of the db. The point, however, is that these steps will still be executed in sequence.
Each db operation, including the first one, releases the calling thread and parks the callback somewhere in the db driver to wait for a result, very much like a familiar $.ajax(...) would. The callback is invoked when the result presents itself, in a different thread (a central event loop or some other processing arrangement).
So notify returns right away, having launched db.findOrder in the background. That call will invoke the first callback, and so on.
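The following sketch mimics a callback-style driver. CallbackDb is hypothetical: each call returns immediately and runs the continuation later, on one of the driver's own daemon threads. A CountDownLatch lets the demo wait for the final step, since notifyOrder itself returns before any of the work is done.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}

final case class Order(id: String, accountId: String)
final case class Account(id: String, email: String)

// Hypothetical callback-style driver: every call returns at once and later runs
// its continuation on one of the driver's own daemon threads.
object CallbackDb {
  private val pool = Executors.newFixedThreadPool(2, new ThreadFactory {
    def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
  })
  private val orders = Map("o1" -> Order("o1", "a1"))
  private val accounts = Map("a1" -> Account("a1", "john@example.com"))

  private def later(body: => Unit): Unit = pool.execute(() => body)

  // If nothing is found, the continuation is simply never called.
  def findOrder(id: String)(next: Order => Unit): Unit = later(orders.get(id).foreach(next))
  def updateOrder(o: Order)(next: => Unit): Unit = later(next)
  def findAccount(id: String)(next: Account => Unit): Unit = later(accounts.get(id).foreach(next))
  def updateAccount(a: Account)(next: => Unit): Unit = later(next)
}

// Hypothetical stand-in for `email`; the latch lets the demo wait for the last step.
object CallbackEmail {
  @volatile var sent = List.empty[String]
  val done = new CountDownLatch(1)
  def send(to: String, body: String): Unit = { sent = s"$to: $body" :: sent; done.countDown() }
}

object Notifier {
  // Returns as soon as findOrder has been launched. Each later step runs in a
  // closure that captured `notification` (and then `order` and `account`).
  def notifyOrder(orderId: String, notification: String): Unit =
    CallbackDb.findOrder(orderId) { order =>
      CallbackDb.updateOrder(order) {
        CallbackDb.findAccount(order.accountId) { account =>
          CallbackDb.updateAccount(account) {
            CallbackEmail.send(account.email, notification)
          }
        }
      }
    }
}
```

Note what happens when an order is missing in this sketch: findOrder never calls its continuation, so nothing downstream runs, and nobody is told.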
It is important to decide whether the result of updating the order matters. If it doesn't, and we don't have to wait for it, we could do something like the following, where some steps are launched without a real continuation:
def notify(orderId, notification) = {
  db.findOrder(orderId) { order =>
    db.updateOrder(order) {}    // start this in parallel, nothing waits for it
    db.findAccount(order.accountId) { account =>
      db.updateAccount(account) {}    // start this in parallel, nothing waits for it
      email.send(account.email, notification)
    }
  }
}
We just start the db.updateOrder operation, but it has no continuation: we don't wait for its result. So when we send the email, we don't know whether the update actually succeeded. Say it triggered the shipment. Will "John" get mad to find out that his order never shipped, although we just emailed him to "stand by the door"?
So this doesn't really work: we need each callback to check whether its step succeeded and act accordingly. Remember that the models above assume everything is successful; there is no error handling.
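One way to let each callback act on the outcome of its step is to pass it a success-or-failure value instead of only the success value. The sketch below assumes a hypothetical driver, TryDb, whose callbacks receive a scala.util.Try. It also threads a final done continuation through the steps, so the caller learns how the whole flow ended. To keep the sketch short and deterministic, the callbacks run synchronously here.

```scala
import scala.util.{Failure, Success, Try}

final case class Order(id: String, accountId: String)
final case class Account(id: String, email: String)

// Hypothetical driver whose callbacks receive a Try, so every step can see
// whether the previous one worked. Callbacks run synchronously for brevity.
object TryDb {
  var failUpdateOrder = false // flip to simulate a failed shipment trigger
  def findOrder(id: String)(next: Try[Order] => Unit): Unit =
    next(Try(Map("o1" -> Order("o1", "a1"))(id)))
  def updateOrder(o: Order)(next: Try[Unit] => Unit): Unit =
    next(if (failUpdateOrder) Failure(new RuntimeException(s"cannot close ${o.id}")) else Success(()))
  def findAccount(id: String)(next: Try[Account] => Unit): Unit =
    next(Try(Map("a1" -> Account("a1", "john@example.com"))(id)))
  def updateAccount(a: Account)(next: Try[Unit] => Unit): Unit = next(Success(()))
}

// Hypothetical stand-in for `email`.
object Email {
  def send(to: String, body: String): String = s"sent '$body' to $to"
}

object Notifier {
  // `done` is the caller's own continuation: it learns how the whole flow ended.
  def notifyOrder(orderId: String, notification: String)(done: Try[String] => Unit): Unit =
    TryDb.findOrder(orderId) {
      case Failure(e) => done(Failure(e))
      case Success(order) =>
        TryDb.updateOrder(order) {
          case Failure(e) => done(Failure(e)) // don't tell John to stand by the door
          case Success(_) =>
            TryDb.findAccount(order.accountId) {
              case Failure(e) => done(Failure(e))
              case Success(account) =>
                TryDb.updateAccount(account) {
                  case Failure(e) => done(Failure(e))
                  case Success(_) => done(Try(Email.send(account.email, notification)))
                }
            }
        }
    }
}
```

The cost is visible: every step repeats the same Failure case by hand. Composing Futures with flatMap takes care of exactly that repetition.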
The glue is provided by the implementation of each call in the db driver. Each step packages its continuation into a closure, which is passed with the request as the callback, and the driver calls it when the results are available. We don't know exactly what the driver does; it likely keeps a map of pending callbacks or some such.
The driver may call them directly using its own threads (likely a problem) or schedule them elsewhere (main thread pool).
The closures are very much part of the glue, since they capture all the state in the scope, at the time the callback was created.
Let's look at some differences from plain old threads:
It's important to note that nobody is maintaining the state of the computation: there is no more stack trace. notify returns as soon as the first finder starts, and during the email.send call you have basically no idea who started this and why. A breakpoint will still be triggered in the right place, but that's because the IDE is nice to you and remembers the context of the closure.
In that sense of capturing the enclosing scope and state, closures are somewhat similar to threads. Closures capture just specific values and variables, whereas a thread captures much more, including its current stack and a pointer to the current instruction.
See Closures and reactive programming and Experimenting with Closures in reactive programming for more thoughts and experiments on closures.
Promises and futures are a more modern encapsulation of concurrency and asynchrony.
def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    db.updateOrder(order).flatMap { _ =>
      db.findAccount(order.accountId).flatMap { account =>
        db.updateAccount(account).flatMap { _ =>
          email.send(account.email, notification)
        }
      }
    }
  }
}
All operations now return futures, and db.findOrder(id) : Future[Order] looks almost the same as the Option version we first saw. It also behaves similarly, except that the value may not be there yet. The Future also implies a thread switch: the db driver sends the request off somewhere else and returns a Future, to be completed when the response is available from the database. The difference between this and the callback version is that notify gets a Future object back right away. It could later inquire about the status of the entire thing, something like so:
val result = db.findOrder(id)
//... do something else
if(result.isCompleted) //...
Above, Futures map and flatMap into other Futures, very much like the Options we looked at, essentially creating a pipeline of asynchronous transformations. notify gets a handle to the result of all this: the last Future in the series.
When the last future is complete, the result becomes available, just in case anyone was waiting for it.
Note that in the example above, although multi-threaded, reactive and asynchronous, the steps still occur in sequence, one after the other.
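Here is a runnable version of that Future chain. It assumes hypothetical FutureDb and FutureEmail objects that wrap in-memory lookups in Future.apply on the global execution context. Note the `_ =>` in front of the steps whose result is not needed: flatMap needs a function even when the previous value is ignored.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

final case class Order(id: String, accountId: String)
final case class Account(id: String, email: String)

// Hypothetical asynchronous driver: every call returns a Future at once and
// completes it later on the global execution context.
object FutureDb {
  private val orders = Map("o1" -> Order("o1", "a1"))
  private val accounts = Map("a1" -> Account("a1", "john@example.com"))

  def findOrder(id: String): Future[Order] = Future(orders(id)) // fails if missing
  def updateOrder(o: Order): Future[Unit] = Future(())
  def findAccount(id: String): Future[Account] = Future(accounts(id))
  def updateAccount(a: Account): Future[Unit] = Future(())
}

// Hypothetical stand-in for `email`.
object FutureEmail {
  def send(to: String, body: String): Future[String] = Future(s"sent '$body' to $to")
}

object Notifier {
  // Still sequential: each flatMap starts the next step only once the previous
  // Future has completed successfully.
  def notifyOrder(orderId: String, notification: String): Future[String] =
    FutureDb.findOrder(orderId).flatMap { order =>
      FutureDb.updateOrder(order).flatMap { _ =>
        FutureDb.findAccount(order.accountId).flatMap { account =>
          FutureDb.updateAccount(account).flatMap { _ =>
            FutureEmail.send(account.email, notification)
          }
        }
      }
    }
}
```

In the demo, Await.result is only a convenient way to observe the final value. Real code would keep composing, or react with onComplete.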
The glue is provided by the Future mechanism. The difference from plain callbacks is that a "workflow" of connected Futures is created up-front, see this puppy for instance:
scala> import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
scala> val x = Future { Thread.sleep(3000); 4 }.map(_ + 4).map(_ + 4)
x: scala.concurrent.Future[Int] = Future(<not completed>)
// wait 3 seconds
scala> x
res11: scala.concurrent.Future[Int] = Future(Success(12))
You can see that a result (a composed Future) is available right away, even though the sleep is still going on elsewhere. To check that you get the last Future in the chain of transformations, change the type at the end: the type of the resulting Future changes with it.
scala> val x = Future { Thread.sleep(3000); 4 }.map(_ + 4).map(_ + "4")
x: scala.concurrent.Future[String] = Future(<not completed>)
However smart these Futures are, they still won't glue together your logs from the different steps. One thing they do help with is failing the result when one step fails, propagating much like an exception in a thread. If one of the Futures fails, the composition it is part of fails as well. So Futures also have a nicer answer to "John" getting mad about not getting his package: we can combine them and check at the end what actually happened.
def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    Future.sequence(
      List(
        db.updateOrder(order),
        db.findAccount(order.accountId).flatMap { account =>
          Future.sequence(
            List(
              db.updateAccount(account),
              email.send(account.email, notification)
            )
          )
        }
      )
    )
  }
}
val result = notify(...)
//... later:
if (result.isCompleted && result.value.get.isSuccess) smile() else goodLuckFixingIt()
// the more appropriate way reacts when it completes, without tying your code back to threads:
notify(...).onComplete { t =>
  if (t.isSuccess) smile()
  else goodLuckFixingIt()
}
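To see the failure propagation described above, the sketch below makes a hypothetical updateOrder step fail. The composed Future fails with the same exception, sendEmail never runs, and onComplete reacts to the outcome. The Promise is there only so the demo can wait for that reaction.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical steps: closing the order fails, as if the shipment could not be triggered.
object Steps {
  @volatile var emailed = false
  def findOrder(id: String): Future[String] = Future(id)
  def updateOrder(order: String): Future[Unit] =
    Future.failed(new IllegalStateException(s"could not close $order"))
  def sendEmail(order: String): Future[String] = Future { emailed = true; s"emailed about $order" }

  // The failed updateOrder short-circuits the chain: sendEmail never runs, and
  // the composed Future fails with the same exception.
  def notifyOrder(id: String): Future[String] =
    findOrder(id).flatMap(order => updateOrder(order).flatMap(_ => sendEmail(order)))
}

// React to the outcome instead of polling isCompleted; the Promise only exists
// so this demo can wait for the reaction.
val reaction = Promise[String]()
Steps.notifyOrder("o1").onComplete {
  case Success(msg) => reaction.success(s"smile: $msg")
  case Failure(e)   => reaction.success(s"good luck fixing it: ${e.getMessage}")
}
```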
The sequence call combines the two futures, from db.updateOrder and db.findAccount, into one. Both operations are started as soon as the List is built, so they progress in parallel, and the combined Future completes once both results are in. It is a little counterintuitive that you run operations in parallel with a call to sequence, though, isn't it?
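To check that the two operations really overlap, the following sketch uses a hypothetical step function and a dedicated two-thread pool. Each Future waits on a latch that only opens once both have started. If the two steps ran one after the other, the first wait would time out and report false.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A dedicated two-thread pool, so that both futures can genuinely run at once.
val pool = Executors.newFixedThreadPool(2)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Hypothetical step: it signals that it has started, then waits (up to 1s) for
// the other step to start too. The wait only succeeds if both are in flight.
val bothStarted = new CountDownLatch(2)
def step(name: String): Future[String] = Future {
  bothStarted.countDown()
  val overlapped = bothStarted.await(1, TimeUnit.SECONDS)
  s"$name overlapped=$overlapped"
}

// Both futures are created, and therefore started, while the List is being built;
// Future.sequence only turns List[Future[String]] into Future[List[String]].
val combined: Future[List[String]] = Future.sequence(List(step("updateOrder"), step("findAccount")))
val results = try Await.result(combined, 5.seconds) finally pool.shutdown()
```

Future.sequence keeps the results in the order of the input list, regardless of which step finished first.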
Honestly, I don't even know if the brackets match: there are so many of them, and of course I didn't bother actually compiling or running this. It should work though, right?
As you map and flatMap Futures together and compose them with sequence and the like, they build a flow. It is an invisible one (you could probably even inspect it if you hacked the classes composing it), and it keeps track of progress, success and failure.
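In Scala, this kind of flow is usually written as a for-comprehension, which the compiler rewrites into the same flatMap and map calls. Here is a small sketch, with hypothetical findOrder, findAccount and send functions standing in for the db and email calls:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical steps standing in for the db and email calls.
def findOrder(id: String): Future[String] = Future(s"order-$id")
def findAccount(order: String): Future[String] = Future(s"account-of-$order")
def send(account: String, body: String): Future[String] = Future(s"$body -> $account")

// The compiler rewrites this into:
// findOrder(id).flatMap(order => findAccount(order).flatMap(account => send(account, body).map(sent => sent)))
def notifyOrder(id: String, body: String): Future[String] =
  for {
    order   <- findOrder(id)
    account <- findAccount(order)
    sent    <- send(account, body)
  } yield sent
```

The flow is the same as with the explicit flatMap chain: the steps still run in sequence, and a failure anywhere fails the resulting Future.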
The implicit thread management is of course a bonus. One drawback is that, just like with callbacks, all the operations must stay in the Future monad, and their signatures reflect that by returning Futures everywhere.
But does it look like some kind of sequence/parallel workflow structure? Sure it does. Here's an example of one, an asynchronous internal DSL I was playing with a while ago:
import razie.wfs._
val workflow = seq {
  par {
    seq {
      println ("he he - definition time")
      later { _ + "runtime-a" }
    }
    later { _ + "runtime-b" }
  }
  sort[String] (_ < _)
  matchLater { case x : List[String] => x mkString "," }
}
So using Futures is a step up from simple callbacks (in event loops or other asynchronous execution arrangements). We can keep track of what is going on, to some extent, while still allowing work to proceed in parallel.
Read promises and futures for more detail on Futures in Scala.
We did not even talk about error handling in depth. That would turn this into an entire book, but we should get there too at some point.
Was this all useful? ...
We looked at plain threads, callbacks and Futures as glue for the several processing steps we need to perform. Each has some advantages and disadvantages. In the next post, Reactive programming fabric - streams, actors and more, we will move on to something a little different: streams and actors. We'll then try to wrap up and draw some conclusions.