The other solution is to run jobs continuously. The problem here is that the reducers will not kick off until the mappers are complete. However, for data filtering/transformation tasks the reducers are generally not needed, so a partial solution can focus on the mappers only: (a) the mappers can be run perpetually by feeding them data non-stop, and (b) the output of the mappers can be collected while they run.
To solve (a) we need to do two things - write a custom InputFormat and a custom RecordReader.
A short refresher: an InputSplit is a view of the raw data - Hadoop splits input files by some predefined block size and feeds the splits to mappers. A RecordReader transforms raw data into records. The InputFormat connects the RecordReader and the InputSplit. Mappers run as long as the RecordReader keeps returning key-value pairs from next(). The number of mappers depends on the number of InputSplits returned by getSplits() on the InputFormat. A mapper's completion rate is derived from the RecordReader's getProgress(), typically the ratio of the current position to the length of the split.
In order to run the mappers continuously, one can just create a custom RecordReader, peg getProgress() and getPos() to some constant, keep returning values from next(), and return 1 from getProgress() only when done processing. This will keep the mappers running perpetually. The next() function is where data acquisition happens - HTTP requests, socket operations or whatever. If your data center allows outgoing connections from the grid boxes (mine doesn't, eh) you can crawl the web or read from the Twitter Firehose.
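A minimal sketch of the idea. Since I'm not sharing the real code yet, this uses a simplified stand-in for Hadoop's old-API RecordReader interface (the real one lives in org.apache.hadoop.mapred); the PerpetualRecordReader name and the queue-backed source are hypothetical - in practice next() would do the HTTP/socket work itself:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for org.apache.hadoop.mapred.RecordReader<K, V>.
interface SimpleRecordReader {
    boolean next(String[] keyValue);
    long getPos();
    float getProgress();
}

// A reader that never reports completion: progress is pegged to a constant
// below 1.0, and next() blocks on an external source (here a queue standing
// in for HTTP requests, sockets, etc.) instead of a finite file split.
class PerpetualRecordReader implements SimpleRecordReader {
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private volatile boolean shuttingDown = false;
    private long pos = 0;

    void feed(String record) { source.add(record); }
    void shutdown() { shuttingDown = true; }

    @Override
    public boolean next(String[] keyValue) {
        while (!shuttingDown) {
            try {
                String record = source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    keyValue[0] = Long.toString(pos++);
                    keyValue[1] = record;
                    return true;      // keep the mapper alive
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;                 // only on explicit shutdown
    }

    @Override public long getPos() { return pos; }

    // Pegged below 1.0, so the framework never sees this split as done.
    @Override public float getProgress() { return shuttingDown ? 1.0f : 0.5f; }
}
```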
A fun side effect of the value returned from getProgress() is that the Map/Reduce framework launches speculative tasks when the progress of a mapper falls behind the average of all mappers assigned to the job by a certain percent. By suddenly increasing the reported progress of the majority of the mappers, one can make the rest look slow and force Hadoop to launch new (speculative) mappers. This way one can scale the job up when necessary.
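This trick only works if speculative execution is enabled for map tasks. In the hadoop-0.20-era configuration that is controlled by the following property (it defaults to true, but worth pinning explicitly):

```xml
<!-- mapred-site.xml -->
<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>true</value>
</property>
```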
The mappers will process the input and write to a temporary directory on HDFS, from where the files are copied to the final location once processing is complete. Since in our case the mappers never finish, we need to commit the files ourselves.
To collect the output of a running mapper one has to write a custom OutputFormat, RecordWriter and OutputCommitter. The general strategy is to wrap an existing RecordWriter (say, the one returned by SequenceFileOutputFormat), roll it over periodically by closing the current instance and starting a new one, and call the OutputCommitter to copy the finished output to the final location. Alternatively, one can just write to a downstream DB (if it can handle the volume). One caveat is that by writing a lot of small files one can hit the file-count limit on HDFS, but that will be cured by HDFS Federation in Hadoop.NEXT and higher (and maybe I will write some notes on migrating from hadoop-0.20 to Hadoop.NEXT).
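The rollover strategy in miniature, again as a self-contained sketch rather than real Hadoop code (the RolloverWriter name is hypothetical, and a plain file move stands in for what the OutputCommitter and the wrapped Hadoop RecordWriter would do):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Records go to a temp file; every `rollEvery` records the current file is
// closed and "committed" by moving it into the final output directory, then
// a fresh temp file is opened. In the real thing the move is the
// OutputCommitter's job and the inner writer is Hadoop's RecordWriter.
class RolloverWriter implements AutoCloseable {
    private final Path tempDir, finalDir;
    private final int rollEvery;
    private int written = 0, part = 0;
    private Path current;
    private BufferedWriter out;

    RolloverWriter(Path tempDir, Path finalDir, int rollEvery) throws IOException {
        this.tempDir = tempDir;
        this.finalDir = finalDir;
        this.rollEvery = rollEvery;
        Files.createDirectories(tempDir);
        Files.createDirectories(finalDir);
        open();
    }

    private void open() throws IOException {
        current = tempDir.resolve("part-" + part++);
        out = Files.newBufferedWriter(current);
    }

    void write(String record) throws IOException {
        out.write(record);
        out.newLine();
        if (++written % rollEvery == 0) roll();
    }

    // Close the current writer, commit its file, start a new one.
    private void roll() throws IOException {
        out.close();
        Files.move(current, finalDir.resolve(current.getFileName()));
        open();
    }

    @Override public void close() throws IOException {
        out.close();
        // Commit whatever is left in the last partial file.
        Files.move(current, finalDir.resolve(current.getFileName()));
    }
}
```

With rollEvery set to, say, 2, five records produce three committed files in the final directory - output becomes visible downstream while the "mapper" is still running.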
Voila.
P.S. I will share the code once I have something resembling a library; for now it's just an overview of a hack. Implement at your own risk and don't spoil it for everyone by abusing EC2.