Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Options to skip small files and not recurse on input paths #90

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 99 additions & 27 deletions src/main/java/com/hadoop/compression/lzo/DistributedLzoIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import com.hadoop.mapreduce.LzoIndexOutputFormat;
import com.hadoop.mapreduce.LzoSplitInputFormat;
import com.hadoop.mapreduce.LzoSplitRecordReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -25,57 +27,121 @@

public class DistributedLzoIndexer extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(DistributedLzoIndexer.class);

private final String LZO_EXTENSION = new LzopCodec().getDefaultExtension();

private final String LZO_SKIP_INDEXING_SMALL_FILES_KEY = "lzo_skip_indexing_small_files";
private final String LZO_SMALL_FILE_SIZE_KEY = "lzo_small file_size";
private final String LZO_RECURSIVE_INDEXING_KEY = "lzo_recursive_indexing";
private final boolean LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT = false;
private final boolean LZO_RECURSIVE_INDEXING_DEFAULT = true;
private final long LZO_SMALL_FILE_SIZE_DEFAULT = Long.MIN_VALUE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

33-38: if these are meant as constants (which I think they are), they should be private static final's.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if Long.MIN_VALUE is the best default value here, as it would be -2**63. Note that this is printed in the usage as well. If the goal is to disable the small-file-skipping feature if this configuration is not set, isn't 0 fine as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, those are meant to be constants. I missed the static modifier. 0 seems reasonable, I'll work that into the next commit.

private boolean lzoSkipIndexingSmallFiles = this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT;
private boolean lzoRecursiveIndexing = this.LZO_RECURSIVE_INDEXING_DEFAULT;
private long lzoSmallFileSize = this.LZO_SMALL_FILE_SIZE_DEFAULT;

private final String TEMP_FILE_EXTENSION = "/_temporary";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same here...


private Configuration conf = getConf();

/**
* Accepts paths which don't end in TEMP_FILE_EXTENSION
*/
private final PathFilter nonTemporaryFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.toString().endsWith("/_temporary");
return !path.toString().endsWith(TEMP_FILE_EXTENSION);
}
};

private void walkPath(Path path, PathFilter pathFilter, List<Path> accumulator) {
/**
* Accepts paths pointing to files with length >= lzoSmallFileSize.
*/
private final PathFilter bigFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
FileStatus status;
try {
FileSystem fs = path.getFileSystem(conf);
status = fs.getFileStatus(path);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% happy with having to query the file status again. I know the PathFilter API doesn't help you there. I think it's perfectly fine to have a simple helper method here rather than using the PathFilter API.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a PathFilter here because I was attempting to be consistent with the nonTemporary PathFilter use. However, since the bigFileFilter isn't passed to a listStatus() call, a simple helper method does seem like it'd be a better choice.

} catch (IOException e) {
LOG.info("Unable to get status of path " + path);
return false;
}
return status.getLen() >= lzoSmallFileSize ? true : false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not simply

return status.getLen() >= lzoSmallFileSize;

?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point :).

}
};

private void visitPath(Path path, PathFilter pathFilter, List<Path> accumulator, boolean recursive) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have lzoRecursiveIndexing as a member variable, we don't need argument "recursive" for this method.

