Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution Graph and Computation Optimization #27

Merged
merged 7 commits into from
Jan 30, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Code Improvement:
* Enabled circuit processing and circuit evaluation using execution graph
* Updated and passed all the corresponding tests
* Updated requirements.txt and setup.py
* Bumped up version number
* Fixed some docstrings
yuefan98 committed Jan 14, 2025
commit f752d85f50a4b3eb7dafe22a57c6eceeea071000
185 changes: 179 additions & 6 deletions nleis/fitting.py
Original file line number Diff line number Diff line change
@@ -7,6 +7,9 @@
from impedance.models.circuits.elements import get_element_from_name
from impedance.models.circuits.fitting import check_and_eval, rmse

import networkx as nx
import re

# Note: a lot of codes are directly adopted from impedance.py.,
# which is designed to be enable a easy integration in the future,
# but now we are keep them separated to ensure the stable performance
@@ -209,6 +212,7 @@ def set_default_bounds(circuit, constants={}):

def circuit_fit(frequencies, impedances, circuit, initial_guess, constants={},
bounds=None, weight_by_modulus=False, global_opt=False,
graph=False,
**kwargs):
""" Main function for fitting an equivalent circuit to data.

@@ -279,6 +283,8 @@ def circuit_fit(frequencies, impedances, circuit, initial_guess, constants={},
if bounds is None:
bounds = set_default_bounds(circuit, constants=constants)

cg = CircuitGraph(circuit, constants)

if not global_opt:
if 'maxfev' not in kwargs:
kwargs['maxfev'] = int(1e5)
@@ -289,10 +295,17 @@ def circuit_fit(frequencies, impedances, circuit, initial_guess, constants={},
if weight_by_modulus:
abs_Z = np.abs(Z)
kwargs['sigma'] = np.hstack([abs_Z, abs_Z])

popt, pcov = curve_fit(wrapCircuit(circuit, constants), f,
np.hstack([Z.real, Z.imag]),
p0=initial_guess, bounds=bounds, **kwargs)
if graph:
popt, pcov = curve_fit(cg.compute_long, f,
np.hstack([Z.real, Z.imag]),
p0=initial_guess,
bounds=bounds,
**kwargs,
)
else:
popt, pcov = curve_fit(wrapCircuit(circuit, constants), f,
np.hstack([Z.real, Z.imag]),
p0=initial_guess, bounds=bounds, **kwargs)

# Calculate one standard deviation error estimates for fit parameters,
# defined as the square root of the diagonal of the covariance matrix.
@@ -320,6 +333,9 @@ def opt_function(x):
return rmse(wrapCircuit(circuit, constants)(f, *x),
np.hstack([Z.real, Z.imag]))

def opt_function_graph(x):
return rmse(cg(f, *x), np.hstack([Z.real, Z.imag]))

class BasinhoppingBounds(object):
""" Adapted from the basinhopping documetation
https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.basinhopping.html
@@ -337,8 +353,12 @@ def __call__(self, **kwargs):

basinhopping_bounds = BasinhoppingBounds(xmin=bounds[0],
xmax=bounds[1])
results = basinhopping(opt_function, x0=initial_guess,
accept_test=basinhopping_bounds, **kwargs)
if graph:
results = basinhopping(opt_function_graph, x0=initial_guess,
accept_test=basinhopping_bounds, **kwargs)
else:
results = basinhopping(opt_function, x0=initial_guess,
accept_test=basinhopping_bounds, **kwargs)
popt = results.x

# Calculate perror
@@ -583,3 +603,156 @@ def extract_circuit_elements(circuit):
current_element.append(char)
extracted_elements.append(''.join(current_element))
return extracted_elements

# Circuit Graph for computation optimization
# Special Thanks to Jake Anderson for the original code


class CircuitGraph:
# regular expression to find parallel and difference blocks
_parallel_difference_block_expression = re.compile(r'(?:p|d)\([^()]*\)')
# _parallel_difference_block_expression = re.compile(r'p\([^()]*\)')

# regular expression to remove whitespace
_whitespce = re.compile(r"\s+")

def __init__(self, circuit, constants=None):
# remove all whitespace from the circuit string
self.circuit = self._whitespce.sub("", circuit)
# parse the circuit string and initialize the graph
self.parse_circuit()
# compute the execution order of the graph
self.execution_order = list(nx.topological_sort(self.graph))
# initialize the constants dictionary
self.constants = constants if constants is not None else dict()

