Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6: Implemented DataHandler class and extract/remove/namelist #7

Merged
merged 3 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ifsbench/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# (C) Copyright 2020- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from ifsbench.data.datahandler import * # noqa
from ifsbench.data.extracthandler import * # noqa
from ifsbench.data.namelisthandler import * # noqa
from ifsbench.data.renamehandler import * # noqa
34 changes: 34 additions & 0 deletions ifsbench/data/datahandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# (C) Copyright 2020- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from abc import ABC, abstractmethod

# Names exported by ``from ifsbench.data.datahandler import *``.
__all__ = ['DataHandler']

class DataHandler(ABC):
    """
    Base class for data pipeline steps.

    Each DataHandler object describes one step in the data pipeline. Multiple
    DataHandler objects can be executed sequentially to perform specific data
    setup tasks.
    """

    @abstractmethod
    def execute(self, wdir, **kwargs):
        """
        Run this data handling operation in a given directory.

        Parameters
        ----------
        wdir : str or :any:`pathlib.Path`
            The directory where the data handling should take place.
            Subclasses of DataHandler should operate relative to this path,
            unless absolute paths are given.
        **kwargs :
            Additional keyword arguments. The base class does not interpret
            them.

        Raises
        ------
        NotImplementedError
            Always. Subclasses have to provide their own implementation; this
            body can only be reached through an explicit call to the base
            class method.
        """
        # ``NotImplemented`` is the sentinel reserved for binary special
        # methods and is truthy, so returning it would let a forgotten
        # override pass silently. Fail loudly instead.
        raise NotImplementedError(
            f"{type(self).__name__} must implement execute()."
        )
52 changes: 52 additions & 0 deletions ifsbench/data/extracthandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# (C) Copyright 2020- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

import pathlib
import shutil

from ifsbench.data.datahandler import DataHandler
from ifsbench.logging import debug

# Names exported by ``from ifsbench.data.extracthandler import *``.
__all__ = ['ExtractHandler']


class ExtractHandler(DataHandler):
    """
    DataHandler that extracts a given archive to a specific directory.

    Parameters
    ----------
    archive_path: str or :any:`pathlib.Path`
        The path to the archive that will be extracted. If a relative path
        is given, this will be relative to the ``wdir`` argument in
        :meth:`execute`.

    target_dir: str, :any:`pathlib.Path` or None
        The directory where the archive will be unpacked. If a relative path
        is given, this will be relative to the ``wdir`` argument in
        :meth:`execute`. If None, the archive is unpacked directly into
        ``wdir``.
    """

    def __init__(self, archive_path, target_dir=None):
        self._archive_path = pathlib.Path(archive_path)
        self._target_dir = None if target_dir is None else pathlib.Path(target_dir)

    def execute(self, wdir, **kwargs):
        """
        Unpack the archive.

        Parameters
        ----------
        wdir : str or :any:`pathlib.Path`
            The directory against which a relative ``archive_path`` and a
            relative ``target_dir`` are resolved. It is also the unpack
            location if no ``target_dir`` was given.
        **kwargs :
            Ignored by this handler.
        """
        wdir = pathlib.Path(wdir)

        # Joining onto ``wdir`` leaves absolute paths untouched (pathlib
        # discards the left operand in that case) and anchors relative paths
        # at ``wdir``, which is what the class documentation promises for
        # both the archive and the target directory.
        archive_path = wdir / self._archive_path
        if self._target_dir is None:
            target_dir = wdir
        else:
            target_dir = wdir / self._target_dir

        debug(f"Unpack archive {archive_path} to {target_dir}.")
        shutil.unpack_archive(archive_path, target_dir)
johannesbulin marked this conversation as resolved.
Show resolved Hide resolved
166 changes: 166 additions & 0 deletions ifsbench/data/namelisthandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# (C) Copyright 2020- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from enum import auto, Enum
import pathlib

import f90nml

from ifsbench.data.datahandler import DataHandler
from ifsbench.logging import debug, info


# Names exported by ``from ifsbench.data.namelisthandler import *``.
__all__ = ['NamelistOverride', 'NamelistHandler', 'NamelistOperation']

class NamelistOperation(Enum):
    """
    Enumeration of the operations that can be applied to a namelist entry.

    Attributes
    ----------
    SET :
        Set an entry to a given value.
    APPEND :
        Append a value to an array entry.
    DELETE :
        Delete an entry.
    """

    SET = auto()
    APPEND = auto()
    DELETE = auto()

