Dealing with Corrupt Files in Hadoop

As I've been working with Hadoop a lot in the last several months, I've come to realize that it doesn't deal gracefully with corrupt files (e.g., malformed gzip files). I would throw a cluster at a couple hundred thousand files (of which one or two were bad) and the job would die two hours into execution, throwing EOFException errors all over the place. If I were processing only one file, I suppose that would be a reasonably acceptable outcome. But when 99.9% of your files are fine, and the corrupt ones aren't recoverable anyway, there's no sense in blowing up the whole job just because a trivial portion of the data was bad.

[Image: an elephant losing its balance, captioned "Hadoop's relatively unknown LostBalance exception."]

Turns out, it's not too hard to catch those exceptions within a custom record reader, log a warning message, gracefully ignore the files in question, and go about your business. Here's how to do it.

Catching and Logging Exceptions

For the purposes of this post, I'll be modifying LineRecordReader from the Hadoop 2.6.0 mapreduce (note: not the old mapred) framework. You could easily use this modified version in a FileInputFormat, TextInputFormat or something similar... but where it will really shine is in CombineFileInputFormat and CombineTextInputFormat, where it will let you keep a job running even if you hit some problem files. If you want to modify a different type of record reader, though, the same concepts would apply.

For reference, here's the standard LineRecordReader code:

So, first things first... basic setup. We're going to import org.apache.hadoop.mapreduce.lib.input.* (since our new record reader no longer lives in that package). We're also going to create a boolean to keep track of whether the file the record reader is working with is broken or not. Finally, we'll create a string to keep track of the name of our file.

Now, we'll start by modifying the initialize() method to first set our fileBroken indicator to false, then store the name of the current file, and finally check to see if the input split we've been given has a length of 0. If it does, we'll assume the file is broken, and submit a log message. So, the first couple lines of initialize() become:
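As a rough sketch of that step (not the post's actual listing): the class below uses only the JDK, so `SplitInfo` is a hypothetical stand-in for Hadoop's `FileSplit`, and `java.util.logging` stands in for whatever logger the real reader uses. The field names `fileBroken` and `fileName` follow the description above; everything else is an assumption made for illustration.

```java
import java.util.logging.Logger;

// JDK-only model of the extra state added to the record reader.
class InitSketch {
    private static final Logger LOG = Logger.getLogger(InitSketch.class.getName());

    /** Hypothetical stand-in for FileSplit: a file name plus the split length in bytes. */
    static final class SplitInfo {
        final String path;
        final long length;

        SplitInfo(String path, long length) {
            this.path = path;
            this.length = length;
        }
    }

    private boolean fileBroken;  // true once the current file is known to be unusable
    private String fileName;     // kept so that log messages can name the offending file

    void initialize(SplitInfo split) {
        fileBroken = false;          // reset the flag for every new split
        fileName = split.path;       // in Hadoop: taken from FileSplit.getPath()
        if (split.length == 0) {     // in Hadoop: FileSplit.getLength()
            fileBroken = true;
            LOG.warning("Zero-length split, treating file as broken: " + fileName);
        }
        // ... the rest of the stock initialize() logic would follow here ...
    }

    boolean isFileBroken() { return fileBroken; }

    String getFileName() { return fileName; }
}
```

Resetting the flag at the top matters because a combine-style input format may reuse the same reader logic for many files in a row, and one bad file shouldn't poison the next.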

The next thing we need to do is catch any IOExceptions when they occur. This can happen any time the readLine() method is called on our input file. The first place that can happen is at the end of the initialize() method, where the LineRecordReader skips the first line of all but the first split. The original code looks like this...

We'll simply surround this with a try/catch block. If we get an exception, we'll log it, and mark the file as broken.
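The shape of that try/catch can be sketched with plain JDK classes. This is an illustration under assumptions, not the post's code: `BufferedReader` stands in for Hadoop's line reader, the position bookkeeping the real reader does is left out, and `failingReader()` is a made-up helper that simulates a corrupt stream.

```java
import java.io.BufferedReader;
import java.io.EOFException;
import java.io.IOException;
import java.io.Reader;
import java.util.logging.Logger;

// JDK-only model of guarding the "skip the first line" read.
class SkipFirstLineSketch {
    private static final Logger LOG =
            Logger.getLogger(SkipFirstLineSketch.class.getName());

    boolean fileBroken = false;
    String fileName = "example.gz";  // hypothetical file name, used only in the log message

    /** Discards the first (possibly partial) line of any split that doesn't start at byte 0. */
    void skipFirstLine(BufferedReader in, long start) {
        if (start != 0) {
            try {
                in.readLine();
            } catch (IOException e) {
                LOG.warning("Could not skip first line of " + fileName + ": " + e);
                fileBroken = true;  // remember the failure instead of rethrowing
            }
        }
    }

    /** Hypothetical helper: a reader whose every read fails, simulating a corrupt stream. */
    static BufferedReader failingReader() {
        return new BufferedReader(new Reader() {
            @Override
            public int read(char[] buf, int off, int len) throws IOException {
                throw new EOFException("Unexpected end of input stream");
            }

            @Override
            public void close() {
                // nothing to release
            }
        });
    }
}
```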

We'll also need to catch IOExceptions in the while() loop of nextKeyValue(). (They can also be thrown by skipUtfByteOrderMark(), but that only gets called by nextKeyValue(), so catching exceptions in nextKeyValue() is sufficient.) The original code...

And our modified version, which catches and logs exceptions:
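To show the idea end to end without a Hadoop dependency, here is a minimal JDK-only model. It is a sketch under assumptions rather than the modified LineRecordReader itself: `TolerantGzipLineReader`, `markBroken()` and `gzip()` are invented names, the key is a line index rather than a byte offset, and `GZIPInputStream` stands in for Hadoop's codec. The final check in `nextKeyValue()` is the one discussed in the next section.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Logger;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// JDK-only model of a line reader that survives a corrupt gzip input.
class TolerantGzipLineReader {
    private static final Logger LOG =
            Logger.getLogger(TolerantGzipLineReader.class.getName());

    private final String fileName;
    private BufferedReader in;           // stays null if the gzip header is unreadable
    private boolean fileBroken = false;
    private long key = -1;               // line index; the real reader uses a byte offset
    private String value;

    TolerantGzipLineReader(String fileName, byte[] gzipBytes) {
        this.fileName = fileName;
        try {
            in = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(new ByteArrayInputStream(gzipBytes)),
                    StandardCharsets.UTF_8));
        } catch (IOException e) {        // e.g. empty input or a bad gzip header
            markBroken(e);
        }
    }

    /** Advances to the next line; returns false at end of input or once the file is broken. */
    boolean nextKeyValue() {
        String line = null;
        if (!fileBroken) {
            try {
                line = in.readLine();
            } catch (IOException e) {    // EOFException, ZipException, ...
                markBroken(e);
            }
        }
        if (line == null || fileBroken) { // nothing read, or nothing trustworthy
            value = null;
            return false;
        }
        key++;
        value = line;
        return true;
    }

    private void markBroken(IOException e) {
        fileBroken = true;
        LOG.warning("Skipping the rest of " + fileName + ": " + e);
    }

    boolean isFileBroken() { return fileBroken; }

    long getCurrentKey() { return key; }

    String getCurrentValue() { return value; }

    /** Helper for experiments: gzip a string in memory. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }
}
```

Feeding it the output of `gzip(...)` cut short with `Arrays.copyOf(bytes, bytes.length / 2)` should produce at most some leading lines, then a logged warning and `false`, rather than an exception escaping to the caller.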

Of course, catching and logging these exceptions is only half the battle. We're also going to want to make sure that our Hadoop job doesn't try to process key/value pairs from a corrupted file, or continue to try to read lines after the file is known to be crap.

Stopping corrupt file processing

To stop the processing of the current file, we're first going to prevent emitting a key and value for the line on which an exception occurs. So, at the end of nextKeyValue(), we replace the following code, which stops emitting keys and values once a read returns zero bytes:

We'll simply add an additional condition, so that it doesn't emit keys and values for zero-byte reads or for lines from broken files...

Finally, we'll modify the getProgress() method to say that this split is 100% processed any time the file is broken. Here's the full code for the modified method:
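The arithmetic can be sketched as a small pure function. This is an illustration, not the post's listing: the parameters `start`, `end` and `position` are assumed stand-ins for the reader's split bounds and current file position, and the non-broken branches are modelled on how the stock reader is understood to compute progress.

```java
// JDK-only model of the progress calculation with a broken-file shortcut.
class ProgressSketch {
    /**
     * @param fileBroken whether the current file has been marked as broken
     * @param start      first byte of the split
     * @param end        byte just past the end of the split
     * @param position   how far into the file the reader has got
     * @return a fraction between 0.0 and 1.0
     */
    static float getProgress(boolean fileBroken, long start, long end, long position) {
        if (fileBroken) {
            return 1.0f;  // nothing more will be read, so report the split as finished
        }
        if (start == end) {
            return 0.0f;  // empty split: avoid dividing by zero
        }
        return Math.min(1.0f, (position - start) / (float) (end - start));
    }
}
```

Reporting 100% for a broken file keeps the framework's progress accounting from stalling on a split that will never advance.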

That's it! We've now built a LineRecordReader capable of gracefully logging IOExceptions and stopping processing for only that file, rather than blowing up the whole job!

Conclusion

If you've been scrolling down looking for something you can copy and paste, I don't blame you. Here's all the code for what I call the NoCrashLineRecordReader. If you want to use it, you'll need to modify an input format (e.g., TextInputFormat) to use this custom record reader. I've discussed that process a bit in a previous post on manually making codec selections in Hadoop.

I'd be interested to hear about anybody else's experiences with this technique. I've tried this RecordReader out with zero-length gzip files, corrupt gzip files (e.g., open them up in a text editor and delete some bytes willy-nilly), as well as valid data, and it seems to work fine. I've also used a similar custom RecordReader at scale in a production environment, and that seems to be working fine too. Unfortunately, I haven't been able to test this particular code as thoroughly.

As usual, I'd love to hear your thoughts, questions, or suggestions. Please let me know if you notice something out of place!
