Reactive programming fabric - streams, actors and more

This is part 2 of our look at reactive programming elements - we started by contrasting callbacks and Futures with plain old threads, in Reactive programming and glue 1 - threads, callbacks and futures.

We'll move on to streams, actors and, if it doesn't get too long, introduce workflows. So... instead of a local "glue" to connect together the bits of logic (closures and threads and futures), we'll now move on to look at support for a "fabric" of connecting not only reactive but also distributed logic.

Recap

To recap quickly, we're looking at processing 5 steps in sequence, related to order notifications. The interesting part is the different ways to process these steps asynchronously and how to glue them together. In the first part we looked at simple threads, callbacks a la Node.js and futures.

Streams

A more recent evolution of producer-consumer protocols, streams offer a different way to glue these steps. With streams, it makes sense to move from a simple method notify to a stream of notification requests, notificationStream : Stream[(String,String)]:

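  notificationStream.flatMap { tuple =>
    // tuple = (orderId, activity): look up the order, keep the activity
    db.findOrder(tuple._1).map(o => (o, tuple._2))
  }.flatMap { tuple =>
    // tuple = (order, activity)
    db.updateOrder(tuple._1)
    db.findAccount(tuple._1.accountId).map(o => (o, tuple._2))
  }.flatMap { tuple =>
    // tuple = (account, activity); email.send must itself return a stream
    db.updateAccount(tuple._1)
    email.send(tuple._1.email, tuple._2)
  }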

The full gist is here.

Since we carry a few objects around, we use tuples: initially we had calls like notify(orderId, activity), so we represent each request as a tuple of two elements, and that is what goes down the streams.

The individual steps are now either simple transformations A => B or they return more streams, like db.findOrder(id) : Stream[Order]. This helps us transform the stream of notification requests, step by step, into a stream of email requests. I did not compile or run this one either, but it looks alright.
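For readers who want something they can actually run, here is a minimal sketch of the same pipeline. It assumes Scala 2.13 or later and uses the standard library's LazyList as a stand-in for the article's Stream; the Db and Email objects, their data and the string returned by send are hypothetical in-memory stubs, not a real API.

```scala
// LazyList stands in for the article's Stream; Db and Email are hypothetical stubs.
object TupleStreamSketch {
  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  object Db {
    private val orders   = Map("o1" -> Order("o1", "a1"))
    private val accounts = Map("a1" -> Account("a1", "jane@example.com"))
    // Each lookup returns a stream of zero or one elements.
    def findOrder(id: String): LazyList[Order]     = LazyList.from(orders.get(id))
    def findAccount(id: String): LazyList[Account] = LazyList.from(accounts.get(id))
    def updateOrder(o: Order): Unit     = () // side effect elided
    def updateAccount(a: Account): Unit = () // side effect elided
  }

  object Email {
    // Returns the "email request" as a string so the result can be inspected.
    def send(to: String, body: String): LazyList[String] =
      LazyList(s"to=$to body=$body")
  }

  // Notification requests (orderId, activity) in, email requests out.
  def emails(notifications: LazyList[(String, String)]): LazyList[String] =
    notifications.flatMap { case (orderId, activity) =>
      Db.findOrder(orderId).map(o => (o, activity))
    }.flatMap { case (order, activity) =>
      Db.updateOrder(order)
      Db.findAccount(order.accountId).map(a => (a, activity))
    }.flatMap { case (account, activity) =>
      Db.updateAccount(account)
      Email.send(account.email, activity)
    }
}
```

A request for an unknown order simply produces an empty stream at the first step, so it drops out of the pipeline without any explicit error handling.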

What's the glue, then? The stream processing middleware will take care of processing these steps asynchronously and sometimes optimize processing with advanced features like backpressure and operator fusion.

However, the added advantages of middleware processing do not make these steps any more decoupled: just like in the simple threaded example, this node has to stay up for the entire stream transformation to work. In that sense, this is not quite a fabric just yet - it's still just glue.

See The ask pattern is evil++!

There is however a big difference from the examples we had so far, with Futures and Options: there is no global scope anymore! Note the complication arising from having to carry the notification around, in that tuple. I'm sure there are better options, perhaps like splitting the first stream into two, doing the transformations on the first and zipping them back before the end... but that would introduce too many stream-processing artifacts here... which would be fair, if we wanted to use the stream infrastructure for this job.
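The split-and-zip idea could look roughly like the sketch below, again using LazyList as a stand-in for a real stream type. The lookup functions are hypothetical and deliberately total (every input yields exactly one output), which is the assumption that makes zipping safe.

```scala
// Split the (orderId, activity) stream in two, transform one side, zip back.
object ZipSketch {
  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  // Hypothetical total lookups: exactly one result per input.
  def findOrder(orderId: String): Order = Order(orderId, s"acct-of-$orderId")
  def findAccount(accountId: String): Account =
    Account(accountId, s"$accountId@example.com")

  // Returns (email address, activity) pairs.
  def emails(notifications: LazyList[(String, String)]): LazyList[(String, String)] = {
    val (orderIds, activities) = notifications.unzip
    // Only the order ids travel through the transformation steps.
    val accounts = orderIds.map(findOrder).map(o => findAccount(o.accountId))
    // Re-attach each activity to its account by position.
    accounts.zip(activities).map { case (account, activity) =>
      (account.email, activity)
    }
  }
}
```

The catch is that zip pairs elements by position: if any step could drop or duplicate elements (a missing order, say), the two streams would fall out of step, which is one reason the tuple version is the safer default.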

If we didn't have to carry the notification around, this would actually look a lot less menacing (I also let the updates go in parallel, to simplify it):

notificationStream.flatMap{ notification =>
  db.findOrder (notification.orderId).flatMap { order =>
    db.updateOrder(order)
    db.findAccount(order.accountId).flatMap { account =>
      db.updateAccount(account)
      email.send(account.email, notification)
    }
  }
}
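The nested flatMap calls above are what a Scala for-comprehension desugars to, so the same flow can be written flat. This is a sketch under assumptions: LazyList replaces the stream type, the three functions are hypothetical stubs, and the two update calls are left out for brevity.

```scala
// The nested-closure version, written as a for-comprehension.
object ForComprehensionSketch {
  final case class Notification(orderId: String, text: String)
  final case class Order(id: String, accountId: String)
  final case class Account(id: String, email: String)

  // Hypothetical stubs, each returning a one-element stream.
  def findOrder(id: String): LazyList[Order] = LazyList(Order(id, "a-" + id))
  def findAccount(id: String): LazyList[Account] =
    LazyList(Account(id, id + "@example.com"))
  def send(to: String, n: Notification): LazyList[String] =
    LazyList(s"$to: ${n.text}")

  def emails(in: LazyList[Notification]): LazyList[String] =
    for {
      notification <- in
      order        <- findOrder(notification.orderId)
      account      <- findAccount(order.accountId)
      // `notification` is still in scope here: it is captured by the closures
      // the compiler generates, exactly as in the hand-nested version.
      sent         <- send(account.email, notification)
    } yield sent
}
```

The syntax is flatter, but the coupling is identical: the captured notification lives only in this node's memory.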

This glue offers a lot more logic and value than the others, because the underlying streaming framework will take care of communication and reconnection retries, thread and socket management etc.

Note however that we now introduced a coupling via the closures which maintain state - the first streams example could potentially recover from a node failure between some of the temporary streams there, with a sufficiently robust stream infrastructure. Now we can no longer recover from a failure in the middle, since the closures capture state maintained on this node in memory.

Coupling and decoupling

While callbacks, streams, futures and even threads are simple mechanisms to orchestrate asynchronous behaviour, they introduce coupling: the caller needs to be up! The state of the continuation is in memory of one single node only. It cannot generally be backed up or load-balanced across a network, in the middle: if this node went down, the entire flow is gone and we'd rely on the initial caller to deal with it.

There are languages like Scala that allow serializing continuations, which we used in projects like Swarm to move the computation closer to the data, but in reality we'd want to use an explicit decoupling mechanism here.

So... how do we glue those steps in a way that allows more distribution and even persistence? With better decoupling!

Actors and messages

While the usual decoupling mechanism at the system level is messaging middleware and processes, a very effective construct for asynchronous decoupling within a program is the actor: an object with state and a mailbox full of messages, which are processed one at a time.

While those of you experienced in concurrent programming may think of it as nothing but a fancy monitor, it has the added advantage of decoupling.
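To make the "state plus mailbox, one message at a time" idea concrete, here is a toy actor built only on the JDK's single-threaded executor. It is not Akka and not how Akka is implemented; MiniActor, stopAndDrain and countWith are names invented for this sketch.

```scala
import java.util.concurrent.{Executors, TimeUnit}

object MiniActorSketch {
  // A toy actor: one worker thread drains the mailbox (the executor's queue),
  // so the handler never runs concurrently with itself.
  final class MiniActor[M](handler: M => Unit) {
    private val mailbox = Executors.newSingleThreadExecutor()
    // Fire-and-forget send: enqueue the message and return immediately.
    def !(msg: M): Unit = mailbox.execute(() => handler(msg))
    // Stop accepting messages and wait for the queued ones to be processed.
    def stopAndDrain(): Boolean = {
      mailbox.shutdown()
      mailbox.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  // Sends `perSender` increments from each of `senders` threads
  // and returns the final count.
  def countWith(senders: Int, perSender: Int): Int = {
    var count = 0 // no lock needed: only the actor's thread updates it
    val counter = new MiniActor[Int](n => count += n)
    val threads = (1 to senders).map { _ =>
      new Thread(() => (1 to perSender).foreach(_ => counter ! 1))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counter.stopAndDrain()
    count
  }
}
```

The senders never touch the counter directly; they only drop messages in the mailbox, which is the decoupling that a plain monitor does not give you.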

Here's one way to implement the sequence with actors, using an akka actor (skipping all the irrelevant fluff around types and such - see compile-able code here):

class MyActor(orderId, notification) extends Actor {
  def receive = {
    case MsgNotify => db ! MsgFindOrder(orderId)
    case MsgOrderFound(order) => db ! MsgFindAccount(order.accountId)
    case MsgAccountFound(account) => email ! MsgSend(account.email, notification)
  }
}

def notify(orderId, notification) = {
   // simplified: real Akka code creates the actor via system.actorOf(Props(new MyActor(orderId, notification)))
   new MyActor(orderId, notification) ! MsgNotify
}

A little more verbose, but notice that in this implementation, the actor maintains the state of this computation (to send a new notification, we would create and start a new actor). We could also design it so that the notification is copied in each message and then we can use a stateless actor. As you can see below, the db actor now needs to know about notifications, so that was a bad idea:

class MyActor2 extends Actor {
  def receive = {
    case MsgNotify(orderId, notification) => db ! MsgFindOrder(orderId, notification)
    case MsgOrderFound(order, notification) => db ! MsgFindAccount(order.accountId, notification)
    case MsgAccountFound(account, notification) => email ! MsgSend(account.email, notification)
  }
}

This glue is a little more interesting and I want to discuss it for a minute.

We had to break up the flow into explicit messages and assume that the db actor will respond with Found messages etc. The state of the entire flow is now encapsulated in the actor object and the messages that are now flying around the system.

The messages decouple the processing steps much more than closures or threads did, since the entire state of this step (and of the next ones) is now in the messages and possibly some in the actor itself.

You can see how now we're really moving off a simplistic concept of glue to a (potentially distributed) fabric.

There are also some concrete benefits over the simple callback model:

  • if the node crashes, the state of the actor can be backed up and then recovered on restart, to the closest approximation
  • also, if the node crashes while processing a message, the system will retry the message on restart (assuming we persisted them)

This is a very important difference from all the previous models, which relied on state maintained on the original node (the thread or the closures), which has to be available when the replies get there.
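The recovery behaviour described in the two bullets can be sketched with a tiny journal: a message is removed only after it was processed successfully, so a crash leaves it in place for the next run. Everything here is hypothetical and in-memory (Journal, drain and the simulated crash); a real system would put the journal on durable storage, for instance through an actor persistence module.

```scala
import scala.collection.mutable

object JournalSketch {
  sealed trait Msg
  final case class MsgOrderFound(orderId: String, accountId: String) extends Msg
  final case class MsgAccountFound(accountId: String, email: String) extends Msg

  // Stand-in for durable storage: in a real system this survives the crash.
  final class Journal {
    private val pending = mutable.Queue.empty[Msg]
    def append(m: Msg): Unit = pending.enqueue(m)
    def peek: Option[Msg] = pending.headOption
    def ack(): Unit = pending.dequeue() // remove only after successful processing
  }

  // Processes pending messages in order; stops at the first failure,
  // leaving the failed message in the journal.
  def drain(journal: Journal, handle: Msg => Unit): Boolean = {
    var ok = true
    while (ok && journal.peek.nonEmpty) {
      try { handle(journal.peek.get); journal.ack() }
      catch { case _: RuntimeException => ok = false } // simulated node crash
    }
    ok
  }

  def demo(): List[String] = {
    val journal = new Journal
    journal.append(MsgOrderFound("o1", "a1"))
    journal.append(MsgAccountFound("a1", "jane@example.com"))
    val handled = mutable.ListBuffer.empty[String]
    var crashOnce = true
    val handler: Msg => Unit = {
      case MsgAccountFound(_, _) if crashOnce =>
        crashOnce = false
        throw new RuntimeException("node went down")
      case m => handled += m.toString
    }
    drain(journal, handler) // first run: crashes on the second message
    drain(journal, handler) // "restart": the unacknowledged message is retried
    handled.toList
  }
}
```

Note that this gives at-least-once delivery: a message whose handler crashed half-way is handled again, so the handlers should be idempotent.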

Also, the db actor can now be distributed: the two actors are decoupled. Akka can easily be configured to run the db actor on a separate node and it will automatically route the messages to and from there.

It can also be fairly easy to configure more processing power, as well as replication or load balancing etc, at the level of actors, so the db function is completely independently scaled from the email function etc.

Note that the operational and debugging issues are still there:

  • logging is not correlated, messages from different order IDs will interleave their logs

There is also a streams implementation using Akka, see Akka Streams (http://doc.akka.io/docs/akka/current/scala/stream/index.html), very interesting to look at.

Now we're cooking!

Of course, there is some extra overhead from storing all those messages, but the glue usually has a very insignificant cost compared to the actual steps that we're trying to glue together. The actor subsystem is also very, very fast.

Of course, persisting the actor state and the messages would add quite a bit more overhead, but... there's no free lunch!

Workflows and rules engines

Workflow engines are typically used to glue asynchronous activities, at the macro level, coordinating and orchestrating worthy services.

Their benefits are quite similar to actors, plus:

  • graphical process designer
  • embedded error handling
  • etc

A good workflow library can also help with logging, as you can include the workflow ID in the logs, so you can easily find them in sequence. One could also add useful debugging features, such as visualizing the progress of a workflow based on logs and archived state as well as breakpoints, replaying troublesome workflows etc.
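As a rough illustration of the logging point, the sketch below tags every log line with a flow ID so that interleaved lines can be pulled back into sequence. Log, LogLine and the wf-1 / wf-2 identifiers are made up for the example and do not come from any particular workflow library.

```scala
object CorrelatedLogSketch {
  final case class LogLine(flowId: String, text: String)

  // A minimal in-memory log; every line carries the ID of the flow it belongs to.
  final class Log {
    private val lines = scala.collection.mutable.ArrayBuffer.empty[LogLine]
    def info(flowId: String, text: String): Unit =
      synchronized { lines += LogLine(flowId, text); () }
    // All lines of one flow, in the order they were written.
    def forFlow(flowId: String): List[String] = synchronized {
      lines.iterator.filter(_.flowId == flowId).map(_.text).toList
    }
  }

  def demo(): List[String] = {
    val log = new Log
    // Two flows interleave their log lines, as they would in production.
    log.info("wf-1", "find order")
    log.info("wf-2", "find order")
    log.info("wf-1", "find account")
    log.info("wf-2", "find account")
    log.info("wf-1", "send email")
    log.forFlow("wf-1")
  }
}
```

The same ID could travel inside each message, so that actors on different nodes log under one key.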

The workflow support would be fairly similar to the actor example, except the rules describing the message flow could be configuration, like:

$when this.MsgNotify(orderId, notif) => db.MsgFindOrder(orderId, notif)
$when this.MsgOrderFound(order, notif) => db.MsgFindAccount(order.accountId, notif)
$when this.MsgAccountFound(account, notif) => email.MsgSend(account.email, notif)

This is a real working project, you can play with it at razie/diesel.
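To show what "the message flow as configuration" could mean in plain Scala, here is a hedged sketch where each rule is a value in a list and a generic router picks the first one that matches. It does not reproduce the syntax or engine of the project mentioned above; the message shapes, the Rule type and route are assumptions made for the example.

```scala
object RuleTableSketch {
  sealed trait Msg
  final case class MsgNotify(orderId: String, notif: String) extends Msg
  final case class MsgFindOrder(orderId: String, notif: String) extends Msg
  final case class MsgOrderFound(accountId: String, notif: String) extends Msg
  final case class MsgFindAccount(accountId: String, notif: String) extends Msg
  final case class MsgAccountFound(email: String, notif: String) extends Msg
  final case class MsgSend(email: String, notif: String) extends Msg

  // A rule maps an incoming message to (target name, outgoing message).
  type Rule = PartialFunction[Msg, (String, Msg)]

  // The flow is data: rules can be added, removed or reordered
  // without touching the routing code.
  val rules: List[Rule] = List[Rule](
    { case MsgNotify(orderId, notif)       => ("db", MsgFindOrder(orderId, notif)) },
    { case MsgOrderFound(accountId, notif) => ("db", MsgFindAccount(accountId, notif)) },
    { case MsgAccountFound(email, notif)   => ("email", MsgSend(email, notif)) }
  )

  // Applies the first matching rule, if any.
  def route(msg: Msg): Option[(String, Msg)] =
    rules.find(_.isDefinedAt(msg)).map(r => r(msg))
}
```

A message with no matching rule yields None, which an engine could treat as the end of the flow or as an error, depending on policy.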

So... glue vs fabric

We've seen that simple glue, like threads, closures, futures and streams, is great for simple bits of logic that are tightly coupled: if the respective node dies, either the steps are fast enough to redo or their effects are not relevant, and the boundary decides to either retry the entire block or just drop it.

On the other hand, fabrics like actors (used in a way that allows persisting both messages and actor's state) or workflows, allow for putting together units of work that can be recovered at a more granular level.

There is some overhead involved, as in persisting the messages (or just distributing the messages and voting, in a pure "let it fail" model), but if the cost of the work is high enough (or the cost of re-doing the work is high enough) then it's totally worth it.

To be fair, the example we used was more of an orchestration scenario, which lends itself naturally to the "fabric" model. If only some simple processing was involved, with no side-effects, then the "glue" models would be the way to go, for sure.




So, sync vs async - the big discussion

It's time we take a breather and consider the bigger picture.

The big advantage of going asynchronous is in decoupling and gaining resilience: the communication channels and even the other systems don't have to be up at all times: you can survive short periods of failures or lack of connectivity, as the NIO drivers can reconnect and retransmit either their requests or their replies (in the case of idempotent operations, which is in itself a very interesting discussion).

In fact, Netflix confirmed this, as they recently spent considerable resources re-writing some of their core mediation components from synchronous and threaded to asynchronous NIO (using Netty) and were largely underwhelmed by the benefits observed.

"A lot of people will see numbers like 10-25% performance improvement and think it to be a huge win. When factoring in the time and energy to realize the win, plus the other challenges in operations and debugging that async nio systems introduce, the performance considerations are not enough to justify the work. That said, the other benefits, such as connection management and push notifications, as well as possible resiliency gains, are much more important to us."

Even the resiliency improvements didn't come for free... it is apparent that the original developers were not sub-par and had written decent code.

"We also expected to see some resiliency improvement. As I mentioned in the blog, I do think we will see this materialize, but it is not for free. We are actively looking into things like reducing instantiation and logging of exceptions when errors happen, changing throttling mechanisms, and tweaking of connection reuse and load balancing algorithms in order to realize more resiliency improvements."

Note that even when you go "asynchronous", you're still accumulating state, in the form of closures waiting to be called, so some form of back pressure is required. In a threaded implementation you could limit the number of threads or open client sockets and it would become an implicit form of back pressure, one that also locks the client threads though, cascading slowness throughout the cluster in the form of non-responsiveness and leading to catastrophic failures (giving a different meaning to the expression "shit hit the fan"). This needs to be mitigated by configuring different ports with different thread pools and using async techniques inside, but I digress.
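One simple, explicit form of back pressure for asynchronous calls is to cap how many are in flight and refuse the rest right away, instead of letting pending closures pile up. The sketch below does that with a JDK Semaphore around Scala Futures; Limiter, submit and the same-thread execution context (used only to keep the demo deterministic) are assumptions of this example, not part of any framework.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.control.NonFatal

object BackPressureSketch {
  // Caps the number of in-flight asynchronous calls. Callers get None
  // immediately when the cap is reached, instead of queueing more closures.
  final class Limiter(maxInFlight: Int)(implicit ec: ExecutionContext) {
    private val permits = new Semaphore(maxInFlight)
    def submit[A](call: () => Future[A]): Option[Future[A]] =
      if (permits.tryAcquire()) {
        // A call that throws synchronously becomes a failed Future,
        // so its permit is still released below.
        val f = try call() catch { case NonFatal(e) => Future.failed[A](e) }
        f.onComplete(_ => permits.release())
        Some(f)
      } else None
  }

  // Demo-only execution context: runs callbacks on the completing thread.
  private val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = t.printStackTrace()
  }

  def demo(): List[Boolean] = {
    implicit val ec: ExecutionContext = sameThread
    val limiter = new Limiter(maxInFlight = 2)
    val slow = Promise[Int]() // a downstream call that has not answered yet
    val first  = limiter.submit(() => slow.future).isDefined
    val second = limiter.submit(() => slow.future).isDefined
    val third  = limiter.submit(() => slow.future).isDefined // over the cap
    slow.success(42) // downstream answers: both permits are released
    val fourth = limiter.submit(() => Future.successful(0)).isDefined
    List(first, second, third, fourth)
  }
}
```

Unlike a bounded thread pool, nothing blocks here: the rejected caller is free to shed load, retry later or report the overload upstream.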

The biggest gains come from the ability to support many connections, websockets etc. This is where Play with Akka, for instance, also shines, as it decouples processing the requests from the connections and listeners.




With threads and closures and any time we create state on the caller, we introduce coupling, in the sense that the caller better be up when the reply comes... and in that sense, we talk about glue.

The other style is to use messaging and encapsulate state between steps in persistable objects like actors and workflows, and we can then talk about fabric.



By: Razie | 2017-05-03 .. 2019-10-31 | Tags: post , reactive , akka , streams , actors

