Monday, December 23, 2013

Akka Content Ingestion Pipeline, Part I


I just finished attending Coursera's Reactive Programming class in "spectator" mode (I just watched the videos). The course was taught by Martin Odersky (creator of Scala), Erik Meijer (creator of LINQ, who teaches the Monad lectures) and Roland Kuhn (Akka Tech Lead). My main draw for the course was the coverage of Akka Actors, something I had been intending to learn for a while, although I learned a lot from the other lectures as well. I first came across Scala Actors 5 years ago, but I didn't pursue it, mainly because the parallelism the approach offered was limited to a single JVM (implying a single large machine rather than many small machines). At the time, the Akka project was just getting started.

Today, Akka allows you to deploy actors across multiple JVMs on multiple machines in a network, is available in both Scala and Java (important for addressing maintainability concerns in Java-only shops like mine), and provides additional supporting infrastructure via the Typesafe stack. It has progressed to the extent that it is the preferred Actor implementation for Scala 2.10+. It has a vibrant community and (reasonably) good documentation, so it's not too hard to get started with.

The example I chose as a vehicle for learning Akka is based on the Nutch pipeline. It's a pipeline I am very familiar with; we run a variant of it at work for our own content ingestion. Nutch runs on Hadoop as a series of Map-Reduce batch jobs, first fetching the pages, then parsing key-value pairs out of them, and finally sending the key-value pairs and content off to the indexer so they can be searched by clients. Additionally (with NutchGORA), the data is persisted into a NoSQL database during the fetch and parse steps, so it can potentially be used as a content service as well. The example is non-trivial, so I decided to build it in steps and describe the evolution of the system across multiple posts, rather than describing the whole thing in one giant post 4 weeks later. That makes it easier for both of us.

The diagram below shows the actors and the message flows in our example system. There are 2 message flows (indicated by blue and red text and arrows). This is a message-passing model and looks more like the example I used for comparing various actor implementations 5 years ago than the NutchGORA model, but the business process is the same.


The top-level actor in our system is the Controller. The Controller is the actor that other actors or outside callers interact with. On startup, the Controller spawns three router actors - the fetchersRouter, parsersRouter and indexersRouter - which in turn spawn a fixed number (based on configuration) of worker actors. In addition, the Controller also starts up a Reaper actor and registers the routers with it. All this actor startup is indicated by dotted green lines. From a class structure point of view, this means that the Controller, the routers and the Reaper can refer to each other using references (without having to look them up from the context). The worker actors communicate only with their parent routers.

Our first message flow is the Fetch message. A Fetch message includes the URL to fetch, the current fetch depth, and any metadata associated with the URL. The fetch depth is important for web crawling, where a depth > 0 indicates that outlinks must be crawled. The metadata is important for situations where you are parsing feeds and want to carry over the title and summary from the feed rather than (or in addition to) parsing them out of the page, or for supplying additional data such as file create/modify dates when you are fetching files from the local filesystem. We describe the Fetch message flow below:

  1. Fetch message is sent to the controller.
  2. Controller forwards the message to the Fetcher Router.
  3. Fetcher Router forwards the message to one of the workers, using Round Robin routing policy.
  4. If the URL is eligible to be downloaded, the Fetcher Worker downloads the URL and writes the contents and metadata into the database. Once done, it sends a FetchComplete message to the Fetcher Router that includes the database ID of the inserted record.
  5. This results in the Controller receiving a FetchComplete message, to which it responds by sending a Parse message to the Parser Router.
  6. The Parser Router forwards the Parse message to one of its workers.
  7. The Parse Worker retrieves the contents of the file from the database using the ID, converts the file to text and parses relevant key-value pairs from it. It then writes these key-value pairs and the text content back to the database. Once done, it sends a ParseComplete message back to its router.
  8. If the depth > 0, the parsing process also involves parsing the content for embedded outbound links, which are enqueued as additional Fetch requests on the Fetch Router.
  9. This results in the Controller receiving a ParseComplete message, to which it responds by sending an Index message to the Index Router.
  10. The Index Router forwards the Index message to one of its workers.
  11. The Index Worker retrieves the key-value pairs from the database and publishes the record to a Solr index. Once done, it sends back an IndexComplete message to its parent.
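The depth bookkeeping in step 8 can be sketched as a plain function. This is only a sketch of the convention described above - in particular, decrementing the depth on each hop is my assumption, and the post's actual outlinks stub returns an empty list for now.

```scala
// Sketch of step 8: outlinks found during parsing become new Fetch
// requests only while depth > 0. The depth-decrement on each hop is
// an assumption, not something the current stub code does.
case class Fetch(url: String, depth: Int, metadata: Map[String, Any])

def outlinkFetches(outlinks: List[String], depth: Int): List[Fetch] =
  if (depth > 0) outlinks.map(url => Fetch(url, depth - 1, Map.empty))
  else Nil
```

With depth 0 no outlinks are enqueued, which is what stops a crawl from expanding forever.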

The other message to handle is the Stop message, which lets the actors consume all currently enqueued messages before shutting down. Here is how that works.

  1. Stop message is sent to the Controller.
  2. The Controller forwards the Stop message to the Reaper. At startup, each of the routers was registered with the Reaper, which added their references to a list and began monitoring them for termination (DeathWatch).
  3. The Reaper sends a PoisonPill message wrapped in a Broadcast message to the FetcherRouter. The Broadcast wrapper ensures that the router sends a PoisonPill to each of its workers, not just the next one in the rotation. A PoisonPill is placed at the end of each worker's queue, after which no new messages can be placed on those queues. The workers continue to process messages until their queues are drained and then terminate. When all the workers have terminated, the router terminates.
  4. Because the Reaper is watching the FetcherRouter, it gets a Terminated message from it, and reacts by removing the router's reference from its list. It then sends a PoisonPill Broadcast message to the next reference on its list, the ParserRouter.
  5. Like the FetcherRouter, the ParserRouter terminates its workers and then itself.
  6. The Reaper gets a Terminated message from the ParserRouter, and removes it from its list, then sends a PoisonPill Broadcast to the IndexRouter.
  7. Like the FetcherRouter and the ParserRouter, the IndexRouter too terminates its workers and then itself.
  8. The Reaper gets a Terminated message from the IndexRouter, and removes it from its list.
  9. Because its ActorRef list is now empty, the Reaper shuts down the actor system, which takes the Controller (and the Reaper itself) down with it.

In addition, our system also supports a Stats message (not shown in the diagram) which returns the sizes of the three processing "queues". This is done by incrementing a counter each time the Controller receives a Fetch/Parse/Index message and decrementing it when it receives the corresponding FetchComplete/ParseComplete/IndexComplete message.

We can also send Parse or Index messages directly to the Controller. I haven't thought this through completely, but we could probably also supply metadata parameters to skip certain operations, providing more flexibility.
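As a concrete (purely hypothetical) illustration of the skip idea, a metadata flag could be checked before a stage runs. Neither the flag name nor this helper exists in the actual code; it is just one way the metadata map could be used.

```scala
// Hypothetical sketch: a caller-supplied "skipIndex" flag in the
// message metadata could let the Controller skip the Index step.
// The flag name and helper are illustrative, not part of the code.
def shouldIndex(metadata: Map[String, Any]): Boolean =
  !metadata.get("skipIndex").contains(true)
```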

Here is the code to support this functionality. First we define our messages. All our messages except Stop carry arguments and are hence defined as case classes; Stop is defined as a case object. All of them extend a sealed trait marker, which prevents outside code from adding new message types.

// Source: src/main/scala/com/mycompany/delsym/actors/DelsymMessage.scala
package com.mycompany.delsym.actors

import akka.actor.ActorRef

sealed trait DelsymMessage

//////// messages sent from outside to controller /////////

case class Fetch(url: String, depth: Int, 
  metadata: Map[String,Any]) extends DelsymMessage
  
case class Stats(stats: Map[String,Int]) extends DelsymMessage

case object Stop extends DelsymMessage

case class Register(ref: ActorRef) extends DelsymMessage

////////// messages between supervisor and worker //////////

case class Parse(id: String) extends DelsymMessage

case class Index(id: String) extends DelsymMessage

case class FetchComplete(id: String) extends DelsymMessage

case class ParseComplete(id: String) extends DelsymMessage

case class IndexComplete(id: String) extends DelsymMessage

Our next class is the Controller. The Controller instantiates the Reaper and the three Routers, then registers the Routers with the Reaper. It also sets up the counters to support the Stats message.

// Source: src/main/scala/com/mycompany/delsym/actors/Controller.scala
package com.mycompany.delsym.actors

import scala.concurrent.duration.DurationInt

import com.typesafe.config.ConfigFactory

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.actorRef2Scala
import akka.routing.RoundRobinRouter

class Controller extends Actor with ActorLogging {

  override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 10,
      withinTimeRange = 1.minute) {
    case _: Exception => SupervisorStrategy.Restart
  }
  
  val reaper = context.actorOf(Props[Reaper], name="reaper")

  val config = ConfigFactory.load()
  val numFetchers = config.getInt("delsym.fetchers.numworkers")
  val numParsers = config.getInt("delsym.parsers.numworkers")
  val numIndexers = config.getInt("delsym.indexers.numworkers")
  val queueSizes = scala.collection.mutable.Map[String,Int]()
  
  val restartChild = OneForOneStrategy() {
    case e: Exception => SupervisorStrategy.Restart
  }
  val fetchers = context.actorOf(Props[FetchWorker]
    .withRouter(RoundRobinRouter(nrOfInstances=numFetchers, 
    supervisorStrategy=restartChild)), 
    name="fetchers")
  reaper ! Register(fetchers)
  queueSizes += (("fetchers", 0))
  
  val parsers = context.actorOf(Props[ParseWorker]
    .withRouter(RoundRobinRouter(nrOfInstances=numParsers, 
    supervisorStrategy=restartChild)), 
    name="parsers")
  reaper ! Register(parsers)
  queueSizes += (("parsers", 0))
  
  val indexers = context.actorOf(Props[IndexWorker]
    .withRouter(RoundRobinRouter(nrOfInstances=numIndexers,
    supervisorStrategy=restartChild)),
    name="indexers")
  reaper ! Register(indexers)
  queueSizes += (("indexers", 0))
    
  def receive = {
    case m: Fetch => {
      increment("fetchers")
      fetchers ! m
    }
    case m: FetchComplete => {
      decrement("fetchers")
      parsers ! Parse(m.id)
    }
    case m: Parse => {
      increment("parsers")
      parsers ! m
    }
    case m: ParseComplete => {
      decrement("parsers")
      outlinks(m.id).map(outlink => 
        fetchers ! Fetch(outlink._1, outlink._2, outlink._3))
      indexers ! Index(m.id)
    }
    case m: Index => {
      increment("indexers")
      indexers ! m
    }
    case m: IndexComplete => {
      decrement("indexers")
    }
    case m: Stats => sender ! queueSize()
    case Stop => reaper ! Stop
    case _ => log.info("Unknown message received.")
  }
  
  def queueSize(): Stats = Stats(queueSizes.toMap)
  
  def outlinks(id: String): 
      List[(String,Int,Map[String,Any])] = {
    log.info("TODO: Fetch outlinks for id:{}", id)
    List()
  }
  
  def increment(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) + 1))
  }
  
  def decrement(key: String): Unit = {
    queueSizes += ((key, queueSizes(key) - 1))
  }
}
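The three numworkers settings are read via Typesafe Config, so src/main/resources/application.conf would need entries along these lines (the worker counts shown are illustrative, not the values I actually use):

```
delsym {
  fetchers.numworkers = 5
  parsers.numworkers = 5
  indexers.numworkers = 5
}
```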

The Reaper implements the Akka DeathWatch pattern, listening for Terminated messages sent by the routers. It implements the process of terminating each router sequentially on receipt of a Stop signal from the client (via the Controller).

// Source: src/main/scala/com/mycompany/delsym/actors/Reaper.scala
package com.mycompany.delsym.actors

import akka.actor.ActorLogging
import akka.actor.Actor
import java.util.concurrent.atomic.AtomicLong
import akka.actor.Terminated
import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef
import akka.routing.Broadcast
import akka.actor.PoisonPill

class Reaper extends Actor with ActorLogging {

  val refs = ArrayBuffer[ActorRef]()
  
  def receive = {
    case Register(ref) => {
      context.watch(ref)
      refs += ref
    }
    case Stop => {
      refs.head ! Broadcast(PoisonPill)
    }
    case Terminated(ref) => {
      val tail = refs.tail
      if (tail.isEmpty) context.system.shutdown
      else {
        refs.clear
        refs ++= tail
        refs.head ! Broadcast(PoisonPill)
      }
    }
    case _ => log.info("Unknown message received.")
  }
}

The workers are just stubs at the moment and not that interesting. All they do is log a message saying that they fired their method, implying that the message was received and processed correctly. As an example, we show the FetchWorker below. The other workers can be found in the GitHub repository for this project.

// Source: src/main/scala/com/mycompany/delsym/actors/FetchWorker.scala
package com.mycompany.delsym.actors

import akka.actor.ActorLogging
import akka.actor.Actor
import com.typesafe.config.ConfigFactory

class FetchWorker extends Actor with ActorLogging {

  val conf = ConfigFactory.load()
  
  def receive = {
    case m: Fetch => {
      val id = fetchAndStore(m.url, m.depth, m.metadata)
      sender ! FetchComplete(id)
    }
    case _ => log.info("Unknown message.")
  }

  def fetchAndStore(url: String, depth: Int, 
      metadata: Map[String,Any]): String = {
    log.info("TODO: fetching URL:{}", url)
    url
  }
}

Akka uses TestKit and ScalaTest for testing. I am still learning these, and my ScalaTest/TestKit fu is not strong enough to write integration tests yet, so I just ran the code (using sbt run) to verify that the flow works. Here's the code for the Main object.

// Source: src/main/scala/com/mycompany/delsym/actors/Main.scala
package com.mycompany.delsym.actors

import akka.actor.Props
import akka.actor.ActorSystem

object Main extends App {
  val system = ActorSystem("DelsymTest")
  val controller = system.actorOf(Props[Controller], "controller")
  
  (0 until 100).foreach(i => {
    if (i == 50) controller ! Stats(null)
    controller ! Fetch(i.toString, 0, Map())
  })
  controller ! Stop
}

In addition to the excellent Reactive Programming course I already cited above, I found the Akka documentation and the Akka in Action book invaluable for figuring out Akka and writing the code above.

That's all I have for today. As you can see, Akka provides a lot of functionality out of the box, so the application code is relatively short and uncomplicated for what it does. The code for this post is also available in the GitHub repository for my Delsym project.

BTW, if you are curious about the project name, it comes from CONtent inGESTION - Delsym is an over-the-counter cough medicine, and so makes CON(tent) (in)GESTION GO away FASTER. Yes, I know, a bit far-fetched, but it's my project and I am sticking with the name :-).
