Skip to content

Commit

Permalink
Merge pull request #98 from pyiron/parse_full_workflow
Browse files Browse the repository at this point in the history
Parse full workflow
  • Loading branch information
samwaseda authored Jan 22, 2025
2 parents 5296df6 + 0073dfd commit 194eec8
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 70 deletions.
247 changes: 203 additions & 44 deletions pyiron_ontology/parser.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
from typing import TypeAlias

from semantikon.converter import parse_input_args, parse_output_args
from rdflib import Graph, Literal, RDF, RDFS, URIRef, OWL
from pyiron_workflow import NOT_DATA
from rdflib import Graph, Literal, RDF, RDFS, URIRef, OWL, PROV, Namespace
from pyiron_workflow import NOT_DATA, Workflow, Macro
from pyiron_workflow.node import Node


class PNS:
    """
    Vocabulary of the pyiron ontology namespace.

    Every attribute other than ``BASE`` is a term looked up in ``BASE``.
    """

    # Root namespace from which all terms below are derived.
    BASE = Namespace("http://pyiron.org/ontology/")
    # workflow -> node it contains (emitted by parse_workflow)
    hasNode = BASE["hasNode"]
    # node -> function it wraps (emitted by get_triples)
    hasSourceFunction = BASE["hasSourceFunction"]
    # input/output channel -> its units
    hasUnits = BASE["hasUnits"]
    # input channel -> the output channel it is connected to
    inheritsPropertiesFrom = BASE["inheritsPropertiesFrom"]
    # input channel -> node that owns it
    inputOf = BASE["inputOf"]
    # output channel -> node that owns it
    outputOf = BASE["outputOf"]


def get_source_output(var):
def get_source_output(var: Node) -> str | None:
if not var.connected:
return None
connection = var.connections[0]
return f"{connection.owner.label}.outputs.{connection.label}"


def get_inputs_and_outputs(node):
def get_inputs_and_outputs(node: Node) -> dict:
"""
Read input and output arguments with their type hints and return a
dictionary containing all input output information
Expand All @@ -22,6 +35,8 @@ def get_inputs_and_outputs(node):
(dict): dictionary containing input output args, type hints, values
and variable names
"""
if isinstance(node, Macro):
raise NotImplementedError("Macros are not supported yet")
inputs = parse_input_args(node.node_function)
outputs = parse_output_args(node.node_function)
if isinstance(outputs, dict):
Expand All @@ -47,47 +62,100 @@ def get_inputs_and_outputs(node):


def get_triples(
    data: dict,
    workflow_namespace: str | None = None,
) -> Graph:
    """
    Generate triples from a dictionary containing input output information.

    The dictionary should be obtained from the get_inputs_and_outputs
    function and must contain the keys "inputs", "outputs", "function" and
    "label". "inputs" and "outputs" map variable names to dictionaries, from
    which the following optional keys are read here:

    - "uri": URI of the type of the variable (added as ``RDF.type``)
    - "value": value of the variable (added as ``RDF.value``; skipped when
      absent or ``NOT_DATA``)
    - "units": URI of the units of the variable
    - "connection": label of the output variable the input is connected to
    - "restrictions" / "triples": forwarded to
      ``_get_triples_from_restrictions``

    In terms of python code, it should look like this::

        data = {
            "inputs": {
                "input1": {
                    "uri": URIRef("http://example.org/Type"),
                    "value": 1,
                    "triples": some_triples,
                    "restrictions": some_restrictions,
                    "connection": "other_node.outputs.output1",
                }
            },
            "outputs": {
                "output1": {
                    "uri": URIRef("http://example.org/Type"),
                    "value": 1,
                    "triples": other_triples,
                }
            },
            "function": "function_name",
            "label": "label",
        }

    triples should consist of a list of tuples, where each tuple contains 2
    or 3 elements. If the tuple contains 2 elements, the first element should
    be the predicate and the second element should be the object, in order
    for the subject to be generated from the variable name.

    Args:
        data (dict): dictionary containing input output information
        workflow_namespace (str): namespace of the workflow; it is prepended,
            followed by a dot, to the node label and to connection labels.
            ``None`` or an empty string means no prefix.

    Returns:
        (rdflib.Graph): new graph containing the triples of this node
    """
    # An empty namespace must not yield a leading "." in the generated URIs.
    prefix = f"{workflow_namespace}." if workflow_namespace else ""
    graph = Graph()
    full_label = prefix + data["label"]
    node = URIRef(full_label)
    graph.add((node, RDF.type, PROV.Activity))
    graph.add((node, PNS.hasSourceFunction, URIRef(data["function"])))
    # Predicate linking each channel back to the node that owns it.
    membership = {"inputs": PNS.inputOf, "outputs": PNS.outputOf}
    for io_, member_of in membership.items():
        for key, d in data[io_].items():
            full_key = full_label + f".{io_}." + key
            label = URIRef(full_key)
            graph.add((label, RDFS.label, Literal(full_key)))
            graph.add((label, RDF.type, PROV.Entity))
            if d.get("uri") is not None:
                graph.add((label, RDF.type, URIRef(d["uri"])))
            # NOT_DATA is the sentinel for "no value", so None is a valid value.
            if d.get("value", NOT_DATA) is not NOT_DATA:
                graph.add((label, RDF.value, Literal(d["value"])))
            graph.add((label, member_of, node))
            if d.get("units") is not None:
                graph.add((label, PNS.hasUnits, URIRef(d["units"])))
            if d.get("connection") is not None:
                graph.add(
                    (
                        label,
                        PNS.inheritsPropertiesFrom,
                        URIRef(prefix + d["connection"]),
                    )
                )
            for t in _get_triples_from_restrictions(d):
                graph.add(_parse_triple(t, ns=full_label, label=label))
    return graph


