Skip to content

Commit

Permalink
Merge pull request #889 from pyiron/h5io_buffer
Browse files Browse the repository at this point in the history
hdfio: If the initial write fails with a BlockingIOError try again after 1 second.
  • Loading branch information
jan-janssen authored Nov 21, 2022
2 parents 72e9d93 + 1231944 commit c7157b6
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 29 deletions.
10 changes: 5 additions & 5 deletions pyiron_base/database/filetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import os
import pandas
import datetime
import h5io
from pyfileindex import PyFileIndex
from pyiron_base.interfaces.singleton import Singleton
from pyiron_base.database.generic import IsDatabase
from pyiron_base.storage.hdfio import write_hdf5, read_hdf5

table_columns = {
"id": None,
Expand Down Expand Up @@ -322,7 +322,7 @@ def get_job_status(self, job_id):
def set_job_status(self, job_id, status):
db_entry = self.get_item_by_id(item_id=job_id)
self._job_table.loc[self._job_table.id == job_id, "status"] = status
h5io.write_hdf5(
write_hdf5(
db_entry["project"] + db_entry["subjob"] + ".h5",
status,
title=db_entry["subjob"][1:] + "/status",
Expand Down Expand Up @@ -356,15 +356,15 @@ def get_extract(path, mtime):


def get_hamilton_from_file(hdf5_file, job_name):
return h5io.read_hdf5(hdf5_file, job_name + "/TYPE").split(".")[-1].split("'")[0]
return read_hdf5(hdf5_file, job_name + "/TYPE").split(".")[-1].split("'")[0]


def get_hamilton_version_from_file(hdf5_file, job_name):
return h5io.read_hdf5(hdf5_file, job_name + "/VERSION")
return read_hdf5(hdf5_file, job_name + "/VERSION")


def get_job_status_from_file(hdf5_file, job_name):
if os.path.exists(hdf5_file):
return h5io.read_hdf5(hdf5_file, job_name + "/status")
return read_hdf5(hdf5_file, job_name + "/status")
else:
return None
10 changes: 5 additions & 5 deletions pyiron_base/jobs/job/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from datetime import datetime
import os
import posixpath
import h5io
import signal
import warnings

Expand Down Expand Up @@ -46,6 +45,7 @@
from pyiron_base.utils.deprecate import deprecate
from pyiron_base.jobs.job.extension.server.generic import Server
from pyiron_base.database.filetable import FileTable
from pyiron_base.storage.hdfio import write_hdf5, read_hdf5

__author__ = "Joerg Neugebauer, Jan Janssen"
__copyright__ = (
Expand Down Expand Up @@ -182,12 +182,12 @@ def __init__(self, project, job_name):
self._status = JobStatus(db=project.db, job_id=self.job_id)
self.refresh_job_status()
elif os.path.exists(self.project_hdf5.file_name):
initial_status = h5io.read_hdf5(
initial_status = read_hdf5(
self.project_hdf5.file_name, job_name + "/status"
)
self._status = JobStatus(initial_status=initial_status)
if "job_id" in self.list_nodes():
self._job_id = h5io.read_hdf5(
self._job_id = read_hdf5(
self.project_hdf5.file_name, job_name + "/job_id"
)
else:
Expand Down Expand Up @@ -470,7 +470,7 @@ def refresh_job_status(self):
)
elif state.database.database_is_disabled:
self._status = JobStatus(
initial_status=h5io.read_hdf5(
initial_status=read_hdf5(
self.project_hdf5.file_name, self.job_name + "/status"
)
)
Expand Down Expand Up @@ -1132,7 +1132,7 @@ def save(self):
if not state.database.database_is_disabled:
job_id = self.project.db.add_item_dict(self.db_entry())
self._job_id = job_id
h5io.write_hdf5(
write_hdf5(
self.project_hdf5.file_name,
job_id,
title=self.job_name + "/job_id",
Expand Down
92 changes: 73 additions & 19 deletions pyiron_base/storage/hdfio.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def __getitem__(self, item):
# underlying file once, this reduces the number of file opens in the most-likely case from 2 to 1 (1 to
# check whether the data is there and 1 to read it) and increases in the worst case from 1 to 2 (1 to
# try to read it here and one more time to verify it's not a group below).
obj = h5io.read_hdf5(self.file_name, title=self._get_h5_path(item))
obj = read_hdf5(self.file_name, title=self._get_h5_path(item))
if self._is_convertable_dtype_object_array(obj):
obj = self._convert_dtype_obj_array(obj.copy())
return obj
Expand Down Expand Up @@ -258,23 +258,13 @@ def __setitem__(self, key, value):
use_json = False
elif isinstance(value, tuple):
value = list(value)
try:
h5io.write_hdf5(
self.file_name,
value,
title=self._get_h5_path(key),
overwrite="update",
use_json=use_json,
)
except BlockingIOError:
time.sleep(1)
h5io.write_hdf5(
self.file_name,
value,
title=self._get_h5_path(key),
overwrite="update",
use_json=use_json,
)
write_hdf5(
self.file_name,
value,
title=self._get_h5_path(key),
overwrite="update",
use_json=use_json,
)

def __delitem__(self, key):
"""
Expand Down Expand Up @@ -912,7 +902,7 @@ def _read(self, item):
Returns:
dict, list, float, int: data or data object
"""
return h5io.read_hdf5(self.file_name, title=self._get_h5_path(item))
return read_hdf5(self.file_name, title=self._get_h5_path(item))

# def _open_store(self, mode="r"):
# """
Expand Down Expand Up @@ -1481,3 +1471,67 @@ def create_project_from_hdf5(self):
Project: pyiron project object
"""
return self._project.__class__(path=self.file_path)


def read_hdf5(fname, title="h5io", slash="ignore", _counter=0):
    """Read data from an HDF5 file, retrying while the file is locked.

    Wraps ``h5io.read_hdf5`` and, when another process holds the file open
    (``BlockingIOError``), waits one second and tries again, up to 10 times.

    Args:
        fname (str): path to the HDF5 file
        title (str): path of the dataset inside the HDF5 file
        slash (str): how ``h5io`` treats slashes in keys ("ignore"/"replace")
        _counter (int): internal retry counter; do not set from the outside

    Returns:
        object: the data stored under ``title``

    Raises:
        BlockingIOError: if the file is still locked after 10 retries
    """
    try:
        return h5io.read_hdf5(fname=fname, title=title, slash=slash)
    except BlockingIOError:
        # str(_counter) is required: concatenating the int directly to the
        # message raises TypeError inside the handler and breaks the retry.
        # logger.warning() replaces the deprecated logger.warn() alias.
        state.logger.warning(
            "Two or more processes tried to access the file "
            + fname
            + ". Try again in 1sec. It is the "
            + str(_counter)
            + " time."
        )
        if _counter < 10:
            time.sleep(1)
            return read_hdf5(
                fname=fname, title=title, slash=slash, _counter=_counter + 1
            )
        else:
            raise BlockingIOError("Tried 10 times, but still get a BlockingIOError")


def write_hdf5(
    fname,
    data,
    overwrite=False,
    compression=4,
    title="h5io",
    slash="error",
    use_json=False,
    _counter=0,
):
    """Write data to an HDF5 file, retrying while the file is locked.

    Wraps ``h5io.write_hdf5`` and, when another process holds the file open
    (``BlockingIOError``), waits one second and tries again, up to 10 times.

    Args:
        fname (str): path to the HDF5 file
        data (object): data to store
        overwrite (bool/str): whether to overwrite an existing dataset
            (``"update"`` updates in place)
        compression (int): gzip compression level 0-9
        title (str): path of the dataset inside the HDF5 file
        slash (str): how ``h5io`` treats slashes in keys ("error"/"replace")
        use_json (bool): store lists/dicts as JSON strings for speed
        _counter (int): internal retry counter; do not set from the outside

    Raises:
        BlockingIOError: if the file is still locked after 10 retries
    """
    try:
        h5io.write_hdf5(
            fname=fname,
            data=data,
            overwrite=overwrite,
            compression=compression,
            title=title,
            slash=slash,
            use_json=use_json,
        )
    except BlockingIOError:
        # str(_counter) is required: concatenating the int directly to the
        # message raises TypeError inside the handler and breaks the retry.
        # logger.warning() replaces the deprecated logger.warn() alias.
        state.logger.warning(
            "Two or more processes tried to access the file "
            + fname
            + ". Try again in 1sec. It is the "
            + str(_counter)
            + " time."
        )
        if _counter < 10:
            time.sleep(1)
            write_hdf5(
                fname=fname,
                data=data,
                overwrite=overwrite,
                compression=compression,
                title=title,
                slash=slash,
                use_json=use_json,
                _counter=_counter + 1,
            )
        else:
            raise BlockingIOError("Tried 10 times, but still get a BlockingIOError")

0 comments on commit c7157b6

Please sign in to comment.