Smaller lzo indexes: make #43 apply to current codebase
Dmitriy Ryaboy committed Sep 5, 2014
1 parent e8c11c2 commit c620b24
Showing 7 changed files with 609 additions and 56 deletions.
39 changes: 0 additions & 39 deletions pom.xml
@@ -454,44 +454,5 @@
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<versionRange>[1.7,)</versionRange>
<goals>
<goal>run</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
106 changes: 106 additions & 0 deletions src/main/java/com/hadoop/compression/lzo/LzoBasicIndexSerde.java
@@ -0,0 +1,106 @@
/*
* This file is part of Hadoop-Gpl-Compression.
*
* Hadoop-Gpl-Compression is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Hadoop-Gpl-Compression is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hadoop-Gpl-Compression. If not, see
* <http://www.gnu.org/licenses/>.
*/

package com.hadoop.compression.lzo;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;

public class LzoBasicIndexSerde implements LzoIndexSerde {

private static final int BUFFER_CAPACITY = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks)

private DataOutputStream os;
private DataInputStream is;
private ByteBuffer bytesIn;
private long firstLong;
private int numBlocks = 0;
private boolean processedFirstLong = false;

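// The basic (headerless) format is just raw offsets: the first long of the
// index file is itself the first offset, so any non-negative value is
// accepted here. Versioned formats set the top bit (see LzoIndexSerde).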
@Override
public boolean accepts(long firstLong) {
if (firstLong < 0) {
return false;
} else {
this.firstLong = firstLong;
return true;
}
}

@Override
public void prepareToWrite(DataOutputStream os) throws IOException {
this.os = os;
}

@Override
public void prepareToRead(DataInputStream is) throws IOException {
this.is = is;
bytesIn = fillBuffer();
numBlocks = bytesIn.remaining()/8 + 1; // plus one for the first long.
processedFirstLong = false;
}

@Override
public void writeOffset(long offset) throws IOException {
os.writeLong(offset);
}

@Override
public void finishWriting() throws IOException {
os.close();
}

@Override
public boolean hasNext() throws IOException {
return !processedFirstLong || (bytesIn != null && bytesIn.hasRemaining());
}

@Override
public long next() throws IOException {
if (!processedFirstLong) {
processedFirstLong = true;
return firstLong;
}
if (bytesIn != null && bytesIn.hasRemaining()) {
return bytesIn.getLong();
} else {
throw new IOException("Attempt to read past the edge of the index.");
}
}

private ByteBuffer fillBuffer() throws IOException {
DataOutputBuffer bytes = new DataOutputBuffer(BUFFER_CAPACITY);
// copy indexIn and close it if finished
IOUtils.copyBytes(is, bytes, 4*1024, true);
return ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength());
}

@Override
public void finishReading() throws IOException {
is.close();
}

}
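For reference, a minimal round trip through LzoBasicIndexSerde, assuming in-memory streams (the demo class, stream setup, and sample offsets are illustrative and not part of this commit). Note that the caller, not the serde, reads the first long and offers it to accepts(), mirroring what LzoIndex.readIndex() does below.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import com.hadoop.compression.lzo.LzoBasicIndexSerde;

public class BasicSerdeRoundTrip {
  public static void main(String[] args) throws IOException {
    // Write three block offsets in the basic (raw long) format.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    LzoBasicIndexSerde writer = new LzoBasicIndexSerde();
    writer.prepareToWrite(new DataOutputStream(bytes));
    for (long offset : new long[] { 0L, 256 * 1024L, 512 * 1024L }) {
      writer.writeOffset(offset);
    }
    writer.finishWriting();

    // Read them back: the caller peels off the first long and offers it
    // to accepts(), exactly as LzoIndex.readIndex() does.
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    long firstLong = in.readLong();
    LzoBasicIndexSerde reader = new LzoBasicIndexSerde();
    if (reader.accepts(firstLong)) {
      reader.prepareToRead(in);
      while (reader.hasNext()) {
        System.out.println(reader.next()); // 0, 262144, 524288
      }
      reader.finishReading();
    }
  }
}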
63 changes: 46 additions & 17 deletions src/main/java/com/hadoop/compression/lzo/LzoIndex.java
@@ -20,7 +20,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.conf.Configurable;
@@ -29,8 +29,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

@@ -44,6 +42,15 @@ public class LzoIndex {

private long[] blockPositions_;

private static ArrayList<Class<? extends LzoIndexSerde>> serdeClasses =
new ArrayList<Class<? extends LzoIndexSerde>>();

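// Serdes are tried in declaration order by readIndex(); note that
// createIndex() below always writes the newer LzoTinyOffsetsSerde format.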
static {
serdeClasses.add(LzoBasicIndexSerde.class);
serdeClasses.add(LzoTinyOffsetsSerde.class);
}


/**
* Create an empty index, typically indicating no index file exists.
*/
@@ -176,20 +183,28 @@ public static LzoIndex readIndex(FileSystem fs, Path lzoFile) throws IOException
return new LzoIndex();
}

int capacity = 16 * 1024 * 8; //size for a 4GB file (with 256KB lzo blocks)
DataOutputBuffer bytes = new DataOutputBuffer(capacity);

// copy indexIn and close it
IOUtils.copyBytes(indexIn, bytes, 4*1024, true);

ByteBuffer bytesIn = ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength());
int blocks = bytesIn.remaining()/8;
LzoIndex index = new LzoIndex(blocks);

for (int i = 0; i < blocks; i++) {
index.set(i, bytesIn.getLong());
long firstLong = indexIn.readLong();
LzoIndexSerde serde = null;
for (Class<? extends LzoIndexSerde> candidateClass : serdeClasses) {
LzoIndexSerde candidate = quietGetInstance(candidateClass);
if (candidate.accepts(firstLong)) {
serde = candidate;
break;
}
}
if (serde == null) {
throw new IOException("No index serde accepts the index format of " + lzoFile);
}
serde.prepareToRead(indexIn);
// Sized for at least 1 256MB HDFS block with 256KB Lzo blocks.
// if it's less than that, you shouldn't bother indexing anyway.
ArrayList<Long> offsets = new ArrayList<Long>(1024);
while (serde.hasNext()) {
offsets.add(serde.next());
}
serde.finishReading();
LzoIndex index = new LzoIndex(offsets.size());
for (int i = 0; i < offsets.size(); i++) {
index.set(i, offsets.get(i));
}

return index;
}

@@ -217,6 +232,7 @@ public static void createIndex(FileSystem fs, Path lzoFile)

FSDataInputStream is = null;
FSDataOutputStream os = null;
LzoIndexSerde writer = new LzoTinyOffsetsSerde();
Path outputFile = lzoFile.suffix(LZO_INDEX_SUFFIX);
Path tmpOutputFile = lzoFile.suffix(LZO_TMP_INDEX_SUFFIX);

@@ -226,6 +242,7 @@
try {
is = fs.open(lzoFile);
os = fs.create(tmpOutputFile);
writer.prepareToWrite(os);
LzopDecompressor decompressor = (LzopDecompressor) codec.createDecompressor();
// Solely for reading the header
codec.createInputStream(is, decompressor);
@@ -252,7 +269,7 @@
numDecompressedChecksums : numDecompressedChecksums + numCompressedChecksums;
long pos = is.getPos();
// write the pos of the block start
os.writeLong(pos - 8);
writer.writeOffset(pos - 8);
// seek to the start of the next block, skip any checksums
is.seek(pos + compressedBlockSize + (4 * numChecksumsToSkip));
}
Expand All @@ -277,5 +294,17 @@ public static void createIndex(FileSystem fs, Path lzoFile)
}
}
}

private static LzoIndexSerde quietGetInstance(Class<? extends LzoIndexSerde> klass) throws IOException {
LzoIndexSerde instance = null;
try {
instance = klass.newInstance();
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
}
return instance;
}
}

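From a caller's perspective, creating and reading an index now looks like the sketch below. This assumes a default FileSystem, an existing .lzo file path supplied by the user, and that LzoIndex exposes getNumberOfBlocks() as elsewhere in the codebase.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.hadoop.compression.lzo.LzoIndex;

public class LzoIndexDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path lzoFile = new Path(args[0]); // path to an existing .lzo file

    // Writes <file>.lzo.index beside the file, in the tiny-offsets format.
    LzoIndex.createIndex(fs, lzoFile);

    // Reads it back through whichever serde accepts the index's first long.
    LzoIndex index = LzoIndex.readIndex(fs, lzoFile);
    System.out.println("indexed blocks: " + index.getNumberOfBlocks());
  }
}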
67 changes: 67 additions & 0 deletions src/main/java/com/hadoop/compression/lzo/LzoIndexSerde.java
@@ -0,0 +1,67 @@
/*
* This file is part of Hadoop-Gpl-Compression.
*
* Hadoop-Gpl-Compression is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Hadoop-Gpl-Compression is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hadoop-Gpl-Compression. If not, see
* <http://www.gnu.org/licenses/>.
*/

package com.hadoop.compression.lzo;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public interface LzoIndexSerde {

/**
* Serdes will be tried in order until one is found that accepts
* the offered format. A format is determined from the first 8
* bytes (represented as a long) written to the index file.
* <p>
* The first long is somewhat constrained: the topmost bit should be
* 1, the next 31 are a version number by which the appropriate SerDe
* is decided, and the remaining 32 can hold arbitrary data (a header,
* a header length, an offset... up to you).
*
* @param firstLong
* @return true if this format is recognized by the SerDe, false otherwise.
*/
public boolean accepts(long firstLong);

public void prepareToWrite(DataOutputStream os) throws IOException;

/**
* Prepare to read the index. Note that the first 8 bytes will already have
* been read from this stream and passed to accepts() in the form of a long.
* @param is InputStream to read.
*/
public void prepareToRead(DataInputStream is) throws IOException;

/**
* Write the next offset into the file. It is expected that
* the offsets are supplied in order. <code>prepareToWrite()</code>
* should be called before the first invocation of this method.
* @param offset
*/
public void writeOffset(long offset) throws IOException;

public void finishWriting() throws IOException;

public void finishReading() throws IOException;

public boolean hasNext() throws IOException;

public long next() throws IOException;

}
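To make the first-long convention described in the javadoc concrete, here is one hypothetical way a versioned serde could pack its data; the exact shifts and masks are an illustration, not mandated by the interface.

public final class FirstLongExample {
  // Top bit set, 31-bit version, low 32 bits free for the serde's own use.
  static long pack(int version, int low32) {
    return (1L << 63) | ((long) (version & 0x7FFFFFFF) << 32) | (low32 & 0xFFFFFFFFL);
  }

  static int version(long firstLong) {
    return (int) ((firstLong >>> 32) & 0x7FFFFFFF);
  }

  static int low32(long firstLong) {
    return (int) firstLong;
  }

  public static void main(String[] args) {
    long firstLong = pack(1, 0);
    // Negative, so LzoBasicIndexSerde.accepts() correctly rejects it,
    // leaving it to a versioned serde.
    System.out.println(firstLong < 0);      // true
    System.out.println(version(firstLong)); // 1
  }
}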