Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Options to skip small files and not recurse on input paths #90

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 162 additions & 43 deletions src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.hadoop.mapreduce.LzoIndexOutputFormat;
import com.hadoop.mapreduce.LzoSplitInputFormat;
import com.hadoop.mapreduce.LzoSplitRecordReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -25,81 +27,192 @@

public class DistributedLzoIndexer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);
private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

private static final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

public static final String LZO_INDEXING_SKIP_SMALL_FILES_KEY = "lzo.indexing.skip-small-files.enabled";
public static final boolean LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT = false;
public static final String LZO_INDEXING_SMALL_FILE_SIZE_KEY = "lzo.indexing.skip-small-files.size";
public static final long LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT = 0;
public static final String LZO_INDEXING_RECURSIVE_KEY = "lzo.indexing.recursive.enabled";
public static final boolean LZO_INDEXING_RECURSIVE_DEFAULT = true;
private static final String TEMP_FILE_EXTENSION = "/_temporary";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really an extension but rather a file/directory name. Should it be more like TEMP_FILE_NAME or TEMP_DIRECTORY_NAME (depending on whether it is a file or directory)?


private boolean lzoSkipIndexingSmallFiles = LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT;
private boolean lzoRecursiveIndexing = LZO_INDEXING_RECURSIVE_DEFAULT;
private long lzoSmallFileSize = LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT;

public boolean getLzoSkipIndexingSmallFiles() { return lzoSkipIndexingSmallFiles; }
public boolean getLzoRecursiveIndexing() { return lzoRecursiveIndexing; }
public long getLzoSmallFileSize() { return lzoSmallFileSize; }

private Configuration conf = getConf();

/**
* Accepts paths which don't end in TEMP_FILE_EXTENSION
*/
private final PathFilter nonTemporaryFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.toString().endsWith("/_temporary");
return !path.getName().endsWith(TEMP_FILE_EXTENSION);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must have missed this in the last pass. Path.getName() returns only the final component of the path (file/directory name) whereas Path.toString() returns the full path as a string. Since TEMP_FILE_EXTENSION starts with "/", path.getName().endsWith(TEMP_FILE_EXTENSION) will never return true, so this filter is not right, correct?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll fix it.

}
};

private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
/**
* Returns whether a file should be considered small enough to skip indexing.
*/
public boolean isSmallFile(FileStatus status) {
return status.getLen() <= lzoSmallFileSize;
}

/**
* Adds into accumulator paths under path which this indexer should index.
* @param path The root path to check under.
* @param pathFilter The filter to apply for all paths.
* @param accumulator The list to accumulate paths to process in. The state
* of this list is changed in this call.
*/
private void visitPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
try {
FileSystem fs = path.getFileSystem(getConf());
FileSystem fs = path.getFileSystem(conf);
FileStatus fileStatus = fs.getFileStatus(path);
visitPathHelper(fileStatus, fs, pathFilter, accumulator, true);
} catch (IOException ioe) {
LOG.error("Error visiting root path: " + path, ioe);
}
}

if (fileStatus.isDir()) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
walkPath(childStatus.getPath(), pathFilter, accumulator);
}
} else if (path.toString().endsWith(LZO_EXTENSION)) {
Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
accumulator.add(path);
/* isInitialCall exists for this method to be consistent with Hadoop's definition
of "recursive": if the root path to a job is a directory and
and "recursive = false", it still uses the files in that directory but does not
recurse into children directories. The initial call is from visitPath
with isInitialCall = true to mimic this behavior. Afterwards the recursive
case sets isInitialCall = false. */
private void visitPathHelper(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List<Path> accumulator, boolean isInitialCall) {
try {
Path path = fileStatus.getPath();
if (fileStatus.isDirectory()) {
if (lzoRecursiveIndexing || isInitialCall) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
visitPathHelper(childStatus, fs, pathFilter, accumulator, false);
}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
accumulator.add(path);
LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled.");
}
} else if (shouldIndexFile(fileStatus, fs)) {
accumulator.add(path);
}
} catch (IOException ioe) {
LOG.warn("Error walking path: " + path, ioe);
LOG.warn("Error visiting path: " + fileStatus.getPath(), ioe);
}
}

