Skip to content

Commit

Permalink
Introduce an IterationDriver to track optimizer convergence
Browse files Browse the repository at this point in the history
  • Loading branch information
FranzBangar committed Nov 10, 2023
1 parent 7114952 commit 7d65442
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 27 deletions.
131 changes: 108 additions & 23 deletions src/classy_blocks/modify/optimizer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import copy
from typing import List
import dataclasses
from typing import ClassVar, List

import numpy as np
import scipy.optimize

from classy_blocks.mesh import Mesh
from classy_blocks.modify.clamps.clamp import ClampBase
from classy_blocks.modify.grid import Grid
from classy_blocks.modify.junction import Junction
from classy_blocks.util.constants import VSMALL
from classy_blocks.util.constants import VBIG
from classy_blocks.util.tools import report


Expand All @@ -20,13 +20,90 @@ class NoClampError(Exception):
"""Raised when there's no junction defined for a given Clamp"""


@dataclasses.dataclass
class IterationData:
"""Data about a single iteration's progress"""

relaxation: float
initial_quality: float
final_quality: float = VBIG

@property
def improvement(self) -> float:
return self.initial_quality - self.final_quality


class IterationDriver:
"""Bookkeeping: iterations, results, relaxation, quality and whatnot"""

INITIAL_RELAXATION: ClassVar[float] = 0.5

def __init__(self, max_iterations: int, relaxed_iterations: int, tolerance: float):
self.max_iterations = max_iterations
self.relaxed_iterations = relaxed_iterations
self.tolerance = tolerance

self.iterations: List[IterationData] = []

def begin_iteration(self, quality: float) -> IterationData:
iteration = IterationData(self.next_relaxation, quality)
self.iterations.append(iteration)

return iteration

def end_iteration(self, quality: float) -> None:
self.iterations[-1].final_quality = quality

@property
def last_quality(self) -> float:
if len(self.iterations) < 1:
return VBIG

return self.iterations[-1].final_quality

@property
def next_relaxation(self) -> float:
"""Returns the relaxation factor for the next iteration"""
step = (1 - self.INITIAL_RELAXATION) / self.relaxed_iterations
iteration = len(self.iterations)

return min(1, IterationDriver.INITIAL_RELAXATION + step * iteration)

@property
def initial_improvement(self) -> float:
if len(self.iterations) < 1:
return VBIG

return self.iterations[0].improvement

@property
def last_improvement(self) -> float:
if len(self.iterations) < 2:
return self.initial_improvement

return self.iterations[-1].improvement

@property
def converged(self) -> bool:
if len(self.iterations) <= 1:
# At least two iterations are needed
# so that the result of the last can be compared with the first one
return False

if len(self.iterations) >= self.max_iterations:
return True

return self.last_improvement / self.initial_improvement < self.tolerance * self.initial_improvement


class Optimizer:
"""Provides tools for blocking optimization"""

def __init__(self, mesh: Mesh):
def __init__(self, mesh: Mesh, report: bool = True):
self.mesh = mesh
self.grid = Grid(mesh)
self.report = report

self.grid = Grid(mesh)
self.clamps: List[ClampBase] = []

def release_vertex(self, clamp: ClampBase) -> None:
Expand All @@ -48,7 +125,7 @@ def _get_clamp(self, junction: Junction) -> ClampBase:

raise NoClampError

def optimize_clamp(self, clamp: ClampBase, relaxation: float) -> float:
def optimize_clamp(self, clamp: ClampBase, iteration: IterationData) -> float:
"""Move clamp.vertex so that quality at junction is improved;
rollback changes if grid quality decreased after optimization"""
initial_grid_quality = self.grid.quality
Expand Down Expand Up @@ -90,36 +167,44 @@ def fquality(params):
msg += f"{clamp.vertex.index}: {initial_grid_quality:.3e} > {current_grid_quality:.3e}"
report(msg)

clamp.update_params(clamp.params, relaxation)
clamp.update_params(clamp.params, iteration.relaxation)

return initial_grid_quality / current_grid_quality

def optimize_iteration(self, relaxation: float) -> None:
def optimize_iteration(self, iteration: IterationData) -> None:
# gather points that can be moved with optimization
for junction in self.grid.get_ordered_junctions():
try:
clamp = self._get_clamp(junction)
self.optimize_clamp(clamp, relaxation)
self.optimize_clamp(clamp, iteration)
except NoClampError:
continue

