Skip to content

Commit

Permalink
Merge pull request #2016 from opencb/TASK-1070
Browse files Browse the repository at this point in the history
TASK-1070
  • Loading branch information
imedina authored Jun 28, 2022
2 parents cf9e4a8 + 73908f5 commit 4f00b3e
Show file tree
Hide file tree
Showing 34 changed files with 1,507 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.opencb.opencga.core.models.sample.SampleAclEntry;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.study.StudyAclEntry;
import org.opencb.opencga.core.models.variant.VariantPruneParams;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.core.response.VariantQueryResult;
import org.opencb.opencga.core.tools.ToolParams;
Expand Down Expand Up @@ -1086,6 +1087,13 @@ public boolean exists(String study, String token) throws StorageEngineException,
return engine.getMetadataManager().studyExists(studyFqn);
}

public void variantPrune(String project, URI outdir, VariantPruneParams params, String token) throws StorageEngineException, CatalogException {
secureOperationByProject(VariantPruneOperationTool.ID, project, new ObjectMap(), token, engine -> {
engine.variantsPrune(params.isDryRun(), params.isResume(), outdir);
return null;
});
}

// Permission related methods

private interface VariantReadOperation<R> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.opencb.opencga.analysis.variant.operations;

import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.variant.VariantPruneParams;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;

