diff --git a/src/client/dfuse/dfuse.h b/src/client/dfuse/dfuse.h index 6f6d48f3320..bb375032495 100644 --- a/src/client/dfuse/dfuse.h +++ b/src/client/dfuse/dfuse.h @@ -1133,7 +1133,7 @@ dfuse_cache_evict_dir(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *i * Returns true if feature was used. */ bool -read_chunk_close(struct dfuse_inode_entry *ie); +read_chunk_close(struct active_inode *active); /* Metadata caching functions. */ diff --git a/src/client/dfuse/file.c b/src/client/dfuse/file.c index f18fbde0c56..6a86134d628 100644 --- a/src/client/dfuse/file.c +++ b/src/client/dfuse/file.c @@ -96,7 +96,7 @@ active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh) if (oc != 1) goto out; - rcb = read_chunk_close(oh->doh_ie); + rcb = read_chunk_close(oh->doh_ie->ie_active); ah_free(dfuse_info, oh->doh_ie); out: @@ -118,7 +118,7 @@ active_ie_decref(struct dfuse_info *dfuse_info, struct dfuse_inode_entry *ie) if (oc != 1) goto out; - read_chunk_close(ie); + read_chunk_close(ie->ie_active); ah_free(dfuse_info, ie); out: diff --git a/src/client/dfuse/ops/read.c b/src/client/dfuse/ops/read.c index 46286e7ae11..d9bba35ebb3 100644 --- a/src/client/dfuse/ops/read.c +++ b/src/client/dfuse/ops/read.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2024 Intel Corporation. + * (C) Copyright 2016-2025 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -129,7 +129,7 @@ dfuse_readahead_reply(fuse_req_t req, size_t len, off_t position, struct dfuse_o } if (((position % K128) == 0) && ((len % K128) == 0)) { - DFUSE_TRA_INFO(oh, "allowing out-of-order pre read"); + DFUSE_TRA_DEBUG(oh, "allowing out-of-order pre read"); /* Do not closely track the read position in this case, just the maximum, * later checks will determine if the file is read to the end. */ @@ -173,38 +173,33 @@ pick_eqt(struct dfuse_info *dfuse_info) * * This code is entered when caching is enabled and reads are correctly size/aligned and not in the * last CHUNK_SIZE of a file. When open then the inode contains a single read_chunk_core pointer - * and this contains a list of read_chunk_data entries, one for each bucket. Buckets where all - * slots have been requested are remove from the list and closed when the last request is completed. + * and this contains a list of read_chunk_data entries, one for each bucket. * - * TODO: Currently there is no code to remove partially read buckets from the list so reading - * one slot every chunk would leave the entire file contents in memory until close and mean long - * list traversal times. + * TODO: Currently there is no code to remove buckets from the list so all buckets will remain in + * memory until close. */ #define CHUNK_SIZE (1024 * 1024) struct read_chunk_data { struct dfuse_event *ev; + bool slot_done[8]; struct active_inode *ia; - fuse_req_t reqs[8]; - struct dfuse_obj_hdl *ohs[8]; d_list_t list; uint64_t bucket; struct dfuse_eq *eqt; int rc; int entered; - ATOMIC int exited; - bool exiting; bool complete; + d_list_t req_list; }; -static void -chunk_free(struct read_chunk_data *cd) -{ - d_list_del(&cd->list); - d_slab_release(cd->eqt->de_read_slab, cd->ev); - D_FREE(cd); -} +struct read_chunk_req { + d_list_t req_list; + struct dfuse_obj_hdl *oh; + fuse_req_t req; + int slot; +}; /* Called when the last open file handle on a inode is closed. This needs to free everything which * is complete and for anything that isn't flag it for deletion in the callback. 
@@ -212,27 +207,20 @@ chunk_free(struct read_chunk_data *cd) * Returns true if the feature was used. */ bool -read_chunk_close(struct dfuse_inode_entry *ie) +read_chunk_close(struct active_inode *active) { struct read_chunk_data *cd, *cdn; - bool rcb = false; - D_SPIN_LOCK(&ie->ie_active->lock); - if (d_list_empty(&ie->ie_active->chunks)) - goto out; - - rcb = true; + if (d_list_empty(&active->chunks)) + return false; - d_list_for_each_entry_safe(cd, cdn, &ie->ie_active->chunks, list) { - if (cd->complete) { - chunk_free(cd); - } else { - cd->exiting = true; - } + d_list_for_each_entry_safe(cd, cdn, &active->chunks, list) { + D_ASSERT(cd->complete); + d_list_del(&cd->list); + d_slab_release(cd->eqt->de_read_slab, cd->ev); + D_FREE(cd); } -out: - D_SPIN_UNLOCK(&ie->ie_active->lock); - return rcb; + return true; } static void @@ -240,119 +228,60 @@ chunk_cb(struct dfuse_event *ev) { struct read_chunk_data *cd = ev->de_cd; struct active_inode *ia = cd->ia; - fuse_req_t req; - bool done = false; + struct read_chunk_req *cr; + d_list_t tmp_list = D_LIST_HEAD_INIT(tmp_list); + struct read_chunk_req *crn; cd->rc = ev->de_ev.ev_error; if (cd->rc == 0 && (ev->de_len != CHUNK_SIZE)) { - cd->rc = EIO; DS_WARN(cd->rc, "Unexpected short read bucket %ld (%#zx) expected %i got %zi", cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len); } daos_event_fini(&ev->de_ev); - do { - int i; - req = 0; + /* Mark as complete so no more requests get put on the list */ + D_SPIN_LOCK(&ia->lock); + cd->complete = true; - D_SPIN_LOCK(&ia->lock); + /* Mark the slot as replied to. There's a race here, as the slot hasn't actually been + * replied to yet; references are dropped by the DFUSE_REPLY macros below, so an extra ref + * on active would be required. The danger is that the bucket gets put on the end of the + * list rather than the start. + */ + d_list_for_each_entry(cr, &cd->req_list, req_list) + cd->slot_done[cr->slot] = true; - if (cd->exiting) { - chunk_free(cd); - D_SPIN_UNLOCK(&ia->lock); - return; - } + d_list_splice_init(&cd->req_list, &tmp_list); - cd->complete = true; - for (i = 0; i < 8; i++) { - if (cd->reqs[i]) { - req = cd->reqs[i]; - cd->reqs[i] = 0; - break; - } - } + D_SPIN_UNLOCK(&ia->lock); - D_SPIN_UNLOCK(&ia->lock); + d_list_for_each_entry_safe(cr, crn, &tmp_list, req_list) { + size_t position = (cd->bucket * CHUNK_SIZE) + (cr->slot * K128); + size_t len; - if (req) { - size_t position = (cd->bucket * CHUNK_SIZE) + (i * K128); + DFUSE_TRA_DEBUG(cr->oh, "Replying for %ld[%d]", cd->bucket, cr->slot); - if (cd->rc != 0) { - DFUSE_REPLY_ERR_RAW(cd->ohs[i], req, cd->rc); - } else { - DFUSE_TRA_DEBUG(cd->ohs[i], "%#zx-%#zx read", position, - position + K128 - 1); - DFUSE_REPLY_BUFQ(cd->ohs[i], req, ev->de_iov.iov_buf + (i * K128), - K128); - } + /* Delete from the list before replying as there's no reference held otherwise */ + d_list_del(&cr->req_list); - if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) - done = true; + if (cd->rc != 0) { + DFUSE_REPLY_ERR_RAW(cr->oh, cr->req, cd->rc); + } else { + if ((((cr->slot + 1) * K128) - 1) >= ev->de_len) + len = max(ev->de_len - (cr->slot * K128), 0); + else + len = K128; + + DFUSE_TRA_DEBUG(cr->oh, "%#zx-%#zx read", position, position + len - 1); + DFUSE_REPLY_BUFQ(cr->oh, cr->req, ev->de_iov.iov_buf + (cr->slot * K128), + len); } - } while (req && !done); - - if (done) { - d_slab_release(cd->eqt->de_read_slab, cd->ev); - D_FREE(cd); + D_FREE(cr); } } -/* Submut a read to dfs. - * - * Returns true on success.
- */ -static bool -chunk_fetch(fuse_req_t req, struct dfuse_obj_hdl *oh, struct read_chunk_data *cd, int slot) -{ - struct dfuse_info *dfuse_info = fuse_req_userdata(req); - struct dfuse_inode_entry *ie = oh->doh_ie; - struct dfuse_event *ev; - struct dfuse_eq *eqt; - int rc; - daos_off_t position = cd->bucket * CHUNK_SIZE; - - eqt = pick_eqt(dfuse_info); - - ev = d_slab_acquire(eqt->de_read_slab); - if (ev == NULL) { - cd->rc = ENOMEM; - return false; - } - - ev->de_iov.iov_len = CHUNK_SIZE; - ev->de_req = req; - ev->de_cd = cd; - ev->de_sgl.sg_nr = 1; - ev->de_len = 0; - ev->de_complete_cb = chunk_cb; - - cd->ev = ev; - cd->eqt = eqt; - cd->reqs[slot] = req; - cd->ohs[slot] = oh; - - rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len, - &ev->de_ev); - if (rc != 0) - goto err; - - /* Send a message to the async thread to wake it up and poll for events */ - sem_post(&eqt->de_sem); - - /* Now ensure there are more descriptors for the next request */ - d_slab_restock(eqt->de_read_slab); - - return true; - -err: - daos_event_fini(&ev->de_ev); - d_slab_release(eqt->de_read_slab, ev); - cd->rc = rc; - return false; -} - /* Try and do a bulk read. * * Returns true if it was able to handle the read. @@ -362,11 +291,13 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) { struct dfuse_inode_entry *ie = oh->doh_ie; struct read_chunk_data *cd; + struct read_chunk_req *cr = NULL; off_t last; uint64_t bucket; int slot; bool submit = false; bool rcb; + bool all_done = true; if (len != K128) return false; @@ -391,7 +322,6 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) d_list_for_each_entry(cd, &ie->ie_active->chunks, list) if (cd->bucket == bucket) { - /* Remove from list to re-add again later. */ d_list_del(&cd->list); goto found; } @@ -400,57 +330,121 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh) if (cd == NULL) goto err; + D_ALLOC_PTR(cr); + if (cr == NULL) { + D_FREE(cd); + goto err; + } + + D_INIT_LIST_HEAD(&cd->req_list); cd->ia = ie->ie_active; cd->bucket = bucket; submit = true; found: - if (++cd->entered < 8) { - /* Put on front of list for efficient searching */ - d_list_add(&cd->list, &ie->ie_active->chunks); + for (int i = 0; i < 8; i++) { + if (!cd->slot_done[i]) + all_done = false; } - D_SPIN_UNLOCK(&ie->ie_active->lock); + if (all_done) + d_list_add(&cd->list, &ie->ie_active->chunks); + else + d_list_add_tail(&cd->list, &ie->ie_active->chunks); if (submit) { - DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot); - rcb = chunk_fetch(req, oh, cd, slot); - } else { - struct dfuse_event *ev = NULL; + struct dfuse_info *dfuse_info = fuse_req_userdata(req); + struct dfuse_eq *eqt; + struct dfuse_event *ev; + int rc; + + /* Overwrite position here to the start of the bucket */ + position = cd->bucket * CHUNK_SIZE; + + eqt = pick_eqt(dfuse_info); + + ev = d_slab_acquire(eqt->de_read_slab); + if (ev == NULL) { + d_list_del(&cd->list); + D_FREE(cr); + D_FREE(cd); + goto err; + } + + d_list_add(&cr->req_list, &cd->req_list); /* Now check if this read request is complete or not yet, if it isn't then just * save req in the right slot however if it is then reply here. After the call to * DFUSE_REPLY_* then no reference is held on either the open file or the inode so * at that point they could be closed. 
*/ - rcb = true; - D_SPIN_LOCK(&ie->ie_active->lock); - if (cd->complete) { - ev = cd->ev; + DFUSE_TRA_DEBUG(oh, "submit for bucket %ld[%d]", bucket, slot); + D_SPIN_UNLOCK(&ie->ie_active->lock); + + cd->eqt = eqt; + cd->ev = ev; + + cr->req = req; + cr->oh = oh; + cr->slot = slot; + + ev->de_iov.iov_len = CHUNK_SIZE; + ev->de_req = req; + ev->de_cd = cd; + ev->de_sgl.sg_nr = 1; + ev->de_len = 0; + ev->de_complete_cb = chunk_cb; + + rc = dfs_read(ie->ie_dfs->dfs_ns, ie->ie_obj, &ev->de_sgl, position, &ev->de_len, + &ev->de_ev); + if (rc == 0) { + /* Send a message to the async thread to wake it up and poll for events */ + sem_post(&eqt->de_sem); } else { - cd->reqs[slot] = req; - cd->ohs[slot] = oh; + ev->de_ev.ev_error = rc; + chunk_cb(ev); } + + rcb = true; + } else if (cd->complete) { + cd->slot_done[slot] = true; + D_SPIN_UNLOCK(&ie->ie_active->lock); - if (ev) { - if (cd->rc != 0) { - /* Don't pass fuse an error here, rather return false and the read - * will be tried over the network. - */ - rcb = false; - } else { - DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, - position + K128 - 1); - DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128); - } - if (atomic_fetch_add_relaxed(&cd->exited, 1) == 7) { - d_slab_release(cd->eqt->de_read_slab, cd->ev); - D_FREE(cd); - } + if (cd->rc != 0) { + /* Don't pass fuse an error here, rather return false and + * the read will be tried over the network. + */ + rcb = false; + } else { + size_t read_len; + + if ((((slot + 1) * K128) - 1) >= cd->ev->de_len) + read_len = max(cd->ev->de_len - (slot * K128), 0); + else + read_len = K128; + + oh->doh_linear_read_pos = max(oh->doh_linear_read_pos, position + read_len); + + DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position, position + read_len - 1); + DFUSE_REPLY_BUFQ(oh, req, cd->ev->de_iov.iov_buf + (slot * K128), read_len); + rcb = true; + } + } else { + rcb = false; + + D_ALLOC_PTR(cr); + if (cr) { + cr->req = req; + cr->oh = oh; + cr->slot = slot; + d_list_add_tail(&cr->req_list, &cd->req_list); + rcb = true; } + + D_SPIN_UNLOCK(&ie->ie_active->lock); } return rcb; @@ -493,8 +487,10 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct eqt = pick_eqt(dfuse_info); ev = d_slab_acquire(eqt->de_read_slab); - if (ev == NULL) - D_GOTO(err, rc = ENOMEM); + if (ev == NULL) { + DFUSE_REPLY_ERR_RAW(oh, req, ENOMEM); + return; + } if (oh->doh_ie->ie_truncated && position + len < oh->doh_ie->ie_stat.st_size && ((oh->doh_ie->ie_start_off == 0 && oh->doh_ie->ie_end_off == 0) || @@ -533,7 +529,8 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct rc = dfs_read(oh->doh_dfs, oh->doh_obj, &ev->de_sgl, position, &ev->de_len, &ev->de_ev); if (rc != 0) { - D_GOTO(err, rc); + ev->de_ev.ev_error = rc; + dfuse_cb_read_complete(ev); return; } @@ -544,12 +541,6 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct d_slab_restock(eqt->de_read_slab); return; -err: - DFUSE_REPLY_ERR_RAW(oh, req, rc); - if (ev) { - daos_event_fini(&ev->de_ev); - d_slab_release(eqt->de_read_slab, ev); - } } static void
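For reference, a minimal standalone sketch (not part of the patch; the helper names are invented for illustration) of the bucket/slot arithmetic this reply path relies on: each 1 MiB CHUNK_SIZE bucket covers eight 128 KiB slots, and the clamp added in chunk_cb() trims the length returned for tail slots when dfs_read() comes back short at end of file.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

#define K128       (128 * 1024)
#define CHUNK_SIZE (1024 * 1024) /* eight K128 slots per bucket */

/* Illustrative helpers (not from the patch): map a file offset to its bucket
 * and slot, mirroring position = bucket * CHUNK_SIZE + slot * K128 as used in
 * chunk_cb() and chunk_read().
 */
static uint64_t
off_to_bucket(uint64_t position)
{
	return position / CHUNK_SIZE;
}

static int
off_to_slot(uint64_t position)
{
	return (int)((position % CHUNK_SIZE) / K128);
}

/* Bytes returned to the kernel for one slot, given how much of the bucket
 * dfs_read() actually filled (ev->de_len).  Mirrors the clamp in chunk_cb():
 * a short read at EOF truncates the tail slots, possibly to zero.
 */
static size_t
slot_reply_len(size_t de_len, int slot)
{
	size_t slot_start = (size_t)slot * K128;

	if (de_len <= slot_start)
		return 0;
	if (de_len - slot_start < K128)
		return de_len - slot_start;
	return K128;
}

int
main(void)
{
	uint64_t position = 0x120000; /* second slot of the second bucket */

	printf("bucket %" PRIu64 " slot %d\n", off_to_bucket(position), off_to_slot(position));

	/* The bucket holding EOF was a short read of 0x30000 bytes (192 KiB). */
	printf("slot 0: %zu bytes\n", slot_reply_len(0x30000, 0)); /* 131072: full slot  */
	printf("slot 1: %zu bytes\n", slot_reply_len(0x30000, 1)); /*  65536: partial    */
	printf("slot 2: %zu bytes\n", slot_reply_len(0x30000, 2)); /*      0: beyond EOF */
	return 0;
}
```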
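And a simplified model (plain C, no DAOS list/lock/slab primitives; the struct and field names only loosely mirror read_chunk_data/read_chunk_req) of the per-bucket request flow the patch moves to: readers arriving while the bucket-sized dfs_read() is in flight are queued on the bucket, the completion callback answers every queued reader, and readers arriving after completion are answered straight from the cached buffer.

```c
#include <stdbool.h>
#include <stdio.h>

#define SLOTS_PER_BUCKET 8

/* Simplified stand-ins; the real code uses d_list_t, a spinlock and fuse replies. */
struct waiter {
	int  slot;
	bool replied;
};

struct bucket_state {
	bool           complete;                    /* dfs_read() has finished        */
	bool           slot_done[SLOTS_PER_BUCKET]; /* slots already replied to       */
	struct waiter *waiting[SLOTS_PER_BUCKET];   /* queued requests (cd->req_list) */
	int            nwaiting;
};

/* A read arrives for one 128 KiB slot of this bucket. */
static void
slot_read(struct bucket_state *b, struct waiter *w)
{
	if (b->complete) {
		/* Data already cached: reply immediately (chunk_read() fast path). */
		b->slot_done[w->slot] = true;
		w->replied            = true;
	} else {
		/* Read still in flight: queue behind it (cd->req_list). */
		b->waiting[b->nwaiting++] = w;
	}
}

/* The bucket-sized dfs_read() completes (chunk_cb()). */
static void
bucket_complete(struct bucket_state *b)
{
	b->complete = true;
	for (int i = 0; i < b->nwaiting; i++) {
		b->slot_done[b->waiting[i]->slot] = true;
		b->waiting[i]->replied            = true;
	}
	b->nwaiting = 0;
}

int
main(void)
{
	struct bucket_state b  = {0};
	struct waiter       w0 = {.slot = 0}, w1 = {.slot = 1}, w2 = {.slot = 2};

	slot_read(&b, &w0);  /* queued, read in flight      */
	slot_read(&b, &w1);  /* queued as well              */
	bucket_complete(&b); /* both waiters answered       */
	slot_read(&b, &w2);  /* answered from cached buffer */

	printf("%d %d %d\n", w0.replied, w1.replied, w2.replied); /* 1 1 1 */
	return 0;
}
```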