def optimize(self, max_iterations: int = 20, tolerance: float = 0.05, initial_relaxation=0.7) -> None:
def optimize(self, max_iterations: int = 20, relaxed_iterations: int = 2, tolerance: float = 0.1) -> None:
"""Move vertices, defined and restrained with Clamps
so that better mesh quality is obtained."""
prev_quality = self.grid.quality
so that better mesh quality is obtained.
Within each iteration, all vertices will be moved, starting with the one with the most influence on quality.
Lower tolerance values"""
driver = IterationDriver(max_iterations, relaxed_iterations, tolerance)

while not driver.converged:
iteration = driver.begin_iteration(self.grid.quality)
self.optimize_iteration(iteration)
driver.end_iteration(self.grid.quality)

for i in range(max_iterations):
# use lower relaxation factor with first iterations, then increase
# TODO: tests
relaxation = 1 - (1 - initial_relaxation) * np.exp(-i)
self.optimize_iteration(relaxation)
# for i in range(max_iterations):
# # use lower relaxation factor with first iterations, then increase
# # TODO: tests
# relaxation = 1 - (1 - initial_relaxation) * np.exp(-i)
# self.optimize_iteration(relaxation)

this_quality = self.grid.quality
# this_quality = self.grid.quality

report(f"Optimization iteration {i}: {prev_quality:.3e} > {this_quality:.3e} (relaxation: {relaxation})")
# report(f"Optimization iteration {i}: {prev_quality:.3e} > {this_quality:.3e} (relaxation: {relaxation})")

if abs((prev_quality - this_quality) / (this_quality + VSMALL)) < tolerance:
report("Tolerance reached, stopping optimization")
break
# if abs((prev_quality - this_quality) / (this_quality + VSMALL)) < tolerance:
# report("Tolerance reached, stopping optimization")
# break

prev_quality = this_quality
# prev_quality = this_quality
2 changes: 2 additions & 0 deletions src/classy_blocks/util/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
TOL = 1e-7
# a small-ish value, named after OpenFOAM's constant
VSMALL = 1e-6
# a big-ish value
VBIG = 1e12

# Block definition:
# a more intuitive and quicker way to set patches,
Expand Down
110 changes: 106 additions & 4 deletions tests/test_optimize/test_optimizer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,116 @@
import unittest

import numpy as np
from parameterized import parameterized

from classy_blocks.construct.operations.box import Box
from classy_blocks.mesh import Mesh
from classy_blocks.modify.clamps.free import FreeClamp
from classy_blocks.modify.clamps.links import TranslationLink
from classy_blocks.modify.find.geometric import GeometricFinder
from classy_blocks.modify.optimizer import Optimizer
from classy_blocks.modify.optimizer import IterationData, IterationDriver, Optimizer
from classy_blocks.util import functions as f
from classy_blocks.util.constants import VBIG


class IterationDriverTests(unittest.TestCase):
def setUp(self):
self.max_iterations = 20
self.tolerance = 0.1
self.relaxed_iterations = 2

@property
def driver(self) -> IterationDriver:
return IterationDriver(self.max_iterations, self.relaxed_iterations, self.tolerance)

@parameterized.expand(
(
(0, 0.5),
(1, 0.75),
(2, 1),
(3, 1),
)
)
def test_next_relaxation(self, iteration, relaxation):
driver = self.driver

for _ in range(iteration):
driver.iterations.append(IterationData(0, 0, 0))

self.assertEqual(driver.next_relaxation, relaxation)

def test_initial_improvement_empty(self):
self.assertEqual(self.driver.initial_improvement, VBIG)

def test_initial_improvement(self):
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

self.assertEqual(driver.initial_improvement, 100)

def test_end_last_improvement_single(self):
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

self.assertEqual(driver.last_improvement, 100)

def test_initial_improvement_multi(self):
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

driver.begin_iteration(900)
driver.end_iteration(899)

self.assertEqual(driver.initial_improvement, 100)

def test_last_improvement_multi(self):
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

driver.begin_iteration(900)
driver.end_iteration(899)

self.assertEqual(driver.last_improvement, 1)

def test_converged_empty(self):
self.assertFalse(self.driver.converged)

def test_converged_iter_limit(self):
driver = self.driver

for i in range(1, driver.max_iterations + 2):
driver.begin_iteration(1000 / i)
driver.end_iteration(1100 / i)

self.assertTrue(driver.converged)

def test_converged_first(self):
"""Cannot converge in the first iteration"""
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

self.assertFalse(driver.converged)

def test_converged_improvement(self):
driver = self.driver

driver.begin_iteration(1000)
driver.end_iteration(900)

driver.begin_iteration(900)
driver.end_iteration(890)

self.assertTrue(driver.converged)


class OptimizerTests(unittest.TestCase):
Expand All @@ -31,7 +133,7 @@ def setUp(self):
self.finder = GeometricFinder(self.mesh)
self.optimizer = Optimizer(self.mesh)

self.vertex = list(self.finder.find_in_sphere([0, 0, 0]))[0]
self.vertex = next(iter(self.finder.find_in_sphere([0, 0, 0])))

def test_optimize(self):
# move a point, then optimize it back to
Expand All @@ -45,7 +147,7 @@ def test_optimize(self):
np.testing.assert_almost_equal(self.vertex.position, [0, 0, 0], decimal=1)

def test_optimize_linked(self):
follower = list(self.finder.find_in_sphere([0, 1, 0]))[0]
follower = next(iter(self.finder.find_in_sphere([0, 1, 0])))
link = TranslationLink(self.vertex, follower)

self.vertex.move_to([0.3, 0.3, 0.3])
Expand All @@ -58,4 +160,4 @@ def test_optimize_linked(self):

self.assertGreater(f.norm(follower.position - f.vector(0, 1, 0)), 0)
np.testing.assert_almost_equal(self.vertex.position, [0, 0, 0], decimal=1)
np.testing.assert_equal(follower.position - self.vertex.position, [0, 1, 0])
np.testing.assert_almost_equal(follower.position - self.vertex.position, [0, 1, 0])

0 comments on commit 7d65442

Please sign in to comment.