
Custom Hadoop RecordReader to read JSON with no line breaks

This past week I had to deal with loading a few terabytes of data into our Spark cluster. The data is stored as one large JSON array, with no line breaks separating the individual JSON objects. Spark can easily deal with JSON, but its built-in reader expects one object per line (the JSON Lines format). I had to write a custom Hadoop RecordReader to work around this issue.
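To make the difference concrete, here is a rough sketch of the two shapes (all fields other than Type are made up for illustration):

// what my file looks like: one giant JSON array, no line breaks
[{"Type":"trip","Id":"a1","Distance":12.3},{"Type":"trip","Id":"a2","Distance":4.5}, ...]

// what Spark's built-in JSON reader expects: one object per line
{"Type":"trip","Id":"a1","Distance":12.3}
{"Type":"trip","Id":"a2","Distance":4.5}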

newAPIHadoopFile

With newAPIHadoopFile, Spark can load a file in any record format, as long as you supply a matching InputFormat. Here is how I ended up doing it:

import org.apache.hadoop.conf.Configuration
import mojio.spark.TripJsonFileInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
val rddraw = sc.newAPIHadoopFile("path to my JSON file", classOf[TripJsonFileInputFormat],
 classOf[String], classOf[String], conf)

As you can see above, I had to define a new FileInputFormat which outputs a String key and a String value.
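The result is a pair RDD of (key, value) strings. As a quick sketch of where to go from there (assuming Spark 2.x and a SparkSession named spark in scope), the values can be handed straight to Spark's regular JSON reader:

import spark.implicits._

// keep only the JSON strings; the keys carry the trip ids
val jsonStrings = rddraw.map(_._2)

// let Spark parse each object, now that each record is a single JSON string
val trips = spark.read.json(jsonStrings.toDS())
trips.printSchema()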

FileInputFormat

The only thing this custom FileInputFormat does is implement the createRecordReader method, which returns my custom RecordReader:

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

class TripJsonFileInputFormat extends FileInputFormat[String, String] {
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
      RecordReader[String, String] = new TripJsonRecordReader()
}

TripJsonRecordReader

The RecordReader is where the bulk of the work happens. It is responsible for processing the InputSplit (a chunk of the input): it opens a stream to the file, seeks to the start of the split, and reads key/value pairs until it reaches the end of the split. In my case, I could rely on the fact that my JSON objects always start with a special property ("Type":"trip").

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import scala.io.Source

class TripJsonRecordReader() extends RecordReader[String, String] {

  // byte offsets of this split within the file, and the current read position
  var start, end, pos = 0L
  var reader: Source = _
  var key: String = _
  var value: String = _
  // characters read from the stream but not yet emitted as records
  var objBuffer = new StringBuilder

  val pattern = "regex to find the unique id".r

  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
    // find start and end of the split
    val split = inputSplit.asInstanceOf[FileSplit]
    start = split.getStart
    pos = start
    end = start + split.getLength

    // open a stream to the data, pointing to the start of the split
    val stream = split.getPath
      .getFileSystem(context.getConfiguration)
      .open(split.getPath)

    stream.seek(start)
    reader = Source.fromInputStream(stream, "utf-8")
  }

  override def nextKeyValue(): Boolean = {
    var objStart, objEnd = -1

    // find where the current object starts and where the next one starts
    objStart = findNextObjectStart(0)
    if (objStart != -1) {
      objEnd = findNextObjectStart(objStart + 1)
    }

    if (objEnd == -1) {
      // we've reached the end of the split
      key = null
      value = null
      false

    } else {
      // the value is the JSON string we found; objEnd - 1 excludes the
      // separator (the comma between array elements, or the closing bracket)
      value = objBuffer.substring(objStart, objEnd - 1)

      // the trip id is the key
      key = pattern.findFirstMatchIn(value).map(_.group(1)).getOrElse("id not found")

      // remove this object from the buffer
      objBuffer = objBuffer.drop(objEnd - 1)

      // tell Hadoop a new record has been found
      true
    }
  }

  override def getCurrentKey(): String = {
    key
  }

  override def getCurrentValue(): String = {
    value
  }

  override def getProgress(): Float = {
    if (start == end) {
      0.0f
    } else {
      Math.min(1.0f, (pos - start) / (end - start).toFloat)
    }
  }

  override def close(): Unit = {
    if (reader != null) {
      reader.close()
    }
  }

  def loadNextChunk(): Unit = {
    // read the next chunk of characters from the stream
    val chars = reader.take(8192).mkString
    // advance the position; the stream is decoded as UTF-8, so count the
    // bytes in the same encoding (split offsets are byte offsets)
    pos += chars.getBytes("UTF-8").length
    // append the characters to the working buffer
    objBuffer.append(chars)
  }

  def findNextObjectStart(after: Int): Int = {

    while (true) {
      // look for the start of the next Trip object
      val found = objBuffer.indexOf("""{"Type":"trip",""", after)
      if (found != -1) {
        return found
      }
      if (pos >= end) {
        // end of split: whatever is left in the buffer is either the tail of
        // the last object, or too short (15 = marker length) to be an object
        val len = objBuffer.length
        if (len < 15)
          return -1
        else
          return len
      }
      // marker not found yet, load another chunk into the buffer
      loadNextChunk()
    }

    -1 // unreachable, but required for the return type
  }

}
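One loose end: the pattern val above stands in for whatever regex extracts the unique id from a trip object. Purely as a hypothetical illustration, if each object carried an Id property, it could look like this:

// hypothetical: assumes each trip object contains an "Id":"..." property;
// group(1) captures the id itself, matching the .group(1) call in nextKeyValue
val pattern = """"Id":"([^"]+)"""".r

// pattern.findFirstMatchIn("""{"Type":"trip","Id":"a1"}""").map(_.group(1))
// => Some(a1)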

 
