Wednesday, January 01, 2014

Akka Content Ingestion Pipeline, Part IV


Happy New Year! Over the last three posts, I've described an Akka based Content Ingestion Pipeline modelled after the NutchGORA pipeline. This pipeline was my vehicle for learning Akka, something I've been planning to do for a while. In this post (the final installment on this series, at least so far), I explore how to distribute this application horizontally across multiple machines (scale out).

Akka is distributed by design. The code I've built so far can be regarded as a single server version of a distributed system. According to the Akka remoting documentation (emphasis mine):

Everything in Akka is designed to work in a distributed setting: all interactions of actors use purely message passing and everything is asynchronous. This effort has been undertaken to ensure that all functions are available equally when running within a single JVM or on a cluster of hundreds of machines. The key for enabling this is to go from remote to local by way of optimization instead of trying to go from local to remote by way of generalization.

Akka can work with remote Actors in two ways, either by looking them up in a remote ActorSystem, or by creating them in the remote ActorSystem. I use the latter approach. The components that do the heavy lifting in the pipeline are the workers, and scaling out to handle more incoming requests would imply increasing the number of workers or making them faster, both of which can be done by giving them their own dedicated hardware.

The architecture diagram has been updated with the distribution boundaries, they are indicated by the gray boxes below. The master node is the large gray box on the top, and contains the REST Interface, the Controller and the Router actors. The worker nodes (can be an array of nodes for each worker class) are the ones that wrap the Fetch, Parse and Index worker arrays.


Each of these nodes are wrapped in an Akka ActorSystem, which can be accessed by an URI from other ActorSystems. So in addition to the HTTP interface that the master node exposes to the outside world, it also exposes a host:port and has a name that other Akka ActorSystems can use to communicate with it.

For testing, I configured the pipeline with just 2 ActorSystems - the master node listening on localhost:2552 and identified by URI akka.tcp://DelSym@localhost:2552, and one remote node listening on localhost:2553 and identified by URI akka.tcp://remote@localhost:2553. Here is some code to create a named (name supplied from command line) remote Akka ActorSystem using configuration parameters in the remote.conf file.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Source: src/main/scala/com/mycompany/delsym/remote/RemoteAkka.scala
package com.mycompany.delsym.remote

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object RemoteAkka extends App {

  val name = if (args.isEmpty) "remote" else args(0)
  
  val conf = ConfigFactory.load("remote")
  val host = conf.getString("akka.remote.netty.tcp.hostname")
  val port = conf.getInt("akka.remote.netty.tcp.port")
  
  val system = ActorSystem(name, conf)
  Console.println("Remote system [%s] listening on %s:%d"
    .format(name, host, port))
  
  sys.addShutdownHook {
    Console.println("Shutting down Remote Akka")
    system.shutdown
  }
}

The remote.conf file looks like this. This is meant to be used in order to start up ActorSystems on multiple nodes in a network.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Source: src/main/resources/remote.conf
akka {
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2553
    }
  }
}

I then added a property in application.conf to specify a list of ActorSystem URIs for the routers. The routers are Round Robin routers, so giving them a list of ActorSystem URIs will cause them to cycle through the URIs, creating remote Actors and distributing evenly across multiple remote ActorSystems. The Controller Actor code (which instantiates the routers) has been modified to create local workers if the node list is empty and remote workers otherwise. The updated code for the Controller is shown below:

1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Source: src/main/scala/com/mycompany/delsym/actors/Controller.scala
package com.mycompany.delsym.actors

import scala.collection.JavaConversions.asScalaBuffer
import scala.concurrent.duration.DurationInt

import com.mycompany.delsym.daos.HtmlOutlinkFinder
import com.mycompany.delsym.daos.MockOutlinkFinder
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigList

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.AddressFromURIString
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.actorRef2Scala
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RoundRobinRouter
import akka.routing.RouterConfig

