Wednesday, April 03, 2013

A Newspaper Clipping Service with Cascading


This post describes a possible implementation for an automated Newspaper Clipping Service. The end-user is a researcher (or team of researchers) in a particular discipline who registers an interest in a set of topics (or web-pages). An assistant (or team of assistants) then scour information sources to find more documents of interest to the researcher based on these topics identified. In this particular case, the information sources were limited to a set of "approved" newspapers, hence the name "Newspaper Clipping Service". The goal is to replace the assistants with an automated system.

The solution I came up with was to analyze the original web pages and treat keywords extracted out of these pages as topics, then for each keyword, query a popular search engine and gather the top 10 results from each query. The search engine can be customized so the sites it looks at is restricted by the list of approved newspapers. Finally the URLs of the results are aggregated together, and only URLs which were returned by more than 1 keyword topic are given back to the user.

The entire flow can be thought of as a series of Hadoop Map-Reduce jobs, to first download, extract and count keywords from (web pages corresponding to) URLs, and then to extract and count search result URLs from the keywords. I've been wanting to play with Cascading for a while, and this seemed like a good candidate, so the solution is implemented with Cascading.

I have used Scalding in the past, but it seems to me that while Scalding's collection-like interface is easier to work with, its harder to extend. So even though I think I could have done this in Scalding without any problems, the objective was to learn Cascading, so I used that instead. I initially started using Cascading with Scala (I write enough Java code at work :-)), but Cascading's use of generics (at least some of it) is too complicated for Scala's type inference system, so I fell back to using Java instead*.

One can write Cascading code in local mode, which uses in-memory data structures and the local filesystem, or in hadoop mode, which uses Hadoop and HDFS. Since this was a learning exercise, I decided to use local mode. To move it to Hadoop, one would have to use Hadoop specific FlowControllers and Taps instead. Here is the code for the Main (callable) class. The entire Maven project is available on my GitHub page.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Source: src/main/java/com/mycompany/newsclip/Main.java
package com.mycompany.newsclip;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFilter;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class Main {

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    // handle input parameters
    String input = args[0];
    String output = args[1];

    Fields urlFields = new Fields("num", "line");
    Tap iTap = new FileTap(new TextLine(urlFields), input);
    
    Fields kFields = new Fields("kword");
    Tap oTap = new FileTap(new TextLine(kFields), output);

    Pipe pipe = new Pipe("keyword");
    
    // read urls, download, clean and extract keywords (1:n)
    Function kFun = new KeywordExtractFunction(kFields);
    pipe = new Each(pipe, urlFields,  kFun);
    
    // group by word and count it
    pipe = new GroupBy(pipe, kFields);
    Aggregator kCount = new Count(new Fields("count"));
    pipe = new Every(pipe, kCount);
    
    // filter out words with count < 1
    Filter kCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, kCountFilter);
    
    // pass the keywords to our custom google search
    Fields kcFields = new Fields("kword", "count");
    Fields uFields = new Fields("url");
    Function uFun = new UrlExtractFunction(uFields);
    pipe = new Each(pipe, kcFields, uFun);
    
    // group by url and count it
    pipe = new GroupBy(pipe, uFields);
    Aggregator uCount = new Count(new Fields("count"));
    pipe = new Every(pipe, uCount);
    
    // filter out urls that occur once
    Filter uCountFilter = new ExpressionFilter("$1 <= 1", Integer.class);
    pipe = new Each(pipe, uCountFilter);
    
    // remove the count value
    pipe = new Each(pipe, Fields.ALL, new Identity(), Fields.FIRST);
    
    FlowDef flowDef = FlowDef.flowDef().
      setName("newsclip").
      addSource(pipe, iTap).
      addTailSink(pipe,  oTap);
    
    Properties props = new Properties();
    AppProps.setApplicationJarClass(props, Main.class);
    FlowConnector flowConnector = new LocalFlowConnector(props);

    Flow flow = flowConnector.connect(flowDef);
    flow.writeDOT("data/newsclip.dot");
    flow.complete();
  }
}