@Tool(id = VariantPruneOperationTool.ID, description = VariantPruneOperationTool.DESCRIPTION,
type = Tool.Type.OPERATION, resource = Enums.Resource.VARIANT)
public class VariantPruneOperationTool extends OperationTool {

public static final String DESCRIPTION = "Prune orphan variants from studies in a project.";
public static final String ID = "variant-prune";

@ToolParams
protected VariantPruneParams params = new VariantPruneParams();

@Override
protected void run() throws Exception {

step(() -> {
getVariantStorageManager().variantPrune(params.getProject(), getOutDir().toUri(), params, getToken());
});


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu

public static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline,
boolean includeClassName) {
String separator;
if (multiline) {
separator = "\n";
} else {
separator = " ; ";
}
return prettyExceptionMessage(exception, message, multiline, includeClassName, separator);
}

private static StringBuilder prettyExceptionMessage(Throwable exception, StringBuilder message, boolean multiline,
boolean includeClassName, String separator) {
Set<String> messages = new HashSet<>();
do {
if (exception.getMessage() != null && !messages.add(exception.getMessage())) {
if (exception.getMessage() != null && !messages.add(exception.getMessage())
&& exception.getSuppressed().length == 0) {
// Duplicated message. Skip this cause
exception = exception.getCause();
continue;
}
if (message.length() != 0) {
if (multiline) {
message.append("\n");
} else {
message.append(" ; ");
}
message.append(separator);
}
String exMessage = exception.getMessage();
if (StringUtils.isBlank(exMessage)) {
Expand All @@ -49,6 +57,26 @@ public static StringBuilder prettyExceptionMessage(Throwable exception, StringBu
message.append("[").append(exception.getClass().getSimpleName()).append("] ");
}
message.append(exMessage);
if (exception.getSuppressed().length > 0) {
StringBuilder sb = new StringBuilder();
String intraSeparator = multiline ? separator + " " : separator;
for (Throwable suppressed : exception.getSuppressed()) {
prettyExceptionMessage(suppressed, sb, multiline, includeClassName, intraSeparator);
}
message.append(" <suppressed(");
if (multiline) {
message.append(intraSeparator);
} else {
message.append(" ");
}
message.append(sb);
if (multiline) {
message.append(separator);
} else {
message.append(" ");
}
message.append(")>");
}

exception = exception.getCause();
} while (exception != null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opencb.opencga.core.models.variant;

import org.opencb.opencga.core.tools.ToolParams;

public class VariantPruneParams extends ToolParams {

public static final String DESCRIPTION = "Variant prune params. Use dry-run to just generate a report with the orphan variants.";
private String project;
private boolean dryRun;
private boolean resume;


public String getProject() {
return project;
}

public VariantPruneParams setProject(String project) {
this.project = project;
return this;
}

public boolean isDryRun() {
return dryRun;
}

public VariantPruneParams setDryRun(boolean dryRun) {
this.dryRun = dryRun;
return this;
}

public boolean isResume() {
return resume;
}

public VariantPruneParams setResume(boolean resume) {
this.resume = resume;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,18 @@ public Response julie(
return submitOperationToProject(JulieTool.ID, project, params, jobName, jobDescription, dependsOn, jobTags);
}

@POST
@Path("/variant/prune")
@ApiOperation(value = VariantPruneOperationTool.DESCRIPTION, response = Job.class)
public Response variantPrune(
@ApiParam(value = ParamConstants.JOB_ID_CREATION_DESCRIPTION) @QueryParam(ParamConstants.JOB_ID) String jobName,
@ApiParam(value = ParamConstants.JOB_DESCRIPTION_DESCRIPTION) @QueryParam(ParamConstants.JOB_DESCRIPTION) String jobDescription,
@ApiParam(value = ParamConstants.JOB_DEPENDS_ON_DESCRIPTION) @QueryParam(JOB_DEPENDS_ON) String dependsOn,
@ApiParam(value = ParamConstants.JOB_TAGS_DESCRIPTION) @QueryParam(ParamConstants.JOB_TAGS) String jobTags,
@ApiParam(value = VariantPruneParams.DESCRIPTION) VariantPruneParams params) {
return submitOperationToProject(VariantPruneOperationTool.ID, params.getProject(), params, jobName, jobDescription, dependsOn, jobTags);
}

public Response submitOperation(String toolId, String study, ToolParams params,
String jobName, String jobDescription, String jobDependsOn, String jobTags) {
return submitOperation(toolId, null, study, params, jobName, jobDescription, jobDependsOn, jobTags);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1252,13 +1252,13 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng
taskDBAdaptor.updateTask(studyId, task, null);
}

public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateFunction<TaskMetadata, E> update)
public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer<TaskMetadata, E> consumer)
throws E, StorageEngineException {
getTask(studyId, taskId); // Check task exists
Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout);
try {
TaskMetadata task = getTask(studyId, taskId);
task = update.update(task);
consumer.update(task);
lock.checkLocked();
unsecureUpdateTask(studyId, task);
return task;
Expand Down Expand Up @@ -1732,7 +1732,6 @@ public TaskMetadata.Status setStatus(int studyId, int taskId, TaskMetadata.Statu
updateTask(studyId, taskId, task -> {
previousStatus.set(task.currentStatus());
task.addStatus(Calendar.getInstance().getTime(), status);
return task;
});
return previousStatus.get();
}
Expand Down Expand Up @@ -1765,13 +1764,7 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, List<In
return addRunningTask(studyId, jobOperationName, fileIds, resume, type, b -> false);
}
/**
* Adds a new {@link TaskMetadata} to the Study Metadata.
*
* Allow execute concurrent operations depending on the "allowConcurrent" predicate.
* If any operation is in ERROR, is not the same operation, and concurrency is not allowed,
* throw {@link StorageEngineException#otherOperationInProgressException}
* If any operation is DONE, RUNNING, is same operation and resume=true, continue
* If all operations are ready, continue
* Find out if the task attempt can be executed, and if it should be a new variant, of continue executing a previous task.
*
* @param studyId Study id
* @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()}
Expand All @@ -1783,11 +1776,9 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName, List<In
* @return The current batchOperation
* @throws StorageEngineException if the operation can't be executed
*/
public TaskMetadata addRunningTask(int studyId, String jobOperationName,
List<Integer> fileIds, boolean resume, TaskMetadata.Type type,
Predicate<TaskMetadata> allowConcurrent)
private TaskMetadata getRunningTaskCompatibleOrFail(int studyId, String jobOperationName, List<Integer> fileIds, boolean resume,
TaskMetadata.Type type, Predicate<TaskMetadata> allowConcurrent)
throws StorageEngineException {

TaskMetadata resumeTask = null;
Iterator<TaskMetadata> iterator = taskIterator(studyId, Arrays.asList(
TaskMetadata.Status.DONE,
Expand Down Expand Up @@ -1832,6 +1823,33 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName,
throw new IllegalArgumentException("Unknown Status " + currentStatus);
}
}
return resumeTask;
}

/**
* Adds a new {@link TaskMetadata} to the Study Metadata.
*
* Allow execute concurrent operations depending on the "allowConcurrent" predicate.
* If any operation is in ERROR, is not the same operation, and concurrency is not allowed,
* throw {@link StorageEngineException#otherOperationInProgressException}
* If any operation is DONE, RUNNING, is same operation and resume=true, continue
* If all operations are ready, continue
*
* @param studyId Study id
* @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()}
* @param fileIds Files to be processed in this batch.
* @param resume Resume operation. Assume that previous operation went wrong.
* @param type Operation type as {@link TaskMetadata.Type}
* @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation.
* If not, throws {@link StorageEngineException#otherOperationInProgressException}
* @return The current batchOperation
* @throws StorageEngineException if the operation can't be executed
*/
public TaskMetadata addRunningTask(int studyId, String jobOperationName,
List<Integer> fileIds, boolean resume, TaskMetadata.Type type,
Predicate<TaskMetadata> allowConcurrent)
throws StorageEngineException {
TaskMetadata resumeTask = getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent);

TaskMetadata task;
if (resumeTask == null) {
Expand All @@ -1848,14 +1866,34 @@ public TaskMetadata addRunningTask(int studyId, String jobOperationName,
throw new StorageEngineException("Attempt to execute a concurrent modification of task " + thisTask.getName()
+ " (" + thisTask.getId() + ") ");
} else {
return thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING);
thisTask.addStatus(Calendar.getInstance().getTime(), TaskMetadata.Status.RUNNING);
}
});
}
}
return task;
}


