Monday, March 19, 2012

Writing Lucene Records to SequenceFiles on HDFS

I've been looking at using algorithms from the Apache Mahout project, with a view to applying them to the data in my Cassandra database created using Nutch/GORA. I have come to the conclusion that while being able to (write and) run Map-Reduce jobs directly against Cassandra or Lucene is cool, for maximum flexibility it's preferable to use files as intermediate storage.

There are a couple of reasons for this. First, most "boxed" algorithms, such as those Mahout provides, require a specific input format, and it's much easier to convert the data to a file format than to worry about how to interface the algorithm directly to the datastore in question. Second, pulling the data out and experimenting with it "offline" is easier because there are fewer dependencies to worry about.

One such flat file format popular in the Hadoop world is the SequenceFile. I've been meaning to check it out for a while now, and recently, an opportunity presented itself, in the form of a very large (~400 million records) Lucene index for which I needed to build a language model.

To build the model, I needed to pull all the text for titles, authors and content out of the Lucene index into a set of SequenceFiles. The Lucene index is on a regular (i.e., non-HDFS) filesystem, and I wanted to read the index and write the text out into a SequenceFile in HDFS. This post describes the code I built to do this.

Here is the code to generate the sequence file(s). The code is heavily adapted from the examples provided here and here. Because of the size of the index, and because I had access to a fairly large multi-CPU box, I decided to partition the job using a simple hashmod partitioning scheme and run the partitions using GNU Parallel.

package com.mycompany.myapp.train;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.MapFieldSelector;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CachingWrapperFilter;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.FSDirectory;

public class LuceneToSequenceFileGenerator {

  private static final int MAX_JOBS = 10;
  private static final int TITLE_WEIGHT = 8;
  private static final int AUTHOR_WEIGHT = 8;
  
  private String indexDir;
  private String seqfilesDir;
  private String hadoopDir;
  private int id;
  
  private void setIndexDir(String indexDir) {
    this.indexDir = indexDir;
  }

  private void setSequenceFilesDir(String seqfilesDir) {
    this.seqfilesDir = seqfilesDir;
  }

  private void setIndex(int id) {
    this.id = id;
  }
  
  private void setHadoopDir(String hadoopDir) {
    this.hadoopDir = hadoopDir;
  }
  
  private void generate() {
    IndexSearcher searcher = null;
    SequenceFile.Writer writer = null;
    try {
      Configuration conf = new Configuration();
      conf.addResource(new Path(FilenameUtils.concat(hadoopDir, 
        "conf/core-site.xml")));
      conf.addResource(new Path(FilenameUtils.concat(hadoopDir, 
        "conf/hdfs-site.xml")));
      FileSystem hdfs = FileSystem.get(conf);
      // check if path exists
      Path seqfilesPath = new Path(seqfilesDir);
      if (! hdfs.exists(seqfilesPath)) {
        usage("HDFS Directory " + seqfilesDir + " does not exist!");
        return;
      }
      // create writer based on the id passed in
      Path filename = new Path(FilenameUtils.concat(
        seqfilesDir, "indexpart-" + 
        StringUtils.leftPad(String.valueOf(id), 6, "0")));
      LongWritable key = new LongWritable();
      Text value = new Text();
      writer = SequenceFile.createWriter(
        hdfs, conf, filename, key.getClass(), value.getClass());
      // get the docids to work on from Lucene
      searcher = new IndexSearcher(FSDirectory.open(
        new File(indexDir)), true);
      FieldSelector selector = new MapFieldSelector(Arrays.asList(
        "title", "author", "body"));
      Query q = new MatchAllDocsQuery();
      Filter f = new CachingWrapperFilter(new QueryWrapperFilter(
        new TermQuery(new Term("filtername", "filtervalue"))));
      ScoreDoc[] hits = searcher.search(q, f, searcher.maxDoc()).scoreDocs;
      for (int i = 0; i < hits.length; i++) {
        int partition = i % MAX_JOBS;
        if (id != partition) {
          continue;
        }
        Document doc = searcher.doc(hits[i].doc, selector);
        String title = doc.get("title");
        String author = doc.get("author");
        String body = doc.get("body");
        key.set(i);
        value.set(constructValue(title, author, body));
        writer.append(key, value);
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      IOUtils.closeStream(writer);
      if (searcher != null) {
        try { searcher.close(); } 
        catch (IOException e) { e.printStackTrace(); }
      }
    }
  }

  private String constructValue(String title, String author, String body) {
    StringBuilder buf = new StringBuilder();
    if (StringUtils.isNotEmpty(title)) {
      for (int i = 0; i < TITLE_WEIGHT; i++) {
        buf.append(title).append(" ");
      }
    }
    if (StringUtils.isNotEmpty(author)) {
      for (int i = 0; i < AUTHOR_WEIGHT; i++) {
        buf.append(author).append(" ");
      }
    }
    if (StringUtils.isNotEmpty(body)) {
      buf.append(body);
    }
    return buf.toString();
  }

  private static void usage(String error) {
    if (StringUtils.isNotEmpty(error)) {
      System.out.println("Error: " + error);
    }
    System.out.println("Usage: LuceneToSequenceFileGenerator " +
      "index_dir seq_dir hadoop_dir id");
    System.out.println("where:");
    System.out.println("index_dir: non-HDFS path to Lucene index directory");
    System.out.println("seq_dir: HDFS path to sequence files directory");
    System.out.println("hadoop_dir: Base directory of hadoop installation");
    System.out.println("id: the integer id for this job");
  }
  
  public static void main(String[] args) {
    if (args.length != 4) {
      usage("Invalid number of arguments");
      return;
    }
    LuceneToSequenceFileGenerator generator = 
      new LuceneToSequenceFileGenerator();
    generator.setIndexDir(args[0]);
    generator.setSequenceFilesDir(args[1]);
    generator.setHadoopDir(args[2]);
    generator.setIndex(Integer.valueOf(args[3]));
    generator.generate();
  }
}

I then packaged the code into a JAR (it's part of an existing application), and built a shell script that sets the CLASSPATH (everything that the existing application needs as specified in the build.xml, plus hadoop-core-1.0.1.jar and all the JARs in HADOOP_HOME/lib). To run it, I first created an empty directory in HDFS for this process:

hduser@bigmac:myapp$ /opt/hadoop-1.0.1/bin/hadoop fs -mkdir \
  /data/hadoop/myapp

Then I created a file called ids.txt containing the numbers 0-9, one per line (one id per partition, matching MAX_JOBS in the code). Each number becomes the fourth argument to the shell script wrapper (lucene2seq.sh) when GNU Parallel invokes it. The argument serves both to determine a unique output filename and to decide which instance will process a given Lucene document. Here is the shell script call.
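The id-based work division is just hashmod partitioning: every instance scans the same hit list but only keeps the documents whose position modulo the number of jobs equals its own id. Here is a minimal standalone sketch of that selection logic (the class and method names are my own for illustration; it uses stand-in positions rather than real Lucene hits):

```java
import java.util.ArrayList;
import java.util.List;

public class HashmodPartitionDemo {

  private static final int MAX_JOBS = 10;

  // Return the positions (stand-ins for Lucene hit indexes) that the
  // instance with the given id would process under the i % MAX_JOBS scheme.
  public static List<Integer> docsForInstance(int id, int numDocs) {
    List<Integer> mine = new ArrayList<Integer>();
    for (int i = 0; i < numDocs; i++) {
      if (i % MAX_JOBS == id) {
        mine.add(i);
      }
    }
    return mine;
  }

  public static void main(String[] args) {
    // With 25 documents, instance 3 picks up positions 3, 13 and 23.
    System.out.println(docsForInstance(3, 25)); // [3, 13, 23]
  }
}
```

Since every position lands in exactly one partition, the ten instances cover the index with no overlap, which is what makes it safe to run them concurrently under GNU Parallel.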

hduser@bigmac:myapp$ cat ids.txt | parallel ./lucene2seq.sh \
  /path/to/my/index /data/hadoop/myapp /opt/hadoop-1.0.1

The next step is to use Hadoop and Map-Reduce to build the language model from these files. Progress has been a bit slow on my personal experimentation front lately (What? Two weeks to come up with this? :-)), and is likely to remain so for the next couple of months, because I am taking the Stanford Online NLP course and that's taking up a lot of my time. But on the bright side, it's been very interesting so far, and I am learning quite a bit of stuff I didn't know (or didn't think to inquire about) before, so hopefully this will show up in the quality of my solutions going forward.

7 comments (moderated to prevent spam):

Sujit Pal said...

@MTK, yes, I moderate comments, and per your request, I am not posting your comment (I cannot post partial comments, blogger does not allow me to do that). Blogger also does not tell me your email address. If you post a comment with your email address, I can use that to contact you directly if you want (and not post that comment either, obviously).

Frank Scholten said...

Hi Sujit Pal,

Perhaps you are interested in the lucene2seq tool I wrote? See https://github.com/frankscholten/mahout/tree/lucene2seq and http://www.searchworkings.org/blog/-/blogs/using-your-lucene-index-as-input-to-your-mahout-job-part-i

I am still experimenting with the MapReduce version which uses the FileSystemDirectory, a Lucene Directory implementation for HDFS. It seems to work but I am not sure if this is a good approach in terms of performance.

Cheers,

Frank

Sujit Pal said...

Thanks Frank, I am starting to play around with Mahout, so I think I will probably have a need for your lucene2seq tool. Regarding an M/R based lucene2seq, my $0.02 is that such a thing may not be required, since Lucene indexes live on non-HDFS filesystems. So the choice is between copying the Lucene index into HDFS and running an M/R job with multiple mappers writing multiple sequence files, or writing a (multi-threaded) job that reads the index sequentially and writes out sequence file(s), then copying those into HDFS. Lucene access is quite fast, so I think the second option should be good enough for most purposes.

shandrykk said...

Sir,

I am doing an M.Tech project on Hadoop, semantic based data retrieval. For that I have made use of an ontology, and to query the ontology I used SPARQL. I need to retrieve PDF documents stored in HDFS. Is it possible to do the same? Currently I use Apache Tika to read PDF documents, but I couldn't do a MapReduce implementation of search including all this. Could you please help me?

Sujit Pal said...

Hi shandrykk, I don't think Hadoop is a good mechanism to do search against; at most you will be able to run batch jobs against it. It might be more useful to store the PDFs on the file system or as byte arrays in an external DB system. For search you should take advantage of Lucene and its inverted index - if you parse the PDF into text using Apache Tika as you are doing now, you should be able to make the document searchable in Lucene. Once a document comes up in a search result, you can bring up the actual PDF file from disk or the external DB.

shandrykk said...

Thank you for your reply, sir. Actually I have done my project with MapReduce, but it is not efficient, as you said, when compared with Lucene and an inverted index. I will make use of Lucene and check the project efficiency. Thanks, sir, and sorry for the late reply.

Sujit Pal said...

No problem about the delay, shandrykk, glad it was helpful.