-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_drawio.py
349 lines (298 loc) · 11.5 KB
/
parse_drawio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
"""Parsing xml produced by draw.io to create a PySD model."""
import argparse
from datetime import datetime
import logging
from os import PathLike
import subprocess
from typing import NewType
from xml.sax.handler import feature_namespaces
from xml.sax import make_parser
from pathlib import Path
from xml.sax.handler import ContentHandler
import pysd
from pysd.translators.structures import abstract_expressions, abstract_model
from pysd.builders.python.python_model_builder import ModelBuilder
from drawio_pysd.equations_parsing import equation_2_ast, var_name_to_safe_name
from drawio_pysd.custom_asts import LinearDependencyStructure
# Distinct static type for the ``id`` attribute of draw.io cells; these ids
# are plain ``str`` objects at runtime and are used as dictionary keys below.
ElementId = NewType("ElementId", str)
class PysdElementsHandler(ContentHandler):
"""Content handler for the xml file from drawio."""
subscripts: dict[ElementId, abstract_model.AbstractSubscriptRange]
elements: dict[ElementId, abstract_model.AbstractElement]
references: dict[ElementId, str]
connexions: list[tuple[ElementId, ElementId]]
safe_names: dict[str, ElementId]
def __init__(self):
super().__init__()
self.elements = {}
self.subscripts = {}
self.connexions = []
self.references = {}
self.safe_names = {}
def startElementNS(self, name, qname, attrs):
"""Start reading a element from the xml."""
match name[1]:
case "UserObject":
# User objects are the general cells from drawio dedicated to PySD
self.create_element(attrs)
case "mxCell":
try:
# These are other cells or edges
is_edge = bool(int(attrs.getValueByQName("edge")))
except KeyError:
try:
# These are other cells or edges
is_edge = not bool(int(attrs.getValueByQName("vertex")))
except KeyError:
is_edge = False
if is_edge:
self.process_mxCell_edge(attrs)
else:
self.process_mxCell_vertex(attrs)
def process_mxCell_edge(self, attrs):
"""Process the mxCell edges.
edges connect elements
"""
try:
source = attrs.getValueByQName("source")
target = attrs.getValueByQName("target")
self.connexions.append((source, target))
except KeyError:
pass
def process_mxCell_vertex(self, attrs):
"""Process the mxCell vertices.
vertices can be parts of subscripts (values of the subscripts)
"""
try:
parent = attrs.getValueByQName("parent")
if parent in self.subscripts:
self.subscripts[parent].subscripts.append(
attrs.getValueByQName("value")
)
except KeyError:
pass
def create_element(self, attrs):
"""Create an element from the xml attributes."""
name = attrs.getValueByQName("Name").strip()
try:
equation = attrs.getValueByQName("_equation")
except KeyError:
equation = ""
# Check if a key is in the attrs
try:
pysd_type = attrs.getValueByQName("_pysd_type")
except KeyError:
pysd_type = "AbstractElement"
id = attrs.getValueByQName("id")
if pysd_type == "Reference":
# Reference dont have ast but they have to point to
# the correct element
self.references[id] = name
return
try:
ast = self.create_ast(pysd_type, equation, attrs)
if ast is None:
return
except Exception as exp:
raise ValueError(
f"Error while creating the abstract structure"
f" for '{pysd_type}: {name}': {exp}"
) from exp
if isinstance(ast, abstract_model.AbstractSubscriptRange):
self.subscripts[id] = ast
return
try:
units = attrs.getValueByQName("Units")
except KeyError:
units = ""
try:
doc = attrs.getValueByQName("Doc")
except KeyError:
doc = ""
# Check that the name is unique in safe_name space
safe_name = var_name_to_safe_name(name)
if safe_name in self.safe_names:
raise ValueError(
f"Variables '{name}' and '{self.safe_names[safe_name]}' "
f"have the same safe name '{safe_name}'.\n"
"Please change one of the names."
)
self.safe_names[safe_name] = name
element = abstract_model.AbstractElement(
name=name,
components=[
abstract_model.AbstractComponent(
subscripts=([], []),
ast=ast,
)
],
documentation=doc,
units=units,
)
self.elements[id] = element
def create_ast(self, pysd_type, equation, attrs):
match pysd_type:
case "IntegStructure":
return abstract_expressions.IntegStructure(
initial=equation_2_ast(attrs.getValueByQName("_initial")),
flow=equation_2_ast(equation),
)
case "AbstractElement" | "AbstractComponent":
return equation_2_ast(equation)
case "AbstractUnchangeableConstant" | "ControlVar":
return float(attrs.getValueByQName("_initial"))
case "Subscript":
ast = abstract_model.AbstractSubscriptRange(
name=attrs.getValueByQName("Name"), subscripts=[], mapping=[]
)
return ast
case "LinearDependencyStructure":
return LinearDependencyStructure(
initial=equation_2_ast(attrs.getValueByQName("_initial")),
variable=equation_2_ast(equation),
)
case "Sink":
return None
case _:
raise NotImplementedError(f"pysd_type '{pysd_type}' not implemented.")
def _add_subscripts_from_connexions(self):
"""Add the subscripts to the elements based on the connexions."""
for source, target in self.connexions:
if source in self.subscripts and target in self.elements:
for c in self.elements[target].components:
c.subscripts[0].append(self.subscripts[source].name)
def post_parsing(self):
"""Modify some elements after the parsing.
Must be called after the parsing.
"""
# create a mapping variable name -> id
# to replace the references
mapping = {v.name: k for k, v in self.elements.items()}
# replaces references in the connexions
for i, (source, target) in enumerate(self.connexions):
if source in self.references:
self.connexions[i] = (mapping[self.references[source]], target)
if target in self.references:
self.connexions[i] = (source, mapping[self.references[target]])
self._add_subscripts_from_connexions()
def generate_abstract_model(file_path: PathLike) -> abstract_model.AbstractModel:
    """Parse a draw.io file and return the matching PySD abstract model.

    The model is made of a single ``__main__`` section holding every
    subscript range and element found in the file. The path recorded for
    the section is the one of the input file with a ``.py`` suffix.

    Raises:
        FileNotFoundError: if ``file_path`` is not an existing file.
    """
    source = Path(file_path)
    if not source.is_file():
        raise FileNotFoundError(f"{source}")

    # Read the xml with a namespace-aware SAX parser feeding our handler
    handler = PysdElementsHandler()
    xml_reader = make_parser()
    xml_reader.setFeature(feature_namespaces, True)
    xml_reader.setContentHandler(handler)
    xml_reader.parse(source)

    # Resolve the references and the subscripts once everything is read
    handler.post_parsing()

    main_section = abstract_model.AbstractSection(
        name="__main__",
        path=source.with_suffix(".py"),
        type="main",
        params=[],
        returns=[],
        subscripts=list(handler.subscripts.values()),
        elements=list(handler.elements.values()),
        constraints=(),
        test_inputs=(),
        split=False,
        views_dict=None,
    )
    return abstract_model.AbstractModel(source, sections=(main_section,))
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command line parser of the script."""
    # The docstring of this py file is the description of the argument parser
    arg_parser = argparse.ArgumentParser(description=__doc__)
    arg_parser.add_argument("file_path", type=str, help="Path of the file to parse")
    # Add a run option to run the file only if selected
    arg_parser.add_argument(
        "--run", action="store_true", help="Run the file after parsing it"
    )
    # Add argument 'abs' to save the abstract model
    arg_parser.add_argument(
        "--abs", action="store_true", help="Save the abstract model"
    )
    # Add log and debug options: both write to the same destination, the
    # default level is given by the first one.
    arg_parser.add_argument(
        "-d",
        "--debug",
        help="Debugging statements",
        action="store_const",
        dest="loglevel",
        const=logging.DEBUG,
        default=logging.WARNING,
    )
    arg_parser.add_argument(
        "-v",
        "--verbose",
        "--info",
        help="Write out the output",
        action="store_const",
        dest="loglevel",
        const=logging.INFO,
    )
    # Add a plot parameters which arguments will be plotted.
    # The option can be repeated: the result is a list of lists of names.
    arg_parser.add_argument(
        "-p",
        "--plot",
        nargs="+",
        type=str,
        default=[],
        help="Plot parameters",
        dest="plot_params",
        action="append",
    )
    return arg_parser


def _save_abstract_model(model, file_path, logger) -> None:
    """Write the sections of ``model`` next to ``file_path`` in a ``.abs`` file."""
    abs_file = file_path.with_name(file_path.name + ".abs")
    with open(abs_file, "w", encoding="utf-8") as f:
        # add a header to explain what is in the file
        f.write(
            f"# This file contains the PySD abstract model of the file {file_path}\n"
        )
        # add creation date and time
        f.write(f"# Created on {datetime.now()}\n")
        f.write(str(model.sections))
    # format the file calling black, once the file is closed so that its
    # whole content is on disk
    # TODO: call directly the black API instead of calling it as a subprocess
    try:
        subprocess.run(["black", abs_file])
    except FileNotFoundError:
        # Formatting is only cosmetic: do not fail when black is not installed
        logger.warning(
            "black is not installed, %s has been left unformatted", abs_file
        )


def _plot_results(results_df, plot_params, logger) -> None:
    """Plot the requested columns of ``results_df`` against its index."""
    # Imported here so that matplotlib is only needed when plotting
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots()
    x = results_df.index
    for param_list in plot_params:
        for param in param_list:
            if param not in results_df.columns:
                logger.warning("Parameter %s not in the simulation outputs", param)
                continue
            ax.plot(x, results_df[param], label=f"{param}")
    ax.legend()
    plt.show()


def main(argv=None) -> None:
    """Parse a draw.io file, build the PySD model and optionally run it.

    ``argv`` is the list of command line arguments; the arguments of the
    current process are used when it is ``None``.
    """
    args = _build_arg_parser().parse_args(argv)

    # Set the logging value
    logging.basicConfig()
    logger = logging.getLogger("drawio_pysd")
    logger.setLevel(args.loglevel)
    logger.debug(args)

    file_path = Path(args.file_path)
    model = generate_abstract_model(file_path)
    print(f"File {file_path} parsed and successfully converted to {model}")

    if args.abs:
        _save_abstract_model(model, file_path, logger)

    py_file = ModelBuilder(model).build_model()
    print(f"File {file_path} parsed and successfully converted to {py_file}")

    if not args.run:
        if args.plot_params:
            logger.warning(
                "Cannot plot if the model is not run, add --run in the command line args"
            )
        return

    results_df = pysd.load(py_file).run()
    if args.plot_params:
        _plot_results(results_df, args.plot_params, logger)


if __name__ == "__main__":
    main()