Enable Akka cluster pub-sub for updates in a Play cluster

Let's look at coordinating a simple Play cluster with Akka pub/sub messaging. We'll use it to keep a cold backup in sync (or a hot backup / peer failover node, why not). Basically, we will propagate any relevant update events across the cluster and let each node sort out what it has to do to update its caches in response to those events: reload configuration, log out users etc.

Pub/sub is perfect for that, as it insulates us from knowing the actual cluster nodes (the pub/sub takes care of discovery etc) and also abstracts communication into "topics".

Why?

In the journey to hosting our proof-of-concept pet projects on their own Play backend, the first stop is to insulate it from the mean wide web and we do that with an Apache or nginx or some such reverse proxy, iptables firewalling etc. If you'd like me to get into the details of that, leave a comment below.

The next stop would be to have at least two servers, for transparent upgrades and failover. A pet project that upsets humans wouldn't make much sense, would it?

The crux of setting up the Apache proxy with a two-node failover solution is covered here.

But... beyond that... although the Play Framework model is stateless, we eventually end up caching some shared resources in the play server... like let's say something as simple as a configuration when serving multiple websites.

Now we have the big bad problem of synchronizing these shared and cached entities between two or more servers.

Note first of all that these entities we're talking about are not critical: it is not that important to keep them in sync. The side-effects of them running out of sync for a short period will not impact the behavior of the system meaningfully... some examples would be:

  • static configuration (like multi-hosting, redirections etc)
  • dynamic configuration affecting the system's behavior: new products added to a catalog etc, in case the catalog was cached
  • some rules about entities, a dynamic domain model
  • user permissions or preferences etc
  • etc

If it were important to keep these in sync and/or to solve massive cluster issues like split brains and whatnot, we'd have to look at other, more complicated mechanisms. Apache ZooKeeper or Netflix Eureka certainly come to mind for distributed configuration and service discovery, but how many technologies do we really need just to beef up our simple proof of concept?

Let's start simple and look at using plain Akka pub/sub messaging instead. It is already a part of Play, so we would not have to manage separate processes, bigger clusters or different technologies. You can see the attraction.

Setting up the akka cluster

Play uses Akka, so most of what you need is already there; we just have to enable clustering. First, make sure to add these dependencies to your project:

"com.typesafe.akka"  %% "akka-cluster"       % "2.4.2",
"com.typesafe.akka"  %% "akka-contrib"       % "2.4.2",
"com.typesafe.akka"  %% "akka-slf4j"         % "2.4.2",

Then in your application.conf we will configure the clustering for akka:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "127.0.0.1"
      port = 9002
    }
  }

  cluster {
    seed-nodes = [
      # the actor system name must be "application"
      "akka.tcp://application@127.0.0.1:9002",
      "akka.tcp://application@127.0.0.1:9005"]

    auto-down-unreachable-after = 10s
  }

  # we are already inside the akka block, so the key is just "extensions"
  extensions = ["akka.cluster.pubsub.DistributedPubSub"]
}

Pay attention to the name of the actor system: use "application", as in akka.tcp://application@127.0.0.1:9002. That is the name Play uses for its default actor system, so sticking with it makes things easier.
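A mismatch between the actor system name and the seed-node addresses is easy to miss, because the node simply never joins the cluster. As a rough sketch, a startup check along these lines could catch it early. The `SeedNodeCheck` object and its method names are made up for this illustration; the only assumption is that an address has the `protocol://system@host:port` shape shown above, which `java.net.URI` can take apart.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical helper, not part of Play or Akka.
object SeedNodeCheck {

  /** Extracts the actor system name, i.e. the part before the '@'. */
  def systemName(address: String): Option[String] =
    Try(new URI(address)).toOption.flatMap(uri => Option(uri.getUserInfo))

  /** Returns the seed-node addresses whose system name differs from `expected`. */
  def mismatches(expected: String, seedNodes: Seq[String]): Seq[String] =
    seedNodes.filterNot(address => systemName(address).contains(expected))
}
```

Calling `SeedNodeCheck.mismatches("application", seedNodes)` with the seed nodes from the configuration above should come back empty; anything it returns is an address worth a second look.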

Now, to test the cluster, we'll need to start two nodes, so here's the second conf file, application2.conf:

### run the second server with
#  p -Dconfig.resource=conf/application2.conf -Dhttp.port=9001 ~run

play.server.http.port = 9001
http.port=9001

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = on
    netty.tcp {
      hostname = "127.0.0.1"
      port = 9005
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://application@127.0.0.1:9002",
      "akka.tcp://application@127.0.0.1:9005"]

    auto-down-unreachable-after = 10s
  }

  # we are already inside the akka block, so the key is just "extensions"
  extensions = ["akka.cluster.pubsub.DistributedPubSub"]
}

The command line to start the second server is (p is my shortcut for activator):

p -Dconfig.resource=conf/application2.conf -Dhttp.port=9001 ~run

Using Akka pub/sub

Ok... we have an akka cluster enabled, with two nodes... now what?

We use them. In my application, I have a central stream of events of all kinds, and different components listen to the events of interest to them. I will post details on this later. For now, suffice it to say that we'll add a sink for events, which will use Akka pub/sub to broadcast some of the events in the cluster:

/** main event dispatcher implementation */
@Singleton
class RkCqrs extends Services.EventProcessor {
  lazy val auditor = Akka.system.actorOf(Props[WikiAuditor], name = "WikiAuditor")

  /** hand any event over to the auditor actor, which processes it asynchronously */
  def !(a: Any): Unit = {
    auditor ! a
  }
}
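The details of the event stream are for another post, but to make the rest of the code easier to follow, here is one way such an observer registry could look. This is only a sketch under assumed names: `SimpleObservers` and `ConfigChanged` are invented for the illustration, and the real `WikiObservers` used in this post may work quite differently.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an event type such as WikiConfigChanged.
final case class ConfigChanged(node: String)

// Hypothetical registry: listeners are partial functions, so each one
// only reacts to the event types it pattern-matches on.
object SimpleObservers {
  private val listeners = ListBuffer.empty[PartialFunction[Any, Unit]]

  /** Registers a listener for the events it is defined at. */
  def mini(listener: PartialFunction[Any, Unit]): Unit =
    synchronized { listeners += listener }

  /** Notifies every listener that is defined for this event. */
  def after(event: Any): Unit = {
    val snapshot = synchronized { listeners.toList }
    snapshot.foreach(l => if (l.isDefinedAt(event)) l(event))
  }
}
```

A component would register with something like `SimpleObservers.mini { case ConfigChanged(node) => ... }` and simply ignore every event it has no case for.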

First, we'll need a wrapper for events, indicating a broadcasting request:

/** a request to broadcast another event */
case class BCast(ev: WikiEventBase)

Then, we'll look at the main event sink. It processes audit and update events asynchronously, in a separate actor, to speed up the initial response and other update processing. Here's the section that decides what event types to broadcast:

/** async wiki observers
  *
  * it will also pub/sub in cluster, specific events
  */
class WikiAuditor extends Actor {

  lazy val pubSub = Akka.system.actorOf(Props[WikiPubSub], name = "WikiPubSub")

  def nodeName = Some(Config.node)

  def receive = {
    case wc: WikiConfigChanged => {
      val ev = new WikiConfigChanged(nodeName.mkString)
      WikiObservers.after(ev)   // process it locally and update the configuration

      pubSub ! BCast(ev)    // pub/sub in cluster
    }

  // ...
  }
}

As you can see, it stamps the event with the node name, processes it locally and then sends it to the pubSub actor:

/** pub/sub events to/from cluster members
  *
  * this is needed to update local caches on remote changes
  *
  */
class WikiPubSub extends Actor {
  val TOPIC = "WikiEvents"

  import akka.cluster.pubsub.DistributedPubSub
  import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe, SubscribeAck}

  val mediator = DistributedPubSub(context.system).mediator

  // the mediator confirms the subscription with a SubscribeAck
  mediator ! Subscribe(TOPIC, self)

  def receive = {
    // to broadcast
    case pub@BCast(ev) => {
      clog << "CLUSTER_BRUTE_BCAST " + ev.toString
      mediator ! Publish(TOPIC, ev)
    }

    // actual work: only events published by another node
    case ev1: WikiEventBase if (sender.compareTo(self) != 0) => {
      clog << "CLUSTER_BRUTE_RECEIVED " + ev1.toString
      WikiObservers.after(ev1)
    }

    // our own event coming back from the topic: already processed locally
    case _: WikiEventBase =>

    // subscription confirmed, nothing else to do
    case _: SubscribeAck =>

    case x => cerr << "what? " + x.getClass.getName
  }
}

Note that it only processes events where it is not itself the sender (sender.compareTo(self) != 0), as its own events were already processed locally. I guess we could simplify that bit later... that would also take care of the race condition on WikiObservers.after.

How would you address that?
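One possible answer, as a sketch: since events are already stamped with the node name, the receiving side could compare that stamp with its own node name instead of comparing actor references. The names below (`StampedEvent`, `OriginFilter`) are hypothetical and only model the decision itself, without any Akka dependency.

```scala
// Hypothetical event shape: every event carries the name of the node that created it.
final case class StampedEvent(originNode: String, payload: String)

// Hypothetical filter, to be called by the subscriber for each event
// delivered on the topic.
final class OriginFilter(localNode: String) {

  /** True when the event came from another node and still needs local processing. */
  def shouldApply(event: StampedEvent): Boolean =
    event.originNode != localNode
}
```

The guard in the subscriber would then read something like `if filter.shouldApply(ev)`, assuming the real events expose their origin node.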

Some issues

Well, the first time I fired up an early version, it bounced the same message between the two nodes so fast that some Akka deadLetters log file filled up my hard drive... keep that in mind as a possibility.
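A cheap safety net against this kind of ping-pong is to remember recently seen events and drop repeats before doing anything else with them. The sketch below assumes each event carries a unique id, which the code in this post does not show; `SeenEvents` is a hypothetical helper and it is not thread-safe, so it would have to live inside a single actor.

```scala
import scala.collection.mutable

// Hypothetical helper: remembers the last `capacity` event ids.
// Not thread-safe; meant to be owned by a single actor.
final class SeenEvents(capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  // LinkedHashSet keeps insertion order, so `head` is always the oldest id
  private val ids = mutable.LinkedHashSet.empty[String]

  /** Returns true the first time an id is seen, false for a repeat. */
  def firstTime(id: String): Boolean =
    if (ids.contains(id)) false
    else {
      ids += id
      // evict the oldest id once the window is full
      if (ids.size > capacity) ids -= ids.head
      true
    }
}
```

An actor would call `firstTime(id)` at the top of its handler and ignore the event when it returns false, so a message that keeps coming back gets processed, and possibly re-broadcast, only once.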

Running it in production

To run it in production, on a real multi-node cluster, you'd need to update the configuration with the proper host names and open the firewalls between the nodes for the Akka ports (9002 and 9005 in the example).
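Rather than maintaining one configuration file per node, the per-node values can also be passed as JVM system properties, which take precedence over application.conf. As a sketch, the hypothetical helper below builds the flags for one node; the host names are placeholders and the Akka port is assumed to be the same on every machine.

```scala
// Hypothetical helper, not part of Play or Akka: builds the -D flags for one node.
object ClusterFlags {

  def seedNode(system: String, host: String, port: Int): String =
    s"akka.tcp://$system@$host:$port"

  /** Flags for the node running on `self`, with `seeds` as the cluster seed hosts. */
  def forNode(system: String, self: String, seeds: Seq[String], port: Int): Seq[String] = {
    // one indexed property per seed node
    val seedFlags = seeds.zipWithIndex.map { case (host, i) =>
      s"-Dakka.cluster.seed-nodes.$i=${seedNode(system, host, port)}"
    }
    Seq(
      s"-Dakka.remote.netty.tcp.hostname=$self",
      s"-Dakka.remote.netty.tcp.port=$port"
    ) ++ seedFlags
  }
}
```

For example, `ClusterFlags.forNode("application", "10.0.0.1", Seq("10.0.0.1", "10.0.0.2"), 9002).mkString(" ")` gives the flags to append to the start command of the first node. Test this on your own setup before relying on it.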

Extending it

This very simple mechanism can scale easily as your application grows, to allow distributing some processing onto other nodes and speed up the original response time.

Here's a quick example of tying this into a cache of, say, users. While Mongo is fast, there really is no point in loading the user details a couple of times per click... however, if the user somehow ends up changing the username on the other node... ehh:

// authenticate: look up in the cache first, then in the DB, and cache the result
def auth(uid: String) =
  Cache.getAs[User](uid + ".connected").orElse {
    findUser(uid).map { u =>
      Cache.set(u.uid + ".connected", u, 300) // keep it for 300 seconds
      u
    }
  }

// listener to flush the cache on any changes
WikiObservers mini {
  case WikiEvent("AUTH_CLEAN", "User", id, _, _, _, _) => {
    Cache.remove(id + ".connected")
  }
}

// update the profile and fire off a changed event
def newPassword(uid: String, newpwd: String) =
  findUser(uid).map { u =>
    u.copy(pwd = newpwd).update
    RkCqrs ! WikiEvent("AUTH_CLEAN", "User", uid)
  }
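Stripped of the Play and Mongo specifics, the pattern is: read through the cache, and drop the entry whenever a change event arrives, no matter which node the change happened on. Here is a self-contained sketch of that idea; `UserCache`, `User` and the in-memory `db` map are stand-ins invented for the example, not the classes used above.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-ins for the real user model and database.
final case class User(uid: String, pwd: String)

final class UserCache(db: TrieMap[String, User]) {
  private val cache = TrieMap.empty[String, User]

  /** Read-through lookup: hit the cache first, then the database. */
  def auth(uid: String): Option[User] =
    cache.get(uid).orElse {
      val found = db.get(uid)
      found.foreach(u => cache.put(uid, u))
      found
    }

  /** To be called from the event listener, for local and remote changes alike. */
  def onUserChanged(uid: String): Unit = cache.remove(uid)

  def isCached(uid: String): Boolean = cache.contains(uid)
}
```

Until `onUserChanged` is called, `auth` keeps returning the cached copy even if the database has moved on, which is exactly the stale window the cluster events are meant to shorten.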

Akka is a very efficient actor/messaging system.

Here's more reading on akka clustering and pub/sub:

Further topics to look into:



By: Razie | 2016-09-22 .. 2016-09-24 | Tags: post , playframework , akka , actors , cluster , reactive