The corresponding Graphviz DOT file for the assembly (generated by flow.writeDOT in the code above) is shown at left. I converted it to a web-displayable PNG file using the command "dot -Tpng newsclip.dot -o newsclip.png".

The code above uses built-in functions and filters where available, but the core operations are done by custom functions. The KeywordExtractFunctionTest extracts a set of keywords from a web page given its URL. It uses Boilerpipe to extract the relevant plain text from a web page, and my implementation of the RAKE algorithm to extract keywords from the plain text.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Source: src/main/java/com/mycompany/newsclip/KeywordExtractFunction.java
package com.mycompany.newsclip;

import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import de.l3s.boilerpipe.BoilerpipeProcessingException;
import de.l3s.boilerpipe.extractors.DefaultExtractor;

@SuppressWarnings("rawtypes")
public class KeywordExtractFunction extends BaseOperation 
    implements Function {

  private static final long serialVersionUID = -7122434545764806604L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(KeywordExtractFunction.class);

  public KeywordExtractFunction(Fields fields) {
    super(2, fields);
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String url = args.getString(1);
    String rawText = download(url);
    String plainText = parse(rawText);
    List<String> keywords = extractKeywords(plainText);
    for (String keyword : keywords) {
      Tuple t = new Tuple();
      t.add(keyword);
      funCall.getOutputCollector().add(t);
    }
  }

  protected String download(String url) {
    try {
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return buf.toString();
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return null;
    }
  }

  protected String parse(String rawText) {
    if (StringUtils.isEmpty(rawText)) return null;
    else {
      try {
        return DefaultExtractor.INSTANCE.getText(rawText);
      } catch (BoilerpipeProcessingException e) {
        LOGGER.error(e.getMessage(), e);
        return null;
      }
    }
  }

  protected List<String> extractKeywords(String plainText) {
    try {
      return RakeExtractor.INSTANCE.extract(plainText);
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }
}

The other custom function is the UrlExtractFunction, which takes each keyword and hands it off to Google's Custom Search API, and returns the URLs of the top 10 search results returned. The Custom Search instance you set up can be customized to only allow results from a list of websites (or the entire web). The KEY and CX values are parameters that identify your client to the Google Search API, and you will need to populate a file with these values in src/main/resources/google.lic (the one in GitHub has placeholders).

1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
// Source: src/main/java/com/mycompany/newsclip/UrlExtractFunction.java
package com.mycompany.newsclip;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Function to take a keyword and use Google's custom search
 * service to retrieve the top 10 URLs.
 */
@SuppressWarnings("rawtypes")
public class UrlExtractFunction extends BaseOperation implements Function {

  private static final long serialVersionUID = 1622228905563317614L;
  private static final Logger LOGGER = 
    LoggerFactory.getLogger(UrlExtractFunction.class);
  
  private static final String CUSTOM_SEARCH_URL_TEMPLATE =
    "https://www.googleapis.com/customsearch/v1?key={KEY}&cx={CX}&q={Q}";
  private String key;
  private String cx;
  private ObjectMapper objectMapper;
  
  public UrlExtractFunction(Fields fields) {
    super(2, fields);
    Properties props = new Properties();
    try {
      props.load(new FileInputStream("src/main/resources/google.lic"));
    } catch (IOException e) {
      LOGGER.error(e.getMessage(), e);
    }
    key = props.getProperty("key");
    cx = props.getProperty("cx");
    objectMapper = new ObjectMapper();
  }
  
  @Override
  public void operate(FlowProcess flowProcess, FunctionCall funCall) {
    TupleEntry args = funCall.getArguments();
    String keyword = args.getString(0);
    List<String> urls = parseSearchResult(keyword);
    for (String url : urls) {
      Tuple t = new Tuple();
      t.add(url);
      funCall.getOutputCollector().add(t);
    }
  }

  protected List<String> parseSearchResult(String keyword) {
    try {
      String url = CUSTOM_SEARCH_URL_TEMPLATE.
        replaceAll("{KEY}", key).
        replaceAll("{CX}", cx).
        replaceAll("{Q}", URLEncoder.encode(keyword, "UTF-8"));
      URL u = new URL(url);
      u.openConnection();
      InputStream istream = u.openStream();
      StringBuilder buf = new StringBuilder();
      byte[] b = new byte[1024];
      int bytesRead = 0;
      while ((bytesRead = istream.read(b)) > 0) {
        buf.append(new String(b, 0, bytesRead));
        b = new byte[1024];
      }
      istream.close();
      return parseJson(buf.toString());
    } catch (Exception e) {
      LOGGER.error(e.getMessage(), e);
      return Collections.emptyList();
    }
  }

  protected List<String> parseJson(String json) throws Exception {
    List<String> urls = new ArrayList<String>();
    JsonParser parser = objectMapper.getJsonFactory().
      createJsonParser(json);
    JsonNode root = objectMapper.readTree(parser);
    ArrayNode items = (ArrayNode) root.get("items");
    for (JsonNode item : items) {
      urls.add(item.get("link").getTextValue());
    }
    return urls;
  }
}

And thats pretty much it. Put the list of your "interesting pages", one per line, into data/urls.txt, and run the Cascading job locally using the mvn exec:java command, as shown below. The output of the job is written to data/new_urls.txt. The new data can be used to feed back URLs into the original list (perhaps after some sort of manual vetting by the researcher).

1
2
3
sujit@cyclone:cascading-newsclip$ mvn exec:java \
  -Dexec.mainClass="com.mycompany.newsclip.Main" \
  -Dexec.args="data/urls.txt data/new_urls.txt"

As you can see from the diagram, the Cascading code is running 11 Hadoop Map-Reduce jobs in sequence. This translates to a lot of Hadoop code. So Cascading, like Pig, is a huge time saver. Pig does allow Java UDFs, but I think Cascading's all-Java approach is easier to work with.

[*] Update 2013-04-16: I came across Tommy Chheng's post where he shows how to write Cascading code in Scala on this GitHub page. So great news, it appears that it may be possible to do this after all :-).

2 comments (moderated to prevent spam):

Unknown said...

First of all Thanks for all your articles .These areticles are really helpful for student like me who are working under this area.

Currently, I am working on Classification using Vector Space. Would you tell me how to generate the Training data from a set of text files. I have found no much information related to this. So Please tell me how to generate training Data for classification.

In your work, The training data in training document "Sugar-Coffee-Cocoa-Docs" is in given below format,

0: sugar: -- Document Separator -- reut2-021.sgm
Would you tell me how to generate the training Data in above format?

Sujit Pal said...

Hi Siva, you are welcome and thanks for the kind words. To answer your question, depending on your use-case, making your own training data can be quite an undertaking. In general a classification problem is given a set of X,y pairs (where X is a nSamples by nFeatures vector and y is a column vector of size nSamples) as training data, generate a model that will predict y' values for a new X' (same nFeatures as X).

Based on your question, I am guessing you are mainly interested in text classification into genres. Datasets are available from the internet where the y values (the genre) are already set. I got the cocoa-sugar-coffee dataset from Dr Manu Konchady's textmine (textmine.sf.net) project. There are other sources such as UCI Machine Learning Repository, Reuters, 20-newsgroups, GENIA, etc (I was recently looking at the LingPipe tutorial, and there are some links in there also).

OTOH, if you are doing work on some new data, say classifying Bengali novels (for lack of a better example) into a fixed set of genres, then you will probably have to figure out the y yourself (from reading the novels and deciding for yourself). For the X vector, you can tokenize the text and convert into a TD matrix and transpose it to get X. You can also add other features, perhaps author name, year of publication, etc as additional features.

Anyway, hopefully I have answered your question?