class Controller extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 10,
      withinTimeRange = 1.minute) {
    case _: Exception => SupervisorStrategy.Restart
  }
  
  val reaper = context.actorOf(Props[Reaper], name="reaper")

  val conf = ConfigFactory.load()
  val numFetchers = conf.getInt("delsym.fetchers.numworkers")
  val fetchNodes = conf.getList("delsym.fetchers.nodes")
  
  val numParsers = conf.getInt("delsym.parsers.numworkers")
  val parseNodes = conf.getList("delsym.parsers.nodes")
  
  val numIndexers = conf.getInt("delsym.indexers.numworkers")
  val indexNodes = conf.getList("delsym.indexers.nodes")
  
  val testUser = conf.getBoolean("delsym.testuser")
  val outlinkFinder = if (testUser) new MockOutlinkFinder()
                      else new HtmlOutlinkFinder()
  
  val queueSizes = scala.collection.mutable.Map[String,Long]()
  
  val fetchers = context.actorOf(Props[FetchWorker]
    .withRouter(buildRouter(numFetchers, fetchNodes)), 
    name="fetchers")
  reaper ! Register(fetchers)
  queueSizes += (("fetchers", 0L))

  val parsers = context.actorOf(Props[ParseWorker]
    .withRouter(buildRouter(numParsers, parseNodes)), 
    name="parsers")
  reaper ! Register(parsers)
  queueSizes += (("parsers", 0L))
  
  val indexers = context.actorOf(Props[IndexWorker]
    .withRouter(buildRouter(numIndexers, indexNodes)),
    name="indexers")
  reaper ! Register(indexers)
  queueSizes += (("indexers", 0L))

  def receive = {
    case m: Fetch => {
      increment("fetchers")
      fetchers ! m
    }
    case m: FetchComplete => {
      decrement("fetchers")
      if (m.fwd) parsers ! Parse(m.url)
    }
    case m: Parse => {
      increment("parsers")
      parsers ! m
    }
    case m: ParseComplete => {
      decrement("parsers")
      outlinks(m.url).map(outlink => 
        fetchers ! Fetch(outlink._1, outlink._2, outlink._3))
      if (m.fwd) indexers ! Index(m.url)
    }
    case m: Index => {
      increment("indexers")
      indexers ! m
    }
    case m: IndexComplete => {
      decrement("indexers")
    }
    case m: Stats => {
      sender ! queueSize()
    }
    case m: Stop => {
      reaper ! Stop(0)
    }
    case _ => log.info("Unknown message received.")
  }
  
  def buildRouter(n: Int, nodes: ConfigList): RouterConfig = {
    if (nodes.isEmpty) RoundRobinRouter(n)
    else {
      val addrs = nodes.unwrapped()
        .map(node => node.asInstanceOf[String])
        .map(node => AddressFromURIString(node))
        .toSeq
      RemoteRouterConfig(RoundRobinRouter(n), addrs)
    }
  }
  
  def queueSize(): Stats = Stats(queueSizes.toMap)
  
  def outlinks(url: String): 
      List[(String,Int,Map[String,String])] = {
    outlinkFinder.findOutlinks(url) match {
      case Right(triples) => triples
      case Left(f) => List.empty
    }
  }
  
  def increment(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) + 1))
  }
  
  def decrement(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) - 1))
  }
}

The documentation indicates that a better approach would be to declare the routers in configuration, so the local configuration would be different from the distributed configuration. I did not do this because my test case refers to the routers as /controller/* but the actual code refers to it as /api/controller/* (I should probably change the test code but I was too lazy). But in any case, changing from a local to a remote router configuration is simply a matter of wrapping the Router Configuration with a RemoteRouterConfig (buildRouter function in the code above), so this approach works fine also.

Going from local to remote also requires you to think about serialization. I have chosen to use Java serialization, and I have configured Akka (via the application.conf file) to automatically use Java serialization for my messages. In addition, the distributed version of the master ActorSystem also exposes its own address in the configuration and sets the provider to a remote ActorRef provider. The other important difference is the non empty nodes list value under the delsym namespace for each of the fetcher, parser and indexer. The remote configuration is shown below:

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Source: src/main/resources/application.conf.remote
akka {
  loglevel = INFO
  stdout-loglevel = INFO
  akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
  log-dead-letters-during-shutdown = off
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    remote {
      enabled-transports = ["akka.remote.netty.tcp"]
      netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
    serializers {
      java = "akka.serialization.JavaSerializer"
    }
    serialization-bindings {
      "com.mycompany.delsym.actors.DelsymMessage" = java
    }
  }
}

spray {
  can {
    server {
      server-header = "DelSym REST API"
    }
  }
}

delsym {
  testuser = true
  fetchers {
    numworkers = 5
    refreshIntervalDays = 30
    numRetries = 3
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  parsers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  indexers {
    numworkers = 5
    nodes = ["akka.tcp://remote@127.0.0.1:2553"]
  }
  mongodb {
    host = "127.0.0.1"
    port = 27017
    dbname = "delsymdb"
    collname = "documents"
  }
  solr {
    server = "http://127.0.0.1:8983/solr/collection1/"
    dbfieldnames = "_id,url,p_title,p_author,textContent"
    solrfieldnames = "id,url,title,author,text"
    commitInterval = 10
  }
  rest {
    host = "127.0.0.1"
    port = 8080
    timeout = 1
  }
}

There are now 3 versions of application.conf in the Delsym repo on GitHub. You will have to link to the correct one depending on whether you want to run the mock tests, run in local (all actors on single ActorSystem) or remote (multiple ActorSystems) mode.

The effort to build the code for this part of the pipeline was mostly conceptual, ie, understanding how the different components fit together. I found the following Akka reference pages very useful. The pages are all for Akka version 2.2.3 (latest stable version) that I used for this work - the default pages that show up (in response to a Google search for example) are for version 2.3 which is still in development. The 2.3 code is different enough for this detail to be annoying, so mentioning it here.


In addition, I also found the akka-sample-remote-scala useful, although the pattern shown there is slightly different from what I used. Another useful source was the Remoting chapter from the Akka Essentials book.

I was able to run the ActorFlowTest unit test with Mock workers (minus the asserts, since the workers update counters on the remote ActorSystem which I no longer have control over) and verify from the logs that the fetching, parsing and indexing happen on my remote ActorSystem at localhost:2553. The code also exits cleanly which means Deathwatch works fine with remote workers. However, I saw lots of messages sent to the dead-letter mailbox which I haven't been able to figure out yet (they are not application messages) - I will post an update here (and bugfix to the DelSym GitHub repo once I do.

Be the first to comment. Comments are moderated to prevent spam.