Reactive programming and glue 1 - threads, callbacks and futures

Let's take a look at reactive programming concepts from a slightly different point of view: how do we glue reactive bits together? We'll contrast a few different approaches and see how they differ. Hopefully, this will shine a light on the subject from a different angle, giving you more to think about before starting the next project.

A reactive program (or any program, for that matter) is a set of bits of logic, or steps, which need to be executed in a more or less deterministic sequence in order to achieve a goal. The steps themselves don't really change: they tend to be elementary things, either single-threaded local blocks of code or calls to some remote service. The interesting part is how we glue them together: how do we tie these steps into a meaningful whole, and what are the benefits of each approach?

It will be quite a long journey, as we'll look at plain old threads, callbacks, futures, ladders, streams, actors, data flows, JMS, workflows.

The examples will be mostly in Scala, with some JavaScript sprinkled in for good measure. We'll look at a simple use case where we have to update an order (to closed) and the respective account (let's say we shipped the items) and finally notify the user via email.

Order notifications

The problem at hand, the one we will use to see how each technique is applicable, is that of processing order notifications. A notification will:

  1. be related to an order, which we can load from a database
  2. update the order status
  3. find the related account information
  4. update the related account information as needed (amounts due etc)
  5. send an email notification

Each of these steps takes time and has to wait for different resources, so the idea is to avoid holding everyone up while the different steps are processed.

Good old threads

Let's look first at the simplest case, using threads - basically running the steps in sequence, blocking one thread while waiting for the results of each. No error handling yet (or rather implicit via exceptions).

def notify(orderId, notification) = {
  val order = db.findOrder(orderId)   // blocks until the order is loaded
  db.updateOrder(order)
  val account = db.findAccount(order.accountId)
  db.updateAccount(account)
  email.send(account.email, notification)
}

The compiler builds a context, keeping the notification in scope for the last call. All finders are assumed synchronous and local. If they were not local, like an actual database call across a network, the thread would be suspended somewhere inside the database driver, waiting for the reply and then resumed (the entire context reinstated) when the result became available.

I just put that version up there for those of you not used to functional programming - here's a more functional version of it, where each finder returns an Option or a List and the compiler creates closures for us. It is quite similar in behaviour, except the notification becomes available as part of a closure.

def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    db.updateOrder(order)
    db.findAccount(order.accountId).map { account =>
      db.updateAccount(account)
      email.send(account.email, notification)
    }
  }
}

This version is much more like the others we will see below and it's a good idea to make sure you understand how it works. The signatures are generally db.findOrder(id) : Option[Order] and Option has map and flatMap methods, which will basically be applied only if there was indeed an order, map(f : Order => X) and flatMap(f : Order => Option[X]).

The individual db and email operations will still occur in sequence, if the finders actually found the respective entities (order or account). The caller holds the entire thread, waiting for the last operation to complete, so the notify call ends only at the end of it all, returning an Option indicating whether everything was done. If at any point one of the finders fails, notify returns an empty Option right away.
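To see the short-circuiting in action, here is a minimal sketch with made-up in-memory finders. The names OptionDemo, processNotification and the sample data are all assumptions for illustration; the real db would of course look different. It also shows the equivalent for-comprehension, which is just sugar for the nested flatMap/map.

```scala
object OptionDemo {
  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  // Hypothetical in-memory data; order "o2" points at an account that does not exist
  private val orders = Map("o1" -> Order("o1", "a1"), "o2" -> Order("o2", "missing"))
  private val accounts = Map("a1" -> Account("a1", "john@example.com"))

  def findOrder(id: String): Option[Order] = orders.get(id)
  def findAccount(id: String): Option[Account] = accounts.get(id)

  // Some(message) if both finders succeeded, None as soon as one of them comes back empty
  def processNotification(orderId: String, notification: String): Option[String] =
    findOrder(orderId).flatMap { order =>
      findAccount(order.accountId).map { account =>
        s"to ${account.email}: $notification"
      }
    }

  // Same logic written as a for-comprehension
  def processWithFor(orderId: String, notification: String): Option[String] =
    for {
      order   <- findOrder(orderId)
      account <- findAccount(order.accountId)
    } yield s"to ${account.email}: $notification"
}
```

Calling it with "o1" yields a Some, while "o2" (no account) or an unknown id yields None without ever reaching the email step.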

Where is the glue? The glue that binds those individual processing steps is provided by the thread mechanism: the steps are executed automatically in sequence and tied together by the invisible thread, at runtime.

An exception would roll back the stack (if not the actual operations), while ThreadLocals filled with Transactions keep everything in check.

Many like to take potshots at the thread model nowadays, pointing out its drawbacks:

  • it blocks a thread for the duration of the entire notify (using too many threads consumes memory while using just a few threads would limit throughput)
  • the thread could possibly also hold some resources
  • it causes a thread switch every time you hand over control to wait for a reply (performance can be negatively affected by switching between too many threads - this is an expensive operation)
  • coupling and brittleness in operations, as the communication channel with the database must be up for the entire duration - although a smart driver design could take care of that

However,

  • it is logical to most people: actions follow in a sequence
  • there's ThreadLocal, enabling thread-local aids like Transactions or security contexts (this is a mixed blessing: if you're switching threads unwittingly in one of those closures, you're suddenly using someone else's Transaction and/or security context)
  • exceptions
  • thread dumps can tell you what's going on
  • easy to follow logs (by thread)
  • when you get to the last step, you know the previous steps were completed: you can't send the email if updateOrder threw an exception
  • blocking a thread is not normally an issue, unless the thread also locks other resources (like database connections, sockets and whatnot)
  • etc

In a sense, threads encapsulate the state of the continuation, at any point in the program.
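The ThreadLocal point from the list above is easy to demonstrate. Here is a small sketch, where currentUser is a hypothetical stand-in for a Transaction or security context: the value set on one thread is simply not there when the code runs on another thread.

```scala
object ThreadLocalDemo {
  // Hypothetical per-thread context, standing in for a Transaction or security context
  val currentUser = new ThreadLocal[String] {
    override def initialValue(): String = "nobody"
  }

  // Runs the lookup on a brand new thread and returns what that thread saw
  def seenOnOtherThread(): String = {
    var seen = ""
    val t = new Thread(() => { seen = currentUser.get() })
    t.start()
    t.join() // join makes the write to `seen` visible to the calling thread
    seen
  }
}
```

If the calling thread does currentUser.set("john"), it will read "john" back itself, while seenOnOtherThread() still reports the initial value: the context did not travel with the work.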

Simple callbacks

The simplest way to glue together or bind asynchronous bits of a program, if you don't want to use threads, is using callbacks, something like this:

def notify(orderId, notification) = {
  db.findOrder(orderId) { order =>
    db.updateOrder(order) {
      db.findAccount(order.accountId) { account =>
        db.updateAccount(account) {
          email.send(account.email, notification)
        }
      }
    }
  }
}

Yes, yes, callback BLIP? Here it is, a little more terse, in Javascript - I am sure most of you recognize it and probably have done this many times:

function notify(orderId, notification) {
  db.findOrder(orderId, function(order) {
    db.updateOrder(order, function() {
      db.findAccount(order.accountId, function(account) {
        db.updateAccount(account, function() {
          email.send(account.email, notification);
        })
      })
    })
  })
}

The db.findOrder(id)(next : Order => X) gets the continuation, the next step packaged as a closure and will call it if and when the order is found, on a different thread.

You could certainly leave it up to the main thread to remember the continuation, in which case the db calls are synchronous and we are basically in the same situation as above. However, this type of programming is mostly used with asynchronous I/O libraries, where each db call is asynchronous and releases the calling thread. The typical application is in node.js event loops for instance.

Note that the main thread is free as soon as it has started the first call... so... who's doing the actual work? In these kinds of libraries and frameworks, the asynchronicity is delegated to some other component, for instance the DB driver. As their various steps are completed, these other components invoke the callbacks, which start the next step.

All of this processing happens outside of a "main thread".

This, and most of the reactive programming techniques we'll look at here, rely on closures - a functional programming construct where the compiler captures the context of the program at the respective time (well, not all of it, just the part that was visible to or in the scope of the function being created) and packages that together with the function itself, the callback here.

Let's take an x-ray of what actually happens here. The several steps of this process or flow are encapsulated in the several callbacks, which will call each other, in sequence. Each callback is a closure, encapsulating the current state in the scope of this flow, carrying with it values like notification and objects like order and account.
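If closures are new to you, here is a tiny sketch of the capturing itself, with made-up names (ClosureDemo, makeCallback, makeCounter). The first function remembers notification after its creator has returned; the second shows that Scala closures capture variables rather than snapshots of their values.

```scala
object ClosureDemo {
  // The returned callback still sees `notification` after makeCallback has returned
  def makeCallback(notification: String): String => String =
    email => s"$email <- $notification"

  // Both functions share the same captured `count` variable
  def makeCounter(): (() => Int, () => Int) = {
    var count = 0
    val increment = () => { count += 1; count }
    val read = () => count
    (increment, read)
  }
}
```

Calling increment twice and then read gives 2, even though count is a local variable of a method that finished long ago.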

Each is an asynchronous call to a db, with a callback to be called when the db returns a result - this will occur asynchronously, in the same or other threads, depending on the implementation of the db. The point however is that these steps will be executed in sequence.

Each db operation, including the first one, will release the calling thread, parking the callback somewhere in the db driver, waiting for a result, very much like a familiar $.ajax(...) would. This callback will be invoked when the result presents itself, in a different thread (a central event loop or some other type of processing arrangement).

So - the notify will return right away, having launched a db.findOrder in the background, which will call the first callback and so on.
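To make the "who's doing the actual work?" question concrete, here is a minimal sketch of such a driver. Everything in it is an assumption for illustration: a single daemon thread named driver-thread plays the role of the db driver, and the finders just fabricate their results before invoking the callback on that thread.

```scala
import java.util.concurrent.{Executors, ThreadFactory}

object CallbackDemo {
  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  // Hypothetical "driver": one background daemon thread that runs requests and then the callbacks
  private val driver = Executors.newSingleThreadExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "driver-thread")
      t.setDaemon(true) // don't keep the JVM alive just for this sketch
      t
    }
  })

  def findOrder(id: String)(next: Order => Unit): Unit =
    driver.execute(() => next(Order(id, "a1")))

  def findAccount(id: String)(next: Account => Unit): Unit =
    driver.execute(() => next(Account(id, "john@example.com")))

  def sendEmail(to: String, body: String)(next: () => Unit): Unit =
    driver.execute(() => next())

  // Returns immediately; `done` is invoked later, on the driver thread
  def processNotification(orderId: String, notification: String)(done: String => Unit): Unit =
    findOrder(orderId) { order =>
      findAccount(order.accountId) { account =>
        sendEmail(account.email, notification) { () =>
          done(account.email)
        }
      }
    }
}
```

The caller gets control back as soon as the first request is queued. If you print Thread.currentThread().getName inside done, you will see driver-thread rather than the thread that called processNotification.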

It is important to decide if the result of updating the order was important. If it wasn't, if we didn't have to wait for it, we could do something like below, where some steps are launched without callbacks:

def notify(orderId, notification) = {
  db.findOrder(orderId) { order =>
    db.updateOrder(order) {} // start this in parallel
    db.findAccount(order.accountId) { account =>
      db.updateAccount(account) {} // start this in parallel
      email.send(account.email, notification)
    }
  }
}

We just start that operation db.updateOrder, but it would have no continuation: we would not wait for its result... so when we sent the email we wouldn't know if it actually succeeded. Say it triggered the shipment - will "John" get mad to find out that his order never shipped, although we just emailed him to "stand by the door"?

So, this doesn't really work - we need each callback to take action depending on whether its step succeeded or not. The models above, remember, assume everything is successful and have no error handling.
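One common way to let each callback react to failure is to hand it a Try instead of a bare value. This is only a sketch under my own assumptions (the operations are hypothetical and call back synchronously to keep it short), not what any particular driver does:

```scala
import scala.util.{Failure, Success, Try}

object CallbackErrorsDemo {
  final case class Order(id: String, accountId: String)

  // Hypothetical callback-style operations; only order "o1" exists
  def findOrder(id: String)(next: Try[Order] => Unit): Unit =
    next(
      if (id == "o1") Success(Order(id, "a1"))
      else Failure(new NoSuchElementException(s"no order $id"))
    )

  def updateOrder(order: Order)(next: Try[Unit] => Unit): Unit =
    next(Success(()))

  // Every step must check its input and pass failures along by hand
  def processNotification(orderId: String)(done: Try[String] => Unit): Unit =
    findOrder(orderId) {
      case Failure(e) => done(Failure(e))
      case Success(order) =>
        updateOrder(order) {
          case Failure(e) => done(Failure(e))
          case Success(_) => done(Success(s"order ${order.id} closed, safe to email"))
        }
    }
}
```

Note how the failure plumbing is repeated at every level: that repetition is exactly what the thread model gave us for free with exceptions.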

Where's the glue, eh? What glues these steps together?

The glue is provided by the implementation of each call, in the db driver: each step packages its continuation into a closure, which is passed with each request as the callback and the driver will call them at the end, when the results are available. We don't know what the driver does, likely uses maps of callbacks or some such.

The driver may call them directly using its own threads (likely a problem) or schedule them elsewhere (main thread pool).

The closures are very much part of the glue, since they capture all the state in the scope, at the time the callback was created.

Let's look at some differences from plain old threads:

  • we're no longer blocking a thread for the entire duration, as the initial thread was released right away, allowing the main program to handle more requests, potentially
  • no stack trace or thread id that one can follow in the logs
  • no "thread global" values like Transactions, although these could be captured in the closures, if in scope at all times.
  • potentially less thread switching, resulting in somewhat better performance in some cases (the synchronization required inside the NIO drivers may negate that under load).

It's important to note that nobody is maintaining the state of the computation: no more stack trace. The notify returns as soon as the first finder starts and during the email.send you have basically no idea who started this and why. A breakpoint will still be triggered properly in the right place, but that's because the IDE is nice to you and remembers the context of the closure.

In that sense, of capturing the enclosing scope and state, closures are somewhat similar to threads. Where closures capture just specific values and variables as such, the thread captures many specific things, including current stack and pointer to the current instruction etc.

See Closures and reactive programming and Experimenting with Closures in reactive programming for more thoughts and experiments on closures.

Futures

Promises and futures are a modern encapsulation of concurrency and asynchronicity.

def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    db.updateOrder(order).flatMap { _ =>
      db.findAccount(order.accountId).flatMap { account =>
        db.updateAccount(account).flatMap { _ =>
          email.send(account.email, notification)
        }
      }
    }
  }
}

All operations now return futures, and db.findOrder(id) : Future[Order] looks almost the same as the Option version we first saw. It behaves similarly too, except that it returns a Future rather than a result, and this Future is also an implicit thread switch. The db driver spawns a request somewhere else and returns a Future, to be completed when the response is available from the database. The difference between this and the callback version is that notify gets a Future object back right away and could later inquire as to the status of the entire thing, something like so:

val result = db.findOrder(id)
//... do something else
if(result.isCompleted) //...

Futures map and flatMap themselves into other Futures above, very much like the Options we looked at, creating essentially a pipeline of asynchronous transformations. The notify gets a handle to the result of all this, the last Future in the series.
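Since promises were mentioned: a Promise is the writable side of a Future, and it is the usual way to wrap a callback-style API into a Future-returning one. A minimal sketch, assuming a hypothetical findOrderWithCallback (synchronous here, to keep it small):

```scala
import scala.concurrent.{Future, Promise}

object PromiseDemo {
  // Hypothetical callback-style driver call
  def findOrderWithCallback(id: String)(callback: Either[Throwable, String] => Unit): Unit =
    if (id.nonEmpty) callback(Right(s"order-$id"))
    else callback(Left(new IllegalArgumentException("empty id")))

  // Adapts the callback API to a Future by completing a Promise from inside the callback
  def findOrder(id: String): Future[String] = {
    val p = Promise[String]()
    findOrderWithCallback(id) {
      case Right(order) => p.success(order)
      case Left(error)  => p.failure(error)
    }
    p.future
  }
}
```

The caller only ever sees p.future, the read-only handle, while the driver side holds the Promise and completes it exactly once.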

When the last future is complete, the result becomes available, just in case anyone was waiting for it.

Note that in the example above, although multi-threaded, reactive and asynchronous, the steps still occur in sequence, one after the other.
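Here is a runnable sketch of the same sequential chain, written as a for-comprehension, which desugars to the nested flatMap/map calls. The db and email operations are hypothetical stubs that complete on the global execution context, and the names (FutureDemo, processNotification, sendEmail) are mine.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FutureDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  // Hypothetical async operations
  def findOrder(id: String): Future[Order] = Future(Order(id, "a1"))
  def updateOrder(order: Order): Future[Unit] = Future(())
  def findAccount(id: String): Future[Account] = Future(Account(id, "john@example.com"))
  def updateAccount(account: Account): Future[Unit] = Future(())
  def sendEmail(to: String, body: String): Future[String] = Future(s"sent to $to")

  // Each step starts only after the previous Future has completed successfully
  def processNotification(orderId: String, notification: String): Future[String] =
    for {
      order   <- findOrder(orderId)
      _       <- updateOrder(order)
      account <- findAccount(order.accountId)
      _       <- updateAccount(account)
      receipt <- sendEmail(account.email, notification)
    } yield receipt
}
```

The flat shape is easier on the eyes than the nested version, but it is the same pipeline: five steps, strictly one after the other.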

The glue is provided by the Future mechanism. The difference from plain callbacks is that a "workflow" of connected Futures is created up-front, see this puppy for instance:

scala> import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global

scala> val x = Future { Thread.sleep(3000); 4 }.map(_ + 4).map(_ + 4)
x: scala.concurrent.Future[Int] = List()

// wait 3 seconds

scala> x
res11: scala.concurrent.Future[Int] = Success(12)

You can see that some result is available right away (as a composed Future), even though the sleep is still going on elsewhere. Here's how you can tell that you get the last one in the chain of transformations: change the type at the end and the type of the resulting Future changes with it:

scala> val x = Future { Thread.sleep(3000); 4 }.map(_ + 4).map(_ + "4")
x: scala.concurrent.Future[String] = List()

However smart these Futures are, they still won't glue together your logs from the different steps. One thing they can help with is failing the result when one step fails, propagating much like an exception in a thread: if one of the Futures fails, then the result of the composition that this Future is part of will fail as well. So Futures also have a nicer answer to "John" getting mad about not getting his package: we can fold them at the end to see what did happen:

def notify(orderId, notification) = {
  db.findOrder(orderId).flatMap { order =>
    Future.sequence(
      List(
        db.updateOrder(order),
        db.findAccount(order.accountId).flatMap { account =>
          Future.sequence(
            List(
              db.updateAccount(account),
              email.send(account.email, notification)
            )
          )
        }
      )
    )
  }
}

val result = notify(...)
//... later:
if(result.isCompleted && result.value.get.isSuccess) smile() else goodLuckFixingIt()

// although the more appropriate is to do it without tying your code back to threads, this way:
notify(...).onComplete {t=>
  if (t.isSuccess) smile()
  else goodLuckFixingIt()
}
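The failure propagation is worth seeing in isolation. In this sketch (hypothetical names again, with a counter standing in for the email service), a failed first step means the flatMap body never runs, and the failure becomes the result of the whole composition:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureFailureDemo {
  // Counts how many times the "email" step actually ran
  val emailsSent = new AtomicInteger(0)

  def updateOrder(ok: Boolean): Future[Unit] =
    if (ok) Future.successful(())
    else Future.failed(new IllegalStateException("update failed"))

  def sendEmail(): Future[Int] = Future(emailsSent.incrementAndGet())

  // Blocks only so the sketch can report the outcome; real code would use onComplete
  def run(ok: Boolean): Try[Int] = {
    val composed = updateOrder(ok).flatMap(_ => sendEmail())
    Await.ready(composed, 5.seconds).value.get
  }
}
```

After run(true) and then run(false), the counter stays at one: "John" does not get an email for an order whose update failed.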

Future.sequence will combine the two futures, from db.updateOrder and db.findAccount. Both are created, and therefore started, while the list is being built, so they run in parallel and progress as they can; sequence merely gives us one Future to wait on for both results. It is a little counterintuitive that you combine parallel operations with a call named sequence though, isn't it?
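A small sketch of what Future.sequence does with its input, using made-up stand-ins for the two operations: it turns a List[Future[String]] into a Future[List[String]], keeps the results in list order regardless of which one finishes first, and fails as a whole if any element fails.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SequenceDemo {
  // Both futures are already running by the time Future.sequence sees them
  def both(): Future[List[String]] = {
    val updateOrder = Future { Thread.sleep(100); "order updated" } // the slower one
    val findAccount = Future { "account found" }
    Future.sequence(List(updateOrder, findAccount))
  }

  // One failed element fails the combined Future
  def oneFails(): Future[List[String]] =
    Future.sequence(List(Future("ok"), Future.failed[String](new RuntimeException("boom"))))
}
```

So the name refers to the shape of the result (a sequence of values inside one Future), not to the order of execution.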

Honestly, I don't even know if the brackets match - there's so many of them and of course I didn't bother actually compiling or running this. It should though, right?

The glue provided by the Futures is a good one. Behind the scenes, as you map and flatMap them together and compose them with sequence and others, they build a flow, an invisible one (one that you could probably even inspect if you hacked the classes composing it), which keeps track of progress, success and failure.

The implicit thread management is of course a bonus... one drawback is that just like with callbacks, all the operations must stay in the Future monad and their signatures reflect that, by returning Futures everywhere.

But does it look like some kind of sequence / parallel workflow structure? Sure it does - here's an example of such a workflow, in an asynchronous internal DSL I was playing with a while ago:

import razie.wfs._
val workflow = seq {
  par {
    seq {
      println ("he he - definition time")
      later { _ + "runtime-a" }
    }
    later { _ + "runtime-b" }
  }
  sort[String] (_ < _)
  matchLater { case x : List[String] => x mkString "," }
}

So, using Futures is one step better than simple callbacks (in event loops or other asynchronous execution arrangement), as we can keep track of what is going on to some extent, while allowing work to proceed in parallel.

Read promises and futures for good detail on Futures in Scala.

We did not even talk about error handling and such - that would turn this into an entire book, but we should get there too, at some point.




Other options that we will skip, at least for now, are delimited continuations and coroutines. Node.js has a unique cooperative mechanism as well, given the way closures are scheduled (a single main event loop).

We looked at plain threads, callbacks and Futures as glue for the several processing steps that we need to do. Each has its advantages and disadvantages. In the next post, Reactive programming fabric - streams, actors and more, we will move on to something a little different: streams and actors, and then try to wrap up and draw some conclusions.



By: Razie | 2017-05-03 .. 2019-10-31 | Tags: post , reactive , akka , streams

