From cf9e4a82e52dcbcfc6aa572f6ee89036024a41cc Mon Sep 17 00:00:00 2001 From: imedina Date: Sun, 19 Jun 2022 03:23:13 +0100 Subject: [PATCH 1/6] pom: fix mockito version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 86ce51f15cf..88f86b9af52 100644 --- a/pom.xml +++ b/pom.xml @@ -458,7 +458,7 @@ org.mockito mockito-core - 2.2.3-SNAPSHOT7 + 2.2.27 test From 85d670c641f690d733c2401a4929dedf888c21dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Thu, 23 Jun 2022 17:34:18 +0100 Subject: [PATCH 2/6] storage: Implement dry-mode variant prune #TASK-1116 --- .../manager/VariantStorageManager.java | 8 + .../operations/VariantPruneOperationTool.java | 32 ++ .../models/variant/VariantPruneParams.java | 39 ++ .../VariantOperationWebService.java | 12 + .../VariantStorageMetadataManager.java | 67 +++- .../core/variant/VariantStorageEngine.java | 11 + .../core/variant/VariantStorageBaseTest.java | 1 + .../hadoop/utils/AbstractHBaseDriver.java | 53 +++ .../variant/HadoopVariantStorageEngine.java | 6 + .../variant/HadoopVariantStorageOptions.java | 3 + .../VariantsTableDeleteColumnTask.java | 2 +- .../phoenix/VariantPhoenixSchema.java | 25 +- .../VariantAnnotationHadoopDBWriter.java | 2 +- .../variant/load/VariantHadoopDBWriter.java | 2 +- .../prune/HadoopVariantPruneManager.java | 260 +++++++++++++ ...ryIndexPrunePendingVariantsDescriptor.java | 55 +++ .../variant/prune/VariantPruneDriver.java | 363 ++++++++++++++++++ .../prune/VariantPruneDriverParams.java | 27 ++ .../prune/VariantPruneReportRecord.java | 42 ++ .../score/VariantScoreToHBaseConverter.java | 2 +- .../search/HadoopVariantSearchIndexUtils.java | 16 +- .../HadoopMRVariantStatisticsManager.java | 5 +- .../variant/stats/VariantStatsDriver.java | 40 +- .../stats/VariantStatsFromResultMapper.java | 3 +- .../variant/stats/VariantStatsMapper.java | 3 +- .../utils/HBaseVariantTableNameGenerator.java | 21 + .../variant/HadoopVariantStorageTest.java | 3 + .../prune/HadoopVariantPruneManagerTest.java | 232 +++++++++++ .../MongoVariantStorageEngineTest.java | 3 - 29 files changed, 1269 insertions(+), 69 deletions(-) create mode 100644 opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java create mode 100644 opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java diff --git 
a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java index 872ec34aa68..d505282b56a 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java @@ -66,6 +66,7 @@ import org.opencb.opencga.core.models.sample.SampleAclEntry; import org.opencb.opencga.core.models.study.Study; import org.opencb.opencga.core.models.study.StudyAclEntry; +import org.opencb.opencga.core.models.variant.VariantPruneParams; import org.opencb.opencga.core.response.OpenCGAResult; import org.opencb.opencga.core.response.VariantQueryResult; import org.opencb.opencga.core.tools.ToolParams; @@ -1086,6 +1087,13 @@ public boolean exists(String study, String token) throws StorageEngineException, return engine.getMetadataManager().studyExists(studyFqn); } + public void variantPrune(String project, URI outdir, VariantPruneParams params, String token) throws StorageEngineException, CatalogException { + secureOperationByProject(VariantPruneOperationTool.ID, project, new ObjectMap(), token, engine -> { + engine.variantsPrune(params.isDryRun(), params.isResume(), outdir); + return null; + }); + } + // Permission related methods private interface VariantReadOperation { diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java new file mode 100644 index 00000000000..73c9501e3ce --- /dev/null +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java @@ -0,0 +1,32 @@ +package org.opencb.opencga.analysis.variant.operations; + +import org.opencb.opencga.core.models.common.Enums; +import org.opencb.opencga.core.models.variant.VariantPruneParams; +import org.opencb.opencga.core.tools.annotations.Tool; +import org.opencb.opencga.core.tools.annotations.ToolParams; + +@Tool(id = VariantPruneOperationTool.ID, description = VariantPruneOperationTool.DESCRIPTION, + type = Tool.Type.OPERATION, resource = Enums.Resource.VARIANT) +public class VariantPruneOperationTool extends OperationTool { + + public static final String DESCRIPTION = ""; + public static final String ID = "variant-prune"; + + @ToolParams + protected VariantPruneParams params = new VariantPruneParams(); + + @Override + protected void run() throws Exception { + + step(() -> { + + // TODO: Remove this line. 
Test purposes only + logger.warn("Enforce dry mode!"); + params.setDryRun(true); + + getVariantStorageManager().variantPrune(params.getProject(), getOutDir().toUri(), params, getToken()); + }); + + + } +} diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java new file mode 100644 index 00000000000..49caf4a55d9 --- /dev/null +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java @@ -0,0 +1,39 @@ +package org.opencb.opencga.core.models.variant; + +import org.opencb.opencga.core.tools.ToolParams; + +public class VariantPruneParams extends ToolParams { + + public static final String DESCRIPTION = ""; + private String project; + private boolean dryRun; + private boolean resume; + + + public String getProject() { + return project; + } + + public VariantPruneParams setProject(String project) { + this.project = project; + return this; + } + + public boolean isDryRun() { + return dryRun; + } + + public VariantPruneParams setDryRun(boolean dryRun) { + this.dryRun = dryRun; + return this; + } + + public boolean isResume() { + return resume; + } + + public VariantPruneParams setResume(boolean resume) { + this.resume = resume; + return this; + } +} diff --git a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java index c1600d0a977..2b7908ec42f 100644 --- a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java +++ b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java @@ -430,6 +430,18 @@ public Response julie( return submitOperationToProject(JulieTool.ID, project, params, jobName, jobDescription, dependsOn, jobTags); } + @POST + @Path("/variant/prune") + @ApiOperation(value = VariantPruneOperationTool.DESCRIPTION, response = Job.class) + public Response variantPrune( + @ApiParam(value = ParamConstants.JOB_ID_CREATION_DESCRIPTION) @QueryParam(ParamConstants.JOB_ID) String jobName, + @ApiParam(value = ParamConstants.JOB_DESCRIPTION_DESCRIPTION) @QueryParam(ParamConstants.JOB_DESCRIPTION) String jobDescription, + @ApiParam(value = ParamConstants.JOB_DEPENDS_ON_DESCRIPTION) @QueryParam(JOB_DEPENDS_ON) String dependsOn, + @ApiParam(value = ParamConstants.JOB_TAGS_DESCRIPTION) @QueryParam(ParamConstants.JOB_TAGS) String jobTags, + @ApiParam(value = VariantPruneParams.DESCRIPTION) VariantPruneParams params) { + return submitOperation(VariantPruneOperationTool.ID, params.getProject(), params, jobName, jobDescription, dependsOn, jobTags); + } + public Response submitOperation(String toolId, String study, ToolParams params, String jobName, String jobDescription, String jobDependsOn, String jobTags) { return submitOperation(toolId, null, study, params, jobName, jobDescription, jobDependsOn, jobTags); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java index 287ab7dd00d..9138f550bd9 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java +++ 
b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java @@ -1252,13 +1252,13 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng taskDBAdaptor.updateTask(studyId, task, null); } - public TaskMetadata updateTask(int studyId, int taskId, UpdateFunction update) + public TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer consumer) throws E, StorageEngineException { getTask(studyId, taskId); // Check task exists Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout); try { TaskMetadata task = getTask(studyId, taskId); - task = update.update(task); + consumer.update(task); lock.checkLocked(); unsecureUpdateTask(studyId, task); return task; @@ -1732,7 +1732,6 @@ public TaskMetadata.Status setStatus(int studyId, int taskId, TaskMetadata.Statu updateTask(studyId, taskId, task -> { previousStatus.set(task.currentStatus()); task.addStatus(Calendar.getInstance().getTime(), status); - return task; }); return previousStatus.get(); } @@ -1765,13 +1764,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, List false); } /** - * Adds a new {@link TaskMetadata} to the Study Metadata. - * - * Allow execute concurrent operations depending on the "allowConcurrent" predicate. - * If any operation is in ERROR, is not the same operation, and concurrency is not allowed, - * throw {@link StorageEngineException#otherOperationInProgressException} - * If any operation is DONE, RUNNING, is same operation and resume=true, continue - * If all operations are ready, continue + * Find out if the task attempt can be executed, and if it should be a new variant, of continue executing a previous task. * * @param studyId Study id * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()} @@ -1783,11 +1776,9 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, List fileIds, boolean resume, TaskMetadata.Type type, - Predicate allowConcurrent) + private TaskMetadata getRunningTaskCompatibleOrFail(int studyId, String jobOperationName, List fileIds, boolean resume, + TaskMetadata.Type type, Predicate allowConcurrent) throws StorageEngineException { - TaskMetadata resumeTask = null; Iterator iterator = taskIterator(studyId, Arrays.asList( TaskMetadata.Status.DONE, @@ -1832,6 +1823,33 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, throw new IllegalArgumentException("Unknown Status " + currentStatus); } } + return resumeTask; + } + + /** + * Adds a new {@link TaskMetadata} to the Study Metadata. + * + * Allow execute concurrent operations depending on the "allowConcurrent" predicate. + * If any operation is in ERROR, is not the same operation, and concurrency is not allowed, + * throw {@link StorageEngineException#otherOperationInProgressException} + * If any operation is DONE, RUNNING, is same operation and resume=true, continue + * If all operations are ready, continue + * + * @param studyId Study id + * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()} + * @param fileIds Files to be processed in this batch. + * @param resume Resume operation. Assume that previous operation went wrong. + * @param type Operation type as {@link TaskMetadata.Type} + * @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation. 
+ * If not, throws {@link StorageEngineException#otherOperationInProgressException} + * @return The current batchOperation + * @throws StorageEngineException if the operation can't be executed + */ + public TaskMetadata addRunningTask(int studyId, String jobOperationName, + List fileIds, boolean resume, TaskMetadata.Type type, + Predicate allowConcurrent) + throws StorageEngineException { + TaskMetadata resumeTask = getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent); TaskMetadata task; if (resumeTask == null) { @@ -1848,7 +1866,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, throw new StorageEngineException("Attempt to execute a concurrent modification of task " + thisTask.getName() + " (" + thisTask.getId() + ") "); } else { - return thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING); + thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING); } }); } @@ -1856,6 +1874,25 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, return task; } + + /** + * Check if a task can be executed. + * + * @param studyId Study id + * @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()} + * @param fileIds Files to be processed in this batch. + * @param resume Resume operation. Assume that previous operation went wrong. + * @param type Operation type as {@link TaskMetadata.Type} + * @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation. + * If not, throws {@link StorageEngineException#otherOperationInProgressException} + */ + public void checkTaskCanRun(int studyId, String jobOperationName, List fileIds, boolean resume, + TaskMetadata.Type type, Predicate allowConcurrent) + throws StorageEngineException { + getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent); + } + + protected void checkName(final String type, String name) { if (StringUtils.isEmpty(name)) { throw new IllegalArgumentException(type + " can not be empty!"); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java index 323ea94088a..026c5e34ae8 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java @@ -976,6 +976,17 @@ protected void postRemoveFiles(String study, List fileIds, List nameGenerator; + private final String tempFilePrefix; + protected Path localOutput; + protected Path outdir; + + public MapReduceOutputFile(Supplier nameGenerator, String tempFilePrefix) throws IOException { + this.nameGenerator = nameGenerator; + this.tempFilePrefix = tempFilePrefix; + getOutputPath(); + } + + protected void getOutputPath() throws IOException { + String outdirStr = getParam(OUTPUT_PARAM); + if (StringUtils.isNotEmpty(outdirStr)) { + outdir = new Path(outdirStr); + + if (isLocal(outdir)) { + localOutput = AbstractHBaseDriver.this.getLocalOutput(outdir, nameGenerator); + outdir = getTempOutdir(tempFilePrefix, localOutput.getName()); + outdir.getFileSystem(getConf()).deleteOnExit(outdir); + } + if (localOutput != null) { + LOGGER.info(" * 
Outdir file: " + localOutput.toUri()); + LOGGER.info(" * Temporary outdir file: " + outdir.toUri()); + } else { + LOGGER.info(" * Outdir file: " + outdir.toUri()); + } + } + } + + public void postExecute(boolean succeed) throws IOException { + if (succeed) { + if (localOutput != null) { + concatMrOutputToLocal(outdir, localOutput); + } + } + if (localOutput != null) { + deleteTemporaryFile(outdir); + } + } + + public Path getLocalOutput() { + return localOutput; + } + + public Path getOutdir() { + return outdir; + } + } + /** * Concatenate all generated files from a MapReduce job into one single local file. * diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java index 5662f06c0f9..b7e6dc44b86 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java @@ -96,6 +96,7 @@ import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexDeleteHBaseColumnTask; import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexBuilder; import org.opencb.opencga.storage.hadoop.variant.io.HadoopVariantExporter; +import org.opencb.opencga.storage.hadoop.variant.prune.HadoopVariantPruneManager; import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreLoader; import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreRemover; import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataWriter; @@ -949,6 +950,11 @@ public void removeStudy(String studyName, URI outdir) throws StorageEngineExcept outdir); } + @Override + public void variantsPrune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException { + new HadoopVariantPruneManager(this).prune(dryMode, resume, outdir); + } + @Override public void loadVariantScore(URI scoreFile, String study, String scoreName, String cohort1, String cohort2, VariantScoreFormatDescriptor descriptor, ObjectMap options) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java index dbbe73526a8..2ca95b9940b 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java @@ -68,6 +68,9 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption { VARIANT_TABLE_LOAD_REFERENCE("storage.hadoop.variant.table.load.reference", false), PENDING_SECONDARY_INDEX_TABLE_COMPRESSION("storage.hadoop.pendingSecondaryIndex.table.compression", Compression.Algorithm.SNAPPY.getName()), + PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION("storage.hadoop.pendingSecondaryIndexPrune.table.compression", + Compression.Algorithm.SNAPPY.getName()), + 
///////////////////////// // Archive table configuration diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java index 1eaa0dd0da8..0bdc2805c3f 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/VariantsTableDeleteColumnTask.java @@ -31,7 +31,7 @@ protected List map(Result result) { return mutations; } else { Put put = new Put(result.getRow()); - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); List mutationsWithPut = new ArrayList<>(mutations.size() + 1); mutationsWithPut.addAll(mutations); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java index e02b8547ca5..5ab68f52c70 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/adaptors/phoenix/VariantPhoenixSchema.java @@ -53,6 +53,7 @@ public final class VariantPhoenixSchema { public static final String ANNOTATION_PREFIX = "A_"; public static final String POPULATION_FREQUENCY_PREFIX = ANNOTATION_PREFIX + "PF_"; public static final String FUNCTIONAL_SCORE_PREFIX = ANNOTATION_PREFIX + "FS_"; + public static final String SAMPLE_DATA_SUFIX = "_S"; public static final byte[] SAMPLE_DATA_SUFIX_BYTES = Bytes.toBytes(SAMPLE_DATA_SUFIX); public static final String FILE_SUFIX = "_F"; @@ -335,7 +336,7 @@ public static List getHumanPopulationFrequenciesColumns() { private VariantPhoenixSchema() { } - public static List getStudyColumns(Integer studyId) { + public static List getStudyColumns(int studyId) { return Arrays.asList(getStudyColumn(studyId), getFillMissingColumn(studyId)); } @@ -454,6 +455,14 @@ public static Column getConservationScoreColumn(String source, String rawValue, } } + public static List getStatsColumns(int studyId, List cohortIds) { + List columns = new ArrayList<>(cohortIds.size() * 5); + for (Integer cohortId : cohortIds) { + columns.addAll(getStatsColumns(studyId, cohortId)); + } + return columns; + } + public static List getStatsColumns(int studyId, int cohortId) { return Arrays.asList( getStatsColumn(studyId, cohortId), @@ -505,7 +514,7 @@ public static int extractSampleId(String columnKey) { } public static Integer extractSampleId(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(SAMPLE_DATA_SUFIX)) { + if (isSampleDataColumn(columnKey)) { return extractId(columnKey, failOnMissing, "sample"); } else if (failOnMissing) { throw new IllegalArgumentException("Not a sample column: " + columnKey); @@ -519,7 +528,7 @@ public static int extractFileIdFromSampleColumn(String 
columnKey) { } public static Integer extractFileIdFromSampleColumn(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(SAMPLE_DATA_SUFIX) && StringUtils.countMatches(columnKey, COLUMN_KEY_SEPARATOR) == 3) { + if (isSampleDataColumn(columnKey) && StringUtils.countMatches(columnKey, COLUMN_KEY_SEPARATOR) == 3) { return extractId(columnKey, failOnMissing, "sample", columnKey.indexOf(COLUMN_KEY_SEPARATOR) + 1); } else if (failOnMissing) { throw new IllegalArgumentException("Not a sample column: " + columnKey); @@ -528,12 +537,20 @@ public static Integer extractFileIdFromSampleColumn(String columnKey, boolean fa } } + public static boolean isSampleDataColumn(String columnKey) { + return columnKey.endsWith(SAMPLE_DATA_SUFIX); + } + + public static boolean isFileColumn(String columnKey) { + return columnKey.endsWith(FILE_SUFIX); + } + public static int extractFileId(String columnKey) { return extractFileId(columnKey, true); } public static Integer extractFileId(String columnKey, boolean failOnMissing) { - if (columnKey.endsWith(FILE_SUFIX)) { + if (isFileColumn(columnKey)) { return extractId(columnKey, failOnMissing, "file"); } else if (failOnMissing) { throw new IllegalArgumentException("Not a file column: " + columnKey); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java index 41c46cfc295..11bc1cc7d6d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/annotation/VariantAnnotationHadoopDBWriter.java @@ -75,7 +75,7 @@ protected List convert(List puts) { cleanPendingVariants(); } for (Put put : puts) { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, columnFamily); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); loadedVariants.add(put.getRow()); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java index 7d58498ec89..fbd764f498c 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/load/VariantHadoopDBWriter.java @@ -63,7 +63,7 @@ protected List convert(List list) { if (HadoopVariantStorageEngine.TARGET_VARIANT_TYPE_SET.contains(variant.getType())) { Put put = converter.convert(variant); if (put != null) { - HadoopVariantSearchIndexUtils.addUnknownSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addUnknownSyncStatus(put); puts.add(put); loadedVariants.getAndIncrement(); } else { diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java 
b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java new file mode 100644 index 00000000000..a29863271c8 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java @@ -0,0 +1,260 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import com.google.common.collect.Iterators; +import com.google.common.collect.UnmodifiableIterator; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; +import org.opencb.biodata.models.variant.StudyEntry; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.commons.ProgressLogger; +import org.opencb.commons.run.ParallelTaskRunner; +import org.opencb.opencga.storage.core.exceptions.StorageEngineException; +import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; +import org.opencb.opencga.storage.core.metadata.models.TaskMetadata; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.ExecutionException; + +public class HadoopVariantPruneManager { + + private Logger logger = LoggerFactory.getLogger(HadoopVariantPruneManager.class); + public static final String OPERATION_NAME = "VariantPrune"; + private final HadoopVariantStorageEngine engine; + + public HadoopVariantPruneManager(HadoopVariantStorageEngine engine) { + this.engine = engine; + } + + public void prune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException { + List tasks = pre(dryMode, resume); + Thread hook = addHook(tasks); + try { + runPrune(dryMode, outdir); + post(tasks, true); + } catch (Exception e) { + try { + post(tasks, false); + } catch (Exception e1) { + e.addSuppressed(e1); + } + throw e; + } finally { + removeHook(hook); + } + } + + private void removeHook(Thread hook) { + Runtime.getRuntime().removeShutdownHook(hook); + } + + private Thread addHook(List tasks) { + Thread hook = new Thread(() -> { + try { + post(tasks, false); + } catch (StorageEngineException e) { + logger.error("Catch error while running shutdown hook.", e); + } + }); + Runtime.getRuntime().addShutdownHook(hook); + return hook; + } + + private void runPrune(boolean dryMode, URI outdir) throws StorageEngineException { + + try { + String pruneTableName = engine.getDBAdaptor().getTableNameGenerator().getPendingSecondaryIndexPruneTableName(); + new SecondaryIndexPrunePendingVariantsDescriptor() + .createTableIfNeeded(pruneTableName, engine.getDBAdaptor().getHBaseManager()); + } catch (IOException e) { + throw StorageEngineException.ioException(e); + } + + VariantPruneDriverParams params = new VariantPruneDriverParams().setDryRun(dryMode).setOutput(outdir.toString()); + 
+ engine.getMRExecutor().run(VariantPruneDriver.class, + VariantPruneDriver.buildArgs(engine.getVariantTableName(), params.toObjectMap()), + "Variant prune on table '" + engine.getVariantTableName() + "'" + ); + + try { + Path report = Files.list(Paths.get(outdir)) + .filter(p -> p.getFileName().toString().contains("variant_prune_report")) + .findFirst() + .orElse(null); + if (report != null) { + long count = Files.lines(report).count(); + if (dryMode) { + logger.info("Found {} variants to delete", count); + checkReportedVariants(report, count); + } else { + logger.info("Deleted {} variants", count); + } + } else { + logger.info("Nothing to delete!"); + } + } catch (IOException e) { + throw StorageEngineException.ioException(e); + } + } + + private void checkReportedVariants(Path report, long count) throws IOException, StorageEngineException { + // TODO: If count is too large (e.g. > 10M), do not check all of them + + Iterator it = Files.lines(report).map(VariantPruneReportRecord::new).iterator(); + ProgressLogger progressLogger = new ProgressLogger("Checking variant to prune", count); + int batchSize = 100; + UnmodifiableIterator> batches = Iterators.partition(it, batchSize); + try (Table table = engine.getDBAdaptor().getHBaseManager().getConnection().getTable(TableName.valueOf(engine.getVariantTableName()))) { + ParallelTaskRunner ptr = new ParallelTaskRunner<>( + i -> batches.hasNext() ? batches.next() : null, + batch -> { + List gets = new ArrayList<>(batch.size()); + for (VariantPruneReportRecord record : batch) { + Get get = new Get(VariantPhoenixKeyFactory.generateVariantRowKey(record.getVariant())); + if (record.getType() == VariantPruneReportRecord.Type.PARTIAL) { + FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE); + for (Integer study : record.getStudies()) { + filter.addFilter(new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); + } + get.setFilter(filter); + } + gets.add(get); + } + Result[] get = table.get(gets); + for (int i = 0; i < get.length; i++) { + Result result = get[i]; + Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(result.getRow()); + VariantPruneReportRecord record = batch.get(i); + if (!variant.sameGenomicVariant(record.getVariant())) { + throw new IllegalStateException("Error checking report! 
Expected " + record.getVariant() + ", got " + variant); + } + progressLogger.increment(1, () -> "up to variant " + variant); + + List columns = new ArrayList<>(result.rawCells().length); + List sampleOrFileColumns = new ArrayList<>(result.rawCells().length); + for (Cell cell : result.rawCells()) { + String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + columns.add(column); + if (VariantPhoenixSchema.isSampleDataColumn(column) && VariantPhoenixSchema.isFileColumn(column)) { + sampleOrFileColumns.add(column); + } + } + // TODO: Don't just report, do some checks here +// logger.info("Variant : {}, prune type: {} , columns: {} , {}", variant, record.type, columns.size(), columns); + if (!sampleOrFileColumns.isEmpty()) { + logger.warn("Variant : {}, prune type: {} , columns: {} , {}", variant, record.getType(), + sampleOrFileColumns.size(), + sampleOrFileColumns); + } + } + return batch; + }, + null, + ParallelTaskRunner.Config.builder().setBatchSize(batchSize).setNumTasks(8).build() + ); + try { + ptr.run(); + } catch (ExecutionException e) { + throw new StorageEngineException("Error checking variant prune report", e); + } +// while (batches.hasNext()) { +// List batch = batches.next(); +// List gets = new ArrayList<>(batch.size()); +// for (PruneReportRecord record : batch) { +// Get get = new Get(VariantPhoenixKeyFactory.generateVariantRowKey(record.variant)); +// if (record.type == PruneReportRecord.Type.PARTIAL) { +// FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE); +// for (Integer study : record.studies) { +// filter.addFilter(new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); +// } +// get.setFilter(filter); +// } +// gets.add(get); +// } +// Result[] get = table.get(gets); +// for (int i = 0; i < get.length; i++) { +// Result result = get[i]; +// Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(result.getRow()); +// PruneReportRecord record = batch.get(i); +// if (!variant.sameGenomicVariant(record.variant)) { +// throw new IllegalStateException("Error checking report!"); +// } +// List columns = new ArrayList<>(result.rawCells().length); +// for (Cell cell : result.rawCells()) { +// String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); +// columns.add(column); +// } +// logger.info("Variant : {}, prune type: {} , columns: {} , {}", variant, record.type, columns.size(), columns); +// } +// +// } + + } + } + + private List pre(boolean dryMode, boolean resume) throws StorageEngineException { + VariantStorageMetadataManager mm = engine.getMetadataManager(); + + List tasks = new LinkedList<>(); + List studiesWithoutStats = new LinkedList<>(); + + // First check no running operations in any study + for (Integer studyId : mm.getStudies().values()) { + // Do not allow concurrent operations at all. + mm.checkTaskCanRun(studyId, OPERATION_NAME, Collections.emptyList(), resume, TaskMetadata.Type.REMOVE, tm -> false); + } + + // Check that all variant stats are updated + for (Integer studyId : mm.getStudies().values()) { + if (!mm.getCohortMetadata(studyId, StudyEntry.DEFAULT_COHORT).isStatsReady()) { + studiesWithoutStats.add(mm.getStudyName(studyId)); + } + // FIXME: What if not invalid? + // Might happen if some samples were deleted, or when loading split files? + } + if (!studiesWithoutStats.isEmpty()) { + throw new StorageEngineException("Unable to run variant prune operation. 
" + + "Please, run variant stats index on cohort '" + StudyEntry.DEFAULT_COHORT + "' for studies " + studiesWithoutStats); + } + + // If no dry-mode, add the new tasks + if (!dryMode) { + for (Integer studyId : mm.getStudies().values()) { + // Do not allow concurrent operations at all. + tasks.add(mm.addRunningTask(studyId, OPERATION_NAME, Collections.emptyList(), resume, TaskMetadata.Type.REMOVE, tm -> false)); + } + } + + return tasks; + } + + private void post(List tasks, boolean success) throws StorageEngineException { + VariantStorageMetadataManager mm = engine.getMetadataManager(); + for (TaskMetadata task : tasks) { + mm.updateTask(task.getStudyId(), task.getId(), t -> { + if (success) { + t.addStatus(TaskMetadata.Status.READY); + } else { + t.addStatus(TaskMetadata.Status.ERROR); + } + }); + } + } + +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java new file mode 100644 index 00000000000..781edd12386 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsDescriptor.java @@ -0,0 +1,55 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; +import org.opencb.opencga.storage.hadoop.utils.HBaseManager; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageOptions; +import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDescriptor; +import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator; + +import java.io.IOException; +import java.util.function.Function; + +/** + * Created on 13/02/19. 
+ * + * @author Jacobo Coll <jacobo167@gmail.com> + */ +public class SecondaryIndexPrunePendingVariantsDescriptor implements PendingVariantsDescriptor { + + @Override + public String name() { + return "prune"; + } + + @Override + public void checkValidPendingTableName(String tableName) { + HBaseVariantTableNameGenerator.checkValidPendingSecondaryIndexPruneTableName(tableName); + } + + @Override + public String getTableName(HBaseVariantTableNameGenerator generator) { + return generator.getPendingSecondaryIndexPruneTableName(); + } + + @Override + public boolean createTableIfNeeded(String tableName, HBaseManager hBaseManager) throws IOException { + return createTableIfNeeded(tableName, hBaseManager, Compression.getCompressionAlgorithmByName( + hBaseManager.getConf().get( + HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.key(), + HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.defaultValue()))); + } + + @Override + public Scan configureScan(Scan scan, VariantStorageMetadataManager metadataManager) { + throw new UnsupportedOperationException(); + } + + @Override + public Function getPendingEvaluatorMapper(VariantStorageMetadataManager metadataManager, boolean overwrite) { + throw new UnsupportedOperationException(); + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java new file mode 100644 index 00000000000..82102b57c9a --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java @@ -0,0 +1,363 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.opencb.biodata.models.variant.StudyEntry; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.biodata.models.variant.stats.VariantStats; +import org.opencb.opencga.core.common.TimeUtils; +import org.opencb.opencga.storage.core.exceptions.StorageEngineException; +import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; +import org.opencb.opencga.storage.core.metadata.models.VariantScoreMetadata; +import org.opencb.opencga.storage.hadoop.variant.AbstractVariantsTableDriver; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.PhoenixHelper; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; +import org.opencb.opencga.storage.hadoop.variant.converters.VariantRow; +import 
org.opencb.opencga.storage.hadoop.variant.mr.VariantMapReduceUtil; +import org.opencb.opencga.storage.hadoop.variant.mr.VariantTableHelper; +import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchIndexUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.stream.Collectors; + +import static org.opencb.opencga.storage.hadoop.variant.GenomeHelper.COLUMN_FAMILY_BYTES; + +public class VariantPruneDriver extends AbstractVariantsTableDriver { + + private Logger logger = LoggerFactory.getLogger(HadoopVariantPruneManager.class); + public static final String ATTRIBUTE_DELETION_TYPE = "d_type"; + public static final byte[] ATTRIBUTE_DELETION_TYPE_FULL = Bytes.toBytes("FULL"); + public static final byte[] ATTRIBUTE_DELETION_TYPE_PARTIAL = Bytes.toBytes("PARTIAL"); + public static final String ATTRIBUTE_DELETION_STUDIES = "d_studies"; + private final VariantPruneDriverParams params = new VariantPruneDriverParams(); + private MapReduceOutputFile output; + + @Override + protected Class getMapperClass() { + return VariantPruneMapper.class; + } + + @Override + protected String getJobOperationName() { + return "vairants-prune"; + } + + @Override + protected Map getParams() { + Map params = new HashMap<>(); + for (String key : this.params.fields().keySet()) { + params.put(key, "<" + key + ">"); + } + return params; + } + + @Override + protected void parseAndValidateParameters() throws IOException { + super.parseAndValidateParameters(); + + for (String key : params.fields().keySet()) { + String value = getParam(key); + if (value != null) { + params.updateParams(new HashMap<>(Collections.singletonMap(key, value))); + } + } + output = new MapReduceOutputFile( + () -> "variant_prune_report." 
+ TimeUtils.getTime() + ".txt", + "variant_prune_report"); + } + + @Override + protected Job setupJob(Job job, String archiveTable, String variantTable) throws IOException { + + Scan scan = new Scan() + .addFamily(COLUMN_FAMILY_BYTES); + + FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); + scan.setFilter(filterList); + + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_NOT_SYNC.bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_UNKNOWN.bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.VariantColumn.INDEX_STUDIES.bytes()); + + Map> columnsPerStudy = new HashMap<>(); + VariantStorageMetadataManager metadataManager = getMetadataManager(); + for (Map.Entry entry : metadataManager.getStudies().entrySet()) { + Integer studyId = entry.getValue(); + Integer cohortId = metadataManager.getCohortId(studyId, StudyEntry.DEFAULT_COHORT); + + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStudyColumn(studyId).bytes()); + scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStatsColumn(studyId, cohortId).bytes()); + + List columnsToDelete = new ArrayList<>(); + for (PhoenixHelper.Column c : VariantPhoenixSchema.getStudyColumns(studyId)) { + columnsToDelete.add(c.column()); + } + metadataManager.cohortIterator(studyId).forEachRemaining(cohortMetadata -> { + for (PhoenixHelper.Column c : VariantPhoenixSchema.getStatsColumns(studyId, cohortMetadata.getId())) { + columnsToDelete.add(c.column()); + } + }); + for (VariantScoreMetadata variantScore : metadataManager.getStudyMetadata(studyId).getVariantScores()) { + columnsToDelete.add(VariantPhoenixSchema.getVariantScoreColumn(studyId, variantScore.getId()).column()); + } + columnsPerStudy.put(studyId, columnsToDelete); + } + + VariantMapReduceUtil.configureMapReduceScan(scan, getConf()); + + if (!params.isDryRun()) { + // TODO: Remove this line. Test purposes only + logger.warn("Not dry mode! 
Enforce dry-mode"); + params.setDryRun(true); + } + + FileOutputFormat.setCompressOutput(job, false); + FileOutputFormat.setOutputPath(job, output.getOutdir()); + + if (params.isDryRun()) { + logger.info("Dry mode"); + VariantMapReduceUtil.initTableMapperJob(job, variantTable, scan, getMapperClass()); + VariantMapReduceUtil.setNoneReduce(job); + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(Mutation.class); + job.setOutputFormatClass(VariantPruneReportOutputFormat.class); + } else { + logger.info("Configure multi output job"); + VariantMapReduceUtil.initTableMapperMultiOutputJob(job, variantTable, Collections.singletonList(scan), getMapperClass()); + job.setOutputFormatClass(VariantPruneReportAndWriteOutputFormat.class); + } + + VariantTableHelper.setVariantsTable(job.getConfiguration(), variantTable); + job.getConfiguration().set(VariantPruneMapper.PENDING_TABLE, getTableNameGenerator().getPendingSecondaryIndexPruneTableName()); + VariantPruneMapper.setColumnsPerStudy(job.getConfiguration(), columnsPerStudy); + return job; + } + + @Override + protected void postExecution(boolean succeed) throws IOException, StorageEngineException { + super.postExecution(succeed); + output.postExecute(succeed); + } + + public static class VariantPruneMapper extends TableMapper { + + public static final String PENDING_TABLE = "VariantPruneMapper.pending_table"; + public static final String COLUMNS_PER_STUDY = "VariantPruneMapper.columnsPerStudy"; + + private ImmutableBytesWritable variantsTable; + private ImmutableBytesWritable pendingDeletionVariantsTable; + private Map> columnsPerStudy; + + + public static final byte[] COLUMN = Bytes.toBytes("v"); + public static final byte[] VALUE = new byte[0]; + + @Override + protected void setup(Mapper.Context context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + variantsTable = new ImmutableBytesWritable(Bytes.toBytes(VariantTableHelper.getVariantsTable(conf))); + pendingDeletionVariantsTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get(PENDING_TABLE))); + + this.columnsPerStudy = getColumnsPerStudy(conf); + } + + public static void setColumnsPerStudy(Configuration conf, Map> columnsPerStudy) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : columnsPerStudy.entrySet()) { + sb.append(entry.getKey()).append("="); + for (String column : entry.getValue()) { + sb.append(column).append(","); + } + sb.append(";"); + } + conf.set(COLUMNS_PER_STUDY, sb.toString()); + } + + public static Map> getColumnsPerStudy(Configuration conf) { + Map> columnsPerStudy = new HashMap<>(); + for (String s : conf.get(COLUMNS_PER_STUDY).split(";")) { + String[] studyColumns = s.split("="); + Integer studyId = Integer.valueOf(studyColumns[0]); + columnsPerStudy.put(studyId, Arrays.stream(studyColumns[1].split(",")).map(Bytes::toBytes).collect(Collectors.toList())); + } + return columnsPerStudy; + } + + @Override + protected void map(ImmutableBytesWritable key, Result value, Context context) + throws IOException, InterruptedException { + VariantRow variantRow = new VariantRow(value); + + List emptyStudies = new ArrayList<>(); + List studies = new ArrayList<>(); + + variantRow.walker() + .onStudy(studyId -> { + studies.add(studyId); + }) + .onCohortStats(c -> { + VariantStats variantStats = c.toJava(); + if (variantStats.getFileCount() == 0) { + emptyStudies.add(c.getStudyId()); + } + }) + .walk(); + + context.getCounter(COUNTER_GROUP_NAME, "variants").increment(1); + + // It might happen 
that the variant has 0 studies, so emptyStudies is empty + if (emptyStudies.size() == studies.size()) { + // Drop variant && add to deleted variants list + context.getCounter(COUNTER_GROUP_NAME, "variants_deleted").increment(1); + + context.write(pendingDeletionVariantsTable, + new Put(value.getRow()).addColumn(COLUMN_FAMILY_BYTES, COLUMN, VALUE)); + Delete delete = new Delete(value.getRow()); + delete.addFamily(COLUMN_FAMILY_BYTES); + delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_FULL); + delete.setAttribute(ATTRIBUTE_DELETION_STUDIES, + Bytes.toBytes(emptyStudies.stream().map(Object::toString).collect(Collectors.joining(",")))); + context.write(variantsTable, delete); + } else if (emptyStudies.isEmpty()) { + // skipVariant + context.getCounter(COUNTER_GROUP_NAME, "variants_untouched").increment(1); + } else { + context.getCounter(COUNTER_GROUP_NAME, "variants_delete_some_studies").increment(1); + // Drop studies from variant + Delete delete = new Delete(value.getRow()); + + for (Integer emptyStudy : emptyStudies) { + List columnsToDelete = columnsPerStudy.get(emptyStudy); + for (byte[] columnToDelete : columnsToDelete) { + delete.addColumns(COLUMN_FAMILY_BYTES, columnToDelete); + } + } + delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_PARTIAL); + delete.setAttribute(ATTRIBUTE_DELETION_STUDIES, + Bytes.toBytes(emptyStudies.stream().map(Object::toString).collect(Collectors.joining(",")))); + + Put updateSecondaryIndexColumns = new Put(value.getRow()); + + HadoopVariantSearchIndexUtils.addNotSyncStatus(updateSecondaryIndexColumns); + + context.write(variantsTable, delete); + context.write(variantsTable, updateSecondaryIndexColumns); + } + } + } + + public static class VariantPruneReportAndWriteOutputFormat extends OutputFormat { + + MultiTableOutputFormat hbaseOutputFormat; + VariantPruneReportOutputFormat reportOutputFormat; + + public VariantPruneReportAndWriteOutputFormat() { + hbaseOutputFormat = new MultiTableOutputFormat(); + reportOutputFormat = new VariantPruneReportOutputFormat(); + } + + @Override + public RecordWriter getRecordWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + RecordWriter hbaseRecordWriter = hbaseOutputFormat.getRecordWriter(context); + RecordWriter reportRecordWriter = reportOutputFormat.getRecordWriter(context); + return new RecordWriter() { + + @Override + public void write(ImmutableBytesWritable key, Mutation value) throws IOException, InterruptedException { + reportRecordWriter.write(key, value); + hbaseRecordWriter.write(key, value); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + reportRecordWriter.close(context); + hbaseRecordWriter.close(context); + } + }; + } + + @Override + public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { + hbaseOutputFormat.checkOutputSpecs(context); + reportOutputFormat.checkOutputSpecs(context); + } + + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + // Ignore TableOutputCommitter as it is no-op. 
+ return reportOutputFormat.getOutputCommitter(context); + } + } + + public static class VariantPruneReportOutputFormat extends FileOutputFormat { + + protected static class ReportRecordWriter extends RecordWriter { + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private static final byte[] SEPARATOR = "\t".getBytes(StandardCharsets.UTF_8); + + protected DataOutputStream out; + + public ReportRecordWriter(DataOutputStream out) { + this.out = out; + } + + public synchronized void write(ImmutableBytesWritable key, Mutation mutation) + throws IOException { + if (mutation instanceof Delete) { + Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(mutation.getRow()); + out.write(variant.toString().getBytes(StandardCharsets.UTF_8)); + out.write(SEPARATOR); + out.write(mutation.getAttribute(ATTRIBUTE_DELETION_TYPE)); + out.write(SEPARATOR); + out.write(mutation.getAttribute(ATTRIBUTE_DELETION_STUDIES)); + out.write(NEWLINE); + } + } + + public synchronized void close(TaskAttemptContext context) throws IOException { + out.close(); + } + } + + public RecordWriter getRecordWriter(TaskAttemptContext job) + throws IOException, InterruptedException { + Configuration conf = job.getConfiguration(); + boolean isCompressed = getCompressOutput(job); + CompressionCodec codec = null; + String extension = ""; + if (isCompressed) { + Class codecClass = + getOutputCompressorClass(job, GzipCodec.class); + codec = ReflectionUtils.newInstance(codecClass, conf); + extension = codec.getDefaultExtension(); + } + Path file = getDefaultWorkFile(job, extension); + FileSystem fs = file.getFileSystem(conf); + DataOutputStream fileOut = fs.create(file, false); + if (isCompressed) { + fileOut = new DataOutputStream(codec.createOutputStream(fileOut)); + } + return new ReportRecordWriter(fileOut); + } + } + +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java new file mode 100644 index 00000000000..dd55ac382b8 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriverParams.java @@ -0,0 +1,27 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.opencb.opencga.core.tools.ToolParams; + +public class VariantPruneDriverParams extends ToolParams { + + private boolean dryRun; + private String output; + + public boolean isDryRun() { + return dryRun; + } + + public VariantPruneDriverParams setDryRun(boolean dryRun) { + this.dryRun = dryRun; + return this; + } + + public String getOutput() { + return output; + } + + public VariantPruneDriverParams setOutput(String output) { + this.output = output; + return this; + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java new file mode 100644 index 00000000000..4da568e9cf2 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java @@ 
-0,0 +1,42 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.opencb.biodata.models.variant.Variant; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +class VariantPruneReportRecord { + private final Variant variant; + private final Type type; + private final List studies; + + enum Type { + FULL, PARTIAL + } + + public VariantPruneReportRecord(Variant variant, Type type, List studies) { + this.variant = variant; + this.type = type; + this.studies = studies; + } + + public VariantPruneReportRecord(String line) { + String[] split = line.split("\t"); + variant = new Variant(split[0]); + type = Type.valueOf(split[1]); + studies = Arrays.stream(split[2].split(",")).map(Integer::valueOf).collect(Collectors.toList()); + } + + public Variant getVariant() { + return variant; + } + + public Type getType() { + return type; + } + + public List getStudies() { + return studies; + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java index 16b5a255e05..792f523d5b1 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/score/VariantScoreToHBaseConverter.java @@ -26,7 +26,7 @@ public VariantScoreToHBaseConverter(byte[] columnFamily, int studyId, int scoreI public Put convert(Pair pair) { Put put = new Put(VariantPhoenixKeyFactory.generateVariantRowKey(pair.getKey())); add(put, column, Arrays.asList(pair.getValue().getScore(), pair.getValue().getPValue())); - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, columnFamily); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); return put; } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java index 2b32610f68b..6189f0bedc4 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java @@ -4,14 +4,20 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PIntegerArray; import org.apache.phoenix.schema.types.PhoenixArray; +import org.opencb.biodata.models.variant.Variant; import org.opencb.opencga.storage.core.variant.VariantStorageEngine; import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.PhoenixHelper; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; import 
org.opencb.opencga.storage.hadoop.variant.converters.AbstractPhoenixConverter; import org.opencb.opencga.storage.hadoop.variant.converters.VariantRow; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import static org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema.VariantColumn; @@ -26,12 +32,11 @@ public class HadoopVariantSearchIndexUtils { * Marks the row as Not Sync Status. This method should be called when loading annotations or statistics, and when removing a study. * * @param put Mutation to add new Variant information. - * @param columnFamily Main column family. * @return The same put operation with the {@link VariantColumn#INDEX_NOT_SYNC} column. */ - public static Put addNotSyncStatus(Put put, byte[] columnFamily) { + public static Put addNotSyncStatus(Put put) { if (put != null) { - put.addColumn(columnFamily, VariantColumn.INDEX_NOT_SYNC.bytes(), System.currentTimeMillis(), + put.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantColumn.INDEX_NOT_SYNC.bytes(), System.currentTimeMillis(), PBoolean.TRUE_BYTES); } return put; @@ -41,12 +46,11 @@ public static Put addNotSyncStatus(Put put, byte[] columnFamily) { * Marks the row as Unknown Sync Status. This method should be called when loading or removing files. * * @param put Mutation to add new Variant information. - * @param columnFamily Main column family. * @return The same put operation with the {@link VariantColumn#INDEX_UNKNOWN} column. */ - public static Put addUnknownSyncStatus(Put put, byte[] columnFamily) { + public static Put addUnknownSyncStatus(Put put) { if (put != null) { - put.addColumn(columnFamily, VariantColumn.INDEX_UNKNOWN.bytes(), System.currentTimeMillis(), + put.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantColumn.INDEX_UNKNOWN.bytes(), System.currentTimeMillis(), PBoolean.TRUE_BYTES); } return put; diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java index 8d98f3caa6a..6efa3f3fadc 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/HadoopMRVariantStatisticsManager.java @@ -7,6 +7,7 @@ import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageOptions; +import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException; import org.opencb.opencga.storage.core.variant.stats.VariantStatisticsManager; import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; import org.opencb.opencga.storage.hadoop.variant.executors.MRExecutor; @@ -44,7 +45,9 @@ public void calculateStatistics(String study, List cohorts, QueryOptions } VariantStorageMetadataManager metadataManager = dbAdaptor.getMetadataManager(); StudyMetadata sm = metadataManager.getStudyMetadata(study); - + if (sm == null) { + throw VariantQueryException.studyNotFound(study); + } if (isAggregated(sm, options)) { Aggregation aggregation = getAggregation(sm, 
options); VariantStatsMapper.setAggregation(options, aggregation); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java index 13a848b928b..eb76771b159 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsDriver.java @@ -1,8 +1,6 @@ package org.opencb.opencga.storage.hadoop.variant.stats; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; @@ -53,13 +51,13 @@ public class VariantStatsDriver extends AbstractVariantsTableDriver { private Collection cohorts; private static Logger logger = LoggerFactory.getLogger(VariantStatsDriver.class); - private Path outdir; - private Path localOutput; + private Aggregation aggregation; private boolean overwrite; private boolean statsMultiAllelic; private String statsDefaultGenotype; private boolean excludeFiles; + private MapReduceOutputFile output; public VariantStatsDriver() { } @@ -103,24 +101,9 @@ protected void parseAndValidateParameters() throws IOException { logger.info(" * " + VariantStorageOptions.STATS_DEFAULT_GENOTYPE.key() + ": " + statsDefaultGenotype); - String outdirStr = getParam(OUTPUT); - if (StringUtils.isNotEmpty(outdirStr)) { - outdir = new Path(outdirStr); - - if (isLocal(outdir)) { - localOutput = getLocalOutput(outdir, () -> "variant_stats." - + (cohorts.size() < 10 ? "." + String.join("_", cohortNames) : "") - + TimeUtils.getTime() + ".json"); - outdir = getTempOutdir("opencga_sample_variant_stats", localOutput.getName()); - outdir.getFileSystem(getConf()).deleteOnExit(outdir); - } - if (localOutput != null) { - logger.info(" * Outdir file: " + localOutput.toUri()); - logger.info(" * Temporary outdir file: " + outdir.toUri()); - } else { - logger.info(" * Outdir file: " + outdir.toUri()); - } - } + output = new MapReduceOutputFile(() -> "variant_stats." + + (cohorts.size() < 10 ? "." + String.join("_", cohortNames) : "") + + TimeUtils.getTime() + ".json", "opencga_sample_variant_stats"); } @Override @@ -146,7 +129,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName query.put(VariantQueryParam.INCLUDE_FILE.key(), VariantQueryUtils.NONE); } - if (outdir != null) { + if (output.getOutdir() != null) { // Do not index stats. // Allow any input query. // Write stats to file. 
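For reference, a minimal sketch of the output handling that the parseAndValidateParameters() and postExecution() hunks in this file now delegate to MapReduceOutputFile. Only what the diff itself shows is taken from the patch (a constructor receiving a default-file-name supplier and a temporary-directory prefix, plus getOutdir() and postExecute(boolean)); the class body below, its name and its extra constructor arguments are assumptions for illustration, not the real opencga-storage-hadoop implementation.

    import java.io.IOException;
    import java.util.function.Supplier;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    /** Illustrative sketch only; not the real MapReduceOutputFile. */
    public class MapReduceOutputFileSketch {

        private final Configuration conf;
        private final Path outdir;       // directory the MapReduce job writes its part files to
        private final Path localOutput;  // single local file to assemble afterwards, or null

        public MapReduceOutputFileSketch(Supplier<String> defaultFileName, String tempDirPrefix,
                                         String requestedOutput, Configuration conf) {
            this.conf = conf;
            Path requested = new Path(requestedOutput);
            String scheme = requested.toUri().getScheme();
            if (scheme == null || "file".equals(scheme)) {
                // Local destination: run the job against a temporary cluster directory
                // and concatenate the result back to the local file afterwards.
                this.localOutput = new Path(requested, defaultFileName.get());
                this.outdir = new Path("/tmp/" + tempDirPrefix + "_" + System.currentTimeMillis());
            } else {
                this.localOutput = null;
                this.outdir = requested;
            }
        }

        public Path getOutdir() {
            return outdir;
        }

        /** Mirrors the postExecution() hunk: concatenate on success, always drop the temp dir. */
        public void postExecute(boolean succeed) throws IOException {
            if (localOutput == null) {
                return;
            }
            FileSystem fs = outdir.getFileSystem(conf);
            if (succeed) {
                // Naive merge of the MR part files into the local target; the real helper is more careful.
                FileUtil.copyMerge(fs, outdir, FileSystem.getLocal(conf), localOutput, false, conf, null);
            }
            fs.delete(outdir, true);
        }
    }

Centralising this logic in one helper is what lets VariantStatsDriver drop its duplicated outdir/localOutput fields and the per-driver cleanup code removed in the surrounding hunks.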
@@ -165,7 +148,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setCompressOutput(job, false); - TextOutputFormat.setOutputPath(job, outdir); + TextOutputFormat.setOutputPath(job, output.getOutdir()); } else if (AggregationUtils.isAggregated(aggregation)) { // For aggregated variants use plain VariantStatsMapper @@ -228,14 +211,7 @@ protected Job setupJob(Job job, String archiveTableName, String variantTableName @Override protected void postExecution(boolean succeed) throws IOException, StorageEngineException { super.postExecution(succeed); - if (succeed) { - if (localOutput != null) { - concatMrOutputToLocal(outdir, localOutput); - } - } - if (localOutput != null) { - deleteTemporaryFile(outdir); - } + output.postExecute(succeed); } @Override diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java index db233c97c41..28b0115a326 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsFromResultMapper.java @@ -14,7 +14,6 @@ import org.opencb.opencga.storage.core.metadata.models.StudyMetadata; import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import org.opencb.opencga.storage.core.variant.stats.VariantStatsWrapper; -import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; import org.opencb.opencga.storage.hadoop.variant.converters.stats.VariantStatsToHBaseConverter; @@ -185,7 +184,7 @@ private void write(Context context, VariantStatsWrapper wrapper) throws IOExcept if (put == null) { context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put.null").increment(1); } else { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put").increment(1); context.write(new ImmutableBytesWritable(helper.getVariantsTable()), put); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java index 5f349cc76e9..3ed5d8176e7 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/stats/VariantStatsMapper.java @@ -14,7 +14,6 @@ import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import org.opencb.opencga.storage.core.variant.stats.VariantStatisticsCalculator; import 
org.opencb.opencga.storage.core.variant.stats.VariantStatsWrapper; -import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; import org.opencb.opencga.storage.hadoop.variant.converters.stats.VariantStatsToHBaseConverter; import org.opencb.opencga.storage.hadoop.variant.mr.VariantMapper; import org.opencb.opencga.storage.hadoop.variant.mr.VariantTableHelper; @@ -95,7 +94,7 @@ protected void map(Object key, Variant variant, Context context) throws IOExcept if (put == null) { context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put.null").increment(1); } else { - HadoopVariantSearchIndexUtils.addNotSyncStatus(put, GenomeHelper.COLUMN_FAMILY_BYTES); + HadoopVariantSearchIndexUtils.addNotSyncStatus(put); context.getCounter(VariantsTableMapReduceHelper.COUNTER_GROUP_NAME, "stats.put").increment(1); context.write(new ImmutableBytesWritable(helper.getVariantsTable()), put); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java index cd23d8a59ff..9db6184cea9 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java @@ -19,6 +19,7 @@ public class HBaseVariantTableNameGenerator { private static final String SAMPLE_SUFIX = "_variant_sample_index_"; private static final String PENDING_ANNOTATION_SUFIX = "_pending_annotation"; private static final String PENDING_SECONDARY_INDEX_SUFIX = "_pending_secondary_index"; + private static final String PENDING_SECONDARY_INDEX_PRUNE_SUFIX = "_pending_secondary_index_prune"; private static final int MINIMUM_DB_NAME_SIZE = 1; private final String namespace; @@ -27,6 +28,7 @@ public class HBaseVariantTableNameGenerator { private final String metaTableName; private final String pendingAnnotationTableName; private final String pendingSecondaryIndexTableName; + private final String pendingSecondaryIndexPruneTableName; public HBaseVariantTableNameGenerator(String dbName, ObjectMap options) { @@ -47,6 +49,7 @@ public HBaseVariantTableNameGenerator(String namespace, String dbName) { metaTableName = getMetaTableName(namespace, this.dbName); pendingAnnotationTableName = getPendingAnnotationTableName(namespace, this.dbName); pendingSecondaryIndexTableName = getPendingSecondaryIndexTableName(namespace, this.dbName); + pendingSecondaryIndexPruneTableName = getPendingSecondaryIndexPruneTableName(namespace, this.dbName); } public String getVariantTableName() { @@ -69,6 +72,10 @@ public String getPendingSecondaryIndexTableName() { return pendingSecondaryIndexTableName; } + public String getPendingSecondaryIndexPruneTableName() { + return pendingSecondaryIndexPruneTableName; + } + public String getMetaTableName() { return metaTableName; } @@ -133,6 +140,16 @@ public static boolean isValidPendingSecondaryIndexTableName(String pendingSecond return validSuffix(pendingSecondaryIndexTableName, PENDING_SECONDARY_INDEX_SUFIX); } + public static void checkValidPendingSecondaryIndexPruneTableName(String tableName) { + if (!isValidPendingSecondaryIndexPruneTableName(tableName)) { + throw new 
IllegalArgumentException("Invalid pending prune table name : " + tableName); + } + } + + public static boolean isValidPendingSecondaryIndexPruneTableName(String tableName) { + return validSuffix(tableName, PENDING_SECONDARY_INDEX_PRUNE_SUFIX); + } + public static String getDBNameFromMetaTableName(String metaTableName) { checkValidMetaTableName(metaTableName); return metaTableName.substring(0, metaTableName.length() - META_SUFIX.length()); @@ -237,6 +254,10 @@ public static String getPendingSecondaryIndexTableName(String namespace, String return buildTableName(namespace, dbName, PENDING_SECONDARY_INDEX_SUFIX); } + public static String getPendingSecondaryIndexPruneTableName(String namespace, String dbName) { + return buildTableName(namespace, dbName, PENDING_SECONDARY_INDEX_PRUNE_SUFIX); + } + public static String getMetaTableName(String namespace, String dbName) { return buildTableName(namespace, dbName, META_SUFIX); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java index b3456a45791..e151c9b8b6d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageTest.java @@ -404,6 +404,9 @@ static StorageConfiguration updateStorageConfiguration(StorageConfiguration stor options.put(HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_TABLE_COMPRESSION.key(), supportedAlgorithms.contains(Compression.Algorithm.SNAPPY) ? Compression.Algorithm.SNAPPY.getName() : Compression.Algorithm.NONE.getName()); + options.put(HadoopVariantStorageOptions.PENDING_SECONDARY_INDEX_PRUNE_TABLE_COMPRESSION.key(), supportedAlgorithms.contains(Compression.Algorithm.SNAPPY) + ? 
Compression.Algorithm.SNAPPY.getName() + : Compression.Algorithm.NONE.getName()); FileSystem fs = FileSystem.get(HadoopVariantStorageTest.configuration.get()); String intermediateDirectory = fs.getHomeDirectory().toUri().resolve("opencga_test/").toString(); diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java new file mode 100644 index 00000000000..2b2c4ce11f4 --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java @@ -0,0 +1,232 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.hamcrest.MatcherAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.opencb.biodata.models.variant.StudyEntry; +import org.opencb.commons.datastore.core.ObjectMap; +import org.opencb.commons.datastore.core.Query; +import org.opencb.commons.datastore.core.QueryOptions; +import org.opencb.opencga.storage.core.exceptions.StorageEngineException; +import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest; +import org.opencb.opencga.storage.core.variant.VariantStorageEngine; +import org.opencb.opencga.storage.core.variant.VariantStorageOptions; +import org.opencb.opencga.storage.core.variant.adaptors.VariantMatchers; +import org.opencb.opencga.storage.core.variant.annotation.DefaultVariantAnnotationManager; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine; +import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageTest; +import org.opencb.opencga.storage.hadoop.variant.VariantHbaseTestUtils; +import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; + +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.*; + +public class HadoopVariantPruneManagerTest extends VariantStorageBaseTest implements HadoopVariantStorageTest { + + public static final String STUDY_NAME_3 = "study_3"; + public static final String STUDY_NAME_4 = "study_4"; + public static final String STUDY_NAME_5 = "study_5"; + + @ClassRule + public static HadoopExternalResource externalResource = new HadoopExternalResource(); + private VariantHadoopDBAdaptor dbAdaptor; + private boolean loaded; + private HadoopVariantStorageEngine engine; + + @After + public void tearDown() throws Exception { + VariantHbaseTestUtils.printVariants(getVariantStorageEngine().getDBAdaptor(), newOutputUri(getTestName().getMethodName())); + } + + @Before + public void before() throws Exception { + engine = getVariantStorageEngine(); + dbAdaptor = engine.getDBAdaptor(); +// if (!loaded) { +// load(); +// loaded = true; +// } + } + + public void load() throws Exception { + clearDB(DB_NAME); + + // Study 1 - single file + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, smallInputUri, outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME, 
Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + // Study 2 - multi files + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_2) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false) + .append(VariantStorageOptions.LOAD_SPLIT_DATA.key(), VariantStorageEngine.SplitData.MULTI); + + runETL(engine, getResourceUri("by_chr/chr22_1-1.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2-DUP.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + + // Study 3 - platinum + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_3) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(0), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(1), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(2), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + // Study 4 - platinum_2 + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_4) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(3), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(4), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(5), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_4, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + // Study 5, dense + params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_4) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getResourceUri("variant-test-dense.vcf.gz"), outputUri, params, true, true, true); + engine.calculateStats(STUDY_NAME_4, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + + // ---------------- Annotate + this.variantStorageEngine.annotate(new Query(), new QueryOptions(DefaultVariantAnnotationManager.OUT_DIR, outputUri)); + + VariantHbaseTestUtils.printVariants(dbAdaptor, newOutputUri()); + } + + @Test + public void testVariantPruneSingleStudy() throws Exception { + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_3) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false); + runETL(engine, getPlatinumFile(0), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(1), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(2), outputUri, params, true, true, true); + runETL(engine, getPlatinumFile(3), outputUri, params, true, true, true); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + variantPrune("1_prune_dry", true, 0); + + int studyId = engine.getMetadataManager().getStudyId(STUDY_NAME_3); + List samples = 
engine.getMetadataManager().getIndexedSamples(studyId); + String sampleName = engine.getMetadataManager().getSampleName(studyId, samples.get(0)); + engine.removeSamples(STUDY_NAME_3, Collections.singletonList(sampleName), newOutputUri("2_remove_sample_" + sampleName)); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + @Test + public void testVariantPruneSingleStudyMultiFile() throws Exception { + ObjectMap params = new ObjectMap() + .append(VariantStorageOptions.STUDY.key(), STUDY_NAME_2) + .append(VariantStorageOptions.ANNOTATE.key(), false) + .append(VariantStorageOptions.STATS_CALCULATE.key(), false) + .append(VariantStorageOptions.LOAD_SPLIT_DATA.key(), VariantStorageEngine.SplitData.MULTI); + + runETL(engine, getResourceUri("by_chr/chr22_1-1.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + runETL(engine, getResourceUri("by_chr/chr22_1-2.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + runETL(engine, getResourceUri("by_chr/chr22_1-2-DUP.variant-test-file.vcf.gz"), outputUri, params, true, true, true); + + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + variantPrune("1_prune_dry", true, 0); + + engine.removeFile(STUDY_NAME_2, "chr22_1-1.variant-test-file.vcf.gz", newOutputUri("2_remove_sample_chr22_1-1.variant-test-file")); + + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + + @Test + public void testVariantPruneMultiStudy() throws Exception { + load(); + + variantPrune("1_prune_dry", true, 0); + + engine.removeFile(STUDY_NAME_3, "1K.end.platinum-genomes-vcf-NA12877_S1.genome.vcf.gz", outputUri); + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_3, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + engine.removeFile(STUDY_NAME_2, "chr22_1-1.variant-test-file.vcf.gz", newOutputUri("2_remove_sample_chr22_1-1.variant-test-file")); + checkStatsUpdateRequiredForVariantPrune(); + engine.calculateStats(STUDY_NAME_2, Collections.singletonList(StudyEntry.DEFAULT_COHORT), new QueryOptions()); + + int variantsToPrune = variantPrune("3_prune_dry", true); + variantPrune("4_prune_wet", false, variantsToPrune); + variantPrune("5_prune_dry", true, 0); + } + + private int variantPrune(String testName, boolean dryMode) throws Exception { + return variantPrune(testName, dryMode, null); + } + + private int variantPrune(String testName, boolean dryMode, Integer expectedPrunedVariants) throws Exception { + URI outdir = newOutputUri(testName); + getVariantStorageEngine().variantsPrune(dryMode, false, outdir); + + Path report = Files.list(Paths.get(outdir)).filter(p -> p.getFileName().toString().contains("variant_prune_report")).findFirst() + .orElse(null); + int reportedVariants; + if (report == null) { + 
reportedVariants = 0; + } else { + reportedVariants = (int) Files.lines(report).count(); + } + if (expectedPrunedVariants == null) { + MatcherAssert.assertThat(reportedVariants, VariantMatchers.gt(0)); + } else { + assertEquals(expectedPrunedVariants.intValue(), reportedVariants); + } + return reportedVariants; + } + + private void checkStatsUpdateRequiredForVariantPrune() throws Exception { + try { + getVariantStorageEngine().variantsPrune(true, false, outputUri); + fail("Should fail, as the variant stats are not valid"); + } catch (StorageEngineException e) { + System.out.println(e.getClass() + " = " + e.getMessage()); + assertTrue(e.getMessage().startsWith("Unable to run variant prune operation. Please, run variant stats index on cohort")); + } + } + +} \ No newline at end of file diff --git a/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java b/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java index 530033c8e64..af8ee660581 100644 --- a/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java +++ b/opencga-storage/opencga-storage-mongodb/src/test/java/org/opencb/opencga/storage/mongodb/variant/MongoVariantStorageEngineTest.java @@ -107,7 +107,6 @@ public void stageResumeFromErrorTest() throws Exception { t.addStatus(new Date(System.currentTimeMillis() - 100), TaskMetadata.Status.RUNNING); t.addStatus(new Date(System.currentTimeMillis() - 50), TaskMetadata.Status.ERROR); // Last status is ERROR - return t; }); System.out.println("----------------"); @@ -136,7 +135,6 @@ public void stageForceResumeTest() throws Exception { operation.addStatus(new Date(System.currentTimeMillis() - 50), TaskMetadata.Status.ERROR); operation.addStatus(new Date(System.currentTimeMillis()), TaskMetadata.Status.RUNNING); // Last status is RUNNING - return operation; }); try { @@ -199,7 +197,6 @@ private long simulateStageError(StudyMetadata studyMetadata, VariantMongoDBAdapt TreeMap status = task.getStatus(); status.remove(status.lastKey(), TaskMetadata.Status.READY); task.addStatus(TaskMetadata.Status.ERROR); - return task; }); // 2) Remove from files collection From abaf82110e66bc9f9356ec4bc5a1e05bc3cb6d4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Fri, 24 Jun 2022 15:44:58 +0100 Subject: [PATCH 3/6] storage: Implement non-dry mode variant-prune operation. 
#TASK-1116 --- .../operations/VariantPruneOperationTool.java | 5 - .../opencga/core/common/ExceptionUtils.java | 40 +++- .../VariantStorageMetadataManager.java | 1 + .../core/variant/VariantStorageEngine.java | 1 + .../search/solr/VariantSearchManager.java | 15 ++ .../variant/HadoopVariantStorageEngine.java | 4 +- ...ndaryIndexPrunePendingVariantsManager.java | 12 ++ .../variant/prune/VariantPruneDriver.java | 55 +++-- ...eManager.java => VariantPruneManager.java} | 189 ++++++++++++------ .../prune/VariantPruneReportRecord.java | 2 +- .../HadoopVariantSearchDataDeleter.java | 87 ++++++++ .../search/HadoopVariantSearchIndexUtils.java | 6 - ...Test.java => VariantPruneManagerTest.java} | 2 +- 13 files changed, 315 insertions(+), 104 deletions(-) create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java rename opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/{HadoopVariantPruneManager.java => VariantPruneManager.java} (56%) create mode 100644 opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java rename opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/{HadoopVariantPruneManagerTest.java => VariantPruneManagerTest.java} (99%) diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java index 73c9501e3ce..5cac84f7eaf 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java @@ -19,11 +19,6 @@ public class VariantPruneOperationTool extends OperationTool { protected void run() throws Exception { step(() -> { - - // TODO: Remove this line. 
Test purposes only - logger.warn("Enforce dry mode!"); - params.setDryRun(true); - getVariantStorageManager().variantPrune(params.getProject(), getOutDir().toUri(), params, getToken()); }); diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java b/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java index 0d0fc95511a..8dd5408b85b 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/common/ExceptionUtils.java @@ -25,19 +25,27 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu public static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline, boolean includeClassName) { + String separator; + if (multiline) { + separator = "\n"; + } else { + separator = " ; "; + } + return prettyExceptionMessage(exception, message, multiline, includeClassName, separator); + } + + private static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline, + boolean includeClassName, String separator) { Set messages = new HashSet<>(); do { - if (exception.getMessage() != null && !messages.add(exception.getMessage())) { + if (exception.getMessage() != null && !messages.add(exception.getMessage()) + && exception.getSuppressed().length == 0) { // Duplicated message. Skip this cause exception = exception.getCause(); continue; } if (message.length() != 0) { - if (multiline) { - message.append("\n"); - } else { - message.append(" ; "); - } + message.append(separator); } String exMessage = exception.getMessage(); if (StringUtils.isBlank(exMessage)) { @@ -49,6 +57,26 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu message.append("[").append(exception.getClass().getSimpleName()).append("] "); } message.append(exMessage); + if (exception.getSuppressed().length > 0) { + StringBuilder sb = new StringBuilder(); + String intraSeparator = multiline ? separator + " " : separator; + for (Throwable suppressed : exception.getSuppressed()) { + prettyExceptionMessage(suppressed, sb, multiline, includeClassName, intraSeparator); + } + message.append(" "); + } exception = exception.getCause(); } while (exception != null); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java index 9138f550bd9..4cbc92a394d 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/metadata/VariantStorageMetadataManager.java @@ -1885,6 +1885,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, * @param type Operation type as {@link TaskMetadata.Type} * @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation. 
* If not, throws {@link StorageEngineException#otherOperationInProgressException} + * @throws StorageEngineException if the operation can't be executed */ public void checkTaskCanRun(int studyId, String jobOperationName, List fileIds, boolean resume, TaskMetadata.Type type, Predicate allowConcurrent) diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java index 026c5e34ae8..5e2558339de 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java @@ -980,6 +980,7 @@ protected void postRemoveFiles(String study, List fileIds, List getVariantDeleter(String collection) { + return list -> { + try { + if (list != null) { + delete(collection, list.stream().map(Variant::toString).collect(Collectors.toList())); + } + } catch (IOException | SolrServerException e) { + throw new RuntimeException(e); + } + return true; + }; + } + /*------------------------------------- * P R I V A T E M E T H O D S -------------------------------------*/ diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java index b7e6dc44b86..7bc6359e1e5 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java @@ -96,7 +96,7 @@ import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexDeleteHBaseColumnTask; import org.opencb.opencga.storage.hadoop.variant.index.sample.SampleIndexBuilder; import org.opencb.opencga.storage.hadoop.variant.io.HadoopVariantExporter; -import org.opencb.opencga.storage.hadoop.variant.prune.HadoopVariantPruneManager; +import org.opencb.opencga.storage.hadoop.variant.prune.VariantPruneManager; import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreLoader; import org.opencb.opencga.storage.hadoop.variant.score.HadoopVariantScoreRemover; import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataWriter; @@ -952,7 +952,7 @@ public void removeStudy(String studyName, URI outdir) throws StorageEngineExcept @Override public void variantsPrune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException { - new HadoopVariantPruneManager(this).prune(dryMode, resume, outdir); + new VariantPruneManager(this).prune(dryMode, resume, outdir); } @Override diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java new file mode 100644 index 00000000000..6f34b550683 --- /dev/null +++ 
b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/SecondaryIndexPrunePendingVariantsManager.java @@ -0,0 +1,12 @@ +package org.opencb.opencga.storage.hadoop.variant.prune; + +import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; +import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsManager; + +public class SecondaryIndexPrunePendingVariantsManager extends PendingVariantsManager { + + public SecondaryIndexPrunePendingVariantsManager(VariantHadoopDBAdaptor dbAdaptor) { + super(dbAdaptor, new SecondaryIndexPrunePendingVariantsDescriptor()); + } + +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java index 82102b57c9a..b41e44e3b4a 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java @@ -14,6 +14,7 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Tool; import org.opencb.biodata.models.variant.StudyEntry; import org.opencb.biodata.models.variant.Variant; import org.opencb.biodata.models.variant.stats.VariantStats; @@ -34,6 +35,7 @@ import java.io.DataOutputStream; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.stream.Collectors; @@ -42,11 +44,11 @@ public class VariantPruneDriver extends AbstractVariantsTableDriver { - private Logger logger = LoggerFactory.getLogger(HadoopVariantPruneManager.class); - public static final String ATTRIBUTE_DELETION_TYPE = "d_type"; - public static final byte[] ATTRIBUTE_DELETION_TYPE_FULL = Bytes.toBytes("FULL"); - public static final byte[] ATTRIBUTE_DELETION_TYPE_PARTIAL = Bytes.toBytes("PARTIAL"); + private Logger logger = LoggerFactory.getLogger(VariantPruneManager.class); public static final String ATTRIBUTE_DELETION_STUDIES = "d_studies"; + public static final String ATTRIBUTE_DELETION_TYPE = "d_type"; + public static final byte[] ATTRIBUTE_DELETION_TYPE_FULL = Bytes.toBytes(VariantPruneReportRecord.Type.FULL.toString()); + public static final byte[] ATTRIBUTE_DELETION_TYPE_PARTIAL = Bytes.toBytes(VariantPruneReportRecord.Type.PARTIAL.toString()); private final VariantPruneDriverParams params = new VariantPruneDriverParams(); private MapReduceOutputFile output; @@ -122,12 +124,7 @@ protected Job setupJob(Job job, String archiveTable, String variantTable) throws } VariantMapReduceUtil.configureMapReduceScan(scan, getConf()); - - if (!params.isDryRun()) { - // TODO: Remove this line. Test purposes only - logger.warn("Not dry mode! 
Enforce dry-mode"); - params.setDryRun(true); - } + logger.info("Scan = " + scan); FileOutputFormat.setCompressOutput(job, false); FileOutputFormat.setOutputPath(job, output.getOutdir()); @@ -147,6 +144,7 @@ protected Job setupJob(Job job, String archiveTable, String variantTable) throws VariantTableHelper.setVariantsTable(job.getConfiguration(), variantTable); job.getConfiguration().set(VariantPruneMapper.PENDING_TABLE, getTableNameGenerator().getPendingSecondaryIndexPruneTableName()); + job.getConfiguration().set(VariantPruneMapper.PENDING_ANNOTATION_TABLE, getTableNameGenerator().getPendingAnnotationTableName()); VariantPruneMapper.setColumnsPerStudy(job.getConfiguration(), columnsPerStudy); return job; } @@ -160,10 +158,12 @@ protected void postExecution(boolean succeed) throws IOException, StorageEngineE public static class VariantPruneMapper extends TableMapper { public static final String PENDING_TABLE = "VariantPruneMapper.pending_table"; + public static final String PENDING_ANNOTATION_TABLE = "VariantPruneMapper.pending_annotations_table"; public static final String COLUMNS_PER_STUDY = "VariantPruneMapper.columnsPerStudy"; private ImmutableBytesWritable variantsTable; private ImmutableBytesWritable pendingDeletionVariantsTable; + private ImmutableBytesWritable pendingAnnotationVariantsTable; private Map> columnsPerStudy; @@ -175,6 +175,7 @@ protected void setup(Mapper.Context context) throws IOException, InterruptedExce Configuration conf = context.getConfiguration(); variantsTable = new ImmutableBytesWritable(Bytes.toBytes(VariantTableHelper.getVariantsTable(conf))); pendingDeletionVariantsTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get(PENDING_TABLE))); + pendingAnnotationVariantsTable = new ImmutableBytesWritable(Bytes.toBytes(conf.get(PENDING_ANNOTATION_TABLE))); this.columnsPerStudy = getColumnsPerStudy(conf); } @@ -209,10 +210,8 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) List emptyStudies = new ArrayList<>(); List studies = new ArrayList<>(); - variantRow.walker() - .onStudy(studyId -> { - studies.add(studyId); - }) + Variant variant = variantRow.walker() + .onStudy(studies::add) .onCohortStats(c -> { VariantStats variantStats = c.toJava(); if (variantStats.getFileCount() == 0) { @@ -230,6 +229,9 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) context.write(pendingDeletionVariantsTable, new Put(value.getRow()).addColumn(COLUMN_FAMILY_BYTES, COLUMN, VALUE)); + context.write(pendingAnnotationVariantsTable, + new Delete(value.getRow())); + Delete delete = new Delete(value.getRow()); delete.addFamily(COLUMN_FAMILY_BYTES); delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_FULL); @@ -250,6 +252,11 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) delete.addColumns(COLUMN_FAMILY_BYTES, columnToDelete); } } + if (delete.isEmpty()) { + // Impossible statement (unless a bug). + // This block is here to prevent accidental "full row" deletes. 
+ throw new IllegalStateException("Unexpected empty delete at partial variant prune in variant " + variant); + } delete.setAttribute(ATTRIBUTE_DELETION_TYPE, ATTRIBUTE_DELETION_TYPE_PARTIAL); delete.setAttribute(ATTRIBUTE_DELETION_STUDIES, Bytes.toBytes(emptyStudies.stream().map(Object::toString).collect(Collectors.joining(",")))); @@ -266,8 +273,8 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) public static class VariantPruneReportAndWriteOutputFormat extends OutputFormat { - MultiTableOutputFormat hbaseOutputFormat; - VariantPruneReportOutputFormat reportOutputFormat; + private MultiTableOutputFormat hbaseOutputFormat; + private VariantPruneReportOutputFormat reportOutputFormat; public VariantPruneReportAndWriteOutputFormat() { hbaseOutputFormat = new MultiTableOutputFormat(); @@ -314,15 +321,17 @@ protected static class ReportRecordWriter extends RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); + ImmutableBytesWritable variantsTable = new ImmutableBytesWritable(Bytes.toBytes(VariantTableHelper.getVariantsTable(conf))); boolean isCompressed = getCompressOutput(job); CompressionCodec codec = null; String extension = ""; @@ -356,8 +366,13 @@ public RecordWriter getRecordWriter(TaskAttemp if (isCompressed) { fileOut = new DataOutputStream(codec.createOutputStream(fileOut)); } - return new ReportRecordWriter(fileOut); + return new ReportRecordWriter(fileOut, variantsTable); } } + @SuppressWarnings("unchecked") + public static void main(String[] args) { + main(args, (Class) MethodHandles.lookup().lookupClass()); + } + } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java similarity index 56% rename from opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java rename to opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java index a29863271c8..20b15b396f1 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManager.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java @@ -1,7 +1,5 @@ package org.opencb.opencga.storage.hadoop.variant.prune; -import com.google.common.collect.Iterators; -import com.google.common.collect.UnmodifiableIterator; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; @@ -13,13 +11,20 @@ import org.opencb.biodata.models.variant.StudyEntry; import org.opencb.biodata.models.variant.Variant; import org.opencb.commons.ProgressLogger; +import org.opencb.commons.datastore.core.Query; import org.opencb.commons.run.ParallelTaskRunner; +import org.opencb.commons.run.Task; +import org.opencb.opencga.core.common.TimeUtils; import org.opencb.opencga.storage.core.exceptions.StorageEngineException; +import org.opencb.opencga.storage.core.exceptions.VariantSearchException; import 
org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager; import org.opencb.opencga.storage.core.metadata.models.TaskMetadata; +import org.opencb.opencga.storage.core.variant.search.solr.VariantSearchManager; import org.opencb.opencga.storage.hadoop.variant.HadoopVariantStorageEngine; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; +import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDBCleaner; +import org.opencb.opencga.storage.hadoop.variant.search.HadoopVariantSearchDataDeleter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,14 +35,17 @@ import java.nio.file.Paths; import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; -public class HadoopVariantPruneManager { +public class VariantPruneManager { - private Logger logger = LoggerFactory.getLogger(HadoopVariantPruneManager.class); + public static final int CHECK_DRY_RUN_LIMIT = 1000000; + private Logger logger = LoggerFactory.getLogger(VariantPruneManager.class); public static final String OPERATION_NAME = "VariantPrune"; private final HadoopVariantStorageEngine engine; - public HadoopVariantPruneManager(HadoopVariantStorageEngine engine) { + public VariantPruneManager(HadoopVariantStorageEngine engine) { this.engine = engine; } @@ -75,54 +83,96 @@ private Thread addHook(List tasks) { return hook; } - private void runPrune(boolean dryMode, URI outdir) throws StorageEngineException { + private Map runPrune(boolean dryMode, URI outdir) throws StorageEngineException { try { - String pruneTableName = engine.getDBAdaptor().getTableNameGenerator().getPendingSecondaryIndexPruneTableName(); - new SecondaryIndexPrunePendingVariantsDescriptor() - .createTableIfNeeded(pruneTableName, engine.getDBAdaptor().getHBaseManager()); + if (!dryMode) { + // Do not create table in dry-mode. + String pruneTableName = engine.getDBAdaptor().getTableNameGenerator().getPendingSecondaryIndexPruneTableName(); + new SecondaryIndexPrunePendingVariantsDescriptor() + .createTableIfNeeded(pruneTableName, engine.getDBAdaptor().getHBaseManager()); + } } catch (IOException e) { throw StorageEngineException.ioException(e); } - VariantPruneDriverParams params = new VariantPruneDriverParams().setDryRun(dryMode).setOutput(outdir.toString()); + VariantPruneDriverParams params = new VariantPruneDriverParams() + .setDryRun(dryMode) + .setOutput(outdir.resolve("variant_prune_report." 
+ TimeUtils.getTime() + ".txt").toString()); engine.getMRExecutor().run(VariantPruneDriver.class, VariantPruneDriver.buildArgs(engine.getVariantTableName(), params.toObjectMap()), "Variant prune on table '" + engine.getVariantTableName() + "'" ); + Path report; try { - Path report = Files.list(Paths.get(outdir)) + report = Files.list(Paths.get(outdir)) .filter(p -> p.getFileName().toString().contains("variant_prune_report")) .findFirst() .orElse(null); - if (report != null) { - long count = Files.lines(report).count(); - if (dryMode) { - logger.info("Found {} variants to delete", count); - checkReportedVariants(report, count); - } else { - logger.info("Deleted {} variants", count); - } - } else { + Map countByType; + if (report == null) { logger.info("Nothing to delete!"); + countByType = Arrays.stream(VariantPruneReportRecord.Type.values()).collect(Collectors.toMap(k -> k, k -> 0L)); + } else { + countByType = Files.lines(report) + .map(VariantPruneReportRecord::new) + .collect(Collectors.groupingBy( + VariantPruneReportRecord::getType, + Collectors.counting())); + } + + long totalCount = countByType.values().stream().mapToLong(l -> l).count(); + if (dryMode) { + logger.info("Found {} variants to prune, {}", totalCount, countByType); + checkReportedVariants(report, totalCount); + } else { + logger.info("Pruned {} variants, {}", totalCount, countByType); + pruneFromSecondaryIndex(countByType.getOrDefault(VariantPruneReportRecord.Type.FULL, 0L)); + updateSecondaryIndex(countByType.getOrDefault(VariantPruneReportRecord.Type.PARTIAL, 0L)); } + return countByType; } catch (IOException e) { throw StorageEngineException.ioException(e); } } private void checkReportedVariants(Path report, long count) throws IOException, StorageEngineException { - // TODO: If count is too large (e.g. > 10M), do not check all of them - Iterator it = Files.lines(report).map(VariantPruneReportRecord::new).iterator(); - ProgressLogger progressLogger = new ProgressLogger("Checking variant to prune", count); + logger.info("Check dry-run report. Found {} variants to prune", count); + int batchSize = 100; - UnmodifiableIterator> batches = Iterators.partition(it, batchSize); - try (Table table = engine.getDBAdaptor().getHBaseManager().getConnection().getTable(TableName.valueOf(engine.getVariantTableName()))) { + int variantsToSkip; + long variantsToCheck; + if (count > CHECK_DRY_RUN_LIMIT) { + logger.warn("Will check only {} out of {} variants", CHECK_DRY_RUN_LIMIT, count); + variantsToSkip = (int) (count / CHECK_DRY_RUN_LIMIT); + variantsToCheck = CHECK_DRY_RUN_LIMIT; + } else { + variantsToSkip = 0; + variantsToCheck = count; + } + + Iterator it = Files.lines(report).map(VariantPruneReportRecord::new).iterator(); + ProgressLogger progressLogger = new ProgressLogger("Checking variant to prune", variantsToCheck); + try (Table table = engine.getDBAdaptor().getHBaseManager().getConnection() + .getTable(TableName.valueOf(engine.getVariantTableName()))) { + AtomicInteger variantsWithProblems = new AtomicInteger(); ParallelTaskRunner ptr = new ParallelTaskRunner<>( - i -> batches.hasNext() ? 
batches.next() : null, + i -> { + List records = new ArrayList<>(i); + int skippedVariants = 0; + while (it.hasNext()) { + if (skippedVariants == variantsToSkip) { + skippedVariants = 0; + records.add(it.next()); + } else { + skippedVariants++; + } + } + return records; + }, batch -> { List gets = new ArrayList<>(batch.size()); for (VariantPruneReportRecord record : batch) { @@ -130,7 +180,8 @@ private void checkReportedVariants(Path report, long count) throws IOException, if (record.getType() == VariantPruneReportRecord.Type.PARTIAL) { FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE); for (Integer study : record.getStudies()) { - filter.addFilter(new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); + filter.addFilter( + new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); } get.setFilter(filter); } @@ -142,15 +193,19 @@ private void checkReportedVariants(Path report, long count) throws IOException, Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(result.getRow()); VariantPruneReportRecord record = batch.get(i); if (!variant.sameGenomicVariant(record.getVariant())) { - throw new IllegalStateException("Error checking report! Expected " + record.getVariant() + ", got " + variant); + throw new IllegalStateException("Error checking report! Expected " + + record.getVariant() + ", got " + variant); } progressLogger.increment(1, () -> "up to variant " + variant); - List columns = new ArrayList<>(result.rawCells().length); +// List columns = new ArrayList<>(result.rawCells().length); List sampleOrFileColumns = new ArrayList<>(result.rawCells().length); for (Cell cell : result.rawCells()) { - String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - columns.add(column); + String column = Bytes.toString( + cell.getQualifierArray(), + cell.getQualifierOffset(), + cell.getQualifierLength()); +// columns.add(column); if (VariantPhoenixSchema.isSampleDataColumn(column) && VariantPhoenixSchema.isFileColumn(column)) { sampleOrFileColumns.add(column); } @@ -161,6 +216,7 @@ private void checkReportedVariants(Path report, long count) throws IOException, logger.warn("Variant : {}, prune type: {} , columns: {} , {}", variant, record.getType(), sampleOrFileColumns.size(), sampleOrFileColumns); + variantsWithProblems.incrementAndGet(); } } return batch; @@ -173,38 +229,44 @@ private void checkReportedVariants(Path report, long count) throws IOException, } catch (ExecutionException e) { throw new StorageEngineException("Error checking variant prune report", e); } -// while (batches.hasNext()) { -// List batch = batches.next(); -// List gets = new ArrayList<>(batch.size()); -// for (PruneReportRecord record : batch) { -// Get get = new Get(VariantPhoenixKeyFactory.generateVariantRowKey(record.variant)); -// if (record.type == PruneReportRecord.Type.PARTIAL) { -// FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE); -// for (Integer study : record.studies) { -// filter.addFilter(new ColumnPrefixFilter(Bytes.toBytes(VariantPhoenixSchema.buildStudyColumnsPrefix(study)))); -// } -// get.setFilter(filter); -// } -// gets.add(get); -// } -// Result[] get = table.get(gets); -// for (int i = 0; i < get.length; i++) { -// Result result = get[i]; -// Variant variant = VariantPhoenixKeyFactory.extractVariantFromVariantRowKey(result.getRow()); -// PruneReportRecord record = batch.get(i); -// if 
(!variant.sameGenomicVariant(record.variant)) { -// throw new IllegalStateException("Error checking report!"); -// } -// List columns = new ArrayList<>(result.rawCells().length); -// for (Cell cell : result.rawCells()) { -// String column = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); -// columns.add(column); -// } -// logger.info("Variant : {}, prune type: {} , columns: {} , {}", variant, record.type, columns.size(), columns); -// } -// -// } + if (variantsWithProblems.get() > 0) { + throw new StorageEngineException("Error validating variant prune report!" + + " Found " + variantsWithProblems.get() + " out of " + variantsToCheck + " checked variants with inconsistencies"); + } + } + } + + private void pruneFromSecondaryIndex(long count) throws StorageEngineException { + logger.info("Deleting {} variants from secondary index", count); + logger.info("In case of resuming operation, the total number of variants to remove could be larger."); + SecondaryIndexPrunePendingVariantsManager manager = new SecondaryIndexPrunePendingVariantsManager(engine.getDBAdaptor()); + + Task progressTask = new ProgressLogger("Prune variants from secondary index", count) + .asTask(variant -> "up to variant " + variant); + PendingVariantsDBCleaner cleaner = manager.cleaner(); + VariantSearchManager searchManager = engine.getVariantSearchManager(); + ParallelTaskRunner ptr = new ParallelTaskRunner<>( + manager.reader(new Query()), + progressTask, + new HadoopVariantSearchDataDeleter(engine.getDBName(), searchManager.getSolrClient(), cleaner), + ParallelTaskRunner.Config.builder().setNumTasks(1).setBatchSize(searchManager.getInsertBatchSize()).build()); + + try { + ptr.run(); + } catch (ExecutionException e) { + throw new StorageEngineException("Error checking variant prune report", e); + } + } + + private void updateSecondaryIndex(long count) throws StorageEngineException, IOException { + logger.info("Updating {} variants from secondary index", count); + logger.info("In case of resuming operation, the total number of variants to update could be larger."); + + try { + engine.secondaryIndex(); + } catch (VariantSearchException e) { + throw new StorageEngineException("Internal search index error", e); } } @@ -237,7 +299,8 @@ private List pre(boolean dryMode, boolean resume) throws StorageEn if (!dryMode) { for (Integer studyId : mm.getStudies().values()) { // Do not allow concurrent operations at all. 
- tasks.add(mm.addRunningTask(studyId, OPERATION_NAME, Collections.emptyList(), resume, TaskMetadata.Type.REMOVE, tm -> false)); + tasks.add(mm.addRunningTask(studyId, OPERATION_NAME, Collections.emptyList(), resume, + TaskMetadata.Type.REMOVE, tm -> false)); } } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java index 4da568e9cf2..a8cc3cd5089 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneReportRecord.java @@ -6,7 +6,7 @@ import java.util.List; import java.util.stream.Collectors; -class VariantPruneReportRecord { +public class VariantPruneReportRecord { private final Variant variant; private final Type type; private final List studies; diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java new file mode 100644 index 00000000000..d2b356f820e --- /dev/null +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchDataDeleter.java @@ -0,0 +1,87 @@ +package org.opencb.opencga.storage.hadoop.variant.search; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.opencb.biodata.models.variant.Variant; +import org.opencb.commons.io.DataWriter; +import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; +import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; +import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsDBCleaner; +import org.opencb.opencga.storage.hadoop.variant.prune.SecondaryIndexPrunePendingVariantsManager; + +import java.io.IOException; +import java.util.*; + + +/** + * Created on 19/04/18. 
+ * + * @author Jacobo Coll <jacobo167@gmail.com> + */ +public class HadoopVariantSearchDataDeleter implements DataWriter { + + private final String collection; + private final SolrClient solrClient; + private final PendingVariantsDBCleaner cleaner; + + public HadoopVariantSearchDataDeleter(String collection, SolrClient solrClient, VariantHadoopDBAdaptor dbAdaptor) { + this(collection, solrClient, new SecondaryIndexPrunePendingVariantsManager(dbAdaptor).cleaner()); + } + + public HadoopVariantSearchDataDeleter(String collection, SolrClient solrClient, PendingVariantsDBCleaner cleaner) { + this.collection = collection; + this.solrClient = solrClient; + this.cleaner = cleaner; + } + + @Override + public boolean write(List batch) { + if (batch.isEmpty()) { + return true; + } + + List variantRows = new ArrayList<>(batch.size()); + List variantIds = new ArrayList<>(batch.size()); + + for (Variant variant : batch) { + byte[] row = VariantPhoenixKeyFactory.generateVariantRowKey(variant); + variantRows.add(row); + variantIds.add(variant.toString()); + } + + try { + solrClient.deleteById(collection, variantIds); + solrClient.commit(collection); + cleaner.write(variantRows); + cleaner.flush(); + } catch (SolrServerException | IOException e) { + throw new RuntimeException(e); + } + + return true; + } + + @Override + public boolean open() { + cleaner.open(); + return true; + } + + @Override + public boolean pre() { + cleaner.pre(); + return true; + } + + @Override + public boolean post() { + cleaner.post(); + return true; + } + + @Override + public boolean close() { + cleaner.close(); + return true; + } +} diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java index 6189f0bedc4..50a0a19ba89 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/search/HadoopVariantSearchIndexUtils.java @@ -4,20 +4,14 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.phoenix.schema.types.PBoolean; -import org.apache.phoenix.schema.types.PIntegerArray; import org.apache.phoenix.schema.types.PhoenixArray; -import org.opencb.biodata.models.variant.Variant; import org.opencb.opencga.storage.core.variant.VariantStorageEngine; import org.opencb.opencga.storage.hadoop.variant.GenomeHelper; -import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.PhoenixHelper; -import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixKeyFactory; -import org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema; import org.opencb.opencga.storage.hadoop.variant.converters.AbstractPhoenixConverter; import org.opencb.opencga.storage.hadoop.variant.converters.VariantRow; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static org.opencb.opencga.storage.hadoop.variant.adaptors.phoenix.VariantPhoenixSchema.VariantColumn; diff --git 
a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java similarity index 99% rename from opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java rename to opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java index 2b2c4ce11f4..92530100a86 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/HadoopVariantPruneManagerTest.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/test/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManagerTest.java @@ -29,7 +29,7 @@ import static org.junit.Assert.*; -public class HadoopVariantPruneManagerTest extends VariantStorageBaseTest implements HadoopVariantStorageTest { +public class VariantPruneManagerTest extends VariantStorageBaseTest implements HadoopVariantStorageTest { public static final String STUDY_NAME_3 = "study_3"; public static final String STUDY_NAME_4 = "study_4"; From b2c8c3eaa22c73d955c109fac683227b750b6eb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Mon, 27 Jun 2022 12:18:19 +0100 Subject: [PATCH 4/6] storage: Fix dry-run check. #TASK-1116 --- .../variant/operations/VariantPruneOperationTool.java | 2 +- .../opencga/core/models/variant/VariantPruneParams.java | 2 +- .../rest/operations/VariantOperationWebService.java | 2 +- .../hadoop/variant/prune/VariantPruneManager.java | 9 +++++---- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java index 5cac84f7eaf..faaee5e681a 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/operations/VariantPruneOperationTool.java @@ -9,7 +9,7 @@ type = Tool.Type.OPERATION, resource = Enums.Resource.VARIANT) public class VariantPruneOperationTool extends OperationTool { - public static final String DESCRIPTION = ""; + public static final String DESCRIPTION = "Prune orphan variants from studies in a project."; public static final String ID = "variant-prune"; @ToolParams diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java index 49caf4a55d9..ed0449871e6 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/models/variant/VariantPruneParams.java @@ -4,7 +4,7 @@ public class VariantPruneParams extends ToolParams { - public static final String DESCRIPTION = ""; + public static final String DESCRIPTION = "Variant prune params. 
Use dry-run to just generate a report with the orphan variants."; private String project; private boolean dryRun; private boolean resume; diff --git a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java index 2b7908ec42f..1520e309e87 100644 --- a/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java +++ b/opencga-server/src/main/java/org/opencb/opencga/server/rest/operations/VariantOperationWebService.java @@ -439,7 +439,7 @@ public Response variantPrune( @ApiParam(value = ParamConstants.JOB_DEPENDS_ON_DESCRIPTION) @QueryParam(JOB_DEPENDS_ON) String dependsOn, @ApiParam(value = ParamConstants.JOB_TAGS_DESCRIPTION) @QueryParam(ParamConstants.JOB_TAGS) String jobTags, @ApiParam(value = VariantPruneParams.DESCRIPTION) VariantPruneParams params) { - return submitOperation(VariantPruneOperationTool.ID, params.getProject(), params, jobName, jobDescription, dependsOn, jobTags); + return submitOperationToProject(VariantPruneOperationTool.ID, params.getProject(), params, jobName, jobDescription, dependsOn, jobTags); } public Response submitOperation(String toolId, String study, ToolParams params, diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java index 20b15b396f1..70104c377ac 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneManager.java @@ -123,7 +123,7 @@ private Map runPrune(boolean dryMode, URI o Collectors.counting())); } - long totalCount = countByType.values().stream().mapToLong(l -> l).count(); + long totalCount = countByType.values().stream().mapToLong(l -> l).sum(); if (dryMode) { logger.info("Found {} variants to prune, {}", totalCount, countByType); checkReportedVariants(report, totalCount); @@ -163,10 +163,11 @@ private void checkReportedVariants(Path report, long count) throws IOException, i -> { List records = new ArrayList<>(i); int skippedVariants = 0; - while (it.hasNext()) { + while (it.hasNext() && records.size() < i) { + VariantPruneReportRecord next = it.next(); if (skippedVariants == variantsToSkip) { skippedVariants = 0; - records.add(it.next()); + records.add(next); } else { skippedVariants++; } @@ -222,7 +223,7 @@ private void checkReportedVariants(Path report, long count) throws IOException, return batch; }, null, - ParallelTaskRunner.Config.builder().setBatchSize(batchSize).setNumTasks(8).build() + ParallelTaskRunner.Config.builder().setBatchSize(batchSize).setCapacity(2).setNumTasks(4).build() ); try { ptr.run(); From 7cb403018967f7fa6a1e389e51083500cccdb9ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Mon, 27 Jun 2022 13:08:00 +0100 Subject: [PATCH 5/6] storage: Add an internal validation to check that all variants have variant stats. 
#TASK-1116 --- .../hadoop/variant/prune/VariantPruneDriver.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java index b41e44e3b4a..aa3217ed2fe 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/prune/VariantPruneDriver.java @@ -15,7 +15,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; -import org.opencb.biodata.models.variant.StudyEntry; import org.opencb.biodata.models.variant.Variant; import org.opencb.biodata.models.variant.stats.VariantStats; import org.opencb.opencga.core.common.TimeUtils; @@ -40,6 +39,7 @@ import java.util.*; import java.util.stream.Collectors; +import static org.opencb.biodata.models.variant.StudyEntry.DEFAULT_COHORT; import static org.opencb.opencga.storage.hadoop.variant.GenomeHelper.COLUMN_FAMILY_BYTES; public class VariantPruneDriver extends AbstractVariantsTableDriver { @@ -103,7 +103,7 @@ protected Job setupJob(Job job, String archiveTable, String variantTable) throws VariantStorageMetadataManager metadataManager = getMetadataManager(); for (Map.Entry entry : metadataManager.getStudies().entrySet()) { Integer studyId = entry.getValue(); - Integer cohortId = metadataManager.getCohortId(studyId, StudyEntry.DEFAULT_COHORT); + Integer cohortId = metadataManager.getCohortId(studyId, DEFAULT_COHORT); scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStudyColumn(studyId).bytes()); scan.addColumn(COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStatsColumn(studyId, cohortId).bytes()); @@ -209,10 +209,12 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) List emptyStudies = new ArrayList<>(); List studies = new ArrayList<>(); + List studiesWithStats = new ArrayList<>(); Variant variant = variantRow.walker() .onStudy(studies::add) .onCohortStats(c -> { + studiesWithStats.add(c.getStudyId()); VariantStats variantStats = c.toJava(); if (variantStats.getFileCount() == 0) { emptyStudies.add(c.getStudyId()); @@ -220,6 +222,10 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) }) .walk(); + if (studies.size() != studiesWithStats.size() || !studies.containsAll(studiesWithStats)) { + throw new IllegalStateException("Variant stats for cohort " + DEFAULT_COHORT + " not found in variant " + variant); + } + context.getCounter(COUNTER_GROUP_NAME, "variants").increment(1); // It might happen that the variant has 0 studies, so emptyStudies is empty From 73908f5da4e507d8287190cb83d4df2c26f2f56b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacobo=20Coll=20Morag=C3=B3n?= Date: Mon, 27 Jun 2022 13:40:15 +0100 Subject: [PATCH 6/6] storage: Add SecondaryIndexPrunePendingVariants to PendingVariantsMain #TASK-1116 --- .../storage/hadoop/app/PendingVariantsMain.java | 10 +++++++++- .../variant/utils/HBaseVariantTableNameGenerator.java | 5 +++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git 
a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java index 683c2bd3cb9..be3239203b9 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/app/PendingVariantsMain.java @@ -13,6 +13,7 @@ import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor; import org.opencb.opencga.storage.hadoop.variant.annotation.pending.AnnotationPendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsManager; +import org.opencb.opencga.storage.hadoop.variant.prune.SecondaryIndexPrunePendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.search.SecondaryIndexPendingVariantsManager; import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator; @@ -57,7 +58,8 @@ public void run(String[] args) throws Exception { .stream() .map(TableName::getNameAsString) .filter(t -> HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexTableName(t) - || HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t)) + || HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t) + || HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(t)) ); break; } @@ -117,6 +119,12 @@ private PendingVariantsManager getPendingVariantsManager(HBaseManager hBaseManag System.err.println("Detect SecondaryIndexPendingVariants table"); return new SecondaryIndexPendingVariantsManager( new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap())); + } else if (HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(table)) { + String dbName = HBaseVariantTableNameGenerator.getDBNameFromPendingSecondaryIndexPruneTableName(table); + HBaseVariantTableNameGenerator tableNameGenerator = new HBaseVariantTableNameGenerator(dbName, hBaseManager.getConf()); + System.err.println("Detect SecondaryIndexPendingPruneVariants table"); + return new SecondaryIndexPrunePendingVariantsManager( + new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap())); } else { throw new IllegalArgumentIOException("Table '" + table + "' is not a pendig vairants table"); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java index 9db6184cea9..7420070878d 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/utils/HBaseVariantTableNameGenerator.java @@ -150,6 +150,11 @@ public static boolean isValidPendingSecondaryIndexPruneTableName(String tableNam return validSuffix(tableName, PENDING_SECONDARY_INDEX_PRUNE_SUFIX); } + public static String 
getDBNameFromPendingSecondaryIndexPruneTableName(String tableName) {
+        checkValidPendingSecondaryIndexPruneTableName(tableName);
+        return tableName.substring(0, tableName.length() - PENDING_SECONDARY_INDEX_PRUNE_SUFIX.length());
+    }
+
     public static String getDBNameFromMetaTableName(String metaTableName) {
         checkValidMetaTableName(metaTableName);
         return metaTableName.substring(0, metaTableName.length() - META_SUFIX.length());
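
Editor's note: the final patch relies on the suffix-based table-name convention of HBaseVariantTableNameGenerator: each auxiliary HBase table is the variants database name plus a fixed suffix, so the database name can be recovered by stripping that suffix, which is what lets PendingVariantsMain detect the table type and rebuild a VariantHadoopDBAdaptor from the table name alone. Below is a minimal standalone sketch of that round-trip. The class name, method names, and the suffix literal are illustrative assumptions for the example, not the actual OpenCGA constants (the real code uses PENDING_SECONDARY_INDEX_PRUNE_SUFIX and validSuffix/checkValid helpers).

// Illustrative sketch of the suffix-based table naming used in the patch above.
// All identifiers and the suffix value are assumptions for this example only.
public final class PendingPruneTableNames {

    // Hypothetical suffix; OpenCGA defines its own PENDING_SECONDARY_INDEX_PRUNE_SUFIX constant.
    private static final String PENDING_PRUNE_SUFFIX = "_pending_secondary_index_prune";

    private PendingPruneTableNames() {
    }

    // Build the pending-prune table name for a given variants database.
    public static String getPendingPruneTableName(String dbName) {
        return dbName + PENDING_PRUNE_SUFFIX;
    }

    // A table is a pending-prune table iff its name ends with the suffix.
    public static boolean isPendingPruneTableName(String tableName) {
        return tableName.endsWith(PENDING_PRUNE_SUFFIX);
    }

    // Recover the database name by stripping the suffix, mirroring what the new
    // getDBNameFromPendingSecondaryIndexPruneTableName method does.
    public static String getDBNameFromPendingPruneTableName(String tableName) {
        if (!isPendingPruneTableName(tableName)) {
            throw new IllegalArgumentException("Not a pending-prune table: " + tableName);
        }
        return tableName.substring(0, tableName.length() - PENDING_PRUNE_SUFFIX.length());
    }

    public static void main(String[] args) {
        String table = getPendingPruneTableName("opencga_variants");
        System.out.println(table);                                      // opencga_variants_pending_secondary_index_prune
        System.out.println(getDBNameFromPendingPruneTableName(table));  // opencga_variants
    }
}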