diff --git a/src/main/java/com/hadoop/mapreduce/LzoLineRecordReader.java b/src/main/java/com/hadoop/mapreduce/LzoLineRecordReader.java
index b6c279a9..a404716b 100644
--- a/src/main/java/com/hadoop/mapreduce/LzoLineRecordReader.java
+++ b/src/main/java/com/hadoop/mapreduce/LzoLineRecordReader.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.LineReader;
 
+import com.google.common.base.Charsets;
 import com.hadoop.compression.lzo.util.CompatibilityUtil;
 
 /**
@@ -42,6 +43,18 @@
  */
 public class LzoLineRecordReader extends RecordReader<LongWritable, Text> {
+  protected static final boolean supportsCustomDelimiters;
+  static {
+    boolean hasConstructor = false;
+    try {
+      LineReader.class.getConstructor(java.io.InputStream.class, org.apache.hadoop.conf.Configuration.class, byte[].class);
+      hasConstructor = true;
+    } catch (NoSuchMethodException e) { // pre-HADOOP-7096 Hadoop: no custom-delimiter constructor
+    } catch (SecurityException e) {     // reflection not permitted; fall back to the default delimiter
+    }
+    supportsCustomDelimiters = hasConstructor;
+  }
+
   private long start;
   private long pos;
   private long end;
@@ -102,8 +115,23 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
     // open the file and seek to the start of the split
     fileIn = fs.open(split.getPath());
 
+    // load the custom line delimiter if one is set
+    final String delimiter = job.get("textinputformat.record.delimiter");
+    byte[] recordDelimiterBytes = null;
+    if (null != delimiter) {
+      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
+    }
+
     // creates input stream and also reads the file header
-    in = new LineReader(codec.createInputStream(fileIn), job);
+    if (supportsCustomDelimiters && recordDelimiterBytes != null) {
+      in = new LineReader(codec.createInputStream(fileIn), job, recordDelimiterBytes);
+    } else if (!supportsCustomDelimiters && recordDelimiterBytes != null) {
+      // TODO: log.warn("Cannot use customDelimiter " + delimiter + " for LZO files because your Hadoop installation doesn't support the LineReader(InputStream, Configuration, byte[]) constructor. Consider updating to a distribution of Hadoop that includes the patch in HADOOP-7096");
+      // https://issues.apache.org/jira/browse/HADOOP-7096
+      in = new LineReader(codec.createInputStream(fileIn), job);
+    } else { // recordDelimiterBytes == null
+      in = new LineReader(codec.createInputStream(fileIn), job);
+    }
 
     if (start != 0) {
       fileIn.seek(start);
diff --git a/src/test/java/com/hadoop/mapreduce/TestLzoTextInputFormatDelimiter.java b/src/test/java/com/hadoop/mapreduce/TestLzoTextInputFormatDelimiter.java
new file mode 100644
index 00000000..8b3996a5
--- /dev/null
+++ b/src/test/java/com/hadoop/mapreduce/TestLzoTextInputFormatDelimiter.java
@@ -0,0 +1,211 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import com.google.common.collect.Lists;
+import com.hadoop.compression.lzo.GPLNativeCodeLoader;
+import com.hadoop.compression.lzo.LzopCodec;
+import com.hadoop.compression.lzo.util.CompatibilityUtil;
+
+/**
+ * Test the LzoTextInputFormat, make sure it splits the file properly and
+ * returns the right data.
+ */
+public class TestLzoTextInputFormatDelimiter extends TestCase {
+
+  public void testNothing() {
+  }
+  /*
+  private static final Log LOG = LogFactory.getLog(TestLzoTextInputFormatDelimiter.class
+      .getName());
+
+  private Path outputDir;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    Path testBuildData = new Path(System.getProperty("test.build.data", "data"));
+    outputDir = new Path(testBuildData, "outputDir");
+  }
+
+  private void writeToFile(String contents, Job job, TaskAttemptContext attemptContext) throws IOException, InterruptedException {
+    // prep an LZO file with data
+
+    TextOutputFormat.setCompressOutput(job, true);
+    TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
+    TextOutputFormat.setOutputPath(job, outputDir);
+
+    TextOutputFormat<Text, Text> output = new TextOutputFormat<Text, Text>();
+    OutputCommitter committer = output.getOutputCommitter(attemptContext);
+    committer.setupJob(job);
+    RecordWriter<Text, Text> rw = null;
+
+    try {
+      rw = output.getRecordWriter(attemptContext);
+      rw.write(new Text(contents), new Text());
+    } finally {
+      if (rw != null) {
+        rw.close(attemptContext);
+        committer.commitTask(attemptContext);
+        committer.commitJob(job);
+      }
+    }
+  }
+
+  private List<String> readRecordsFromFile(Job job, TaskAttemptContext attemptContext) throws IOException, InterruptedException {
+    LzoTextInputFormat inputFormat = new LzoTextInputFormat();
+    TextInputFormat.setInputPaths(job, outputDir);
+
+    List<String> values = Lists.newArrayList();
+
+    List<InputSplit> is = inputFormat.getSplits(job);
+    for (InputSplit inputSplit : is) {
+      RecordReader<LongWritable, Text> rr = inputFormat.createRecordReader(
+          inputSplit, attemptContext);
+      rr.initialize(inputSplit, attemptContext);
+
+      while (rr.nextKeyValue()) {
+        Text value = rr.getCurrentValue();
+
+        values.add(value.toString());
+      }
+
+      rr.close();
+    }
+    return values;
+  }
+
+  private List<String> readRecordsFromFileWithDelimiter(String delimiter, Job job, TaskAttemptContext attemptContext) throws IOException, InterruptedException {
+    job.getConfiguration().set("textinputformat.record.delimiter", delimiter);
+
+    LzoTextInputFormat inputFormat = new LzoTextInputFormat();
+    TextInputFormat.setInputPaths(job, outputDir);
+
+    List<String> values = Lists.newArrayList();
+
+    List<InputSplit> is = inputFormat.getSplits(job);
+    for (InputSplit inputSplit : is) {
+      RecordReader<LongWritable, Text> rr = inputFormat.createRecordReader(
+          inputSplit, attemptContext);
+      rr.initialize(inputSplit, attemptContext);
+
+      while (rr.nextKeyValue()) {
+        Text value = rr.getCurrentValue();
+
+        values.add(value.toString());
+      }
+
+      rr.close();
+    }
+    return values;
+  }
+
+  private static final char[] hexArray = "0123456789ABCDEF".toCharArray();
+  private static String bytesToHex(byte[] bytes) {
+    char[] hexChars = new char[bytes.length * 2];
+    for (int j = 0; j < bytes.length; j++) {
+      int v = bytes[j] & 0xFF;
+      hexChars[j * 2] = hexArray[v >>> 4];
+      hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+    }
+    return new String(hexChars);
+  }
+
+  public void testCustomRecordDelimiter() throws IOException, InterruptedException {
+    if (!GPLNativeCodeLoader.isNativeCodeLoaded()) {
+      LOG.warn("Cannot run this test without the native lzo libraries");
+      return;
+    }
+
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codecs", LzopCodec.class.getName());
+
+    Job job = new Job(conf);
+
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    localFs.delete(outputDir, true);
+    localFs.mkdirs(outputDir);
+    localFs.close();
+
+    TaskAttemptContext attemptContext =
+        CompatibilityUtil.newTaskAttemptContext(job.getConfiguration(),
+            new TaskAttemptID(TaskID.forName("task_123_0001_r_000001"), 2));
+
+    // 12 records with default delimiter of \r or \n or \r\n and just three with custom \n
+    String data = "I have eaten\rthe plums\rthat were in\rthe icebox\r\n"
+        + "and which\ryou were probably\rsaving\rfor breakfast\r\n"
+        + "Forgive me\rthey were delicious\rso sweet\rand so cold";
+    writeToFile(data, job, attemptContext);
+
+    // read it with default delimiter
+    List<String> defaultRecords = readRecordsFromFile(job, attemptContext);
+    assertEquals(12, defaultRecords.size());
+//    System.out.println(defaultRecords);
+    List<String> expectedDefaultRecords = Lists.newArrayList(
+        "I have eaten", "the plums", "that were in", "the icebox",
+        "and which", "you were probably", "saving", "for breakfast",
+        "Forgive me", "they were delicious", "so sweet", "and so cold\t");
+//    System.out.println(bytesToHex(defaultRecords.get(0).getBytes()));
+//    System.out.println(bytesToHex(expectedDefaultRecords.get(0).getBytes()));
+//    System.out.println("-");
+//    System.out.println(bytesToHex(defaultRecords.get(defaultRecords.size()-1).getBytes()));
+//    System.out.println(bytesToHex(expectedDefaultRecords.get(expectedDefaultRecords.size()-1).getBytes()));
+    for (int i = 0; i < expectedDefaultRecords.size(); i++) {
+      assertEquals(expectedDefaultRecords.get(i), defaultRecords.get(i));
+    }
+
+    // read it with the custom delimiter
+    List<String> customRecords = readRecordsFromFileWithDelimiter("\n", job, attemptContext);
+    assertEquals(3, customRecords.size());
+    List<String> expectedCustomRecords = Lists.newArrayList(
+        "I have eaten\rthe plums\rthat were in\rthe icebox\n",
+        "and which\ryou were probably\rsaving\rfor breakfast",
+        "Forgive me\rthey were delicious\rso sweet\rand so cold");
+    for (int i = 0; i < expectedCustomRecords.size(); i++) {
+      assertEquals(expectedCustomRecords.get(i), customRecords.get(i));
+    }
+  }
+  */
+}
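
For reference, a minimal sketch (not part of the patch) of how a job might use a custom record delimiter with LZO input once this change is in place. The driver class name and paths are illustrative; it assumes LzopCodec is on the classpath and that the cluster's Hadoop ships the HADOOP-7096 LineReader constructor, so the delimiter is actually honored rather than silently ignored.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.mapreduce.LzoTextInputFormat;

// Hypothetical driver showing the two configuration keys this patch relies on.
public class LzoDelimiterDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Register the LZO codec so .lzo input files can be decompressed.
    conf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec");
    // Treat a bare '\n' as the record delimiter; '\r' stays inside the record.
    conf.set("textinputformat.record.delimiter", "\n");

    Job job = new Job(conf, "lzo-custom-delimiter");
    job.setJarByClass(LzoDelimiterDriver.class);
    job.setInputFormatClass(LzoTextInputFormat.class);
    // Identity map/reduce is enough to see the record boundaries in the output.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}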