From a0ab7cf561be4c787a4388e6318dbbaac6f0c8ac Mon Sep 17 00:00:00 2001 From: AbnerSunYH Date: Mon, 23 Apr 2018 13:54:54 +0800 Subject: [PATCH] release_0_4_20-restore_read_split_lzo_file_with_index.patch --- .../com/hadoop/compression/lzo/LzoCodec.java | 2 + .../mapred/DeprecatedLzoLineRecordReader.java | 24 +++++-- .../mapred/DeprecatedLzoTextInputFormat.java | 67 ++++++++++++++++--- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/LzoCodec.java b/src/main/java/com/hadoop/compression/lzo/LzoCodec.java index c4d138f6..a7b1389d 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzoCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzoCodec.java @@ -102,6 +102,8 @@ public static String getRevisionHash() { } catch (IOException e) { LOG.error("Could not find /hadoop-lzo-build.properties resource file with revision hash"); return "UNKNOWN"; + } catch (NullPointerException e){ + return "UNKNOWN"; } } diff --git a/src/main/java/com/hadoop/mapred/DeprecatedLzoLineRecordReader.java b/src/main/java/com/hadoop/mapred/DeprecatedLzoLineRecordReader.java index 355b65e9..1340f619 100644 --- a/src/main/java/com/hadoop/mapred/DeprecatedLzoLineRecordReader.java +++ b/src/main/java/com/hadoop/mapred/DeprecatedLzoLineRecordReader.java @@ -20,6 +20,9 @@ import java.io.IOException; +import com.hadoop.compression.lzo.LzoIndex; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +37,7 @@ @SuppressWarnings("deprecation") public class DeprecatedLzoLineRecordReader implements RecordReader { + private static final Log LOG = LogFactory.getLog(DeprecatedLzoTextInputFormat.class); private CompressionCodecFactory codecFactory = null; private long start; private long pos; @@ -41,9 +45,12 @@ public class DeprecatedLzoLineRecordReader implements RecordReader @@ -63,7 +66,8 @@ @SuppressWarnings("deprecation") public class DeprecatedLzoTextInputFormat extends TextInputFormat { - private final Map indexes = new HashMap(); + private static final Log LOG = LogFactory.getLog(DeprecatedLzoTextInputFormat.class); + private final Map indexes = new ConcurrentHashMap(); @Override protected FileStatus[] listStatus(JobConf conf) throws IOException { @@ -97,8 +101,8 @@ protected FileStatus[] listStatus(JobConf conf) throws IOException { @Override protected boolean isSplitable(FileSystem fs, Path filename) { if (LzoInputFormatCommon.isLzoFile(filename.toString())) { - LzoIndex index = indexes.get(filename); - return !index.isEmpty(); + LzoIndex index = getLzoIndex(fs, filename); + return index != null && !index.isEmpty(); } else { // Delegate non-LZO files to the TextInputFormat base class. return super.isSplitable(fs, filename); @@ -151,12 +155,57 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException { public RecordReader getRecordReader(InputSplit split, JobConf conf, Reporter reporter) throws IOException { FileSplit fileSplit = (FileSplit) split; - if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) { + if (LzoInputFormatCommon.isLzoIndexFile(fileSplit.getPath().toString())) { + // return dummy RecordReader when provider split is an index file + return new RecordReader() { + + public LongWritable createKey() { + return new LongWritable(); + } + + public Text createValue() { + return new Text(); + } + + public boolean next(LongWritable key, Text value) throws IOException { + return false; + } + + public float getProgress() throws IOException { + return 0.0f; + } + + public synchronized long getPos() throws IOException { + return 0; + } + + public synchronized void close() throws IOException { + } + }; + } else if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) { reporter.setStatus(split.toString()); - return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split); + return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split, + getLzoIndex(FileSystem.get(conf),((FileSplit) split).getPath())); } else { // delegate non-LZO files to the TextInputFormat base class. return super.getRecordReader(split, conf, reporter); } } + + private LzoIndex getLzoIndex(FileSystem fs, Path filePath) { + String fileName = filePath.toString(); + LzoIndex index = indexes.get(filePath); + if (LzoInputFormatCommon.isLzoFile(fileName)) { + if (index == null||index.isEmpty()) { + try { + LOG.warn("index is null, it will read index, file name:" + fileName); + index = LzoIndex.readIndex(fs, filePath); + indexes.put(filePath, index); + } catch (IOException e) { + LOG.error("LzoIndex readIndex error, fileName:" + fileName, e); + } + } + } + return index; + } }