From 28919dff8d509512893797e535a075592c40fe69 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Sat, 11 Feb 2012 12:14:07 -0800 Subject: [PATCH 1/6] convert lzo index writing/reading to use an interface. --- .../compression/lzo/LzoBasicIndexSerde.java | 87 +++++++++++++++++++ .../com/hadoop/compression/lzo/LzoIndex.java | 51 +++++++---- .../hadoop/compression/lzo/LzoIndexSerde.java | 72 +++++++++++++++ 3 files changed, 192 insertions(+), 18 deletions(-) create mode 100644 src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java create mode 100644 src/java/com/hadoop/compression/lzo/LzoIndexSerde.java diff --git a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java new file mode 100644 index 00000000..63e0ab4c --- /dev/null +++ b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java @@ -0,0 +1,87 @@ +package com.hadoop.compression.lzo; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; + +public class LzoBasicIndexSerde implements LzoIndexSerde { + + private static final int BUFFER_CAPACITY = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks) + + private DataOutputStream os; + private DataInputStream is; + private ByteBuffer bytesIn; + private long firstLong; + private int numBlocks = 0; + private boolean processedFirstLong = false; + + @Override + public boolean accepts(long firstLong) { + if (firstLong < 0) { + return false; + } else { + this.firstLong = firstLong; + return true; + } + } + + @Override + public void prepareToWrite(DataOutputStream os) throws IOException { + this.os = os; + } + + @Override + public void prepareToRead(DataInputStream is) throws IOException { + this.is = is; + bytesIn = fillBuffer(); + numBlocks = bytesIn.remaining()/8 + 1; // plus one for the first long. 
+ nextCalls = 0; + processedFirstLong = false; + } + + @Override + public void writeOffset(long offset) throws IOException { + os.writeLong(offset); + } + + @Override + public void finishWriting() throws IOException { + os.close(); + } + + int nextCalls = 0; + @Override + public boolean hasNext() throws IOException { + return !processedFirstLong || (bytesIn != null && bytesIn.hasRemaining()); + } + + @Override + public long next() throws IOException { + if (!processedFirstLong) { + processedFirstLong = true; + return firstLong; + } + if (bytesIn != null && bytesIn.hasRemaining()) { + return bytesIn.getLong(); + } else { + throw new IOException("Attempt to read past the edge of the index."); + } + } + + private ByteBuffer fillBuffer() throws IOException { + DataOutputBuffer bytes = new DataOutputBuffer(BUFFER_CAPACITY); + // copy indexIn and close it if finished + IOUtils.copyBytes(is, bytes, 4*1024, true); + return ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()); + } + + @Override + public int numBlocks() { + return numBlocks; + } + +} diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java index c5f9059e..f9feb4e4 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndex.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java @@ -20,7 +20,7 @@ import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import org.apache.hadoop.conf.Configurable; @@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; @@ -44,6 +42,12 @@ public class LzoIndex { private long[] blockPositions_; + private static ArrayList> serdeClasses = + new ArrayList>(); + static { + serdeClasses.add(LzoBasicIndexSerde.class); + } + /** * Create an empty index, typically indicating no index file exists. 
*/ @@ -175,21 +179,30 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException // return empty index, fall back to the unsplittable mode return new LzoIndex(); } - - int capacity = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks) - DataOutputBuffer bytes = new DataOutputBuffer(capacity); - - // copy indexIn and close it - IOUtils.copyBytes(indexIn, bytes, 4*1024, true); - - ByteBuffer bytesIn = ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()); - int blocks = bytesIn.remaining()/8; - LzoIndex index = new LzoIndex(blocks); - - for (int i = 0; i < blocks; i++) { - index.set(i, bytesIn.getLong()); + long firstLong = indexIn.readLong(); + LzoIndexSerde serde = null; + for (Class candidateClass : serdeClasses) { + LzoIndexSerde candidate = null; + try { + candidate = candidateClass.newInstance(); + } catch (InstantiationException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IllegalAccessException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (candidate.accepts(firstLong)) { + serde = candidate; + break; + } + } + serde.prepareToRead(indexIn); + int idx = 0; + LzoIndex index = new LzoIndex(serde.numBlocks()); + while (serde.hasNext()) { + index.set(idx++, serde.next()); } - return index; } @@ -226,6 +239,8 @@ public static void createIndex(FileSystem fs, Path lzoFile) try { is = fs.open(lzoFile); os = fs.create(tmpOutputFile); + LzoIndexSerde writer = new LzoBasicIndexSerde(); + writer.prepareToWrite(os); LzopDecompressor decompressor = (LzopDecompressor) codec.createDecompressor(); // Solely for reading the header codec.createInputStream(is, decompressor); @@ -252,7 +267,7 @@ public static void createIndex(FileSystem fs, Path lzoFile) numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums; long pos = is.getPos(); // write the pos of the block start - os.writeLong(pos - 8); + writer.writeOffset(pos - 8); // seek to the start of the next block, skip any checksums is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip)); } diff --git a/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java new file mode 100644 index 00000000..6616947c --- /dev/null +++ b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java @@ -0,0 +1,72 @@ +/* + * This file is part of Hadoop-Gpl-Compression. + * + * Hadoop-Gpl-Compression is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Hadoop-Gpl-Compression is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hadoop-Gpl-Compression. If not, see + * . + */ + +package com.hadoop.compression.lzo; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public interface LzoIndexSerde { + + /** + * Serdes will be tried in order until one is found that accepts + * the offered format. A format is determined from the first 8 + * bytes (represented as a long) written to the index file. + *
+ * <p>
+ * The first long is somewhat constrained: the topmost bit should be + * 1, the next 31 are a version number by which the appropriate SerDe + * is decided, and the next 32 can have arbitrary data (a header, or + * a length of the header, or an offset.. up to you). + * + * @param firstLong + * @return true if this format is recognized by the SerDe, false otherwise. + */ + public boolean accepts(long firstLong); + + public void prepareToWrite(DataOutputStream os) throws IOException; + + /** + * Prepare to read the index. Note that the first 8 bits will have been already + * read from this stream, and passed to you in accepts() in the form of a long. + * @param is InputStream to read. + */ + public void prepareToRead(DataInputStream is) throws IOException; + + /** + * Write the next offset into the file. It is expected that + * the offsets are supplied in order. prepareToWrite() + * should be called before the first invocation of this method. + * @param offset + */ + public void writeOffset(long offset) throws IOException; + + public void finishWriting() throws IOException; + + public boolean hasNext() throws IOException; + + public long next() throws IOException; + + /** + * Get the number of block expected to be read from this index. + * Will only be called after prepareToRead(). + * @return number of block offsets that will be read back. + */ + public int numBlocks(); + +} From ebc50ea45cc9fbd0f36776ab0308b7883f8f58fa Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Sat, 11 Feb 2012 12:30:51 -0800 Subject: [PATCH 2/6] Add test for LzoIndexSerdes --- .../com/hadoop/compression/lzo/LzoIndex.java | 5 ++- .../compression/lzo/TestLzoIndexSerde.java | 43 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java index f9feb4e4..be376477 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndex.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java @@ -230,6 +230,8 @@ public static void createIndex(FileSystem fs, Path lzoFile) FSDataInputStream is = null; FSDataOutputStream os = null; + LzoIndexSerde writer = new LzoBasicIndexSerde(); + Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX); Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX); @@ -239,7 +241,6 @@ public static void createIndex(FileSystem fs, Path lzoFile) try { is = fs.open(lzoFile); os = fs.create(tmpOutputFile); - LzoIndexSerde writer = new LzoBasicIndexSerde(); writer.prepareToWrite(os); LzopDecompressor decompressor = (LzopDecompressor) codec.createDecompressor(); // Solely for reading the header @@ -278,7 +279,7 @@ public static void createIndex(FileSystem fs, Path lzoFile) if (is != null) { is.close(); } - + writer.finishWriting(); if (os != null) { os.close(); } diff --git a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java new file mode 100644 index 00000000..eeafe467 --- /dev/null +++ b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java @@ -0,0 +1,43 @@ +package com.hadoop.compression.lzo; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import junit.framework.TestCase; + +public class TestLzoIndexSerde extends TestCase { + + public void testBasicSerde() throws IOException { + testGenericSerde(new 
LzoBasicIndexSerde()); + } + + /** + * Ensures that the provided serde can read its own output correctly + * @param serde + * @throws IOException + */ + public void testGenericSerde(LzoIndexSerde serde) throws IOException { + long[] expected = { 40L, 500L, 584L, 10017L }; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + serde.prepareToWrite(os); + for (long val : expected) { + serde.writeOffset(val); + } + serde.finishWriting(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream is = new DataInputStream(bais); + long firstLong = is.readLong(); + assertTrue(serde.accepts(firstLong)); + serde.prepareToRead(is); + for (long val : expected) { + assertTrue(serde.hasNext()); + assertEquals(val, serde.next()); + } + + } +} From fdbc1e774331df150840f4fdc420e5aab2e3cbc1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Sat, 11 Feb 2012 13:02:58 -0800 Subject: [PATCH 3/6] take care of instantiation exceptions for serdes --- .../com/hadoop/compression/lzo/LzoIndex.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java index be376477..20693473 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndex.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java @@ -183,15 +183,7 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException LzoIndexSerde serde = null; for (Class candidateClass : serdeClasses) { LzoIndexSerde candidate = null; - try { - candidate = candidateClass.newInstance(); - } catch (InstantiationException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (IllegalAccessException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + candidate = quietGetInstance(candidateClass); if (candidate.accepts(firstLong)) { serde = candidate; break; @@ -293,5 +285,17 @@ public static void createIndex(FileSystem fs, Path lzoFile) } } } + + private static LzoIndexSerde quietGetInstance(Class klass) throws IOException { + LzoIndexSerde instance = null; + try { + instance = klass.newInstance(); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + return instance; + } } From 2016eeab3f575066249e53894ac3d2a54a7b455a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Sat, 11 Feb 2012 16:23:17 -0800 Subject: [PATCH 4/6] Add a compressed index representation. --- .../compression/lzo/LzoBasicIndexSerde.java | 2 - .../com/hadoop/compression/lzo/LzoIndex.java | 8 +- .../compression/lzo/LzoTinyOffsetsSerde.java | 124 ++++++++++++++++++ .../compression/lzo/TestLzoIndexSerde.java | 19 ++- 4 files changed, 142 insertions(+), 11 deletions(-) create mode 100644 src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java diff --git a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java index 63e0ab4c..c43c543f 100644 --- a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java @@ -39,7 +39,6 @@ public void prepareToRead(DataInputStream is) throws IOException { this.is = is; bytesIn = fillBuffer(); numBlocks = bytesIn.remaining()/8 + 1; // plus one for the first long. 
- nextCalls = 0; processedFirstLong = false; } @@ -53,7 +52,6 @@ public void finishWriting() throws IOException { os.close(); } - int nextCalls = 0; @Override public boolean hasNext() throws IOException { return !processedFirstLong || (bytesIn != null && bytesIn.hasRemaining()); diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java index 20693473..289b4177 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndex.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java @@ -46,6 +46,7 @@ public class LzoIndex { new ArrayList>(); static { serdeClasses.add(LzoBasicIndexSerde.class); + serdeClasses.add(LzoTinyOffsetsSerde.class); } /** @@ -190,10 +191,9 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException } } serde.prepareToRead(indexIn); - int idx = 0; LzoIndex index = new LzoIndex(serde.numBlocks()); - while (serde.hasNext()) { - index.set(idx++, serde.next()); + for (int i = 0; i < serde.numBlocks(); i++) { + index.set(i, serde.next()); } return index; } @@ -222,7 +222,7 @@ public static void createIndex(FileSystem fs, Path lzoFile) FSDataInputStream is = null; FSDataOutputStream os = null; - LzoIndexSerde writer = new LzoBasicIndexSerde(); + LzoIndexSerde writer = new LzoTinyOffsetsSerde(); Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX); Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX); diff --git a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java new file mode 100644 index 00000000..fe0c2655 --- /dev/null +++ b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java @@ -0,0 +1,124 @@ +package com.hadoop.compression.lzo; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; + +/** + * The index is stored as follows: + *
+ * <ul>
+ * <li> 4 bytes: 1 for the topmost bit, 30 zeros, then 1 (the version number).
+ * <li> 4 bytes: first offset (offset from the LZO header).
+ * <li> 4 bytes: size of the first block.
+ * <li> Sequence of 2-byte shorts: for each subsequent block, the delta between its size and the size of the first block.
+ * </ul>
+ */ +public class LzoTinyOffsetsSerde implements LzoIndexSerde { + + private static final int BUFFER_CAPACITY = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks) + + private DataOutputStream os; + private DataInputStream is; + private ByteBuffer bytesIn; + private int numBlocks = 0; + + private boolean readFirstLong = false; + private int firstBlockSize; + private boolean wroteFirstBlock = false; + + private boolean readInitialOffset = false; + private boolean wroteInitialOffset = false; + private long currOffset = 0; + + private static final int VERSION = (1 << 31) + 1; + + @Override + public boolean accepts(long firstLong) { + if ( ((int) (firstLong >>> 32)) == VERSION) { + currOffset = (int) (firstLong << 32 >>> 32); + return true; + } else { + return false; + } + } + + @Override + public void prepareToWrite(DataOutputStream os) throws IOException { + this.os = os; + os.writeInt(VERSION); + wroteFirstBlock = false; + wroteInitialOffset = false; + } + + @Override + public void prepareToRead(DataInputStream is) throws IOException { + this.is = is; + bytesIn = fillBuffer(); + int remaining = bytesIn.remaining(); + numBlocks = (remaining == 0) ? 1 : + (remaining == 4) ? 2 : + (bytesIn.remaining() - 4) / 2 + 2; // plus one for the first offset, plus one for block. + readFirstLong = false; + readInitialOffset = false; + } + + @Override + public void writeOffset(long offset) throws IOException { + + if (!wroteInitialOffset) { + os.writeInt((int) offset); + wroteInitialOffset = true; + } else if (!wroteFirstBlock) { + firstBlockSize = (int) (offset - currOffset); + os.writeInt(firstBlockSize); + wroteFirstBlock = true; + } else { + os.writeShort((short) ( (offset - currOffset) - firstBlockSize)); + } + currOffset = offset; + + } + + @Override + public void finishWriting() throws IOException { + os.close(); + } + + @Override + public boolean hasNext() throws IOException { + return !readInitialOffset || !readFirstLong || (bytesIn != null && bytesIn.hasRemaining()); + } + + @Override + public long next() throws IOException { + if (!readInitialOffset) { + readInitialOffset = true; + } else if (!readFirstLong) { + readFirstLong = true; + firstBlockSize = bytesIn.getInt(); + currOffset += firstBlockSize; + } else if (bytesIn != null && bytesIn.hasRemaining()) { + currOffset += firstBlockSize + bytesIn.getShort(); + } else { + throw new IOException("Attempt to read past the edge of the index."); + } + return currOffset; + } + + private ByteBuffer fillBuffer() throws IOException { + DataOutputBuffer bytes = new DataOutputBuffer(BUFFER_CAPACITY); + // copy indexIn and close it if finished + IOUtils.copyBytes(is, bytes, 4*1024, true); + return ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()); + } + + @Override + public int numBlocks() { + return numBlocks; + } + +} diff --git a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java index eeafe467..a2e09b4d 100644 --- a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java +++ b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java @@ -10,16 +10,22 @@ public class TestLzoIndexSerde extends TestCase { - public void testBasicSerde() throws IOException { + public void testBasicSerde() throws IOException, InstantiationException, IllegalAccessException { testGenericSerde(new LzoBasicIndexSerde()); } + public void testLzoTinyOffsetsSerde() throws IOException, InstantiationException, IllegalAccessException { + testGenericSerde(new LzoTinyOffsetsSerde()); + } + /** * 
Ensures that the provided serde can read its own output correctly * @param serde * @throws IOException + * @throws IllegalAccessException + * @throws InstantiationException */ - public void testGenericSerde(LzoIndexSerde serde) throws IOException { + public void testGenericSerde(LzoIndexSerde serde) throws IOException, InstantiationException, IllegalAccessException { long[] expected = { 40L, 500L, 584L, 10017L }; ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(baos); @@ -28,16 +34,19 @@ public void testGenericSerde(LzoIndexSerde serde) throws IOException { serde.writeOffset(val); } serde.finishWriting(); + serde = serde.getClass().newInstance(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); DataInputStream is = new DataInputStream(bais); long firstLong = is.readLong(); - assertTrue(serde.accepts(firstLong)); + assertTrue("Serde does not accept its own first long", serde.accepts(firstLong)); serde.prepareToRead(is); + assertEquals("Serde reports different number of blocks than expected", expected.length, serde.numBlocks()); for (long val : expected) { - assertTrue(serde.hasNext()); - assertEquals(val, serde.next()); + assertTrue("Serde does not return as many values as were written", serde.hasNext()); + assertEquals("Serde returned wrong offset", val, serde.next()); } + assertFalse(serde.hasNext()); } } From 8ee9cea4c2d98bf2bd71e9473f5702a4bab54c72 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Sat, 11 Feb 2012 16:29:22 -0800 Subject: [PATCH 5/6] Added GPL licenses where required. --- .../compression/lzo/LzoBasicIndexSerde.java | 18 ++++++++++++++++++ .../compression/lzo/LzoTinyOffsetsSerde.java | 18 ++++++++++++++++++ .../compression/lzo/TestLzoIndexSerde.java | 18 ++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java index c43c543f..45708258 100644 --- a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java @@ -1,3 +1,21 @@ +/* + * This file is part of Hadoop-Gpl-Compression. + * + * Hadoop-Gpl-Compression is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Hadoop-Gpl-Compression is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hadoop-Gpl-Compression. If not, see + * . + */ + package com.hadoop.compression.lzo; import java.io.DataInputStream; diff --git a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java index fe0c2655..589e30ec 100644 --- a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java @@ -1,3 +1,21 @@ +/* + * This file is part of Hadoop-Gpl-Compression. 
+ * + * Hadoop-Gpl-Compression is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Hadoop-Gpl-Compression is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hadoop-Gpl-Compression. If not, see + * . + */ + package com.hadoop.compression.lzo; import java.io.DataInputStream; diff --git a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java index a2e09b4d..22afefc6 100644 --- a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java +++ b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java @@ -1,3 +1,21 @@ +/* + * This file is part of Hadoop-Gpl-Compression. + * + * Hadoop-Gpl-Compression is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Hadoop-Gpl-Compression is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hadoop-Gpl-Compression. If not, see + * . + */ + package com.hadoop.compression.lzo; import java.io.ByteArrayInputStream; From 76c4aa0907c26d395ef21663fe49292f240f9193 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryaboy Date: Tue, 14 Feb 2012 22:43:41 -0800 Subject: [PATCH 6/6] rewrite LzoTinyOffsets to use VarInt from Mahout. Add NOTICE file. --- NOTICE.txt | 10 + .../compression/lzo/LzoBasicIndexSerde.java | 4 +- .../com/hadoop/compression/lzo/LzoIndex.java | 14 +- .../hadoop/compression/lzo/LzoIndexSerde.java | 9 +- .../compression/lzo/LzoTinyOffsetsSerde.java | 61 +++--- .../com/hadoop/compression/lzo/Varint.java | 180 ++++++++++++++++++ .../compression/lzo/TestLzoIndexSerde.java | 1 - 7 files changed, 235 insertions(+), 44 deletions(-) create mode 100644 NOTICE.txt create mode 100644 src/java/com/hadoop/compression/lzo/Varint.java diff --git a/NOTICE.txt b/NOTICE.txt new file mode 100644 index 00000000..eab39c08 --- /dev/null +++ b/NOTICE.txt @@ -0,0 +1,10 @@ +This product is distributed under the GNU General Public License (GPL v2+) (see COPYING). 
+ +This product includes software developed by several Apache Software Foundation projects, +licensed under the Apache License, 2.0, including but not limited to: + - Apache Mahout + - Apache Hadoop + +This product includes a JUnit jar: http://junit.sourceforge.net/ +License: Common Public License - v 1.0 (http://junit.sourceforge.net/cpl-v10.html) +Copyright (c) 2000-2006, www.hamcrest.org diff --git a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java index 45708258..83fa660a 100644 --- a/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java @@ -96,8 +96,8 @@ private ByteBuffer fillBuffer() throws IOException { } @Override - public int numBlocks() { - return numBlocks; + public void finishReading() throws IOException { + is.close(); } } diff --git a/src/java/com/hadoop/compression/lzo/LzoIndex.java b/src/java/com/hadoop/compression/lzo/LzoIndex.java index 289b4177..f7635cc4 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndex.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndex.java @@ -191,9 +191,17 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException } } serde.prepareToRead(indexIn); - LzoIndex index = new LzoIndex(serde.numBlocks()); - for (int i = 0; i < serde.numBlocks(); i++) { - index.set(i, serde.next()); + // Sized for at least 1 256MB HDFS block with 256KB Lzo blocks. + // if it's less than that, you shouldn't bother indexing anyway. + ArrayList offsets = new ArrayList(1024); + while (serde.hasNext()) { + offsets.add(serde.next()); + } + serde.finishReading(); + + LzoIndex index = new LzoIndex(offsets.size()); + for (int i = 0; i < offsets.size(); i++) { + index.set(i, offsets.get(i)); } return index; } diff --git a/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java index 6616947c..ab2a29f1 100644 --- a/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoIndexSerde.java @@ -58,15 +58,10 @@ public interface LzoIndexSerde { public void finishWriting() throws IOException; + public void finishReading() throws IOException; + public boolean hasNext() throws IOException; public long next() throws IOException; - /** - * Get the number of block expected to be read from this index. - * Will only be called after prepareToRead(). - * @return number of block offsets that will be read back. 
- */ - public int numBlocks(); - } diff --git a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java index 589e30ec..36c8b22a 100644 --- a/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java +++ b/src/java/com/hadoop/compression/lzo/LzoTinyOffsetsSerde.java @@ -20,11 +20,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.IOUtils; /** * The index is stored as follows: @@ -37,12 +34,8 @@ */ public class LzoTinyOffsetsSerde implements LzoIndexSerde { - private static final int BUFFER_CAPACITY = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks) - private DataOutputStream os; private DataInputStream is; - private ByteBuffer bytesIn; - private int numBlocks = 0; private boolean readFirstLong = false; private int firstBlockSize; @@ -52,6 +45,10 @@ public class LzoTinyOffsetsSerde implements LzoIndexSerde { private boolean wroteInitialOffset = false; private long currOffset = 0; + // for hasNext, we have to read the next value + // (or, if the buffer hasn't been used, just check it for null) + private Integer bufferedOffset = null; + private static final int VERSION = (1 << 31) + 1; @Override @@ -75,11 +72,6 @@ public void prepareToWrite(DataOutputStream os) throws IOException { @Override public void prepareToRead(DataInputStream is) throws IOException { this.is = is; - bytesIn = fillBuffer(); - int remaining = bytesIn.remaining(); - numBlocks = (remaining == 0) ? 1 : - (remaining == 4) ? 2 : - (bytesIn.remaining() - 4) / 2 + 2; // plus one for the first offset, plus one for block. readFirstLong = false; readInitialOffset = false; } @@ -95,7 +87,8 @@ public void writeOffset(long offset) throws IOException { os.writeInt(firstBlockSize); wroteFirstBlock = true; } else { - os.writeShort((short) ( (offset - currOffset) - firstBlockSize)); + int delta = ((int) (offset - currOffset)) - firstBlockSize; + Varint.writeSignedVarInt(delta, os); } currOffset = offset; @@ -106,9 +99,25 @@ public void finishWriting() throws IOException { os.close(); } + @Override + public void finishReading() throws IOException { + is.close(); + } + @Override public boolean hasNext() throws IOException { - return !readInitialOffset || !readFirstLong || (bytesIn != null && bytesIn.hasRemaining()); + if (readInitialOffset && readFirstLong) { + if (bufferedOffset == null) { + // try to read something. If we hit EOF, we are done. 
+ try { + bufferedOffset = Varint.readSignedVarInt(is); + } catch (EOFException e) { + return false; + } + } + return true; + } + return !readInitialOffset || (!readFirstLong && is.available() != 0); } @Override @@ -117,26 +126,16 @@ public long next() throws IOException { readInitialOffset = true; } else if (!readFirstLong) { readFirstLong = true; - firstBlockSize = bytesIn.getInt(); + firstBlockSize = is.readInt(); currOffset += firstBlockSize; - } else if (bytesIn != null && bytesIn.hasRemaining()) { - currOffset += firstBlockSize + bytesIn.getShort(); } else { - throw new IOException("Attempt to read past the edge of the index."); + if (bufferedOffset == null) { + bufferedOffset = Varint.readSignedVarInt(is); + } + currOffset += firstBlockSize + bufferedOffset; + bufferedOffset = null; } return currOffset; } - private ByteBuffer fillBuffer() throws IOException { - DataOutputBuffer bytes = new DataOutputBuffer(BUFFER_CAPACITY); - // copy indexIn and close it if finished - IOUtils.copyBytes(is, bytes, 4*1024, true); - return ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()); - } - - @Override - public int numBlocks() { - return numBlocks; - } - } diff --git a/src/java/com/hadoop/compression/lzo/Varint.java b/src/java/com/hadoop/compression/lzo/Varint.java new file mode 100644 index 00000000..ad417d22 --- /dev/null +++ b/src/java/com/hadoop/compression/lzo/Varint.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * The Varint code is taken from the Apache Mahout project under the + * Apache Software License. + * https://github.com/apache/mahout/blob/trunk/core/src/main/java/org/apache/mahout/math/Varint.java + * + * We've replace the guava Preconditions usage with simple if() {throw IllegalArgumentException} blocks. + * Other than that, it's a direct copy as of Feb 14 2012. + * + */ + +package com.hadoop.compression.lzo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + + +/** + *
+ * <p>Encodes signed and unsigned values using a common variable-length
+ * scheme, found for example in
+ * <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
+ * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
+ * but will use slightly more bytes to encode large values.</p>
+ *
+ * <p>Signed values are further encoded using so-called zig-zag encoding
+ * in order to make them "compatible" with variable-length encoding.</p>
+ */ +public final class Varint { + + private Varint() { + } + + /** + * Encodes a value using the variable-length encoding from + * + * Google Protocol Buffers. It uses zig-zag encoding to efficiently + * encode signed values. If values are known to be nonnegative, + * {@link #writeUnsignedVarLong(long, DataOutput)} should be used. + * + * @param value value to encode + * @param out to write bytes to + * @throws IOException if {@link DataOutput} throws {@link IOException} + */ + public static void writeSignedVarLong(long value, DataOutput out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarLong((value << 1) ^ (value >> 63), out); + } + + /** + * Encodes a value using the variable-length encoding from + * + * Google Protocol Buffers. Zig-zag is not used, so input must not be negative. + * If values can be negative, use {@link #writeSignedVarLong(long, DataOutput)} + * instead. This method treats negative input as like a large unsigned value. + * + * @param value value to encode + * @param out to write bytes to + * @throws IOException if {@link DataOutput} throws {@link IOException} + */ + public static void writeUnsignedVarLong(long value, DataOutput out) throws IOException { + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + out.writeByte(((int) value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte((int) value & 0x7F); + } + + /** + * @see #writeSignedVarLong(long, DataOutput) + */ + public static void writeSignedVarInt(int value, DataOutput out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarInt((value << 1) ^ (value >> 31), out); + } + + /** + * @see #writeUnsignedVarLong(long, DataOutput) + */ + public static void writeUnsignedVarInt(int value, DataOutput out) throws IOException { + while ((value & 0xFFFFFF80) != 0L) { + out.writeByte((value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte(value & 0x7F); + } + + /** + * @param in to read bytes from + * @return decode value + * @throws IOException if {@link DataInput} throws {@link IOException} + * @throws IllegalArgumentException if variable-length value does not terminate + * after 9 bytes have been read + * @see #writeSignedVarLong(long, DataOutput) + */ + public static long readSignedVarLong(DataInput in) throws IOException { + long raw = readUnsignedVarLong(in); + // This undoes the trick in writeSignedVarLong() + long temp = (((raw << 63) >> 63) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as like unsigned values + // Must re-flip the top bit if the original read value had it set. 
+ return temp ^ (raw & (1L << 63)); + } + + /** + * @param in to read bytes from + * @return decode value + * @throws IOException if {@link DataInput} throws {@link IOException} + * @throws IllegalArgumentException if variable-length value does not terminate + * after 9 bytes have been read + * @see #writeUnsignedVarLong(long, DataOutput) + */ + public static long readUnsignedVarLong(DataInput in) throws IOException { + long value = 0L; + int i = 0; + long b; + while (((b = in.readByte()) & 0x80L) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 63) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (b << i); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate + * after 5 bytes have been read + * @throws IOException if {@link DataInput} throws {@link IOException} + * @see #readSignedVarLong(DataInput) + */ + public static int readSignedVarInt(DataInput in) throws IOException { + int raw = readUnsignedVarInt(in); + // This undoes the trick in writeSignedVarInt() + int temp = (((raw << 31) >> 31) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as like unsigned values. + // Must re-flip the top bit if the original read value had it set. + return temp ^ (raw & (1 << 31)); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate + * after 5 bytes have been read + * @throws IOException if {@link DataInput} throws {@link IOException} + * @see #readUnsignedVarLong(DataInput) + */ + public static int readUnsignedVarInt(DataInput in) throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = in.readByte()) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 35) { + throw new IllegalArgumentException("Variable length quantity is too long"); + } + } + return value | (b << i); + } + +} \ No newline at end of file diff --git a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java index 22afefc6..42a5f77e 100644 --- a/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java +++ b/src/test/com/hadoop/compression/lzo/TestLzoIndexSerde.java @@ -59,7 +59,6 @@ public void testGenericSerde(LzoIndexSerde serde) throws IOException, Instantiat long firstLong = is.readLong(); assertTrue("Serde does not accept its own first long", serde.accepts(firstLong)); serde.prepareToRead(is); - assertEquals("Serde reports different number of blocks than expected", expected.length, serde.numBlocks()); for (long val : expected) { assertTrue("Serde does not return as many values as were written", serde.hasNext()); assertEquals("Serde returned wrong offset", val, serde.next());
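
Since the patches never show the new on-disk format end to end, here is a minimal, self-contained sketch of a round trip through the LzoTinyOffsetsSerde added above. It is not part of the patch series: the TinyOffsetsDemo class and the sample offsets are illustrative assumptions, while the API calls (prepareToWrite, writeOffset, finishWriting, accepts, prepareToRead, hasNext, next, finishReading) are exactly the ones these patches introduce.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.hadoop.compression.lzo.LzoTinyOffsetsSerde;

public class TinyOffsetsDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical LZO block start offsets, as createIndex() would emit them.
    long[] offsets = { 40L, 500L, 584L, 10017L };

    // Write: 4-byte version word, 4-byte initial offset, 4-byte first block
    // size, then one signed varint per remaining block (the delta of its
    // size from the first block's size).
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    LzoTinyOffsetsSerde writer = new LzoTinyOffsetsSerde();
    writer.prepareToWrite(new DataOutputStream(baos));
    for (long offset : offsets) {
      writer.writeOffset(offset);
    }
    writer.finishWriting();
    byte[] index = baos.toByteArray();
    // For these offsets: 4 + 4 + 4 bytes plus a 2-byte and a 3-byte varint,
    // i.e. 17 bytes, versus 32 bytes (8 per offset) in the basic format.
    System.out.println(index.length + " bytes for " + offsets.length + " offsets");

    // Read: the first 8 bytes are handed to accepts() as one long, exactly
    // as LzoIndex.readIndex() does, then the rest is streamed via next().
    DataInputStream is = new DataInputStream(new ByteArrayInputStream(index));
    LzoTinyOffsetsSerde reader = new LzoTinyOffsetsSerde();
    if (reader.accepts(is.readLong())) {
      reader.prepareToRead(is);
      while (reader.hasNext()) {
        System.out.println(reader.next()); // prints 40, 500, 584, 10017
      }
      reader.finishReading();
    }
  }
}

Writing the deltas as signed varints (patch 6) matters because a later block can compress smaller than the first one, making its delta negative; zig-zag encoding keeps such small negative deltas at one or two bytes instead of a full-width value.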