From aa7ea8e463e41f5743ba1505015a5f4c2a88b6ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maximilian=20H=C3=B6rstrup?=
Date: Thu, 15 Jun 2023 22:05:58 +0200
Subject: [PATCH 01/30] Adaptions and fixes for extension (#20)

These changes mainly modify the scheduling and petri net generation code.
The PFDL VS Code Extension needs information about the different components
(Services, Loops, ...) to group them together inside a box. Moreover, efforts
were made to have the petri net components properly clustered by the dot
engine, so the overall appearance of large petri nets is enhanced. Finally,
some adaptations were made to the observers to provide more information,
such as the used PFDL string.

Changes:
- Cluster Petri Net components of the PFDL with the help of the Snakes library
- Assign group ids to components so they can be grouped together
- Save group ids and cluster information in the dot file
- Adaptations to the Dashboard Observer
---
 pfdl_scheduler/extension.py                |  23 +-
 pfdl_scheduler/petri_net/drawer.py         |  34 ++-
 pfdl_scheduler/petri_net/generator.py      | 304 +++++++++++++++++----
 pfdl_scheduler/petri_net/logic.py          |  26 +-
 pfdl_scheduler/scheduler.py                |  34 ++-
 pfdl_scheduler/utils/dashboard_observer.py |  66 +++--
 pfdl_scheduler/utils/log_entry_observer.py |  14 +-
 pfdl_scheduler/utils/parsing_utils.py      |   9 +-
 8 files changed, 388 insertions(+), 122 deletions(-)

diff --git a/pfdl_scheduler/extension.py b/pfdl_scheduler/extension.py
index 9c245a4..916c01f 100644
--- a/pfdl_scheduler/extension.py
+++ b/pfdl_scheduler/extension.py
@@ -6,12 +6,11 @@
 """Contains the start up script used in the VSCode extension.
-A program executed in the VS Code extension which
-has a string containing a PFDL program as input.
+A program that shall be executed in the VS Code extension which has a string containing a PFDL program as input as well as the name of the corresponding file.
 """
 # standard libraries
-import sys
+import argparse
 # local sources
 from pfdl_scheduler.utils.parsing_utils import parse_string
@@ -19,9 +18,23 @@
 def main():
-    valid, process = parse_string(sys.argv[2], used_in_extension=True)
+    parser = argparse.ArgumentParser(
+        description="A program that shall be executed in the VS Code extension which has a string containing a PFDL program as input as well as the name of the corresponding file."
+    )
+    parser.add_argument("pfdl_string", type=str, help="The content of a given PFDL file as string.")
+    parser.add_argument("file_name", type=str, help="The name of the given PFDL file.")
+    args = parser.parse_args()
+
+    pfdl_string = ""
+    if args.pfdl_string:
+        pfdl_string = args.pfdl_string
+
+    valid, process = parse_string(pfdl_string, used_in_extension=True)
     if valid:
-        petrinet_generator = PetriNetGenerator(used_in_extension=True)
+        file_name = ""
+        if args.file_name:
+            file_name = args.file_name
+        petrinet_generator = PetriNetGenerator(used_in_extension=True, file_name=file_name)
         petrinet_generator.generate_petri_net(process)
diff --git a/pfdl_scheduler/petri_net/drawer.py b/pfdl_scheduler/petri_net/drawer.py
index 6030b48..696f3fd 100644
--- a/pfdl_scheduler/petri_net/drawer.py
+++ b/pfdl_scheduler/petri_net/drawer.py
@@ -8,7 +8,6 @@
 # standard libraries
 import threading
-from pathlib import Path
 # 3rd party lib
 import snakes.plugins
 draw_lock = threading.Lock()
-"""Constants that are used in the drawer functions"""
-NODE_SEP_VALUE = 5
+# Constants that are used in the drawer functions below.
+# They are used by Snakes to pass the values to the dot engine +# Attribute overview: https://graphviz.org/doc/info/attrs.html +LAYOUT_METHOD = "dot" # the preferred layout engine (changes the position of the nodes) + +# Graph attributes +GRAPH_MARGIN = 15 # margin of the graph to the canvas (in inches) +NEW_RANK_VALUE = "true" # allow different positioning for clustering + +# Place attributes PLACE_SHAPE = "circle" PLACE_LABEL = "" +# Transition attributes TRANSITION_SHAPE = "rect" TRANSITION_FILL_COLOR = "black" TRANSITION_WIDTH = 1 TRANSITION_HEIGHT = 0.1 TRANSITION_LABEL = "" +# Arc attributes INHIBITOR_ARC_ARROW_HEAD = "odot" +DEFAULT_ARC_LABEL = "" -LAYOUT_METHOD = "dot" +DEFAULT_CLUSTER_STYLE = "" def draw_graph(graph, attr): """Set attributes for drawing the net.""" - attr["nodesep"] = NODE_SEP_VALUE + attr["margin"] = GRAPH_MARGIN + attr["newrank"] = NEW_RANK_VALUE def draw_place(place, attr): @@ -47,6 +58,8 @@ def draw_place(place, attr): else: attr["xlabel"] = place.name + attr["group"] = place.label("group_id") + if 1 in place: attr["label"] = "•" else: @@ -57,24 +70,28 @@ def draw_place(place, attr): def draw_transition(trans, attr): """Set attributes for drawing transitions.""" - attr["label"] = TRANSITION_LABEL + attr["label"] = "" # TRANSITION_LABEL attr["shape"] = TRANSITION_SHAPE attr["height"] = TRANSITION_HEIGHT attr["width"] = TRANSITION_WIDTH attr["fillcolor"] = TRANSITION_FILL_COLOR + attr["group"] = trans.label("group_id") def draw_arcs(arc, attr): """Set attributes for drawing arcs.""" if isinstance(arc, snakes.nets.Inhibitor): attr["arrowhead"] = INHIBITOR_ARC_ARROW_HEAD - attr["label"] = "" + attr["label"] = DEFAULT_ARC_LABEL + + +def draw_clusters(clust, attr): + attr["style"] = DEFAULT_CLUSTER_STYLE def draw_petri_net(net, file_path, file_ending=".png"): """Calls the draw method form the Snakes module on the given PetriNet.""" with draw_lock: - Path("./temp").mkdir(parents=True, exist_ok=True) net.draw( file_path + file_ending, LAYOUT_METHOD, @@ -82,4 +99,5 @@ def draw_petri_net(net, file_path, file_ending=".png"): arc_attr=draw_arcs, place_attr=draw_place, trans_attr=draw_transition, + cluster_attr=draw_clusters, ) diff --git a/pfdl_scheduler/petri_net/generator.py b/pfdl_scheduler/petri_net/generator.py index fe5e271..d907718 100644 --- a/pfdl_scheduler/petri_net/generator.py +++ b/pfdl_scheduler/petri_net/generator.py @@ -10,6 +10,8 @@ from typing import Any, Callable, Dict, List, OrderedDict, Tuple import uuid import functools +import json +from pathlib import Path # 3rd party packages from snakes import plugins @@ -29,8 +31,31 @@ from pfdl_scheduler.petri_net.drawer import draw_petri_net from pfdl_scheduler.petri_net.callbacks import PetriNetCallbacks -plugins.load(["labels", "gv"], "snakes.nets", "nets") -from nets import PetriNet, Place, Transition, Value +plugins.load(["labels", "gv", "clusters"], "snakes.nets", "nets") + +from nets import PetriNet, Place, Transition, Value, Cluster + + +class Node(object): + def __init__(self, group_id: str, name="", parent: "Node" = None): + self.group_id: str = group_id + self.name: str = name + self.children: List[Node] = [] + self.cluster = None + + if parent: + parent.add_child(self) + + def add_child(self, node: "Node"): + self.children.append(node) + + def toJSON(self): + children_list = [] + for child in self.children: + child_dict = child.toJSON() + children_list.append(child_dict) + json_dict = {"id": self.group_id, "name": self.name, "children": children_list} + return json_dict class PetriNetGenerator: @@ 
-45,6 +70,7 @@ class PetriNetGenerator: task_started_id: The id of the 'Task started' place. callbacks: A PetriNetCallbacks instance representing functions called while execution. generate_test_ids: A boolean indicating if test ids (counting from 0) should be generated. + used_in_extension: A boolean indicating if the Generator is used within the extension. """ def __init__( @@ -53,6 +79,7 @@ def __init__( used_in_extension: bool = False, generate_test_ids: bool = False, draw_net: bool = True, + file_name: str = "petri_net", ) -> None: """Initialize the object. @@ -64,11 +91,13 @@ def __init__( """ if used_in_extension: - self.path_for_image: str = "../media/petri_net" + self.path_for_image: str = "../media/" + file_name elif path_for_image == "": - self.path_for_image: str = "temp/petri_net" + Path("./temp").mkdir(parents=True, exist_ok=True) + self.path_for_image: str = "temp/" + file_name else: - self.path_for_image: str = path_for_image + "/temp/petri_net" + Path("./temp").mkdir(parents=True, exist_ok=True) + self.path_for_image: str = path_for_image + "/temp/" + file_name self.net: PetriNet = PetriNet("petri_net") self.draw_net: bool = draw_net @@ -78,6 +107,10 @@ def __init__( self.task_started_id: str = "" self.callbacks: PetriNetCallbacks = PetriNetCallbacks() self.generate_test_ids: bool = generate_test_ids + self.used_in_extension: bool = used_in_extension + self.tree = None + self.file_name = file_name + self.cluster = None def add_callback(self, transition_id: str, callback_function: Callable, *args: Any) -> None: """Registers the given callback function in the transition_dict. @@ -90,11 +123,12 @@ def add_callback(self, transition_id: str, callback_function: Callable, *args: A callback_function: The callback function which should be called. *args: Arguments with which the callback function is called. """ - callback = functools.partial(callback_function, *args) - if transition_id not in self.transition_dict: - self.transition_dict[transition_id] = [] + if not self.used_in_extension: + callback = functools.partial(callback_function, *args) + if transition_id not in self.transition_dict: + self.transition_dict[transition_id] = [] - self.transition_dict[transition_id].append(callback) + self.transition_dict[transition_id].append(callback) def generate_petri_net(self, process: Process) -> PetriNet: """Generates a Petri Net from the given Process object. 
@@ -109,29 +143,51 @@ def generate_petri_net(self, process: Process) -> PetriNet: self.tasks = process.tasks for task in process.tasks.values(): if task.name == "productionTask": + group_id = str(uuid.uuid4()) + self.tree = Node(group_id, task.name) + task_context = TaskAPI(task, None) if self.generate_test_ids: task_context.uuid = "0" - self.task_started_id = create_place(task.name + "_started", self.net) - connection_id = create_transition("", "", self.net) + self.task_started_id = create_place( + task.name + "_started", self.net, group_id, [0, 0] + ) + connection_id = create_transition("", "", self.net, group_id) self.add_callback(connection_id, self.callbacks.task_started, task_context) self.net.add_input(self.task_started_id, connection_id, Value(1)) - self.task_finished_id = create_place(task.name + "_finished", self.net) + self.task_finished_id = create_place(task.name + "_finished", self.net, group_id) - second_connection_id = create_transition("", "", self.net) + second_connection_id = create_transition("", "", self.net, group_id) + self.tree.cluster = Cluster( + [ + self.task_started_id, + connection_id, + second_connection_id, + self.task_finished_id, + ] + ) self.generate_statements( - task_context, task.statements, connection_id, second_connection_id + task_context, task.statements, connection_id, second_connection_id, self.tree ) self.net.add_output(self.task_finished_id, second_connection_id, Value(1)) self.add_callback(second_connection_id, self.callbacks.task_finished, task_context) + # assign new clusters before drawing + self.net.clusters = self.tree.cluster + if self.draw_net: - draw_petri_net(self.net, self.path_for_image) + json_string = json.dumps(self.tree.toJSON(), indent=4) + draw_petri_net(self.net, self.path_for_image, ".dot") + draw_petri_net(self.net, self.path_for_image, ".png") + with open(self.path_for_image + ".dot", "a") as file: + file.write("\ncall_tree:") + file.write(json_string) + return self.net def generate_statements( @@ -140,6 +196,7 @@ def generate_statements( statements: List, first_connection_id: str, last_connection_id: str, + node: Node, in_loop: bool = False, ) -> List[str]: """Generate Petri Net components for each statement in the given Task @@ -162,14 +219,24 @@ def generate_statements( # and we are not in the last iteration if multiple_statements: if i < len(statements) - 1: - current_connection_id = create_transition("connection", "", self.net) + current_connection_id = create_transition( + "connection", "", self.net, node.group_id + ) + node.cluster.add_node(current_connection_id) else: current_connection_id = last_connection_id else: previous_connection_id = first_connection_id current_connection_id = last_connection_id - args = (statement, task_context, previous_connection_id, current_connection_id, in_loop) + args = ( + statement, + task_context, + previous_connection_id, + current_connection_id, + node, + in_loop, + ) if isinstance(statement, Service): connection_ids = [self.generate_service(*args)] @@ -194,6 +261,7 @@ def generate_service( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> str: """Generate the Petri Net components for a Service Call. @@ -201,15 +269,18 @@ def generate_service( Returns: The id of the last transition of the Service petri net component. 
""" + group_id = str(uuid.uuid4()) + Node(group_id, service.name, node) + service_api = ServiceAPI(service, task_context, in_loop=in_loop) - service_started_id = create_place(service.name + " started", self.net) + service_started_id = create_place(service.name + " started", self.net, group_id) + service_finished_id = create_place(service.name + " finished", self.net, group_id) - service_finished_id = create_place(service.name + " finished", self.net) self.place_dict[service_api.uuid] = service_finished_id - service_done_id = create_place(service.name + " done", self.net) - service_done_transition_id = create_transition("service_done", "", self.net) + service_done_id = create_place(service.name + " done", self.net, group_id) + service_done_transition_id = create_transition("service_done", "", self.net, group_id) self.add_callback(first_transition_id, self.callbacks.service_started, service_api) self.add_callback(service_done_transition_id, self.callbacks.service_finished, service_api) @@ -221,6 +292,18 @@ def generate_service( self.net.add_output(service_started_id, first_transition_id, Value(1)) self.net.add_input(service_done_id, second_transition_id, Value(1)) + node.cluster.add_child( + ( + Cluster( + [ + service_started_id, + service_finished_id, + service_done_transition_id, + service_done_id, + ] + ) + ) + ) return service_done_transition_id def generate_task_call( @@ -229,6 +312,7 @@ def generate_task_call( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> List[str]: """Generate the Petri Net components for a Task Call. @@ -239,6 +323,13 @@ def generate_task_call( called_task = self.tasks[task_call.name] new_task_context = TaskAPI(called_task, task_context, task_call=task_call, in_loop=in_loop) + group_id = str(uuid.uuid4()) + task_node = Node(group_id, task_call.name, node) + + task_cluster = Cluster([]) + node.cluster.add_child(task_cluster) + task_node.cluster = task_cluster + # Order for callbacks important: Task starts before statement and finishes after self.add_callback(first_transition_id, self.callbacks.task_started, new_task_context) last_connection_ids = self.generate_statements( @@ -246,6 +337,7 @@ def generate_task_call( called_task.statements, first_transition_id, second_transition_id, + task_node, in_loop, ) @@ -260,6 +352,7 @@ def generate_parallel( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> str: """Generate the Petri Net components for a Parallel statement. @@ -268,11 +361,23 @@ def generate_parallel( The id of the last transition of the Parallel petri net component. 
""" - sync_id = create_transition("", "", self.net) - parallel_finished_id = create_place("Parallel finished", self.net) + group_id = str(uuid.uuid4()) + parallel_node = Node(group_id, "Parallel", node) + + sync_id = create_transition("", "", self.net, group_id) + parallel_finished_id = create_place("Parallel finished", self.net, group_id) + + cluster = Cluster([], Cluster([sync_id, parallel_finished_id])) + node.cluster.add_child(cluster) + parallel_node.cluster = cluster for task_call in parallel.task_calls: - self.generate_task_call(task_call, task_context, first_transition_id, sync_id, in_loop) + parallel_cluster = Cluster([]) + cluster.add_child(parallel_cluster) + parallel_node.cluster = parallel_cluster + self.generate_task_call( + task_call, task_context, first_transition_id, sync_id, parallel_node, in_loop + ) self.net.add_output(parallel_finished_id, sync_id, Value(1)) self.net.add_input(parallel_finished_id, second_transition_id, Value(1)) @@ -284,6 +389,7 @@ def generate_condition( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> List[str]: """Generate Petri Net components for the Condition statement. @@ -291,15 +397,20 @@ def generate_condition( Returns: The ids of the last transitions of the Condition petri net component. """ - passed_id = create_place("Passed", self.net) - failed_id = create_place("Failed", self.net) + group_id = str(uuid.uuid4()) + condition_node = Node(group_id, "Condition", node) + + passed_id = create_place("Passed", self.net, group_id) + failed_id = create_place("Failed", self.net, group_id) expression_id = create_place( - "If \n " + self.parse_expression(condition.expression), self.net + "If " + self.parse_expression(condition.expression), + self.net, + group_id, ) - first_passed_transition_id = create_transition("", "", self.net) - first_failed_transition_id = create_transition("", "", self.net) + first_passed_transition_id = create_transition("", "", self.net, group_id) + first_failed_transition_id = create_transition("", "", self.net, group_id) self.net.add_input(expression_id, first_passed_transition_id, Value(1)) self.net.add_input(expression_id, first_failed_transition_id, Value(1)) @@ -307,16 +418,26 @@ def generate_condition( self.net.add_input(passed_id, first_passed_transition_id, Value(1)) self.net.add_input(failed_id, first_failed_transition_id, Value(1)) - finished_id = create_place("Condition_Finished", self.net) + finished_id = create_place("Condition_Finished", self.net, group_id) - second_passed_transition_id = create_transition("", "", self.net) + second_passed_transition_id = create_transition("", "", self.net, group_id) self.net.add_output(finished_id, second_passed_transition_id, Value(1)) + cluster = Cluster([passed_id, failed_id, expression_id, finished_id]) + node.cluster.add_child(cluster) + + cluster_passed = Cluster([first_passed_transition_id, second_passed_transition_id]) + cluster_failed = Cluster([first_failed_transition_id]) + cluster.add_child(cluster_passed) + cluster.add_child(cluster_failed) + condition_node.cluster = cluster_passed + self.generate_statements( task_context, condition.passed_stmts, first_passed_transition_id, second_passed_transition_id, + condition_node, in_loop, ) @@ -327,12 +448,15 @@ def generate_condition( self.add_callback(first_transition_id, self.callbacks.condition_started, *args) if condition.failed_stmts: - second_failed_transition_id = create_transition("", "", self.net) + condition_node.cluster = cluster_failed + 
second_failed_transition_id = create_transition("", "", self.net, group_id) + cluster_failed.add_node(second_failed_transition_id) self.generate_statements( task_context, condition.failed_stmts, first_failed_transition_id, second_failed_transition_id, + condition_node, in_loop, ) @@ -348,6 +472,7 @@ def generate_counting_loop( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> str: """Generates the Petri Net components for a Couting Loop. @@ -357,19 +482,21 @@ def generate_counting_loop( """ if loop.parallel: return self.generate_parallel_loop( - loop, task_context, first_transition_id, second_transition_id + loop, task_context, first_transition_id, second_transition_id, node ) - loop_id = create_place("Loop", self.net) + group_id = str(uuid.uuid4()) + counting_loop_node = Node(group_id, "Counting Loop", node) + loop_id = create_place("Loop", self.net, group_id) loop_text = "Loop" - loop_statements_id = create_place(loop_text, self.net) - loop_finished_id = create_place("Number of Steps Done", self.net) + loop_statements_id = create_place(loop_text, self.net, group_id) + loop_finished_id = create_place("Number of Steps Done", self.net, group_id) - condition_passed_transition_id = create_transition("", "", self.net) - condition_failed_transition_id = create_transition("", "", self.net) - iteration_step_done_transition_id = create_transition("", "", self.net) + condition_passed_transition_id = create_transition("", "", self.net, group_id) + condition_failed_transition_id = create_transition("", "", self.net, group_id) + iteration_step_done_transition_id = create_transition("", "", self.net, group_id) self.net.add_input(loop_id, condition_passed_transition_id, Value(1)) self.net.add_input(loop_statements_id, condition_passed_transition_id, Value(1)) @@ -377,12 +504,29 @@ def generate_counting_loop( self.net.add_input(loop_finished_id, condition_failed_transition_id, Value(1)) self.net.add_output(loop_id, iteration_step_done_transition_id, Value(1)) - loop_done_id = create_place("Loop Done", self.net) + loop_done_id = create_place("Loop Done", self.net, group_id) + + cluster = Cluster( + [ + loop_id, + loop_statements_id, + loop_finished_id, + condition_passed_transition_id, + condition_failed_transition_id, + iteration_step_done_transition_id, + loop_done_id, + ] + ) + + node.cluster.add_child(cluster) + counting_loop_node.cluster = cluster + self.generate_statements( task_context, loop.statements, condition_passed_transition_id, iteration_step_done_transition_id, + counting_loop_node, True, ) @@ -404,6 +548,7 @@ def generate_parallel_loop( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, ) -> str: """Generates the static petri net components for a ParallelLoop. @@ -413,10 +558,18 @@ def generate_parallel_loop( Returns: The id of the last transition of the ParallelLoop petri net component. 
""" + + group_id = str(uuid.uuid4()) + parallel_loop_node = Node(group_id, "Parallel Loop", node) + parallel_loop_started = create_place( - "Start " + loop.statements[0].name + " in parallel", self.net + "Start " + loop.statements[0].name + " in parallel", + self.net, + group_id, ) - + cluster = Cluster([parallel_loop_started]) + node.cluster.add_child(cluster) + parallel_loop_node.cluster = cluster self.net.add_output(parallel_loop_started, first_transition_id, Value(1)) self.net.add_input(parallel_loop_started, second_transition_id, Value(1)) @@ -427,6 +580,7 @@ def generate_parallel_loop( parallel_loop_started, first_transition_id, second_transition_id, + parallel_loop_node, ) self.add_callback(first_transition_id, self.callbacks.parallel_loop_started, *args) return second_transition_id @@ -435,16 +589,20 @@ def remove_place_on_runtime(self, place_id: str) -> None: """Removes a place from the petri net at runtime. Args: - place_id: The id as stringg of the task which should be removed from the net. + place_id: The id as string of the task which should be removed from the net. """ - self.net.remove_place(place_id) - if self.draw_net: - draw_petri_net(self.net, self.path_for_image) + if self.net.has_place(place_id): + # temporary fix + # self.net.clusters.remove_node(self.task_started_id) + # self.net.remove_place(self.task_started_id) + + if self.draw_net: + draw_petri_net(self.net, self.path_for_image) def generate_empty_parallel_loop( self, first_transition_id: str, second_transition_id: str ) -> None: - empty_loop_place = create_place("Exeute 0 tasks", self.net) + empty_loop_place = create_place("Execute 0 tasks", self.net) self.net.add_output(empty_loop_place, first_transition_id, Value(1)) self.net.add_input(empty_loop_place, second_transition_id, Value(1)) @@ -454,6 +612,7 @@ def generate_while_loop( task_context: TaskAPI, first_transition_id: str, second_transition_id: str, + node: Node, in_loop: bool = False, ) -> str: """Generate the Petri Net components for a While Loop. @@ -461,16 +620,19 @@ def generate_while_loop( Returns: The id of the last transition of the WhileLoop petri net component. 
""" - loop_id = create_place("Loop", self.net) + group_id = str(uuid.uuid4()) + while_loop_node = Node(group_id, "While Loop", node) + + loop_id = create_place("Loop", self.net, group_id) loop_text = "Loop" - loop_statements_id = create_place(loop_text, self.net) - loop_finished_id = create_place("Number of Steps Done", self.net) + loop_statements_id = create_place(loop_text, self.net, group_id) + loop_finished_id = create_place("Number of Steps Done", self.net, group_id) - condition_passed_transition_id = create_transition("", "", self.net) - condition_failed_transition_id = create_transition("", "", self.net) - iteration_step_done_transition_id = create_transition("", "", self.net) + condition_passed_transition_id = create_transition("", "", self.net, group_id) + condition_failed_transition_id = create_transition("", "", self.net, group_id) + iteration_step_done_transition_id = create_transition("", "", self.net, group_id) self.net.add_input(loop_id, condition_passed_transition_id, Value(1)) self.net.add_input(loop_statements_id, condition_passed_transition_id, Value(1)) @@ -478,12 +640,28 @@ def generate_while_loop( self.net.add_input(loop_finished_id, condition_failed_transition_id, Value(1)) self.net.add_output(loop_id, iteration_step_done_transition_id, Value(1)) - loop_done_id = create_place("Loop Done", self.net) + loop_done_id = create_place("Loop Done", self.net, group_id) + + cluster = Cluster( + [ + loop_id, + loop_statements_id, + loop_finished_id, + condition_failed_transition_id, + loop_done_id, + condition_passed_transition_id, + iteration_step_done_transition_id, + ] + ) + + node.cluster.add_child(cluster) + while_loop_node.cluster = cluster self.generate_statements( task_context, loop.statements, condition_passed_transition_id, iteration_step_done_transition_id, + while_loop_node, True, ) @@ -528,7 +706,7 @@ def parse_expression(self, expression: Dict) -> str: ) -def create_place(name: str, net: PetriNet) -> str: +def create_place(name: str, net: PetriNet, group_id: str, cluster: List = []) -> str: """Utility function for creating a place with the snakes module. This function is used to add a place with the given name and to add labels for @@ -537,17 +715,20 @@ def create_place(name: str, net: PetriNet) -> str: Args: name: A string representing the displayed name of the place. net: The petri net instance this place should be added to. + group_id: Returns: A UUID as string for the added place. """ place_id = str(uuid.uuid4()) - net.add_place(Place(place_id, [])) - net.place(place_id).label(name=name) + net.add_place(Place(place_id, []), cluster=cluster) + net.place(place_id).label(name=name, group_id=group_id) return place_id -def create_transition(transition_name: str, transition_type: str, net: PetriNet) -> str: +def create_transition( + transition_name: str, transition_type: str, net: PetriNet, group_id: str +) -> str: """Utility function for creating a transition with the snakes module. This function is used to add a transition with the given name and to add labels for @@ -556,11 +737,16 @@ def create_transition(transition_name: str, transition_type: str, net: PetriNet) Args: transition_name: A string representing the displayed name of the transition. net: The petri net instance this transition should be added to. + group_id: Returns: A UUID as string for the added transition. 
""" transition_id = str(uuid.uuid4()) net.add_transition(Transition(transition_id)) - net.transition(transition_id).label(name=transition_name, transitionType=transition_type) + net.transition(transition_id).label( + name=transition_name, + transitionType=transition_type, + group_id=group_id, + ) return transition_id diff --git a/pfdl_scheduler/petri_net/logic.py b/pfdl_scheduler/petri_net/logic.py index c2b26b1..df7fae6 100644 --- a/pfdl_scheduler/petri_net/logic.py +++ b/pfdl_scheduler/petri_net/logic.py @@ -7,6 +7,7 @@ """Contains the PetriNetLogic class.""" # standard libraries +import json from typing import Dict # 3rd party packages @@ -32,7 +33,9 @@ class PetriNetLogic: transition_dict: A reference to the dict in the generator which maps the ids to callbacks. """ - def __init__(self, petri_net_generator: PetriNetGenerator, draw_net: bool = True): + def __init__( + self, petri_net_generator: PetriNetGenerator, draw_net: bool = True, file_name: str = "" + ): """Initialize the object. Args: @@ -43,18 +46,24 @@ def __init__(self, petri_net_generator: PetriNetGenerator, draw_net: bool = True self.petri_net: PetriNet = petri_net_generator.net self.draw_net: bool = draw_net self.transition_dict: Dict = self.petri_net_generator.transition_dict + self.file_name = file_name - def draw_petri_net(self, name: str, petri_net: PetriNet) -> None: + def draw_petri_net(self) -> None: """Saves the given petri net as an image in the current working directory. Args: name: The name of the image. petri_net: The petri net instance that should be drawn. """ - file_path = "./temp/" + name + + file_path = "./temp/" + self.file_name if self.draw_net: - draw_petri_net(petri_net, file_path) + draw_petri_net(self.petri_net, file_path) + draw_petri_net(self.petri_net, file_path, ".dot") + with open(file_path + ".dot", "a") as file: + file.write("\ncall_tree:") + file.write(json.dumps(self.petri_net_generator.tree.toJSON(), indent=4)) def evaluate_petri_net(self) -> None: """Tries to fire every transition as long as all transitions @@ -63,7 +72,7 @@ def evaluate_petri_net(self) -> None: index = 0 transitions = list(self.petri_net._trans) - while index < len(self.petri_net._trans): + while index < len(transitions): transition_id = transitions[index] if self.petri_net.transition(transition_id).enabled(Value(1)): @@ -72,13 +81,12 @@ def evaluate_petri_net(self) -> None: temp = None for callback in callbacks: - # parallel loop functionallity stop evaluation + # parallel loop functionality stop evaluation if callback.func.__name__ == "on_parallel_loop_started": temp = callback callbacks.remove(temp) if temp: - # self.draw_petri_net(self.petri_net.name, self.petri_net) for callback in list(callbacks): callback() callbacks.remove(callback) @@ -96,7 +104,7 @@ def evaluate_petri_net(self) -> None: else: index = index + 1 - self.draw_petri_net(self.petri_net.name, self.petri_net) + self.draw_petri_net() def fire_event(self, event: Event) -> bool: """Adds a token to the corresponding place of the event in the petri net. 
@@ -115,7 +123,7 @@ def fire_event(self, event: Event) -> bool: if self.petri_net.has_place(name_in_petri_net): self.petri_net.place(name_in_petri_net).add(1) - self.draw_petri_net(self.petri_net.name, self.petri_net) + self.draw_petri_net() self.evaluate_petri_net() return True diff --git a/pfdl_scheduler/scheduler.py b/pfdl_scheduler/scheduler.py index 8ba64ca..70581f7 100644 --- a/pfdl_scheduler/scheduler.py +++ b/pfdl_scheduler/scheduler.py @@ -25,7 +25,7 @@ from pfdl_scheduler.utils.parsing_utils import load_file from pfdl_scheduler.utils.parsing_utils import parse_file -from pfdl_scheduler.petri_net.generator import PetriNetGenerator +from pfdl_scheduler.petri_net.generator import Node, PetriNetGenerator from pfdl_scheduler.petri_net.logic import PetriNetLogic from pfdl_scheduler.scheduling.event import Event @@ -76,7 +76,7 @@ def __init__( pfdl_file_path: str, generate_test_ids: bool = False, draw_petri_net: bool = True, - scheduler_id: str = str(uuid.uuid4()), + scheduler_id: str = "", dashboard_host_address: str = "", ) -> None: """Initialize the object. @@ -93,7 +93,10 @@ def __init__( scheduler_id: A unique ID to identify the Scheduer / Production Order dashboard_host_address: The address of the Dashboard (if existing) """ - self.scheduler_id = scheduler_id + if scheduler_id == "": + self.scheduler_id: str = str(uuid.uuid4()) + else: + self.scheduler_id: str = scheduler_id self.running: bool = False self.pfdl_file_valid: bool = False self.process: Process = None @@ -105,28 +108,30 @@ def __init__( self.awaited_events: List[Event] = [] self.generate_test_ids: bool = generate_test_ids self.test_id_counters: List[int] = [0, 0] - self.pfdl_file_valid, self.process = parse_file(pfdl_file_path) + self.pfdl_file_valid, self.process, pfdl_string = parse_file(pfdl_file_path) if self.pfdl_file_valid: self.petri_net_generator = PetriNetGenerator( "", - used_in_extension=False, generate_test_ids=generate_test_ids, draw_net=draw_petri_net, + file_name=self.scheduler_id, ) self.register_for_petrinet_callbacks() self.petri_net_generator.generate_petri_net(self.process) - self.petri_net_logic = PetriNetLogic(self.petri_net_generator, draw_petri_net) + self.petri_net_logic = PetriNetLogic( + self.petri_net_generator, draw_petri_net, file_name=self.scheduler_id + ) awaited_event = Event(event_type=START_PRODUCTION_TASK, data={}) self.awaited_events.append(awaited_event) self.observers: List[Observer] = [] # enable logging - self.attach(LogEntryObserver()) + self.attach(LogEntryObserver(self.scheduler_id)) if dashboard_host_address != "": - self.attach(DashboardObserver(dashboard_host_address, self.scheduler_id)) + self.attach(DashboardObserver(dashboard_host_address, self.scheduler_id, pfdl_string)) def attach(self, observer: Observer) -> None: """Attach (add) an observer object to the observers list.""" @@ -400,17 +405,20 @@ def on_parallel_loop_started( parallel_loop_started, first_transition_id: str, second_transition_id: str, + node: Node, ) -> None: """Executes Scheduling logic when a Parallel Loop is started.""" task_count = self.get_loop_limit(loop, task_context) + # generate parallel tasks in petri net if task_count > 0: - for i in range(task_count): + for i in range(int(task_count)): self.petri_net_generator.generate_task_call( parallelTask, task_context, first_transition_id, second_transition_id, + node, False, ) @@ -424,7 +432,6 @@ def on_parallel_loop_started( first_transition_id, second_transition_id ) - # generate parallel tasks in petri net 
self.petri_net_generator.remove_place_on_runtime(parallel_loop_started) # start evaluation of net again @@ -443,11 +450,16 @@ def on_task_finished(self, task_api: TaskAPI) -> None: """Executes Scheduling logic when a Task is finished.""" for callback in self.task_callbacks.task_finished: callback(task_api) + + order_finished = False if task_api.task.name == "productionTask": self.running = False + order_finished = True + self.petri_net_logic.draw_petri_net() + self.notify(NotificationType.PETRI_NET, self.scheduler_id) log_entry = "Task " + task_api.task.name + " with UUID '" + task_api.uuid + "' finished." - self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, True)) + self.notify(NotificationType.LOG_EVENT, (log_entry, logging.INFO, order_finished)) def register_for_petrinet_callbacks(self) -> None: """Register scheduler callback functions in the petri net.""" diff --git a/pfdl_scheduler/utils/dashboard_observer.py b/pfdl_scheduler/utils/dashboard_observer.py index 34bcf9a..518927f 100644 --- a/pfdl_scheduler/utils/dashboard_observer.py +++ b/pfdl_scheduler/utils/dashboard_observer.py @@ -22,6 +22,19 @@ from pfdl_scheduler.api.observer_api import NotificationType from pfdl_scheduler.api.observer_api import Observer +# constants and variables +PETRI_NET_FILE_LOCATION = "temp/" # the location of the petri net file +PETRI_NET_TYPE = "dot" # the format of the sent petri net file + +# http routes +ORDER_ROUTE = "/pfdl_order" +PETRI_NET_ROUTE = "/petri_net" +LOG_EVENT_ROUTE = "/log_event" + +# order status +ORDER_STARTED = 2 +ORDER_FINISHED = 4 + message_queue = queue.Queue() lock = threading.Lock() @@ -39,11 +52,13 @@ class DashboardObserver(Observer): The Observer will send a post request to the dashboard with the data. """ - def __init__(self, host: str, scheduler_id: str): - self.host = host - self.scheduler_id = scheduler_id - current_timestamp = int(round(datetime.timestamp(datetime.now()))) - self.starting_date = current_timestamp + def __init__(self, host: str, scheduler_id: str, pfdl_string: str) -> None: + self.host: str = host + self.scheduler_id: str = scheduler_id + current_timestamp: int = int(round(datetime.timestamp(datetime.now()))) + self.starting_date: int = current_timestamp + self.pfdl_string: str = pfdl_string + self.order_finished: bool = False threading.Thread(target=send_post_requests, daemon=True).start() @@ -51,44 +66,53 @@ def __init__(self, host: str, scheduler_id: str): "order_id": scheduler_id, "starting_date": current_timestamp, "last_update": current_timestamp, - "status": 2, + "status": ORDER_STARTED, + "pfdl_string": self.pfdl_string, } - message_queue.put((self.host + "/pfdl_order", request_data)) + message_queue.put((self.host + ORDER_ROUTE, request_data)) def update(self, notification_type: NotificationType, data: Any) -> None: if notification_type == NotificationType.PETRI_NET: - content = "" - with open("temp/petri_net.dot") as file: - content = file.read() - - request_data = { - "order_id": self.scheduler_id, - "content": content, - "type_pn": "dot", - } - message_queue.put((self.host + "/petri_net", request_data)) + if not self.order_finished: + content = "" + with open( + PETRI_NET_FILE_LOCATION + self.scheduler_id + "." 
+ PETRI_NET_TYPE + ) as file: + content = file.read() + + request_data = { + "order_id": self.scheduler_id, + "content": content, + "type_pn": PETRI_NET_TYPE, + } + message_queue.put((self.host + PETRI_NET_ROUTE, request_data)) elif notification_type == NotificationType.LOG_EVENT: log_event = data[0] log_level = data[1] order_finished = data[2] + + if order_finished: + self.order_finished = True + request_data = { "order_id": self.scheduler_id, "log_message": log_event, "log_date": int(round(datetime.timestamp(datetime.now()))), "log_level": log_level, } - message_queue.put((self.host + "/log_event", request_data)) + message_queue.put((self.host + LOG_EVENT_ROUTE, request_data)) - order_status = 2 + order_status = ORDER_STARTED if order_finished: - order_status = 4 + order_status = ORDER_FINISHED request_data = { "order_id": self.scheduler_id, "starting_date": self.starting_date, "last_update": int(round(datetime.timestamp(datetime.now()))), "status": order_status, + "pfdl_string": self.pfdl_string, } - message_queue.put((self.host + "/petri_net", request_data)) + message_queue.put((self.host + ORDER_ROUTE, request_data)) diff --git a/pfdl_scheduler/utils/log_entry_observer.py b/pfdl_scheduler/utils/log_entry_observer.py index 9bc1de6..529d08c 100644 --- a/pfdl_scheduler/utils/log_entry_observer.py +++ b/pfdl_scheduler/utils/log_entry_observer.py @@ -17,6 +17,12 @@ logging.getLogger("requests").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) +# constants +LOG_FILE_LOCATION = "temp/" +LOG_FILE_FORMAT = ".log" +LOG_FILE_ENCODING = "utf-8" +LOG_FILE_FILEMODE = "w" + class LogEntryObserver(Observer): """LogEntryObserver for receiving logging information from the Scheduler. @@ -24,12 +30,12 @@ class LogEntryObserver(Observer): LogLevels are based of https://docs.python.org/3/library/logging.html#logging-levels """ - def __init__(self): + def __init__(self, scheduler_id: str): logging.basicConfig( - filename="temp/scheduler.log", - encoding="utf-8", + filename=LOG_FILE_LOCATION + scheduler_id + LOG_FILE_FORMAT, + encoding=LOG_FILE_ENCODING, level=logging.DEBUG, - filemode="w", + filemode=LOG_FILE_FILEMODE, ) def update(self, notification_type: NotificationType, data: Any) -> None: diff --git a/pfdl_scheduler/utils/parsing_utils.py b/pfdl_scheduler/utils/parsing_utils.py index c41ee2e..8e29466 100644 --- a/pfdl_scheduler/utils/parsing_utils.py +++ b/pfdl_scheduler/utils/parsing_utils.py @@ -54,8 +54,6 @@ def parse_string( tree = parser.program() - write_tokens_to_file(token_stream) - if error_handler.has_error() is False: visitor = PFDLTreeVisitor(error_handler) process = visitor.visit(tree) @@ -69,17 +67,18 @@ def parse_string( return (False, None) -def parse_file(file_path: str) -> Tuple[bool, Union[None, Process]]: +def parse_file(file_path: str) -> Tuple[bool, Union[None, Process], str]: """Loads the content of the file from the given path and calls the parse_string function. Args: file_path: The path to the PFDL file. Returns: - A boolan indicating validity of the PFDL file and the process object if so, otherwise None. + A boolan indicating validity of the PFDL file, the content of the file, and the + process object if so, otherwise None. 
""" pfdl_string = load_file(file_path) - return parse_string(pfdl_string, file_path) + return *parse_string(pfdl_string, file_path), pfdl_string def write_tokens_to_file(token_stream: CommonTokenStream) -> None: From 5258a4b402b17a8feff5eed3dcfbf8afe270a39b Mon Sep 17 00:00:00 2001 From: Lars Toenning Date: Tue, 5 Sep 2023 09:24:39 +0200 Subject: [PATCH 02/30] Fix typos in documentation (#26) --- docs/getting_started/installation.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/getting_started/installation.md b/docs/getting_started/installation.md index 483c375..43fd71b 100644 --- a/docs/getting_started/installation.md +++ b/docs/getting_started/installation.md @@ -200,7 +200,7 @@ You can view the source code of the whole file here: Within the scheduler_demo.py file the Scheduler is created and started with the given PFDL file. The start method of the interface registers the callback functions to the Scheduler so they are called when specific events occur. After the registration the scheduler is started and then executed in a while loop. -In this loop an input to the Scheduler is emulated with the help of pythons input function. +In this loop an input to the Scheduler is emulated with the help of Python's input function. The given event of the user is then fired to the Scheduler. ```python3 linenums="1" @@ -226,7 +226,7 @@ There are four methods which serve as callback functions for the scheduler. The `cb_task_started` method for example gets called when a Task is started in the Scheduler. A [TaskAPI](../scheduler/api.md#pfdl_scheduler.api.task_api.TaskAPI) object is passed to the function which gives context information about the started Task. In this simple example the UUID of the started Task and its name are being printed to the console. -The UUID gets created when the Task is started and identifies the specific instane of the Task. +The UUID gets created when the Task is started and identifies the specific instance of the Task. If the same [Task](../scheduler/developer_reference.md#pfdl_scheduler.model.task) is called multiple times, for example in a loop, each instance get a unique ID for identification. The TaskAPI object consists of the called Task (or: Task definition), the unique ID and the TaskContext which is also a TaskAPI object. This TaskContext represents the calling Task (can also be none, if the TaskAPI object describes the `productionTask`). @@ -283,9 +283,9 @@ Before you start the Scheduler we explain in short the used PFDL file. In this simple scenario there is only the service `Painting`. This service could command a painting machine to paint the piece that is currently on it. To customize the painting process a `Color` parameter is passed to the service. -The Struct `Color` whihch is used as a description for Color variables consists of the color name and a RGB value in form of an array. +The Struct `Color`, which is used as a description for Color variables, consists of the color name and a RGB value in form of an array. As the painting machine can measure the wetness of the piece, the service will return a `PaintResult` which contains the wetness. -The whole production order starts with the productionTask and so from it the Task `painTask` is called which executes the `Painting` service. +The whole production order starts with the `productionTask` and so from it the Task `paintingTask` is called which executes the `Painting` service. 
The example PFDL file looks like the following: ```text linenums="1" @@ -328,11 +328,11 @@ A token should be in the `Painting started` place. The demo waits for user input. As it is for testing purposes only, the syntax for firing an event is simplified. You can copy the UUID of the started service and seperate it with a comma and write `service_finished`. -This will tell the scheduler that the service with the given uuid is finished. -Substitute the uuid with the one of the service and enter the command. +This will tell the scheduler that the service with the given UUID is finished. +Substitute the UUID with the one of the service and enter the command. ```text - ,service_finished + ,service_finished ``` After entering this command the token should be in the last place of the petri net as the service has finished now and no other statement is inside the `paintingTask`. From 5e7fc59ebea1452025ff3243274af8f659fe314f Mon Sep 17 00:00:00 2001 From: Maximilian Hoerstrup Date: Tue, 19 Sep 2023 09:23:53 +0200 Subject: [PATCH 03/30] Add file path as argument to argparser --- pfdl_scheduler/extension.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pfdl_scheduler/extension.py b/pfdl_scheduler/extension.py index 916c01f..f38cb29 100644 --- a/pfdl_scheduler/extension.py +++ b/pfdl_scheduler/extension.py @@ -21,6 +21,7 @@ def main(): parser = argparse.ArgumentParser( description="A program that shall be executed in the VS Code extension which has a string containing a PFDL program as input as well as the name of the corresponding file." ) + parser.add_argument("file_path", type=str, help="The requesters filepath.") parser.add_argument("pfdl_string", type=str, help="The content of a given PFDL file as string.") parser.add_argument("file_name", type=str, help="The name of the given PFDL file.") args = parser.parse_args() From 68ab87a7add17999e826a093aea8a3ee0f0effd4 Mon Sep 17 00:00:00 2001 From: Marius Brehler Date: Wed, 11 Oct 2023 16:09:55 +0200 Subject: [PATCH 04/30] Add SPDX tags (#28) --- README.md | 4 ++++ docs/examples/concurrency_and_synchronization.md | 4 ++++ docs/examples/control_structures.md | 4 ++++ docs/examples/introduction.md | 4 ++++ docs/examples/task_input_and_output.md | 4 ++++ docs/getting_started/architecture.md | 4 ++++ docs/getting_started/ci_cd.md | 4 ++++ docs/getting_started/glossary.md | 4 ++++ docs/getting_started/installation.md | 4 ++++ docs/index.md | 4 ++++ docs/pfdl/comments.md | 4 ++++ docs/pfdl/condition.md | 4 ++++ docs/pfdl/introduction.md | 4 ++++ docs/pfdl/loop.md | 4 ++++ docs/pfdl/parallel.md | 4 ++++ docs/pfdl/service.md | 4 ++++ docs/pfdl/struct.md | 4 ++++ docs/pfdl/task.md | 4 ++++ docs/scheduler/api.md | 4 ++++ docs/scheduler/dashboard.md | 4 ++++ docs/scheduler/developer_reference.md | 4 ++++ docs/scheduler/index.md | 4 ++++ docs/scheduler/model_classes.md | 4 ++++ docs/scheduler/scheduler_class.md | 4 ++++ docs/stylesheets/extra.css | 4 ++++ 25 files changed, 100 insertions(+) diff --git a/README.md b/README.md index 9112779..c7cb499 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +
Production Flow Description Language diff --git a/docs/examples/concurrency_and_synchronization.md b/docs/examples/concurrency_and_synchronization.md index db37165..71817bb 100644 --- a/docs/examples/concurrency_and_synchronization.md +++ b/docs/examples/concurrency_and_synchronization.md @@ -1,3 +1,7 @@ + # Concurrency and Synchronization As already stated in the [parallel](../pfdl/parallel.md) section, there is no keyword for the synchronization. The next statement after a parallel block is only started if all concurrently running tasks are finished. diff --git a/docs/examples/control_structures.md b/docs/examples/control_structures.md index a0627fa..14c67b3 100644 --- a/docs/examples/control_structures.md +++ b/docs/examples/control_structures.md @@ -1,3 +1,7 @@ + # Control structures ## Condition diff --git a/docs/examples/introduction.md b/docs/examples/introduction.md index 7bddf2b..b7c6d7a 100644 --- a/docs/examples/introduction.md +++ b/docs/examples/introduction.md @@ -1,3 +1,7 @@ + # Introduction In this section complete PFDL programs are presented to give a better understanding. diff --git a/docs/examples/task_input_and_output.md b/docs/examples/task_input_and_output.md index d0b332e..7a509c6 100644 --- a/docs/examples/task_input_and_output.md +++ b/docs/examples/task_input_and_output.md @@ -1,3 +1,7 @@ + # Task input and output Tasks can define an Input and an Output to pass data when calling each other. diff --git a/docs/getting_started/architecture.md b/docs/getting_started/architecture.md index ee9d9d3..2d932e3 100644 --- a/docs/getting_started/architecture.md +++ b/docs/getting_started/architecture.md @@ -1,3 +1,7 @@ +