diff --git a/pyiron_base/database/filetable.py b/pyiron_base/database/filetable.py
index 26fbbf287..1a813f552 100644
--- a/pyiron_base/database/filetable.py
+++ b/pyiron_base/database/filetable.py
@@ -2,10 +2,10 @@
 import os
 import pandas
 import datetime
-import h5io
 from pyfileindex import PyFileIndex
 from pyiron_base.interfaces.singleton import Singleton
 from pyiron_base.database.generic import IsDatabase
+from pyiron_base.storage.hdfio import write_hdf5, read_hdf5
 
 table_columns = {
     "id": None,
@@ -322,7 +322,7 @@ def get_job_status(self, job_id):
     def set_job_status(self, job_id, status):
         db_entry = self.get_item_by_id(item_id=job_id)
         self._job_table.loc[self._job_table.id == job_id, "status"] = status
-        h5io.write_hdf5(
+        write_hdf5(
             db_entry["project"] + db_entry["subjob"] + ".h5",
             status,
             title=db_entry["subjob"][1:] + "/status",
@@ -356,15 +356,15 @@ def get_extract(path, mtime):
 
 
 def get_hamilton_from_file(hdf5_file, job_name):
-    return h5io.read_hdf5(hdf5_file, job_name + "/TYPE").split(".")[-1].split("'")[0]
+    return read_hdf5(hdf5_file, job_name + "/TYPE").split(".")[-1].split("'")[0]
 
 
 def get_hamilton_version_from_file(hdf5_file, job_name):
-    return h5io.read_hdf5(hdf5_file, job_name + "/VERSION")
+    return read_hdf5(hdf5_file, job_name + "/VERSION")
 
 
 def get_job_status_from_file(hdf5_file, job_name):
     if os.path.exists(hdf5_file):
-        return h5io.read_hdf5(hdf5_file, job_name + "/status")
+        return read_hdf5(hdf5_file, job_name + "/status")
     else:
         return None
diff --git a/pyiron_base/jobs/job/generic.py b/pyiron_base/jobs/job/generic.py
index 8ba0dbdb3..bd2728891 100644
--- a/pyiron_base/jobs/job/generic.py
+++ b/pyiron_base/jobs/job/generic.py
@@ -8,7 +8,6 @@
 from datetime import datetime
 import os
 import posixpath
-import h5io
 import signal
 import warnings
 
@@ -46,6 +45,7 @@
 from pyiron_base.utils.deprecate import deprecate
 from pyiron_base.jobs.job.extension.server.generic import Server
 from pyiron_base.database.filetable import FileTable
+from pyiron_base.storage.hdfio import write_hdf5, read_hdf5
 
 __author__ = "Joerg Neugebauer, Jan Janssen"
 __copyright__ = (
@@ -182,12 +182,12 @@ def __init__(self, project, job_name):
             self._status = JobStatus(db=project.db, job_id=self.job_id)
             self.refresh_job_status()
         elif os.path.exists(self.project_hdf5.file_name):
-            initial_status = h5io.read_hdf5(
+            initial_status = read_hdf5(
                 self.project_hdf5.file_name, job_name + "/status"
             )
             self._status = JobStatus(initial_status=initial_status)
             if "job_id" in self.list_nodes():
-                self._job_id = h5io.read_hdf5(
+                self._job_id = read_hdf5(
                     self.project_hdf5.file_name, job_name + "/job_id"
                 )
         else:
@@ -470,7 +470,7 @@ def refresh_job_status(self):
             )
         elif state.database.database_is_disabled:
             self._status = JobStatus(
-                initial_status=h5io.read_hdf5(
+                initial_status=read_hdf5(
                     self.project_hdf5.file_name, self.job_name + "/status"
                 )
             )
@@ -1132,7 +1132,7 @@ def save(self):
         if not state.database.database_is_disabled:
             job_id = self.project.db.add_item_dict(self.db_entry())
             self._job_id = job_id
-            h5io.write_hdf5(
+            write_hdf5(
                 self.project_hdf5.file_name,
                 job_id,
                 title=self.job_name + "/job_id",
diff --git a/pyiron_base/storage/hdfio.py b/pyiron_base/storage/hdfio.py
index 8cb5247f3..0de4f8910 100644
--- a/pyiron_base/storage/hdfio.py
+++ b/pyiron_base/storage/hdfio.py
@@ -150,7 +150,7 @@ def __getitem__(self, item):
         # underlying file once, this reduces the number of file opens in the most-likely case from 2 to 1 (1 to
         # check whether the data is there and 1 to read it) and increases in the worst case from 1 to 2 (1 to
         # try to read it here and one more time to verify it's not a group below).
-        obj = h5io.read_hdf5(self.file_name, title=self._get_h5_path(item))
+        obj = read_hdf5(self.file_name, title=self._get_h5_path(item))
         if self._is_convertable_dtype_object_array(obj):
             obj = self._convert_dtype_obj_array(obj.copy())
         return obj
@@ -258,23 +258,13 @@ def __setitem__(self, key, value):
             use_json = False
         elif isinstance(value, tuple):
             value = list(value)
-        try:
-            h5io.write_hdf5(
-                self.file_name,
-                value,
-                title=self._get_h5_path(key),
-                overwrite="update",
-                use_json=use_json,
-            )
-        except BlockingIOError:
-            time.sleep(1)
-            h5io.write_hdf5(
-                self.file_name,
-                value,
-                title=self._get_h5_path(key),
-                overwrite="update",
-                use_json=use_json,
-            )
+        write_hdf5(
+            self.file_name,
+            value,
+            title=self._get_h5_path(key),
+            overwrite="update",
+            use_json=use_json,
+        )
 
     def __delitem__(self, key):
         """
@@ -912,7 +902,7 @@ def _read(self, item):
         Returns:
             dict, list, float, int: data or data object
         """
-        return h5io.read_hdf5(self.file_name, title=self._get_h5_path(item))
+        return read_hdf5(self.file_name, title=self._get_h5_path(item))
 
     # def _open_store(self, mode="r"):
     #     """
@@ -1481,3 +1471,67 @@ def create_project_from_hdf5(self):
             Project: pyiron project object
         """
         return self._project.__class__(path=self.file_path)
+
+
+def read_hdf5(fname, title="h5io", slash="ignore", _counter=0):
+    try:
+        return h5io.read_hdf5(fname=fname, title=title, slash=slash)
+    except BlockingIOError:
+        state.logger.warn(
+            "Two or more processes tried to access the file "
+            + fname
+            + ". Trying again in 1 sec; this is attempt number "
+            + str(_counter)
+            + "."
+        )
+        if _counter < 10:
+            time.sleep(1)
+            return read_hdf5(
+                fname=fname, title=title, slash=slash, _counter=_counter + 1
+            )
+        else:
+            raise BlockingIOError("Tried 10 times, but still got a BlockingIOError")
+
+
+def write_hdf5(
+    fname,
+    data,
+    overwrite=False,
+    compression=4,
+    title="h5io",
+    slash="error",
+    use_json=False,
+    _counter=0,
+):
+    try:
+        h5io.write_hdf5(
+            fname=fname,
+            data=data,
+            overwrite=overwrite,
+            compression=compression,
+            title=title,
+            slash=slash,
+            use_json=use_json,
+        )
+    except BlockingIOError:
+        state.logger.warn(
+            "Two or more processes tried to access the file "
+            + fname
+            + ". Trying again in 1 sec; this is attempt number "
+            + str(_counter)
+            + "."
+        )
+        if _counter < 10:
+            time.sleep(1)
+            write_hdf5(
+                fname=fname,
+                data=data,
+                overwrite=overwrite,
+                compression=compression,
+                title=title,
+                slash=slash,
+                use_json=use_json,
+                _counter=_counter + 1,
+            )
+        else:
+            raise BlockingIOError("Tried 10 times, but still got a BlockingIOError")
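For downstream code, a minimal usage sketch (the file name and HDF5 title below are made up for illustration): the new read_hdf5/write_hdf5 wrappers accept the same keyword arguments as h5io.read_hdf5/h5io.write_hdf5, so call sites only swap the import; on a BlockingIOError they log a warning, sleep one second, and retry up to ten times before giving up with a BlockingIOError.

from pyiron_base.storage.hdfio import read_hdf5, write_hdf5

# Hypothetical HDF5 file and title, for illustration only.
hdf5_file = "my_project/my_job.h5"

# Store a value under a job-specific path; a BlockingIOError caused by another
# process holding the file is retried automatically inside the wrapper.
write_hdf5(hdf5_file, "finished", title="my_job/status", overwrite="update")

# Read the value back through the same retry logic.
status = read_hdf5(hdf5_file, title="my_job/status")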