
10 December 2013

The revenge of the chunks

This series of posts feels like a whole saga for something which should have been a quick and easy way to demonstrate the obvious superiority of functional programming over a simple loop. That was the intent of the first post. Then the second post was about defining proper scalaz-stream combinators to do the same thing, and in particular how to "chunk" the processing in order to get good performance.

However as I was writing unit tests for my requirements I realized that the problem was harder than I thought. In particular, the files I'm processing can have several sections made of HEADERs and TRAILERs. When you create chunks of lines to process this results in a number of combinations that need to be analysed. A chunk can:

  • start with a HEADER but not finish with a TRAILER, which ends up in another chunk
  • contain data lines only
  • contain data lines + a TRAILER + a new HEADER
  • and so on...

For each of these cases it is necessary to use the current state and the contents of the lines to determine if the file is malformed or not. This is a lot less easy than before.
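
To make these combinations concrete, each section of the file is expected to look like this (the field contents are invented for illustration): a HEADER line, a line of column names which is skipped, data lines, then a TRAILER declaring the expected number of data lines:

  HEADER|FILE20131210
  name|age|town
  eric|42|sydney
  zoe|41|paris
  TRAILER|2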

All the combinations

This is what I came up with:

  def process(path: String, targetName: String, chunkSize: Int = 10000): String \/ File = {

    val targetPath = path.replace(".DAT", "")+targetName

    // read the file in chunks, validate each chunk of lines
    // and join the validated lines into a single string
    val read =
      linesRChunk(path, chunkSize) |>
      validateLines.map(lines => lines.mkString("\n"))

    // write the validated lines to the target file, separated by newlines
    val task =
      ((read |> process1.intersperse("\n") |>
      process1.utf8Encode) to io.fileChunkW(targetPath)).run

    // run the task, turning any thrown exception into an error message
    task.attemptRun.leftMap(_.getMessage).map(_ => new File(targetPath))
  }

  /**
   * validate that the lines have the right sequence of HEADER/column names/lines/TRAILER
   * and the right number of lines
   */
  def validateLines: Process1[Vector[String], Vector[String]] = {

    // feed lines into the lines parser with a given state
    // when it's done, follow by parsing with a new state
    def parse(lines: Vector[String], state: LineState, newState: LineState) =
      emit(lines) |> linesParser(state) fby linesParser(newState)

    // parse chunks of lines
    def linesParser(state: LineState): Process1[Vector[String], Vector[String]] = {
      receive1[Vector[String], Vector[String]] { case lines =>
        lines match {
          case first +: rest if isHeader(first) =>
            if (state.openedSection) fail("A trailer is missing")
            else
              // skip the HEADER and the column names line, then open a new section
              parse(lines.drop(2),
                    state.open,
                    LineState(lines.count(isHeader) > lines.count(isTrailer),
                              lines.drop(2).size))

          case first +: rest if isTrailer(first) =>
            // the TRAILER declares how many lines its section must contain
            val expected = "\\d+".r.findFirstIn(first).map(_.toInt).getOrElse(0)

            if (!state.openedSection)             
              fail("A header is missing")
            else if (state.lineCount != expected) 
              fail(s"expected $expected lines, got ${state.lineCount}")
            else {
              val dropped = lines.drop(1)
              parse(dropped,
                    state.restart,
                    LineState(dropped.count(isHeader) > dropped.count(isTrailer), 
                              dropped.size))
            }

          case first +: rest =>
            if (!state.openedSection) fail("A header is missing")
            else {
              // emit as many lines as possible until the next TRAILER
              val (toEmit, remainder) = lines.span(line => !isTrailer(line))
              emit(toEmit) fby
              parse(remainder, state.addLines(toEmit.size), state.addLines(lines.size))
            }

          case Vector() => halt
        }
      }
    }
    // initialise the parsing expecting a HEADER
    linesParser(LineState())
  }


  private def fail(message: String) = Halt(new Exception(message))
  private def isHeader(line: String) = line.startsWith("HEADER|")
  private def isTrailer(line: String) = line.startsWith("TRAILER|")
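
The LineState class is not shown above. A minimal definition, assuming it only needs to track whether a section is currently open and how many data lines have been seen so far, could be:

  /** is a section currently open, and how many data lines has it contained so far? */
  case class LineState(openedSection: Boolean = false, lineCount: Int = 0) {
    /** a HEADER was just read: open a new section with no lines yet */
    def open    = LineState(openedSection = true)
    /** a TRAILER was just read: close the section and reset the count */
    def restart = LineState()
    /** count n more data lines in the current section */
    def addLines(n: Int) = copy(lineCount = lineCount + n)
  }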

The bulk of the code is the validateLines process which verifies the file structure:

  • if the first line of the chunk is a HEADER, the next line (the column names) needs to be skipped, we know we have opened a new section, and we feed the rest of the chunk to the lines parser again. However we fail the process if we were not expecting a HEADER there

  • if the first line of the chunk is a TRAILER we do something similar, but we also check that the number of lines read in the section matches the number declared by the TRAILER

  • otherwise we try to emit as many lines as possible until the next HEADER or TRAILER and we recurse

This is a bit complex because we need to analyse the first element of the chunk, then emit the rest and compute the new state we will have once the whole chunk has been emitted. On the other hand, the processor is easy to test because I don't have to read or write files to check it, as the example below shows. This would be a bit more difficult to do with the loop version.
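
Here is a sketch of such a test, feeding chunks directly to the process (the data lines are invented, and I'm assuming that toList can be used to run a pure process):

  import scalaz.stream.Process

  // a well-formed chunk: HEADER, column names, one data line, TRAILER
  val chunk = Vector(
    "HEADER|FILE20131210",
    "name|age|town",
    "eric|42|sydney",
    "TRAILER|1")

  // only the data lines are emitted: List(Vector("eric|42|sydney"))
  (Process.emit(chunk) |> validateLines).toList

  // a chunk starting with a TRAILER must halt the process
  // with "A header is missing"; toList then throws that exception
  (Process.emit(Vector("TRAILER|0")) |> validateLines).toList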

But unfortunately not all the tests are green. One is still not passing. What if there is no ending TRAILER in the file? How can I raise an exception? There's no process to run, because there are no more lines to process! My test is pending for now, and I'll post the solution once I have it (maybe there's a smarter way to rewrite all of this?).
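
One direction I'm considering (just a sketch, and assuming that receive1 accepts a fallback process as a second argument, to be run when the input is exhausted) is to check the state once there is no more input:

  // sketch only: the fallback runs when there is no more input,
  // which is where a missing final TRAILER could be detected
  def linesParser(state: LineState): Process1[Vector[String], Vector[String]] =
    receive1[Vector[String], Vector[String]](
      { lines => ??? /* the same cases as before */ },
      if (state.openedSection) fail("A trailer is missing") else halt)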

Is it worth it?

This was definitely worth it for me in terms of learning the scalaz-stream library. However, in terms of pure programmer "productivity", for this kind of requirement, it feels like overkill. The imperative solution is very easy to come up with and has no performance problems. This should change once streaming parsing is available (see the roadmap); this use case will then probably be expressed as a one-liner. In the light of this post, I'm just curious how that implementation will deal with chunking.

1 comment:

David Barri said...

I've been meaning to look into scalaz-stream for a while now. It looks a bit verbose at the moment but promising, I'm looking forward to seeing it evolve. Awesome posts, keep 'em coming!