Skip to content

Commit

Permalink
new Langevin integrator
Browse files Browse the repository at this point in the history
  • Loading branch information
shinkle-lanl committed Dec 3, 2024
1 parent 6dd3a91 commit 6b8ba8f
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 10 deletions.
4 changes: 2 additions & 2 deletions hippynn/molecular_dynamics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

from .md import MolecularDynamics, Variable, NullUpdater, VelocityVerlet, LangevinDynamics, VariableUpdater
from .md import MolecularDynamics, Variable, NullUpdater, VelocityVerlet, LangevinDynamics, VariableUpdater, ASELangevinDynamics


__all__ = ["MolecularDynamics", "Variable", "NullUpdater", "VelocityVerlet", "LangevinDynamics", "VariableUpdater"]
__all__ = ["MolecularDynamics", "Variable", "NullUpdater", "VelocityVerlet", "LangevinDynamics", "VariableUpdater", "ASELangevinDynamics"]
118 changes: 110 additions & 8 deletions hippynn/molecular_dynamics/md.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def pre_step(self, dt: float):
:param dt: timestep
"""
if len(self.variable.data["velocity"].shape) != len(self.variable.data["mass"].shape):
self.variable.data["mass"] = self.variable.data["mass"].unsqueeze(-1)

self.variable.data["velocity"] = self.variable.data["velocity"] + 0.5 * dt * self.variable.data["acceleration"]
self.variable.data["position"] = self.variable.data["position"] + self.variable.data["velocity"] * dt

Expand All @@ -233,12 +236,7 @@ def post_step(self, dt: float, model_outputs: dict):
:param model_outputs: dictionary of HIPNN model outputs
"""
self.variable.data["force"] = model_outputs[self.force_key].to(self.variable.device)
if len(self.variable.data["force"].shape) == len(self.variable.data["mass"].shape):
self.variable.data["acceleration"] = self.variable.data["force"].detach() / self.variable.data["mass"] * self.force_units / (self.position_units / self.time_units**2)
else:
self.variable.data["acceleration"] = (
self.variable.data["force"].detach() / self.variable.data["mass"][..., None] * self.force_units / (self.position_units / self.time_units**2)
)
self.variable.data["acceleration"] = self.variable.data["force"].detach() / self.variable.data["mass"] * self.force_units / (self.position_units / self.time_units**2)
self.variable.data["velocity"] = self.variable.data["velocity"] + 0.5 * dt * self.variable.data["acceleration"]


Expand Down Expand Up @@ -275,10 +273,12 @@ def __init__(
self.force_key = force_db_name
self.temperature = temperature
self.frix = frix
self.kB = ase.units.kB
self.force_units = (force_units or ase.units.eV/ase.units.Ang)
self.position_units = (position_units or ase.units.Ang)
self.time_units = (time_units or ase.units.fs)
self.kB = ase.units.kB * self.time_units / (self.position_units**2)

print(f"Friction coefficient: {self.frix}")

if seed is not None:
torch.manual_seed(seed)
Expand Down Expand Up @@ -309,7 +309,7 @@ def post_step(self, dt: float, model_outputs: dict):
self.variable.data["force"] = model_outputs[self.force_key].to(self.variable.device)

if len(self.variable.data["force"].shape) != len(self.variable.data["mass"].shape):
self.variable.data["mass"] = self.variable.data["mass"][..., None]
self.variable.data["mass"] = self.variable.data["mass"].unsqueeze(-1)

self.variable.data["acceleration"] = self.variable.data["force"].detach() / self.variable.data["mass"] * self.force_units / (self.position_units / self.time_units**2)

Expand All @@ -321,6 +321,103 @@ def post_step(self, dt: float, model_outputs: dict):
* torch.randn_like(self.variable.data["velocity"], memory_format=torch.contiguous_format)
)

class ASELangevinDynamics(VariableUpdater):
"""
Implements the Langevin algorithm
"""

required_variable_data = ["position", "velocity", "mass"]

def __init__(
self,
force_db_name: str,
temperature: float,
frix: float,
force_units: Optional[float] = None,
position_units: Optional[float] = None,
time_units: Optional[float] = None,
fix_cm: Optional[bool] = True,
rng: Optional[int] = None,
):
"""
:param force_db_name: key which will correspond to the force on the corresponding Variable
in the HIPNN model output dictionary
:param temperature: temperature for Langevin algorithm
:param frix: friction coefficient for Langevin algorithm
:param force_units: model force units output (in terms of ase.units), defaults to eV/Ang
:param position_units: model position units output (in terms of ase.units), defaults to Ang
:param time_units: model time units output (in terms of ase.units), defaults to fs
:param seed: used to set seed for reproducibility, defaults to None
mass of attached Variable must be in amu
"""