try {
FileSystem fs = path.getFileSystem(getConf());
FileSystem fs = path.getFileSystem(this.conf);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the style of this code does not use the "this." notation. Can we remove the "this" reference from the changes?

FileStatus fileStatus = fs.getFileStatus(path);

if (fileStatus.isDir()) {
FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
walkPath(childStatus.getPath(), pathFilter, accumulator);
}
} else if (path.toString().endsWith(LZO_EXTENSION)) {
Path lzoIndexPath = path.suffix(LzoIndex.LZO_INDEX_SUFFIX);
if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fs.getFileStatus(lzoIndexPath).getLen() > 0) {
LOG.info("[SKIP] LZO index file already exists for " + path);
return;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
accumulator.add(path);
if (fileStatus.isDirectory()) {
if (recursive) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires a discussion. This behavior of "recursive" is a little surprising to me. If I'm not mistaken (from things like FileSystem API), recursive=false means still processing direct file children of the directory but not traversing into subdirectories. But this patch would skip the directory entirely.

I think it may be good to be consistent and consider direct file children still. Thoughts? Either way, I think we need some comments in the code to clarify what it means.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct. We should try to be consistent with hadoop's definition of recursive and still process depth = 1 levels of child files.

FileStatus[] children = fs.listStatus(path, pathFilter);
for (FileStatus childStatus : children) {
visitPath(childStatus.getPath(), pathFilter, accumulator, recursive);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the recursive case, I think we are calling Path.getFileSystem() and getFileStatus() redundantly. I know this is carried over from the original, but I think it's a little wasteful. How about something like this?

private void visitPath(FileStatus fileStatus, FileSystem fs, PathFilter pathFilter, List<Path> accumulator) {
  ...
  if (fileStatus.isDirectory()) {
    if (recursive) {
      FileStatus[] children = fs.listStatus(fileStatus.getPath(), pathFilter);
      for (FileStatus childStatus : children) {
        visitPath(childStatus, fs, pathFilter, accumulator);
    ...
}

}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
accumulator.add(path);
LOG.info("[SKIP] Path " + path + " is a directory and recursion is not enabled.");
}
} else if (shouldIndexPath(fileStatus, fs)) {
accumulator.add(path);
}
} catch (IOException ioe) {
LOG.warn("Error walking path: " + path, ioe);
}
}

private boolean shouldIndexPath(FileStatus fileStatus, FileSystem fs) throws IOException {
Path path = fileStatus.getPath();
if (path.toString().endsWith(LZO_EXTENSION)) {
if (this.lzoSkipIndexingSmallFiles && !this.bigFileFilter.accept(path)) {
LOG.info("[SKIP] Skip indexing small files enabled and " + path + " is too small");
return false;
}

Path lzoIndexPath = new Path(path, LzoIndex.LZO_INDEX_SUFFIX);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, I noticed this changed from "path.suffix(LzoIndex.LZO_INDEX_SUFFIX)" to "new Path(path, LzoIndex.LZO_INDEX_SUFFIX)". Is this an intentional change? In what way do these differ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they're roughly the same. I'm more used to using the constructor. The constructor does some additional resolution of the URIs for parent/child and normalization of the paths. The suffix method does this:

public Path suffix(String suffix) {
    return new Path(this.getParent(), this.getName() + suffix);
}

Which does seem like it should also work, so I will switch back to using .suffix here since it is what already works.

if (fs.exists(lzoIndexPath)) {
// If the index exists and is of nonzero size, we're already done.
// We re-index a file with a zero-length index, because every file has at least one block.
if (fileStatus.getLen() > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely incorrect. We should check the length of the index file, not the original lzo file. "fileStatus" is for the lzo file. See the corresponding line in the existing code.

LOG.info("[SKIP] LZO index file already exists for " + path);
return false;
} else {
LOG.info("Adding LZO file " + path + " to indexing list (index file exists but is zero length)");
return true;
}
} else {
// If no index exists, we need to index the file.
LOG.info("Adding LZO file " + path + " to indexing list (no index currently exists)");
return true;
}
}
return false;
}

public int run(String[] args) throws Exception {
if (args.length == 0 || (args.length == 1 && "--help".equals(args[0]))) {
printUsage();
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}

this.conf = getConf();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this redundant assignment?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remnant of the previous approach where the Configuration wasn't set at class instantiation, will remove in next commit.


this.lzoSkipIndexingSmallFiles =
this.conf.getBoolean(this.LZO_SKIP_INDEXING_SMALL_FILES_KEY, this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT);

this.lzoSmallFileSize =
this.conf.getLong(this.LZO_SMALL_FILE_SIZE_KEY, this.LZO_SMALL_FILE_SIZE_DEFAULT);

// Find paths to index based on recursive/not
this.lzoRecursiveIndexing = this.conf.getBoolean(LZO_RECURSIVE_INDEXING_KEY, this.LZO_RECURSIVE_INDEXING_DEFAULT);
List<Path> inputPaths = new ArrayList<Path>();
for (String strPath: args) {
walkPath(new Path(strPath), nonTemporaryFilter, inputPaths);
for (String strPath : args) {
visitPath(new Path(strPath), nonTemporaryFilter, inputPaths, this.lzoRecursiveIndexing);
}

if (inputPaths.isEmpty()) {
Expand All @@ -84,7 +150,7 @@ public int run(String[] args) throws Exception {
return 0;
}

Job job = new Job(getConf());
Job job = new Job(this.conf);
job.setJobName("Distributed Lzo Indexer " + Arrays.toString(args));

job.setOutputKeyClass(Path.class);
Expand Down Expand Up @@ -134,7 +200,13 @@ public static void main(String[] args) throws Exception {
System.exit(exitCode);
}

public static void printUsage() {
System.err.println("Usage: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]");
public void printUsage() {
String usage =
"Command: hadoop jar /path/to/this/jar com.hadoop.compression.lzo.DistributedLzoIndexer <file.lzo | directory> [file2.lzo directory3 ...]" +
"\nConfiguration options: \"key\" [values] <default> description" +
"\n" + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " [true,false] <" + this.LZO_SKIP_INDEXING_SMALL_FILES_DEFAULT + "> When indexing, skip files smaller than " + this.LZO_SMALL_FILE_SIZE_KEY + " bytes." +
"\n" + this.LZO_SMALL_FILE_SIZE_KEY + " [long] <" + this.LZO_SMALL_FILE_SIZE_DEFAULT + "> When indexing, skip files smaller than this number of bytes if " + this.LZO_SKIP_INDEXING_SMALL_FILES_KEY + " is true." +
"\n" + this.LZO_RECURSIVE_INDEXING_KEY + " [true,false] <" + this.LZO_RECURSIVE_INDEXING_DEFAULT + "> Look for files to index recursively from paths on command line.";
System.err.println(usage);
}
}