Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6219 - Automatic Variant Secondary Index #2495

Open
wants to merge 23 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
744de3c
core: add new operations configuration, #TASK-6219
pfurio Jun 27, 2024
6d7bcfb
master: first draft of the VariantOperationManager, #TASK-6219
pfurio Jun 28, 2024
b79ef94
core: add new operation index data models, #TASK-6219
pfurio Jul 1, 2024
5abee26
master: implement VariantOperationOrchestrator, #TASK-6219
pfurio Jul 1, 2024
eef6834
master: retry secondary indexes, #TASK-6219
pfurio Jul 2, 2024
11492e6
core: add DataField descriptions, #TASK-6219
pfurio Jul 2, 2024
95cacb3
Merge branch 'develop' into TASK-6219
pfurio Jul 3, 2024
b2cbb62
app: add migration, #TASK-6219
pfurio Jul 3, 2024
023997d
master: add Orchestrator test, #TASK-6219
pfurio Jul 3, 2024
ddc9ae2
master: use new VariantOperationOrchestrator, #TASK-6219
pfurio Jul 8, 2024
b7cad6f
master: rename Orchestrator for Janitor, #TASK-6219
pfurio Jul 8, 2024
d6074fc
master: Clean variables. #TASK-6219
j-coll Jul 10, 2024
9616d1e
storage: Add status to ProjectMetadata for variant-annotation and var…
j-coll Aug 14, 2024
0a765b7
analysis: Synchronize to catalog annotation and secondary annotation …
j-coll Aug 14, 2024
edb1c04
storage: Synchronize secondary sample index changes. #TASK-6219
j-coll Aug 16, 2024
ca80b71
master: VariantOperationJanitor extends MonitorParentDaemon. #TASK-6219
j-coll Aug 16, 2024
c1e5371
core: add WEEKEND policy, #TASK-6219
pfurio Nov 12, 2024
022df7e
master: change janitor interval to 30s, #TASK-6219
pfurio Nov 12, 2024
4d6e572
Merge branch 'develop' into TASK-6219
j-coll Dec 19, 2024
d7e93bb
master: Fix conflicts. #TASK-6219
j-coll Oct 16, 2024
c1ab887
storage: Fix NPE on missing PM at updateVariantIndexTimestamp. #TASK-…
j-coll Dec 19, 2024
e93d1e3
Merge branch 'develop' into TASK-6219
pfurio Jan 23, 2025
f221632
core: write default configuration in configuration.yml file, #TASK-6219
pfurio Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package org.opencb.opencga.analysis.tools;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.opencb.opencga.core.config.Analysis;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.reflections.Reflections;
Expand All @@ -39,6 +41,19 @@ public class ToolFactory {

public static final String DEFAULT_PACKAGE = "org.opencb.opencga";

// public ToolFactory(Analysis analysisConf) {
//
// }

/**
 * Loads the tools using the packages declared in the given analysis configuration.
 * When the configuration is {@code null} or declares no packages, only
 * {@link #DEFAULT_PACKAGE} is used.
 *
 * @param analysisConf analysis configuration; may be {@code null}
 */
private static void loadTools(Analysis analysisConf) {
    // No configuration, or no packages configured: fall back to the default package.
    if (analysisConf == null || !CollectionUtils.isNotEmpty(analysisConf.getPackages())) {
        loadTools(Collections.singletonList(DEFAULT_PACKAGE));
        return;
    }
    loadTools(analysisConf.getPackages());
}

private static synchronized Map<String, Class<? extends OpenCgaTool>> loadTools(List<String> packages) {
if (toolsCache == null) {
Reflections reflections = new Reflections(new ConfigurationBuilder()
Expand Down Expand Up @@ -137,6 +152,10 @@ public final Class<? extends OpenCgaTool> getToolClass(String toolId, List<Strin
return aClass;
}

/**
 * Resolves the tool with the given id, delegating to {@code getTool(toolId, packages)}
 * with a single-element package list containing {@link #DEFAULT_PACKAGE}.
 *
 * @param toolId identifier of the tool to resolve
 * @return the value returned by the delegated {@code getTool(toolId, packages)} call
 * @throws ToolException propagated from the delegated call
 * @deprecated Should use the version with "packages" list
 */
@Deprecated
public Tool getTool(String toolId) throws ToolException {
return getTool(toolId, Collections.singletonList(DEFAULT_PACKAGE));
}
Expand Down Expand Up @@ -165,13 +184,8 @@ public final OpenCgaTool createTool(Class<? extends OpenCgaTool> aClass) throws
}
}

public Collection<Class<? extends OpenCgaTool>> getTools() {
loadTools(Collections.singletonList(DEFAULT_PACKAGE));
return toolsList;
}

public Collection<Class<? extends OpenCgaTool>> getTools(List<String> packages) {
loadTools(packages);
public Collection<Class<? extends OpenCgaTool>> getTools(Analysis analysisConf) {
loadTools(analysisConf);
return toolsList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public VariantSearchLoadResult secondaryAnnotationIndex(String project, String r
Query inputQuery = new Query();
inputQuery.putIfNotEmpty(VariantQueryParam.REGION.key(), region);
VariantSearchLoadResult result = engine.secondaryIndex(inputQuery, new QueryOptions(params), overwrite);
getSynchronizer(engine).synchronizeCatalogFromStorage(token);
getSynchronizer(engine).synchronizeCatalogFromStorage(project, null, token);
return result;
});
}
Expand Down Expand Up @@ -284,8 +284,7 @@ public void annotate(String projectStr, List<String> studies, String region, boo
public void saveAnnotation(String project, String annotationName, ObjectMap params, String token)
throws CatalogException, StorageEngineException {
secureOperationByProject(VariantAnnotationSaveOperationTool.ID, project, params, token, engine -> {
CatalogStorageMetadataSynchronizer
.updateProjectMetadata(catalogManager, engine.getMetadataManager(), project, token);
getSynchronizer(engine).synchronizeProjectMetadataFromCatalog(project, token);
engine.saveAnnotation(annotationName, params);
return null;
});
Expand All @@ -294,8 +293,7 @@ public void saveAnnotation(String project, String annotationName, ObjectMap para
public void deleteAnnotation(String project, String annotationName, ObjectMap params, String token)
throws CatalogException, StorageEngineException {
secureOperationByProject(VariantAnnotationDeleteOperationTool.ID, project, params, token, engine -> {
CatalogStorageMetadataSynchronizer
.updateProjectMetadata(catalogManager, engine.getMetadataManager(), project, token);
getSynchronizer(engine).synchronizeProjectMetadataFromCatalog(project, token);
engine.deleteAnnotation(annotationName, params);
return null;
});
Expand Down Expand Up @@ -559,9 +557,9 @@ public OpenCGAResult<Job> configureSampleIndex(String studyStr, SampleIndexConfi
sampleIndexConfiguration.validate(version);
String studyFqn = getStudyFqn(studyStr, token);
engine.getMetadataManager().addSampleIndexConfiguration(studyFqn, sampleIndexConfiguration, true);

getSynchronizer(engine).synchronizeCatalogProjectFromStorageByStudy(studyFqn, token);
catalogManager.getStudyManager()
.setVariantEngineConfigurationSampleIndex(studyStr, sampleIndexConfiguration, token);
.setVariantEngineConfigurationSampleIndex(studyFqn, sampleIndexConfiguration, token);
if (skipRebuild) {
return new OpenCGAResult<>(0, new ArrayList<>(), 0, new ArrayList<>(), 0);
} else {
Expand Down Expand Up @@ -594,6 +592,7 @@ public OpenCGAResult<Job> setCellbaseConfiguration(String project, CellBaseConfi
.append("cellbaseConfiguration", cellbaseConfiguration)
.append("annotate", annotate)
.append("annotationSaveId", annotationSaveId), token, engine -> {
String projectFqn = getProjectFqn(project, token);
OpenCGAResult<Job> result = new OpenCGAResult<>();
result.setResultType(Job.class.getCanonicalName());
result.setResults(new ArrayList<>());
Expand All @@ -611,6 +610,9 @@ public OpenCGAResult<Job> setCellbaseConfiguration(String project, CellBaseConfi
engine.reloadCellbaseConfiguration();

if (engine.getMetadataManager().exists()) {
engine.getMetadataManager().invalidateCurrentVariantAnnotationIndex();
logger.info("Invalidating current variant annotation index on project '{}'", projectFqn);
getSynchronizer(engine).synchronizeCatalogProjectFromStorage(projectFqn, token);
List<String> jobDependsOn = new ArrayList<>(1);
if (StringUtils.isNotEmpty(annotationSaveId)) {
VariantAnnotationSaveParams params = new VariantAnnotationSaveParams(annotationSaveId);
Expand Down Expand Up @@ -1133,7 +1135,7 @@ public boolean synchronizeCatalogStudyFromStorage(String study, String token)
String studySqn = getStudyFqn(study, token);
return secureOperation("synchronizeCatalogStudyFromStorage", studySqn, new ObjectMap(), token, engine -> {
CatalogStorageMetadataSynchronizer synchronizer = getSynchronizer(engine);
return synchronizer.synchronizeCatalogStudyFromStorage(studySqn, token);
return synchronizer.synchronizeCatalogFromStorage(studySqn, token);
});
}

Expand All @@ -1146,7 +1148,7 @@ public boolean synchronizeCatalogStudyFromStorage(String study, List<String> fil
return secureOperation("synchronizeCatalogStudyFromStorage", studySqn, new ObjectMap(), token, engine -> {
List<File> filesFromCatalog = catalogManager.getFileManager()
.get(studySqn, files, FILE_GET_QUERY_OPTIONS, token).getResults();
return getSynchronizer(engine).synchronizeCatalogFilesFromStorage(studySqn, filesFromCatalog, token);
return getSynchronizer(engine).synchronizeCatalogFromStorage(studySqn, filesFromCatalog, false, token);
});
}

Expand Down Expand Up @@ -1666,6 +1668,10 @@ private String getStudyFqn(String study, String token) throws CatalogException {
return catalogManager.getStudyManager().get(study, StudyManager.INCLUDE_STUDY_IDS, token).first().getFqn();
}

/**
 * Resolves the project FQN from a project identifier alone, delegating to the
 * list-based overload with no studies ({@code null} list).
 *
 * @param projectStr project identifier
 * @param token      user session token
 * @return the value returned by the list-based overload
 * @throws CatalogException propagated from the list-based overload
 */
private String getProjectFqn(String projectStr, String token) throws CatalogException {
    // Typed local so overload resolution picks the List<String> variant, not the String one.
    final List<String> noStudies = null;
    return getProjectFqn(projectStr, noStudies, token);
}

/**
 * Resolves the project FQN from a project identifier and a comma-separated list of studies,
 * delegating to the list-based overload.
 *
 * @param projectStr project identifier
 * @param study      comma-separated study identifiers; may be {@code null}, in which case a
 *                   {@code null} study list is forwarded (same as the two-argument overload)
 * @param token      user session token
 * @return the value returned by the list-based overload
 * @throws CatalogException propagated from the list-based overload
 */
private String getProjectFqn(String projectStr, String study, String token) throws CatalogException {
    // StringUtils.split(null, ",") returns null, and Arrays.asList on a null array throws
    // NullPointerException; forward a null list instead of failing here.
    List<String> studies = study == null ? null : Arrays.asList(StringUtils.split(study, ","));
    return getProjectFqn(projectStr, studies, token);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,21 @@ public OperationManager(VariantStorageManager variantStorageManager, VariantStor
this.variantStorageEngine = variantStorageEngine;
}

public final StudyMetadata synchronizeCatalogStudyFromStorage(String study, String token)
throws CatalogException, StorageEngineException {
return synchronizeCatalogStudyFromStorage(study, token, false);
}

public final StudyMetadata synchronizeCatalogStudyFromStorage(String study, String token, boolean failIfNotExist)
throws CatalogException, StorageEngineException {
VariantStorageMetadataManager metadataManager = variantStorageEngine.getMetadataManager();
CatalogStorageMetadataSynchronizer metadataSynchronizer
= new CatalogStorageMetadataSynchronizer(catalogManager, metadataManager);

StudyMetadata studyMetadata = metadataManager.getStudyMetadata(study);
if (studyMetadata == null) {
if (!metadataManager.studyExists(study)) {
if (failIfNotExist) {
throw new CatalogException("Study '" + study + "' does not exist on the VariantStorage");
}
} else {
// Update Catalog file and cohort status.
metadataSynchronizer.synchronizeCatalogStudyFromStorage(studyMetadata, token);
metadataSynchronizer.synchronizeCatalogFromStorage(study, token);
}
return studyMetadata;
return metadataManager.getStudyMetadata(study);
}

protected final String getStudyFqn(String study, String token) throws CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.project.Project;
import org.opencb.opencga.core.models.project.ProjectOrganism;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
Expand Down Expand Up @@ -96,7 +94,7 @@ private void annotate(String projectStr, List<String> studies, String loadFileSt
if (StringUtils.isEmpty(loadFileStr)) {
variantStorageEngine.annotate(outdir.toUri(), annotationQuery, annotationOptions);
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager())
.synchronizeCatalogFromStorage(token);
.synchronizeCatalogFromStorage(projectStr, null, token);
} else {
Path loadFilePath = Paths.get(loadFileStr);
boolean fileExists = Files.exists(loadFilePath);
Expand All @@ -123,11 +121,8 @@ private void annotate(String projectStr, List<String> studies, String loadFileSt
}

private void synchronizeProjectMetadata(String projectStr, String token) throws CatalogException, StorageEngineException {
Project project = catalogManager.getProjectManager().get(projectStr, QueryOptions.empty(), token).first();
ProjectOrganism organism = project.getOrganism();
int currentRelease = project.getCurrentRelease();
CatalogStorageMetadataSynchronizer.updateProjectMetadata(variantStorageEngine.getMetadataManager(), organism, currentRelease,
project.getCellbase());
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager())
.synchronizeProjectMetadataFromCatalog(projectStr, token);
}

private String buildOutputFileName(String alias, String region) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public class VariantFileIndexerOperationManager extends OperationManager {
private URI outDirUri;
private int release;
private List<File> filesToIndex;
private CatalogStorageMetadataSynchronizer synchronizer;
private CatalogStorageMetadataSynchronizer _synchronizer;
private boolean fullSynchronize = false;
private boolean force;

Expand All @@ -119,7 +119,7 @@ public List<StoragePipelineResult> index(String study, List<String> files, URI o
updateProject(studyFqn, token);

List<URI> fileUris = findFilesToIndex(params, token);
if (fileUris.size() == 0) {
if (fileUris.isEmpty()) {
logger.warn("Nothing to do.");
return Collections.emptyList();
}
Expand All @@ -130,6 +130,14 @@ public List<StoragePipelineResult> index(String study, List<String> files, URI o

private void check(String study, ObjectMap params, String token) throws Exception {
studyFqn = getStudyFqn(study, token);
String projectFqn = catalogManager.getStudyManager().getProjectFqn(studyFqn);

Project project = catalogManager
.getProjectManager()
.get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Collections.singletonList(CURRENT_RELEASE.key())),
token).first();
release = project.getCurrentRelease();

JwtPayload jwtPayload = new JwtPayload(token);
CatalogFqn catalogFqn = CatalogFqn.extractFqnFromStudy(studyFqn, jwtPayload);
Expand Down Expand Up @@ -169,16 +177,9 @@ private void check(String study, ObjectMap params, String token) throws Exceptio

private void updateProject(String studyFqn, String token) throws CatalogException, StorageEngineException {
String projectFqn = catalogManager.getStudyManager().getProjectFqn(studyFqn);
Project project = catalogManager
.getProjectManager()
.get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Arrays.asList(CURRENT_RELEASE.key(), ORGANISM.key(), CELLBASE.key())),
token).first();
release = project.getCurrentRelease();

// Add species, assembly and release
CatalogStorageMetadataSynchronizer.updateProjectMetadata(variantStorageEngine.getMetadataManager(), project.getOrganism(), release,
project.getCellbase());
getSynchronizer().synchronizeProjectMetadataFromCatalog(projectFqn, token);
}

/**
Expand All @@ -192,8 +193,6 @@ private void updateProject(String studyFqn, String token) throws CatalogExceptio
* @throws StorageEngineException
*/
private List<URI> findFilesToIndex(ObjectMap params, String token) throws CatalogException, URISyntaxException, StorageEngineException {
synchronizer = new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager());

List<File> inputFiles = new ArrayList<>();
for (String file : files) {
File inputFile = catalogManager.getFileManager().get(studyFqn, file, FILE_GET_QUERY_OPTIONS, token).first();
Expand Down Expand Up @@ -223,7 +222,7 @@ private List<URI> findFilesToIndex(ObjectMap params, String token) throws Catalo
}

// Update Catalog from the storage metadata. This may change the index status of the inputFiles.
synchronizer.synchronizeCatalogFilesFromStorage(studyFqn, inputFiles, token, FILE_GET_QUERY_OPTIONS);
getSynchronizer().synchronizeCatalogFromStorage(studyFqn, inputFiles, token, FILE_GET_QUERY_OPTIONS);

logger.debug("Index - Number of files to be indexed: {}, list of files: {}", inputFiles.size(),
inputFiles.stream().map(File::getName).collect(Collectors.toList()));
Expand Down Expand Up @@ -334,13 +333,12 @@ private List<StoragePipelineResult> indexFiles(List<URI> fileUris, String token,
updateDefaultCohortStatus(studyFqn, prevDefaultCohortStatus, token);
}
if (fullSynchronize) {
synchronizer.synchronizeCatalogStudyFromStorage(studyFqn, token);
getSynchronizer().synchronizeCatalogFromStorage(studyFqn, token);
} else {
List<File> inputFiles = catalogManager.getFileManager().search(studyFqn,
new Query(FileDBAdaptor.QueryParams.URI.key(), fileUris),
new QueryOptions(QueryOptions.INCLUDE, "id,name,path,uri"), token).getResults();
synchronizer.synchronizeCatalogFilesFromStorage(studyFqn, inputFiles, token);
synchronizer.synchronizeCohorts(studyFqn, token);
getSynchronizer().synchronizeCatalogFromStorage(studyFqn, inputFiles, true, token);
}
}
variantStorageEngine.close();
Expand Down Expand Up @@ -810,6 +808,14 @@ private String getTransformedFileIdFromOriginal(File file) throws CatalogExcepti
return transformedFileId;
}

/**
 * Returns the catalog/storage metadata synchronizer held by this manager, creating it on
 * first use from {@code catalogManager} and the storage engine's metadata manager.
 *
 * @return the cached synchronizer instance
 * @throws StorageEngineException declared for errors raised while obtaining the metadata manager
 */
private CatalogStorageMetadataSynchronizer getSynchronizer() throws StorageEngineException {
    if (_synchronizer != null) {
        return _synchronizer;
    }
    // First call: build the synchronizer and keep it for later calls.
    _synchronizer = new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager());
    return _synchronizer;
}

private enum Type {
// AUTO, // TODO
TRANSFORM,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Collection<String> stats(String study, List<String> cohorts, String regio
// Synchronize catalog with storage
CatalogStorageMetadataSynchronizer synchronizer =
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager());
synchronizer.synchronizeCatalogStudyFromStorage(study, token);
synchronizer.synchronizeCatalogFromStorage(study, token);

Map<String, List<String>> cohortsMap = checkCanCalculateCohorts(study, cohorts, overwriteStats, resume, token);

Expand Down Expand Up @@ -108,7 +108,7 @@ public Collection<String> delete(String study, List<String> cohorts, ObjectMap p
// Synchronize catalog with storage
CatalogStorageMetadataSynchronizer synchronizer =
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager());
synchronizer.synchronizeCatalogStudyFromStorage(study, token);
synchronizer.synchronizeCatalogFromStorage(study, token);

try {
// Modify cohort status to "INVALID"
Expand Down
Loading
Loading