Saturday, May 10, 2014

Preprocessing data with Scalding and Amazon EMR


Lately, I have been trying to do some data analysis against (anonymized) Medicare/Medicaid data. The data contains member information (gender, race, date of birth, chronic conditions, etc.) for 6.6M patients, and inpatient and outpatient claims (1.3M and 15.8M respectively) for the period 2008-2010. I started out using Pandas to do some basic analysis - nothing fancy, basically finding the distribution of various chronic conditions by race and sex (I figured age wouldn't be as interesting, since the age range is quite narrow in this case), correlations between different chronic conditions, etc. You can see the IPython notebook containing this analysis here.

While analyzing the inpatient claims data, I wondered about the possibility of building a model to predict the probability of chronic disease given the claim codes for a patient. Pandas over IPython was okay for doing analysis with a subset (a single file) of the data, but got a bit irritating with the full dataset because of frequent hangs and subsequent IPython server restarts. So I began to consider using Hadoop, preferably on Amazon EMR - and Scalding was similar enough to Pandas to make it the obvious choice (obvious for me, at least - I prefer Scalding over Pig, and I haven't used (S)Crunch or Scoobi).

The last time I used Scalding was when I wrote the Scalding for the Impatient series (a Scalding version of Paco Nathan's Cascading for the Impatient series) as a way to learn Scala. Since then I have used Cascading (Scalding's clunkier but richer Java predecessor), mainly because I couldn't run Scalding in anything other than local mode. When I looked this time, I found this project template from the kind folks at Snowplow Analytics for running Scalding code on Amazon EMR, so that was no longer a problem.

This post describes a mixture of Python and Scala/Scalding code that I hooked up to convert the raw Benefits and Inpatient Claims data from the Medicare/Medicaid dataset into data for an X matrix and multiple y vectors, each y corresponding to a single chronic condition. Scalding purists would probably find this somewhat inelegant and prefer a complete Scalding end-to-end solution, but my Scalding-fu extends only so far - hopefully it will improve with practice.

Project Setup


The Snowplow example project is based on custom Scala build code in the project/ directory - I'm used to simpler projects, with a build.sbt in the root directory. I tried the build.sbt route for a bit but soon gave up because of the complexity involved. The example project was also built for Scala 2.9, and I use 2.10 (the Eclipse-based Scala IDE is tied to a specific Scala version). The Scala version change also prompted updates to other versions hardcoded in the build files, as well as some minor code changes, all of which are described below. Alternatively, you can just grab the contents of my project directory on GitHub.

# Source: project/build.properties
* updated sbt.version to 0.13.0

# project/plugins.sbt
* updated sbt-assembly from 0.8.5 to 0.10.1
* added sbteclipse-plugin 2.4.0 (both plugin lines are sketched just below this list)

# project/ExampleScaldingProjectBuild.scala
* renamed this file and contained class to ProjectBuild.scala

# project/Dependencies.scala
* renamed ScalaToolsSnapshots alias to "Scalatools snapshots at Sonatype"
* updated specs2 to 1.13 per comment (not used in my case)

# project/BuildSettings.scala
* updated basicSettings
* fixed up compile failure caused by change in sbt-assembly API
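
For concreteness, the two plugin lines in project/plugins.sbt end up looking roughly like the sketch below (coordinates as published for those plugin versions; any resolver lines already present in the original template stay as they were):

// Source: project/plugins.sbt (sketch of the two plugin lines)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.1")

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0")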

The project also provides a JobRunner object that is used to invoke the selected Job class from Hadoop. After testing that the WordCount example still worked on Amazon EMR with my changes, I moved the JobRunner.scala file to my own job package and removed the package for the WordCount job.
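
The JobRunner itself is tiny - it essentially just hands the job class name and arguments over to Hadoop's ToolRunner via Scalding's Tool. A minimal sketch (the package name here is my own; the body mirrors what the template provides):

// Source: src/main/scala/com/mycompany/cmspp/JobRunner.scala (sketch)
package com.mycompany.cmspp

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.ToolRunner

import com.twitter.scalding.Tool

object JobRunner {
  def main(args: Array[String]) {
    // args(0) names the Job class (e.g. com.mycompany.cmspp.PreprocJob1);
    // the rest (--hdfs, --claims, ...) are passed through to the job.
    ToolRunner.run(new Configuration, new Tool, args)
  }
}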

Data Preparation


The benefit summary and inpatient claims data consist of 58 and 19 files respectively. I had initially thought that Scalding would provide some Source abstraction that could read off a directory, but I couldn't find one in the examples. So I wrote the following Python snippet to concatenate them into two large files.

# Source: src/main/python/catfiles.py
import os
import sys

def usage():
  print "Usage: %s indir outfile skipheader" % (sys.argv[0])
  print "where:"
  print "indir - input directory of files to concat"
  print "outfile - outfile file to concat to"
  print "skipheader - true if header to be skipped else false"
  sys.exit(-1)

def main():
  if len(sys.argv) != 4 \
     or not os.path.isdir(sys.argv[1]) \
     or sys.argv[3] not in ["true", "false"]:
    usage()
  fout = open(sys.argv[2], 'wb')
  for fn in os.listdir(sys.argv[1]):
    print "Now processing: %s" % (fn)
    fin = open(os.path.join(sys.argv[1], fn), 'rb')
    should_skip_line = sys.argv[3] == "true"
    for line in fin:
      if should_skip_line: 
        should_skip_line = False
        continue
      fout.write(line)
    fin.close()
  fout.close()

if __name__ == "__main__":
  main()

Data Preprocessing Phase I


The first phase consists of creating three views of the claims data. The input data has 45-odd columns of different medical codes per claim record. The code data is sparse, i.e., each claim has only a few of these columns filled in. This phase reads the claims data and normalizes it into (member_id, code_type:code_value, number_of_claims) triples. From this normalized data, we also extract the unique code_type:code_value pairs (hereafter referred to as code_id) and the unique member_ids. The Scalding code for this is shown below:

// Source: src/main/scala/com/mycompany/cmspp/PreprocJob1.scala
package com.mycompany.cmspp

import scala.io.Source

import com.twitter.scalding.Args
import com.twitter.scalding.Csv
import com.twitter.scalding.Job
import com.twitter.scalding.RichPipe
import com.twitter.scalding.TextLine

class PreprocJob1(args: Args) extends Job(args) {

  def normalizeClaims(line: String): List[(String,String)] = {
    val inputColnames = Schemas.InpatientClaims.map(
      sym => sym.name)
    val outputColnames = Schemas.Codes.map(sym => sym.name)
    val ocolset = outputColnames.toSet
    val colvals = line.split(",")
    val memberId = colvals.head
    inputColnames.zip(colvals)
      .filter(nv => ocolset.contains(nv._1))
      .filter(nv => (! nv._2.isEmpty()))
      .map(nv => nv._1.split("_").head + ":" + nv._2)
      .map(code => (memberId, code))
  }
  
  val claims = TextLine(args("claims"))
    .flatMap(('line) -> ('DESYNPUF_ID, 'CLAIM_CODE)) {
      line: String => normalizeClaims(line)
    }
    .project(('DESYNPUF_ID, 'CLAIM_CODE))
    .groupBy(('DESYNPUF_ID, 'CLAIM_CODE)) { 
      grp => grp.size('NUM_CLAIMS) 
    }

  val members = RichPipe(claims)
    .project('DESYNPUF_ID)
    .unique('DESYNPUF_ID)
    .write(Csv(args("members")))

  val codes = RichPipe(claims)
    .project('CLAIM_CODE)
    .unique('CLAIM_CODE)
    .write(Csv(args("codes")))
    
  claims.write(Csv(args("output")))
}

object PreprocJob1 {
  def main(args: Array[String]): Unit = {
    // input files
    val claims = "data/inpatient_claims.csv"
    // output files
    val members = "data/members_list.csv"
    val codes = "data/codes_list.csv"
    val output = "data/claim_triples.csv"
    (new PreprocJob1(Args(List(
        "--local", "", 
        "--claims", claims,
        "--members", members,
        "--codes", codes,
        "--output", output)))
    ).run
    Console.println("==== members_list ====")
    Source.fromFile(members).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== codes_list ====")
    Source.fromFile(codes).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== claim_triples ====")
    Source.fromFile(output).getLines().slice(0, 3)
      .foreach(Console.println(_))
  }
}
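
Both jobs refer to a Schemas object that simply holds the column name lists for the input files; it is not shown here. The heavily abridged sketch below gives the general shape - the column names are illustrative placeholders, and the real version lists the full DE-SynPUF column sets:

// Source: src/main/scala/com/mycompany/cmspp/Schemas.scala (abridged sketch)
package com.mycompany.cmspp

object Schemas {
  // All columns of the inpatient claims file, in file order; DESYNPUF_ID
  // comes first because PreprocJob1 takes colvals.head as the member id.
  val InpatientClaims = List(
    'DESYNPUF_ID, 'CLM_ID, 'CLM_FROM_DT, 'CLM_THRU_DT,
    'CLM_DRG_CD, 'ICD9_DGNS_CD_1, 'ICD9_DGNS_CD_2, 'ICD9_PRCDR_CD_1)

  // The subset of columns that hold medical codes. The prefix before the
  // first underscore (CLM, ICD9, ...) becomes the code_type part of the
  // code_type:code_value pairs emitted by normalizeClaims.
  val Codes = List(
    'CLM_DRG_CD, 'ICD9_DGNS_CD_1, 'ICD9_DGNS_CD_2, 'ICD9_PRCDR_CD_1)

  // Columns of the benefit summary file (abridged).
  val BenefitSummary = List(
    'DESYNPUF_ID, 'BENE_BIRTH_DT, 'BENE_SEX_IDENT_CD, 'BENE_RACE_CD,
    'SP_ALZHDMTA, 'SP_CHF, 'SP_DIABETES)

  // DESYNPUF_ID followed by the chronic condition flag columns; PreprocJob2
  // projects these and then swaps DESYNPUF_ID for the numeric MEM_IDX.
  val Diseases = List('DESYNPUF_ID, 'SP_ALZHDMTA, 'SP_CHF, 'SP_DIABETES)
}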

To run this on Amazon EMR, comment out the companion object used for local testing, build the fat JAR with "sbt assembly", and upload the JAR from target/scala-2.10 along with the data files to S3. To launch the job on EMR, use the following parameters:

Job JAR: s3://${BUCKET}/cmspp/scalding-meddata-preproc-0.1.0.jar
Job Arguments:
    com.mycompany.cmspp.PreprocJob1 \
    --hdfs \
    --claims s3://${BUCKET}/cmspp/claims/inpatient_claims.csv \
    --members s3://${BUCKET}/cmspp/members \
    --codes s3://${BUCKET}/cmspp/codes \
    --output s3://${BUCKET}/cmspp/triples

The first three lines of each output of this phase (generated using a local run against a truncated dataset) are shown below.

==== members_list ====
0001448457F2ED81
000188A3402777A5
0001EB1229306825
==== codes_list ====
CLM:202
CLM:203
CLM:206
==== claim_triples ====
0001448457F2ED81,CLM:217,1
0001448457F2ED81,CLM:460,1
0001448457F2ED81,CLM:881,1

Assigning Numeric IDs


For the two unique lists of member_ids and code_ids, we assign each entry a sequential numeric ID so we can convert the claim triples data into a sparse matrix. Since Scalding assumes a distributed environment, assigning a unique serial number is a hard thing to do - there are ways to do it with the groupAll() method, which forces everything through a single reducer, but I couldn't make it work. Of course, this is trivial to do in a non-distributed environment. The following Python script is run against the member_id and code_id lists to produce "dictionaries" of (int, member_id) and (int, code_id) pairs.

# Source: src/main/python/add_lineno.py
import os
import sys

def usage():
  print "Usage: %s infile outfile" % (sys.argv[0])
  print "where:"
  print "infile - input CSV file without line number"
  print "outfile - output CSV file with line number as first col"
  sys.exit(-1)

def main():
  if len(sys.argv) != 3 \
      or not os.path.isfile(sys.argv[1]):
    usage()
  fin = open(sys.argv[1], 'rb')
  fout = open(sys.argv[2], 'wb')
  lno = 0
  for line in fin:
    fout.write("%d,%s" % (lno, line))
    lno += 1
  fin.close()
  fout.close()

if __name__ == "__main__":
  main()
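
For illustration (using the member_ids from the sample output above), the resulting members_dict.csv contains lines like the ones below - note that the line number comes first, which matches the ('MEM_IDX, 'DESYNPUF_ID) field order that PreprocJob2 expects:

0,0001448457F2ED81
1,000188A3402777A5
2,0001EB1229306825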

Data Preprocessing Phase II


This phase reads the dictionaries generated by the Python script above, together with the claim triples data, and produces an X matrix and a set of y vectors (one for each chronic condition). The X matrix is row L2-normalized (each member's vector of claim counts is scaled to unit Euclidean length) and written in sparse format. Here is the Scalding code for this transformation.

// Source: src/main/scala/com/mycompany/cmspp/PreprocJob2.scala
package com.mycompany.cmspp

import scala.io.Source

import com.twitter.scalding.Args
import com.twitter.scalding.Csv
import com.twitter.scalding.Job
import com.twitter.scalding.mathematics.Matrix.pipeExtensions

class PreprocJob2(args: Args) extends Job(args) {

  val benefits = Csv(args("benefits"), 
    fields=Schemas.BenefitSummary)
  val claims = Csv(args("claims"), 
    fields=List('DESYNPUF_ID, 'CLAIM_CODE, 'NUM_CLAIMS))
  val memberDict = Csv(args("members"),
    fields=List('MEM_IDX, 'DESYNPUF_ID))
  val codesDict = Csv(args("codes"), 
    fields=List('COD_IDX, 'CLAIM_CODE))
  
  claims.joinWithSmaller('DESYNPUF_ID -> 'DESYNPUF_ID, memberDict)
    .joinWithSmaller('CLAIM_CODE -> 'CLAIM_CODE, codesDict)
    .toMatrix[Long,Long,Double]('MEM_IDX, 'COD_IDX, 'NUM_CLAIMS)
    .rowL2Normalize
    .pipe
    .mapTo(('row, 'col, 'val) -> ('row, 'colval)) { 
      row: (Long, Long, Double) => 
        (row._1, row._2.toString + ":" + row._3.toString)
    }
    .groupBy('row) { grp => grp.mkString('colval, ",") }
    .write(Csv(args("xmatrix")))
    
  benefits.project(Schemas.Diseases)
    .joinWithSmaller('DESYNPUF_ID -> 'DESYNPUF_ID, memberDict)
    .discard('DESYNPUF_ID)
    .project('MEM_IDX :: Schemas.Diseases.tail)
    .write(Csv(args("yvectors")))
}

object PreprocJob2 {
  def main(args: Array[String]): Unit = {
    // input files
    val benefits = "data/benefit_summary.csv"
    val triples = "data/claim_triples.csv"
    val memberDict = "data/members_dict.csv"
    val codeDict = "data/codes_dict.csv"
    // output files
    val xmatrix = "data/x_matrix.csv"
    val yvectors = "data/y_vectors.csv"
    (new PreprocJob2(Args(List(
      "--local", "",
      "--benefits", benefits,
      "--claims", triples,
      "--members", memberDict,
      "--codes", codeDict,
      "--xmatrix", xmatrix,
      "--yvectors", yvectors)))
    ).run
    Console.println("==== xmatrix.csv ====")
    Source.fromFile(xmatrix).getLines().slice(0, 3)
      .foreach(Console.println(_))
    Console.println("==== yvectors.csv ====")
    Source.fromFile(yvectors).getLines().slice(0, 3)
      .foreach(Console.println(_))
  }
}

The inputs to this phase are the original benefits file, the claim triples data generated in Phase I, and the two dictionaries generated in the previous step. The fat JAR already contains PreprocJob2, so we can just reuse the JAR from the previous step. Parameters to launch the job on Amazon EMR are shown below.

Job JAR: s3://${BUCKET}/cmspp/scalding-meddata-preproc-0.1.0.jar
Job Arguments:
    com.mycompany.cmspp.PreprocJob2 \
    --hdfs \
    --benefits s3://${BUCKET}/cmspp/benefits/benefit_summary.csv \
    --claims s3://${BUCKET}/cmspp/triples/ \
    --members s3://${BUCKET}/cmspp/members_dict/members_dict.csv \
    --codes s3://${BUCKET}/cmspp/codes_dict/codes_dict.csv \
    --xmatrix s3://${BUCKET}/cmspp/xmatrix \
    --yvectors s3://${BUCKET}/cmspp/yvectors

Here is what the output data looks like. As you can see, this data can now be used to construct sparse or dense input matrices for training a classification or regression algorithm. Once again, on the question of elegance, I realize that the columns could have been sorted, and the values in the yvectors should have been {0,1} rather than {1,2}. But these are easy to handle in downstream programs - a small parsing sketch follows the sample output below.

==== xmatrix.csv ====
0,"69:0.15617376188860607,65:0.15617376188860607,..."
1,"70:0.16222142113076254,68:0.16222142113076254,..."
2,"52:0.3333333333333333,43:0.3333333333333333,..."
==== yvectors.csv ====
0,1,1,1,2,1,1,1,1,2,2,2
1,1,1,1,1,1,1,1,1,2,2,2
2,2,1,2,1,2,2,2,1,2,1,2
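
As a rough illustration of that downstream handling (this is not part of the project), the sketch below parses an xmatrix row back into a map of (column index, value) pairs and recodes a yvectors row from {1,2} to {0,1}, on the assumption that 1 means the condition is present:

// Hypothetical downstream helper (not in the repo): parse the Phase II output.
object OutputParsers {
  // "0,\"69:0.156...,65:0.156...\"" -> (row id, Map(column index -> value))
  def parseXRow(line: String): (Long, Map[Int, Double]) = {
    val Array(rowId, rest) = line.split(",", 2)
    val cells = rest.replace("\"", "").split(",").map { cv =>
      val Array(col, value) = cv.split(":")
      (col.toInt, value.toDouble)
    }
    (rowId.toLong, cells.toMap)
  }

  // "0,1,1,1,2,..." -> (row id, labels recoded so that 1 -> 1 and 2 -> 0)
  def parseYRow(line: String): (Long, Array[Int]) = {
    val cols = line.split(",")
    (cols.head.toLong, cols.tail.map(v => if (v.trim == "1") 1 else 0))
  }
}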

That's all I have for today. I have wanted for a while to be able to build Scalding jobs that can run on Amazon EMR (or any Hadoop cluster other than the one on my laptop), so this is quite a big deal for me. I plan on using this data as input to some classification program - I will write about that if it turns out to be interesting enough to share. All the code described here can be found on the GitHub page for this project.
