From f9ecc961ba466f016d8ced97c1bf5059aa09a5da Mon Sep 17 00:00:00 2001 From: 0xE0F Date: Tue, 25 Feb 2025 09:03:38 +1100 Subject: [PATCH] DAOS-16355 pydaos: add dir object cache for Datasets (#15888) Dataset reads a lot of samples that are stored under just a few common directories, to reduce computation load on lookup for each sample file, this commit introduces a cache of directory objects. Signed-off-by: Denis Barakhtanov --- src/client/pydaos/torch/torch_api.py | 19 +- src/client/pydaos/torch/torch_shim.c | 408 ++++++++++++++++++++------- 2 files changed, 322 insertions(+), 105 deletions(-) diff --git a/src/client/pydaos/torch/torch_api.py b/src/client/pydaos/torch/torch_api.py index cfb802114b5..1bfcf3c3f6e 100644 --- a/src/client/pydaos/torch/torch_api.py +++ b/src/client/pydaos/torch/torch_api.py @@ -24,6 +24,7 @@ ITER_BATCH_SIZE = 32 READDIR_BATCH_SIZE = 128 PARALLEL_SCAN_WORKERS = 16 +DIR_CACHE_SIZE = 64 * 1024 def transform_fn_default(data): @@ -56,6 +57,8 @@ class Dataset(TorchDataset): Function to transform samples from storage to in-memory representation readdir_batch_size: int (optional) Number of directory entries to read for each readdir call. + dir_cache_size: int (optional) + Number of directory object entries to cache in memory. Methods @@ -78,12 +81,13 @@ class Dataset(TorchDataset): # pylint: disable=too-many-arguments def __init__(self, pool=None, cont=None, path=None, transform_fn=transform_fn_default, - readdir_batch_size=READDIR_BATCH_SIZE): + readdir_batch_size=READDIR_BATCH_SIZE, + dir_cache_size=DIR_CACHE_SIZE): super().__init__() self._pool = pool self._cont = cont - self._dfs = _Dfs(pool=pool, cont=cont) + self._dfs = _Dfs(pool=pool, cont=cont, dir_cache_size=dir_cache_size) self._transform_fn = transform_fn self._readdir_batch_size = readdir_batch_size @@ -171,6 +175,8 @@ class IterableDataset(TorchIterableDataset): Number of directory entries to read for each readdir call. batch_size: int (optional) Number of samples to fetch per iteration. + dir_cache_size: int (optional) + Number of directory object entries to cache in memory. Methods @@ -187,12 +193,13 @@ class IterableDataset(TorchIterableDataset): def __init__(self, pool=None, cont=None, path=None, transform_fn=transform_fn_default, readdir_batch_size=READDIR_BATCH_SIZE, - batch_size=ITER_BATCH_SIZE): + batch_size=ITER_BATCH_SIZE, + dir_cache_size=DIR_CACHE_SIZE): super().__init__() self._pool = pool self._cont = cont - self._dfs = _Dfs(pool=pool, cont=cont) + self._dfs = _Dfs(pool=pool, cont=cont, dir_cache_size=dir_cache_size) self._transform_fn = transform_fn self._readdir_batch_size = readdir_batch_size self._batch_size = batch_size @@ -506,14 +513,14 @@ class _Dfs(): Should not be used directly. 
""" - def __init__(self, pool=None, cont=None, rd_only=True): + def __init__(self, pool=None, cont=None, rd_only=True, dir_cache_size=DIR_CACHE_SIZE): if pool is None: raise ValueError("pool label or UUID is required") if cont is None: raise ValueError("container label or UUID is required") self._dc = DaosClient() - (ret, dfs) = torch_shim.torch_connect(DAOS_MAGIC, pool, cont, rd_only) + (ret, dfs) = torch_shim.torch_connect(DAOS_MAGIC, pool, cont, rd_only, dir_cache_size) if ret != 0: raise OSError(ret, os.strerror(ret), f"could not connect to {pool}:{cont}") diff --git a/src/client/pydaos/torch/torch_shim.c b/src/client/pydaos/torch/torch_shim.c index 45c8bf6fabb..83f9a85c7e0 100644 --- a/src/client/pydaos/torch/torch_shim.c +++ b/src/client/pydaos/torch/torch_shim.c @@ -21,17 +21,76 @@ #include #include #include +#include #define PY_SHIM_MAGIC_NUMBER (0x7A8B) #define EQ_POLL_BATCH_SIZE (64) struct dfs_handle { - int flags; - dfs_t *dfs; - d_iov_t global; + int flags; + dfs_t *dfs; + d_iov_t global; - daos_handle_t eq; - pid_t eq_owner_pid; + daos_handle_t eq; + pid_t eq_owner_pid; + + uint32_t dir_cache_size; + struct d_hash_table *dir_cache; +}; + +/* Cached directory object entry */ +struct dir_obj_cache_entry { + d_list_t entry; + dfs_obj_t *obj; + char name[PATH_MAX]; +}; + +static inline struct dir_obj_cache_entry * +dir_obj_cache_entry_from_link(d_list_t *rlink) +{ + return container_of(rlink, struct dir_obj_cache_entry, entry); +} + +static bool +dir_cache_key_cmp(struct d_hash_table *htable, d_list_t *rlink, const void *key, unsigned int ksize) +{ + struct dir_obj_cache_entry *h = dir_obj_cache_entry_from_link(rlink); + + return (strcmp(h->name, (const char *)key) == 0); +} + +static void +dir_cache_rec_free(struct d_hash_table *htable, d_list_t *rlink) +{ + struct dir_obj_cache_entry *h = dir_obj_cache_entry_from_link(rlink); + + int rc = dfs_release(h->obj); + if (rc) { + D_ERROR("Could not release object '%s': %s (rc=%d)", h->name, strerror(rc), rc); + } + free(h); +} + +static bool +dir_cache_rec_decref(struct d_hash_table *htable, d_list_t *rlink) +{ + return true; +} + +static uint32_t +dir_cache_rec_hash(struct d_hash_table *htable, d_list_t *rlink) +{ + struct dir_obj_cache_entry *h = dir_obj_cache_entry_from_link(rlink); + + return d_hash_string_u32(h->name, strnlen(h->name, PATH_MAX)); +} + +/* Directory object hash table operations */ +static d_hash_table_ops_t dir_cache_hash_ops = { + .hop_key_cmp = dir_cache_key_cmp, + .hop_rec_decref = dir_cache_rec_decref, + .hop_rec_free = dir_cache_rec_free, + .hop_rec_hash = dir_cache_rec_hash, }; /* Parse arguments and magic number. @@ -95,22 +154,73 @@ __shim_handle__module_fini(PyObject *self, PyObject *args) return PyLong_FromLong(rc); } +/* Hash table does not require lock: workers will be working on their own subsets of data +and will have their own handlers, including the cache. 
+*/ +static int +__dir_cache_create(struct dfs_handle *hdl) +{ + uint32_t bits = ceil(log2(hdl->dir_cache_size)); + + int rc = d_hash_table_create(D_HASH_FT_EPHEMERAL | D_HASH_FT_NOLOCK | D_HASH_FT_LRU, bits, + NULL, &dir_cache_hash_ops, &hdl->dir_cache); + if (rc) { + D_ERROR("Could not create directory cache's hash table: %s (rc=%d)", d_errstr(rc), + rc); + rc = daos_der2errno(rc); + } + + return rc; +} + +static int +__dir_cache_destroy(struct dfs_handle *hdl) +{ + int rc = 0; + + /* Before finalizing the handle, we need to release all cached directory objects, + by dropping the reference count we allow the hash table to free the memory via rec_free + callback + */ + while (true) { + d_list_t *rlink = d_hash_rec_first(hdl->dir_cache); + if (rlink == NULL) + break; + + d_hash_rec_decref(hdl->dir_cache, rlink); + } + + rc = d_hash_table_destroy(hdl->dir_cache, false); + if (rc) { + D_ERROR("Could not destroy directory objects cache hash table: %s (rc=%d)", + d_errstr(rc), rc); + rc = daos_der2errno(rc); + } + + return rc; +} + static PyObject * __shim_handle__torch_connect(PyObject *self, PyObject *args) { - int rc = 0; - int rc2 = 0; - char *pool = NULL; - char *cont = NULL; - int rd_only = 1; - struct dfs_handle *hdl = NULL; + int rc = 0; + int rc2 = 0; + char *pool = NULL; + char *cont = NULL; + int rd_only = 1; + uint32_t dir_cache_size = 0; + struct dfs_handle *hdl = NULL; PyObject *result = PyList_New(2); if (result == NULL) { return PyErr_NoMemory(); } - RETURN_NULL_IF_FAILED_TO_PARSE(args, "ssp", &pool, &cont, &rd_only); + RETURN_NULL_IF_FAILED_TO_PARSE(args, "sspI", &pool, &cont, &rd_only, &dir_cache_size); + + if (dir_cache_size == 0) { + return PyLong_FromLong(EINVAL); + } rc = dfs_init(); if (rc) { @@ -123,7 +233,8 @@ __shim_handle__torch_connect(PyObject *self, PyObject *args) rc = ENOMEM; goto out; } - hdl->flags = rd_only ? O_RDONLY : O_RDWR; + hdl->flags = rd_only ? 
O_RDONLY : O_RDWR; + hdl->dir_cache_size = dir_cache_size; rc = dfs_connect(pool, NULL, cont, hdl->flags, NULL, &hdl->dfs); if (rc) { @@ -160,6 +271,11 @@ __shim_handle__torch_connect(PyObject *self, PyObject *args) } hdl->eq_owner_pid = getpid(); + rc = __dir_cache_create(hdl); + if (rc) { + goto out; + } + PyList_SetItem(result, 0, PyLong_FromLong(rc)); PyList_SetItem(result, 1, PyLong_FromVoidPtr(hdl)); @@ -211,6 +327,11 @@ __shim_handle__torch_disconnect(PyObject *self, PyObject *args) goto out; } + rc = __dir_cache_destroy(hdl); + if (rc) { + goto out; + } + rc = dfs_fini(); if (rc) { D_ERROR("Could not finalize DFS: %s (rc=%d)", strerror(rc), rc); @@ -262,9 +383,59 @@ __shim_handle__torch_worker_init(PyObject *self, PyObject *args) rc = daos_der2errno(rc); } + rc = __dir_cache_create(hdl); + return PyLong_FromLong(rc); } +static int +lookup_or_insert_dir_obj(struct dfs_handle *hdl, const char *name, dfs_obj_t **obj) +{ + int rc = 0; + d_list_t *rlink = NULL; + size_t len = strnlen(name, PATH_MAX); + struct dir_obj_cache_entry *rec = NULL; + + assert(obj != NULL); + assert(hdl->dir_cache != NULL); + + rlink = d_hash_rec_find(hdl->dir_cache, name, len); + if (rlink != NULL) { + rec = dir_obj_cache_entry_from_link(rlink); + *obj = rec->obj; + return 0; + } + + rc = dfs_lookup(hdl->dfs, name, hdl->flags, obj, NULL, NULL); + if (rc) { + return rc; + } + + rec = calloc(1, sizeof(struct dir_obj_cache_entry)); + if (rec == NULL) { + return ENOMEM; + } + + rec->obj = *obj; + strncpy(rec->name, name, len); + + rc = d_hash_rec_insert(hdl->dir_cache, rec->name, len, &rec->entry, false); + if (rc) { + D_ERROR("Failed to insert dir handle in hashtable: '%s': %s (rc=%d)", name, + d_errstr(rc), rc); + rc = daos_der2errno(rc); + + int rc2 = dfs_release(rec->obj); + if (rc2) { + D_ERROR("Could not release object '%s': %s (rc=%d)", name, strerror(rc2), + rc2); + } + free(rec); + } + + return rc; +} + static PyObject * __shim_handle__torch_recommended_dir_split(PyObject *self, PyObject *args) { @@ -272,22 +443,18 @@ __shim_handle__torch_recommended_dir_split(PyObject *self, PyObject *args) char *path = NULL; dfs_obj_t *obj = NULL; uint32_t nr = 0; + int rc = 0; RETURN_NULL_IF_FAILED_TO_PARSE(args, "Ls", &hdl, &path); assert(hdl->dfs != NULL); - int rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, NULL); + rc = lookup_or_insert_dir_obj(hdl, path, &obj); if (rc) { return Py_BuildValue("iI", rc, nr); } rc = dfs_obj_anchor_split(obj, &nr, NULL); - if (rc) { - return Py_BuildValue("iI", rc, nr); - } - - rc = dfs_release(obj); return Py_BuildValue("iI", rc, nr); } @@ -328,7 +495,7 @@ __shim_handle__torch_list_with_anchor(PyObject *self, PyObject *args) goto out; } - rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, NULL); + rc = lookup_or_insert_dir_obj(hdl, path, &obj); if (rc) { D_ERROR("Could not lookup object at '%s': %s (rc=%d)", path, strerror(rc), rc); goto out; @@ -396,17 +563,65 @@ __shim_handle__torch_list_with_anchor(PyObject *self, PyObject *args) return PyLong_FromLong(rc); } +static int +split_path(const char *path, char **dir, char **name) +{ + assert(dir != NULL); + assert(name != NULL); + + int rc = 0; + char *cp1 = NULL; + char *cp2 = NULL; + char *dir_name = NULL; + char *file_name = NULL; + + D_STRNDUP(cp1, path, PATH_MAX); + if (cp1 == NULL) { + return ENOMEM; + } + D_STRNDUP(cp2, path, PATH_MAX); + if (cp2 == NULL) { + rc = ENOMEM; + goto out; + } + + dir_name = dirname(cp1); + file_name = basename(cp2); + + D_STRNDUP(*dir, dir_name, PATH_MAX); + if (*dir == NULL) { + rc = 
ENOMEM; + goto out; + } + D_STRNDUP(*name, file_name, PATH_MAX); + if (*name == NULL) { + D_FREE(*dir); + rc = ENOMEM; + goto out; + } + +out: + D_FREE(cp1); + D_FREE(cp2); + + return rc; +} + static PyObject * __shim_handle__torch_read(PyObject *self, PyObject *args) { - ssize_t rc = 0; - struct dfs_handle *hdl = NULL; - char *path = NULL; - dfs_obj_t *obj = NULL; - PyObject *buffer = NULL; + ssize_t rc = 0; + struct dfs_handle *hdl = NULL; + char *path = NULL; + char *dir_name = NULL; + char *file_name = NULL; + dfs_obj_t *obj = NULL; + dfs_obj_t *parent = NULL; + PyObject *buffer = NULL; Py_buffer bview; d_iov_t iov; daos_size_t read = 0; + mode_t mode = S_IFREG; RETURN_NULL_IF_FAILED_TO_PARSE(args, "LsO", &hdl, &path, &buffer); assert(hdl->dfs != NULL); @@ -432,6 +647,23 @@ __shim_handle__torch_read(PyObject *self, PyObject *args) return NULL; } + rc = split_path(path, &dir_name, &file_name); + if (rc) { + goto out; + } + + rc = lookup_or_insert_dir_obj(hdl, dir_name, &parent); + if (rc) { + D_ERROR("Could not lookup '%s': %s (rc=%ld)", path, strerror(rc), rc); + goto out; + } + + rc = dfs_open(hdl->dfs, parent, file_name, mode, O_RDONLY, 0, 0, NULL, &obj); + if (rc) { + D_ERROR("Could not open '%s': %s (rc=%ld)", path, strerror(rc), rc); + goto out; + } + read = bview.len; d_iov_set(&iov, bview.buf, read); @@ -441,12 +673,6 @@ __shim_handle__torch_read(PyObject *self, PyObject *args) .sg_iovs = &iov, }; - rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, NULL); - if (rc) { - D_ERROR("Could not lookup '%s': %s (rc=%ld)", path, strerror(rc), rc); - goto out; - } - rc = dfs_read(hdl->dfs, obj, &sgl, 0 /* offset */, &read, NULL); if (rc) { goto out; @@ -456,6 +682,8 @@ __shim_handle__torch_read(PyObject *self, PyObject *args) } out: + D_FREE(dir_name); + D_FREE(file_name); PyBuffer_Release(&bview); if (obj) { @@ -499,6 +727,11 @@ start_read_op(struct dfs_handle *hdl, PyObject *item, struct io_op *op) int rc2 = 0; daos_event_t *evp = &op->ev; + char *dir_name = NULL; + char *file_name = NULL; + dfs_obj_t *parent = NULL; + mode_t mode = S_IFREG; + PyObject *py_path = PyTuple_GetItem(item, 0); PyObject *py_buff = PyTuple_GetItem(item, 1); @@ -521,20 +754,31 @@ start_read_op(struct dfs_handle *hdl, PyObject *item, struct io_op *op) if (!PyBuffer_IsContiguous(&op->buf_view, 'C')) { D_ERROR("Buffer for '%s' is not contiguous", path); rc = EINVAL; - goto out; + goto err; } rc = daos_event_init(evp, hdl->eq, NULL); if (rc) { D_ERROR("Could not init event: %s (rc=%d)", d_errstr(rc), rc); rc = daos_der2errno(rc); - goto out; + goto err; } - rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &op->obj, NULL, NULL); + rc = split_path(path, &dir_name, &file_name); if (rc) { - D_ERROR("Could not lookup path '%s': %s (rc=%d)", op->path, strerror(rc), rc); - goto out; + goto err; + } + + rc = lookup_or_insert_dir_obj(hdl, dir_name, &parent); + if (rc) { + D_ERROR("Could not lookup '%s': %s (rc=%d)", dir_name, strerror(rc), rc); + goto err; + } + + rc = dfs_open(hdl->dfs, parent, file_name, mode, O_RDONLY, 0, 0, NULL, &op->obj); + if (rc) { + D_ERROR("Could not open '%s': %s (rc=%d)", op->path, strerror(rc), rc); + goto err; } op->path = path; @@ -549,12 +793,17 @@ start_read_op(struct dfs_handle *hdl, PyObject *item, struct io_op *op) if (rc) { D_ERROR("Could not start async read on '%s': %s (rc=%d)", op->path, strerror(rc), rc); - goto out; + goto err; } + D_FREE(dir_name); + D_FREE(file_name); + return 0; -out: +err: + D_FREE(dir_name); + D_FREE(file_name); PyBuffer_Release(&op->buf_view); rc2 = 
daos_event_fini(&op->ev); @@ -686,50 +935,6 @@ __shim_handle__torch_batch_read(PyObject *self, PyObject *args) return PyLong_FromLong(rc); } -static int -split_path(const char *path, char **dir, char **name) -{ - assert(dir != NULL); - assert(name != NULL); - - int rc = 0; - char *cp1 = NULL; - char *cp2 = NULL; - char *dir_name = NULL; - char *file_name = NULL; - - D_STRNDUP(cp1, path, PATH_MAX); - if (cp1 == NULL) { - return ENOMEM; - } - D_STRNDUP(cp2, path, PATH_MAX); - if (cp2 == NULL) { - rc = ENOMEM; - goto out; - } - - dir_name = dirname(cp1); - file_name = basename(cp2); - - D_STRNDUP(*dir, dir_name, PATH_MAX); - if (*dir == NULL) { - rc = ENOMEM; - goto out; - } - D_STRNDUP(*name, file_name, PATH_MAX); - if (*name == NULL) { - D_FREE(*dir); - rc = ENOMEM; - goto out; - } - -out: - D_FREE(cp1); - D_FREE(cp2); - - return rc; -} - static PyObject * __shim_handle__torch_write(PyObject *self, PyObject *args) { @@ -739,7 +944,7 @@ __shim_handle__torch_write(PyObject *self, PyObject *args) char *path = NULL; char *dir_name = NULL; char *file_name = NULL; - dfs_obj_t *dir = NULL; + dfs_obj_t *parent = NULL; dfs_obj_t *obj = NULL; PyObject *buffer = NULL; int oflags = 0; @@ -778,13 +983,13 @@ __shim_handle__torch_write(PyObject *self, PyObject *args) goto out; } - rc = dfs_lookup(hdl->dfs, dir_name, O_RDWR, &dir, NULL, NULL); + rc = lookup_or_insert_dir_obj(hdl, dir_name, &parent); if (rc) { D_ERROR("Could not lookup '%s': %s (rc=%d)", dir_name, strerror(rc), rc); goto out; } - rc = dfs_open(hdl->dfs, dir, file_name, mode, oflags, cid, chunk_size, NULL, &obj); + rc = dfs_open(hdl->dfs, parent, file_name, mode, oflags, cid, chunk_size, NULL, &obj); if (rc) { D_ERROR("Could not open '%s': %s (rc=%d)", path, strerror(rc), rc); goto out; @@ -815,13 +1020,6 @@ __shim_handle__torch_write(PyObject *self, PyObject *args) rc2); } } - if (dir) { - rc2 = dfs_release(dir); - if (rc2) { - D_ERROR("Could not release dir '%s': %s (rc=%d)", dir_name, strerror(rc2), - rc2); - } - } D_FREE(dir_name); D_FREE(file_name); @@ -832,21 +1030,33 @@ __shim_handle__torch_write(PyObject *self, PyObject *args) static PyObject * __shim_handle__torch_get_fsize(PyObject *self, PyObject *args) { - struct dfs_handle *hdl = NULL; - char *path = NULL; - dfs_obj_t *obj = NULL; - struct stat st = {0}; + struct dfs_handle *hdl = NULL; + char *path = NULL; + char *dir_name = NULL; + char *file_name = NULL; + dfs_obj_t *parent = NULL; + struct stat st = {0}; RETURN_NULL_IF_FAILED_TO_PARSE(args, "Ls", &hdl, &path); assert(hdl->dfs != NULL); - int rc = dfs_lookup(hdl->dfs, path, O_RDONLY, &obj, NULL, &st); + int rc = split_path(path, &dir_name, &file_name); + if (rc) { + return Py_BuildValue("iK", rc, 0); + } + + rc = lookup_or_insert_dir_obj(hdl, dir_name, &parent); if (rc) { - return Py_BuildValue("iK", rc, st.st_size); + D_ERROR("Could not lookup '%s': %s (rc=%d)", dir_name, strerror(rc), rc); + goto out; } - rc = dfs_release(obj); + rc = dfs_stat(hdl->dfs, parent, file_name, &st); + +out: + D_FREE(dir_name); + D_FREE(file_name); return Py_BuildValue("iK", rc, st.st_size); }
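
A minimal usage sketch (not part of the patch) showing how the new dir_cache_size knob surfaces to callers. It assumes the package is importable as pydaos.torch and uses placeholder pool/container labels; the default of DIR_CACHE_SIZE (64 * 1024) applies when the argument is omitted:

    # Hypothetical example: "pool_label" and "cont_label" are placeholders.
    from pydaos.torch import Dataset

    # dir_cache_size sizes the directory object cache created for each DFS
    # handle (each DataLoader worker re-creates its own cache on init).
    ds = Dataset(pool="pool_label", cont="cont_label", dir_cache_size=16 * 1024)

    # Sample reads resolve the parent directory through the cache instead of
    # doing a full dfs_lookup() per file.
    sample = ds[0]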