Saturday, June 30, 2012

On CAP theorem

From: http://www.cloudera.com/blog/2010/04/cap-confusion-problems-with-partition-tolerance/

....partition tolerance is not something we have a choice about designing into our systems. If you have a partition in your network, you lose either consistency (because you allow updates to both sides of the partition) or you lose availability (because you detect the error and shutdown the system until the error condition is resolved). Partition tolerance means simply developing a coping strategy by choosing which of the other system properties to drop. This is the real lesson of the CAP theorem – if you have a network that may drop messages, then you cannot have both availability and consistency, you must choose one. We should really be writing Possibility of Network Partitions => not(availability and consistency), but that’s not nearly so snappy.

Thursday, June 14, 2012

Perpetual Elephant: Hadoop stream processing.

Twitter Storm is great! However, a lot of shops have already invested heavily in Hadoop infrastructure, and if Hadoop could be used for processing streams, one could ideally use the same code base for both high-latency and low-latency jobs. The trivial solution, running small Hadoop batches with a fast turnaround, doesn't fly, since Hadoop job creation/cleanup and resource acquisition take a lot of time. A rule of thumb is that the overhead is prohibitive for jobs running under 10 minutes.

The other solution is to run jobs continuously. The problem here is that the reducers will not kick off before the mappers are complete. However, data filtering/transformation tasks generally don't use reducers, so a partial solution can focus on the mappers alone. The mappers (a) can be run perpetually by feeding them data non-stop, and (b) their output can be collected while they run.

To solve (a) we need two custom pieces on the input side: an InputFormat and its RecordReader.

A short refresher: an InputSplit is a view of the raw data; Hadoop splits input files by some predefined block size and feeds the splits to mappers. A RecordReader transforms raw data into records. An InputFormat connects the RecordReader and the InputSplits. A mapper runs as long as its RecordReader keeps returning key-value pairs from next(). The number of mappers depends on the number of InputSplits returned from the InputFormat's getSplits(). A mapper's completion rate is reported by the RecordReader's getProgress(), typically the ratio of the current position to the length of the split.
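To make the moving parts concrete, here is a toy, self-contained sketch of how the three pieces fit together. These classes only mimic the shape of Hadoop's org.apache.hadoop.mapred interfaces — ToySplit, ToyRecordReader and ToyInputFormat are made-up names, and the real methods take JobConf, Reporter and so on:

```java
import java.util.*;

// Plays the role of InputSplit: a slice of the input.
class ToySplit {
    final int start, end;
    ToySplit(int start, int end) { this.start = start; this.end = end; }
    long getLength() { return end - start; }
}

// Plays the role of RecordReader: turns raw data into records.
class ToyRecordReader {
    private final String[] data;
    private final ToySplit split;
    private int pos;
    ToyRecordReader(String[] data, ToySplit split) {
        this.data = data; this.split = split; this.pos = split.start;
    }
    // The framework keeps calling next() and runs map() once per true return;
    // false means "this split is exhausted, the mapper is done".
    boolean next(String[] valueHolder) {
        if (pos >= split.end) return false;
        valueHolder[0] = data[pos++];
        return true;
    }
    long getPos() { return pos; }
    // Completion rate = how far into the split we are.
    float getProgress() { return (pos - split.start) / (float) split.getLength(); }
}

// Plays the role of InputFormat: produces splits and connects them to readers.
class ToyInputFormat {
    // #splits returned here == #mappers launched.
    static ToySplit[] getSplits(String[] data, int numSplits) {
        int per = (data.length + numSplits - 1) / numSplits;
        List<ToySplit> out = new ArrayList<>();
        for (int s = 0; s < data.length; s += per)
            out.add(new ToySplit(s, Math.min(s + per, data.length)));
        return out.toArray(new ToySplit[0]);
    }
    static ToyRecordReader getRecordReader(String[] data, ToySplit split) {
        return new ToyRecordReader(data, split);
    }
}
```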

To run the mappers continuously, one can create a custom RecordReader that pegs getProgress() and getPos() to some constant and keeps returning values from next(), reporting 1.0 from getProgress() only when done processing. This keeps the mappers running perpetually. The next() call is where data acquisition happens: HTTP requests, socket operations or whatever. If your data center allows outgoing connections from the grid boxes (mine doesn't, eh) you can crawl the web or read from the Twitter Firehose.
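A minimal sketch of the trick, again with a simplified stand-in for the real RecordReader interface. The BlockingQueue here stands in for whatever live feed next() would actually read from (HTTP, socket, Firehose), and PerpetualReader is a made-up name:

```java
import java.util.*;
import java.util.concurrent.*;

// Stand-in for a RecordReader that never exhausts its "split".
class PerpetualReader {
    private final BlockingQueue<String> source; // stands in for a live data feed
    private volatile boolean done = false;
    private long pos = 0;

    PerpetualReader(BlockingQueue<String> source) { this.source = source; }

    // next() is where data acquisition happens; it blocks until data arrives
    // and keeps returning true, so the mapper never completes on its own.
    boolean next(String[] valueHolder) {
        if (done) return false;
        try {
            valueHolder[0] = source.take();
            pos++;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long getPos() { return pos; }

    // Pegged to a constant while running; report 1.0 only when we choose to stop.
    float getProgress() { return done ? 1.0f : 0.5f; }

    void shutdown() { done = true; }
}
```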

A fun side effect of the value returned from getProgress() is that the Map/Reduce framework launches speculative tasks when the progress of a mapper falls behind the average of all mappers assigned to the job by a certain margin. By suddenly increasing the reported progress of the majority of mappers, one can force Hadoop to launch new mappers, and thus scale the job up when necessary.

Normally the mappers process the input and write to a temporary directory on HDFS, from where the files are copied to the final location once processing is complete. Since in our case the mappers never finish, we need to commit the files ourselves.

To collect the output of a running mapper, one has to write a custom OutputFormat, RecordWriter and OutputCommitter. The general strategy is to wrap an existing RecordWriter (say, the one produced by SequenceFileOutputFormat), periodically roll it over by closing the current instance and opening a new one, and call the OutputCommitter to copy the closed output to the final location. Alternatively, one can just write to a downstream DB (if it can handle the volume).
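Here is a toy sketch of the rollover strategy. The in-memory lists stand in for the temporary and final directories on HDFS, and RollingWriter is a made-up name; a real implementation would close an actual SequenceFile writer and rename the file into place:

```java
import java.util.*;

// Stand-in for a RecordWriter that rolls over its delegate every N records
// and "commits" each closed chunk, since the perpetual mapper will never
// reach the framework's normal commit step.
class RollingWriter {
    private final int rollEvery;
    private List<String> current = new ArrayList<>();               // delegate "file" being written
    private final List<List<String>> committed = new ArrayList<>(); // the "final location"
    private int written = 0;

    RollingWriter(int rollEvery) { this.rollEvery = rollEvery; }

    void write(String record) {
        current.add(record);
        if (++written % rollEvery == 0) roll();
    }

    // Close the current delegate and let the "committer" publish it,
    // then open a fresh delegate for the next chunk.
    void roll() {
        if (current.isEmpty()) return;
        committed.add(current);       // commit: temp dir -> final dir
        current = new ArrayList<>();
    }

    List<List<String>> committedFiles() { return committed; }
}
```

Each call to roll() is where a real writer would close the underlying file and invoke the OutputCommitter; rolling on a timer instead of a record count works the same way.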

One caveat is that writing a lot of small files can hit HDFS's limit on the number of files (the namenode keeps all file metadata in memory), but that should be cured by HDFS Federation in Hadoop.NEXT and higher (and maybe I will write some notes on migrating from hadoop.0.20 to hadoop.NEXT).

Voila.

P.S. I will share the code once I have something resembling a library; for now it's just an overview of a hack. Implement at your own risk and don't spoil it for everyone by abusing EC2.