Friday, September 25, 2009

Hierarchical Agglomerative Clustering with Hadoop

Hierarchical Agglomerative Clustering is a bottom-up clustering approach where, at each stage, we find the closest two documents (or document clusters) and merge them into a new cluster. The process continues until some convergence criterion is satisfied.
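
To make the algorithm concrete before getting into the Hadoop version, here is a minimal in-memory sketch (the class and method names are made up for illustration and are not part of the code discussed below). Each cluster is represented by its centroid vector, and the closest pair is merged repeatedly until convergence.

// Minimal in-memory sketch of Hierarchical Agglomerative Clustering
// (illustration only, not part of the Hadoop code below).
import java.util.List;

public class SimpleHacSketch {

  // cosine similarity between two dense document vectors
  static double cosine(double[] a, double[] b) {
    double dot = 0.0, na = 0.0, nb = 0.0;
    for (int i = 0; i < a.length; i++) {
      dot += a[i] * b[i];
      na += a[i] * a[i];
      nb += b[i] * b[i];
    }
    return dot / (Math.sqrt(na) * Math.sqrt(nb));
  }

  // merge the closest pair repeatedly until either maxClusters remain
  // or the best available similarity drops below simThreshold;
  // clusters must be a mutable list, e.g. an ArrayList
  static List<double[]> cluster(List<double[]> clusters,
      int maxClusters, double simThreshold) {
    while (clusters.size() > maxClusters) {
      int bestI = -1, bestJ = -1;
      double bestSim = -1.0;
      for (int i = 0; i < clusters.size(); i++) {
        for (int j = i + 1; j < clusters.size(); j++) {
          double sim = cosine(clusters.get(i), clusters.get(j));
          if (sim > bestSim) {
            bestSim = sim;
            bestI = i;
            bestJ = j;
          }
        }
      }
      if (bestSim < simThreshold) {
        break; // converged: best remaining pair is not similar enough
      }
      double[] a = clusters.get(bestI);
      double[] b = clusters.get(bestJ);
      double[] merged = new double[a.length];
      for (int k = 0; k < a.length; k++) {
        merged[k] = (a[k] + b[k]) / 2; // centroid of the merged pair
      }
      clusters.remove(bestJ); // remove the higher index first
      clusters.remove(bestI);
      clusters.add(merged);
    }
    return clusters;
  }
}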

Last week, I described a Map-Reduce job to generate a TD Matrix from the articles on my blog. This week I use the TD Matrix data to cluster the articles using the Hierarchical Agglomerative Clustering algorithm. The code models a 1 + (n * 3) stage pipeline of Map-Reduce jobs, where n is the number of merge iterations (the number of documents minus the number of clusters).

The measure used to compute the closeness between two documents (or document clusters) is cosine similarity. At each stage, two items are merged into one, so one convergence criterion is that the remaining number of documents (or document clusters) falls below a threshold value. The other convergence criterion is when merging a document (or cluster) with another would result in a cluster that is "too large", i.e., the similarity between the closest remaining pair has fallen below a similarity threshold value.
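
For reference, the cosine similarity itself is just the dot product of the two vectors divided by the product of their norms. Here is a minimal sketch using the same commons-math vector classes the similarity reducer below relies on (the class name and the vector values are made up):

// Cosine similarity between two document vectors (made-up values),
// using the commons-math classes used by the similarity reducer below.
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;

public class CosineSimilarityExample {
  public static void main(String[] args) {
    SparseRealVector doc1 = new OpenMapRealVector(new double[] {0.5, 0.25, 0.25});
    SparseRealVector doc2 = new OpenMapRealVector(new double[] {0.4, 0.3, 0.3});
    // cos(doc1, doc2) = (doc1 . doc2) / (|doc1| * |doc2|)
    double cosim = doc1.dotProduct(doc2) / (doc1.getNorm() * doc2.getNorm());
    System.out.println("cosine similarity = " + cosim);
  }
}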

Driver Code

Here is the driver program. As you can see, the very first step is to normalize the term frequencies across each document, so that the document coordinates in the term space are comparable to each other. The code then executes in a loop, running 3 Map-Reduce jobs in a pipeline per iteration, and terminates once the convergence criteria are satisfied. There is some description of each job in the comments, and I also describe each job in more detail below.

I have been using the "recommended" strategy of public static inner classes so far, but that typically results in very long source files, so I switched to separate top-level classes for the Mappers and Reducers. I think this style is easier to read and maintain, especially if you use an IDE, since you can follow [CTRL+Click] hyperlinks.

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/HierarchicalAgglomerativeClusterer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Hadoop Job to do Hierarchical Agglomerative (bottom up) clustering.
 * We start with a normalized set of term frequencies for each document.
 * At each stage, we find the two documents with the highest similarity
 * and merge them into a cluster "document". We stop when the similarity
 * between two documents is lower than a predefined threshold or when
 * the document set has been reduced to a predefined number of clusters.
 */
public class HierarchicalAgglomerativeClusterer {

  // configuration:
  /** Don't cluster if similarity is below this threshold */
  public static final Float SIMILARITY_THRESHOLD = 0.05F;
  /** Maximum number of clusters to be created */
  public static final Long MAX_CLUSTERS = 10L;
  
  // keys: used by Mappers/Reducers
  public static final String INPUT_DIR_KEY = "input.dir";
  public static final String INPUT_SIMILARITY_MAP_DIR_KEY =
    "input.sim.mapdir";
  public static final String SIMILARITY_THRESHOLD_KEY =
    "similarity.threshold";

  // reporting:
  public enum Counters {REMAINING_RECORDS};

  /**
   * The input to this job is a text file of document ids mapped 
   * to a stringified list of raw term occurrences for each 
   * qualifying term in the document set. This method normalizes
   * each document vector by dividing each element by the sum of
   * all the elements, and writes out the normalized TFs.
   * @param conf the global Configuration object.
   * @param indir the input directory for the raw TFs.
   * @param outdir the output directory for the normalized TFs.
   * @throws Exception if thrown.
   */
  private static void normalizeFrequencies(
      Configuration conf, Path indir, Path outdir) throws Exception {
    Job job = new Job(conf, "normalize-freqs");
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    FileInputFormat.addInputPath(job, indir);
    FileOutputFormat.setOutputPath(job, outdir);
    job.setMapperClass(TfNormalizerMapper.class);
    job.setReducerClass(TfNormalizerReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  /**
   * Does a self-join on the normalized TF file (for the current run),
   * and passes each pair to the reducer, which computes and writes out
   * the inter-document cosine similarities as the output.
   * @param conf the global Configuration object.
   * @param dataInput the directory containing the normalized TF file.
   * @param simInput the directory containing the similarity data from the
   *        previous run (null in case of the first run).
   * @param simOutput the output directory where the new similarity data
   *        is written. At each stage, only the similarity data that was
   *        not computed previously is computed.
   * @param iteration the run number (0-based).
   * @throws Exception if thrown.
   */
  private static void computeSimilarity(Configuration conf, Path dataInput,
      Path simInput, Path simOutput, int iteration) throws Exception {
    Job job = new Job(conf, "compute-similarity/" + iteration);
    job.getConfiguration().set(INPUT_DIR_KEY, dataInput.toString());
    if (iteration > 0) {
      job.getConfiguration().set(
        INPUT_SIMILARITY_MAP_DIR_KEY, simInput.toString());
    }
    FileInputFormat.addInputPath(job, dataInput);
    FileOutputFormat.setOutputPath(job, simOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(SelfJoinMapper.class);
    job.setReducerClass(SimilarityCalculatorReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  /**
   * Runs through the similarity data, finds the pair with the highest
   * similarity. For documents that are part of that pair, the key is
   * set to the paired key (key1+key2) and sent to the reducer, where 
   * the coordinates of the merged cluster are computed by adding each
   * coordinate position and dividing by 2. Other records which are 
   * not part of the pair are passed through unchanged. The merging 
   * will happen only if the similarity is above the similarity threshold.
   * @param conf the global configuration object.
   * @param dataInput the input data file for this run, containing the
   *        normalized TFs.
   * @param dataOutput the output directory, which contains the new document
   *        set, after merging of the pair with the highest similarity.
   * @param simOutput the similarity output of the previous run, to 
   *        compute the most similar document/cluster pairs.
   * @param iteration the run number (0-based).
   * @throws Exception if thrown.
   */
  private static void clusterDocs(Configuration conf, Path dataInput,
      Path dataOutput, Path simOutput, int iteration) throws Exception {
    Job job = new Job(conf, "add-remove-docs/" + iteration);
    job.getConfiguration().setFloat(
      SIMILARITY_THRESHOLD_KEY, SIMILARITY_THRESHOLD);
    job.getConfiguration().set(
      INPUT_SIMILARITY_MAP_DIR_KEY, simOutput.toString());
    FileInputFormat.addInputPath(job, dataInput);
    FileOutputFormat.setOutputPath(job, dataOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(DocClusteringMapper.class);
    job.setReducerClass(DocClusteringReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(2);
    boolean jobStatus = job.waitForCompletion(true);
    if (! jobStatus) {
      throw new Exception(job.getJobName() + " failed");
    }
  }

  private static long countRemainingRecords(Configuration conf, 
      Path dataOutput, Path countOutput) throws Exception {
    Job job = new Job(conf, "count-remaining-records");
    FileInputFormat.addInputPath(job, dataOutput);
    FileOutputFormat.setOutputPath(job, countOutput);
    job.setJarByClass(HierarchicalAgglomerativeClusterer.class);
    job.setMapperClass(RecordCountMapper.class);
    job.setReducerClass(RecordCountReducer.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    boolean jobStatus = job.waitForCompletion(true);
    if (jobStatus) {
      FileSystem fs = FileSystem.get(conf);
      FileStatus[] fstatuses = fs.listStatus(countOutput);
      for (FileStatus fstatus : fstatuses) {
        Path path = fstatus.getPath();
        if (! path.getName().startsWith("part-r")) {
          continue;
        }
        FSDataInputStream fis = fs.open(path);
        BufferedReader reader = 
          new BufferedReader(new InputStreamReader(fis));
        String line = reader.readLine();
        reader.close();
        fis.close();
        return Long.valueOf(StringUtils.split(line, "\t")[1]);
      }
    } else {
      throw new Exception(job.getJobName() + " failed");
    }
    return 0L;
  }
  
  /**
   * This is how we are called.
   * @param argv the input directory containing the raw TFs.
   * @throws Exception if thrown.
   */
  public static void main(String[] argv) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = 
      new GenericOptionsParser(conf, argv).getRemainingArgs();
    if (otherArgs.length != 1) {
      System.err.println("Usage hac <indir>");
      System.exit(-1);
    }
    Path indir = new Path(otherArgs[0]);
    Path basedir = indir.getParent();
    
    // phase 1: normalize the term frequency across each document
    normalizeFrequencies(conf, indir, new Path(basedir, "temp0"));

    int iteration = 0;
    long previousRemainingRecords = 0L;
    for (;;) {
      // set up constants for current iteration
      Path dataInput = new Path(basedir, "temp" + iteration);
      Path dataOutput = new Path(basedir, "temp" + (iteration + 1));
      Path simInput = new Path(basedir, "temp_sim" + iteration);
      Path simOutput = new Path(basedir, "temp_sim" + (iteration + 1));
      Path countOutput = new Path(basedir, "temp_count" + (iteration + 1));

      // phase 2: do self-join on input file and compute similarity matrix
      // inputs:  self-join on files from temp_${iteration}
      // reference: similarity matrix file from temp_sim_${iteration},
      //            null if iteration=0.
      // outputs: similarity matrix file into temp_sim_${iteration+1}
      computeSimilarity(conf, dataInput, simInput, simOutput, iteration);

      // phase 3: find most similar pair, add pair, remove components 
      // input: files from temp_${iteration}
      // reference: files from temp_sim_${iteration} to create matrix
      // output: files into temp_${iteration+1}
      clusterDocs(conf, dataInput, dataOutput, simOutput, iteration);

      // check for termination criteria: either our pre-set maximum 
      // clusters for the document set has been reached, or clustering
      // has converged, so any cluster that will be created is "too large".
      // This is checked for in the DocClusteringReducer and it will 
      // not merge the rows in that case.
      long numRemainingRecords = 
        countRemainingRecords(conf, dataOutput, countOutput);
      if (numRemainingRecords <= MAX_CLUSTERS ||
          numRemainingRecords == previousRemainingRecords) {
        break;
      }
      previousRemainingRecords = numRemainingRecords;
      iteration++;
    }
    System.out.println("Output in " + new Path(basedir, "temp" + iteration));
  }
}

Phase 1: TF Normalization

The first phase normalizes the term frequencies across each document. This is done by simply dividing each element of the document vector by the sum of all the elements. The mapper parses the comma-separated list of term frequencies into a SparseRealVector, sums over all the elements, divides each element by the sum, and converts the result back to a comma-separated list.
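
For example (the document id and the three-term frequencies below are made up), a tab-separated input record such as

doc1    2.0,1.0,1.0

has an L1 norm of 4.0, so the mapper emits

doc1    0.5,0.25,0.25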

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Normalizes each document vector by dividing each element by the
 * sum of all the elements. Passes to the Reducer.
 */
public class TfNormalizerMapper 
    extends Mapper<LongWritable,Text,Text,Text> {

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] kvp = StringUtils.split(value.toString(), "\t");
    String[] frequencies = StringUtils.split(kvp[1], ",");
    SparseRealVector tf = new OpenMapRealVector(frequencies.length);
    for (int i = 0; i < frequencies.length; i++) {
      tf.setEntry(i, Double.valueOf(frequencies[i]));
    }
    double sum = tf.getL1Norm();
    SparseRealVector normalizedTfs = 
      new OpenMapRealVector(tf.mapDivide(sum));
    StringBuilder nbuf = new StringBuilder();
    int len = normalizedTfs.getDimension();
    for (int i = 0; i < len; i++) {
      if (i > 0) {
        nbuf.append(",");
      }
      nbuf.append(String.valueOf(normalizedTfs.getEntry(i)));
    }
    context.write(new Text(kvp[0]), new Text(nbuf.toString()));
  }
}

The reducer in this stage is a simple identity reducer. The end product of this transformation is the normalized TFs for each document.

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/TfNormalizerReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * This is a simple identity reducer. It does no reduction; the record
 * is already created in the mapper, so it simply picks the first (and
 * only) mapped record in the Iterable and writes it.
 */
public class TfNormalizerReducer extends Reducer<Text,Text,Text,Text> {

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
    Text value = values.iterator().next();
    context.write(key, value);
  }
}

Phase 2: Self-Join and Similarity Calculation

The mapper in this phase does a self-join of each record in the input with all the other records, and passes the joined pairs on to the reducer. The self-join is done by passing the data file(s) from the previous phase in as the input data and also as a configuration parameter. For each record in the input data, the file is opened, read, joined with the input record and closed. This results in a lot of disk I/O - a better way performance-wise would probably be to read the file(s) named by the configuration parameter into an in-memory list, since I only have about 170 documents to consider. But I wanted to figure out a way to work with arbitrarily large input files, so this seemed to be a more general approach.
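
To make the record format concrete (the document ids and frequencies below are made up), suppose the normalized TF file contains the two tab-separated records

doc1    0.5,0.25,0.25
doc2    0.0,0.6,0.4

Then, while processing the doc1 record, the mapper emits (among others):

doc1/doc2    0.5,0.25,0.25/0.0,0.6,0.4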

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SelfJoinMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import net.sf.jtmt.clustering.hadoop.agglomerative.HierarchicalAgglomerativeClusterer.Counters;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Does a map-side replicated join to produce a self-join of the 
 * input data files of normalized TFs. I could probably have built
 * a mapper which uses an in-memory structure for one of the sides 
 * of the join given the size of the data, but I am trying to do a 
 * POC, so the "real" data is likely to be larger. So the join 
 * repeatedly opens and closes one copy of the file that is joined 
 * with the main input file. The output is the self-joined file.
 */
public class SelfJoinMapper 
    extends Mapper<LongWritable,Text,Text,Text> {

  private String inputDir;
  private FileSystem fs;
  private int numRecords = 0;

  @Override
  public void setup(Context context) 
  throws IOException, InterruptedException {
    inputDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY);
    if (inputDir == null) {
      System.err.println("Cant get value for key:" + 
        HierarchicalAgglomerativeClusterer.INPUT_DIR_KEY + ", abort");
      System.exit(-1);
    }
    fs = FileSystem.get(context.getConfiguration());
  }

  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    numRecords++;
    String[] lhsPair = StringUtils.split(value.toString(), "\t");
    FileStatus[] fstatuses = fs.listStatus(new Path(inputDir));
    for (FileStatus fstatus : fstatuses) {
      Path path = fstatus.getPath();
      if (! path.getName().startsWith("part-r")) {
        continue;
      }
      FSDataInputStream fis = fs.open(path);
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
      String line = null;
      while ((line = reader.readLine()) != null) {
        String[] rhsPair = StringUtils.split(line, "\t");
        Text newKey = new Text(
            StringUtils.join(new String[] {lhsPair[0], rhsPair[0]}, "/"));
        Text newValue = new Text(
            StringUtils.join(new String[] {lhsPair[1], rhsPair[1]}, "/"));
        context.write(newKey, newValue);
      }
      reader.close();
      fis.close();
    }
    // report back to the framework how many records are remaining
    // for debugging purposes.
    context.getCounter(Counters.REMAINING_RECORDS).increment(1L);
  }

  @Override
  public void cleanup(Context context) 
      throws IOException, InterruptedException {
  }
}

On the reducer side, the joined records are converted into term vectors and the cosine similarity is calculated; the output is the similarity matrix for the document set, i.e., key-value pairs containing the keys of the original document pair and their similarity value. The similarity matrix from the previous run, if it exists, is fed into the reducer, so it does not have to recalculate values that have already been calculated.
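
Continuing the made-up example above, the reducer output for that pair would be a line like

doc1/doc2    0.566

(the value is the rounded cosine similarity of the two vectors above). Note that the reducer sorts the two document ids before building the map key, so doc1/doc2 and doc2/doc1 map to the same similarity matrix entry.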

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/SimilarityCalculatorReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Takes as input the self-join of the normalized TF data, and the 
 * similarity matrix data from the previous run (if it exists). Looks
 * up the similarity between a pair of keys (if it exists) and writes
 * it out to the output. If it does not exist, then it computes it.
 * The first time (when there is no previous run), the number of 
 * similarity computations is O(n**2). In later runs, it is O(n).
 */
public class SimilarityCalculatorReducer 
    extends Reducer<Text,Text,Text,DoubleWritable> {

  private String inputSimilarityMapDir;
  private FileSystem fs;
  private Map<String,Double> similarityMatrix = 
    new HashMap<String,Double>();

  @Override
  public void setup(Context context) 
      throws IOException, InterruptedException {
    inputSimilarityMapDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
    if (inputSimilarityMapDir == null) {
      System.err.println("Warning: no input similarity map dir, ignoring");
    } else {
      fs = FileSystem.get(context.getConfiguration());
      FileStatus[] fstatuses = fs.listStatus(
        new Path(inputSimilarityMapDir));
      for (FileStatus fstatus : fstatuses) {
        Path path = fstatus.getPath();
        if (! path.getName().startsWith("part-r")) {
          continue;
        }
        FSDataInputStream fis = fs.open(path);
        BufferedReader reader = new BufferedReader(
          new InputStreamReader(fis));
        String line = null;
        while ((line = reader.readLine()) != null) {
          String[] kvpairs = StringUtils.split(line, "\t");
          List<String> docIds = Arrays.asList(
            StringUtils.split(kvpairs[0], "/"));
          Collections.sort(docIds);
          String mapKey = StringUtils.join(docIds.iterator(), "/");
          if (! similarityMatrix.containsKey(mapKey)) {
            similarityMatrix.put(mapKey, Double.valueOf(kvpairs[1]));
          }
        }
        reader.close();
        fis.close();
      }
    }
  }

  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
  throws IOException, InterruptedException {
    Text value = values.iterator().next();
    List<String> docIds = Arrays.asList(
      StringUtils.split(key.toString(), "/"));
    Collections.sort(docIds);
    String mapKey = StringUtils.join(docIds.iterator(), "/");
    if (! similarityMatrix.containsKey(mapKey)) {
      // compute the cosine similarity between the two documents
      String[] termFreqs = StringUtils.split(value.toString(), "/");
      SparseRealVector doc1 = buildDocVector(termFreqs[0]);
      SparseRealVector doc2 = buildDocVector(termFreqs[1]);
      double cosim = 
        doc1.dotProduct(doc2) / (doc1.getNorm() * doc2.getNorm());
      similarityMatrix.put(mapKey, cosim);
    }
    context.write(new Text(mapKey), 
      new DoubleWritable(similarityMatrix.get(mapKey)));
  }

  private SparseRealVector buildDocVector(String flist) {
    String[] freqs = StringUtils.split(flist, ",");
    SparseRealVector doc = new OpenMapRealVector(freqs.length);
    for (int i = 0; i < freqs.length; i++) {
      doc.setEntry(i, Double.valueOf(freqs[i]));
    }
    return doc;
  }
}

Phase 3: Clustering

The Mapper part of this job finds the document pair with the maximum similarity in the similarity map generated by the previous run. This pair corresponds to the document (or document cluster) pair that will be merged in this iteration. The input to the Mapper is the normalized TF data generated in Phase 1 (or the output of the Phase 3 job from the previous iteration). For each of the two documents in the pair, the key is replaced by a composite key containing both document ids, and the record is passed to the reducer. All other rows are passed through as-is.

The mapper also checks whether the maximum similarity found is greater than the similarity threshold; if it is not, no keys are rewritten. As a consequence, the number of documents in the clustered document set remains unchanged from the previous run, a fact that is used in our termination criteria.
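
Continuing with the made-up example, if doc1/doc2 is the most similar pair and its similarity of 0.566 clears the SIMILARITY_THRESHOLD of 0.05, the mapper emits both of their records under the composite key:

doc1+doc2    0.5,0.25,0.25
doc1+doc2    0.0,0.6,0.4

All other records are written out with their original keys.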

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Uses the similarity map and the normalized TF data from the previous
 * run, and computes the pair with the maximum similarity. For the docs
 * that are part of this pair, it will change the key to be the combined
 * key for the cluster to be formed and pass it to the reducer. This will
 * only happen for 2 documents in each iteration. The other docs are passed
 * through unchanged. 
 */
public class DocClusteringMapper extends Mapper<LongWritable,Text,Text,Text> {

  private double similarityThreshold = 0.0D;
  private double maxSim = 0.0D;
  private String[] maxSimKeys = null;
  
  public void setup(Context context) 
      throws IOException, InterruptedException {
    similarityThreshold = context.getConfiguration().getFloat(
      HierarchicalAgglomerativeClusterer.SIMILARITY_THRESHOLD_KEY, 0.0F); 
    String simDir = context.getConfiguration().get(
      HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY);
    if (simDir == null) {
      System.err.println("Cant get value for key: " + 
        HierarchicalAgglomerativeClusterer.INPUT_SIMILARITY_MAP_DIR_KEY + 
        ", abort");
      System.exit(-1);
    }
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FileStatus[] fstatuses = fs.listStatus(new Path(simDir));
    for (FileStatus fstatus : fstatuses) {
      Path path = fstatus.getPath();
      if (! path.getName().startsWith("part-r")) {
        continue;
      }
      FSDataInputStream fis = fs.open(path);
      BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
      String line = null;
      while ((line = reader.readLine()) != null) {
        String[] kvpair = StringUtils.split(line, "\t");
        String[] docIdPair = StringUtils.split(kvpair[0], "/");
        if (docIdPair[0].equals(docIdPair[1])) {
          // same doc, expected max, so skip
          continue;
        }
        if (Double.valueOf(kvpair[1]) > maxSim) {
          maxSim = Double.valueOf(kvpair[1]);
          maxSimKeys = docIdPair;
        }
      }
      reader.close();
      fis.close();
    }
  }
  
  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    String[] kvpair = StringUtils.split(value.toString(), "\t");
    if (maxSim > similarityThreshold &&
       (kvpair[0].equals(maxSimKeys[0]) || 
        kvpair[0].equals(maxSimKeys[1]))) {
      // if either of the keys in maxSimKeys match the key in the 
      // record, then replace the key with the combo-key (this key
      // represents the "cluster")
      String newKey = StringUtils.join(maxSimKeys, "+");
      context.write(new Text(newKey), new Text(kvpair[1]));
    } else {
      // pass record through unchanged
      context.write(new Text(kvpair[0]), new Text(kvpair[1]));
    }
  }
}

On the reducer side, the two documents that are to be merged arrive with the same composite key, so when this key is encountered, the reduction averages the coordinates of the two documents into one. All other documents are passed through unchanged.
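
So, with the made-up records above, the two values arriving under the key doc1+doc2 are averaged element by element, and the reducer writes out:

doc1,doc2    0.25,0.425,0.325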

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/DocClusteringReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.linear.OpenMapRealVector;
import org.apache.commons.math.linear.SparseRealVector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduces the normalized TF data produced by the mapper. The mapper
 * produced 2 documents with the same key, which are candidates for 
 * the new cluster. The reducer will compute the new cluster coordinates
 * using the coordinates of its components. The other documents are
 * written out unchanged.
 */
public class DocClusteringReducer extends Reducer<Text,Text,Text,Text> {

  public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
    double numRecords = 0D;
    SparseRealVector doc = null;
    for (Text value : values) {
      String[] tfs = StringUtils.split(value.toString(), ",");
      if (doc == null) {
        doc = new OpenMapRealVector(tfs.length);
      }
      for (int i = 0; i < tfs.length; i++) {
        doc.setEntry(i, doc.getEntry(i) + Double.valueOf(tfs[i]));
      }
      numRecords++;
    }
    SparseRealVector cluster = 
      new OpenMapRealVector(doc.mapDivide(numRecords));
    int numTerms = cluster.getDimension();
    StringBuilder buf = new StringBuilder();
    for (int i = 0; i < numTerms; i++) {
      if (i > 0) {
        buf.append(",");
      }
      buf.append(String.valueOf(cluster.getEntry(i)));
    }
    // replace the "+" in the key with "," since this pair is now clustered
    String newKey = StringUtils.replace(key.toString(), "+", ",");
    context.write(new Text(newKey), new Text(buf.toString()));
  }
}

Phase 4: Checking for Termination

The last phase just counts the clustered documents remaining in the document set. If the number of clustered documents reaches a certain threshold (our MAX_CLUSTERS), or is unchanged from the previous run (because it hit our SIMILARITY_THRESHOLD), then the loop is terminated and the job ends. The mapper here is a simple one: it writes a constant key and a 1 for each line it sees.

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountMapper.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper to count number of lines in a file. For each line sent to the
 * mapper, it sends a constant key and a 1 to the reducer to add.
 */
public class RecordCountMapper 
    extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final Text COUNT_KEY = new Text("count");
  private static final LongWritable ONE = new LongWritable(1L);
  
  @Override
  public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {
    context.write(COUNT_KEY, ONE);
  }
}

The reducer sums up the counts and writes out the total. The driver code then reads the output file, retrieves the count, and returns it to the main method.

// Source: src/main/java/net/sf/jtmt/clustering/hadoop/agglomerative/RecordCountReducer.java
package net.sf.jtmt.clustering.hadoop.agglomerative;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Sums up the number of line counts sent by the mapper and returns a 
 * single line of output.
 */
public class RecordCountReducer 
    extends Reducer<Text,LongWritable,Text,LongWritable> {

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, 
      Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get(); // each mapper value is 1, so this yields the line count
    }
    context.write(key, new LongWritable(sum));
  }
}

Running

I've run this (for 3-5 iterations) in both standalone and pseudo-distributed modes, and it runs fine in both scenarios, although it takes a fair amount of time. Here is the script I used to run it.

#!/bin/bash
# Source: src/main/scripts/hadoop/run_hac.sh
# Script to call Hierarchical Agglomerative Clusterer.

## CONFIG ##
M2_REPO=/home/sujit/.m2/repository
HADOOP_HOME=/opt/hadoop-0.20.0
PROJECT_BASEDIR=/home/sujit/src/jtmt
MODE=p # mode can be (l)ocal or (p)seudo-distributed
if [ $MODE == "l" ]; then
  PROTOCOL_PREFIX=$PROJECT_BASEDIR/src/test/resources
else
  PROTOCOL_PREFIX=hdfs://localhost:54310
fi
INPUT_DIR=/hac/input
TEMP_DIRS=/hac/temp*
## CONFIG ##

# for local mode
if [ $MODE == "l" ]; then
  export HADOOP_CLASSPATH=$CLASSPATH:\
$M2_REPO/commons-lang/commons-lang/2.1/commons-lang-2.1.jar:\
$M2_REPO/commons-math/commons-math/2.0/commons-math-2.0.jar
fi

cd $HADOOP_HOME
if [ $MODE == "l" ]; then
  rm -rf $PROTOCOL_PREFIX$TEMP_DIRS
  # no special packaging required for local mode
  bin/hadoop jar $PROJECT_BASEDIR/target/jtmt-1.0-SNAPSHOT.jar \
    net.sf.jtmt.clustering.hadoop.agglomerative.HierarchicalAgglomerativeClusterer \
    $PROTOCOL_PREFIX$INPUT_DIR
else
  bin/hadoop fs -rmr $TEMP_DIRS
  bin/hadoop jar $PROJECT_BASEDIR/target/ha-clusterer.jar \
    $PROTOCOL_PREFIX$INPUT_DIR
fi
cd -
unset HADOOP_CLASSPATH

I wanted to run this on Amazon EC2, but at the time of writing this post, there were no freely available Amazon EC2 machine images that included Hadoop 0.20. Arun Jacob describes here how he created his own Hadoop 0.20 AMI. Chad Metcalfe of Cloudera pointed out another way to get around this situation, which is to choose a stock AMI and install Hadoop/CDH2 on it. Ideally, I would just wait for a Hadoop 0.20 AMI to become available and then use it. However, in the event that it does not appear anytime soon, the second approach seems (to me anyway) the next best.