def parse_circuit(self):
# initialize the node counters for each type of block
self.snum = 1
self.pnum = 1
self.dnum = 1
# initialize the circuit string to be parsed
parsing_circuit = self.circuit

# determine all of the base elements, their functions
# and add them to the graph
element_name = extract_circuit_elements(parsing_circuit)
element_func = [
circuit_elements[get_element_from_name(e)] for e in element_name
]
# graph initialization
self.graph = nx.DiGraph()
# add nodes to the graph
for e, f in zip(element_name, element_func):
self.graph.add_node(e, Z=f)

# find unnested parallel and difference blocks
pd_blocks = self._parallel_difference_block_expression.findall(
parsing_circuit)

while len(pd_blocks) > 0:
# add parallel or difference blocks to the graph
# unnesting each time around the loop
for pd in pd_blocks:
operator = pd[0]
pd_elem = pd[2:-1].split(",")
if operator == "p":
pnode = f"p{self.pnum}"
self.pnum += 1
self.graph.add_node(pnode, Z=circuit_elements["p"])
for elem in pd_elem:
elem = self.add_series_elements(elem)
self.graph.add_edge(elem, pnode)
parsing_circuit = parsing_circuit.replace(pd, pnode)
elif operator == "d":
dnode = f"d{self.dnum}"
self.dnum += 1
self.graph.add_node(dnode, Z=circuit_elements["d"])
for elem in pd_elem:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The if statements are really only needed to get the number to use. You could simplify it a lot with

if operator == "p":
  nnum = self.pnum
  self.pnum += 1
elif operator == "d":
  nnum = self.dnum
  self.dnum += 1

node = f"{operator}{nnum}"
self.graph.add_node(node, Z=circuit_elements[operator])
for elem in pd_elem:
  elem = self.add_series_elements(elem)
  self.graph.add_edge(elem, node)
parsing_circuit = parsing_circuit.replace(pd, node)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for spending time reviewing my implementation. I agree that this will make the code cleaner and simpler. I will make the changes accordingly.

elem = self.add_series_elements(elem)
self.graph.add_edge(elem, dnode)
parsing_circuit = parsing_circuit.replace(pd, dnode)
pd_blocks = self._parallel_difference_block_expression.findall(
parsing_circuit)

# pick up any top line series connections
self.add_series_elements(parsing_circuit)

# assign layers to the nodes
for layer, nodes in enumerate(nx.topological_generations(self.graph)):
for n in nodes:
self.graph.nodes[n]["layer"] = layer
# function to add series elements to the graph

def add_series_elements(self, elem):
selem = elem.split("-")
if len(selem) > 1:
node = f"s{self.snum}"
self.snum += 1
self.graph.add_node(node, Z=circuit_elements["s"])
for n in selem:
self.graph.add_edge(n, node)
return node

# if there isn't a series connection in elem just return it unchanged
return selem[0]

# function to visualize the graph
def visualize_graph(self, **kwargs):
pos = nx.multipartite_layout(self.graph, subset_key="layer")
nx.draw_networkx(self.graph, pos=pos, **kwargs)

# function to compute the impedance of the circuit
def compute(self, f, *parameters):
node_results = {}
pindex = 0
for node in self.execution_order:
Zfunc = self.graph.nodes[node]["Z"]
plist = [
node_results[pred] for pred in self.graph.predecessors(node)
]

if len(plist) < 1:
n_params = Zfunc.num_params
for j in range(n_params):
p_name = format_parameter_name(node, j, n_params)
if p_name in self.constants:
plist.append(self.constants[p_name])
else:
plist.append(parameters[pindex])
pindex += 1
node_results[node] = Zfunc(plist, f)
else:
node_results[node] = Zfunc(plist)

return np.squeeze(node_results[node])

# To enable comparision

def __eq__(self, other):
if not isinstance(other, CircuitGraph):
return False
# Compare the internal graph attributes
return (self.graph.nodes == other.graph.nodes
and self.graph.edges == other.graph.edges)

# To enable direct calling

def __call__(self, f, *parameters):
Z = self.compute(f, *parameters)
return np.hstack([Z.real, Z.imag])

def compute_long(self, f, *parameters):
Z = self.compute(f, *parameters)
return np.hstack([Z.real, Z.imag])

def calculate_circuit_length(self):
n_params = [
getattr(Zfunc, "num_params", 0)
for node, Zfunc in self.graph.nodes(data="Z")
]
return np.sum(n_params)


def format_parameter_name(name, j, n_params):
return f"{name}_{j}" if n_params > 1 else f"{name}"
Loading