def _get_triples_from_restrictions(data, NS):
def _get_triples_from_restrictions(data: dict) -> list:
triples = []
if data.get("restrictions", None) is not None:
triples = restriction_to_triple(data["restrictions"])
Expand All @@ -99,38 +167,86 @@ def _get_triples_from_restrictions(data, NS):
return triples


def restriction_to_triple(restrictions):
triples = []
assert isinstance(restrictions, tuple) and isinstance(restrictions[0], tuple)
if not isinstance(restrictions[0][0], tuple):
restrictions = (restrictions,)
for r in restrictions:
assert len(r[0]) == 2
_rest_type: TypeAlias = tuple[tuple[URIRef, URIRef], ...]


def _validate_restriction_format(
    restrictions: _rest_type | tuple[_rest_type] | list[_rest_type],
) -> tuple[_rest_type]:
    """
    Normalize restrictions to a collection of restrictions.

    A single restriction (a tuple of URIRef tuples) is wrapped into a
    one-element tuple; a collection of such restrictions is returned as is.

    Args:
        restrictions: one restriction or a tuple/list of restrictions

    Returns:
        collection of restrictions; empty if ``restrictions`` is empty

    Raises:
        ValueError: if the input is neither of the two accepted shapes
    """
    message = "Restrictions must be tuples of URIRefs"
    if not all(isinstance(r, tuple) for r in restrictions):
        raise ValueError(message)
    # Empty input would pass every all() vacuously and come back as ((),),
    # which downstream code cannot index; treat it as "no restrictions".
    if not restrictions:
        return ()
    if all(isinstance(rr, URIRef) for r in restrictions for rr in r):
        return (restrictions,)
    try:
        is_collection = all(
            isinstance(rrr, URIRef) for r in restrictions for rr in r for rrr in rr
        )
    except TypeError:
        # A member that cannot be iterated is malformed input, not a crash.
        is_collection = False
    if is_collection:
        return restrictions
    raise ValueError(message)


def restriction_to_triple(
    restrictions: _rest_type | tuple[_rest_type] | list[_rest_type],
) -> list[tuple[URIRef | None, URIRef, URIRef]]:
    """
    Convert restrictions to triples

    Args:
        restrictions (tuple): tuple of restrictions

    Returns:
        (list): list of triples

    In the semantikon notation, restrictions are given in the format::

        restrictions = (
            (OWL.onProperty, EX.HasSomething),
            (OWL.someValuesFrom, EX.Something)
        )

    This tuple is internally converted to the triples::

        (
            (EX.HasSomethingRestriction, RDF.type, OWL.Restriction),
            (EX.HasSomethingRestriction, OWL.onProperty, EX.HasSomething),
            (EX.HasSomethingRestriction, OWL.someValuesFrom, EX.Something),
            (None, RDF.type, EX.HasSomethingRestriction)
        )

    The restriction node is named after the object of the first pair with
    "Restriction" appended. The ``None`` subject of the last triple is a
    placeholder to be replaced by the caller with the restricted object.
    """
    restrictions_collection = _validate_restriction_format(restrictions)
    triples: list[tuple[URIRef | None, URIRef, URIRef]] = []
    for r in restrictions_collection:
        label = r[0][1] + "Restriction"
        triples.append((label, RDF.type, OWL.Restriction))
        triples.extend((label, rr[0], rr[1]) for rr in r)
        triples.append((None, RDF.type, label))
    return triples


