From d07e133b8f117717954f75167e66f10ddf164c52 Mon Sep 17 00:00:00 2001 From: Frederik Van der Veken Date: Fri, 23 Feb 2024 15:39:23 +0100 Subject: [PATCH 01/10] Updated protectfile, by trying to make fstat comparison more robust --- xaux/protectfile.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index b916d21..e97bc4b 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -46,16 +46,16 @@ def get_hash(filename, size=128): def get_fstat(filename): stats = Path(filename).stat() return { - 'n_sequence_fields': stats.n_sequence_fields, - 'n_unnamed_fields': stats.n_unnamed_fields, - 'st_mode': stats.st_mode, - 'st_ino': stats.st_ino, - 'st_dev': stats.st_dev, - 'st_uid': stats.st_uid, - 'st_gid': stats.st_gid, - 'st_size': stats.st_size, - 'st_mtime_ns': stats.st_mtime_ns, - 'st_ctime_ns': stats.st_ctime_ns, + 'n_sequence_fields': int(stats.n_sequence_fields), + 'n_unnamed_fields': int(stats.n_unnamed_fields), + 'st_mode': int(stats.st_mode), + 'st_ino': int(stats.st_ino), + 'st_dev': int(stats.st_dev), + 'st_uid': int(stats.st_uid), + 'st_gid': int(stats.st_gid), + 'st_size': int(stats.st_size), + 'st_mtime_ns': int(stats.st_mtime_ns), + 'st_ctime_ns': int(stats.st_ctime_ns), } class ProtectFile: @@ -207,7 +207,6 @@ def __init__(self, *args, **kwargs): if self._exists: raise FileExistsError - # Provide an expected running time (to free a file in case of crash) max_lock_time = arg.pop('max_lock_time', None) if max_lock_time is not None and self._readonly == False \ @@ -302,14 +301,20 @@ def __exit__(self, *args, **kwargs): # Check that original file was not modified in between (i.e. corrupted) # TODO: verify that checking file stats is 1) enough, and 2) not # potentially problematic on certain file systems - if self._exists and get_fstat(self.file) != self._fstat: + file_changed = False + if self._exists: + new_stats = get_fstat(self.file) + for key, val in self._fstat.items(): + if key not in new_stats or val != new_stats[key]: + file_changed = True + if file_changed: print(f"Error: File {self.file} changed during lock!") # If corrupted, restore from backup # and move result of calculation (i.e. tempfile) to the parent folder print("Old stats:") print(self._fstat) print("New stats:") - print(get_fstat(self.file)) + print(new_stats) self.restore() else: # All is fine: move result from temporary file to original From 744b52ef9d71bc27024364241c55f420642baae7 Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 15:59:34 +0100 Subject: [PATCH 02/10] test! --- .../test_deliberate_failure_and_protection.py | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 tests/test_deliberate_failure_and_protection.py diff --git a/tests/test_deliberate_failure_and_protection.py b/tests/test_deliberate_failure_and_protection.py new file mode 100644 index 0000000..3792950 --- /dev/null +++ b/tests/test_deliberate_failure_and_protection.py @@ -0,0 +1,70 @@ +from multiprocessing import Pool +import pytest +from xaux import ProtectFile +import json +from pathlib import Path +import time +import shutil + + +def rewrite(pf, with_copy=False): + data = json.load(pf) + time.sleep(0.2) + data["myint"] += 1 + if not with_copy: + pf.seek(0) # revert point to beginning of file + json.dump(data, pf, indent=4, sort_keys=True) + pf.truncate() + else: # write to another file and copy back + cfname = "_copy_" + pf.name + with open(cfname, "w") as cf: + json.dump(data, cf, indent=4, sort_keys=True) + shutil.copyfile(cfname, pf.name) + Path.unlink(Path(cfname)) + + +def change_file_protected(fname, with_copy=False): + with ProtectFile(fname, "r+", backup=False, wait=0.06) as pf: + rewrite(pf, with_copy=with_copy) + return + + +def change_file_standard(fname, with_copy=False): + with open(fname, "r+") as pf: # fails with this context + rewrite(pf) + return + + +def init_file(fname): + with ProtectFile(fname, "w", backup=False, wait=1) as pf: + json.dump({"myint": 0}, pf, indent=4) + + +def test_deliberate_failure(): + fname = "test_standard.json" + assert not Path(fname).exists() + init_file(fname) + workers = 4 + with Pool(processes=workers) as pool: + pool.map(change_file_standard, [fname] * 4) + + with open(fname, "r+") as pf: # fails with this context + data = json.load(pf) + assert data["myint"] != workers # assert that result is wrong + Path.unlink(Path(fname)) + + +@pytest.mark.parametrize("with_copy", [False, True]) +def test_protection(with_copy): + print(with_copy) + fname = "test_protection.json" + assert not Path(fname).exists() + init_file(fname) + workers = 4 + with Pool(processes=workers) as pool: + pool.map(change_file_protected, [(fname)] * 4) + + with open(fname, "r+") as pf: # fails with this context + data = json.load(pf) + assert data["myint"] == workers + Path.unlink(Path(fname)) From be43eb876aaa94519ef56f74e430a5a6fb1583fd Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 16:01:03 +0100 Subject: [PATCH 03/10] remove print --- tests/test_deliberate_failure_and_protection.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_deliberate_failure_and_protection.py b/tests/test_deliberate_failure_and_protection.py index 3792950..436d8b1 100644 --- a/tests/test_deliberate_failure_and_protection.py +++ b/tests/test_deliberate_failure_and_protection.py @@ -56,7 +56,6 @@ def test_deliberate_failure(): @pytest.mark.parametrize("with_copy", [False, True]) def test_protection(with_copy): - print(with_copy) fname = "test_protection.json" assert not Path(fname).exists() init_file(fname) From ce53f4a2a60d7068223d9ec7708d80d8c98c770d Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 17:32:11 +0100 Subject: [PATCH 04/10] RuntimeError bugfix --- xaux/protectfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index b916d21..5402b35 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -251,7 +251,7 @@ def __init__(self, *args, **kwargs): self._flock = io.open(self.lockfile, 'r+') break else: - raise RunTimeError("Too many lockfiles!") + raise RuntimeError("Too many lockfiles!") # Store lock information if max_lock_time is not None: From 3f3c14935a31dcd33c8ee7c9a45cdf1a65889bcf Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 20:45:44 +0100 Subject: [PATCH 05/10] xrdcp to eos functionality --- xaux/protectfile.py | 67 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index 5402b35..5c7022d 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -14,6 +14,7 @@ import tempfile import time import json +import subprocess tempdir = tempfile.TemporaryDirectory() protected_open = {} @@ -58,6 +59,17 @@ def get_fstat(filename): 'st_ctime_ns': stats.st_ctime_ns, } +def xrdcp_installed(): + try: + cmd = subprocess.run(["xrdcp", "--version"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE, check=True) + return cmd.returncode == 0 + except (subprocess.CalledProcessError, FileNotFoundError): + return False + + + + class ProtectFile: """A wrapper around a file pointer, protecting it with a lockfile and backups. @@ -164,6 +176,8 @@ def __init__(self, *args, **kwargs): max_lock_time : float, default None If provided, it will write the maximum runtime in seconds inside the lockfile. This is to avoided crashed accesses locking the file forever. + eos_url : string, default None + If provided, it will use xrdcp to copy the temporary file to eos and back. Additionally, the following parameters are inherited from open(): 'file', 'mode', 'buffering', 'encoding', 'errors', 'newline', 'closefd', 'opener' @@ -186,6 +200,17 @@ def __init__(self, *args, **kwargs): self._backup_if_readonly = arg.pop('backup_if_readonly', False) self._check_hash = arg.pop('check_hash', True) + self._eos_url = arg.pop('eos_url', None) + if self._eos_url is not None: + self.original_eos_path = arg['file'] + if not self._eos_url.startswith("root://eos") or not self._eos_url.endswith('.cern.ch/'): + raise NotImplementedError(f'Invalid EOS url provided: {self._eos_url}') + if not str(self.original_eos_path).startswith("/eos"): + raise NotImplementedError(f'Only /eos paths are supported with eos_url.') + if not xrdcp_installed(): + raise RuntimeError("xrdcp is not installed.") + self.original_eos_path = self._eos_url + self.original_eos_path + # Initialise paths arg['file'] = Path(arg['file']).resolve() file = arg['file'] @@ -193,6 +218,7 @@ def __init__(self, *args, **kwargs): self._lock = Path(file.parent, file.name + '.lock').resolve() self._temp = Path(tempdir.name, file.name).resolve() + # We throw potential FileNotFoundError and FileExistsError before # creating the backup and temporary files self._exists = True if self.file.is_file() else False @@ -280,8 +306,12 @@ def __init__(self, *args, **kwargs): # slow if many processes write to it concurrently if not self._readonly: if self._exists: - _print_debug("Init", f"cp {self.file=} to {self.tempfile=}") - shutil.copy2(self.file, self.tempfile) + if self._eos_url is not None: + _print_debug("Init", f"xrdcp {self.original_eos_path} to {self.tempfile=}") + self.xrdcp(self.original_eos_path, self.tempfile) + else: + _print_debug("Init", f"cp {self.file=} to {self.tempfile=}") + shutil.copy2(self.file, self.tempfile) arg['file'] = self.tempfile self._fd = io.open(**arg) @@ -338,16 +368,24 @@ def mv_temp(self, destination=None): if not self._readonly: if destination is None: # Move temporary file to original file - _print_debug("Mv_temp", f"cp {self.tempfile=} to {self.file=}") - shutil.copy2(self.tempfile, self.file) + if self._eos_url is not None: + _print_debug("Mv_temp", f"xrdcp {self.tempfile=} to {self.original_eos_path=}") + self.xrdcp(self.tempfile, self.original_eos_path) + else: + _print_debug("Mv_temp", f"cp {self.tempfile=} to {self.file=}") + shutil.copy2(self.tempfile, self.file) # Check if copy succeeded if self._check_hash and get_hash(self.tempfile) != get_hash(self.file): print(f"Warning: tried to copy temporary file {self.tempfile} into {self.file}, " + "but hashes do not match!") self.restore() else: - _print_debug("Mv_temp", f"cp {self.tempfile=} to {destination=}") - shutil.copy2(self.tempfile, destination) + if self._eos_url is not None: + _print_debug("Mv_temp", f"xrdcp {self.tempfile=} to {destination=}") + self.xrdcp(self.tempfile, destination) + else: + _print_debug("Mv_temp", f"cp {self.tempfile=} to {destination=}") + shutil.copy2(self.tempfile, destination) _print_debug("Mv_temp", f"unlink {self.tempfile=}") self.tempfile.unlink() @@ -359,8 +397,11 @@ def restore(self): self.backupfile.rename(self.file) print('Restored file to previous state.') if not self._readonly: - alt_file = Path(self.file.parent, self.file.name + '__' \ - + datetime.datetime.now().isoformat() + '.result').resolve() + if self._eos_url is not None: + extension = f"__{datetime.datetime.now().isoformat()}.result" + alt_file = self.original_eos_path + extension + else: + alt_file = Path(self.file.parent, self.file.name + extension).resolve() self.mv_temp(alt_file) print(f"Saved calculation results in {alt_file.name}.") @@ -385,4 +426,14 @@ def release(self, pop=True): self.lockfile.unlink() if pop: protected_open.pop(self._file, 0) + + def xrdcp(self, source=None, destination=None): + if source is None or destination is None: + raise RuntimeError("Source or destination not specified in xrdcp command.") + if self._eos_url is None: + raise RuntimeError("self._eos_url is None, while it shouldn't have been.") + + subprocess.run(["xrdcp", "-f", f"{str(source)}", f"{str(destination)}"], + check=True) + From 699f72f42b6f1341816ab02a07cec83d4bc54e4a Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 21:03:53 +0100 Subject: [PATCH 06/10] documentation update --- xaux/protectfile.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index 5c7022d..d4e652c 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -118,14 +118,14 @@ class ProtectFile: -------- Reading in a file (while making sure it is not written to by another process): - >>> from protectfile import ProtectedFile - >>> with ProtectedFile('thebook.txt', 'r', backup=False, wait=1) as pf: + >>> from protectfile import ProtectFile + >>> with ProtectFile('thebook.txt', 'r', backup=False, wait=1) as pf: >>> text = pf.read() Reading and appending to a file: - >>> from protectfile import ProtectedFile - >>> with ProtectedFile('thebook.txt', 'r+', backup=False, wait=1) as pf: + >>> from protectfile import ProtectFile + >>> with ProtectFile('thebook.txt', 'r+', backup=False, wait=1) as pf: >>> text = pf.read() >>> pf.write("This string will be added at the end of the file, \ ... however, it won't be added to the 'text' variable") @@ -133,8 +133,8 @@ class ProtectFile: Reading and updating a JSON file: >>> import json - >>> from protectfile import ProtectedFile - >>> with ProtectedFile(info.json, 'r+', backup=False, wait=1) as pf: + >>> from protectfile import ProtectFile + >>> with ProtectFile(info.json, 'r+', backup=False, wait=1) as pf: >>> meta = json.load(pf) >>> meta.update({'author': 'Emperor Claudius'}) >>> pf.truncate(0) # Delete file contents (to avoid appending) @@ -144,13 +144,21 @@ class ProtectFile: Reading and updating a Parquet file: >>> import pandas as pd - >>> from protectfile import ProtectedFile - >>> with ProtectedFile(mydata.parquet, 'r+b', backup=False, wait=1) as pf: + >>> from protectfile import ProtectFile + >>> with ProtectFile(mydata.parquet, 'r+b', backup=False, wait=1) as pf: >>> data = pd.read_parquet(pf) >>> data['x'] += 5 >>> pf.truncate(0) # Delete file contents (to avoid appending) >>> pf.seek(0) # Move file pointer to start of file >>> data.to_parquet(pf, index=True) + + Reading and updating a json file in EOS with xrdcp: + + >>> from protectfile import ProtectFile + >>> eos_url = 'root://eosuser.cern.ch/' + >>> fname = '/eos/user/k/kparasch/test.json' + >>> with ProtectFile(fname, 'r+', eos_url=eos_url) as pf: + >>> pass """ def __init__(self, *args, **kwargs): From 195935eed33ed71a0158e228ceed399a498d7aa6 Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 21:31:59 +0100 Subject: [PATCH 07/10] few more comments and aguard against backup with eos --- xaux/protectfile.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index d4e652c..64f7f6d 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -207,12 +207,15 @@ def __init__(self, *args, **kwargs): self._do_backup = True self._backup_if_readonly = arg.pop('backup_if_readonly', False) self._check_hash = arg.pop('check_hash', True) - + + # Make sure conditions are satisfied when using EOS-XRDCP self._eos_url = arg.pop('eos_url', None) if self._eos_url is not None: self.original_eos_path = arg['file'] + if self._do_backup or self._backup_if_readonly: + raise NotImplementedError("Backup not supported with eos_url.") if not self._eos_url.startswith("root://eos") or not self._eos_url.endswith('.cern.ch/'): - raise NotImplementedError(f'Invalid EOS url provided: {self._eos_url}') + raise NotImplementedError(f'Invalid EOS url provided: {self._eos_url=}') if not str(self.original_eos_path).startswith("/eos"): raise NotImplementedError(f'Only /eos paths are supported with eos_url.') if not xrdcp_installed(): @@ -315,7 +318,7 @@ def __init__(self, *args, **kwargs): if not self._readonly: if self._exists: if self._eos_url is not None: - _print_debug("Init", f"xrdcp {self.original_eos_path} to {self.tempfile=}") + _print_debug("Init", f"xrdcp {self.original_eos_path=} to {self.tempfile=}") self.xrdcp(self.original_eos_path, self.tempfile) else: _print_debug("Init", f"cp {self.file=} to {self.tempfile=}") From 82724c67eb662f74089a7d52ef06213c57bba008 Mon Sep 17 00:00:00 2001 From: kparasch Date: Fri, 23 Feb 2024 21:35:01 +0100 Subject: [PATCH 08/10] small bugfix --- xaux/protectfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index 64f7f6d..45eea8c 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -408,8 +408,8 @@ def restore(self): self.backupfile.rename(self.file) print('Restored file to previous state.') if not self._readonly: + extension = f"__{datetime.datetime.now().isoformat()}.result" if self._eos_url is not None: - extension = f"__{datetime.datetime.now().isoformat()}.result" alt_file = self.original_eos_path + extension else: alt_file = Path(self.file.parent, self.file.name + extension).resolve() From 78e09d7ae1abba4890ca98659ac0cdcf42892755 Mon Sep 17 00:00:00 2001 From: Frederik Van der Veken Date: Mon, 26 Feb 2024 09:54:24 +0100 Subject: [PATCH 09/10] whitespace cleaning --- xaux/protectfile.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/xaux/protectfile.py b/xaux/protectfile.py index 45cdb88..c49f79e 100644 --- a/xaux/protectfile.py +++ b/xaux/protectfile.py @@ -68,8 +68,6 @@ def xrdcp_installed(): return False - - class ProtectFile: """A wrapper around a file pointer, protecting it with a lockfile and backups. @@ -151,7 +149,7 @@ class ProtectFile: >>> pf.truncate(0) # Delete file contents (to avoid appending) >>> pf.seek(0) # Move file pointer to start of file >>> data.to_parquet(pf, index=True) - + Reading and updating a json file in EOS with xrdcp: >>> from protectfile import ProtectFile @@ -207,7 +205,7 @@ def __init__(self, *args, **kwargs): self._do_backup = True self._backup_if_readonly = arg.pop('backup_if_readonly', False) self._check_hash = arg.pop('check_hash', True) - + # Make sure conditions are satisfied when using EOS-XRDCP self._eos_url = arg.pop('eos_url', None) if self._eos_url is not None: @@ -229,7 +227,6 @@ def __init__(self, *args, **kwargs): self._lock = Path(file.parent, file.name + '.lock').resolve() self._temp = Path(tempdir.name, file.name).resolve() - # We throw potential FileNotFoundError and FileExistsError before # creating the backup and temporary files self._exists = True if self.file.is_file() else False @@ -387,7 +384,7 @@ def mv_temp(self, destination=None): if self._eos_url is not None: _print_debug("Mv_temp", f"xrdcp {self.tempfile=} to {self.original_eos_path=}") self.xrdcp(self.tempfile, self.original_eos_path) - else: + else: _print_debug("Mv_temp", f"cp {self.tempfile=} to {self.file=}") shutil.copy2(self.tempfile, self.file) # Check if copy succeeded @@ -399,7 +396,7 @@ def mv_temp(self, destination=None): if self._eos_url is not None: _print_debug("Mv_temp", f"xrdcp {self.tempfile=} to {destination=}") self.xrdcp(self.tempfile, destination) - else: + else: _print_debug("Mv_temp", f"cp {self.tempfile=} to {destination=}") shutil.copy2(self.tempfile, destination) _print_debug("Mv_temp", f"unlink {self.tempfile=}") @@ -442,7 +439,7 @@ def release(self, pop=True): self.lockfile.unlink() if pop: protected_open.pop(self._file, 0) - + def xrdcp(self, source=None, destination=None): if source is None or destination is None: raise RuntimeError("Source or destination not specified in xrdcp command.") From 0226f28816f66d8bfc8a103afac535390b029901 Mon Sep 17 00:00:00 2001 From: Frederik Van der Veken Date: Mon, 26 Feb 2024 09:54:42 +0100 Subject: [PATCH 10/10] Updated version number to v0.1.1. --- pyproject.toml | 2 +- tests/test_version.py | 2 +- xaux/general.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5dec139..73b406a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "xaux" -version = "0.1.0" +version = "0.1.1" description = "Support tools for Xsuite packages" authors = ["Frederik Van der Veken "] license = "Apache 2.0" diff --git a/tests/test_version.py b/tests/test_version.py index ab14578..5f0071d 100644 --- a/tests/test_version.py +++ b/tests/test_version.py @@ -1,5 +1,5 @@ from xaux import __version__ def test_version(): - assert __version__ == '0.1.0' + assert __version__ == '0.1.1' diff --git a/xaux/general.py b/xaux/general.py index b37f78d..32a5808 100644 --- a/xaux/general.py +++ b/xaux/general.py @@ -10,5 +10,5 @@ # =================== # Do not change # =================== -__version__ = '0.1.0' +__version__ = '0.1.1' # ===================