Tuesday, September 17, 2013

Driving a freight truck on ice: Practice of big data engineering.

Developing for big data systems is like driving a freight truck full of eggs on ice. It is different from driving a sedan, as in enterprise systems, or a race car, as in trading, or the submarine-rocket-airplane that is machine learning and other voodoo. The freight truck has large inertia, the ice is thin, and the eggs are fragile.

The truck

My experience with interviewing for data processing/ETL jobs is that the interviews focus on certain tricks of the trade that are well known and expected by both sides. Classic interview questions assess knowledge of algorithms and then develop into the classic scenarios: data that does not fit into memory, or systems that go down. Not only interviewing, but most of the art is focused on these problems of limited memory, system partitioning, and overall operation in an unreliable environment. This leads to the expectation that data engineers spend most of their time thinking about the scale of the problem. We like to measure success in the PBs in the data warehouse and the TB/day of throughput. However, solutions to these sorts of problems are part of the engineering lore, and we know how to design for them.

The day-to-day thought sink is not the scale, it's the inertia of the system. The only way to deal with the inertia is compatibility: making sure that nothing disrupts the delicate movement of the truck on ice. Since it is expensive to stop and start a production system, we need to be able to take down and bring up pieces of the system while keeping the integrity of the whole.

The questions I find myself asking when coding are:
- if we write data in a new way, will the old code be able to parse it and continue operating? (forward compatibility)
- will the new code be able to understand the old data? (backward compatibility)
- if half of the system is running the new code and the other half the old code, will anything break?

The last one is very easy to forget about. With modern deployment tools and operational maturity it's easy to regard deployment as atomic - bam, and the system is running the new code. But there is always a window of time when the system is in an intermediate state where things can go wrong, and it is essential to keep these scenarios in mind.
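The compatibility questions above can be sketched with a tolerant reader. This is only an illustration under assumptions: the JSON record layout, the field names (`user`, `bytes`, `region`) and the default value are all hypothetical and not taken from any real system.

```python
import json

# Hypothetical layout: old writers emit "user" and "bytes";
# new writers also emit "region" (and may add more fields later).
KNOWN_FIELDS = ("user", "bytes", "region")
DEFAULTS = {"region": "unknown"}  # assumed default for data written by old code


def parse_record(line):
    """Parse one JSON record, tolerating both older and newer writers."""
    raw = json.loads(line)
    record = {}
    for field in KNOWN_FIELDS:
        if field in raw:
            record[field] = raw[field]
        elif field in DEFAULTS:
            # Old data read by new code: fill in a default.
            record[field] = DEFAULTS[field]
        else:
            raise ValueError("missing required field: " + field)
    # New data read by old code: fields this reader does not know are dropped
    # instead of breaking the parse.
    return record


old_line = '{"user": "a", "bytes": 10}'
new_line = '{"user": "b", "bytes": 20, "region": "eu", "shard": 3}'
old_record = parse_record(old_line)
new_record = parse_record(new_line)
```

With a reader like this, a half-deployed system can keep running: whichever half wrote a record, either half can read it.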

Keeping these and other scenarios in mind is mentally challenging and leads to paranoia or, worse, a fall into the sin of sloppiness. As usual in software development, good design helps - the more decoupled the subsystems and the stronger the APIs on the edges, the easier it is to concentrate on the system in development and assure its viability. It also helps to roll out pieces one by one and test at the intermediate steps, keeping track of the errors and issues that come up. Good development procedures, processes, QA and operations support also help, but never replace rigorous design and thought.

Another helpful thing is the brakes. Any feature should have a way to roll back its effects, either through configuration or system settings. The other piece is a method of reprocessing if corruption occurs, which should be tested and ready before the deployment.
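A rollback switch can be as small as a configuration flag that selects the old code path. The sketch below assumes a hypothetical setting named `USE_NEW_PARSER` and two stand-in transformations; real jobs would read the flag from whatever configuration system they already use.

```python
import os


def new_feature_enabled(env=os.environ):
    """The feature is on only when the (hypothetical) flag is set to "1"."""
    return env.get("USE_NEW_PARSER", "0") == "1"


def process(row, env=os.environ):
    """Run the new code path only when the flag is set; otherwise the old one."""
    if new_feature_enabled(env):
        return row.strip().lower()   # stand-in for the new behaviour
    return row.strip()               # stand-in for the old, known-good behaviour


old_result = process("  Some Row ", env={})
new_result = process("  Some Row ", env={"USE_NEW_PARSER": "1"})
```

Turning the flag off restores the old behaviour without a redeploy, which is the point of having brakes.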

The ice 

I often forget that the cloud is not made of water vapor, but is actual hardware made of silicon and metal which is connected to power grids, cooling systems and network hardware, and lives in a cage under a roof. Map slots are not abstract numbers on the Hadoop job tracker. Hardware breaks, machines are taken out and brought back to the grid. While in general it is safe to assume that the environment is consistent and uniform, and to leave the heavy lifting to the underlying systems, it helps to have knowledge of the ops processes in mind when debugging. I had a case when a machine came back into the rack with an older version of software, which showed up as occasional failures on some of the mappers. It helps to keep track of the machine names where failures occur and to keep in touch with the operations crew - they live the zeitgeist of the platform.

The eggs

The data, being fragile and perishable, are the metaphorical eggs. In a large-scale data system there will always be some rows that break the system, and finding these rows is a painful exercise. The key here is instrumentation and defensive programming. Ideally, every field read from the database should be sanitized and checked, and every non-conforming bit should be logged and accounted for. However, this leads to significant overhead in production, so it's always good to have multiple run modes for the job with varying levels of verbosity.
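A minimal sketch of this kind of defensive reading, assuming a hypothetical tab-separated row of `user` and `bytes`: bad rows are always counted, and only logged one by one in the verbose run mode. The field names and rejection reasons are made up for illustration.

```python
import logging
from collections import Counter

log = logging.getLogger("job")


def reject_reason(fields):
    """Return why a row is unusable, or None if it conforms."""
    if len(fields) != 2:
        return "bad_field_count"
    if not fields[0]:
        return "empty_user"
    try:
        if int(fields[1]) < 0:
            return "negative_bytes"
    except ValueError:
        return "bad_bytes"
    return None


def clean_rows(rows, verbose=False):
    """Split rows into parsed good rows and per-reason counters."""
    good = []
    counters = Counter()
    for line_no, row in enumerate(rows, start=1):
        fields = row.rstrip("\n").split("\t")
        reason = reject_reason(fields)
        if reason is None:
            good.append((fields[0], int(fields[1])))
            counters["ok"] += 1
            continue
        counters[reason] += 1          # cheap accounting, always on
        if verbose:                    # expensive per-row logging, debug runs only
            log.warning("row %d rejected (%s): %r", line_no, reason, row)
    return good, counters


sample = ["alice\t10\n", "bob\tten\n", "\t5\n", "carol\t7\textra\n", "dave\t-1\n"]
good, counters = clean_rows(sample)
```

The counters are what make the bad rows findable later: a job that silently skips them is as bad as one that crashes.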

Another problem with data is its persistent nature - once it's corrupted, it's hard to reverse. In most cases reversal is either expensive, requiring long reprocessing, or outright impossible. The usual approach of backups does not apply - where exactly are you going to back up petabytes of data? The practice is to deploy changes gradually, making sure that the data stays consistent. The most important tool in this process is diffing, which lets you compare data in multiple locations.
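One way to picture the diffing step is to fingerprint rows by key and compare the fingerprints. This is a small in-memory sketch under assumptions (keyed rows, MD5 of the row's `repr`); at real scale the same idea would run as a distributed job rather than in one process.

```python
import hashlib


def fingerprint(rows):
    """Map each key to a digest of its row, so rows can be compared cheaply."""
    return {
        key: hashlib.md5(repr(value).encode("utf-8")).hexdigest()
        for key, value in rows
    }


def diff(old_rows, new_rows):
    """Report keys that are missing, added or changed between two datasets."""
    old, new = fingerprint(old_rows), fingerprint(new_rows)
    return {
        "missing": sorted(k for k in old if k not in new),
        "added": sorted(k for k in new if k not in old),
        "changed": sorted(k for k in old if k in new and old[k] != new[k]),
    }


before = [("k1", (1, "a")), ("k2", (2, "b")), ("k3", (3, "c"))]
after = [("k1", (1, "a")), ("k2", (2, "B")), ("k4", (4, "d"))]
report = diff(before, after)
```

Running the old and new code on the same sample and diffing the outputs is a cheap check before a change is allowed near the full dataset.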


The egg truck on ice

The most important thing is - have fun driving.

Saturday, June 30, 2012

On CAP theorem

From: http://www.cloudera.com/blog/2010/04/cap-confusion-problems-with-partition-tolerance/

....partition tolerance is not something we have a choice about designing into our systems. If you have a partition in your network, you lose either consistency (because you allow updates to both sides of the partition) or you lose availability (because you detect the error and shutdown the system until the error condition is resolved). Partition tolerance means simply developing a coping strategy by choosing which of the other system properties to drop. This is the real lesson of the CAP theorem – if you have a network that may drop messages, then you cannot have both availability and consistency, you must choose one. We should really be writing Possibility of Network Partitions => not(availability and consistency), but that’s not nearly so snappy.

Thursday, June 14, 2012

Perpetual Elephant: Hadoop stream processing.

Twitter Storm is great! However, a lot of shops have already invested heavily in Hadoop infrastructure, and if Hadoop could be used for processing streams, one could ideally use the same code base for both high-latency and low-latency jobs. The trivial solution, running small Hadoop batches with high turnaround, doesn't fly, since Hadoop job creation/cleanup and resource acquisition take a lot of time. A rule of thumb is that the cost is prohibitive for jobs running under 10 minutes.

The other solution is to run jobs continuously. The problem here is that the reducers will not kick off before the mappers are complete. However, for data filtering/transformation tasks the reducers are generally not used, so a partial solution could focus on mappers only. Then (a) the mappers can be run perpetually by feeding them data non-stop, and (b) we can collect the output of the mappers while they run.

To solve (a) we need to do two things - create a custom InputSplit and InputFormat.

A short refresher: InputSplit is the view of the raw data; Hadoop splits input files by some predefined block size and feeds the splits to mappers. RecordReader transforms raw data into records. InputFormat connects RecordReader and InputSplit. A mapper runs as long as its RecordReader returns key-value pairs from next(). The number of mappers depends on the number of InputSplits returned from getSplits() of InputFormat. A mapper's completion rate is decided by getProgress() of the RecordReader, typically the ratio of the current position to the length of the split.

In order to run the mappers continuously, one can just create a custom RecordReader, peg getProgress() and getPos() to some constant while next() keeps returning values, and return 1 from getProgress() when done processing. This will keep the mappers running perpetually. The next() function is where data acquisition happens: HTTP requests, socket operations or whatever. If your data center allows outgoing connections from the grid boxes (mine doesn't, eh) you can crawl the web or read from the Twitter Firehose.
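The idea can be modelled in a few lines. This is a plain Python toy, not Hadoop code: the class name, the snake_case method names and the pegged value of 0.5 are assumptions made for illustration, and a real implementation would be a Java RecordReader.

```python
class PerpetualRecordReader:
    """Toy model of a reader that never reports its split as nearly done."""

    def __init__(self, source, pegged_progress=0.5):
        self._source = iter(source)      # stands in for a socket, HTTP feed, etc.
        self._pegged = pegged_progress   # constant reported while data keeps coming
        self._done = False

    def next(self):
        """Return the next record, or None once the source is exhausted."""
        try:
            return next(self._source)
        except StopIteration:
            self._done = True
            return None

    def get_progress(self):
        """Report the pegged constant while running and 1.0 when finished."""
        return 1.0 if self._done else self._pegged


reader = PerpetualRecordReader(["rec-1", "rec-2"])
seen = []
progress_while_running = []
record = reader.next()
while record is not None:
    seen.append(record)
    progress_while_running.append(reader.get_progress())
    record = reader.next()
final_progress = reader.get_progress()
```

With an endless source the loop simply never ends, which is exactly the behaviour wanted from the mapper.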

A fun side effect of the value returned from getProgress() is that the Map/Reduce framework launches speculative tasks if the progress of a certain mapper falls behind the average of all mappers assigned to the job by a certain percentage. By suddenly increasing the progress of the majority of mappers, one can force Hadoop to run new mappers. This way one can scale the job when necessary.

The mappers will process the input and write to a temporary directory on HDFS, from where the files will be copied to the final location once the processing is complete. Since in our case the mappers never finish, we need to commit the files ourselves.

To collect the output of a running mapper one has to write a custom OutputFormat, RecordWriter and OutputCommitter. The general strategy is that the custom RecordWriter wraps an existing RecordWriter (say SequenceFileOutputFormat.RecordWriter), rolls it over by closing the current instance and starting a new one, and calls the OutputCommitter to copy the output to the final location. Alternatively one can just write to a downstream DB (if it can handle the volume).
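The rollover strategy can be sketched on a local filesystem. Everything here is a stand-in chosen for illustration: local directories play the role of the HDFS temporary and final locations, a text file plays the wrapped RecordWriter, and the record-count trigger and `part-NNNNN` names are assumptions.

```python
import os


class RollingWriter:
    """Write records to a temp file and commit it every max_records records."""

    def __init__(self, tmp_dir, final_dir, max_records):
        self.tmp_dir = tmp_dir
        self.final_dir = final_dir
        self.max_records = max_records
        self._part = 0       # index of the current part file
        self._count = 0      # records written to the current part file
        self._file = None
        self._path = None

    def write(self, record):
        if self._file is None:
            # Start a new underlying writer in the temporary location.
            self._path = os.path.join(self.tmp_dir, "part-%05d" % self._part)
            self._file = open(self._path, "w")
        self._file.write(record + "\n")
        self._count += 1
        if self._count >= self.max_records:
            self.roll()

    def roll(self):
        """Close the current file and move it to the final location."""
        if self._file is None:
            return
        self._file.close()
        target = os.path.join(self.final_dir, os.path.basename(self._path))
        os.replace(self._path, target)   # the "commit" step
        self._file = None
        self._count = 0
        self._part += 1
```

Downstream consumers only ever look at the final directory, so they never see a half-written file even though the writer itself never finishes.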

One caveat is that by writing a lot of small files one can hit the limit on the number of files in HDFS, but that will be cured by HDFS Federation in Hadoop.NEXT and higher (and maybe I will write some notes on migrating from hadoop.0.20 to hadoop.NEXT).

Voila.

P.S. I will share the code once I have something resembling a library; for now it's just an overview of a hack. Implement at your own risk and don't spoil it for everyone by abusing EC2.



Tuesday, December 20, 2011

Simple things in PIG

A trivial task in PIG that comes up a lot when sanity-checking mapreduce program output is counting the number of rows in outputs and inputs. For those who are used to SQL count, the way it is done in PIG is not intuitive. The reason is that COUNT in PIG counts not the rows of a relation but the tuples in a bag, so an extra grouping is needed.

DATASET = LOAD .....
-- GROUP ... ALL collects every row into a single bag
G = GROUP DATASET ALL;
-- COUNT then counts the tuples in that bag
C = FOREACH G GENERATE COUNT(DATASET);
DUMP C;

Monday, November 14, 2011

Hadoop Speculative Execution

One thing that seems easy to grasp in Hadoop is the concept of speculative execution. Sounds trivial - if I have a task and am not so sure of the environment, please go ahead, dear yellow elephant, schedule some more copies of the task, and see which one finishes first. One task here or one task there does not matter, and dammit, I want my results NOW.

But how does the little yellow fellow know what is slow and when to schedule? When running large jobs it looks like Hadoop runs some speculative tasks anyway, and many get the idea that there is some magic number of tasks launched for each job, but this would be way too expensive if the number of mappers/reducers is large. Therefore Hadoop goes the semi-smart way:

The current algorithm works the following way:
A speculative task is kicked off for mappers and reducers that have a completion rate under a certain percentage of the completion rate of the majority of running tasks. For example, if you have 100 mappers, 90 of which are at 80% completion and 10 are at 20%, then Hadoop will start 10 additional tasks for the slower ones.

In versions of Hadoop after 0.20.2 there are 3 new fields in the jobconf:
  • mapreduce.job.speculative.speculativecap
  • mapreduce.job.speculative.slowtaskthreshold
  • mapreduce.job.speculative.slownodethreshold
Hadoop launches a speculative task for a regular task if completion < slowtaskthreshold * mean(completion of all other tasks) and the number of speculative tasks launched < speculativecap.

In older versions of Hadoop these threshold values are fixed and cannot be modified.
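The rule as stated in this post can be written down directly. This is a sketch of the post's description only, not Hadoop's scheduler code; the function name, the threshold of 0.5 and the cap of 10 are hypothetical values picked to reproduce the 100-mapper example.

```python
def should_speculate(task_progress, other_progress, slow_task_threshold,
                     launched_speculative, speculative_cap):
    """Apply the rule from the text: slow relative to the mean, and under the cap."""
    if not other_progress:
        return False
    mean_others = sum(other_progress) / len(other_progress)
    is_slow = task_progress < slow_task_threshold * mean_others
    return is_slow and launched_speculative < speculative_cap


# The example from the text: 90 mappers at 80% completion, 10 at 20%.
progress = [0.8] * 90 + [0.2] * 10

speculated = []
for i, p in enumerate(progress):
    others = progress[:i] + progress[i + 1:]
    # Assumed settings: threshold 0.5, cap of 10 speculative tasks.
    if should_speculate(p, others, 0.5, len(speculated), 10):
        speculated.append(i)
```

Under these assumed settings only the ten slow mappers qualify, and lowering the cap would stop the launches early even though more tasks are slow.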


Wednesday, April 6, 2011

bash-fu: seq

So on my local cluster I have 9 nodes, and every now and then I need to clean up the mess that my SGE jobs create, or collect the logs or whatever. So I want to ssh and execute the same command on each machine. The node names are 'compute-0-[0-9]+'.

Enter seq. seq will generate a sequence of numbers in the given range. Using xargs, I write
seq 1 9 | xargs -I{} ssh compute-0-{} 'do something awesome'

magic is easy :)