def _parse_triple(
    triples: tuple,
    ns: str,
    label: URIRef | None = None,
) -> tuple:
    """
    Complete a single 2- or 3-element triple into (subject, predicate, object).

    Args:
        triples (tuple): one triple; with 2 elements they are read as
            (predicate, object) and ``label`` becomes the subject
        ns (str): prefix joined with a dot in front of objects starting with
            "inputs." or "outputs."
        label (rdflib.URIRef): substituted for a missing or ``None`` subject
            and for a ``None`` object

    Returns:
        (tuple): subject, predicate and object, the object being a URIRef

    Raises:
        ValueError: if ``triples`` has neither 2 nor 3 elements
    """
    if len(triples) == 2:
        subj, pred, obj = label, triples[0], triples[1]
    elif len(triples) == 3:
        subj, pred, obj = triples
    else:
        raise ValueError("Triple must have 2 or 3 elements")
    if subj is None:
        subj = label
    if obj is None:
        obj = label
    # Objects given relative to the node are qualified with its label.
    if obj.startswith(("inputs.", "outputs.")):
        obj = ns + "." + obj
    # NOTE(review): any non-URIRef object, including an rdflib Literal, is
    # turned into a URIRef here - confirm that this is intended.
    if not isinstance(obj, URIRef):
        obj = URIRef(obj)
    return subj, pred, obj


def inherit_properties(graph, NS, n=None):
def _inherit_properties(graph: Graph, n: int | None = None):
update_query = (
f"PREFIX ns: <{NS}>",
f"PREFIX ns: <{PNS.BASE}>",
f"PREFIX rdfs: <{RDFS}>",
f"PREFIX rdf: <{RDF}>",
"",
Expand All @@ -147,12 +263,21 @@ def inherit_properties(graph, NS, n=None):
"}",
)
if n is None:
n = len(list(graph.triples((None, NS.inheritsPropertiesFrom, None))))
n = len(list(graph.triples((None, PNS.inheritsPropertiesFrom, None))))
for _ in range(n):
graph.update("\n".join(update_query))


def validate_values(graph):
def validate_values(graph: Graph) -> list:
"""
Validate if all values required by restrictions are present in the graph
Args:
graph (rdflib.Graph): graph to be validated
Returns:
(list): list of missing triples
"""
missing_triples = []
for restrictions in graph.subjects(RDF.type, OWL.Restriction):
on_property = graph.value(restrictions, OWL.onProperty)
Expand All @@ -165,3 +290,37 @@ def validate_values(graph):
(instance, on_property, some_values_from)
)
return missing_triples


def parse_workflow(
    workflow: Workflow,
    graph: Graph | None = None,
    inherit_properties: bool = True,
) -> Graph:
    """
    Translate a pyiron workflow into an RDF graph.

    The workflow is labelled, linked to each of its nodes via
    ``PNS.hasNode``, and the triples of every node are merged in.

    Args:
        workflow (pyiron_workflow.workflow.Workflow): workflow to translate
        graph (rdflib.Graph): graph to extend; a new one is created if None
        inherit_properties (bool): whether to run ``_inherit_properties`` on
            the graph once all nodes have been added

    Returns:
        (rdflib.Graph): graph containing workflow information
    """
    graph = Graph() if graph is None else graph
    namespace = workflow.label
    root = URIRef(namespace)
    graph.add((root, RDFS.label, Literal(namespace)))
    for node in workflow:
        node_data = get_inputs_and_outputs(node)
        node_uri = URIRef(".".join((namespace, node_data["label"])))
        graph.add((root, PNS.hasNode, node_uri))
        graph += get_triples(data=node_data, workflow_namespace=namespace)
    if not inherit_properties:
        return graph
    _inherit_properties(graph)
    return graph
Loading

0 comments on commit 194eec8

Please sign in to comment.