TASK-7320 - Compilation failure for hadoop compatibility issues #2540

Merged: 1 commit, Jan 20, 2025
```diff
@@ -36,4 +36,6 @@ public static HBaseCompatApi getInstance() {
     public abstract List<ServerName> getServerList(Admin admin) throws IOException;

     public abstract byte[][] getTableStartKeys(Admin admin, Table table) throws IOException;
+
+    public abstract boolean isSnappyAvailable();
 }
```
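This abstract hook is the core of the fix: callers ask the runtime-selected compat implementation whether Snappy can be enabled, instead of referencing version-specific `SnappyCodec` APIs directly. A minimal sketch of the call site, assuming only the `HBaseCompat.getInstance()` and `isSnappyAvailable()` shown in this diff (the surrounding class is illustrative):

```java
import org.opencb.opencga.storage.hadoop.HBaseCompat;

public class SnappyAvailabilityExample {
    public static void main(String[] args) {
        // getInstance() resolves whichever compat module matches the
        // HBase/Hadoop versions on the classpath, so the caller stays
        // insulated from version-specific SnappyCodec APIs.
        if (HBaseCompat.getInstance().isSnappyAvailable()) {
            System.out.println("Snappy output compression can be enabled");
        } else {
            System.out.println("Snappy unavailable; fall back to another codec");
        }
    }
}
```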
```diff
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;

@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }
```
```diff
@@ -7,6 +7,7 @@
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.SnappyCodec;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat;
 import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi;

@@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         }
         return startKeys;
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        return SnappyCodec.isNativeCodeLoaded();
+    }
 }
```
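The two identical implementations above appear to cover the older Hadoop-based compat profiles, where `SnappyCodec` is JNI-backed: `SnappyCodec.isNativeCodeLoaded()` only returns true when native libhadoop with Snappy support can be loaded, so the check must run before enabling Snappy output. A standalone probe of that condition, valid only on the Hadoop 2.x line (illustrative, not part of the PR):

```java
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeSnappyProbe {
    public static void main(String[] args) {
        // Hadoop 2.x: Snappy requires libhadoop plus native Snappy bindings;
        // SnappyCodec.isNativeCodeLoaded() reports whether both loaded.
        System.out.println("libhadoop loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        System.out.println("Snappy usable:    " + SnappyCodec.isNativeCodeLoaded());
    }
}
```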
```diff
@@ -32,7 +32,7 @@

     <properties>
         <hbase.version>2.4.17</hbase.version>
-        <hadoop.version>2.10.0</hadoop.version>
+        <hadoop.version>3.3.4</hadoop.version>
         <phoenix.version>5.1.3</phoenix.version>
         <phoenix-thirdparty.version>1.1.0</phoenix-thirdparty.version>
     </properties>
```
```diff
@@ -38,4 +38,10 @@ public List<ServerName> getServerList(Admin admin) throws IOException {
     public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException {
         return table.getRegionLocator().getStartKeys();
     }
+
+    @Override
+    public boolean isSnappyAvailable() {
+        // [HADOOP-17125] - Using snappy-java in SnappyCodec - 3.3.1, 3.4.0
+        return true;
+    }
 }
```
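This is the Hadoop 3 implementation: per the HADOOP-17125 comment (shipped in 3.3.1), `SnappyCodec` delegates to the pure-Java snappy-java library, so availability no longer depends on native libhadoop and the method can return true unconditionally. A quick round-trip showing what that guarantee means in practice (standalone sketch, assuming Hadoop >= 3.3.1):

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.SnappyCodec;

public class SnappyJavaRoundTrip {
    public static void main(String[] args) throws Exception {
        // On Hadoop >= 3.3.1 this works without libhadoop installed,
        // because the codec is backed by snappy-java.
        SnappyCodec codec = new SnappyCodec();
        codec.setConf(new Configuration());
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream out = codec.createOutputStream(buffer)) {
            out.write("snappy without native hadoop".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("compressed size: " + buffer.size() + " bytes");
    }
}
```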
```diff
@@ -448,6 +448,9 @@
                         <ignoredUsedUndeclaredDependency>
                             org.apache.tephra:tephra-core
                         </ignoredUsedUndeclaredDependency>
+                        <ignoredUsedUndeclaredDependency>
+                            org.apache.tephra:tephra-core-shaded
+                        </ignoredUsedUndeclaredDependency>
                         <ignoredUsedUndeclaredDependency>
                             com.lmax:disruptor
                         </ignoredUsedUndeclaredDependency>
```
```diff
@@ -9,7 +9,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@@ -24,6 +23,7 @@
 import org.opencb.biodata.models.variant.avro.GeneCancerAssociation;
 import org.opencb.biodata.models.variant.avro.VariantAvro;
 import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
+import org.opencb.opencga.storage.hadoop.HBaseCompat;
 import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver;
 import org.opencb.opencga.storage.hadoop.variant.mr.VariantFileOutputFormat;
 import org.opencb.opencga.storage.hadoop.variant.mr.VariantLocusKey;

@@ -144,7 +144,7 @@ protected void setupJob(Job job) throws IOException {
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
             outputFormatClass = LazyOutputFormat.class;
         }
-        if (SnappyCodec.isNativeCodeLoaded()) {
+        if (HBaseCompat.getInstance().isSnappyAvailable()) {
             FileOutputFormat.setCompressOutput(job, true);
             // FIXME: SnappyCodec might not be available in client side
             // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
```
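The driver change is mechanical: the compile-time reference to `SnappyCodec` is removed and the decision moves behind the compat layer. The FIXME suggests the codec should not be referenced by class literal on the client side either; one way to honor that, sketched below with an assumed gzip fallback (not part of this PR), is to set the codec by name:

```java
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.opencb.opencga.storage.hadoop.HBaseCompat;

public final class OutputCompressionSetup {

    private OutputCompressionSetup() {
    }

    static void configureOutputCompression(Job job) {
        FileOutputFormat.setCompressOutput(job, true);
        if (HBaseCompat.getInstance().isSnappyAvailable()) {
            // Setting the codec by name avoids any compile-time (client-side)
            // dependency on SnappyCodec, addressing the FIXME in the diff.
            job.getConfiguration().set(FileOutputFormat.COMPRESS_CODEC,
                    "org.apache.hadoop.io.compress.SnappyCodec");
        } else {
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        }
    }
}
```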
```diff
@@ -5,7 +5,6 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;

@@ -17,6 +16,7 @@
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
+import org.opencb.opencga.storage.hadoop.HBaseCompat;
 import org.opencb.opencga.storage.hadoop.utils.ValueOnlyTextOutputFormat;
 import org.opencb.opencga.storage.hadoop.variant.io.VariantDriver;
 import org.slf4j.Logger;

@@ -164,7 +164,7 @@ protected void setupJob(Job job) throws IOException {
         outputFormatClass = LazyOutputFormat.class;

         job.setOutputFormatClass(ValueOnlyTextOutputFormat.class);
-        if (SnappyCodec.isNativeCodeLoaded()) {
+        if (HBaseCompat.getInstance().isSnappyAvailable()) {
             FileOutputFormat.setCompressOutput(job, true);
             // FIXME: SnappyCodec might not be available in client side
             // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
```
```diff
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.GzipCodec;

@@ -54,6 +53,15 @@
  */
 public class VariantFileOutputFormat extends FileOutputFormat<Variant, NullWritable> {

+    private static Class<?> abfsOutputStreamClass;
+
+    static {
+        try {
+            abfsOutputStreamClass = Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream");
+        } catch (ClassNotFoundException e) {
+            abfsOutputStreamClass = null;
+        }
+    }

     public static final String VARIANT_OUTPUT_FORMAT = "variant.output_format";

@@ -74,7 +82,7 @@ public RecordWriter<Variant, NullWritable> getRecordWriter(TaskAttemptContext jo
         FileSystem fs = file.getFileSystem(conf);
         FSDataOutputStream fsOs = fs.create(file, false);
         OutputStream out;
-        if (fsOs.getWrappedStream() instanceof AbfsOutputStream) {
+        if (isAbfsOutputStream(fsOs)) {
             // Disable flush on ABFS. See HADOOP-16548
             out = new FilterOutputStream(fsOs) {
                 @Override

@@ -92,6 +100,10 @@ public void flush() throws IOException {
         return new VariantRecordWriter(configureWriter(job, countingOut), countingOut);
     }

+    private static boolean isAbfsOutputStream(FSDataOutputStream fsOs) {
+        return abfsOutputStreamClass != null && abfsOutputStreamClass.isInstance(fsOs.getWrappedStream());
+    }
+
     private DataWriter<Variant> configureWriter(final TaskAttemptContext job, OutputStream fileOut) throws IOException {
         // job.getCounter(VcfDataWriter.class.getName(), "failed").increment(0); // init
         final Configuration conf = job.getConfiguration();
```
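Here the hard import of `AbfsOutputStream` is replaced by a reflective lookup, since that class ships in the optional hadoop-azure module and is absent from non-Azure classpaths. `Class.isInstance()` preserves the semantics of the old `instanceof` check, including matching subclasses; a simpler name-comparison variant (sketched below, not in the PR) would not:

```java
import org.apache.hadoop.fs.FSDataOutputStream;

public final class AbfsStreamCheck {

    private AbfsStreamCheck() {
    }

    // Name-based variant: never touches the ABFS class at all, but unlike
    // Class.isInstance() it fails to match subclasses of AbfsOutputStream.
    static boolean isAbfsByName(FSDataOutputStream fsOs) {
        return "org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream"
                .equals(fsOs.getWrappedStream().getClass().getName());
    }
}
```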