diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java index d3abad4d880..5bd00b64b14 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java @@ -65,6 +65,7 @@ import org.opencb.opencga.core.models.sample.SampleAclEntry; import org.opencb.opencga.core.models.study.Study; import org.opencb.opencga.core.models.study.StudyAclEntry; +import org.opencb.opencga.core.models.variant.VariantPruneParams; import org.opencb.opencga.core.response.OpenCGAResult; import org.opencb.opencga.core.response.VariantQueryResult; import org.opencb.opencga.core.tools.ToolParams; @@ -1084,6 +1085,13 @@ public boolean exists(String study, String token) throws StorageEngineException, return engine.getMetadataManager().studyExists(studyFqn); } + public void variantPrune(String project, URI outdir, VariantPruneParams params, String token) throws StorageEngineException, CatalogException { + secureOperationByProject(VariantPruneOperationTool.ID, project, new ObjectMap(), token, engine -> { + engine.variantsPrune(params.isDryRun(), params.isResume(), outdir); + return null; + }); + } + // Permission related methods private interface VariantReadOperation { diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java new file mode 100644 index 00000000000..faaee5e681a --- /dev/null +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java @@ -0,0 +1,27 @@ +package org.opencb.opencga.analysis.variant.operations; + +import org.opencb.opencga.core.models.common.Enums; +import org.opencb.opencga.core.models.variant.VariantPruneParams; +import org.opencb.opencga.core.tools.annotations.Tool; +import org.opencb.opencga.core.tools.annotations.ToolParams; + +@Tool(id = VariantPruneOperationTool.ID, description = VariantPruneOperationTool.DESCRIPTION, + type = Tool.Type.OPERATION, resource = Enums.Resource.VARIANT) +public class VariantPruneOperationTool extends OperationTool { + + public static final String DESCRIPTION = "Prune orphan variants from studies in a project."; + public static final String ID = "variant-prune"; + + @ToolParams + protected VariantPruneParams params = new VariantPruneParams(); + + @Override + protected void run() throws Exception { + + step(() -> { + getVariantStorageManager().variantPrune(params.getProject(), getOutDir().toUri(), params, getToken()); + }); + + + } +} diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java b/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java index 2ca4b146345..89e3a6fba43 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java @@ -27,19 +27,27 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu public static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline, boolean includeClassName) { + String separator; + if (multiline) { + separator = "\n"; + } else { + separator 
= " ; "; + } + return prettyExceptionMessage(exception, message, multiline, includeClassName, separator); + } + + private static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline, + boolean includeClassName, String separator) { Set messages = new HashSet<>(); do { - if (exception.getMessage() != null && !messages.add(exception.getMessage())) { + if (exception.getMessage() != null && !messages.add(exception.getMessage()) + && exception.getSuppressed().length == 0) { // Duplicated message. Skip this cause exception = exception.getCause(); continue; } if (message.length() != 0) { - if (multiline) { - message.append("\n"); - } else { - message.append(" ; "); - } + message.append(separator); } String exMessage = exception.getMessage(); if (StringUtils.isBlank(exMessage)) { @@ -51,6 +59,26 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu message.append("[").append(exception.getClass().getSimpleName()).append("] "); } message.append(exMessage); + if (exception.getSuppressed().length > 0) { + StringBuilder sb = new StringBuilder(); + String intraSeparator = multiline ? separator + " " : separator; + for (Throwable suppressed : exception.getSuppressed()) { + prettyExceptionMessage(suppressed, sb, multiline, includeClassName, intraSeparator); + } + message.append(" "); + } exception = exception.getCause(); } while (exception != null); diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java new file mode 100644 index 00000000000..ed0449871e6 --- /dev/null +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java @@ -0,0 +1,39 @@ +package org.opencb.opencga.core.models.variant; + +import org.opencb.opencga.core.tools.ToolParams; + +public class VariantPruneParams extends ToolParams { + + public static final String DESCRIPTION = "Variant prune params. 
diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java
new file mode 100644
index 00000000000..ed0449871e6
--- /dev/null
+++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java
@@ -0,0 +1,39 @@
+package org.opencb.opencga.core.models.variant;
+
+import org.opencb.opencga.core.tools.ToolParams;
+
+public class VariantPruneParams extends ToolParams {
+
+ public static final String DESCRIPTION = "Variant prune params. Use dry-run to just generate a report with the orphan variants.";
+ private String project;
+ private boolean dryRun;
+ private boolean resume;
+
+ public String getProject() {
+ return project;
+ }
+
+ public VariantPruneParams setProject(String project) {
+ this.project = project;
+ return this;
+ }
+
+ public boolean isDryRun() {
+ return dryRun;
+ }
+
+ public VariantPruneParams setDryRun(boolean dryRun) {
+ this.dryRun = dryRun;
+ return this;
+ }
+
+ public boolean isResume() {
+ return resume;
+ }
+
+ public VariantPruneParams setResume(boolean resume) {
+ this.resume = resume;
+ return this;
+ }
+}
diff --git a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java
index 18ae35974be..218722e851d 100644
--- a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java
+++ b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java
@@ -472,6 +472,18 @@ public Response julie(
 return submitOperationToProject(JulieTool.ID, project, params, jobName, jobDescription, dependsOn, jobTags);
 }
+ @POST
+ @Path("/variant/prune")
+ @ApiOperation(value = VariantPruneOperationTool.DESCRIPTION, response = Job.class)
+ public Response variantPrune(
+ @ApiParam(value = ParamConstants.JOB_ID_CREATION_DESCRIPTION) @QueryParam(ParamConstants.JOB_ID) String jobName,
+ @ApiParam(value = ParamConstants.JOB_DESCRIPTION_DESCRIPTION) @QueryParam(ParamConstants.JOB_DESCRIPTION) String jobDescription,
+ @ApiParam(value = ParamConstants.JOB_DEPENDS_ON_DESCRIPTION) @QueryParam(JOB_DEPENDS_ON) String dependsOn,
+ @ApiParam(value = ParamConstants.JOB_TAGS_DESCRIPTION) @QueryParam(ParamConstants.JOB_TAGS) String jobTags,
+ @ApiParam(value = VariantPruneParams.DESCRIPTION) VariantPruneParams params) {
+ return submitOperationToProject(VariantPruneOperationTool.ID, params.getProject(), params, jobName, jobDescription, dependsOn, jobTags);
+ }
+
 public Response submitOperation(String toolId, String study, ToolParams params, String jobName, String jobDescription,
 String jobDependsOn, String jobTags) {
 return submitOperation(toolId, null, study, params, jobName, jobDescription, jobDependsOn, jobTags);
diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java
index 287ab7dd00d..4cbc92a394d 100644
--- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java
+++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java
@@ -1252,13 +1252,13 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng
 taskDBAdaptor.updateTask(studyId, task, null);
 }
- public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateFunction<E> update)
+ public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer<E> consumer)
 throws E, StorageEngineException {
 getTask(studyId, taskId); // Check task exists
 Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout);
 try {
 TaskMetadata task = getTask(studyId, taskId);
- task = update.update(task);
+ consumer.update(task);
 lock.checkLocked();
 unsecureUpdateTask(studyId,
task);
 return task;
@@ -1732,7 +1732,6 @@ public TaskMetadata.Status setStatus(int studyId, int taskId, TaskMetadata.Statu
 updateTask(studyId, taskId, task -> {
 previousStatus.set(task.currentStatus());
 task.addStatus(Calendar.getInstance().getTime(), status);
- return task;
 });
 return previousStatus.get();
 }
@@ -1765,13 +1764,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, List false); }
 /**
- * Adds a new {@link TaskMetadata} to the Study Metadata.
- *
- * Allow execute concurrent operations depending on the "allowConcurrent" predicate.
- * If any operation is in ERROR, is not the same operation, and concurrency is not allowed,
- * throw {@link StorageEngineException#otherOperationInProgressException}
- * If any operation is DONE, RUNNING, is same operation and resume=true, continue
- * If all operations are ready, continue
+ * Find out if the task attempt can be executed, and whether it should start a new task or continue executing a previous one.
 *
 * @param studyId Study id
 * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()}
@@ -1783,11 +1776,9 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName,
- List fileIds, boolean resume, TaskMetadata.Type type,
- Predicate allowConcurrent)
+ private TaskMetadata getRunningTaskCompatibleOrFail(int studyId, String jobOperationName, List<Integer> fileIds, boolean resume,
+ TaskMetadata.Type type, Predicate<TaskMetadata> allowConcurrent)
 throws StorageEngineException {
-
 TaskMetadata resumeTask = null;
 Iterator<TaskMetadata> iterator = taskIterator(studyId, Arrays.asList(
 TaskMetadata.Status.DONE,
 TaskMetadata.Status.RUNNING,
 TaskMetadata.Status.ERROR));
@@ -1832,6 +1823,33 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName,
 throw new IllegalArgumentException("Unknown Status " + currentStatus);
 }
 }
+ return resumeTask;
+ }
+
+ /**
+ * Adds a new {@link TaskMetadata} to the Study Metadata.
+ *
+ * Allows executing concurrent operations depending on the "allowConcurrent" predicate.
+ * If any operation is in ERROR, is not the same operation, and concurrency is not allowed,
+ * throw {@link StorageEngineException#otherOperationInProgressException}
+ * If any operation is DONE or RUNNING, is the same operation and resume=true, continue
+ * If all operations are ready, continue
+ *
+ * @param studyId Study id
+ * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()}
+ * @param fileIds Files to be processed in this batch.
+ * @param resume Resume operation. Assume that previous operation went wrong.
+ * @param type Operation type as {@link TaskMetadata.Type}
+ * @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non-ready operation.
+ * If not, throws {@link StorageEngineException#otherOperationInProgressException} + * @return The current batchOperation + * @throws StorageEngineException if the operation can't be executed + */ + public TaskMetadata addRunningTask(int studyId, String jobOperationName, + List fileIds, boolean resume, TaskMetadata.Type type, + Predicate allowConcurrent) + throws StorageEngineException { + TaskMetadata resumeTask = getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent); TaskMetadata task; if (resumeTask == null) { @@ -1848,7 +1866,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, throw new StorageEngineException("Attempt to execute a concurrent modification of task " + thisTask.getName() + " (" + thisTask.getId() + ") "); } else { - return thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING); + thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING); } }); } @@ -1856,6 +1874,26 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, return task; } + + /** + * Check if a task can be executed. + * + * @param studyId Study id + * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()} + * @param fileIds Files to be processed in this batch. + * @param resume Resume operation. Assume that previous operation went wrong. + * @param type Operation type as {@link TaskMetadata.Type} + * @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation. + * If not, throws {@link StorageEngineException#otherOperationInProgressException} + * @throws StorageEngineException if the operation can't be executed + */ + public void checkTaskCanRun(int studyId, String jobOperationName, List fileIds, boolean resume, + TaskMetadata.Type type, Predicate allowConcurrent) + throws StorageEngineException { + getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent); + } + + protected void checkName(final String type, String name) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException(type + " can not be empty!"); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java index 323ea94088a..5e2558339de 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java @@ -976,6 +976,18 @@ protected void postRemoveFiles(String study, List fileIds, List getVariantDeleter(String collection) { + return list -> { + try { + if (list != null) { + delete(collection, list.stream().map(Variant::toString).collect(Collectors.toList())); + } + } catch (IOException | SolrServerException e) { + throw new RuntimeException(e); + } + return true; + }; + } + /*------------------------------------- * P R I V A T E M E T H O D S -------------------------------------*/ diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/VariantStorageBaseTest.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/VariantStorageBaseTest.java index a5837f1ded4..4fc4d5e7275 100644 --- 
a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/VariantStorageBaseTest.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/VariantStorageBaseTest.java @@ -55,6 +55,7 @@ public abstract class VariantStorageBaseTest extends GenericTest implements Vari public static final int NUM_VARIANTS = 9792; @Deprecated public static final int STUDY_ID = 1; public static final String STUDY_NAME = "1000g"; + public static final String STUDY_NAME_1 = STUDY_NAME; public static final String STUDY_NAME_2 = "study_2"; public static final String DB_NAME = "opencga_variants_test"; @Deprecated public static final int FILE_ID = 1; diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java index 683c2bd3cb9..be3239203b9 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java @@ -13,6 +13,7 @@ import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; import org.opencb.opencga.storage.hadoop.variant.annotation.pending.AnnotationPendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsManager; +import org.opencb.opencga.storage.hadoop.variant.prune.SecondaryIndexPrunePendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.search.SecondaryIndexPendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator; @@ -57,7 +58,8 @@ public void run(String[] args) throws Exception { .stream() .map(TableName::getNameAsString) .filter(t -> HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexTableName(t) - || HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t)) + || HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t) + || HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(t)) ); break; } @@ -117,6 +119,12 @@ private PendingVariantsManager getPendingVariantsManager(HBaseManager hBaseManag System.err.println("Detect SecondaryIndexPendingVariants table"); return new SecondaryIndexPendingVariantsManager( new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap())); + } else if (HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(table)) { + String dbName = HBaseVariantTableNameGenerator.getDBNameFromPendingSecondaryIndexPruneTableName(table); + HBaseVariantTableNameGenerator tableNameGenerator = new HBaseVariantTableNameGenerator(dbName, hBaseManager.getConf()); + System.err.println("Detect SecondaryIndexPendingPruneVariants table"); + return new SecondaryIndexPrunePendingVariantsManager( + new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap())); } else { throw new IllegalArgumentIOException("Table '" + table + "' is not a pendig vairants table"); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java 
b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java
index 57f5c09aa24..c6900f03f6b 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/utils/AbstractHBaseDriver.java
@@ -294,6 +294,59 @@ protected void deleteTemporaryFile(Path outdir) throws IOException {
 fileSystem.cancelDeleteOnExit(outdir);
 }
+ public class MapReduceOutputFile {
+ public static final String OUTPUT_PARAM = "output";
+
+ private final Supplier<String> nameGenerator;
+ private final String tempFilePrefix;
+ protected Path localOutput;
+ protected Path outdir;
+
+ public MapReduceOutputFile(Supplier<String> nameGenerator, String tempFilePrefix) throws IOException {
+ this.nameGenerator = nameGenerator;
+ this.tempFilePrefix = tempFilePrefix;
+ getOutputPath();
+ }
+
+ protected void getOutputPath() throws IOException {
+ String outdirStr = getParam(OUTPUT_PARAM);
+ if (StringUtils.isNotEmpty(outdirStr)) {
+ outdir = new Path(outdirStr);
+
+ if (isLocal(outdir)) {
+ localOutput = AbstractHBaseDriver.this.getLocalOutput(outdir, nameGenerator);
+ outdir = getTempOutdir(tempFilePrefix, localOutput.getName());
+ outdir.getFileSystem(getConf()).deleteOnExit(outdir);
+ }
+ if (localOutput != null) {
+ LOGGER.info(" * Outdir file: " + localOutput.toUri());
+ LOGGER.info(" * Temporary outdir file: " + outdir.toUri());
+ } else {
+ LOGGER.info(" * Outdir file: " + outdir.toUri());
+ }
+ }
+ }
+
+ public void postExecute(boolean succeed) throws IOException {
+ if (succeed) {
+ if (localOutput != null) {
+ concatMrOutputToLocal(outdir, localOutput);
+ }
+ }
+ if (localOutput != null) {
+ deleteTemporaryFile(outdir);
+ }
+ }
+
+ public Path getLocalOutput() {
+ return localOutput;
+ }
+
+ public Path getOutdir() {
+ return outdir;
+ }
+ }
+
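A subclass is expected to wire this helper into the driver lifecycle roughly as follows (an illustrative sketch mirroring how VariantPruneDriver uses it later in this patch; the driver and report names are hypothetical, and the remaining abstract driver methods are omitted):

```java
public class MyReportDriver extends AbstractVariantsTableDriver {

    private MapReduceOutputFile output;

    @Override
    protected void parseAndValidateParameters() throws IOException {
        super.parseAndValidateParameters();
        // Resolves the "output" param: for a local target, MapReduce output is
        // redirected to a temporary directory first.
        output = new MapReduceOutputFile(() -> "my_report." + TimeUtils.getTime() + ".txt", "my_report");
    }

    @Override
    protected void postExecution(boolean succeed) throws IOException, StorageEngineException {
        super.postExecution(succeed);
        // Concatenates the MR output into the local file on success,
        // then deletes the temporary directory.
        output.postExecute(succeed);
    }

    // getMapperClass(), getJobOperationName(), setupJob(...) omitted for brevity.
}
```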
 /**
 * Concatenate all generated files from a MapReduce job into one single local file.
 *
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java
index f25872bbaca..87181b49fc2 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java
@@ -96,6 +96,7 @@
 import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexDeleteHBaseColumnTask;
 import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexBuilder;
 import org.opencb.opencga.storage.hadoop.variant.io.HadoopVariantExporter;
+import org.opencb.opencga.storage.hadoop.variant.prune.VariantPruneManager;
 import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreLoader;
 import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreRemover;
 import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataWriter;
@@ -947,6 +948,11 @@ public void removeStudy(String studyName, URI outdir) throws StorageEngineExcept
 outdir);
 }
+ @Override
+ public void variantsPrune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException {
+ new VariantPruneManager(this).prune(dryMode, resume, outdir);
+ }
+
 @Override
 public void loadVariantScore(URI scoreFile, String study, String scoreName, String cohort1, String cohort2,
 VariantScoreFormatDescriptor descriptor, ObjectMap options)
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
index c23d0b10970..b9168fecada 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java
@@ -69,6 +69,9 @@
 VARIANT_TABLE_LOAD_REFERENCE("storage.hadoop.variant.table.load.reference", false),
 PENDING_SECONDARY_INDEX_TABLE_COMPRESSION("storage.hadoop.pendingSecondaryIndex.table.compression",
 Compression.Algorithm.SNAPPY.getName()),
+ PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION("storage.hadoop.pendingSecondaryIndexPrune.table.compression",
+ Compression.Algorithm.SNAPPY.getName()),
+
 /////////////////////////
 // Archive table configuration
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java
index 1eaa0dd0da8..0bdc2805c3f 100644
--- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java
+++
b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java @@ -31,7 +31,7 @@ protected List map(Result result) { return mutations; } else { Put put = new Put(result.getRow()); - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); List mutationsWithPut = new ArrayList<>(mutations.size() + 1); mutationsWithPut.addAll(mutations); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java index d8a504f1667..1d408da09b4 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java @@ -55,6 +55,7 @@ public final class VariantPhoenixSchema { public static final String ANNOTATION_PREFIX = "A_"; public static final String POPULATION_FREQUENCY_PREFIX = ANNOTATION_PREFIX + "PF_"; public static final String FUNCTIONAL_SCORE_PREFIX = ANNOTATION_PREFIX + "FS_"; + public static final String SAMPLE_DATA_SUFIX = "_S"; public static final byte[] SAMPLE_DATA_SUFIX_BYTES = Bytes.toBytes(SAMPLE_DATA_SUFIX); public static final String FILE_SUFIX = "_F"; @@ -312,7 +313,7 @@ public static List getHumanPopulationFrequenciesColumns() { private VariantPhoenixSchema() { } - public static List getStudyColumns(Integer studyId) { + public static List getStudyColumns(int studyId) { return Arrays.asList(getStudyColumn(studyId), getFillMissingColumn(studyId)); } @@ -432,6 +433,14 @@ public static Column getConservationScoreColumn(String source, String rawValue, } } + public static List getStatsColumns(int studyId, List cohortIds) { + List columns = new ArrayList<>(cohortIds.size() * 5); + for (Integer cohortId : cohortIds) { + columns.addAll(getStatsColumns(studyId, cohortId)); + } + return columns; + } + public static List getStatsColumns(int studyId, int cohortId) { return Arrays.asList( getStatsColumn(studyId, cohortId), @@ -483,7 +492,7 @@ public static int extractSampleId(String columnKey) { } public static Integer extractSampleId(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(SAMPLE_DATA_SUFIX)) { + if (isSampleDataColumn(columnKey)) { return extractId(columnKey, failOnMissing, "sample"); } else if (failOnMissing) { throw new IllegalArgumentException("Not a sample column: " + columnKey); @@ -497,7 +506,7 @@ public static int extractFileIdFromSampleColumn(String columnKey) { } public static Integer extractFileIdFromSampleColumn(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(SAMPLE_DATA_SUFIX) && StringUtils.countMatches(columnKey, COLUMN_KEY_SEPARATOR) == 3) { + if (isSampleDataColumn(columnKey) && StringUtils.countMatches(columnKey, COLUMN_KEY_SEPARATOR) == 3) { return extractId(columnKey, failOnMissing, "sample", columnKey.indexOf(COLUMN_KEY_SEPARATOR) + 1); } else if (failOnMissing) { throw new IllegalArgumentException("Not a sample column: " + columnKey); @@ -506,12 +515,20 @@ public static Integer 
extractFileIdFromSampleColumn(String columnKey, boolean fa } } + public static boolean isSampleDataColumn(String columnKey) { + return columnKey.endsWith(SAMPLE_DATA_SUFIX); + } + + public static boolean isFileColumn(String columnKey) { + return columnKey.endsWith(FILE_SUFIX); + } + public static int extractFileId(String columnKey) { return extractFileId(columnKey, true); } public static Integer extractFileId(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(FILE_SUFIX)) { + if (isFileColumn(columnKey)) { return extractId(columnKey, failOnMissing, "file"); } else if (failOnMissing) { throw new IllegalArgumentException("Not a file column: " + columnKey); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java index 8a2d9ca509b..44d7cd68c99 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java @@ -81,7 +81,7 @@ protected List convert(List puts) { cleanPendingVariants(); } for (Put put : puts) { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, columnFamily); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); loadedVariants.add(put.getRow()); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java index 7d58498ec89..fbd764f498c 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java @@ -63,7 +63,7 @@ protected List convert(List list) { if (HadoopVariantStorageEngine.TARGET_VARIANT_TYPE_SET.contains(variant.getType())) { Put put = converter.convert(variant); if (put != null) { - HadoopVariantSearchIndexUtils.addUnknownSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addUnknownSyncStatus(put); puts.add(put); loadedVariants.getAndIncrement(); } else { diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java new file mode 100644 index 00000000000..781edd12386 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java @@ -0,0 +1,55 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.apache.hadoop.hbase.client.Mutation; +import 
org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
+import org.opencb.opencga.storage.hadoop.utils.HBaseManager;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions;
+import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDescriptor;
+import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Created on 13/02/19.
+ *
+ * @author Jacobo Coll <jacobo167@gmail.com>
+ */
+public class SecondaryIndexPrunePendingVariantsDescriptor implements PendingVariantsDescriptor {
+
+ @Override
+ public String name() {
+ return "prune";
+ }
+
+ @Override
+ public void checkValidPendingTableName(String tableName) {
+ HBaseVariantTableNameGenerator.checkValidPendingSecondaryIndexPruneTableName(tableName);
+ }
+
+ @Override
+ public String getTableName(HBaseVariantTableNameGenerator generator) {
+ return generator.getPendingSecondaryIndexPruneTableName();
+ }
+
+ @Override
+ public boolean createTableIfNeeded(String tableName, HBaseManager hBaseManager) throws IOException {
+ return createTableIfNeeded(tableName, hBaseManager, Compression.getCompressionAlgorithmByName(
+ hBaseManager.getConf().get(
+ HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.key(),
+ HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.defaultValue())));
+ }
+
+ @Override
+ public Scan configureScan(Scan scan, VariantStorageMetadataManager metadataManager) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Function<Result, Mutation> getPendingEvaluatorMapper(VariantStorageMetadataManager metadataManager, boolean overwrite) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java
new file mode 100644
index 00000000000..6f34b550683
--- /dev/null
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java
@@ -0,0 +1,12 @@
+package org.opencb.opencga.storage.hadoop.variant.prune;
+
+import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor;
+import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsManager;
+
+public class SecondaryIndexPrunePendingVariantsManager extends PendingVariantsManager {
+
+ public SecondaryIndexPrunePendingVariantsManager(VariantHadoopDBAdaptor dbAdaptor) {
+ super(dbAdaptor, new SecondaryIndexPrunePendingVariantsDescriptor());
+ }
+
+}
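The descriptor deliberately leaves configureScan and getPendingEvaluatorMapper unsupported: pending-prune rows are not discovered by scanning the variants table but are written directly by the MapReduce job below. A rough sketch of the marker row the mapper emits (see VariantPruneMapper later in this patch; `variantRowKey` and `pendingPruneTable` are hypothetical placeholders):

```java
// Mark one variant row as "pending prune": qualifier "v" with an empty value,
// in the same column family used by the variants table.
Put pendingMark = new Put(variantRowKey)
        .addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, Bytes.toBytes("v"), new byte[0]);
pendingPruneTable.put(pendingMark);
```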
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java
new file mode 100644
index 00000000000..aa3217ed2fe
--- /dev/null
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java
@@ -0,0 +1,384 @@
+package org.opencb.opencga.storage.hadoop.variant.prune;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.opencb.biodata.models.variant.Variant;
+import org.opencb.biodata.models.variant.stats.VariantStats;
+import org.opencb.opencga.core.common.TimeUtils;
+import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
+import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
+import org.opencb.opencga.storage.core.metadata.models.VariantScoreMetadata;
+import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver;
+import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.PhoenixHelper;
+import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory;
+import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema;
+import org.opencb.opencga.storage.hadoop.variant.converters.VariantRow;
+import org.opencb.opencga.storage.hadoop.variant.mr.VariantMapReduceUtil;
+import org.opencb.opencga.storage.hadoop.variant.mr.VariantTableHelper;
+import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchIndexUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static org.opencb.biodata.models.variant.StudyEntry.DEFAULT_COHORT;
+import static org.opencb.opencga.storage.hadoop.variant.GenomeHelper.COLUMN_FAMILY_BYTES;
+
+public class VariantPruneDriver extends AbstractVariantsTableDriver {
+
+ private Logger logger = LoggerFactory.getLogger(VariantPruneDriver.class);
+ public static final String ATTRIBUTE_DELETION_STUDIES = "d_studies";
+ public static final String ATTRIBUTE_DELETION_TYPE = "d_type";
+ public static final byte[] ATTRIBUTE_DELETION_TYPE_FULL = Bytes.toBytes(VariantPruneReportRecord.Type.FULL.toString());
+ public static final byte[] ATTRIBUTE_DELETION_TYPE_PARTIAL = Bytes.toBytes(VariantPruneReportRecord.Type.PARTIAL.toString());
+ private final VariantPruneDriverParams params = new VariantPruneDriverParams();
+ private MapReduceOutputFile output;
+
+ @Override
+ protected Class getMapperClass() {
+ return VariantPruneMapper.class;
+ }
+
+ @Override
+ protected String getJobOperationName() {
+ return "variants-prune";
+ }
+
+ @Override
+ protected Map<String, String> getParams() {
+ Map<String, String> params = new HashMap<>();
+ for (String key : this.params.fields().keySet()) {
+ params.put(key, "<" + key + ">");
+ }
+ return params;
+ }
+
+ @Override
+ protected void parseAndValidateParameters() throws IOException
{ + super.parseAndValidateParameters(); + + for (String key : params.fields().keySet()) { + String value = getParam(key); + if (value != null) { + params.updateParams(new HashMap<>(Collections.singletonMap(key, value))); + } + } + output = new MapReduceOutputFile( + () -> "variant_prune_report." + TimeUtils.getTime() + ".txt", + "variant_prune_report"); + } + + @Override + protected Job setupJob(Job job, String archiveTable, String variantTable) throws IOException { + + Scan scan = new Scan() + .addFamily(COLUMN_FAMILY_BYTES); + + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); + scan.setFilter(filterList); + + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_NOT_SYNC.bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_UNKNOWN.bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_STUDIES.bytes()); + + Map> columnsPerStudy = new HashMap<>(); + VariantStorageMetadataManager metadataManager = getMetadataManager(); + for (Map.Entry entry : metadataManager.getStudies().entrySet()) { + Integer studyId = entry.getValue(); + Integer cohortId = metadataManager.getCohortId(studyId, DEFAULT_COHORT); + + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStudyColumn(studyId).bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStatsColumn(studyId, cohortId).bytes()); + + List columnsToDelete = new ArrayList<>(); + for (PhoenixHelper.Column c : VariantPhoenixSchema.getStudyColumns(studyId)) { + columnsToDelete.add(c.column()); + } + metadataManager.cohortIterator(studyId).forEachRemaining(cohortMetadata -> { + for (PhoenixHelper.Column c : VariantPhoenixSchema.getStatsColumns(studyId, cohortMetadata.getId())) { + columnsToDelete.add(c.column()); + } + }); + for (VariantScoreMetadata variantScore : metadataManager.getStudyMetadata(studyId).getVariantScores()) { + columnsToDelete.add(VariantPhoenixSchema.getVariantScoreColumn(studyId, variantScore.getId()).column()); + } + columnsPerStudy.put(studyId, columnsToDelete); + } + + VariantMapReduceUtil.configureMapReduceScan(scan, getConf()); + logger.info("Scan = " + scan); + + FileOutputFormat.setCompressOutput(job, false); + FileOutputFormat.setOutputPath(job, output.getOutdir()); + + if (params.isDryRun()) { + logger.info("Dry mode"); + VariantMapReduceUtil.initTableMapperJob(job, variantTable, scan, getMapperClass()); + VariantMapReduceUtil.setNoneReduce(job); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Mutation.class); + job.setOutputFormatClass(VariantPruneReportOutputFormat.class); + } else { + logger.info("Configure multi output job"); + VariantMapReduceUtil.initTableMapperMultiOutputJob(job, variantTable, Collections.singletonList(scan), getMapperClass()); + job.setOutputFormatClass(VariantPruneReportAndWriteOutputFormat.class); + } + + VariantTableHelper.setVariantsTable(job.getConfiguration(), variantTable); + job.getConfiguration().set(VariantPruneMapper.PENDING_TABLE, getTableNameGenerator().getPendingSecondaryIndexPruneTableName()); + job.getConfiguration().set(VariantPruneMapper.PENDING_ANNOTATION_TABLE, getTableNameGenerator().getPendingAnnotationTableName()); + VariantPruneMapper.setColumnsPerStudy(job.getConfiguration(), columnsPerStudy); + return job; + } + + @Override + protected void postExecution(boolean succeed) throws IOException, StorageEngineException { + super.postExecution(succeed); + output.postExecute(succeed); + } + + public static class 
VariantPruneMapper extends TableMapper { + + public static final String PENDING_TABLE = "VariantPruneMapper.pending_table"; + public static final String PENDING_ANNOTATION_TABLE = "VariantPruneMapper.pending_annotations_table"; + public static final String COLUMNS_PER_STUDY = "VariantPruneMapper.columnsPerStudy"; + + private ImmutableBytesWritable variantsTable; + private ImmutableBytesWritable pendingDeletionVariantsTable; + private ImmutableBytesWritable pendingAnnotationVariantsTable; + private Map> columnsPerStudy; + + + public static final byte[] COLUMN = Bytes.toBytes("v"); + public static final byte[] VALUE = new byte[0]; + + @Override + protected void setup(Mapper.Context context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + variantsTable = new ImmutableBytesWritable(Bytes.toBytes(VariantTableHelper.getVariantsTable(conf))); + pendingDeletionVariantsTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get(PENDING_TABLE))); + pendingAnnotationVariantsTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get(PENDING_ANNOTATION_TABLE))); + + this.columnsPerStudy = getColumnsPerStudy(conf); + } + + public static void setColumnsPerStudy(Configuration conf, Map> columnsPerStudy) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : columnsPerStudy.entrySet()) { + sb.append(entry.getKey()).append("="); + for (String column : entry.getValue()) { + sb.append(column).append(","); + } + sb.append(";"); + } + conf.set(COLUMNS_PER_STUDY, sb.toString()); + } + + public static Map> getColumnsPerStudy(Configuration conf) { + Map> columnsPerStudy = new HashMap<>(); + for (String s : conf.get(COLUMNS_PER_STUDY).split(";")) { + String[] studyColumns = s.split("="); + Integer studyId = Integer.valueOf(studyColumns[0]); + columnsPerStudy.put(studyId, Arrays.stream(studyColumns[1].split(",")).map(Bytes::toBytes).collect(Collectors.toList())); + } + return columnsPerStudy; + } + + @Override + protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + VariantRow variantRow = new VariantRow(value); + + List emptyStudies = new ArrayList<>(); + List studies = new ArrayList<>(); + List studiesWithStats = new ArrayList<>(); + + Variant variant = variantRow.walker() + .onStudy(studies::add) + .onCohortStats(c -> { + studiesWithStats.add(c.getStudyId()); + VariantStats variantStats = c.toJava(); + if (variantStats.getFileCount() == 0) { + emptyStudies.add(c.getStudyId()); + } + }) + .walk(); + + if (studies.size() != studiesWithStats.size() || !studies.containsAll(studiesWithStats)) { + throw new IllegalStateException("Variant stats for cohort " + DEFAULT_COHORT + " not found in variant " + variant); + } + + context.getCounter(COUNTER_GROUP_NAME, "variants").increment(1); + + // It might happen that the variant has 0 studies, so emptyStudies is empty + if (emptyStudies.size() == studies.size()) { + // Drop variant && add to deleted variants list + context.getCounter(COUNTER_GROUP_NAME, "variants_deleted").increment(1); + + context.write(pendingDeletionVariantsTable, + new Put(value.getRow()).addColumn(COLUMN_FAMILY_BYTES, COLUMN, VALUE)); + context.write(pendingAnnotationVariantsTable, + new Delete(value.getRow())); + + Delete delete = new Delete(value.getRow()); + delete.addFamily(COLUMN_FAMILY_BYTES); + delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_FULL); + delete.setAttribute(ATTRIBUTE_DELETION_STUDIES, + 
Bytes.toBytes(emptyStudies.stream().map(Object::toString).collect(Collectors.joining(",")))); + context.write(variantsTable, delete); + } else if (emptyStudies.isEmpty()) { + // skipVariant + context.getCounter(COUNTER_GROUP_NAME, "variants_untouched").increment(1); + } else { + context.getCounter(COUNTER_GROUP_NAME, "variants_delete_some_studies").increment(1); + // Drop studies from variant + Delete delete = new Delete(value.getRow()); + + for (Integer emptyStudy : emptyStudies) { + List columnsToDelete = columnsPerStudy.get(emptyStudy); + for (byte[] columnToDelete : columnsToDelete) { + delete.addColumns(COLUMN_FAMILY_BYTES, columnToDelete); + } + } + if (delete.isEmpty()) { + // Impossible statement (unless a bug). + // This block is here to prevent accidental "full row" deletes. + throw new IllegalStateException("Unexpected empty delete at partial variant prune in variant " + variant); + } + delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_PARTIAL); + delete.setAttribute(ATTRIBUTE_DELETION_STUDIES, + Bytes.toBytes(emptyStudies.stream().map(Object::toString).collect(Collectors.joining(",")))); + + Put updateSecondaryIndexColumns = new Put(value.getRow()); + + HadoopVariantSearchIndexUtils.addNotSyncStatus(updateSecondaryIndexColumns); + + context.write(variantsTable, delete); + context.write(variantsTable, updateSecondaryIndexColumns); + } + } + } + + public static class VariantPruneReportAndWriteOutputFormat extends OutputFormat { + + private MultiTableOutputFormat hbaseOutputFormat; + private VariantPruneReportOutputFormat reportOutputFormat; + + public VariantPruneReportAndWriteOutputFormat() { + hbaseOutputFormat = new MultiTableOutputFormat(); + reportOutputFormat = new VariantPruneReportOutputFormat(); + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + RecordWriter hbaseRecordWriter = hbaseOutputFormat.getRecordWriter(context); + RecordWriter reportRecordWriter = reportOutputFormat.getRecordWriter(context); + return new RecordWriter() { + + @Override + public void write(ImmutableBytesWritable key, Mutation value) throws IOException, InterruptedException { + reportRecordWriter.write(key, value); + hbaseRecordWriter.write(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + reportRecordWriter.close(context); + hbaseRecordWriter.close(context); + } + }; + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + hbaseOutputFormat.checkOutputSpecs(context); + reportOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + // Ignore TableOutputCommitter as it is no-op. 
+ return reportOutputFormat.getOutputCommitter(context);
+ }
+ }
+
+ public static class VariantPruneReportOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Mutation> {
+
+ protected static class ReportRecordWriter extends RecordWriter<ImmutableBytesWritable, Mutation> {
+ private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] SEPARATOR = "\t".getBytes(StandardCharsets.UTF_8);
+
+ protected final DataOutputStream out;
+ protected final ImmutableBytesWritable variantsTable;
+
+ public ReportRecordWriter(DataOutputStream out, ImmutableBytesWritable variantsTable) {
+ this.out = out;
+ this.variantsTable = variantsTable;
+ }
+
+ public synchronized void write(ImmutableBytesWritable key, Mutation mutation)
+ throws IOException {
+ if (mutation instanceof Delete && key.equals(variantsTable)) {
+ Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(mutation.getRow());
+ out.write(variant.toString().getBytes(StandardCharsets.UTF_8));
+ out.write(SEPARATOR);
+ out.write(mutation.getAttribute(ATTRIBUTE_DELETION_TYPE));
+ out.write(SEPARATOR);
+ out.write(mutation.getAttribute(ATTRIBUTE_DELETION_STUDIES));
+ out.write(NEWLINE);
+ }
+ }
+
+ public synchronized void close(TaskAttemptContext context) throws IOException {
+ out.close();
+ }
+ }
+
+ public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext job)
+ throws IOException, InterruptedException {
+ Configuration conf = job.getConfiguration();
+ ImmutableBytesWritable variantsTable = new ImmutableBytesWritable(Bytes.toBytes(VariantTableHelper.getVariantsTable(conf)));
+ boolean isCompressed = getCompressOutput(job);
+ CompressionCodec codec = null;
+ String extension = "";
+ if (isCompressed) {
+ Class<? extends CompressionCodec> codecClass =
+ getOutputCompressorClass(job, GzipCodec.class);
+ codec = ReflectionUtils.newInstance(codecClass, conf);
+ extension = codec.getDefaultExtension();
+ }
+ Path file = getDefaultWorkFile(job, extension);
+ FileSystem fs = file.getFileSystem(conf);
+ DataOutputStream fileOut = fs.create(file, false);
+ if (isCompressed) {
+ fileOut = new DataOutputStream(codec.createOutputStream(fileOut));
+ }
+ return new ReportRecordWriter(fileOut, variantsTable);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static void main(String[] args) {
+ main(args, (Class) MethodHandles.lookup().lookupClass());
+ }
+
+}
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java
new file mode 100644
index 00000000000..dd55ac382b8
--- /dev/null
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java
@@ -0,0 +1,27 @@
+package org.opencb.opencga.storage.hadoop.variant.prune;
+
+import org.opencb.opencga.core.tools.ToolParams;
+
+public class VariantPruneDriverParams extends ToolParams {
+
+ private boolean dryRun;
+ private String output;
+
+ public boolean isDryRun() {
+ return dryRun;
+ }
+
+ public VariantPruneDriverParams setDryRun(boolean dryRun) {
+ this.dryRun = dryRun;
+ return this;
+ }
+
+ public String getOutput() {
+ return output;
+ }
+
+ public VariantPruneDriverParams setOutput(String output) {
+ this.output = output;
+ return this;
+ }
+}
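The report lines emitted by ReportRecordWriter above carry the variant, the deletion type, and the affected study ids, separated by tabs (for example `1:12345:A:T	FULL	1,2`). The VariantPruneReportRecord class that parses them back is referenced but not included in this diff; a minimal parser for that line format could look roughly like this sketch (an assumption based on the writer, not the actual implementation):

```java
// Parse one dry-run report line: "<variant>\t<FULL|PARTIAL>\t<studyId,studyId,...>"
String[] fields = line.split("\t");
Variant variant = new Variant(fields[0]);
VariantPruneReportRecord.Type type = VariantPruneReportRecord.Type.valueOf(fields[1]);
List<Integer> studies = Arrays.stream(fields[2].split(","))
        .map(Integer::valueOf)
        .collect(Collectors.toList());
```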
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java
new file mode 100644
index 00000000000..70104c377ac
--- /dev/null
+++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java
@@ -0,0 +1,324 @@
+package org.opencb.opencga.storage.hadoop.variant.prune;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.opencb.biodata.models.variant.StudyEntry;
+import org.opencb.biodata.models.variant.Variant;
+import org.opencb.commons.ProgressLogger;
+import org.opencb.commons.datastore.core.Query;
+import org.opencb.commons.run.ParallelTaskRunner;
+import org.opencb.commons.run.Task;
+import org.opencb.opencga.core.common.TimeUtils;
+import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
+import org.opencb.opencga.storage.core.exceptions.VariantSearchException;
+import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
+import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
+import org.opencb.opencga.storage.core.variant.search.solr.VariantSearchManager;
+import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;
+import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory;
+import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema;
+import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDBCleaner;
+import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataDeleter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class VariantPruneManager {
+
+ public static final int CHECK_DRY_RUN_LIMIT = 1000000;
+ private Logger logger = LoggerFactory.getLogger(VariantPruneManager.class);
+ public static final String OPERATION_NAME = "VariantPrune";
+ private final HadoopVariantStorageEngine engine;
+
+ public VariantPruneManager(HadoopVariantStorageEngine engine) {
+ this.engine = engine;
+ }
+
+ public void prune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException {
+ List<TaskMetadata> tasks = pre(dryMode, resume);
+ Thread hook = addHook(tasks);
+ try {
+ runPrune(dryMode, outdir);
+ post(tasks, true);
+ } catch (Exception e) {
+ try {
+ post(tasks, false);
+ } catch (Exception e1) {
+ e.addSuppressed(e1);
+ }
+ throw e;
+ } finally {
+ removeHook(hook);
+ }
+ }
+
+ private void removeHook(Thread hook) {
+ Runtime.getRuntime().removeShutdownHook(hook);
+ }
+
+ private Thread addHook(List<TaskMetadata> tasks) {
+ Thread hook = new Thread(() -> {
+ try {
+ post(tasks, false);
+ } catch (StorageEngineException e) {
+ logger.error("Error while running
shutdown hook.", e); + } + }); + Runtime.getRuntime().addShutdownHook(hook); + return hook; + } + + private Map runPrune(boolean dryMode, URI outdir) throws StorageEngineException { + + try { + if (!dryMode) { + // Do not create table in dry-mode. + String pruneTableName = engine.getDBAdaptor().getTableNameGenerator().getPendingSecondaryIndexPruneTableName(); + new SecondaryIndexPrunePendingVariantsDescriptor() + .createTableIfNeeded(pruneTableName, engine.getDBAdaptor().getHBaseManager()); + } + } catch (IOException e) { + throw StorageEngineException.ioException(e); + } + + VariantPruneDriverParams params = new VariantPruneDriverParams() + .setDryRun(dryMode) + .setOutput(outdir.resolve("variant_prune_report." + TimeUtils.getTime() + ".txt").toString()); + + engine.getMRExecutor().run(VariantPruneDriver.class, + VariantPruneDriver.buildArgs(engine.getVariantTableName(), params.toObjectMap()), + "Variant prune on table '" + engine.getVariantTableName() + "'" + ); + + Path report; + try { + report = Files.list(Paths.get(outdir)) + .filter(p -> p.getFileName().toString().contains("variant_prune_report")) + .findFirst() + .orElse(null); + Map countByType; + if (report == null) { + logger.info("Nothing to delete!"); + countByType = Arrays.stream(VariantPruneReportRecord.Type.values()).collect(Collectors.toMap(k -> k, k -> 0L)); + } else { + countByType = Files.lines(report) + .map(VariantPruneReportRecord::new) + .collect(Collectors.groupingBy( + VariantPruneReportRecord::getType, + Collectors.counting())); + } + + long totalCount = countByType.values().stream().mapToLong(l -> l).sum(); + if (dryMode) { + logger.info("Found {} variants to prune, {}", totalCount, countByType); + checkReportedVariants(report, totalCount); + } else { + logger.info("Pruned {} variants, {}", totalCount, countByType); + pruneFromSecondaryIndex(countByType.getOrDefault(VariantPruneReportRecord.Type.FULL, 0L)); + updateSecondaryIndex(countByType.getOrDefault(VariantPruneReportRecord.Type.PARTIAL, 0L)); + } + return countByType; + } catch (IOException e) { + throw StorageEngineException.ioException(e); + } + } + + private void checkReportedVariants(Path report, long count) throws IOException, StorageEngineException { + + logger.info("Check dry-run report. 
Found {} variants to prune", count); + + if (report == null || count == 0) { + // Nothing was reported, so there is nothing to check. + return; + } + + int batchSize = 100; + int variantsToSkip; + long variantsToCheck; + if (count > CHECK_DRY_RUN_LIMIT) { + logger.warn("Will check only {} out of {} variants", CHECK_DRY_RUN_LIMIT, count); + variantsToSkip = (int) (count / CHECK_DRY_RUN_LIMIT); + variantsToCheck = CHECK_DRY_RUN_LIMIT; + } else { + variantsToSkip = 0; + variantsToCheck = count; + } + + Iterator<VariantPruneReportRecord> it = Files.lines(report).map(VariantPruneReportRecord::new).iterator(); + ProgressLogger progressLogger = new ProgressLogger("Checking variants to prune", variantsToCheck); + try (Table table = engine.getDBAdaptor().getHBaseManager().getConnection() + .getTable(TableName.valueOf(engine.getVariantTableName()))) { + AtomicInteger variantsWithProblems = new AtomicInteger(); + ParallelTaskRunner<VariantPruneReportRecord, VariantPruneReportRecord> ptr = new ParallelTaskRunner<>( + i -> { + List<VariantPruneReportRecord> records = new ArrayList<>(i); + int skippedVariants = 0; + while (it.hasNext() && records.size() < i) { + VariantPruneReportRecord next = it.next(); + if (skippedVariants == variantsToSkip) { + skippedVariants = 0; + records.add(next); + } else { + skippedVariants++; + } + } + return records; + }, + batch -> { + List<Get> gets = new ArrayList<>(batch.size()); + for (VariantPruneReportRecord record : batch) { + Get get = new Get(VariantPhoenixKeyFactory.generateVariantRowKey(record.getVariant())); + if (record.getType() == VariantPruneReportRecord.Type.PARTIAL) { + FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE); + for (Integer study : record.getStudies()) { + filter.addFilter( + new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); + } + get.setFilter(filter); + } + gets.add(get); + } + Result[] get = table.get(gets); + for (int i = 0; i < get.length; i++) { + Result result = get[i]; + Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(result.getRow()); + VariantPruneReportRecord record = batch.get(i); + if (!variant.sameGenomicVariant(record.getVariant())) { + throw new IllegalStateException("Error checking report! Expected " + + record.getVariant() + ", got " + variant); + } + progressLogger.increment(1, () -> "up to variant " + variant); + +// List<String> columns = new ArrayList<>(result.rawCells().length); + List<String> sampleOrFileColumns = new ArrayList<>(result.rawCells().length); + for (Cell cell : result.rawCells()) { + String column = Bytes.toString( + cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength()); +// columns.add(column); + if (VariantPhoenixSchema.isSampleDataColumn(column) || VariantPhoenixSchema.isFileColumn(column)) { + sampleOrFileColumns.add(column); + } + } + // TODO: Don't just report, do some checks here +// logger.info("Variant : {}, prune type: {} , columns: {} , {}", variant, record.type, columns.size(), columns); + if (!sampleOrFileColumns.isEmpty()) { + logger.warn("Variant : {}, prune type: {} , columns: {} , {}", variant, record.getType(), + sampleOrFileColumns.size(), + sampleOrFileColumns); + variantsWithProblems.incrementAndGet(); + } + } + return batch; + }, + null, + ParallelTaskRunner.Config.builder().setBatchSize(batchSize).setCapacity(2).setNumTasks(4).build() + ); + try { + ptr.run(); + } catch (ExecutionException e) { + throw new StorageEngineException("Error checking variant prune report", e); + } + if (variantsWithProblems.get() > 0) { + throw new StorageEngineException("Error validating variant prune report!" 
+ + " Found " + variantsWithProblems.get() + " out of " + variantsToCheck + " checked variants with inconsistencies"); + } + } + } + + private void pruneFromSecondaryIndex(long count) throws StorageEngineException { + logger.info("Deleting {} variants from secondary index", count); + logger.info("In case of resuming operation, the total number of variants to remove could be larger."); + SecondaryIndexPrunePendingVariantsManager manager = new SecondaryIndexPrunePendingVariantsManager(engine.getDBAdaptor()); + + Task progressTask = new ProgressLogger("Prune variants from secondary index", count) + .asTask(variant -> "up to variant " + variant); + PendingVariantsDBCleaner cleaner = manager.cleaner(); + + VariantSearchManager searchManager = engine.getVariantSearchManager(); + ParallelTaskRunner ptr = new ParallelTaskRunner<>( + manager.reader(new Query()), + progressTask, + new HadoopVariantSearchDataDeleter(engine.getDBName(), searchManager.getSolrClient(), cleaner), + ParallelTaskRunner.Config.builder().setNumTasks(1).setBatchSize(searchManager.getInsertBatchSize()).build()); + + try { + ptr.run(); + } catch (ExecutionException e) { + throw new StorageEngineException("Error checking variant prune report", e); + } + } + + private void updateSecondaryIndex(long count) throws StorageEngineException, IOException { + logger.info("Updating {} variants from secondary index", count); + logger.info("In case of resuming operation, the total number of variants to update could be larger."); + + try { + engine.secondaryIndex(); + } catch (VariantSearchException e) { + throw new StorageEngineException("Internal search index error", e); + } + } + + private List pre(boolean dryMode, boolean resume) throws StorageEngineException { + VariantStorageMetadataManager mm = engine.getMetadataManager(); + + List tasks = new LinkedList<>(); + List studiesWithoutStats = new LinkedList<>(); + + // First check no running operations in any study + for (Integer studyId : mm.getStudies().values()) { + // Do not allow concurrent operations at all. + mm.checkTaskCanRun(studyId, OPERATION_NAME, Collections.emptyList(), resume, TaskMetadata.Type.REMOVE, tm -> false); + } + + // Check that all variant stats are updated + for (Integer studyId : mm.getStudies().values()) { + if (!mm.getCohortMetadata(studyId, StudyEntry.DEFAULT_COHORT).isStatsReady()) { + studiesWithoutStats.add(mm.getStudyName(studyId)); + } + // FIXME: What if not invalid? + // Might happen if some samples were deleted, or when loading split files? + } + if (!studiesWithoutStats.isEmpty()) { + throw new StorageEngineException("Unable to run variant prune operation. " + + "Please, run variant stats index on cohort '" + StudyEntry.DEFAULT_COHORT + "' for studies " + studiesWithoutStats); + } + + // If no dry-mode, add the new tasks + if (!dryMode) { + for (Integer studyId : mm.getStudies().values()) { + // Do not allow concurrent operations at all. 
+ tasks.add(mm.addRunningTask(studyId, OPERATION_NAME, Collections.emptyList(), resume, + TaskMetadata.Type.REMOVE, tm -> false)); + } + } + + return tasks; + } + + private void post(List<TaskMetadata> tasks, boolean success) throws StorageEngineException { + VariantStorageMetadataManager mm = engine.getMetadataManager(); + for (TaskMetadata task : tasks) { + mm.updateTask(task.getStudyId(), task.getId(), t -> { + if (success) { + t.addStatus(TaskMetadata.Status.READY); + } else { + t.addStatus(TaskMetadata.Status.ERROR); + } + }); + } + } + +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java new file mode 100644 index 00000000000..a8cc3cd5089 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java @@ -0,0 +1,42 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.opencb.biodata.models.variant.Variant; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class VariantPruneReportRecord { + private final Variant variant; + private final Type type; + private final List<Integer> studies; + + enum Type { + FULL, PARTIAL + } + + public VariantPruneReportRecord(Variant variant, Type type, List<Integer> studies) { + this.variant = variant; + this.type = type; + this.studies = studies; + } + + public VariantPruneReportRecord(String line) { + String[] split = line.split("\t"); + variant = new Variant(split[0]); + type = Type.valueOf(split[1]); + studies = Arrays.stream(split[2].split(",")).map(Integer::valueOf).collect(Collectors.toList()); + } + + public Variant getVariant() { + return variant; + } + + public Type getType() { + return type; + } + + public List<Integer> getStudies() { + return studies; + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java index 16b5a255e05..792f523d5b1 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java @@ -26,7 +26,7 @@ public VariantScoreToHBaseConverter(byte[] columnFamily, int studyId, int scoreI public Put convert(Pair pair) { Put put = new Put(VariantPhoenixKeyFactory.generateVariantRowKey(pair.getKey())); add(put, column, Arrays.asList(pair.getValue().getScore(), pair.getValue().getPValue())); - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, columnFamily); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); return put; } }
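The report parser above implies a three-column, tab-separated line format: the variant, the prune type (FULL when the variant is dropped entirely, PARTIAL when it only loses some studies), and a comma-separated list of study ids. A minimal round-trip sketch, placed in the same package so it can see the package-private Type enum; the line content is hypothetical, but it matches the split("\t") / split(",") logic above:

package org.opencb.opencga.storage.hadoop.variant.prune;

public class VariantPruneReportRecordExample {
    public static void main(String[] args) {
        // Hypothetical report line: "<variant>\t<type>\t<studyIds>"
        VariantPruneReportRecord record = new VariantPruneReportRecord("1:1000:A:C\tPARTIAL\t1,2");
        // Expected output: 1:1000:A:C PARTIAL [1, 2]
        System.out.println(record.getVariant() + " " + record.getType() + " " + record.getStudies());
    }
}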
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java new file mode 100644 index 00000000000..d2b356f820e --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java @@ -0,0 +1,87 @@ +package org.opencb.opencga.storage.hadoop.variant.search; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.commons.io.DataWriter; +import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; +import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDBCleaner; +import org.opencb.opencga.storage.hadoop.variant.prune.SecondaryIndexPrunePendingVariantsManager; + +import java.io.IOException; +import java.util.*; + + +/** + * Created on 19/04/18. + * + * @author Jacobo Coll <jacobo167@gmail.com> + */ +public class HadoopVariantSearchDataDeleter implements DataWriter<Variant> { + + private final String collection; + private final SolrClient solrClient; + private final PendingVariantsDBCleaner cleaner; + + public HadoopVariantSearchDataDeleter(String collection, SolrClient solrClient, VariantHadoopDBAdaptor dbAdaptor) { + this(collection, solrClient, new SecondaryIndexPrunePendingVariantsManager(dbAdaptor).cleaner()); + } + + public HadoopVariantSearchDataDeleter(String collection, SolrClient solrClient, PendingVariantsDBCleaner cleaner) { + this.collection = collection; + this.solrClient = solrClient; + this.cleaner = cleaner; + } + + @Override + public boolean write(List<Variant> batch) { + if (batch.isEmpty()) { + return true; + } + + List<byte[]> variantRows = new ArrayList<>(batch.size()); + List<String> variantIds = new ArrayList<>(batch.size()); + + for (Variant variant : batch) { + byte[] row = VariantPhoenixKeyFactory.generateVariantRowKey(variant); + variantRows.add(row); + variantIds.add(variant.toString()); + } + + try { + solrClient.deleteById(collection, variantIds); + solrClient.commit(collection); + cleaner.write(variantRows); + cleaner.flush(); + } catch (SolrServerException | IOException e) { + throw new RuntimeException(e); + } + + return true; + } + + @Override + public boolean open() { + cleaner.open(); + return true; + } + + @Override + public boolean pre() { + cleaner.pre(); + return true; + } + + @Override + public boolean post() { + cleaner.post(); + return true; + } + + @Override + public boolean close() { + cleaner.close(); + return true; + } +}
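Note the ordering in write() above: the batch is deleted and committed in Solr first, and only then are the pending-prune rows cleaned from HBase, so an interrupted run leaves entries pending (and retriable) rather than silently out of sync. A minimal usage sketch of the DataWriter lifecycle; the caller is assumed to provide an initialized Solr client and cleaner, as VariantPruneManager.pruneFromSecondaryIndex does:

import java.util.Collections;
import org.apache.solr.client.solrj.SolrClient;
import org.opencb.biodata.models.variant.Variant;
import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDBCleaner;
import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataDeleter;

public class SearchDataDeleterExample {
    public static void deleteOne(String collection, SolrClient solrClient, PendingVariantsDBCleaner cleaner) {
        HadoopVariantSearchDataDeleter deleter = new HadoopVariantSearchDataDeleter(collection, solrClient, cleaner);
        deleter.open();
        deleter.pre();
        // Removes the variant from Solr, commits, then clears the pending-prune row.
        deleter.write(Collections.singletonList(new Variant("1:1000:A:C")));
        deleter.post();
        deleter.close();
    }
}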
diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java index 2b32610f68b..50a0a19ba89 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java @@ -26,12 +26,11 @@ public class HadoopVariantSearchIndexUtils { * Marks the row as Not Sync Status. This method should be called when loading annotations or statistics, and when removing a study. * * @param put Mutation to add new Variant information. - * @param columnFamily Main column family. * @return The same put operation with the {@link VariantColumn#INDEX_NOT_SYNC} column. */ - public static Put addNotSyncStatus(Put put, byte[] columnFamily) { + public static Put addNotSyncStatus(Put put) { if (put != null) { - put.addColumn(columnFamily, VariantColumn.INDEX_NOT_SYNC.bytes(), System.currentTimeMillis(), + put.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantColumn.INDEX_NOT_SYNC.bytes(), System.currentTimeMillis(), PBoolean.TRUE_BYTES); } return put; @@ -41,12 +40,11 @@ public static Put addNotSyncStatus(Put put, byte[] columnFamily) { * Marks the row as Unknown Sync Status. This method should be called when loading or removing files. * * @param put Mutation to add new Variant information. - * @param columnFamily Main column family. * @return The same put operation with the {@link VariantColumn#INDEX_UNKNOWN} column. */ - public static Put addUnknownSyncStatus(Put put, byte[] columnFamily) { + public static Put addUnknownSyncStatus(Put put) { if (put != null) { - put.addColumn(columnFamily, VariantColumn.INDEX_UNKNOWN.bytes(), System.currentTimeMillis(), + put.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantColumn.INDEX_UNKNOWN.bytes(), System.currentTimeMillis(), PBoolean.TRUE_BYTES); } return put; diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java index 8d98f3caa6a..6efa3f3fadc 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java @@ -7,6 +7,7 @@ import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageOptions; +import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException; import org.opencb.opencga.storage.core.variant.stats.VariantStatisticsManager; import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor; @@ -44,7 +45,9 @@ public void calculateStatistics(String study, List<String> cohorts, QueryOptions } VariantStorageMetadataManager metadataManager = dbAdaptor.getMetadataManager(); StudyMetadata sm = metadataManager.getStudyMetadata(study); - + if (sm == null) { + throw VariantQueryException.studyNotFound(study); + } if (isAggregated(sm, options)) { Aggregation aggregation = getAggregation(sm, options); VariantStatsMapper.setAggregation(options, aggregation); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java index 13a848b928b..eb76771b159 
100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java @@ -1,8 +1,6 @@ package org.opencb.opencga.storage.hadoop.variant.stats; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; @@ -53,13 +51,13 @@ public class VariantStatsDriver extends AbstractVariantsTableDriver { private Collection cohorts; private static Logger logger = LoggerFactory.getLogger(VariantStatsDriver.class); - private Path outdir; - private Path localOutput; + private Aggregation aggregation; private boolean overwrite; private boolean statsMultiAllelic; private String statsDefaultGenotype; private boolean excludeFiles; + private MapReduceOutputFile output; public VariantStatsDriver() { } @@ -103,24 +101,9 @@ protected void parseAndValidateParameters() throws IOException { logger.info(" * " + VariantStorageOptions.STATS_DEFAULT_GENOTYPE.key() + ": " + statsDefaultGenotype); - String outdirStr = getParam(OUTPUT); - if (StringUtils.isNotEmpty(outdirStr)) { - outdir = new Path(outdirStr); - - if (isLocal(outdir)) { - localOutput = getLocalOutput(outdir, () -> "variant_stats." - + (cohorts.size() < 10 ? "." + String.join("_", cohortNames) : "") - + TimeUtils.getTime() + ".json"); - outdir = getTempOutdir("opencga_sample_variant_stats", localOutput.getName()); - outdir.getFileSystem(getConf()).deleteOnExit(outdir); - } - if (localOutput != null) { - logger.info(" * Outdir file: " + localOutput.toUri()); - logger.info(" * Temporary outdir file: " + outdir.toUri()); - } else { - logger.info(" * Outdir file: " + outdir.toUri()); - } - } + output = new MapReduceOutputFile(() -> "variant_stats." + + (cohorts.size() < 10 ? "." + String.join("_", cohortNames) : "") + + TimeUtils.getTime() + ".json", "opencga_sample_variant_stats"); } @Override @@ -146,7 +129,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName query.put(VariantQueryParam.INCLUDE_FILE.key(), VariantQueryUtils.NONE); } - if (outdir != null) { + if (output.getOutdir() != null) { // Do not index stats. // Allow any input query. // Write stats to file. 
@@ -165,7 +148,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setCompressOutput(job, false); - TextOutputFormat.setOutputPath(job, outdir); + TextOutputFormat.setOutputPath(job, output.getOutdir()); } else if (AggregationUtils.isAggregated(aggregation)) { // For aggregated variants use plain VariantStatsMapper @@ -228,14 +211,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName @Override protected void postExecution(boolean succeed) throws IOException, StorageEngineException { super.postExecution(succeed); - if (succeed) { - if (localOutput != null) { - concatMrOutputToLocal(outdir, localOutput); - } - } - if (localOutput != null) { - deleteTemporaryFile(outdir); - } + output.postExecute(succeed); } @Override diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java index db233c97c41..28b0115a326 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java @@ -14,7 +14,6 @@ import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import org.opencb.opencga.storage.core.variant.stats.VariantStatsWrapper; -import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; import org.opencb.opencga.storage.hadoop.variant.converters.stats.VariantStatsToHBaseConverter; @@ -185,7 +184,7 @@ private void write(Context context, VariantStatsWrapper wrapper) throws IOExcept if (put == null) { context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put.null").increment(1); } else { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put").increment(1); context.write(new ImmutableBytesWritable(helper.getVariantsTable()), put); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java index 5f349cc76e9..3ed5d8176e7 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java @@ -14,7 +14,6 @@ import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import org.opencb.opencga.storage.core.variant.stats.VariantStatisticsCalculator; import 
org.opencb.opencga.storage.core.variant.stats.VariantStatsWrapper; -import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; import org.opencb.opencga.storage.hadoop.variant.converters.stats.VariantStatsToHBaseConverter; import org.opencb.opencga.storage.hadoop.variant.mr.VariantMapper; import org.opencb.opencga.storage.hadoop.variant.mr.VariantTableHelper; @@ -95,7 +94,7 @@ protected void map(Object key, Variant variant, Context context) throws IOExcept if (put == null) { context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put.null").increment(1); } else { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put").increment(1); context.write(new ImmutableBytesWritable(helper.getVariantsTable()), put); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java index cd23d8a59ff..7420070878d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java @@ -19,6 +19,7 @@ public class HBaseVariantTableNameGenerator { private static final String SAMPLE_SUFIX = "_variant_sample_index_"; private static final String PENDING_ANNOTATION_SUFIX = "_pending_annotation"; private static final String PENDING_SECONDARY_INDEX_SUFIX = "_pending_secondary_index"; + private static final String PENDING_SECONDARY_INDEX_PRUNE_SUFIX = "_pending_secondary_index_prune"; private static final int MINIMUM_DB_NAME_SIZE = 1; private final String namespace; @@ -27,6 +28,7 @@ public class HBaseVariantTableNameGenerator { private final String metaTableName; private final String pendingAnnotationTableName; private final String pendingSecondaryIndexTableName; + private final String pendingSecondaryIndexPruneTableName; public HBaseVariantTableNameGenerator(String dbName, ObjectMap options) { @@ -47,6 +49,7 @@ public HBaseVariantTableNameGenerator(String namespace, String dbName) { metaTableName = getMetaTableName(namespace, this.dbName); pendingAnnotationTableName = getPendingAnnotationTableName(namespace, this.dbName); pendingSecondaryIndexTableName = getPendingSecondaryIndexTableName(namespace, this.dbName); + pendingSecondaryIndexPruneTableName = getPendingSecondaryIndexPruneTableName(namespace, this.dbName); } public String getVariantTableName() { @@ -69,6 +72,10 @@ public String getPendingSecondaryIndexTableName() { return pendingSecondaryIndexTableName; } + public String getPendingSecondaryIndexPruneTableName() { + return pendingSecondaryIndexPruneTableName; + } + public String getMetaTableName() { return metaTableName; } @@ -133,6 +140,21 @@ public static boolean isValidPendingSecondaryIndexTableName(String pendingSecond return validSuffix(pendingSecondaryIndexTableName, PENDING_SECONDARY_INDEX_SUFIX); } + public static void checkValidPendingSecondaryIndexPruneTableName(String tableName) { + if (!isValidPendingSecondaryIndexPruneTableName(tableName)) { + throw new 
IllegalArgumentException("Invalid pending prune table name : " + tableName); + } + } + + public static boolean isValidPendingSecondaryIndexPruneTableName(String tableName) { + return validSuffix(tableName, PENDING_SECONDARY_INDEX_PRUNE_SUFIX); + } + + public static String getDBNameFromPendingSecondaryIndexPruneTableName(String tableName) { + checkValidPendingSecondaryIndexPruneTableName(tableName); + return tableName.substring(0, tableName.length() - PENDING_SECONDARY_INDEX_PRUNE_SUFIX.length()); + } + public static String getDBNameFromMetaTableName(String metaTableName) { checkValidMetaTableName(metaTableName); return metaTableName.substring(0, metaTableName.length() - META_SUFIX.length()); @@ -237,6 +259,10 @@ public static String getPendingSecondaryIndexTableName(String namespace, String return buildTableName(namespace, dbName, PENDING_SECONDARY_INDEX_SUFIX); } + public static String getPendingSecondaryIndexPruneTableName(String namespace, String dbName) { + return buildTableName(namespace, dbName, PENDING_SECONDARY_INDEX_PRUNE_SUFIX); + } + public static String getMetaTableName(String namespace, String dbName) { return buildTableName(namespace, dbName, META_SUFIX); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java index 5b419028a49..973acaff2b1 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java @@ -404,6 +404,9 @@ static StorageConfiguration updateStorageConfiguration(StorageConfiguration stor options.put(HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_TABLE_COMPRESSION.key(), supportedAlgorithms.contains(Compression.Algorithm.SNAPPY) ? Compression.Algorithm.SNAPPY.getName() : Compression.Algorithm.NONE.getName()); + options.put(HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.key(), supportedAlgorithms.contains(Compression.Algorithm.SNAPPY) + ? 
Compression.Algorithm.SNAPPY.getName() + : Compression.Algorithm.NONE.getName()); FileSystem fs = FileSystem.get(HadoopVariantStorageTest.configuration.get()); String intermediateDirectory = fs.getHomeDirectory().toUri().resolve("opencga_test/").toString(); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java new file mode 100644 index 00000000000..92530100a86 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java @@ -0,0 +1,232 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.opencb.biodata.models.variant.StudyEntry; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.opencga.storage.core.exceptions.StorageEngineException; +import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest; +import org.opencb.opencga.storage.core.variant.VariantStorageEngine; +import org.opencb.opencga.storage.core.variant.VariantStorageOptions; +import org.opencb.opencga.storage.core.variant.adaptors.VariantMatchers; +import org.opencb.opencga.storage.core.variant.annotation.DefaultVariantAnnotationManager; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageTest; +import org.opencb.opencga.storage.hadoop.variant.VariantHbaseTestUtils; +import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class VariantPruneManagerTest extends VariantStorageBaseTest implements HadoopVariantStorageTest { + + public static final String STUDY_NAME_3 = "study_3"; + public static final String STUDY_NAME_4 = "study_4"; + public static final String STUDY_NAME_5 = "study_5"; + + @ClassRule + public static HadoopExternalResource externalResource = new HadoopExternalResource(); + private VariantHadoopDBAdaptor dbAdaptor; + private boolean loaded; + private HadoopVariantStorageEngine engine; + + @After + public void tearDown() throws Exception { + VariantHbaseTestUtils.printVariants(getVariantStorageEngine().getDBAdaptor(), newOutputUri(getTestName().getMethodName())); + } + + @Before + public void before() throws Exception { + engine = getVariantStorageEngine(); + dbAdaptor = engine.getDBAdaptor(); +// if (!loaded) { +// load(); +// loaded = true; +// } + } + + public void load() throws Exception { + clearDB(DB_NAME); + + // Study 1 - single file + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, smallInputUri, outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new 
QueryOptions()); + + // Study 2 - multi files + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_2) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false) + .append(VariantStorageOptions.LOAD_SPLIT_DATA.key(), VariantStorageEngine.SplitData.MULTI); + + runETL(engine, getResourceUri("by_chr/chr22_1-1.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2-DUP.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + + // Study 3 - platinum + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_3) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(0), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(1), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(2), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + // Study 4 - platinum_2 + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_4) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(3), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(4), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(5), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_4, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + // Study 5, dense + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_5) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getResourceUri("variant-test-dense.vcf.gz"), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_5, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + + // ---------------- Annotate + this.variantStorageEngine.annotate(new Query(), new QueryOptions(DefaultVariantAnnotationManager.OUT_DIR, outputUri)); + + VariantHbaseTestUtils.printVariants(dbAdaptor, newOutputUri()); + } + + @Test + public void testVariantPruneSingleStudy() throws Exception { + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_3) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(0), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(1), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(2), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(3), outputUri, params, true, true, true); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + variantPrune("1_prune_dry", true, 0); + + int studyId = engine.getMetadataManager().getStudyId(STUDY_NAME_3); + List<Integer> samples = engine.getMetadataManager().getIndexedSamples(studyId); 
String sampleName = engine.getMetadataManager().getSampleName(studyId, samples.get(0)); + engine.removeSamples(STUDY_NAME_3, Collections.singletonList(sampleName), newOutputUri("2_remove_sample_" + sampleName)); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + @Test + public void testVariantPruneSingleStudyMultiFile() throws Exception { + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_2) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false) + .append(VariantStorageOptions.LOAD_SPLIT_DATA.key(), VariantStorageEngine.SplitData.MULTI); + + runETL(engine, getResourceUri("by_chr/chr22_1-1.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + runETL(engine, getResourceUri("by_chr/chr22_1-2-DUP.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + variantPrune("1_prune_dry", true, 0); + + engine.removeFile(STUDY_NAME_2, "chr22_1-1.variant-test-file.vcf.gz", newOutputUri("2_remove_sample_chr22_1-1.variant-test-file")); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + + @Test + public void testVariantPruneMultiStudy() throws Exception { + load(); + + variantPrune("1_prune_dry", true, 0); + + engine.removeFile(STUDY_NAME_3, "1K.end.platinum-genomes-vcf-NA12877_S1.genome.vcf.gz", outputUri); + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + engine.removeFile(STUDY_NAME_2, "chr22_1-1.variant-test-file.vcf.gz", newOutputUri("2_remove_sample_chr22_1-1.variant-test-file")); + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + private int variantPrune(String testName, boolean dryMode) throws Exception { + return variantPrune(testName, dryMode, null); + } + + private int variantPrune(String testName, boolean dryMode, Integer expectedPrunedVariants) throws Exception { + URI outdir = newOutputUri(testName); + getVariantStorageEngine().variantsPrune(dryMode, false, outdir); + + Path report = Files.list(Paths.get(outdir)).filter(p -> p.getFileName().toString().contains("variant_prune_report")).findFirst() + .orElse(null); + int reportedVariants; + if (report == null) { + reportedVariants = 0; + } else { + reportedVariants = (int) 
Files.lines(report).count(); + } + if (expectedPrunedVariants == null) { + MatcherAssert.assertThat(reportedVariants, VariantMatchers.gt(0)); + } else { + assertEquals(expectedPrunedVariants.intValue(), reportedVariants); + } + return reportedVariants; + } + + private void checkStatsUpdateRequiredForVariantPrune() throws Exception { + try { + getVariantStorageEngine().variantsPrune(true, false, outputUri); + fail("Should fail, as the variant stats are not valid"); + } catch (StorageEngineException e) { + System.out.println(e.getClass() + " = " + e.getMessage()); + assertTrue(e.getMessage().startsWith("Unable to run variant prune operation. Please, run variant stats index on cohort")); + } + } + +} \ No newline at end of file diff --git a/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java b/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java index 530033c8e64..af8ee660581 100644 --- a/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java +++ b/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java @@ -107,7 +107,6 @@ public void stageResumeFromErrorTest() throws Exception { t.addStatus(new Date(System.currentTimeMillis() - 100), TaskMetadata.Status.RUNNING); t.addStatus(new Date(System.currentTimeMillis() - 50), TaskMetadata.Status.ERROR); // Last status is ERROR - return t; }); System.out.println("----------------"); @@ -136,7+135,6 @@ public void stageForceResumeTest() throws Exception { operation.addStatus(new Date(System.currentTimeMillis() - 50), TaskMetadata.Status.ERROR); operation.addStatus(new Date(System.currentTimeMillis()), TaskMetadata.Status.RUNNING); // Last status is RUNNING - return operation; }); try { @@ -199,7 +197,6 @@ private long simulateStageError(StudyMetadata studyMetadata, VariantMongoDBAdapt TreeMap<Date, TaskMetadata.Status> status = task.getStatus(); status.remove(status.lastKey(), TaskMetadata.Status.READY); task.addStatus(TaskMetadata.Status.ERROR); - return task; }); // 2) Remove from files collection
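Taken together with the test helper above, a full prune cycle is: a dry run that writes and validates the report, a real run that prunes and refreshes the secondary index, and a final dry run that should find nothing left. A sketch, assuming an initialized HadoopVariantStorageEngine and using the variantsPrune(dryRun, resume, outdir) signature exercised by the tests:

import java.net.URI;
import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine;

public class VariantPruneUsage {
    public static void pruneCycle(HadoopVariantStorageEngine engine, URI outdir) throws Exception {
        engine.variantsPrune(true, false, outdir);   // dry run: write the report and check it
        engine.variantsPrune(false, false, outdir);  // real run: prune and update the secondary index
        engine.variantsPrune(true, false, outdir);   // verify: should report nothing left to prune
    }
}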