Saturday, September 17, 2011

Using an Adjacency Map to match Multi-word Phrases

I recently ran our entire taxonomy of approximately 1 million medical concepts through my UIMA Aggregate AE for taxonomy mapping (described here), and it took 3 weeks. That's right, 3 weeks.

After I was done questioning my programming skills (or lack thereof), I began wondering where all the time was being spent. Almost right off the bat, I discovered that I had made the newbie mistake of not reusing cursors when reading from the database (it's been a while since I've written straight JDBC code), so the code was opening and closing each cursor (some up to 20 times) for each of the 1M concepts. Still, that alone could not explain the long run time, so the next candidate was the UIMA AE itself.
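
For illustration, the fix has roughly this shape (a sketch, not my actual loader code - the table and column names are made up):

// Sketch: reuse a single PreparedStatement across all concepts instead
// of opening and closing a cursor per concept. Table and column names
// here are hypothetical.
import java.sql.*;
import java.util.List;

public class SynonymLoader {

  public void loadSynonyms(Connection conn, List<Integer> conceptIds)
      throws SQLException {
    PreparedStatement ps = conn.prepareStatement(
      "select syn from synonyms where concept_id = ?");
    try {
      for (Integer conceptId : conceptIds) {
        ps.setInt(1, conceptId);      // rebind parameters, don't reopen
        ResultSet rs = ps.executeQuery();
        try {
          while (rs.next()) {
            handleSynonym(rs.getString(1));
          }
        } finally {
          rs.close();                 // close the results, keep the statement
        }
      }
    } finally {
      ps.close();                     // one open/close per run, not per concept
    }
  }

  private void handleSynonym(String syn) { /* application-specific */ }
}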

Back in my CNET days, over one very late night, I learned to profile applications by inserting stopwatch calls into (my slow) code, and the lesson has stuck (thanks Adam :-)). I wanted to do the same thing here, i.e., for an aggregate AE (consisting of a fixed flow of primitive AEs), I wanted to find the time spent within each primitive AE - then I could identify the AEs that needed improvement.

Since the primitive AE is controlled by the UIMA framework, the only way to get a handle to a StopWatch (the only way I know of, anyway) from within each primitive AE is to expose the StopWatch statically via a Singleton holder class, something like this:

// Source: src/main/java/com/mycompany/tgni/uima/utils/StopwatchHolder.java
package com.mycompany.tgni.uima.utils;

import org.springframework.util.StopWatch;

public class StopwatchHolder {

  // eagerly initialized; the holder exists only to run the private
  // constructor, which creates the shared StopWatch
  private static StopwatchHolder holder = new StopwatchHolder();
  private static StopWatch instance;
  
  private StopwatchHolder() {
    instance = new StopWatch();
  }
  
  // returns the shared StopWatch, statically accessible from any primitive AE
  public static StopWatch instance() {
    return instance;
  }
  
  // stops the watch if running and replaces it with a fresh one
  public static void reset() {
    if (instance.isRunning()) {
      instance.stop();
    }
    instance = new StopWatch();
  }
}
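
To collect and print the timings, the JUnit test driving the aggregate AE can dump the singleton with Spring's StopWatch.prettyPrint(), which produces the reports shown below. A minimal sketch (applyAE() is a hypothetical helper that runs one string through the aggregate AE):

@Test
public void testAggregateAeTimings() throws Exception {
  StopwatchHolder.reset();
  for (String text : TEST_STRINGS) {
    JCas jcas = applyAE(ae, text);
    // ... assertions against the annotations in jcas ...
  }
  System.out.println(StopwatchHolder.instance().prettyPrint());
}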

Once that is done, it's a simple matter of calling the start() and stop() methods on the underlying StopWatch singleton from within each primitive AE (shown below in my modified code, in case you want to see it). Based on a run of the JUnit test that I used to test the aggregate AE, I discovered that most of the analysis time is spent within the DictionaryAnnotator. What's more, the DictionaryAnnotator is called 4 times (with different parameters) in a single pass through the aggregate AE, so improving its performance would probably be time well spent. Here is the output of the StopWatch call before any changes were made.

    [junit] StopWatch '': running time (millis) = 21
    [junit] -----------------------------------------
    [junit] ms     %     Task name
    [junit] -----------------------------------------
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00013  062%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00001  005%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00001  005%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 
    [junit] ------------- ---------------- ---------------

If you add up the times for the DictionaryAnnotator (excluding the very first call, which seems to include the framework lazily initializing the AE on the first call to process()), the DictionaryAnnotator accounts for about 1/3 of the total runtime (7 ms out of 21 ms).

If you look at the code (you can find it in my old post here), it's easy to see why it could be a problem. Shingling the input is essentially an O(n²) operation - an n-word input produces n one-word shingles, n-1 two-word shingles, and so on down to a single n-word shingle, so the number of shingles is the sum of an arithmetic series (the Sn formula here). Each shingle then costs an O(1) map lookup.
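
Written out, the shingle count for an n-word input is the arithmetic series

\[ S_n = n + (n - 1) + \cdots + 1 = \frac{n(n+1)}{2} = O(n^2) \]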

On the other hand, using an adjacency map to store the collocated words for multi-word phrases in the dictionary, and scanning the input one word at a time, results in n lookups, each O(1), for an n-word input, so the complexity of such an algorithm is O(n). So there definitely seems to be some scope for savings.

Here is the code for the updated DictionaryAnnotator.

// Source: src/main/java/com/mycompany/tgni/uima/annotators/keyword/DictionaryAnnotator.java
package com.mycompany.tgni.uima.annotators.keyword;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;

import org.apache.commons.lang.StringUtils;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.core.LowerCaseFilter;
import org.apache.lucene.analysis.core.WhitespaceTokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.util.Version;
import org.apache.uima.UimaContext;
import org.apache.uima.analysis_component.JCasAnnotator_ImplBase;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceAccessException;
import org.apache.uima.resource.ResourceInitializationException;
import org.springframework.util.StopWatch;

import com.mycompany.tgni.uima.conf.SharedMapResource;
import com.mycompany.tgni.uima.conf.SharedSetResource;
import com.mycompany.tgni.uima.utils.StopwatchHolder;

public class DictionaryAnnotator extends JCasAnnotator_ImplBase {

  private String preserveOrTransform;
  private boolean ignoreCase;

  private Map<String,String> dictMap = new HashMap<String,String>();
  private Map<String,Set<String>> collocMap = new HashMap<String,Set<String>>();
  
  private final static String PRESERVE = "preserve";
  private final static String TRANSFORM = "transform";

  private class Word {
    public String word;
    public int start;
    public int end;
  }

  @Override
  public void initialize(UimaContext ctx) 
      throws ResourceInitializationException {
    super.initialize(ctx);
    preserveOrTransform = (String) ctx.getConfigParameterValue(
      "preserveOrTransform");
    ignoreCase = (Boolean) ctx.getConfigParameterValue("ignoreCase");
    try {
      if (PRESERVE.equals(preserveOrTransform)) {
        SharedSetResource res = (SharedSetResource) 
          ctx.getResourceObject("dictAnnotatorProperties");
        for (String dictPhrase : res.getConfig()) {
          dictMap.put(ignoreCase ? 
            StringUtils.lowerCase(dictPhrase) : dictPhrase, null);
        }
      } else if (TRANSFORM.equals(preserveOrTransform)) {
        SharedMapResource res = (SharedMapResource) 
          ctx.getResourceObject("dictAnnotatorProperties");
        Map<String,String> cfg = res.getConfig();
        for (String dictPhrase : cfg.keySet()) {
          dictMap.put(ignoreCase ? 
            StringUtils.lowerCase(dictPhrase) : dictPhrase, 
            cfg.get(dictPhrase));
        }
      } else {
        throw new ResourceInitializationException(
          new IllegalArgumentException(
          "Configuration parameter preserveOrTransform " +
          "must be either 'preserve' or 'transform'"));
      }
      // build the adjacency map: head word -> set of words that can
      // follow it; non-head words are stored with a "_" prefix
      for (String dictPhrase : dictMap.keySet()) {
        String[] words = StringUtils.split(dictPhrase, " ");
        String prevWord = words[0];
        for (int i = 1; i < words.length; i++) {
          if (! collocMap.containsKey(prevWord)) {
            collocMap.put(prevWord, new HashSet<String>());
          }
          collocMap.get(prevWord).add(words[i]);
          prevWord = "_" + words[i];
        }
      }
    } catch (ResourceAccessException e) {
      throw new ResourceInitializationException(e);
    }
  }
  
  @Override
  public void process(JCas jcas) 
      throws AnalysisEngineProcessException {
    StopWatch watch = StopwatchHolder.instance();
    watch.start(getClass().getSimpleName() + "[" + 
      preserveOrTransform + "/" + 
      (ignoreCase ? "ignoreCase" : "matchCase") + "]");
    try {
      Stack<Word> collocations = new Stack<Word>();
      String text = jcas.getDocumentText();
      WhitespaceTokenizer tokenizer = new WhitespaceTokenizer(
        Version.LUCENE_40, new StringReader(text));
      TokenStream tokenStream = ignoreCase ?
        new LowerCaseFilter(Version.LUCENE_40, tokenizer) : tokenizer;
      tokenStream.reset(); // the stream must be reset before consuming
      while (tokenStream.incrementToken()) {
        CharTermAttribute term = 
          (CharTermAttribute) tokenStream.getAttribute(
          CharTermAttribute.class);
        OffsetAttribute offset = 
          (OffsetAttribute) tokenStream.getAttribute(
          OffsetAttribute.class);
        Word word = new Word();
        word.word = term.toString();
        word.start = offset.startOffset();
        word.end = offset.endOffset();
        if (collocations.isEmpty()) {
          // no previous word in stack
          if (collocMap.containsKey(word.word)) {
            collocations.push(word);
          }
        } else {
          // previous word exists, part of phrase
          Word prevWord = collocations.peek();
          Set<String> nextWords = collocMap.get(prevWord.word);
          if (nextWords != null && nextWords.contains(word.word)) {
            word.word = "_" + word.word;
            collocations.push(word);
          } else {
            // word does not extend the phrase; check the stacked
            // words against dictMap, then see if the current word
            // starts a new phrase of its own
            Word phrase = getPhrase(collocations);
            annotatePhrase(phrase, jcas);
            collocations.clear();
            if (collocMap.containsKey(word.word)) {
              collocations.push(word);
            }
          }
        }
      }
      // end of input, handle trailing stacked words
      if (! collocations.isEmpty()) {
        Word phrase = getPhrase(collocations);
        annotatePhrase(phrase, jcas);
        collocations.clear();
      }
    } catch (IOException e) {
      throw new AnalysisEngineProcessException(e);
    } finally {
      // always stop the task, even on failure, so the shared
      // StopWatch can be restarted by the next annotator
      watch.stop();
    }
  }

  private Word getPhrase(Stack<Word> collocations) {
    List<String> words = new ArrayList<String>();
    Word phrase = new Word();
    phrase.start = collocations.elementAt(0).start;
    phrase.end = 0;
    for (Iterator<Word> it = collocations.iterator(); it.hasNext(); ) {
      Word w = it.next();
      words.add(w.word.startsWith("_") ? w.word.substring(1) : w.word);
      phrase.end = w.end;
    }
    phrase.word = StringUtils.join(words.iterator(), " ");
    return phrase;
  }
  
  private void annotatePhrase(Word phrase, JCas jcas) {
    // look up the phrase text; the Word object itself can never
    // be a key in dictMap, whose keys are Strings
    if (dictMap.containsKey(phrase.word)) {
      KeywordAnnotation annotation = new KeywordAnnotation(jcas);
      annotation.setBegin(phrase.start);
      annotation.setEnd(phrase.end);
      if (TRANSFORM.equals(preserveOrTransform)) {
        annotation.setTransformedValue(dictMap.get(phrase.word));
      }
      annotation.addToIndexes();
    }
  }
}

The dictionary terms are loaded into two maps in the initialize() method. The first map (dictMap) maps each dictionary word or phrase (for multi-word terms) on the LHS to its synonym(s) on the RHS when transform is specified. The second is the adjacency map (collocMap), which looks something like this in JSON format. Note that the value part is really a Set (for fast containment checks), even though the JSON notation shows a list.

Dictionary Terms
================
vitamin a deficiency
vitamin d deficiency
canine vitamin k deficiency
sun burn

Equivalent Adjacency Map
========================
collocMap = {
  "vitamin" : [ "a", "d" ], 
  "_a" : [ "deficiency" ], 
  "_d" : [ "deficiency" ], 
  "canine" : [ "vitamin" ], 
  "_vitamin" : [ "k" ], 
  "_k" : [ "deficiency" ], 
  "sun" : [ "burn" ]
}

Only words that occur in multi-word phrases are stored in the adjacency map. Words other than head words are prefixed with "_" so the code doesn't match on partial phrases.
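
To make the scan concrete, here is a stripped-down, standalone sketch of the same adjacency-map walk outside UIMA (class and method names are mine, not part of the annotator):

// Sketch: build the collocMap shown above and walk an input string
// one word at a time, emitting dictionary matches.
import java.util.*;

public class AdjacencyScanDemo {

  public static void main(String[] args) {
    Set<String> dict = new HashSet<String>(Arrays.asList(
      "vitamin a deficiency", "vitamin d deficiency",
      "canine vitamin k deficiency", "sun burn"));
    // build the adjacency map, prefixing non-head words with "_"
    Map<String,Set<String>> collocMap = new HashMap<String,Set<String>>();
    for (String phrase : dict) {
      String[] words = phrase.split(" ");
      String prev = words[0];
      for (int i = 1; i < words.length; i++) {
        if (! collocMap.containsKey(prev)) {
          collocMap.put(prev, new HashSet<String>());
        }
        collocMap.get(prev).add(words[i]);
        prev = "_" + words[i];
      }
    }
    // scan: stack words as long as they extend a known phrase
    LinkedList<String> stack = new LinkedList<String>();
    for (String w : "patient has canine vitamin k deficiency".split(" ")) {
      if (stack.isEmpty()) {
        if (collocMap.containsKey(w)) stack.add(w);
      } else {
        Set<String> nextWords = collocMap.get(stack.getLast());
        if (nextWords != null && nextWords.contains(w)) {
          stack.add("_" + w);
        } else {
          emit(stack, dict);
          stack.clear();
          if (collocMap.containsKey(w)) stack.add(w);
        }
      }
    }
    emit(stack, dict); // trailing stacked words
  }

  private static void emit(List<String> stack, Set<String> dict) {
    if (stack.isEmpty()) return;
    StringBuilder buf = new StringBuilder();
    for (String w : stack) {
      if (buf.length() > 0) buf.append(' ');
      buf.append(w.startsWith("_") ? w.substring(1) : w);
    }
    if (dict.contains(buf.toString())) {
      System.out.println("matched: " + buf); // prints the dictionary phrase
    }
  }
}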

Running the JUnit test against the updated code results in the following timings.

    [junit] StopWatch '': running time (millis) = 22
    [junit] -----------------------------------------
    [junit] ms     %     Task name
    [junit] -----------------------------------------
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00014  064%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00001  005%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00001  005%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00001  005%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00001  005%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 00000  000%  PatternAnnotator[preserve]
    [junit] 00000  000%  PatternAnnotator[transform]
    [junit] 00000  000%  DictionaryAnnotator[preserve/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/matchCase]
    [junit] 00000  000%  DictionaryAnnotator[preserve/ignoreCase]
    [junit] 00000  000%  DictionaryAnnotator[transform/ignoreCase]
    [junit] 
    [junit] ------------- ---------------- ---------------

Not much of a change, as you can see, but now the DictionaryAnnotator takes about 1/4 of the total processing time on the test set. Figuring that perhaps my JUnit test strings did not exercise the algorithm enough, I then ran the loader (which calls the aggregate AE, and which took 3 weeks to complete the last time I ran it) over 10K concepts (1/100th of the full dataset) with both the old and the new code, and both times the job finished in about 45 minutes.

My conclusion is that the O(n²) performance of the old code approximates the O(n) performance of the new code because n is quite small - most of my synonyms are 2-3 words long, with some outliers. So even though the data doesn't show a significant improvement in performance, I will make the change anyway, since the new code has better performance characteristics.

Of course, extrapolating the numbers (45 minutes × 100 ≈ 75 hours) means that I still need about 3.12 days to process the full dataset, which is still kind of high. Since the job lends itself well to parallelization, I am going to try that next.

Saturday, September 10, 2011

Multithreaded Proxy SOLR Search Handler

We are a Lucene shop trying to migrate our somewhat unique search algorithms to SOLR, so most of my experience has been in building custom handlers that wrap our algorithms so they can be served by SOLR. However, there are times when you can use SOLR's default handler with minor customizations.

One such opportunity presented itself when building the SOLR backend for a topic page. A topic page is basically a farm of links that are contextually relevant to the topic (a disease or drug), classified into multiple facets. For example, a disease topic page could have links relevant to treatment, medication, symptoms, etc.

To power such a page, a set of queries is fired off against the SOLR server, each group (treatment, medication, symptom in the above example) corresponding to a single query. Generally the application would use some AJAX/Javascript magic to fire these queries in parallel and populate the individual parts of the page as the results become available.

In my case, I needed to provide a single call to the application that would get all the results at once (I guess they had enough AJAX/Javascript magic to deal with already :-)), and I figured it would be just as simple to move the multi-threading into the server. So I built a proxy search handler that takes the topic name as its query, creates a different subquery from it for each group, and fires them off against itself in parallel using SolrJ. Once all the subqueries have returned results, the response is populated with a map of SolrDocumentList objects keyed by subquery name.

It's probably not a hugely original idea, but I have never read about anybody doing something like this, so I decided to put it out there in case someone finds it useful. Anyway, here is how you configure it in solrconfig.xml:

  ...
  <requestHandler name="/mt" 
      class="com.mycompany.solr.handler.MTProxyRequestHandler">
    <lst name="defaults">
      <str name="echoParams">explicit</str>
      <str name="wt">xml</str>
    </lst>
  </requestHandler>
  ...

And here is what the code looks like:

// Source: src/main/java/com/mycompany/solr/handler/MTProxyRequestHandler.java
package com.mycompany.solr.handler;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.index.Term;
import org.apache.lucene.queryParser.ParseException;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.QParser;
import org.apache.solr.search.QParserPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MTProxyRequestHandler extends SearchHandler {

  public static final String SUBQUERY_RESULTS_COMPONENT_NAME = 
    "subquery-results";
  
  private final Logger logger = LoggerFactory.getLogger(getClass());
  
  private CommonsHttpSolrServer solrServer;
  
  @SuppressWarnings("unchecked")
  @Override
  public void init(NamedList args) {
    super.init(args);
    try {
      // hardcoded for simplicity; the URL should really come from
      // the handler configuration
      this.solrServer = new CommonsHttpSolrServer("http://localhost:8080/solr");
      this.solrServer.setRequestWriter(new BinaryRequestWriter());
      this.solrServer.setParser(new BinaryResponseParser());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void inform(SolrCore core) {
    super.inform(core);
  }

  @Override
  public void handleRequestBody(SolrQueryRequest req, 
      SolrQueryResponse rsp) throws Exception {
    // extract parameters
    final SolrParams params = req.getParams();
    final String q = params.get(CommonParams.Q);
    final Map<String,List<SolrDocument>> facetResults = 
      new ConcurrentHashMap<String,List<SolrDocument>>();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (final SubQuery sq : SubQuery.values()) {
      final Query subquery = sq.qparserPlugin.createParser(
        q, null, params, req).getQuery();
      executorService.submit(new Runnable() {
        @Override
        public void run() {
          try {
            // copy parameters from incoming request
            Map<String,String> paramMap = new HashMap<String,String>();
            for (Iterator<String> it = params.getParameterNamesIterator();
                it.hasNext(); ) {
              String key = it.next();
              paramMap.put(key, params.get(key));
            }
            // override the query string
            paramMap.put(CommonParams.Q, subquery.toString());
            SolrParams sqParams = new MapSolrParams(paramMap);
            QueryResponse proxyRsp = solrServer.query(sqParams, METHOD.POST);
            SolrDocumentList sdl = proxyRsp.getResults();
            facetResults.put(sq.name(), sdl);
          } catch (SolrServerException e) {
            logger.warn("Could not execute sub-query " + sq.name() + 
              " for :" + q, e);
          }
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    rsp.add(SUBQUERY_RESULTS_COMPONENT_NAME, facetResults);
  }
  
  // Set up the different QParserPlugin implementations, one for
  // each sub-query that will be sent in parallel.
  
  private enum SubQuery {
    sq1 (new MTProxyRequestHandler().new QParserPlugin1()),
    sq2 (new MTProxyRequestHandler().new QParserPlugin2());
    
    public QParserPlugin qparserPlugin;
    
    SubQuery(QParserPlugin qparserPlugin) {
      this.qparserPlugin = qparserPlugin;
    }
  };
  
  // each sub-query has a QParserPlugin and associated QParser
  // which generates the actual Query from the query string
  
  // Subquery 1
  private class QParserPlugin1 extends QParserPlugin {

    @Override
    public QParser createParser(String qstr, SolrParams localParams, 
        SolrParams params,
        SolrQueryRequest req) {
      return new QParser1(qstr, localParams, params, req);
    }

    @Override
    public void init(NamedList args) { /* NOOP */ }
  }

  private class QParser1 extends QParser {

    public QParser1(String qstr, SolrParams localParams, 
        SolrParams params, SolrQueryRequest req) {
      super(qstr, localParams, params, req);
    }

    @Override
    public Query parse() throws ParseException {
      return new TermQuery(new Term("f1", qstr));
    }
  }

  // Subquery 2
  private class QParserPlugin2 extends QParserPlugin {

    @Override
    public QParser createParser(String qstr, SolrParams localParams, 
        SolrParams params,
        SolrQueryRequest req) {
      return new QParser2(qstr, localParams, params, req);
    }

    @Override
    public void init(NamedList args) { /* NOOP */ }
  }
  
  private class QParser2 extends QParser {

    public QParser2(String qstr, SolrParams localParams, 
        SolrParams params, SolrQueryRequest req) {
      super(qstr, localParams, params, req);
    }

    @Override
    public Query parse() throws ParseException {
      return new TermQuery(new Term("f2", qstr));
    }
  }

  //////////////////////// SolrInfoMBeans methods //////////////////////

  @Override
  public String getDescription() {
    return "MT Proxy Handler";
  }

  @Override
  public String getSource() {
    return "$Source$";
  }

  @Override
  public String getSourceId() {
    return "$Id$";
  }

  @Override
  public String getVersion() {
    return "$Revision$";
  }
}

As you can see, it's fairly straightforward. The subqueries to be executed are enumerated by name (each with an associated QParserPlugin implementation) in the SubQuery enum. The input query string is passed to the QParser created by each subquery's QParserPlugin, which builds the actual subquery. The subqueries are executed in parallel against the local SOLR server using SolrJ, and once all the calls return, the results are packaged up into the response. This structure (as opposed to a custom one) is also easy for the client to parse, since it's basically just a map of SolrDocumentList objects keyed by subquery (group) name.
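
On the client side, unpacking the response is straightforward. Here is a sketch using SolrJ (the topic name is a placeholder, and depending on the response writer the component may come back as a NamedList rather than a Map, so treat the cast as an assumption):

// Client-side sketch: query the /mt handler and unpack the map of
// SolrDocumentList objects keyed by subquery name.
CommonsHttpSolrServer server =
  new CommonsHttpSolrServer("http://localhost:8080/solr");
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("qt", "/mt");
params.set("q", "hypertension"); // topic name (example value)
QueryResponse rsp = server.query(params);
@SuppressWarnings("unchecked")
Map<String,SolrDocumentList> subqueryResults =
  (Map<String,SolrDocumentList>) rsp.getResponse().get("subquery-results");
for (Map.Entry<String,SolrDocumentList> entry : subqueryResults.entrySet()) {
  System.out.println(entry.getKey() + ": " +
    entry.getValue().getNumFound() + " results");
}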

The QParsers here just create TermQueries against different fields in the index - I have kept them simple for ease of understanding. In reality the code does pretty hairy things to the input query string :-).

Regarding performance, my actual code gets results for 10 groups, and so far (on my dev indexes) performance is quite good. In any case, because the set of topics is finite, it is possible to autowarm these queries at startup.
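
For example, the warming could be wired up in solrconfig.xml with a QuerySenderListener that replays the underlying subqueries for each known topic (a sketch - the topic names are placeholders):

<listener event="firstSearcher" class="solr.QuerySenderListener">
  <arr name="queries">
    <lst><str name="q">f1:hypertension</str></lst>
    <lst><str name="q">f2:hypertension</str></lst>
  </arr>
</listener>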

In terms of portability and convention, it would probably have been better to declare the QParserPlugin implementations in the solrconfig.xml file, as shown on the SolrPlugins wiki page. However, these custom QParserPlugins are unlikely to be used outside this handler, so I decided to just model them as inner classes.
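
For reference, the declaration would look something like this (a sketch - it assumes the plugins are pulled out into standalone top-level classes, since inner classes can't be instantiated by SOLR's plugin loader; the class names are hypothetical):

<queryParser name="sq1" class="com.mycompany.solr.qparser.QParserPlugin1"/>
<queryParser name="sq2" class="com.mycompany.solr.qparser.QParserPlugin2"/>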