public int run(String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
printUsage();
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
/**
* Determine based on previous configuration of this indexer whether a file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Determine -> Determines

* represented by fileStatus on the given FileSystem should be indexed or not.
* @param fileStatus The file to consider for indexing.
* @param fs The FileSystem on which the file resides.
* @return true if this indexer is configured to consider fileStatus indexable
* false if this indexer doesn't consider fileStatus indexable.
* @throws IOException
*/
public boolean shouldIndexFile(FileStatus fileStatus, FileSystem fs) throws IOException {
Path path = fileStatus.getPath();
if (path.getName().endsWith(LZO_EXTENSION)) {
if (lzoSkipIndexingSmallFiles && isSmallFile(fileStatus)) {
LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small");
return false;
}

List<Path> inputPaths = new ArrayList<Path>();
for (String strPath: args) {
walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
}
Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
FileStatus indexFileStatus = fs.getFileStatus(lzoIndexPath);
if (indexFileStatus.getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return false;
} else {
LOG.info("[ADD] LZO file " + path + " to indexing list (index file exists but is zero length)");
return true;
}
}

if (inputPaths.isEmpty()) {
LOG.info("No input paths found - perhaps all " +
".lzo files have already been indexed.");
return 0;
// If no index exists, we need to index the file.
LOG.info("[ADD] LZO file " + path + " to indexing list (no index currently exists)");
return true;
}

Job job = new Job(getConf());
job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));
// If not an LZO file, skip the file.
LOG.info("[SKIP] Not an LZO file: " + path);
return false;
}

/**
* Configures this indexer from the values set in conf.
* @param conf The Configuration to read values from.
*/
public void configure(Configuration conf) {
lzoSkipIndexingSmallFiles =
conf.getBoolean(LZO_INDEXING_SKIP_SMALL_FILES_KEY, LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT);

lzoSmallFileSize =
conf.getLong(LZO_INDEXING_SMALL_FILE_SIZE_KEY, LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT);

lzoRecursiveIndexing =
conf.getBoolean(LZO_INDEXING_RECURSIVE_KEY, LZO_INDEXING_RECURSIVE_DEFAULT);
}

/**
* Creates a Job from the given Configuration and commandline arguments.
* @param conf The base Configuration to use for the job.
* @param name The name to give the Job. Appened to "Distributed Lzo Indexer".
* @return The configured Job object.
* @throws IOException
*/
private Job createJob(Configuration conf, String name) throws IOException {
Job job = new Job(conf);
job.setJobName("Distributed Lzo Indexer " + name);

job.setOutputKeyClass(Path.class);
job.setOutputValueClass(LongWritable.class);

// The LzoIndexOutputFormat doesn't currently work with speculative execution.
// Patches welcome.
job.getConfiguration().setBoolean(
"mapred.map.tasks.speculative.execution", false);
"mapred.map.tasks.speculative.execution", false);

job.setJarByClass(DistributedLzoIndexer.class);
job.setInputFormatClass(LzoSplitInputFormat.class);
job.setOutputFormatClass(LzoIndexOutputFormat.class);
job.setNumReduceTasks(0);
job.setMapperClass(Mapper.class);
return job;
}

public int run(String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
printUsage();
ToolRunner.printGenericCommandUsage(System.err);
return -1; // error
}

configure(conf);

List<Path> inputPaths = new ArrayList<Path>();
for (String strPath : args) {
visitPath(new Path(strPath), nonTemporaryFilter, inputPaths);
}

if (inputPaths.isEmpty()) {
LOG.info("No input paths found - perhaps all " +
".lzo files have already been indexed.");
return 0;
}

String jobName = Arrays.toString(args);
Job job = createJob(conf, jobName);

for (Path p : inputPaths) {
FileInputFormat.addInputPath(job, p);
Expand Down Expand Up @@ -134,7 +247,13 @@ public static void main(String[] args) throws Exception {
System.exit(exitCode);
}

public static void printUsage() {
System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
public void printUsage() {
String usage =
"Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]" +
"\nConfiguration options: \"key\" [values] <default> description" +
"\n" + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " [true,false] <" + LZO_INDEXING_SKIP_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " bytes." +
"\n" + LZO_INDEXING_SMALL_FILE_SIZE_KEY + " [long] <" + LZO_INDEXING_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + LZO_INDEXING_SKIP_SMALL_FILES_KEY + " is true." +
"\n" + LZO_INDEXING_RECURSIVE_KEY + " [true,false] <" + LZO_INDEXING_RECURSIVE_DEFAULT + "> When indexing, recurse into child directories of input paths.";
LOG.error(usage);
}
}
Loading