/**
* Check if a task can be executed.
*
* @param studyId Study id
* @param jobOperationName Job operation name used to create the jobName and as {@link TaskMetadata#getOperationName()}
* @param fileIds Files to be processed in this batch.
* @param resume Resume operation. Assume that previous operation went wrong.
* @param type Operation type as {@link TaskMetadata.Type}
* @param allowConcurrent Predicate to test if the new operation can be executed at the same time as a non ready operation.
* If not, throws {@link StorageEngineException#otherOperationInProgressException}
* @throws StorageEngineException if the operation can't be executed
*/
public void checkTaskCanRun(int studyId, String jobOperationName, List<Integer> fileIds, boolean resume,
TaskMetadata.Type type, Predicate<TaskMetadata> allowConcurrent)
throws StorageEngineException {
getRunningTaskCompatibleOrFail(studyId, jobOperationName, fileIds, resume, type, allowConcurrent);
}


protected void checkName(final String type, String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException(type + " can not be empty!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,18 @@ protected void postRemoveFiles(String study, List<Integer> fileIds, List<Integer
*/
public abstract void removeStudy(String study, URI outdir) throws StorageEngineException;

/**
* Removes samples from the Variant Storage.
*
* @param dryMode Dry mode. Do not execute deletes
* @param resume Resume previously failed execution
* @param outdir Output directory
* @throws StorageEngineException If the samples can not be removed or there was some problem deleting it.
*/
public void variantsPrune(boolean dryMode, boolean resume, URI outdir) throws StorageEngineException {
throw new UnsupportedOperationException("Unsupported variant prune operation at storage engine " + getStorageEngineId());
}

public abstract void loadVariantScore(URI scoreFile, String study, String scoreName, String cohort1, String cohort2,
VariantScoreFormatDescriptor descriptor, ObjectMap options)
throws StorageEngineException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opencb.commons.datastore.solr.FacetQueryParser;
import org.opencb.commons.datastore.solr.SolrCollection;
import org.opencb.commons.datastore.solr.SolrManager;
import org.opencb.commons.io.DataWriter;
import org.opencb.commons.run.ParallelTaskRunner;
import org.opencb.commons.utils.ListUtils;
import org.opencb.opencga.core.common.TimeUtils;
Expand All @@ -57,6 +58,7 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;


/**
Expand Down Expand Up @@ -508,6 +510,19 @@ public void close() throws IOException {
solrManager.close();
}

public DataWriter<Variant> getVariantDeleter(String collection) {
return list -> {
try {
if (list != null) {
delete(collection, list.stream().map(Variant::toString).collect(Collectors.toList()));
}
} catch (IOException | SolrServerException e) {
throw new RuntimeException(e);
}
return true;
};
}

/*-------------------------------------
* P R I V A T E M E T H O D S
-------------------------------------*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public abstract class VariantStorageBaseTest extends GenericTest implements Vari
public static final int NUM_VARIANTS = 9792;
@Deprecated public static final int STUDY_ID = 1;
public static final String STUDY_NAME = "1000g";
public static final String STUDY_NAME_1 = STUDY_NAME;
public static final String STUDY_NAME_2 = "study_2";
public static final String DB_NAME = "opencga_variants_test";
@Deprecated public static final int FILE_ID = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opencb.opencga.storage.hadoop.variant.adaptors.VariantHadoopDBAdaptor;
import org.opencb.opencga.storage.hadoop.variant.annotation.pending.AnnotationPendingVariantsManager;
import org.opencb.opencga.storage.hadoop.variant.pending.PendingVariantsManager;
import org.opencb.opencga.storage.hadoop.variant.prune.SecondaryIndexPrunePendingVariantsManager;
import org.opencb.opencga.storage.hadoop.variant.search.SecondaryIndexPendingVariantsManager;
import org.opencb.opencga.storage.hadoop.variant.utils.HBaseVariantTableNameGenerator;

Expand Down Expand Up @@ -57,7 +58,8 @@ public void run(String[] args) throws Exception {
.stream()
.map(TableName::getNameAsString)
.filter(t -> HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexTableName(t)
|| HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t))
|| HBaseVariantTableNameGenerator.isValidPendingAnnotationTableName(t)
|| HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(t))
);
break;
}
Expand Down Expand Up @@ -117,6 +119,12 @@ private PendingVariantsManager getPendingVariantsManager(HBaseManager hBaseManag
System.err.println("Detect SecondaryIndexPendingVariants table");
return new SecondaryIndexPendingVariantsManager(
new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap()));
} else if (HBaseVariantTableNameGenerator.isValidPendingSecondaryIndexPruneTableName(table)) {
String dbName = HBaseVariantTableNameGenerator.getDBNameFromPendingSecondaryIndexPruneTableName(table);
HBaseVariantTableNameGenerator tableNameGenerator = new HBaseVariantTableNameGenerator(dbName, hBaseManager.getConf());
System.err.println("Detect SecondaryIndexPendingPruneVariants table");
return new SecondaryIndexPrunePendingVariantsManager(
new VariantHadoopDBAdaptor(hBaseManager, hBaseManager.getConf(), tableNameGenerator, new ObjectMap()));
} else {
throw new IllegalArgumentIOException("Table '" + table + "' is not a pendig vairants table");
}
Expand Down
Loading

0 comments on commit 4f00b3e

Please sign in to comment.