diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-api/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompatApi.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-api/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompatApi.java index 48ab8846f6..b9d6a9dd31 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-api/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompatApi.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-api/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompatApi.java @@ -36,4 +36,6 @@ public static HBaseCompatApi getInstance() { public abstract List getServerList(Admin admin) throws IOException; public abstract byte[][] getTableStartKeys(Admin admin, Table table) throws IOException; + + public abstract boolean isSnappyAvailable(); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.0/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.0/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java index bb74dabf4c..923c061c83 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.0/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.0/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java @@ -7,6 +7,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.compress.SnappyCodec; import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat; import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi; @@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException { } return startKeys; } + + @Override + public boolean isSnappyAvailable() { + return SnappyCodec.isNativeCodeLoaded(); + } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.2/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.2/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java index f7cf534508..455d57aa26 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.2/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.2/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java @@ -7,6 +7,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.compress.SnappyCodec; import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompat; import org.opencb.opencga.storage.hadoop.variant.annotation.phoenix.PhoenixCompatApi; @@ -45,4 +46,9 @@ public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException { } return startKeys; } + + @Override + public boolean isSnappyAvailable() { + return SnappyCodec.isNativeCodeLoaded(); + } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/pom.xml b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/pom.xml index a86f33deb4..66134df8dc 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/pom.xml +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/pom.xml @@ -32,7 +32,7 @@ 2.4.17 - 2.10.0 + 3.3.4 5.1.3 1.1.0 diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java index 194b47b779..d680840cb3 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-compat/opencga-storage-hadoop-compat-hbase2.4/src/main/java/org/opencb/opencga/storage/hadoop/HBaseCompat.java @@ -38,4 +38,10 @@ public List getServerList(Admin admin) throws IOException { public byte[][] getTableStartKeys(Admin admin, Table table) throws IOException { return table.getRegionLocator().getStartKeys(); } + + @Override + public boolean isSnappyAvailable() { + // [HADOOP-17125] - Using snappy-java in SnappyCodec - 3.3.1, 3.4.0 + return true; + } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/pom.xml b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/pom.xml index f41b6d7b0f..48b381275c 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/pom.xml +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/pom.xml @@ -448,6 +448,9 @@ org.apache.tephra:tephra-core + + org.apache.tephra:tephra-core-shaded + com.lmax:disruptor diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java index a26dd84d6b..d288fbf609 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/io/VariantExporterDriver.java @@ -9,7 +9,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DeflateCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -24,6 +23,7 @@ import org.opencb.biodata.models.variant.avro.GeneCancerAssociation; import org.opencb.biodata.models.variant.avro.VariantAvro; import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory; +import org.opencb.opencga.storage.hadoop.HBaseCompat; import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver; import org.opencb.opencga.storage.hadoop.variant.mr.VariantFileOutputFormat; import org.opencb.opencga.storage.hadoop.variant.mr.VariantLocusKey; @@ -144,7 +144,7 @@ protected void setupJob(Job job) throws IOException { LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); outputFormatClass = LazyOutputFormat.class; } - if (SnappyCodec.isNativeCodeLoaded()) { + if (HBaseCompat.getInstance().isSnappyAvailable()) { FileOutputFormat.setCompressOutput(job, true); // FIXME: SnappyCodec might not be available in client side // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java index 960196f6f1..e25c9be620 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java @@ -5,7 +5,6 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DeflateCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; @@ -17,6 +16,7 @@ import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory; +import org.opencb.opencga.storage.hadoop.HBaseCompat; import org.opencb.opencga.storage.hadoop.utils.ValueOnlyTextOutputFormat; import org.opencb.opencga.storage.hadoop.variant.io.VariantDriver; import org.slf4j.Logger; @@ -164,7 +164,7 @@ protected void setupJob(Job job) throws IOException { outputFormatClass = LazyOutputFormat.class; job.setOutputFormatClass(ValueOnlyTextOutputFormat.class); - if (SnappyCodec.isNativeCodeLoaded()) { + if (HBaseCompat.getInstance().isSnappyAvailable()) { FileOutputFormat.setCompressOutput(job, true); // FIXME: SnappyCodec might not be available in client side // FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/VariantFileOutputFormat.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/VariantFileOutputFormat.java index 248bcc5d16..445dfeb587 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/VariantFileOutputFormat.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/VariantFileOutputFormat.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; @@ -54,6 +53,15 @@ */ public class VariantFileOutputFormat extends FileOutputFormat { + private static Class abfsOutputStreamClass; + + static { + try { + abfsOutputStreamClass = Class.forName("org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream"); + } catch (ClassNotFoundException e) { + abfsOutputStreamClass = null; + } + } public static final String VARIANT_OUTPUT_FORMAT = "variant.output_format"; @@ -74,7 +82,7 @@ public RecordWriter getRecordWriter(TaskAttemptContext jo FileSystem fs = file.getFileSystem(conf); FSDataOutputStream fsOs = fs.create(file, false); OutputStream out; - if (fsOs.getWrappedStream() instanceof AbfsOutputStream) { + if (isAbfsOutputStream(fsOs)) { // Disable flush on ABFS. See HADOOP-16548 out = new FilterOutputStream(fsOs) { @Override @@ -92,6 +100,10 @@ public void flush() throws IOException { return new VariantRecordWriter(configureWriter(job, countingOut), countingOut); } + private static boolean isAbfsOutputStream(FSDataOutputStream fsOs) { + return abfsOutputStreamClass != null && abfsOutputStreamClass.isInstance(fsOs.getWrappedStream()); + } + private DataWriter configureWriter(final TaskAttemptContext job, OutputStream fileOut) throws IOException { // job.getCounter(VcfDataWriter.class.getName(), "failed").increment(0); // init final Configuration conf = job.getConfiguration();