diff --git a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java index edbde8f4..3b5f68cc 100644 --- a/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java +++ b/src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java @@ -9,8 +9,10 @@ import com.hadoop.mapreduce.LzoIndexOutputFormat; import com.hadoop.mapreduce.LzoSplitInputFormat; import com.hadoop.mapreduce.LzoSplitRecordReader; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -25,67 +27,153 @@ public class DistributedLzoIndexer extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class); - private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); + private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension(); + + public static final String LZO_INDEXING_SKIP_SMALL_FILES_KEY = "lzo.indexing.skip-small-files.enabled"; + public static final boolean LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT = false; + public static final String LZO_INDEXING_SMALL_FILE_SIZE_KEY = "lzo.indexing.skip-small-files.size"; + public static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0; + public static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled"; + public static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true; + private static final String TEMP_FILE_NAME = "/_temporary"; + + private boolean lzoSkipIndexingSmallFiles = LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT; + private boolean lzoRecursiveIndexing = LZO_INDEXING_RECURSIVE_DEFAULT; + private long lzoSmallFileSize = LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT; + + public boolean getLzoSkipIndexingSmallFiles() { return lzoSkipIndexingSmallFiles; } + public 
boolean getLzoRecursiveIndexing() { return lzoRecursiveIndexing; } + public long getLzoSmallFileSize() { return lzoSmallFileSize; } + + private Configuration conf = getConf(); + + /** + * Accepts paths which don't end in TEMP_FILE_NAME + */ private final PathFilter nonTemporaryFilter = new PathFilter() { + @Override public boolean accept(Path path) { - return !path.toString().endsWith("/_temporary"); + return !path.toString().endsWith(TEMP_FILE_NAME); } }; - private void walkPath(Path path, PathFilter pathFilter, List accumulator) { + /** + * Returns whether a file should be considered small enough to skip indexing. + */ + public boolean isSmallFile(FileStatus status) { + return status.getLen() <= lzoSmallFileSize; + } + + /** + * Adds into accumulator paths under path which this indexer should index. + * @param path The root path to check under. + * @param pathFilter The filter to apply for all paths. + * @param accumulator The list to accumulate paths to process in. The state + * of this list is changed in this call. + */ + private void visitPath(Path path, PathFilter pathFilter, List accumulator) { try { - FileSystem fs = path.getFileSystem(getConf()); + FileSystem fs = path.getFileSystem(conf); FileStatus fileStatus = fs.getFileStatus(path); + visitPathHelper(fileStatus, fs, pathFilter, accumulator, true); + } catch (IOException ioe) { + LOG.error("Error visiting root path: " + path, ioe); + } + } - if (fileStatus.isDir()) { - FileStatus[] children = fs.listStatus(path, pathFilter); - for (FileStatus childStatus : children) { - walkPath(childStatus.getPath(), pathFilter, accumulator); - } - } else if (path.toString().endsWith(LZO_EXTENSION)) { - Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); - if (fs.exists(lzoIndexPath)) { - // If the index exists and is of nonzero size, we're already done. - // We re-index a file with a zero-length index, because every file has at least one block. 
- if (fs.getFileStatus(lzoIndexPath).getLen() > 0) { - LOG.info("[SKIP] LZO index file already exists for " + path); - return; - } else { - LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)"); - accumulator.add(path); + /* isInitialCall exists for this method to be consistent with Hadoop's definition + of "recursive": if the root path to a job is a directory and + "recursive = false", it still uses the files in that directory but does not + recurse into children directories. The initial call is from visitPath + with isInitialCall = true to mimic this behavior. Afterwards the recursive + case sets isInitialCall = false. */ + private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List accumulator, boolean isInitialCall) { + try { + Path path = fileStatus.getPath(); + if (fileStatus.isDirectory()) { + if (lzoRecursiveIndexing || isInitialCall) { + FileStatus[] children = fs.listStatus(path, pathFilter); + for (FileStatus childStatus : children) { + visitPathHelper(childStatus, fs, pathFilter, accumulator, false); } } else { - // If no index exists, we need to index the file. - LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)"); - accumulator.add(path); + LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled."); } + } else if (shouldIndexFile(fileStatus, fs)) { + accumulator.add(path); } } catch (IOException ioe) { - LOG.warn("Error walking path: " + path, ioe); + LOG.warn("Error visiting path: " + fileStatus.getPath(), ioe); } } - public int run(String[] args) throws Exception { - if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { - printUsage(); - ToolRunner.printGenericCommandUsage(System.err); - return -1; - } + /** + * Determine based on previous configuration of this indexer whether a file + * represented by fileStatus on the given FileSystem should be indexed or not. 
+ * @param fileStatus The file to consider for indexing. + * @param fs The FileSystem on which the file resides. + * @return true if this indexer is configured to consider fileStatus indexable + * false if this indexer doesn't consider fileStatus indexable. + * @throws IOException + */ + public boolean shouldIndexFile(FileStatus fileStatus, FileSystem fs) throws IOException { + Path path = fileStatus.getPath(); + if (path.getName().endsWith(LZO_EXTENSION)) { + if (lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) { + LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small"); + return false; + } - List inputPaths = new ArrayList(); - for (String strPath: args) { - walkPath(new Path(strPath), nonTemporaryFilter, inputPaths); - } + Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX); + if (fs.exists(lzoIndexPath)) { + // If the index exists and is of nonzero size, we're already done. + // We re-index a file with a zero-length index, because every file has at least one block. + FileStatus indexFileStatus = fs.getFileStatus(lzoIndexPath); + if (indexFileStatus.getLen() > 0) { + LOG.info("[SKIP] LZO index file already exists for " + path); + return false; + } else { + LOG.info("[ADD] LZO file " + path + " to indexing list (index file exists but is zero length)"); + return true; + } + } - if (inputPaths.isEmpty()) { - LOG.info("No input paths found - perhaps all " + - ".lzo files have already been indexed."); - return 0; + // If no index exists, we need to index the file. + LOG.info("[ADD] LZO file " + path + " to indexing list (no index currently exists)"); + return true; } - Job job = new Job(getConf()); - job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args)); + // If not an LZO file, skip the file. + LOG.info("[SKIP] Not an LZO file: " + path); + return false; + } + + /** + * Configures this indexer from the values set in conf. + * @param conf The Configuration to read values from. 
+ */ + public void configure(Configuration conf) { + lzoSkipIndexingSmallFiles = + conf.getBoolean(LZO_INDEXING_SKIP_SMALL_FILES_KEY, LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT); + + lzoSmallFileSize = + conf.getLong(LZO_INDEXING_SMALL_FILE_SIZE_KEY, LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT); + + lzoRecursiveIndexing = + conf.getBoolean(LZO_INDEXING_RECURSIVE_KEY, LZO_INDEXING_RECURSIVE_DEFAULT); + } + + /** + * Creates a Job from the given Configuration and commandline arguments. + * @param conf The base Configuration to use for the job. + * @param name The name to give the Job. Appended to "Distributed Lzo Indexer". + * @return The configured Job object. + * @throws IOException + */ + private Job createJob(Configuration conf, String name) throws IOException { + Job job = new Job(conf); + job.setJobName("Distributed Lzo Indexer " + name); job.setOutputKeyClass(Path.class); job.setOutputValueClass(LongWritable.class); @@ -93,13 +181,38 @@ public int run(String[] args) throws Exception { // The LzoIndexOutputFormat doesn't currently work with speculative execution. // Patches welcome. 
job.getConfiguration().setBoolean( - "mapred.map.tasks.speculative.execution", false); + "mapred.map.tasks.speculative.execution", false); job.setJarByClass(DistributedLzoIndexer.class); job.setInputFormatClass(LzoSplitInputFormat.class); job.setOutputFormatClass(LzoIndexOutputFormat.class); job.setNumReduceTasks(0); job.setMapperClass(Mapper.class); + return job; + } + + public int run(String[] args) throws Exception { + if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) { + printUsage(); + ToolRunner.printGenericCommandUsage(System.err); + return -1; // error + } + + configure(conf); + + List inputPaths = new ArrayList(); + for (String strPath : args) { + visitPath(new Path(strPath), nonTemporaryFilter, inputPaths); + } + + if (inputPaths.isEmpty()) { + LOG.info("No input paths found - perhaps all " + + ".lzo files have already been indexed."); + return 0; + } + + String jobName = Arrays.toString(args); + Job job = createJob(conf, jobName); for (Path p : inputPaths) { FileInputFormat.addInputPath(job, p); @@ -134,7 +247,13 @@ public static void main(String[] args) throws Exception { System.exit(exitCode); } - public static void printUsage() { - System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]"); + public void printUsage() { + String usage = + "Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer [file2.lzo directory3 ...]" + + "\nConfiguration options: \"key\" [values] description" + + "\n" + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " [true,false] <" + LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " bytes." + + "\n" + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " [long] <" + LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " is true." 
+ + "\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> When indexing, recurse into child directories of input paths."; + LOG.error(usage); } } diff --git a/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java new file mode 100644 index 00000000..07a54d89 --- /dev/null +++ b/src/test/java/com/hadoop/compression/lzo/TestDistributedLzoIndexer.java @@ -0,0 +1,139 @@ +package com.hadoop.compression.lzo; + +import junit.framework.TestCase; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class TestDistributedLzoIndexer extends TestCase { + public void testConfigureDefaults() { + Configuration conf = new Configuration(); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_RECURSIVE_DEFAULT, indexer.getLzoRecursiveIndexing()); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT, indexer.getLzoSkipIndexingSmallFiles()); + assertEquals(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT, indexer.getLzoSmallFileSize()); + } + + public void testConfigureSettings() { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_RECURSIVE_KEY, false); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 5 * 1024L); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + assertEquals(false, indexer.getLzoRecursiveIndexing()); + assertEquals(true, indexer.getLzoSkipIndexingSmallFiles()); + assertEquals(5 * 1024L, 
indexer.getLzoSmallFileSize()); + } + + protected void doTestIsSmallFile(long fileSize, long smallThreshold, boolean expectedResult) { + Configuration conf = new Configuration(); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, smallThreshold); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + FileStatus status = new FileStatus(fileSize, false, 3, 512L, 100L, new Path("/tmp/my/file")); + + assertEquals(expectedResult, indexer.isSmallFile(status)); + } + + public void testIsSmallFileSmaller() throws Exception { + doTestIsSmallFile(500L, 1000L, true); + } + + public void testIsSmallFileEquals() throws Exception { + doTestIsSmallFile(500L, 500L, true); + } + + public void testIsSmallFileGreater() throws Exception { + doTestIsSmallFile(500L, 200L, false); + } + + public void testShouldIndexFileNotLzoFile() throws Exception { + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".foo"); + FileStatus status = new FileStatus(5L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileSkipSmallFiles() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + FileStatus status = new FileStatus(50L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } + + public void 
testShouldIndexFileIndexNonexistent() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + + assertEquals(true, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileEmptyIndexExists() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + + String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; + File tempFileIndex = new File(tempFileIndexPath); + if (!tempFileIndex.createNewFile()) { + throw new IOException("Could not create temp file for testing " + tempFileIndex); + } + + assertEquals(true, indexer.shouldIndexFile(status, fs)); + } + + public void testShouldIndexFileIndexExists() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(DistributedLzoIndexer.LZO_INDEXING_SKIP_SMALL_FILES_KEY, true); + conf.setLong(DistributedLzoIndexer.LZO_INDEXING_SMALL_FILE_SIZE_KEY, 100L); + FileSystem fs = FileSystem.getLocal(conf); + DistributedLzoIndexer indexer = new DistributedLzoIndexer(); + indexer.configure(conf); + + File tempFile = 
File.createTempFile("TestDistributedLzoIndexer", ".lzo"); + FileStatus status = new FileStatus(200L, false, 3, 512L, 100L, new Path(tempFile.getAbsolutePath())); + + String tempFileIndexPath = tempFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX; + File tempFileIndex = new File(tempFileIndexPath); + + OutputStream fos = new FileOutputStream(tempFileIndex); + fos.write(1); + fos.close(); + + assertEquals(false, indexer.shouldIndexFile(status, fs)); + } +} diff --git a/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java b/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java index 025d2cd2..f8576328 100644 --- a/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java +++ b/src/test/java/com/hadoop/compression/lzo/TestLzoRandData.java @@ -14,15 +14,12 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.util.ReflectionUtils; -import com.hadoop.compression.lzo.LzopCodec; - /** * Unit Test for LZO with random data. */ public class TestLzoRandData extends TestCase { Configuration conf; - CompressionCodec codec; @Override protected void setUp() throws Exception {