Sunday, January 29, 2012

Nutch/GORA - Delta Indexing

Background

Delta, or incremental, indexing is pretty much the main reason I am so excited about Nutch/GORA. Replacing segment files with a database opens up the possibility of external programs modifying metadata in the database, and thereby modifying the pipeline's behavior to support delta indexing.

For web crawling (Nutch's default use case), the Adaptive Fetch Schedule is an excellent choice. This component (set by db.fetch.schedule.class) will decrease or increase the fetch interval based on whether the page has changed or not. However, if you want to use Nutch as an indexing pipeline for, say, a Content Management System (CMS), then you need to offer the CMS folks a slightly more deterministic way to control what appears in the index.
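
For reference, switching to this schedule is just a property override in nutch-site.xml, along these lines (a minimal sketch):

<property>
  <name>db.fetch.schedule.class</name>
  <value>org.apache.nutch.crawl.AdaptiveFetchSchedule</value>
  <description>Adapt the fetch interval based on whether pages change.</description>
</property>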

When a CMS publishes a piece of content, the content should (within some predictable time interval) make it into the index (so it starts appearing in search results). On the other hand, when a CMS unpublishes a piece of content, it should disappear from the index. In this post, I propose an approach that does this, using Nutch/GORA (with custom plugins) and some supporting infrastructure.

Infrastructure

I have previously described using an HTTP server to stage content on a local filesystem for Nutch. I use a similar strategy here, except that the seed URL for a given provider (or CMS) now points to a dynamically generated "index" page, also served by the HTTP server, that lists all the content available from that provider. Something like this:
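
(What follows is a hand-rolled sketch of such an index page, with made-up file names; the real page is generated on the fly by the /provider_index handler shown below.)

<html><head></head><body><ul>
  <li><a href="http://localhost:8080/provider/prov1__1__000001.xml">http://localhost:8080/provider/prov1__1__000001.xml</a></li>
  <li><a href="http://localhost:8080/provider/prov1__1__000002.xml">http://localhost:8080/provider/prov1__1__000002.xml</a></li>
  ...
</ul></body></html>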

In line with this, my script that launches a CherryPy HTTP server now has an additional handler that dynamically generates the index page by recursively scanning the directory (/provider_index), as well as one that serves a named file from disk (/provider). Here's the code:

#!/usr/bin/python
import cherrypy
import os
import os.path
import urllib

from cherrypy.lib.static import serve_file

SITES_DIR = "/path/to/your/sites"
SERVER_HOST = "localhost"
SERVER_PORT = 8080

def _accumulate_files(files, dirname, fnames):
  """
  This function gets called for every file (and directory) that is walked
  by os.path.walk. It accumulates the file names found into a flat array.
  The file names accumulated are relative to the providers directory.
  """
  for fname in fnames:
    abspath = os.path.join(dirname, fname)
    if os.path.isfile(abspath):
      abspath = abspath.replace(os.path.join(SITES_DIR, "providers"), "")[1:]
      files.append(abspath)

class Root:

  @cherrypy.expose
  def test(self, name):
    """
    Expose the mock site for testing.
    """
    return serve_file(os.path.join(SITES_DIR, "test", "%s.html" % (name)), \
      content_type="text/html")

  @cherrypy.expose
  def provider_index(self, name):
    """
    Builds an index page of links to all the files for the specified
    provider. The files are stored under sites/providers/$name. The
    function will recursively walk the filesystem under this directory
    and dynamically generate a flat list of links. Path separators in
    the filename are converted to "__" in the URL. The index page can
    be used as the seed URL for this content.
    """
    files = []
    os.path.walk(os.path.join(SITES_DIR, "providers", name), \
      _accumulate_files, files)
    index = "<html><head></head><body><ul>"
    for file in files:
      url = "http://%s:%s/provider/%s" % (SERVER_HOST, SERVER_PORT, \
        urllib.quote_plus(file.replace(os.path.sep, "__")))
      index += """
        <li><a href="%s">%s</a></li>
      """ % (url, url)
    index += "</ul></body></html>"
    return [index]

  @cherrypy.expose
  def provider(self, name):
    """
    Returns the contents of the XML file stored at the location 
    corresponding to the URL provided. The "__" in the URL are converted
    back to file path separators.
    """
    ct = None
    if name.endswith(".xml"):
      ct = "application/xml"
    elif name.endswith(".json"):
      ct = "application/json"
    if ct is None:
      return serve_file(os.path.join(SITES_DIR, "providers", \
        "%s" % name.replace("__", os.path.sep)), \
        content_type = "text/html")
    else:
      return serve_file(os.path.join(SITES_DIR, "providers", \
        "%s" % (urllib.unquote_plus(name).replace("__", os.path.sep))), \
        content_type = ct)

if __name__ == '__main__':
  current_dir = os.path.dirname(os.path.abspath(__file__))
  # Set up site-wide config first so we get a log if errors occur.
  cherrypy.config.update({'environment': 'production',
    'log.access_file': 'site.log',
    'log.screen': True,
    "server.socket_host" : SERVER_HOST,
    "server.socket_port" : SERVER_PORT})
  cherrypy.quickstart(Root(), '/')

My seed file now lists only the URL to the index file, as shown below:

http://localhost:8080/provider_index/prov1  u_idx=prov1

Delta Indexing Overview

As I mentioned earlier, the CMS can send either a publish or an unpublish request. A publish request can translate to an Add (if it's a new page that doesn't exist in the database) or an Update. An unpublish request translates to a Delete (or Hide) request at the database level.

For an Add request, we reset the fetch time of the provider's index page to the current time minus the fetch interval, making it eligible for a fetch in the next crawl run. Since the index page is dynamically generated off the filesystem, the newly added file is guaranteed to be listed in it. In the first iteration of the recrawl, the index page is refetched and reparsed; the second iteration moves the newly added page(s) from the index page's outlinks into the fetch list, and from there into the index.

Of course, the index page itself should not make it into the index, so we have a custom indexing filter (described later) that filters such pages out by pattern.

Here is an alternative approach, which relies on accurate Last-Modified headers and a more aggressive crawl schedule. With that approach, you don't need an index file or any custom handling for Adds and Updates, but you do need to crawl more frequently. I prefer an event-based approach, which is what I describe here.

For an Update request, we reset the fetch time of the page specified by the URL, similar to what we do for the index page. The first iteration of the next recrawl will pick up the changes and push them into the index.

For a Delete, we do two things: first, we delete the record from Solr; then we mark the record deleted in Cassandra by setting its status to STATUS_GONE. We also add logic to an indexing filter to prevent pages with this status from making it back into the index.

Delta Indexing Tool

To handle publish and unpublish requests against the index, I built a tool (callable by its full class name from bin/nutch). I put my package under Nutch's src/java tree, and had to modify Nutch's build.xml slightly to get my code to compile; specifically, I added the com/mycompany/nutch/**/*.java path to the javac includes in the "compile-core" target.
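
The javac task in the "compile-core" target then ends up looking roughly like this (attributes trimmed and reproduced from memory, so treat it as a sketch rather than a drop-in patch):

<!-- build.xml, "compile-core" target (sketch) -->
<javac encoding="${build.encoding}"
       srcdir="${src.dir}"
       includes="org/apache/nutch/**/*.java, com/mycompany/nutch/**/*.java"
       destdir="${build.classes}"
       debug="${javac.debug}">
  <classpath refid="classpath"/>
</javac>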

I had initially planned on building it with GORA so it could be easily ported to other backends as well. The GORA tutorial has examples of retrieving and updating records by key. However, it turns out that CassandraStore (the gora-cassandra implementation of DataStore) has an empty implementation of get(String) that returns null.

So I finally settled on using the Hector API (which GORA also uses) to talk directly to the Cassandra database. I am using Hector's template API, which feels similar to (and is probably inspired by) Spring's JdbcTemplate. Here is the code for this tool. It is not as nice as using GORA (i.e., not database-agnostic), but it will have to do for now.

// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.template.ColumnFamilyResult;
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate;
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater;
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {
  
  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);
  
  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");
  
  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private Keyspace keyspace;
  private ColumnFamilyTemplate<String,String> template;
  
  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  
  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix = conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(conf.get(SOLR_URL));
    Cluster cluster = HFactory.getOrCreateCluster("Test Cluster", 
      new CassandraHostConfigurator("localhost:9160"));
    this.keyspace = HFactory.createKeyspace("webpage", cluster);
    this.template = new ThriftColumnFamilyTemplate<String,String>(
      keyspace, "f", StringSerializer.get(), StringSerializer.get()
    );
  }
  
  private void destroy() {
  }
  
  public void publish(String url, String idx) throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res = 
      template.queryColumns(key);
    String bas = res.getString("bas"); 
    if (StringUtils.isEmpty(bas)) {
      // requested URL does not exist, should be an "add",
      // reset fetchtime for the index page
      key = TableUtil.reverseUrl(StringUtils.join(new String[] {
        providerIndexUrlPrefix, idx}, "/"));
      res = template.queryColumns(key);
      bas = res.getString("bas");
    }
    if (StringUtils.isEmpty(bas)) return;
    int fetchInterval = Integer.valueOf(res.getString("fi"));
    // update the fetch time to current - fetchInterval so
    // it is eligible for crawling immediately
    ColumnFamilyUpdater<String,String> updater =
      template.createUpdater(key);
    updater.setString("ts", String.valueOf(
      System.currentTimeMillis() - fetchInterval));
    template.update(updater);
  }
  
  public void unpublish(String url, String idx, boolean commit) 
      throws Exception {
    String key = TableUtil.reverseUrl(url);
    ColumnFamilyResult<String,String> res = template.queryColumns(key);
    String bas = res.getString("bas");
    if (StringUtils.isNotEmpty(bas)) {
      System.out.println("found it!");
      ColumnFamilyUpdater<String,String> updater = 
        template.createUpdater(key);
      updater.setString("st", String.valueOf(
        CrawlStatus.STATUS_GONE));
      template.update(updater);
      deleteFromSolr(key, commit);
    }
  }
  
  private void deleteFromSolr(String key, boolean commit) 
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }
  
  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }
  
  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) && 
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}

This is really only for testing. For high-volume use, it would be fairly simple to have this logic live behind a webservice which the CMS code could call (see the sketch after the command-line examples below). In any case, here are examples of command-line usage for the tool above.

sujit@cyclone:local$ # publishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
  publish http://localhost:8080/provider/prov1__1__000022.xml prov1
sujit@cyclone:local$ # unpublishing a page
sujit@cyclone:local$ bin/nutch com.mycompany.nutch.tools.DeltaHandler \
  unpublish http://localhost:8080/provider/prov1__1__000022.xml prov1 \
  true

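As for the webservice idea mentioned above, here is a minimal sketch of what such a wrapper could look like, assuming a plain servlet container and reusing DeltaHandler as-is. The servlet class name and request parameter names below are made up, and the Nutch conf directory and jars would need to be on the webapp classpath for DeltaHandler to initialize.

// Source: (hypothetical) src/java/com/mycompany/nutch/tools/DeltaHandlerServlet.java
package com.mycompany.nutch.tools;

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class DeltaHandlerServlet extends HttpServlet {

  private DeltaHandler handler;

  @Override
  public void init() throws ServletException {
    // reuses the command-line tool's logic (and its nutch-site.xml settings)
    handler = new DeltaHandler();
  }

  @Override
  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    String command = req.getParameter("command"); // publish | unpublish
    String url = req.getParameter("url");
    String idx = req.getParameter("idx");
    try {
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else if ("unpublish".equals(command)) {
        handler.unpublish(url, idx,
          Boolean.valueOf(req.getParameter("commit")));
      } else {
        resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "unknown command");
        return;
      }
      resp.setStatus(HttpServletResponse.SC_OK);
    } catch (Exception e) {
      throw new ServletException(e);
    }
  }
}
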
I also added a few new properties for this tool (and for the plugin described below). Here are the additional properties from my nutch-site.xml file.

<property>
  <name>mycompany.provider.index.urlprefix</name>
  <value>http://localhost:8080/provider_index</value>
  <description>The URL to the content server hosting the provider content  </description>
</property>

<property>
  <name>mycompany.solr.url</name>
  <value>http://localhost:8983/solr</value>
  <description>The SOLR URL to publish/unpublish to</description>
</property>

<property>
  <name>mycompany.provider.index.pattern</name>
  <value>provider_index</value>
  <description>Pattern for "meta" pages listing other local pages. Such a
  page is needed for delta indexing to manage on-demand add/delete/update
  of pages from collections that need this feature, but it should not
  be indexed, so it needs to be removed during the indexing step.</description>
</property>

Delta Indexing Filter Plugin

As mentioned above, you probably don't want your index pages to make it into the index, since they are basically link farms and have no useful content (for the search user). So you want to suppress these pages from ever making it into the index.

The second class of pages are the ones marked GONE by the unpublish command. The unpublish deletes the record from the Solr index, but you want to make sure the page doesn't slip back in on the next solrindex call. So we build an indexing filter that filters out these two categories of pages.

The functionality above is built into the DeltaIndexingFilter, a simple Nutch IndexingFilter implementation. Here is the declaration for this filter from my plugin.xml file.

  <extension id="com.mycompany.nutch.indexer.provider"
      name="Delta Indexing related Page Index Suppressor"
      point="org.apache.nutch.indexer.IndexingFilter">
    <implementation id="mycompany-indexer-provider"
        class="com.mycompany.nutch.indexer.delta.DeltaIndexingFilter"/>
  </extension>

and here is the code for the DeltaIndexingFilter.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/delta/DeltaIndexingFilter.java
package com.mycompany.nutch.indexer.delta;

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;

public class DeltaIndexingFilter implements IndexingFilter {

  private static final String PROVIDER_INDEX_PATTERN_KEY = 
    "mycompany.provider.index.pattern";
  
  private static final Set<WebPage.Field> FIELDS = 
    new HashSet<WebPage.Field>();
  static {
    FIELDS.add(WebPage.Field.STATUS);
    FIELDS.add(WebPage.Field.METADATA);
  }
  
  private Configuration conf;
  private String providerIndexPattern;
  
  @Override
  public NutchDocument filter(NutchDocument doc, String url, WebPage page)
      throws IndexingException {
    if (StringUtils.isEmpty(providerIndexPattern)) {
      return doc;
    } else {
      if (url.contains(providerIndexPattern) || 
          CrawlStatus.STATUS_GONE == page.getStatus()) {
        // do not index this page
        return null;
      } else {
        return doc;
      }
    }
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    this.providerIndexPattern = conf.get(
      PROVIDER_INDEX_PATTERN_KEY);
  }
}

And that's pretty much it! Delta indexing in just a few lines of code (relatively speaking, of course - this is Java we are talking about :-)), thanks to Nutch/GORA (which gives us the Cassandra database) and Hector (which gives us the API to write to it from outside Nutch).

Update - 2012-02-04: I managed to figure out how to implement the CassandraStore.get() method, so my DeltaHandler code no longer makes any (direct) Hector calls. Here it is.

// Source: src/java/com/mycompany/nutch/tools/DeltaHandler.java
package com.mycompany.nutch.tools;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;

public class DeltaHandler {
  
  private static final Log LOG = LogFactory.getLog(DeltaHandler.class);
  
  private static final String PROVIDER_INDEX_URLPREFIX_KEY =
    "mycompany.provider.index.urlprefix";
  private static final String SOLR_URL = "mycompany.solr.url";
  private static final Utf8 U_IDX = new Utf8("u_idx");
  
  private String providerIndexUrlPrefix;
  private CommonsHttpSolrServer solrServer;
  private DataStore<String,WebPage> dataStore;
  
  public DeltaHandler() {
    try {
      init();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  
  private void init() throws Exception {
    Configuration conf = new Configuration();
    conf.addResource("nutch-default.xml");
    conf.addResource("nutch-site.xml");
    this.providerIndexUrlPrefix = 
      conf.get(PROVIDER_INDEX_URLPREFIX_KEY);
    this.solrServer = new CommonsHttpSolrServer(
      conf.get(SOLR_URL));
    this.dataStore = StorageUtils.createDataStore(
      conf, String.class, WebPage.class);
  }

  private void destroy() {
    try {
      dataStore.close();
    } catch (Exception e) {
      LOG.error(e);
    }
  }
  
  /**
   * Could be an addition or update. If the URL is present
   * in the database, then it is considered to be an update
   * and the fetch time is reset to current time - fetch
   * interval. If the URL is not present in the database, it
   * is considered to be an add operation, and the meta index
   * page corresponding to the idx is reset, so it is recrawled.
   * @param url the URL to publish.
   * @param idx the value of the u_idx metadata.
   * @throws Exception if thrown.
   */
  public void publish(String url, String idx) throws Exception {
    LOG.info("Starting publish for url=" + url + ", u_idx=" + idx);
    String key = TableUtil.reverseUrl(url);
    LOG.info("key=" + key);
    WebPage page = dataStore.get(key);
    if (page == null) {
      // record does not exist, reset the index page
      String indexKey = TableUtil.reverseUrl(
        StringUtils.join(new String[] {
        providerIndexUrlPrefix, idx}, "/"));
      LOG.info("index key=" + indexKey);
      WebPage indexPage = dataStore.get(indexKey);
      LOG.info("is indexpage null?" + (indexPage == null));
      resetFetchTime(indexPage);
      dataStore.put(indexKey, indexPage);
      LOG.info("Completed publish for url=" + url + 
        ", u_idx=" + idx + ", reset fetch time for index page");
    } else {
      // record exists, reset its fetch time
      resetFetchTime(page);
      dataStore.put(key, page);
      LOG.info("Completed publish for url=" + url + 
        ", u_idx=" + idx + ", reset fetch time for page");
    }
  }

  private void resetFetchTime(WebPage page) {
    int fetchInterval = page.getFetchInterval();
    LOG.info("after get fetch interval=" + fetchInterval);
    page.setFetchTime(System.currentTimeMillis() - fetchInterval);
  }

  /**
   * Checks to see if the record exists in the database with
   * the specified u_idx metadata value. If so, deletes the
   * record from SOLR, then marks the record as GONE in database.
   * @param url the URL to unpublish.
   * @param idx the value of the u_idx parameter.
   * @throws Exception if thrown.
   */
  public void unpublish(String url, String idx, boolean commit) 
      throws Exception {
    LOG.info("Starting unpublish for url=" + url + 
      ", u_idx=" + idx + ", commit=" + commit);
    String key = TableUtil.reverseUrl(url);
    WebPage page = dataStore.get(key);
    if (page != null) {
      if (page.getMetadata().containsKey(U_IDX)) {
        String uIdx = Bytes.toString(Bytes.toBytes(
          page.getMetadata().get(U_IDX)));
        if (idx.equals(uIdx)) {
          page.setStatus(CrawlStatus.STATUS_GONE);
          dataStore.put(key, page);
          deleteFromSolr(key, commit);
          LOG.info("Completed unpublish for url=" + url + 
            ", u_idx=" + idx);
        }
      }
    }
  }

  private void deleteFromSolr(String key, boolean commit) 
      throws Exception {
    solrServer.deleteById(key);
    if (commit) {
      solrServer.commit();
    }
  }
  
  private static void usage() {
    System.out.println(
      "Usage: DeltaHandler publish|unpublish url idx [commit]");
    System.out.println("commit = true|false, only for unpublish");
    System.exit(-1);
  }
  
  public static void main(String[] args) {
    String command = null;
    String url = null;
    String idx = null;
    Boolean commit = null;
    if (args.length > 0) {
      command = args[0];
      if (!("publish".equals(command)) && 
          !("unpublish".equals(command))) {
        usage();
      }
    }
    if ("publish".equals(command)) {
      if (args.length > 2) {
        url = args[1];
        idx = args[2];
      } else {
        usage();
      }
    } else if ("unpublish".equals(command)) {
      if (args.length > 3) {
        url = args[1];
        idx = args[2];
        commit = Boolean.valueOf(args[3]);
      } else {
        usage();
      }
    }
    DeltaHandler handler = null;
    try {
      handler = new DeltaHandler();
      if ("publish".equals(command)) {
        handler.publish(url, idx);
      } else {
        handler.unpublish(url, idx, commit);
      }
    } catch (Exception e) {
      LOG.error(e, e);
    } finally {
      if (handler != null) {
        handler.destroy();
      }
    }
  }
}

The patch for CassandraStore.get() is available at GORA-93. Since it is built against GORA 0.2-incubating, you will also need to patch your NutchGora branch with the patch from NUTCH-1205. Note that the reason neither patch has been applied yet is some failing unit tests traceable to the gora-sql module. The patches worked fine for me, but I am only using gora-cassandra, so YMMV.

Friday, January 20, 2012

Nutch/GORA - Parsing Custom XML

This week, I continue my exploration of Nutch/GORA to see if I can use it to parse and index custom (non-web) XML content. In addition to crawling selected medical sites for our search index, we also rely on content providers who supply us with encyclopedic medical content in different areas. These providers periodically send us data, each in their own custom XML format, which we parse and ingest into our indexes. In this post, I describe how I plan to do this using Nutch/GORA.

Nutch provides the XMLParser_Plugin as an add-on. It can be configured to parse a particular format using XPath expressions. I looked at it briefly, but I abandoned it because it wasn't clear how it would be able to parse multiple XML formats (one of my requirements).

Of course, one could settle on a common XML format, such as the one described in this proposal, and then use this plugin. That is certainly an option, and it allows the parsing work to be farmed out - XML is almost like a second language for Java application programmers, unlike the backend types who do the crawling and indexing stuff. But the problems with this approach are the extra step, and the fact that the fetched content is no longer the original content we received.

The approach I came up with was to write a Nutch parse plugin that uses the u_idx metadata field (described in my previous post) to decide which parser to spin up for a given XML page. The XML parsers all take a String representing the fetched content and return a map of the parsed fields. This still allows the division of labor I spoke of earlier - the parsers are simple XML parsers, and the plugin deals with the details of updating the WebPage data back into Cassandra.

Exposing Content over HTTP

The first step is to expose the provider XML content over an HTTP interface. Right now I am developing/testing in local mode, but at some point I plan on running this in distributed mode, and having the content available over HTTP just makes more sense. So I added another handler to my CherryPy server to serve XML files with content type "application/xml". The updated code is shown below:

#!/usr/bin/python
import os.path
import cherrypy
from cherrypy.lib.static import serve_file

SITES_DIR="/path/to/my/data/directory"

class Root:

  @cherrypy.expose
  def test(self, name):
    return serve_file(os.path.join(SITES_DIR, "test", "%s.html" % (name)), \
      content_type="text/html")

  @cherrypy.expose
  def providers(self, name):
    return serve_file(os.path.join(SITES_DIR, "providers", \
      "%s" % (name.replace("__", "/"))), content_type="application/xml")

if __name__ == '__main__':
  current_dir = os.path.dirname(os.path.abspath(__file__))
  # Set up site-wide config first so we get a log if errors occur.
  cherrypy.config.update({'environment': 'production',
    'log.access_file': 'site.log',
    'log.screen': True,
    "server.socket_host" : "127.0.0.1",
    "server.socket_port" : 8080})
  cherrypy.quickstart(Root(), '/')

And my seed file (generated by running a find command followed by a few text transformations in vim) looks like this:

http://localhost:8080/providers/prov1__1__000001.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000002.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000003.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000004.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000005.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000006.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000007.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000008.xml  u_idx=prov1
http://localhost:8080/providers/prov1__1__000010.xml  u_idx=prov1
...

The XML files themselves are supplied in a nested, multi-directory layout: path separators in the file name (relative to the root of the local site) become "__" in the URL, and the handler converts them back to read and serve the appropriate file.

Parse Plugin Code

To enable the custom parsing, I added a parse plugin to my "mycompany" plugin project that uses the u_idx value to parse the content with the appropriate XML processor. I decided to use JDOM as my parsing library, since it is already provided in the Nutch distribution. Here is my updated plugin.xml file; the definition for the parse plugin is in the last block.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Source: src/plugin/mycompany/plugin.xml -->
<plugin id="mycompany" 
    name="Custom Company-specific plugins"
    version="1.0.0"
    provider-name="My Company">

  <runtime>
    <library name="mycompany.jar">
      <export name="*"/>
    </library>
    <library name="jdom-1.1.jar"/>
  </runtime>

  <requires>
    <import plugin="nutch-extensionpoints"/>
  </requires>

  <extension id="com.mycompany.nutch.indexer.usertags"
      name="User Tag Indexing Filter"
      point="org.apache.nutch.indexer.IndexingFilter">
    <implementation id="mycompany-indexer-usertags"
        class="com.mycompany.nutch.indexer.usertags.UserTagIndexingFilter"/>
  </extension>

  <extension id="com.mycompany.nutch.scoring.usertags"
      name="User Tag Scoring Filter"
      point="org.apache.nutch.scoring.ScoringFilter">
    <implementation id="mycompany-scoring-metadata"
        class="com.mycompany.nutch.scoring.usertags.UserTagScoringFilter"/>
  </extension>

  <extension id="com.mycompany.nutch.parse.xml"
      name="Provider XML Parser Plugin"
      point="org.apache.nutch.parse.Parser">
    <implementation id="mycompany-parse-provider-xml"
        class="com.mycompany.nutch.parse.xml.ProviderXmlParser">
      <parameter name="contentType" value="application/xml"/>
      <parameter name="pathSuffix" value="xml"/>
    </implementation>
  </extension>

</plugin>

The Java code for the parse plugin is shown below:

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlParser.java
package com.mycompany.nutch.parse.xml;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.parse.Outlink;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.parse.ParseStatusCodes;
import org.apache.nutch.parse.Parser;
import org.apache.nutch.storage.ParseStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;

public class ProviderXmlParser implements Parser {

  private static final Log LOG = LogFactory.getLog(ProviderXmlParser.class); 
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  private static final Utf8 IDX_KEY = new Utf8("u_idx");
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
    FIELDS.add(WebPage.Field.OUTLINKS);
  }

  private Configuration conf;
  
  @Override
  public Parse getParse(String url, WebPage page) {
    Parse parse = new Parse();
    parse.setParseStatus(new ParseStatus());
    parse.setOutlinks(new Outlink[0]);
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    if (metadata.containsKey(IDX_KEY)) {
      String idx = Bytes.toString(Bytes.toBytes(metadata.get(IDX_KEY)));
      IProviderXmlProcessor processor = 
        ProviderXmlProcessorFactory.getProcessor(idx);
      if (processor != null) {
        try {
          LOG.info("Parsing URL:[" + url + "] with " + 
            processor.getClass().getSimpleName());
          Map<String,String> parsedFields = processor.parse(
              Bytes.toString(Bytes.toBytes(page.getContent())));
          parse.setText(parsedFields.get(ProviderXmlFields.content.name()));
          parse.setTitle(parsedFields.get(ProviderXmlFields.title.name()));
          // set the rest of the metadata back into the page
          for (String key : parsedFields.keySet()) {
            if (ProviderXmlFields.content.name().equals(key) ||
                ProviderXmlFields.title.name().equals(key)) {
              continue;
            }
            page.putToMetadata(new Utf8(key), 
              ByteBuffer.wrap(parsedFields.get(key).getBytes()));
          }
          parse.getParseStatus().setMajorCode(ParseStatusCodes.SUCCESS);
        } catch (Exception e) {
          LOG.warn("Parse of URL: " + url + " failed", e);
          LOG.warn("content=[" + Bytes.toString(Bytes.toBytes(
            page.getContent())) + "]");
          parse.getParseStatus().setMajorCode(ParseStatusCodes.FAILED);
        }
      }
    }
    return parse;
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}

As you can see, the getParse() method reads the u_idx metadata from the page and, based on that, asks the ProviderXmlProcessorFactory for the appropriate processor. The factory returns the IProviderXmlProcessor implementation for that provider (or null if no implementation can be found). The IProviderXmlProcessor takes the content as a string, parses it with custom logic, and returns a Map of metadata names and values. The plugin then populates the Parse object and directly updates the WebPage with the metadata. Here is the rest of the code, without too much explanation.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlProcessorFactory.java
package com.mycompany.nutch.parse.xml;

import java.util.HashMap;
import java.util.Map;

public class ProviderXmlProcessorFactory {

  private static ProviderXmlProcessorFactory FACTORY = 
    new ProviderXmlProcessorFactory();
  private Map<String,IProviderXmlProcessor> processors = 
    new HashMap<String,IProviderXmlProcessor>();
  
  private ProviderXmlProcessorFactory() {
    processors.put("prov1", new Prov1XmlProcessor());
    // no more for now
  }
  
  public static IProviderXmlProcessor getProcessor(String idx) {
    return FACTORY.processors.get(idx);
  }
}
// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/IProviderXmlProcessor.java
package com.mycompany.nutch.parse.xml;

import java.util.Map;

public interface IProviderXmlProcessor {

  public Map<String,String> parse(String content) throws Exception;

}
// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/Prov1XmlProcessor.java
package com.mycompany.nutch.parse.xml;

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import org.jdom.Document;
import org.jdom.Element;
import org.jdom.input.SAXBuilder;
import org.xml.sax.InputSource;

public class Prov1XmlProcessor implements IProviderXmlProcessor {

  @Override
  public Map<String,String> parse(String content) throws Exception {
    Map<String,String> parsedFields = new HashMap<String,String>();
    SAXBuilder builder = new SAXBuilder();
    Document doc = builder.build(new InputSource(
      new ByteArrayInputStream(content.getBytes())));
    Element root = doc.getRootElement();
    parsedFields.put(ProviderXmlFields.u_lang.name(), 
      root.getAttributeValue("language"));
    parsedFields.put(ProviderXmlFields.title.name(), 
      root.getAttributeValue("title"));
    parsedFields.put(ProviderXmlFields.u_category.name(), 
      root.getAttributeValue("subContent"));
    parsedFields.put(ProviderXmlFields.u_contentid.name(), 
      root.getAttributeValue("genContentID"));
    Element versionInfo = root.getChild("versionInfo");
    if (versionInfo != null) {
      parsedFields.put(ProviderXmlFields.u_reviewdate.name(), 
        ProviderXmlParserUtils.convertToIso8601(
        versionInfo.getAttributeValue("reviewDate")));
      parsedFields.put(ProviderXmlFields.u_reviewers.name(), 
        versionInfo.getAttributeValue("reviewedBy"));
    }
    parsedFields.put(ProviderXmlFields.content.name(), 
      ProviderXmlParserUtils.getTextContent(root));
    return parsedFields;
  }
}

In addition, I built some standard functions and put them in their own utility class, and an enum which lists the fields available.

// Source: /src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlParserUtils.java
package com.mycompany.nutch.parse.xml;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jdom.Element;

public class ProviderXmlParserUtils {

  private static final Log LOG = LogFactory.getLog(ProviderXmlParserUtils.class);
  
  /////////// normalize incoming dates to ISO-8601 ///////////
  
  private static final String[] DATE_PATTERNS = new String[] {
    "yyyyMMdd",
    "MM/dd/yyyy",
  };
  private static final SimpleDateFormat ISO_8601_DATE_FORMAT =
    new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
  
  public static String convertToIso8601(String date) {
    try {
      Date d = DateUtils.parseDate(date, DATE_PATTERNS);
      return ISO_8601_DATE_FORMAT.format(d);
    } catch (Exception e) {
      LOG.warn("Cannot convert date: " + date + " to ISO-8601 format", e);
      return "";
    }
  }

  ///////////// get text content of XML //////////////////
  
  public static String getTextContent(Element root) {
    StringBuilder buf = new StringBuilder();
    getTextContent_r(buf, root);
    return buf.toString();
  }
  
  @SuppressWarnings("unchecked")
  private static void getTextContent_r(StringBuilder buf, Element e) {
    buf.append(e.getTextTrim());
    List<Element> children = e.getChildren();
    for (Element child : children) {
      getTextContent_r(buf, child);
    }
    buf.append(" ");
  }
}
// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/parse/xml/ProviderXmlFields.java
package com.mycompany.nutch.parse.xml;

public enum ProviderXmlFields {

  // common
  title,
  content,
  // prov1 
  u_lang,
  u_category,
  u_contentid,
  u_reviewdate,
  u_reviewers,
}

Note that while I have put all the classes in the same package for convenience, there is nothing preventing someone from putting everything other than the actual plugin into a separate project and declaring that project's JAR as a dependency - which is probably the recommended setup in an environment where the XML parsing work is handed off to a different group.

To test it, I repeatedly dropped the webpage data from Cassandra, ran inject, generate, fetch and parse, and looked at the Cassandra records using my display_webpage.py script and at error messages in logs/hadoop.log.

The first time I ran this code (well, not the first time, but you get the idea), I got the following exception in logs/hadoop.log.

2012-01-18 15:28:00,860 INFO  parse.ParserFactory - The parsing plugins: 
[org.apache.nutch.parse.tika.TikaParser] are enabled via the plugin.includes 
system property, and all claim to support the content type application/xml, 
but they are not mapped to it in the parse-plugins.xml file
2012-01-18 15:28:01,219 ERROR tika.TikaParser - Error parsing http://localhost:8080/providers/prov1__1__000008.xml
org.apache.tika.exception.TikaException: XML parse error
        at org.apache.tika.parser.xml.XMLParser.parse(XMLParser.java:71)
        at org.apache.nutch.parse.tika.TikaParser.getParse(TikaParser.java:117)
        at org.apache.nutch.parse.ParseCallable.call(ParseCallable.java:36)
        at org.apache.nutch.parse.ParseCallable.call(ParseCallable.java:23)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.lang.Thread.run(Thread.java:680)

This told me that the (preconfigured) Tika XML parser was getting to my data first, which I did not want, so I disabled it in the nutch-site.xml plugin.includes property. My plugin.includes now looks like this:

<!-- From this:
<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|urlnormalizer-(pass|regex|basic)|scoring-opic|mycompany</value>
  <description/>
</property>

to this: -->

<property>
  <name>plugin.includes</name>
  <value>protocol-http|urlfilter-regex|parse-html|index-(basic|anchor)|urlnormalizer-(pass|regex|basic)|scoring-opic|mycompany</value>
  <description/>
</property>

Indexing

A couple of iterations later, I got a clean parse run. I followed that up with updatedb, then modified my old UserTagIndexingFilter to pull all metadata columns whose keys are prefixed with u_ (meaning user-defined). Here is the updated code for UserTagIndexingFilter.java.

// Source: src/plugin/mycompany/src/java/com/mycompany/nutch/indexer/usertags/UserTagIndexingFilter.java
package com.mycompany.nutch.indexer.usertags;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.indexer.IndexingException;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.storage.WebPage.Field;
import org.apache.nutch.util.Bytes;
import org.apache.nutch.util.TableUtil;

public class UserTagIndexingFilter implements IndexingFilter {

  private static final Log LOG = LogFactory.getLog(UserTagIndexingFilter.class);
  private static final String INHERITABLE_USERTAGS_LIST_PROP = 
    "mycompany.usertags.inheritable";
  
  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
  
  static {
    FIELDS.add(WebPage.Field.METADATA);
  }
  
  private Configuration conf;
  private Set<String> inheritableUserTags;
  
  @Override
  public NutchDocument filter(NutchDocument doc, String url, 
      WebPage page) throws IndexingException {
    LOG.info("Adding user tags for page:" + url);
    Map<Utf8,ByteBuffer> metadata = page.getMetadata();
    for (Utf8 key : metadata.keySet()) {
      String keyStr = TableUtil.toString(key);
      if (StringUtils.isEmpty(keyStr)) {
        continue;
      }
      if (keyStr.startsWith("u_")) {
        String value = Bytes.toString(
          Bytes.toBytes(metadata.get(key)));
        doc.add(keyStr, value);
      }
    }
    return doc;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Collection<Field> getFields() {
    return FIELDS;
  }
}

I also needed to add the new fields I was pulling out to the conf/solrindex-mapping.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- conf/solrindex-mapping.xml -->
<mapping>
  ...
  <fields>
    <field dest="content" source="content"/>
    <!-- more nutch defined fields here -->
    ...
    <!-- user defined -->
    <field dest="u_idx" source="u_idx"/>
    <field dest="u_contentid" source="u_contentid"/>
    <field dest="u_category" source="u_category"/>
    <field dest="u_lang" source="u_lang"/>
    <field dest="u_reviewdate" source="u_reviewdate"/>
    <field dest="u_reviewers" source="u_reviewers"/>
  </fields>
  <uniqueKey>id</uniqueKey>
</mapping>

and to the Solr schema.xml file (followed by a Solr restart).

<?xml version="1.0" encoding="UTF-8" ?>
<!-- Source: conf/schema.xml (copied to solr's example/solr/conf) -->
<schema name="nutch" version="1.4">
  ...
        <field name="u_idx" type="string" stored="true" indexed="true"/>
        <field name="u_contentid" type="string" stored="true" indexed="true"/>
        <field name="u_category" type="string" stored="true" indexed="true"/>
        <field name="u_lang" type="string" stored="true" indexed="true"/>
        <field name="u_reviewdate" type="string" stored="true" indexed="false"/>
        <field name="u_reviewers" type="string" stored="true" indexed="false"/>
  ...
</schema>

Running the nutch solrindex job now promotes all the XML files (along with the parsed metadata) into the Solr index.

Conclusion

The approach described works well when an XML document corresponds to one WebPage record in Cassandra (and one Solr record). But in some cases we split the file into sections and do other things with it. In such cases, it may be better to go with the intermediate XML markup approach described in the proposal above, or even a hybrid approach where the original file goes through the pipeline as described and then through an additional external parse phase that produces section files (for example) in the intermediate XML format. I haven't thought this through in great detail yet, though; I plan to do so once I am done checking out the basics.