Saturday, November 10, 2012

Indexing into ElasticSearch with Akka and Scala


I just completed the Functional Programming Principles in Scala course on Coursera, taught by Dr Martin Odersky, the creator of Scala. Lately I've been trying to use Scala (instead of Java) for my personal projects, so I jumped at the opportunity to learn the language from its creator. While I did learn things about Scala I didn't know before, the greatest gain for me was what I learned about Functional Programming principles. Before the course, I had thought of Scala as a better, leaner Java; after it, I am also beginning to appreciate the emphasis on immutability and recursive constructs that one sees in Scala code examples on the Internet.

Some time after I finished the course, I heard about the Typesafe Developer Contest. Developers are invited to submit applications that use one or more of the components in the Typesafe Stack (Scala, Akka and Play). While I have no illusion of winning any prizes here, I figured it would be a good opportunity for me to practice what I had learnt, and also to expand my knowledge of Scala to include two of its most popular libraries.

The application I came up with was a Scala front end to ElasticSearch. ElasticSearch is a distributed, RESTful search engine built on top of Apache Lucene, and has been on my list of things to check out for a while now. Communication with ElasticSearch is via its very comprehensive JSON over HTTP REST interface.

This post describes the indexing portion of the application. Indexing is by nature embarrassingly parallel, so it's a perfect candidate for Akka's Actor-based concurrency. The system consists of a single Master actor, which spawns a fixed number of Worker actors, and a Reaper actor, which shuts down the system once all documents are processed.

Data is supplied to the indexing system as an arbitrarily nested tree of files on the local filesystem. For each dataset, one must provide a parser to convert a file into a Map of name-value pairs, a FileFilter that decides which of the files to pick for indexing, and the schema to use for the dataset. I used the Enron Email Dataset for my development and testing.

The Master Actor is responsible for crawling the local filesystem and distributing the list of files among the Worker Actors. Each Worker Actor processes one file at a time and POSTs the result as a JSON string to ElasticSearch, then sends the outcome back to the Master, which updates its count of successes and failures. Once the sum of successes and failures equals the number of files originally sent to the workers, the Master sends a signal to the Reaper and shuts itself down. The Reaper then shuts down the entire system. The structure is based heavily on the Akka tutorial and the Shutdown Patterns in Akka2 blog post.
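
The completion check described above can be modeled without Akka. The following is only a sketch of the bookkeeping, not the actor code itself: Tally, record, isDone and TallyDemo are hypothetical names, and the only thing taken from the application is the convention that a status of 0 means success.

```scala
// Akka-free model of the Master's counters (hypothetical names throughout).
final case class Tally(requested: Int, successes: Int = 0, failures: Int = 0) {
  def processed: Int = successes + failures

  // Status 0 counts as a success, anything else as a failure.
  def record(status: Int): Tally =
    if (status == 0) copy(successes = successes + 1)
    else copy(failures = failures + 1)

  // True once every requested file has reported back.
  def isDone: Boolean = processed == requested
}

object TallyDemo {
  // Fold a sequence of worker statuses into a final tally.
  def run(statuses: Seq[Int]): Tally =
    statuses.foldLeft(Tally(requested = statuses.size))((t, s) => t.record(s))

  def main(args: Array[String]): Unit = {
    val result = run(Seq(0, 0, -1, 0))
    println(result)        // Tally(4,3,1)
    println(result.isDone) // true
  }
}
```

In the actor version the same state lives in mutable fields, which is safe there because an actor handles one message at a time.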

The diagram below shows the different actors and the message sequence. Actors are represented by ovals and the solid colored lines represent the messages (and their sequence) being passed to them. The dotted green lines with the diamond heads show how the components are instantiated.


Additionally, I use Play's WebServices API to do the HTTP POST and PUT requests to ElasticSearch, and Play's JSON API to parse and create JSON requests out of native Scala data structures. Here is the code for the Actor system.

package esc.index

import java.io.{FileFilter, File}

import scala.Array.canBuildFrom
import scala.collection.immutable.Stream.consWrapper
import scala.io.Source

import akka.actor.actorRef2Scala
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import akka.routing.RoundRobinRouter
import play.api.libs.json.Json
import play.libs.WS

object Indexer extends App {

  /////////////// start main /////////////////

  val props = properties(new File("conf/indexer.properties"))
  val server0 = List(props("serverName"), 
      props("indexName")).foldRight("")(_ + "/" + _)
  val server1 = List(props("serverName"), 
      props("indexName"), 
      props("mappingName")).foldRight("")(_ + "/" + _)

  indexFiles(props)
    
  /////////////// end main /////////////////
  
  def indexFiles(props: Map[String,String]): Unit = {
    val system = ActorSystem("ElasticSearchIndexer")
    val reaper = system.actorOf(Props[Reaper], name="reaper")
    val master = system.actorOf(Props(new IndexMaster(props, reaper)), 
      name="master")
    master ! StartMsg
  }

  //////////////// actor and message definitions //////////////////
  
  sealed trait EscMsg
  // a parameterless message is a singleton, so model it as a case object
  case object StartMsg extends EscMsg
  case class IndexMsg(file: File) extends EscMsg
  case class IndexRspMsg(status: Int) extends EscMsg
  
  class IndexMaster(props: Map[String,String], reaper: ActorRef) 
      extends Actor {
    val numIndexers = props("numIndexers").toInt
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]
    val router = context.actorOf(Props(new IndexWorker(props)).
      withRouter(RoundRobinRouter(numIndexers)))
    
    var nreqs = 0
    var succs = 0
    var fails = 0
    
    def createIndex(): Int = sendToServer(server0, """
      {"settings": 
        {"index": 
          {"number_of_shards": %s,
           "number_of_replicas": %s}
      }}""".format(props("numShards"), props("numReplicas")), 
      false)
    
    // the root key of a put_mapping body is the mapping (type) name
    def createSchema(): Int = sendToServer(server1 + "_mapping", 
      """{ "%s" : { "properties" : %s } }""".
      format(props("mappingName"), schema.mappings), false)
    
    def receive = {
      case StartMsg => {
        val filefilter = Class.forName(props("filterClass")).
          newInstance.asInstanceOf[FileFilter]
        val files = walk(new File(props("rootDir"))).
          filter(f => filefilter.accept(f))
        createIndex()
        createSchema()
        for (file <- files) {
          nreqs = nreqs + 1
          router ! IndexMsg(file) 
        }
      }
      case IndexRspMsg(status) => {
        if (status == 0) succs = succs + 1 else fails = fails + 1
        val processed = succs + fails
        if (processed % 100 == 0)
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
        if (nreqs == processed) {
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
          reaper ! IndexRspMsg(-1)
          context.stop(self)
        }
      }
    }
  }
  
  class IndexWorker(props: Map[String,String]) extends Actor {
    
    val parser = Class.forName(props("parserClass")).
      newInstance.asInstanceOf[Parser]
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]

    def addDocument(doc: Map[String,String]): Int = {
      // keep only fields known to the schema; split multi-valued ones on commas
      val json = doc.filter(kv => schema.isValid(kv._1)).
        map(kv => if (schema.isMultiValued(kv._1)) 
          Json.toJson(kv._1) -> Json.toJson(kv._2.split(",").
            map(e => e.trim).toSeq)
          else Json.toJson(kv._1) -> Json.toJson(kv._2)).
        map(e => e._1.toString + " : " + e._2.toString).
        // mkString also copes with a document that has no valid fields
        mkString("{", ",", "}")
      sendToServer(server1, json, true)
    }
    
    def receive = {
      case IndexMsg(file) => {
        val doc = parser.parse(Source.fromFile(file))
        sender ! IndexRspMsg(addDocument(doc))
      }
    }
  }
  
  class Reaper extends Actor {
    def receive = {
      case IndexRspMsg(-1) => {
        println("Shutting down ElasticSearchIndexer")
        context.system.shutdown 
      }
    }  
  }
  
  ///////////////// global functions ////////////////////
  
  def properties(conf: File): Map[String,String] = {
    // split on the first "=" only, so that values may themselves contain "="
    Map() ++ Source.fromFile(conf).getLines().toList.
      filter(line => (! (line.isEmpty || line.startsWith("#")))).
      map(line => line.split("=", 2)).
      map(kv => (kv(0) -> kv(1)))
  }  

  def walk(root: File): Stream[File] = {
    if (root.isDirectory) 
      root #:: root.listFiles.toStream.flatMap(walk(_))
    else root #:: Stream.empty
  }

  def sendToServer(server: String, payload: String, 
      usePost: Boolean): Int = {
    val rsp = if (usePost) WS.url(server).post(payload).get
              else WS.url(server).put(payload).get
    val rspBody = Json.parse(rsp.getBody)
    (rspBody \ "ok").asOpt[Boolean] match {
      case Some(true) => 0
      case _ => -1
    }
  }
}
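
The listing never shows conf/indexer.properties, so here is a small, self-contained sketch of how a properties file and the two server URLs fit together. The key names come from the code above; the values (enron, mails, 4) and the names IndexerConfigDemo, parseProps and join are made up for illustration. Note that the foldRight leaves a trailing slash, which is why createSchema can append "_mapping" directly.

```scala
object IndexerConfigDemo {
  // Similar to properties(): skip blank and comment lines; here the split
  // is on the first "=" only and malformed lines are ignored.
  def parseProps(lines: Seq[String]): Map[String, String] =
    lines.map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#"))
      .map(_.split("=", 2))
      .collect { case Array(k, v) => k.trim -> v.trim }
      .toMap

  // Same fold as server0/server1: every part is followed by "/".
  def join(parts: List[String]): String = parts.foldRight("")(_ + "/" + _)

  // Hypothetical configuration values, for illustration only.
  val sample: Seq[String] = Seq(
    "# ElasticSearch location",
    "serverName=http://localhost:9200",
    "indexName=enron",
    "mappingName=mails",
    "numIndexers=4"
  )

  def main(args: Array[String]): Unit = {
    val props = parseProps(sample)
    val server0 = join(List(props("serverName"), props("indexName")))
    val server1 = join(List(props("serverName"), props("indexName"),
      props("mappingName")))
    println(server0)              // http://localhost:9200/enron/
    println(server1 + "_mapping") // http://localhost:9200/enron/mails/_mapping
  }
}
```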

Of course, the indexing application is intended to be useful beyond the Enron dataset. To that end, I define a set of extension points that can be implemented by anyone who wants to index new data with the code above. They are modeled as a set of traits for which concrete implementations need to be provided.

package esc.index

import scala.io.Source
import play.api.libs.json.Json


/////////////// Parser /////////////////

/**
 * An implementation of the Parser trait must be supplied
 * by the user for each new data source. The parse() method
 * defines the parsing logic for the new content.
 */
trait Parser {
  
  /**
   * @param s a Source representing a file on local filesystem.
   * @return a Map of field name and value.
   */
  def parse(s: Source): Map[String,String]
}

/////////////// Schema /////////////////

/**
 * An implementation of the Schema trait must be supplied 
 * by the user for each new data source. The mappings() 
 * method is a JSON string containing the fields and their
 * properties. It can be used to directly do a put_mapping
 * call on elastic search. The base trait defines some 
 * convenience methods on the mapping string.
 */
trait Schema {
  
  /**
   * @return a JSON string representing the field names
   * and properties for the content source.
   */
  def mappings(): String

  /**
   * @param fieldname the name of the field.
   * @return true if field exists in mapping, else false.
   */
  def isValid(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "type").asOpt[String] match {
      case Some(_) => true
      case None => false
    }
  }
  
  /**
   * @param fieldname the name of the field.
   * @return true if field is declared as multivalued, else false.
   */
  def isMultiValued(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "multi_field").asOpt[String] match {
      case Some("yes") => true
      case _ => false  // "no", absent, or any other value
    }
  }
}
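
To give a feel for what plugging in a new data source might look like, here is a sketch of a Parser for a made-up file layout in which the first line is a title and the rest is the body. TitleBodyParser and its field names are hypothetical and not part of the project; the Parser trait is repeated only so that the snippet compiles on its own.

```scala
import scala.io.Source

// Repeated from the listing above so the sketch is self-contained.
trait Parser {
  def parse(s: Source): Map[String, String]
}

// Hypothetical parser: first line is the title, remaining lines are the body.
class TitleBodyParser extends Parser {
  override def parse(s: Source): Map[String, String] =
    s.getLines().toList match {
      case Nil => Map.empty[String, String]
      case title :: rest =>
        Map("title" -> title.trim, "body" -> rest.mkString("\n"))
    }
}

object TitleBodyParserDemo {
  def main(args: Array[String]): Unit = {
    val source = Source.fromString("Quarterly report\nline one\nline two")
    val doc = new TitleBodyParser().parse(source)
    println(doc("title")) // Quarterly report
    println(doc("body"))  // the two remaining lines, joined by a newline
  }
}
```

A matching Schema would then declare "title" and "body" in its mappings, since fields that are not in the mapping are dropped before indexing.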

And finally, here are the concrete implementations of these traits for the Enron dataset.

package esc.index

import scala.io.Source
import scala.collection.immutable.HashMap
import java.io.FileFilter
import java.io.File
import java.util.Date
import java.text.SimpleDateFormat

/**
 * User-configurable classes for the Enron data. These are
 * the classes that will be required to be supplied by a user
 * for indexing a new data source. 
 */

class EnronParser extends Parser {

  override def parse(source: Source): Map[String,String] = {
    parse0(source.getLines(), HashMap[String,String](), false)
  }
  
  def parse0(lines: Iterator[String], map: Map[String,String], 
      startBody: Boolean): Map[String,String] = {
    if (lines.isEmpty) map
    else {
      val head = lines.next()
      if (head.trim.length == 0) parse0(lines, map, true)
      else if (startBody) {
        val body = map.getOrElse("body", "") + "\n" + head
        parse0(lines, map + ("body" -> body), startBody)
      } else {
        val split = head.indexOf(':')
        if (split > 0) {
          val kv = (head.substring(0, split), head.substring(split + 1))
          val key = kv._1.map(c => if (c == '-') '_' else c).trim.toLowerCase
          val value = kv._1 match {
            case "Date" => formatDate(kv._2.trim)
            case _ => kv._2.trim
          }
          parse0(lines, map + (key -> value), startBody)
        } else parse0(lines, map, startBody)
      }
    }
  }
  
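  def formatDate(date: String): String = {
    // SimpleDateFormat is not thread-safe, so each call builds its own instances;
    // the explicit English locale keeps "Mon"/"May" parseable on any machine
    val parser = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss", 
      java.util.Locale.ENGLISH)
    val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    // drop the trailing zone offset before parsing
    formatter.format(parser.parse(date.substring(0, date.lastIndexOf('-') - 1)))
  }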
}

class EnronFileFilter extends FileFilter {
  override def accept(file: File): Boolean = {
    file.getAbsolutePath().contains("/sent/")
  }
}

class EnronSchema extends Schema {
  override def mappings(): String = """{
    "message_id": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "from": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "to": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_cc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_bcc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "date": {"type": "date", "index": "not_analyzed", "store": "yes"},
    "subject": {"type": "string", "index": "analyzed", "store": "yes"},
    "body": {"type": "string", "index": "analyzed", "store": "yes"}
  }"""
}
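
The two header transformations in EnronParser are easy to try in isolation. The sketch below restates them as standalone functions; EnronHeaderDemo and normalizeKey are hypothetical names, and the date string is just an illustrative header value in the format the parser expects. Like the parser, it assumes the zone offset is negative, because it cuts the string at the last '-'.

```scala
import java.text.SimpleDateFormat
import java.util.Locale

object EnronHeaderDemo {
  // Header names become lower-case field names with "-" replaced by "_".
  def normalizeKey(header: String): String =
    header.trim.replace('-', '_').toLowerCase(Locale.ENGLISH)

  // Drop the trailing zone offset, then reformat as yyyy-MM-dd'T'HH:mm:ss.
  // Locale.ENGLISH is passed explicitly so "Mon"/"May" parse on any machine.
  def formatDate(date: String): String = {
    val parser = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss", Locale.ENGLISH)
    val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss", Locale.ENGLISH)
    val withoutZone = date.substring(0, date.lastIndexOf('-') - 1)
    formatter.format(parser.parse(withoutZone))
  }

  def main(args: Array[String]): Unit = {
    println(normalizeKey("Message-ID"))                          // message_id
    println(formatDate("Mon, 14 May 2001 16:39:00 -0700 (PDT)")) // 2001-05-14T16:39:00
  }
}
```

Because the offset is discarded rather than applied, the indexed timestamp is the sender's local wall-clock time, not UTC.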

I did a very basic installation of ElasticSearch: I unzipped the distribution (my version is 0.19.11), set cluster.name to "sujits_first_es_server" and network.bind_host to 127.0.0.1, then started it in a separate terminal window with "bin/elasticsearch -f -Des.config.file=config/elasticsearch.yml". You can verify that it is up at http://localhost:9200.

This is most likely misconfiguration on my part, but whenever I had the network up (wired or wireless), ElasticSearch would try to find clusters to replicate with. Ultimately I had to turn off networking on my computer to get all the data to load.

One more thing: while the code runs to completion, it does not exit, and I have to terminate it manually with CTRL+C. I suspect that some Future is still waiting in the sendToServer method, since if I replace that method with a no-op returning 0, the program exits normally. I need to investigate this further.

I will describe the search subsystem in a future post (once I finish it).

Update 2012-11-20 - I made some changes to the indexer to use the current GA version of Play2 (Scala 2.9.1/Play 2.0.4) instead of the later, yet-to-be-released version that I was using previously. I also changed the code to use the Scala WebService libs instead of the Java ones. I had hoped that this would allow the application to terminate, but no luck there. I ended up having to start a Play embedded server because the WS calls kept complaining that no running application could be found. The Play WS API seems to be overkill for this application, so I am considering switching to spray instead. The latest code can be found on GitHub.
