Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additions #2

Merged
merged 20 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,60 @@ import java.nio.file.Paths
import java.util.List
import java.util.Map
import java.nio.file.Path
import java.io.OutputStream
import java.util.zip.GZIPOutputStream

import groovy.transform.CompileStatic
import groovy.json.JsonOutput
import groovy.util.logging.Slf4j

@Slf4j
@CompileStatic
class IridaNextOutput {
class IridaNextJSONOutput {
private Map files = ["global": [], "samples": [:]]
private Map metadata = ["samples": [:]]
private Map<String,Set<String>> scopeIds = ["samples": [] as Set<String>]
// private final Map<String, List<Map<Object, Object>>> files = ["global": [], "samples": []]
// private final Map<String, Map<Object, Object>> metadata = ["samples": []]
private Path relativizePath
private Boolean shouldRelativize

/**
 * Creates an output collector.
 *
 * @param relativizePath base path that file paths passed to addFile are made
 *                       relative to; when null, paths are stored as given
 */
public IridaNextJSONOutput(Path relativizePath) {
    // Relativization is switched on only when a base path was supplied.
    this.shouldRelativize = relativizePath != null
    this.relativizePath = relativizePath
}

/**
 * Merges per-identifier metadata into the metadata section of a scope.
 *
 * Entries whose key is not a registered identifier for the scope are left out
 * and reported at trace level. If the scope has no metadata section the call
 * is a no-op.
 *
 * @param scope metadata scope to update (e.g. "samples")
 * @param data  map of identifier to metadata value
 */
public void appendMetadata(String scope, Map data) {
    if (!(scope in metadata.keySet())) {
        return
    }

    // Build the filtered map explicitly: every closure path has a defined effect,
    // instead of handing the (null) result of log.trace to collectEntries.
    Map validMetadata = [:]
    data.each { k, v ->
        if (k in scopeIds[scope]) {
            validMetadata[k] = v
        } else {
            log.trace "scope=${scope}, id=${k} is not a valid identifier. Removing from metadata."
        }
    }

    metadata[scope] = (metadata[scope] as Map) + validMetadata
}

/**
 * Registers an identifier as valid for the given scope.
 *
 * @param scope scope the identifier belongs to (e.g. "samples")
 * @param id    identifier to register
 * @throws Exception if the scope has no identifier set
 */
public void addId(String scope, String id) {
    log.trace "Adding scope=${scope} id=${id}"

    Set<String> ids = scopeIds[scope]
    if (ids == null) {
        // Fail with a descriptive message rather than dereferencing a missing set.
        throw new Exception("scope=${scope} not in valid set of scopes: ${scopeIds.keySet()}")
    }
    ids.add(id)
}

/**
 * Reports whether an identifier has been registered for a scope.
 *
 * @param scope scope to look in
 * @param id    identifier to test
 * @return true when the identifier was registered for the scope; false otherwise,
 *         including when the scope has no identifier set at all
 */
public Boolean isValidId(String scope, String id) {
    Set<String> ids = scopeIds[scope]
    return ids != null && ids.contains(id)
}

public void addFile(String scope, String subscope, Path path) {
if (!(scope in files.keySet())) {
throw new Exception("scope=${scope} not in valid set of scopes: ${files.keySet()}")
}

if (shouldRelativize) {
path = relativizePath.relativize(path)
}

// Treat empty string and null as same
if (subscope == "") {
subscope = null
Expand All @@ -48,6 +85,8 @@ class IridaNextOutput {
if (scope == "samples" && subscope == null) {
throw new Exception("scope=${scope} but subscope is null")
} else if (scope == "samples" && subscope != null) {
assert isValidId(scope, subscope)

def files_scope_map = (Map)files_scope
if (!files_scope_map.containsKey(subscope)) {
files_scope_map[subscope] = []
Expand All @@ -68,4 +107,18 @@ class IridaNextOutput {
/**
 * Serializes the collected files and metadata sections into one JSON string.
 *
 * @return JSON object with the top-level keys "files" and "metadata"
 */
public String toJson() {
    Map document = ["files": files, "metadata": metadata]
    return JsonOutput.toJson(document)
}

/**
 * Writes the pretty-printed JSON document to the given path.
 *
 * When the path's extension is "gz" the bytes are written through a
 * GZIPOutputStream; otherwise they are written directly.
 *
 * @param path destination file
 */
public void write(Path path) {
    // Documentation for reading/writing to Nextflow files using this method is available at
    // https://www.nextflow.io/docs/latest/script.html#reading-and-writing
    path.withOutputStream { rawStream ->
        OutputStream sink = rawStream as OutputStream
        boolean gzipped = (path.extension == 'gz')
        if (gzipped) {
            sink = new GZIPOutputStream(sink)
        }

        String prettyJson = JsonOutput.prettyPrint(toJson())
        sink.write(prettyJson.getBytes("utf-8"))
        // Closing here finishes the stream (including the gzip wrapper when one is used).
        sink.close()
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import nextflow.trace.TraceRecord
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
import nextflow.script.params.ValueOutParam
import nextflow.Nextflow

import nextflow.iridanext.IridaNextOutput
import nextflow.iridanext.IridaNextJSONOutput

/**
* IridaNext workflow observer
Expand All @@ -48,10 +49,17 @@ class IridaNextObserver implements TraceObserver {
private Map<Path,Path> publishedFiles = [:]
private List<TaskRun> tasks = []
private List traces = []
private IridaNextOutput iridaNextOutput = new IridaNextOutput()
private IridaNextJSONOutput iridaNextJSONOutput
private Map<String,List<PathMatcher>> pathMatchers
private List<PathMatcher> samplesMatchers
private List<PathMatcher> globalMatchers
private PathMatcher samplesMetadataMatcher
private String filesMetaId
private String samplesMetadataId
private Path iridaNextOutputPath
private Path outputFilesRootDir
private Boolean outputFileOverwrite
private Session session

public IridaNextObserver() {
pathMatchers = [:]
Expand All @@ -76,26 +84,56 @@ class IridaNextObserver implements TraceObserver {

@Override
/**
 * Reads the plugin configuration from the session when the flow is created.
 *
 * From the visible code: resolves 'iridanext.output.path' through Nextflow.file,
 * uses the parent of that path as the relativization base when
 * 'iridanext.output.relativize' is truthy, reads 'iridanext.output.overwrite',
 * turns each scope's glob string(s) under 'iridanext.output.files' into
 * PathMatchers, reads the samples metadata glob and id column from
 * 'iridanext.output.metadata', and finally creates the IridaNextJSONOutput.
 *
 * NOTE(review): this span appears to interleave pre-change and post-change lines
 * of a diff (e.g. two declarations of iridaNextFiles, duplicated List checks and
 * matchersGlob lines) - confirm against the real file before relying on it.
 *
 * @param session the Nextflow session whose config is read
 */
void onFlowCreate(Session session) {
def iridaNextFiles = session.config.navigate('iridanext.files')
if (iridaNextFiles != null) {
// NOTE(review): `!x instanceof Map` parses as `(!x) instanceof Map`, which is never
// true, so this guard cannot fire; the intended form is likely `!(x instanceof Map)`.
if (!iridaNextFiles instanceof Map<String,Object>) {
throw new Exception("Expected a map in config for iridanext.files=${iridaNextFiles}")
this.session = session
Path relativizePath = null

// Second argument to navigate is presumably the default when the key is absent - TODO confirm
Boolean relativizeOutputPaths = session.config.navigate('iridanext.output.relativize', true)

iridaNextOutputPath = session.config.navigate('iridanext.output.path') as Path
if (iridaNextOutputPath != null) {
iridaNextOutputPath = Nextflow.file(iridaNextOutputPath) as Path
if (relativizeOutputPaths) {
// File paths are made relative to the directory containing the output file
relativizePath = iridaNextOutputPath.getParent()
}
}

outputFileOverwrite = session.config.navigate('iridanext.output.overwrite', false)

Map<String,Object> iridaNextFiles = session.config.navigate('iridanext.output.files') as Map<String,Object>
if (iridaNextFiles != null) {
// Used for overriding the "meta.id" key used to define identifiers for a scope
// (e.g., by default meta.id is used for a sample identifier in a pipeline)
this.filesMetaId = iridaNextFiles?.idkey ?: "id"

iridaNextFiles = (Map<String,Object>)iridaNextFiles
iridaNextFiles.each {scope, matchers ->
if (matchers instanceof String) {
matchers = [matchers]
}
// "idkey" is a special keyword and isn't used for file matchers
if (scope != "idkey") {
// A single glob string is treated as a one-element list
if (matchers instanceof String) {
matchers = [matchers]
}

if (!(matchers instanceof List)) {
throw new Exception("Invalid configuration for iridanext.files=${iridaNextFiles}")
if (!(matchers instanceof List)) {
throw new Exception("Invalid configuration for iridanext.files=${iridaNextFiles}")
}

List<PathMatcher> matchersGlob = matchers.collect {FileSystems.getDefault().getPathMatcher("glob:${it}")}
addPathMatchers(scope, matchersGlob)
}
}
}

List<PathMatcher> matchersGlob = matchers.collect {FileSystems.getDefault().getPathMatcher("glob:${it}")}
addPathMatchers(scope, matchersGlob)
def iridaNextMetadata = session.config.navigate('iridanext.output.metadata')
if (iridaNextMetadata != null) {
// NOTE(review): same precedence problem as above - `!x instanceof Map` is never true,
// so a non-map value is not rejected here; likely meant `!(x instanceof Map)`.
if (!iridaNextMetadata instanceof Map<String,Object>) {
throw new Exception("Expected a map in config for iridanext.metadata=${iridaNextMetadata}")
}

// NOTE(review): a metadata map without a "samples" entry would make samplesMetadata null
// and the lookups below fail - confirm whether "samples" is required.
Map<String, String> samplesMetadata = iridaNextMetadata["samples"] as Map<String,String>
samplesMetadataMatcher = FileSystems.getDefault().getPathMatcher("glob:${samplesMetadata['path']}")
samplesMetadataId = samplesMetadata["id"]
}

iridaNextJSONOutput = new IridaNextJSONOutput(relativizePath)
}

@Override
Expand All @@ -107,13 +145,46 @@ class IridaNextObserver implements TraceObserver {
publishedFiles[source] = destination
}

/**
 * Reads a CSV file (with header row) and indexes its rows by an id column.
 *
 * @param path     CSV file to read; resolved through Nextflow.file
 * @param idColumn name of the column whose value becomes the key of each row
 * @return map from the id value (as String) to the remaining columns of that row
 * @throws Exception if a row lacks the id column, or has an empty value in it
 */
private Map<String, Object> csvToJsonById(Path path, String idColumn) {
    path = Nextflow.file(path) as Path
    List rowsList = path.splitCsv(header:true, strip:true, sep:',', quote:'\"')

    Map<String, Object> rowsMap = rowsList.collectEntries { row ->
        Map rowMap = row as Map

        // `key in map` tests the truthiness of the mapped value, not key presence,
        // so check for the column explicitly and report empty ids separately.
        if (!rowMap.containsKey(idColumn)) {
            throw new Exception("Error: column with idColumn=${idColumn} not in CSV ${path}")
        }
        def idValue = rowMap[idColumn]
        if (!idValue) {
            throw new Exception("Error: empty value in column idColumn=${idColumn} in CSV ${path}")
        }

        return [(idValue as String): rowMap.findAll { it.key != idColumn }]
    }

    return rowsMap
}

/**
 * Records a task output file in the JSON output if it was published and its
 * published path matches one of the matchers configured for its scope.
 *
 * @param outputPath path of the file as produced by the task
 * @param indexInfo  map carrying the "scope" and "subscope" for this output
 */
private void processOutputPath(Path outputPath, Map<String,String> indexInfo) {
    // Only published files are candidates; anything else is just traced.
    if (!publishedFiles.containsKey(outputPath)) {
        log.trace "Not match outputPath: ${outputPath}"
        return
    }

    Path publishedPath = publishedFiles[outputPath]
    def scopeName = indexInfo["scope"]

    boolean matchesScope = pathMatchers[scopeName].any { matcher -> matcher.matches(publishedPath) }
    if (matchesScope) {
        iridaNextJSONOutput.addFile(scopeName, indexInfo["subscope"], publishedPath)
    }
}

@Override
void onFlowComplete() {
if (!session.isSuccess())
return

// Generate files section
// Some of this code derived from https://github.com/nextflow-io/nf-prov/blob/master/plugins/nf-prov
tasks.each { task ->
Map<Short,Map<String,String>> outParamInfo = [:]
def currSubscope = null
task.outputs.each { outParam, object ->
log.debug "task ${task}, outParam ${outParam}, object ${object}"
Short paramIndex = outParam.getIndex()
if (!outParamInfo.containsKey(paramIndex)) {
Map<String,String> currIndexInfo = [:]
Expand All @@ -122,35 +193,55 @@ class IridaNextObserver implements TraceObserver {
// case meta map
if (outParam instanceof ValueOutParam && object instanceof Map) {
Map objectMap = (Map)object
if (outParam.getName() == "meta" && "id" in objectMap) {
if (outParam.getName() == "meta" && this.filesMetaId in objectMap) {
log.trace "${this.filesMetaId} in ${objectMap}"
currIndexInfo["scope"] = "samples"
currIndexInfo["subscope"] = objectMap["id"].toString()
currIndexInfo["subscope"] = objectMap[this.filesMetaId].toString()
iridaNextJSONOutput.addId(currIndexInfo["scope"], currIndexInfo["subscope"])
} else {
throw new Exception("Found value channel output that doesn't have meta.id: ${objectMap}")
throw new Exception("Found value channel output in task [${task.getName()}] that doesn't have meta.${this.filesMetaId}: ${objectMap}")
}
} else {
currIndexInfo["scope"] = "global"
currIndexInfo["subscope"] = ""
}
}

Map<String,String> currIndexInfo = outParamInfo[paramIndex]
log.debug "Setup info task [${task.getName()}], outParamInfo[${paramIndex}]: ${outParamInfo[paramIndex]}"
}

if (object instanceof Path) {
Path processPath = (Path)object

if (publishedFiles.containsKey(processPath)) {
Path publishedPath = publishedFiles[processPath]
def currScope = currIndexInfo["scope"]

if (pathMatchers[currScope].any {it.matches(publishedPath)}) {
iridaNextOutput.addFile(currScope, currIndexInfo["subscope"], publishedPath)
log.debug "outParamInfo[${paramIndex}]: ${outParamInfo[paramIndex]}, object as Path: ${object as Path}"
processOutputPath(object as Path, outParamInfo[paramIndex])
} else if (object instanceof List) {
log.debug "outParamInfo[${paramIndex}]: ${outParamInfo[paramIndex]}, object as List: ${object as List}"
(object as List).each {
if (it instanceof Path) {
processOutputPath(it as Path, outParamInfo[paramIndex])
}
}
}
}
}

log.info "${JsonOutput.prettyPrint(iridaNextOutput.toJson())}"
// Generate metadata section
// some code derived from https://github.com/nextflow-io/nf-validation
if (samplesMetadataMatcher != null && samplesMetadataId != null) {
List matchedFiles = new ArrayList(publishedFiles.values().findAll {samplesMetadataMatcher.matches(it)})

if (!matchedFiles.isEmpty()) {
log.trace "Matched metadata: ${matchedFiles}"
Map metadataSamplesMap = csvToJsonById(matchedFiles[0] as Path, samplesMetadataId)
iridaNextJSONOutput.appendMetadata("samples", metadataSamplesMap)
}
}

if (iridaNextOutputPath != null) {
if (iridaNextOutputPath.exists() && !outputFileOverwrite) {
throw new Exception("Error: iridanext.output.path=${iridaNextOutputPath} exists and iridanext.output.overwrite=${outputFileOverwrite}")
} else {
iridaNextJSONOutput.write(iridaNextOutputPath)
log.debug "Wrote IRIDA Next output to ${iridaNextOutputPath}"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ import nextflow.trace.TraceObserverFactory
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
class IridaNextFactory implements TraceObserverFactory {
class IridaNextObserverFactory implements TraceObserverFactory {

@Override
/**
 * Creates the trace observers contributed by this plugin.
 *
 * The IridaNextObserver is registered only when the 'iridanext.enabled'
 * config value is truthy, and then exactly once.
 *
 * @param session the Nextflow session whose config is consulted
 * @return a collection holding one observer when enabled, otherwise empty
 */
Collection<TraceObserver> create(Session session) {
    final result = new ArrayList()
    final enabled = session.config.navigate('iridanext.enabled')

    // Register the observer only behind the flag; an unconditional add would
    // ignore the setting and produce a duplicate when it is enabled.
    if (enabled) {
        result.add( new IridaNextObserver() )
    }

    return result
}
}
2 changes: 1 addition & 1 deletion plugins/nf-iridanext/src/resources/META-INF/extensions.idx
Original file line number Diff line number Diff line change
@@ -1 +1 @@
nextflow.iridanext.IridaNextFactory
nextflow.iridanext.IridaNextObserverFactory
Loading