class NamelistOverride:
    """
    Specify changes that will be applied to a namelist.

    Parameters
    ----------
    key: str or iterable of str
        The namelist entry that will be modified. Can be either a string
        where '/' separates the namelist name and the entry key or an iterable
        of strings of length two.

    mode: NamelistOperation
        What kind of operation is specified. Can be

        * ``SET``: set a certain entry.
        * ``APPEND``: append to an array entry.
        * ``DELETE``: delete an entry.

    value: str or None
        The value that is set (SET operation) or appended (APPEND).

    Raises
    ------
    ValueError
        If ``key`` does not consist of exactly two parts, if ``mode`` is not a
        :class:`NamelistOperation`, or if ``value`` is None for a SET or
        APPEND operation.
    """

    def __init__(self, key, mode, value=None):
        # Always store the key as a tuple, regardless of how it was given.
        parts = key.split('/') if isinstance(key, str) else key
        self._keys = tuple(parts)

        if len(self._keys) != 2:
            raise ValueError("The key object must be of length two.")

        # An unknown mode would otherwise be accepted here and later make
        # apply() create an empty namelist group without doing anything else.
        if not isinstance(mode, NamelistOperation):
            raise ValueError("The mode must be a NamelistOperation!")

        self._mode = mode
        self._value = value

        needs_value = mode in (NamelistOperation.SET, NamelistOperation.APPEND)
        if needs_value and value is None:
            raise ValueError("The new value must not be None!")

    def apply(self, namelist):
        """
        Apply the stored changes to a namelist.

        Parameters
        ----------
        namelist: :any:`f90nml.Namelist`
            The namelist to which the changes are applied. It is modified in
            place.

        Raises
        ------
        ValueError
            For an APPEND operation, if the existing entry is not an array or
            if the new value has a different type than the first existing
            array element.
        """
        group_name, key = self._keys

        if group_name not in namelist:
            # Deleting from a group that doesn't exist is a no-op; don't
            # create an empty group as a side effect.
            if self._mode == NamelistOperation.DELETE:
                return

            namelist[group_name] = {}

        group = namelist[group_name]

        if self._mode == NamelistOperation.SET:
            debug(f"Set namelist entry {self._keys!s} = {self._value!s}.")
            group[key] = self._value

        elif self._mode == NamelistOperation.APPEND:
            if key not in group:
                group[key] = []

            entries = group[key]
            if not hasattr(entries, 'append'):
                raise ValueError("Values can only be appended to arrays!")

            # f90nml doesn't seem to do any kind of checking, so we could
            # create arrays in the namelist where the entries have different
            # types.
            # This will most likely cause issues, so we verify here, that
            # the array entries have the same type.
            if len(entries) > 0 and type(entries[0]) is not type(self._value):
                raise ValueError("The given value must have the same type as existing array entries!")

            debug(f"Append {self._value!s} to namelist entry {self._keys!s}.")
            entries.append(self._value)

        elif self._mode == NamelistOperation.DELETE:
            if key in group:
                debug(f"Delete namelist entry {self._keys!s}.")
                del group[key]

class NamelistHandler(DataHandler):
    """
    DataHandler specialisation that can modify Fortran namelists.

    Parameters
    ----------
    input_path: str or :any:`pathlib.Path`
        The path to the namelist that will be modified. If a relative path
        is given, this will be relative to the ``wdir`` argument in
        :meth:`execute`.

    output_path: str or :any:`pathlib.Path`
        The path to which the updated namelist will be written. If a relative
        path is given, this will be relative to the ``wdir`` argument in
        :meth:`execute`.

    overrides: iterable of :class:`NamelistOverride`
        The NamelistOverrides that will be applied.

    Raises
    ------
    ValueError
        If any element of ``overrides`` is not a :class:`NamelistOverride`.
    """

    def __init__(self, input_path, output_path, overrides):
        self._input_path = pathlib.Path(input_path)
        self._output_path = pathlib.Path(output_path)

        self._overrides = list(overrides)
        if not all(isinstance(entry, NamelistOverride) for entry in self._overrides):
            raise ValueError("Namelist overrides must be NamelistOverride objects!")

    def execute(self, wdir, **kwargs):
        """
        Read the input namelist, apply all overrides and write the result.

        If the input namelist does not exist, a message is logged and nothing
        is written.

        Parameters
        ----------
        wdir : str or :any:`pathlib.Path`
            The directory against which relative input and output paths are
            resolved.
        **kwargs :
            Ignored by this handler.
        """
        wdir = pathlib.Path(wdir)

        # pathlib drops the left operand when the right one is absolute, so a
        # plain join covers both the absolute and the relative case.
        source = wdir / self._input_path

        # Do nothing if the input namelist doesn't exist.
        if not source.exists():
            info(f"Namelist {source} doesn't exist.")
            return

        target = wdir / self._output_path

        debug(f"Modify namelist {source}.")
        namelist = f90nml.read(source)

        for change in self._overrides:
            change.apply(namelist)

        namelist.write(target, force=True)
