Restore reading split LZO files with an index #136

Open · wants to merge 1 commit into master
2 changes: 2 additions & 0 deletions src/main/java/com/hadoop/compression/lzo/LzoCodec.java
@@ -102,6 +102,8 @@ public static String getRevisionHash() {
    } catch (IOException e) {
      LOG.error("Could not find /hadoop-lzo-build.properties resource file with revision hash");
      return "UNKNOWN";
+    } catch (NullPointerException e) {
+      return "UNKNOWN";
    }
  }

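For context on the added catch block: getRevisionHash() loads /hadoop-lzo-build.properties from the classpath, and the likeliest source of the NullPointerException is Class.getResourceAsStream returning null when that resource is absent from the jar, which Properties.load then dereferences. The sketch below shows the same guard written as an explicit null check; it is illustrative only, not part of this PR, and the build_revision property key is an assumption.

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Illustrative helper, not from PR #136: same outcome as the patched
// getRevisionHash(), but checking the resource handle instead of catching NPE.
final class RevisionHashExample {
  static String readRevisionHash() {
    try (InputStream is = RevisionHashExample.class
        .getResourceAsStream("/hadoop-lzo-build.properties")) {
      if (is == null) {
        // Resource missing from the jar; previously this surfaced as an NPE in Properties.load.
        return "UNKNOWN";
      }
      Properties props = new Properties();
      props.load(is);
      // "build_revision" is an assumed key name, used here only for illustration.
      return props.getProperty("build_revision", "UNKNOWN");
    } catch (IOException e) {
      return "UNKNOWN";
    }
  }
}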
24 changes: 20 additions & 4 deletions src/main/java/com/hadoop/mapred/DeprecatedLzoLineRecordReader.java
@@ -20,6 +20,9 @@

import java.io.IOException;

+import com.hadoop.compression.lzo.LzoIndex;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -34,16 +37,20 @@

@SuppressWarnings("deprecation")
public class DeprecatedLzoLineRecordReader implements RecordReader<LongWritable, Text> {
+  private static final Log LOG = LogFactory.getLog(DeprecatedLzoLineRecordReader.class);
  private CompressionCodecFactory codecFactory = null;
  private long start;
  private long pos;
  private final long end;
  private final LineReader in;
  private final FSDataInputStream fileIn;

  DeprecatedLzoLineRecordReader(Configuration conf, FileSplit split) throws IOException {
+    this(conf, split, null);
+  }

+  DeprecatedLzoLineRecordReader(Configuration conf, FileSplit split, LzoIndex lzoIndex) throws IOException {
    start = split.getStart();
-    end = start + split.getLength();
    final Path file = split.getPath();

    FileSystem fs = file.getFileSystem(conf);
@@ -52,19 +59,28 @@ public class DeprecatedLzoLineRecordReader implements RecordReader<LongWritable, Text> {
    if (codec == null) {
      throw new IOException("No LZO codec found, cannot run.");
    }
+    long splitEnd = start + split.getLength();
+    long lzoBlockEnd = lzoIndex == null ? -1 : lzoIndex.findNextPosition(splitEnd);
+    end = lzoBlockEnd == -1 ? splitEnd : lzoBlockEnd;

    // Open the file and seek to the next split.
    fileIn = fs.open(file);
    // Create input stream and read the file header.
    in = new LineReader(codec.createInputStream(fileIn), conf);
    if (start != 0) {
-      fileIn.seek(start);

+      fileIn.seek(lzoIndex == null ? start : lzoIndex.findNextPosition(start));
      // Read and ignore the first line.
      in.readLine(new Text());
      start = fileIn.getPos();
    }

+    LOG.info("codec class:" + codec.getClass().getSimpleName() +
+        ", file path:" + file.getName() +
+        ", split start:" + split.getStart() +
+        ", split end:" + splitEnd +
+        ", start:" + start +
+        ", end:" + end);
+
    pos = start;
  }

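To summarize the constructor change above: when an LzoIndex is supplied, the reader no longer takes the raw FileSplit byte range at face value. It extends its end to the next LZO block boundary at or after the nominal split end, and for non-zero starts it seeks to the first block boundary at or after the nominal split start, then discards one line so a record crossing the boundary is consumed by exactly one reader. Below is a standalone sketch of that boundary arithmetic using LzoIndex.findNextPosition as in the diff; the helper class itself is illustrative and not part of the PR.

import org.apache.hadoop.mapred.FileSplit;

import com.hadoop.compression.lzo.LzoIndex;

// Illustrative helper, not from PR #136: the effective byte range a reader
// should cover for one split of an indexed .lzo file.
final class LzoSplitRange {
  final long start;  // offset the reader seeks to (an LZO block boundary)
  final long end;    // reading stops once the position passes this offset

  LzoSplitRange(FileSplit split, LzoIndex index) {
    long splitStart = split.getStart();
    long splitEnd = splitStart + split.getLength();

    // Snap the end forward to the next compressed-block boundary;
    // findNextPosition returns -1 when no block starts at or after the offset.
    long lzoBlockEnd = (index == null) ? -1 : index.findNextPosition(splitEnd);
    this.end = (lzoBlockEnd == -1) ? splitEnd : lzoBlockEnd;

    // Likewise, begin at the first block boundary at or after the split start
    // (the record reader then skips one partial line before emitting records).
    long seekPos = (index == null) ? -1 : index.findNextPosition(splitStart);
    this.start = (seekPos == -1) ? splitStart : seekPos;
  }
}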
67 changes: 58 additions & 9 deletions src/main/java/com/hadoop/mapred/DeprecatedLzoTextInputFormat.java
@@ -25,7 +25,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,12 +45,12 @@
import com.hadoop.compression.lzo.LzoInputFormatCommon;

/**
 * This class conforms to the old (org.apache.hadoop.mapred.*) hadoop API style
 * which is deprecated but still required in places. Streaming, for example,
 * does a check that the given input format is a descendant of
 * org.apache.hadoop.mapred.InputFormat, which any InputFormat-derived class
 * from the new API fails. In order for streaming to work, you must use
 * com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
 * com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in
 * every other respect.
 * <p>
@@ -63,7 +66,8 @@

@SuppressWarnings("deprecation")
public class DeprecatedLzoTextInputFormat extends TextInputFormat {
-  private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
+  private static final Log LOG = LogFactory.getLog(DeprecatedLzoTextInputFormat.class);
+  private final Map<Path, LzoIndex> indexes = new ConcurrentHashMap<Path, LzoIndex>();

  @Override
  protected FileStatus[] listStatus(JobConf conf) throws IOException {
@@ -97,8 +101,8 @@ protected FileStatus[] listStatus(JobConf conf) throws IOException {
  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
-      LzoIndex index = indexes.get(filename);
-      return !index.isEmpty();
+      LzoIndex index = getLzoIndex(fs, filename);
+      return index != null && !index.isEmpty();
    } else {
      // Delegate non-LZO files to the TextInputFormat base class.
      return super.isSplitable(fs, filename);
@@ -151,12 +155,57 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
  public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
      JobConf conf, Reporter reporter) throws IOException {
    FileSplit fileSplit = (FileSplit) split;
-    if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+    if (LzoInputFormatCommon.isLzoIndexFile(fileSplit.getPath().toString())) {
+      // Return a dummy RecordReader when the provided split is an index file.
+      return new RecordReader<LongWritable, Text>() {

+        public LongWritable createKey() {
+          return new LongWritable();
+        }

+        public Text createValue() {
+          return new Text();
+        }

+        public boolean next(LongWritable key, Text value) throws IOException {
+          return false;
+        }

+        public float getProgress() throws IOException {
+          return 0.0f;
+        }

+        public synchronized long getPos() throws IOException {
+          return 0;
+        }

+        public synchronized void close() throws IOException {
+        }
+      };
+    } else if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
      reporter.setStatus(split.toString());
-      return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+      return new DeprecatedLzoLineRecordReader(conf, (FileSplit) split,
+          getLzoIndex(FileSystem.get(conf), ((FileSplit) split).getPath()));
    } else {
      // delegate non-LZO files to the TextInputFormat base class.
      return super.getRecordReader(split, conf, reporter);
    }
  }

+  private LzoIndex getLzoIndex(FileSystem fs, Path filePath) {
+    String fileName = filePath.toString();
+    LzoIndex index = indexes.get(filePath);
+    if (LzoInputFormatCommon.isLzoFile(fileName)) {
+      if (index == null || index.isEmpty()) {
+        try {
+          LOG.warn("index is null, it will read index, file name:" + fileName);
+          index = LzoIndex.readIndex(fs, filePath);
+          indexes.put(filePath, index);
+        } catch (IOException e) {
+          LOG.error("LzoIndex readIndex error, fileName:" + fileName, e);
+        }
+      }
+    }
+    return index;
+  }
}
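Taken together, the changes make the old-API input format usable end to end on indexed LZO data: getLzoIndex() lazily reads and caches each file's index in the ConcurrentHashMap, isSplitable() consults it, the record reader realigns each split to LZO block boundaries, and an .lzo.index file that reaches getRecordReader() is answered with a dummy reader instead of being emitted as data. Below is a hedged usage sketch of a driver on the old mapred API (class name, job name, and paths are illustrative, not taken from this PR); the .index sidecars themselves can be produced beforehand with com.hadoop.compression.lzo.LzoIndexer or DistributedLzoIndexer.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

import com.hadoop.mapred.DeprecatedLzoTextInputFormat;

// Illustrative driver, not part of PR #136: runs an identity job over an
// indexed .lzo input directory using the old mapred API.
public class LzoCatJob {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(LzoCatJob.class);
    conf.setJobName("lzo-indexed-read");

    // With a foo.lzo.index file next to each foo.lzo, splits follow LZO block
    // boundaries instead of falling back to one unsplittable split per file.
    conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);

    // Paths are placeholders; the default IdentityMapper/IdentityReducer are used.
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

For Hadoop Streaming, the same class is what would be passed via -inputformat, which is exactly the scenario the class Javadoc above describes.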