self.force_key = force_db_name
self.temperature = temperature
self.frix = frix
self.force_units = (force_units or ase.units.eV/ase.units.Ang)
self.position_units = (position_units or ase.units.Ang)
self.time_units = (time_units or ase.units.fs)
self.kB = ase.units.kB * self.time_units / (self.position_units**2)
self.fix_cm = fix_cm

print(f"Friction coefficient: {frix}")

if rng is None:
self.rng = np.random
else:
self.rng = rng


def pre_step(self, dt:float):
"""Updates to variables performed during each step of MD simulation before HIPNN model evaluation
:param dt: timestep
"""

if len(self.variable.data["velocity"].shape) != len(self.variable.data["mass"].shape):
self.variable.data["mass"] = self.variable.data["mass"].unsqueeze(-1)

sigma = (2 * self.temperature * self.frix / self.variable.data["mass"])**(1/2)
self.c1 = dt / 2 - (dt**2) * self.frix / 8
self.c2 = dt * self.frix / 2 - (dt**2) * (self.frix**2) / 8
self.c3 = (dt**(1/2)) * sigma / 2 - (dt**(1.5)) * self.frix * sigma / 8
self.c5 = (dt**(1.5)) * sigma / (2 * (3**(1/2)))
self.c4 = self.frix / 2 * self.c5

xi = torch.as_tensor(self.rng.standard_normal(size=self.variable.data["velocity"].shape), device=self.variable.data["velocity"].device, dtype=self.variable.data["velocity"].dtype)
eta = torch.as_tensor(self.rng.standard_normal(size=self.variable.data["velocity"].shape), device=self.variable.data["velocity"].device, dtype=self.variable.data["velocity"].dtype)
self.rnd_pos = self.c5 * eta
self.rnd_vel = self.c3 * xi - self.c4 * eta
if self.fix_cm:
self.rnd_pos -= self.rnd_pos.mean(axis=1)
mass = self.variable.data["mass"].clone().detach()
self.rnd_vel -= (self.rnd_vel * mass).mean(axis=1) / mass

self.variable.data["velocity"] += self.c1 * self.variable.data["acceleration"] - self.c2 * self.variable.data["velocity"] + self.rnd_vel
self.variable.data["position"] = self.variable.data["position"] + self.variable.data["velocity"] * dt + self.rnd_pos

if "cell" in self.variable.data.keys():
_, self.variable.data["position"], *_ = wrap_systems_torch(coords=self.variable.data["position"], cell=self.variable.data["cell"], cutoff=0) # cutoff only impacts unused outputs; can be set arbitrarily
try:
self.variable.data["unwrapped_position"] = self.variable.data["unwrapped_position"] + self.variable.data["velocity"] * dt
except KeyError:
self.variable.data["unwrapped_position"] = self.variable.data["position"].clone().detach()

def post_step(self, dt: float, model_outputs: dict):
"""
Updates to variables performed during each step of MD simulation after HIPNN model evaluation
:param dt: timestep
:param model_outputs: dictionary of HIPNN model outputs
"""

self.variable.data["force"] = model_outputs[self.force_key].to(self.variable.device)

self.variable.data["acceleration"] = self.variable.data["force"].detach() / self.variable.data["mass"] * self.force_units / (self.position_units / self.time_units**2)

self.variable.data["velocity"] += self.c1 * self.variable.data["acceleration"] - self.c2 * self.variable.data["velocity"] + self.rnd_vel


class MolecularDynamics:
"""
Expand Down Expand Up @@ -446,6 +543,9 @@ def _step(

model_outputs = self.model(**model_inputs)

# for key, value in model_outputs.items():
# print(key, value.mean())

for variable in self.variables:
variable.updater.post_step(dt, model_outputs)

Expand Down Expand Up @@ -477,6 +577,8 @@ def run(self, dt: float, n_steps: int, record_every: Optional[int] = None):
model_outputs = self._step(dt)
if record_every is not None and (i + 1) % record_every == 0:
self._update_data(model_outputs)
# if i % 10 == 0:
# print((model_outputs["force"] ** 2).sum(axis=-1).mean(), flush=True)

def get_data(self):
"""Returns a dictionary of the recorded data"""
Expand Down

0 comments on commit 6b8ba8f

Please sign in to comment.