115 changes: 115 additions & 0 deletions ifsbench/data/renamehandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# (C) Copyright 2020- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from enum import auto, Enum
from pathlib import Path
import re
import shutil

from ifsbench.data.datahandler import DataHandler
from ifsbench.logging import debug

# Names exported by ``from ifsbench.data.renamehandler import *``.
__all__ = ['RenameHandler', 'RenameMode']


class RenameMode(Enum):
    """
    The ways in which a file can be brought to its new location.

    Attributes
    ----------
    COPY :
        The file is copied; the original stays where it is.
    SYMLINK :
        A symlink is created at the new location which points to the
        file's current location.
    MOVE :
        The file is moved away from its current location to the new one.
    """

    COPY = auto()
    SYMLINK = auto()
    MOVE = auto()


class RenameHandler(DataHandler):
    """
    DataHandler specialisation that can move/rename files by using regular
    expressions (as in :any:`re.sub`).

    Parameters
    ----------
    pattern: str or :any:`re.Pattern`
        The pattern that will be replaced. Corresponds to ``pattern`` in
        :any:`re.sub`.

    repl: str
        The replacement pattern. Corresponds to ``repl`` in :any:`re.sub`.

    mode: :class:`RenameMode`
        Specifies how the renaming is done (copy, move, symlink).

    Raises
    ------
    ValueError
        If ``mode`` is not a :class:`RenameMode`.
    """

    def __init__(self, pattern, repl, mode=RenameMode.SYMLINK):
        if isinstance(pattern, re.Pattern):
            self._pattern = pattern
        else:
            self._pattern = re.compile(pattern)
        self._repl = str(repl)

        # With an unknown mode, execute() would still delete whatever sits
        # at the destination and then do nothing. Reject it up front.
        if not isinstance(mode, RenameMode):
            raise ValueError("The mode must be a RenameMode!")
        self._mode = mode

    def execute(self, wdir, **kwargs):
        """
        Rename the files below ``wdir`` whose relative path is changed by the
        substitution.

        The pattern is applied to the path of each file relative to ``wdir``.
        Directories themselves are not renamed. Anything that already exists
        at a destination is deleted first. All conflicts are detected before
        the first file is touched.

        Parameters
        ----------
        wdir : str or :any:`pathlib.Path`
            The directory that is searched recursively for files.
        **kwargs :
            Ignored by this handler.

        Raises
        ------
        RuntimeError
            If two files would be renamed to the same path, or if the
            destination of one renaming is itself a file that will be
            renamed.
        """
        # Destinations are resolved below, so the sources must be built from
        # a resolved ``wdir`` too. Otherwise, with a relative or symlinked
        # ``wdir``, every file looks as if it has to be renamed onto itself
        # and would be deleted before the copy/symlink/move.
        wdir = Path(wdir).resolve()

        # We create a dictionary first, that stores the paths that will be
        # modified.
        path_mapping = {}

        for source in wdir.rglob('*'):
            if source.is_dir():
                continue

            old_name = str(source.relative_to(wdir))
            new_name = self._pattern.sub(self._repl, old_name)

            # Skip files the substitution leaves untouched. Without this, a
            # symlinked file would be mapped onto its own (resolved) target.
            if new_name == old_name:
                continue

            dest = (wdir / new_name).resolve()
            if dest != source:
                path_mapping[source] = dest

        # Check that we don't end up with two initial files being renamed to
        # the same file. Crash if this is the case.
        if len(set(path_mapping.values())) != len(path_mapping):
            raise RuntimeError("Renaming would cause two different files to be given the same name!")

        # Crash if we are renaming one of the files to a path that is also
        # the "source" for another renaming. This is checked for all files
        # before anything is modified, so that a failure doesn't leave the
        # directory half renamed.
        for source, dest in path_mapping.items():
            if dest in path_mapping:
                raise RuntimeError(f"Can't move {source} to {dest} as there is a cyclical dependency!")

        for source, dest in path_mapping.items():
            # Delete whatever resides at dest at the moment (whether it's a
            # file or a directory).
            if dest.exists():
                debug(f"Delete existing file/directory {dest} before renaming.")
                if dest.is_dir():
                    shutil.rmtree(dest)
                else:
                    dest.unlink()

            dest.parent.mkdir(parents=True, exist_ok=True)

            if self._mode == RenameMode.COPY:
                debug(f"Copy {source} to {dest}.")
                shutil.copy(source, dest)
            elif self._mode == RenameMode.SYMLINK:
                debug(f"Symlink {source} to {dest}.")
                dest.symlink_to(source)
            elif self._mode == RenameMode.MOVE:
                debug(f"Move {source} to {dest}.")
                source.rename(dest)
Loading
Loading