From 5fe96dd2d0304fdbc7b3e1cffb7b712d35a0b460 Mon Sep 17 00:00:00 2001
From: Fan Yong
Date: Tue, 25 Feb 2025 16:48:29 +0800
Subject: [PATCH] DAOS-17151 engine: RPC credits for IO chore task - b26

We need to control the count of in-flight RPCs (sent via IO chore tasks)
from the current target to others. Otherwise, too many in-flight RPCs may
occupy a lot of DRAM and cause out-of-memory on the server.

Introduce a new server-side environment variable, DAOS_IO_CHORE_CREDITS,
to allow the admin to configure such credits. The default value is 4096;
it may be tuned in the future based on more test results.
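For example, an administrator could raise the credits through the engine's
env_vars list in daos_server.yml (an illustrative sketch; the value 8192 and
the surrounding engine section layout depend on the actual deployment):

    engines:
      - env_vars:
          - DAOS_IO_CHORE_CREDITS=8192

Values below DSS_CHORE_CREDITS_MIN (1024) are rejected at startup and the
default is used instead.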
"relay" : "collective") : "regular", + DP_DTI(dti), sub_modification_cnt, pm_ver, epoch->oe_value, + DP_UOID(*leader_oid), dti_cos_cnt, tgt_cnt, flags); if (rc != 0) { D_FREE(dlh); @@ -2083,8 +2084,9 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc) sub->dss_comp = 1; sub->dss_result = rc; - D_DEBUG(DB_TRACE, "execute from idx %d (%d:%d), flags %x: rc %d\n", - idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc); + DL_CDEBUG(rc == -DER_NOMEM, DLOG_ERR, DB_TRACE, rc, + "execute from idx %d (%d:%d), flags %x", + idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags); } rc = ABT_future_set(dlh->dlh_future, dlh); @@ -2189,6 +2191,7 @@ int dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg) { + struct dtx_tls *tls = dtx_tls_get(); struct dtx_chore dtx_chore; int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; int rc = 0; @@ -2199,6 +2202,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dtx_chore.func_arg = func_arg; dtx_chore.dlh = dlh; + dtx_chore.chore.cho_func = dtx_leader_exec_ops_chore; + dtx_chore.chore.cho_priority = 0; + dlh->dlh_result = 0; dlh->dlh_allow_failure = allow_failure; dlh->dlh_normal_sub_done = 0; @@ -2207,8 +2213,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, dlh->dlh_need_agg = 0; dlh->dlh_agg_done = 0; - if (sub_cnt > DTX_RPC_STEP_LENGTH) { - dlh->dlh_forward_cnt = DTX_RPC_STEP_LENGTH; + if (sub_cnt > DTX_REG_RPC_STEP_LENGTH) { + dlh->dlh_forward_cnt = DTX_REG_RPC_STEP_LENGTH; } else { dlh->dlh_forward_cnt = sub_cnt; if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL) @@ -2218,7 +2224,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, if (dlh->dlh_normal_sub_cnt == 0) goto exec; -again: +again1: D_ASSERT(dlh->dlh_future == ABT_FUTURE_NULL); /* @@ -2232,12 +2238,57 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); +again2: + dtx_chore.chore.cho_credits = dlh->dlh_forward_cnt; + dtx_chore.chore.cho_hint = NULL; + rc = dss_chore_register(&dtx_chore.chore); if (rc != 0) { - DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, - dlh->dlh_forward_cnt); - ABT_future_free(&dlh->dlh_future); - goto out; + if (rc != -DER_AGAIN) { + DL_ERROR(rc, "chore create failed [%u, %u] (2)", + dlh->dlh_forward_idx, dlh->dlh_forward_cnt); + ABT_future_free(&dlh->dlh_future); + goto out; + } + + d_tm_inc_counter(tls->dt_chore_retry, 1); + + /* + * To avoid the whole task is split too many pieces. If there are very few + * credits, we may prefer to wait instead of shrink the credits quirement. + */ + if (dtx_chore.chore.cho_credits > dlh->dlh_normal_sub_cnt / 8) { + D_DEBUG(DB_TRACE, "Retry IO forward with credits from %d to %d\n", + dlh->dlh_forward_cnt, dtx_chore.chore.cho_credits); + ABT_future_free(&dlh->dlh_future); + dlh->dlh_forward_cnt = dtx_chore.chore.cho_credits; + goto again1; + } + + /* + * If more than half sub-requests have been processed, let's handle the left + * part ASAP to avoid the whole task timeout. Otherwise once timeout, it may + * cause more overhead for rollback. 
+		 */
+		if (dlh->dlh_forward_idx > sub_cnt / 2) {
+			dtx_chore.chore.cho_priority = 1;
+
+			if (dlh->dlh_forward_cnt > DTX_PRI_RPC_STEP_LENGTH) {
+				D_DEBUG(DB_TRACE, "Retry (prio) IO forward with credits %d => %d\n",
+					dlh->dlh_forward_cnt, DTX_PRI_RPC_STEP_LENGTH);
+				ABT_future_free(&dlh->dlh_future);
+				dlh->dlh_forward_cnt = DTX_PRI_RPC_STEP_LENGTH;
+				goto again1;
+			}
+
+			D_DEBUG(DB_TRACE, "Retry (prio) IO forward with credits %d\n",
+				dlh->dlh_forward_cnt);
+			goto again2;
+		}
+
+		D_DEBUG(DB_TRACE, "Not enough credits (%d vs %d) for IO forward, wait and retry\n",
+			dlh->dlh_forward_cnt, dtx_chore.chore.cho_credits);
+		ABT_thread_yield();
+		goto again2;
 	}
 
 exec:
@@ -2246,8 +2297,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 	local_rc = func(dlh, func_arg, -1, NULL);
 
 	/* Even the local request failure, we still need to wait for remote sub request. */
-	if (dlh->dlh_normal_sub_cnt > 0)
+	if (dlh->dlh_normal_sub_cnt > 0) {
 		remote_rc = dtx_leader_wait(dlh);
+		dss_chore_deregister(&dtx_chore.chore);
+	}
 
 	if (local_rc != 0 && local_rc != allow_failure)
 		D_GOTO(out, rc = local_rc);
@@ -2258,7 +2311,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 	sub_cnt -= dlh->dlh_forward_cnt;
 	if (sub_cnt > 0) {
 		dlh->dlh_forward_idx += dlh->dlh_forward_cnt;
-		if (sub_cnt <= DTX_RPC_STEP_LENGTH) {
+		if (sub_cnt <= DTX_REG_RPC_STEP_LENGTH) {
 			dlh->dlh_forward_cnt = sub_cnt;
 			if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
 				dlh->dlh_need_agg = 1;
@@ -2270,7 +2323,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 			dlh->dlh_delay_sub_cnt, dlh->dlh_forward_idx,
 			dlh->dlh_forward_cnt, allow_failure);
 
-		goto again;
+		dtx_chore.chore.cho_priority = 0;
+		goto again1;
 	}
 
 	dlh->dlh_normal_sub_done = 1;
@@ -2311,7 +2365,18 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 	/* The ones without DELAY flag will be skipped when scan the targets array. */
 	dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
 
-	rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
+	/*
+	 * Since the non-delay sub-requests have already been processed, use high priority
+	 * to apply for chore credits, so that the remaining delayed sub-requests can be
+	 * handled quickly and the chance of the whole IO timing out is reduced.
+	 */
+	if (unlikely(dlh->dlh_delay_sub_cnt > DTX_PRI_RPC_STEP_LENGTH))
+		D_WARN("Too many delayed sub-requests %u\n", dlh->dlh_delay_sub_cnt);
+
+	dtx_chore.chore.cho_priority = 1;
+	dtx_chore.chore.cho_credits = dlh->dlh_delay_sub_cnt;
+	dtx_chore.chore.cho_hint = NULL;
+	rc = dss_chore_register(&dtx_chore.chore);
 	if (rc != 0) {
 		DL_ERROR(rc, "chore create failed (4)");
 		ABT_future_free(&dlh->dlh_future);
@@ -2319,6 +2384,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
 	}
 
 	remote_rc = dtx_leader_wait(dlh);
+	dss_chore_deregister(&dtx_chore.chore);
 	if (remote_rc != 0 && remote_rc != allow_failure)
 		rc = remote_rc;
 
diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h
index 3fbd06f6b92..4abc06471e9 100644
--- a/src/dtx/dtx_internal.h
+++ b/src/dtx/dtx_internal.h
@@ -1,5 +1,6 @@
 /**
  * (C) Copyright 2019-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -187,7 +188,14 @@ extern uint32_t dtx_batched_ult_max;
  * dispatch process may trigger too many in-flight or in-queued RPCs that will hold
  * too much resource as to server maybe out of memory.
 */
-#define DTX_RPC_STEP_LENGTH	DTX_THRESHOLD_COUNT
+#define DTX_REG_RPC_STEP_LENGTH	512
+
+/*
+ * High priority (DTX) RPCs may temporarily break through the IO chore credit
+ * restriction. To reduce the side-effect on other IO forward RPCs, use a smaller
+ * step for high priority RPCs.
+ */
+#define DTX_PRI_RPC_STEP_LENGTH	64
 
 extern struct crt_corpc_ops dtx_coll_commit_co_ops;
 extern struct crt_corpc_ops dtx_coll_abort_co_ops;
@@ -214,6 +222,7 @@ struct dtx_tls {
 	struct d_tm_node_t	*dt_committable;
 	struct d_tm_node_t	*dt_dtx_leader_total;
 	struct d_tm_node_t	*dt_async_cmt_lat;
+	struct d_tm_node_t	*dt_chore_retry;
 	uint64_t		 dt_agg_gen;
 	uint32_t		 dt_batched_ult_cnt;
 };
diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c
index 20ecb614cc1..a46b0902cee 100644
--- a/src/dtx/dtx_rpc.c
+++ b/src/dtx/dtx_rpc.c
@@ -1,5 +1,6 @@
 /**
  * (C) Copyright 2019-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -720,6 +721,9 @@ dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes,
 		length = dca->dca_count;
 	}
 
+	dca->dca_chore.cho_func = dtx_rpc_helper;
+	dca->dca_chore.cho_priority = 1;
+
 	dca->dca_drr = d_list_entry(dca->dca_head.next, struct dtx_req_rec, drr_link);
 
 	/*
@@ -728,8 +732,8 @@ dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes,
 	 * to reduce the whole network peak load and the pressure on related peers.
 	 */
 	while (length > 0) {
-		if (length > DTX_RPC_STEP_LENGTH && opc != DTX_CHECK)
-			dca->dca_steps = DTX_RPC_STEP_LENGTH;
+		if (length > DTX_PRI_RPC_STEP_LENGTH && opc != DTX_CHECK)
+			dca->dca_steps = DTX_PRI_RPC_STEP_LENGTH;
 		else
 			dca->dca_steps = length;
@@ -742,7 +746,9 @@ dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes,
 			goto out;
 		}
 
-		rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
+		dca->dca_chore.cho_credits = dca->dca_steps;
+		dca->dca_chore.cho_hint = NULL;
+		rc = dss_chore_register(&dca->dca_chore);
 		if (rc != 0) {
 			ABT_eventual_free(&dca->dca_chore_eventual);
 			goto out;
@@ -754,10 +760,11 @@ dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes,
 			rc = ABT_eventual_free(&dca->dca_chore_eventual);
 			D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc);
 		} else {
-			dss_chore_diy(&dca->dca_chore, dtx_rpc_helper);
+			dss_chore_diy(&dca->dca_chore);
 		}
 
 		rc = dtx_req_wait(&dca->dca_dra);
+		dss_chore_deregister(&dca->dca_chore);
 		if (rc == 0 || rc == -DER_NONEXIST)
 			goto next;
 
@@ -781,9 +788,9 @@ dtx_rpc(struct ds_cont_child *cont, d_list_t *dti_list, struct dtx_entry **dtes,
 			 */
 			break;
 		case DTX_REFRESH:
-			D_ASSERTF(length < DTX_RPC_STEP_LENGTH,
+			D_ASSERTF(length < DTX_PRI_RPC_STEP_LENGTH,
 				  "Too long list for DTX refresh: %u vs %u\n",
-				  length, DTX_RPC_STEP_LENGTH);
+				  length, DTX_PRI_RPC_STEP_LENGTH);
 			break;
 		default:
 			D_ASSERTF(0, "Invalid DTX opc %u\n", opc);
@@ -1580,6 +1587,9 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32
 	dcra->dcra_hints = dce->dce_hints;
 	dcra->dcra_hint_sz = dce->dce_hint_sz;
 
+	dcra->dcra_chore.cho_func = dtx_coll_rpc_helper;
+	dcra->dcra_chore.cho_priority = 1;
+
 	rc = ABT_future_create(1, NULL, &dcra->dcra_future);
 	if (rc != ABT_SUCCESS) {
 		D_ERROR("ABT_future_create failed for coll DTX ("DF_DTI") RPC %u: rc = %d\n",
@@ -1588,9 +1598,15 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32
 	}
 
 	if (dss_has_enough_helper()) {
+		/* The cho_credits may be 
over-estimated; that does not matter. */
+		dcra->dcra_chore.cho_credits = dcra->dcra_ranks->rl_nr < DTX_COLL_TREE_WIDTH ?
+					       dcra->dcra_ranks->rl_nr : DTX_COLL_TREE_WIDTH;
+		dcra->dcra_chore.cho_hint = NULL;
+		rc = dss_chore_register(&dcra->dcra_chore);
+		if (rc != 0)
+			ABT_future_free(&dcra->dcra_future);
 	} else {
-		dss_chore_diy(&dcra->dcra_chore, dtx_coll_rpc_helper);
+		dss_chore_diy(&dcra->dcra_chore);
 		rc = 0;
 	}
 
@@ -1608,6 +1624,7 @@ dtx_coll_rpc_post(struct dtx_coll_rpc_args *dcra, int ret)
 		 "Collective DTX wait req for opc %u, future %p done, rc %d, result %d\n",
 		 dcra->dcra_opc, dcra->dcra_future, rc, dcra->dcra_result);
 		ABT_future_free(&dcra->dcra_future);
+		dss_chore_deregister(&dcra->dcra_chore);
 	}
 
 	return ret != 0 ? ret : dcra->dcra_result;
diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c
index 8241fe9dfd8..734d772bfb2 100644
--- a/src/dtx/dtx_srv.c
+++ b/src/dtx/dtx_srv.c
@@ -1,5 +1,6 @@
 /**
  * (C) Copyright 2019-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -55,6 +56,13 @@ dtx_tls_init(int tags, int xs_id, int tgt_id)
 		D_WARN("Failed to create DTX async commit latency metric: " DF_RC"\n",
 		       DP_RC(rc));
 
+	rc = d_tm_add_metric(&tls->dt_chore_retry, D_TM_COUNTER,
+			     "DTX chore retry", NULL,
+			     "io/dtx/chore_retry/tgt_%u", tgt_id);
+	if (rc != DER_SUCCESS)
+		D_WARN("Failed to create DTX chore retry metric: " DF_RC"\n",
+		       DP_RC(rc));
+
 	return tls;
 }
diff --git a/src/dtx/tests/ult_mock.c b/src/dtx/tests/ult_mock.c
index 41fc639c034..19ae7bc4309 100644
--- a/src/dtx/tests/ult_mock.c
+++ b/src/dtx/tests/ult_mock.c
@@ -1,5 +1,6 @@
 /**
  * (C) Copyright 2023-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -74,14 +75,20 @@ struct dss_chore;
 typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance);
 
 void
-dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func)
+dss_chore_diy(struct dss_chore *chore)
 {
 	assert_true(false);
 }
 
 int
-dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
+dss_chore_register(struct dss_chore *chore)
 {
 	assert_true(false);
 	return -DER_NOMEM;
 }
+
+void
+dss_chore_deregister(struct dss_chore *chore)
+{
+	assert_true(false);
+}
diff --git a/src/engine/srv.c b/src/engine/srv.c
index 9461a18e9d9..4ed310166e9 100644
--- a/src/engine/srv.c
+++ b/src/engine/srv.c
@@ -1,5 +1,6 @@
 /*
  * (C) Copyright 2016-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -1118,6 +1119,15 @@ dss_xstreams_init(void)
 	d_getenv_uint("DAOS_SCHED_UNIT_RUNTIME_MAX", &sched_unit_runtime_max);
 	d_getenv_bool("DAOS_SCHED_WATCHDOG_ALL", &sched_watchdog_all);
 
+	dss_chore_credits = DSS_CHORE_CREDITS_DEF;
+	d_getenv_uint("DAOS_IO_CHORE_CREDITS", &dss_chore_credits);
+	if (dss_chore_credits < DSS_CHORE_CREDITS_MIN) {
+		D_WARN("Invalid DAOS_IO_CHORE_CREDITS value, minimum is %u, using default %u\n",
+		       DSS_CHORE_CREDITS_MIN, DSS_CHORE_CREDITS_DEF);
+		dss_chore_credits = DSS_CHORE_CREDITS_DEF;
+	}
+	D_INFO("Set DAOS IO chore credits to %u\n", dss_chore_credits);
+
 	/* start the execution streams */
 	D_DEBUG(DB_TRACE,
 		"%d cores total detected starting %d main xstreams\n",
diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h
index 222f07e4906..71e2740eac9 100644
--- a/src/engine/srv_internal.h
+++ b/src/engine/srv_internal.h
@@ -1,5 +1,6 @@
 /*
  * (C) Copyright 2016-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -72,6 +73,7 @@ struct mem_stats {
 /* See dss_chore. */
 struct dss_chore_queue {
 	d_list_t   chq_list;
+	int32_t    chq_credits;
 	bool       chq_stop;
 	ABT_mutex  chq_mutex;
 	ABT_cond   chq_cond;
@@ -166,6 +168,11 @@ extern unsigned int dss_tgt_offload_xs_nr;
 extern unsigned int dss_offload_per_numa_nr;
 /** Number of target per socket */
 extern unsigned int dss_tgt_per_numa_nr;
+/** The maximum number of credits for each IO chore queue, i.e. per helper XS. */
+extern uint32_t dss_chore_credits;
+
+#define DSS_CHORE_CREDITS_MIN	1024
+#define DSS_CHORE_CREDITS_DEF	4096
 
 /** Shadow dss_get_module_info */
 struct dss_module_info *get_module_info(void);
diff --git a/src/engine/ult.c b/src/engine/ult.c
index fbeb3f538fa..badc16aa13d 100644
--- a/src/engine/ult.c
+++ b/src/engine/ult.c
@@ -1,5 +1,6 @@
 /**
  * (C) Copyright 2016-2024 Intel Corporation.
+ * (C) Copyright 2025 Hewlett Packard Enterprise Development LP
  *
  * SPDX-License-Identifier: BSD-2-Clause-Patent
  */
@@ -13,6 +14,9 @@
 
 /* ============== Thread collective functions ============================ */
 
+/** The maximum number of credits for each IO chore queue, i.e. per helper XS. */
+uint32_t dss_chore_credits;
+
 struct aggregator_arg_type {
 	struct dss_stream_arg_type	at_args;
 	void				(*at_reduce)(void *a_args,
@@ -716,20 +720,20 @@ dss_chore_ult(void *arg)
 
 /**
- * Add \a chore for \a func to the chore queue of some other xstream.
+ * Add \a chore to the chore queue of some other xstream.
  *
  * \param[in]	chore	address of the embedded chore object
- * \param[in]	func	function to be executed via \a chore
  *
  * \retval	-DER_CANCEL	chore queue stopping
  */
 int
-dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
+dss_chore_register(struct dss_chore *chore)
 {
 	struct dss_module_info *info = dss_get_module_info();
 	int                     xs_id;
 	struct dss_xstream     *dx;
 	struct dss_chore_queue *queue;
 
+	D_ASSERT(chore->cho_credits > 0);
+
 	chore->cho_status = DSS_CHORE_NEW;
-	chore->cho_func = func;
 
 	/*
	 * The dss_chore_queue_ult approach may get insufficient scheduling on
@@ -755,15 +759,46 @@
 		ABT_mutex_unlock(queue->chq_mutex);
 		return -DER_CANCELED;
 	}
+
+	if (!chore->cho_priority && queue->chq_credits < chore->cho_credits) {
+		/*
+		 * Piggyback the currently available credits, so the caller can decide
+		 * whether to shrink the credit requirement and retry in more steps.
+ */ + chore->cho_credits = queue->chq_credits; + ABT_mutex_unlock(queue->chq_mutex); + return -DER_AGAIN; + } + + /* queue->chq_credits can be negative temporarily because of high priority requests. */ + queue->chq_credits -= chore->cho_credits; + chore->cho_hint = queue; d_list_add_tail(&chore->cho_link, &queue->chq_list); ABT_cond_broadcast(queue->chq_cond); ABT_mutex_unlock(queue->chq_mutex); - D_DEBUG(DB_TRACE, "%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, info->dmi_tgt_id, - xs_id, dx->dx_tgt_id); + D_DEBUG(DB_TRACE, "register chore %p on queue %p: tgt=%d -> xs=%d dx.tgt=%d, credits %u\n", + chore, queue, info->dmi_tgt_id, xs_id, dx->dx_tgt_id, chore->cho_credits); return 0; } +void +dss_chore_deregister(struct dss_chore *chore) +{ + struct dss_chore_queue *queue = chore->cho_hint; + + if (queue != NULL) { + D_ASSERT(chore->cho_credits > 0); + + ABT_mutex_lock(queue->chq_mutex); + queue->chq_credits += chore->cho_credits; + ABT_mutex_unlock(queue->chq_mutex); + + D_DEBUG(DB_TRACE, "deregister chore %p from queue %p: credits %u\n", + chore, queue, chore->cho_credits); + } +} + /** * Do \a chore for \a func synchronously in the current ULT. * @@ -771,11 +806,10 @@ dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) * \param[in] func function to be executed via \a chore */ void -dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func) +dss_chore_diy(struct dss_chore *chore) { D_INIT_LIST_HEAD(&chore->cho_link); chore->cho_status = DSS_CHORE_NEW; - chore->cho_func = func; dss_chore_diy_internal(chore); } @@ -856,6 +890,7 @@ dss_chore_queue_init(struct dss_xstream *dx) D_INIT_LIST_HEAD(&queue->chq_list); queue->chq_stop = false; + queue->chq_credits = dss_chore_credits; rc = ABT_mutex_create(&queue->chq_mutex); if (rc != ABT_SUCCESS) { diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index 189f0d10753..a29a97cd4e1 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -1,5 +1,6 @@ /** * (C) Copyright 2016-2024 Intel Corporation. + * (C) Copyright 2025 Hewlett Packard Enterprise Development LP * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -767,22 +768,23 @@ struct dss_chore; typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance); /** - * Chore (opaque) - * * A simple task (e.g., an I/O forwarding task) that yields by returning * DSS_CHORE_YIELD instead of calling ABT_thread_yield. This data structure * shall be embedded in the user's own task data structure, which typically - * also includes arguments and internal state variables for \a cho_func. All - * fields are private. See dtx_chore for an example. + * also includes arguments and internal state variables for \a cho_func. 
*/ struct dss_chore { d_list_t cho_link; enum dss_chore_status cho_status; dss_chore_func_t cho_func; + uint32_t cho_priority:1; + int32_t cho_credits; + void *cho_hint; }; -int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func); -void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func); +int dss_chore_register(struct dss_chore *chore); +void dss_chore_deregister(struct dss_chore *chore); +void dss_chore_diy(struct dss_chore *chore); bool engine_in_check(void); diff --git a/src/tests/ftest/util/telemetry_utils.py b/src/tests/ftest/util/telemetry_utils.py index a075bb0ecdd..39b5e2f5087 100644 --- a/src/tests/ftest/util/telemetry_utils.py +++ b/src/tests/ftest/util/telemetry_utils.py @@ -205,6 +205,8 @@ class TelemetryUtils(): *_gen_stats_metrics("engine_dmabuff_grab_retries")] ENGINE_IO_DTX_ASYNC_CMT_LAT_METRICS = \ _gen_stats_metrics("engine_io_dtx_async_cmt_lat") + ENGINE_IO_DTX_CHORE_RETRY_METRICS = \ + _gen_stats_metrics("engine_io_dtx_chore_retry") ENGINE_IO_DTX_COMMITTABLE_METRICS = \ _gen_stats_metrics("engine_io_dtx_committable") ENGINE_IO_DTX_COMMITTED_METRICS = \ @@ -312,6 +314,7 @@ class TelemetryUtils(): ENGINE_IO_OPS_UPDATE_ACTIVE_METRICS = \ _gen_stats_metrics("engine_io_ops_update_active") ENGINE_IO_METRICS = ENGINE_IO_DTX_ASYNC_CMT_LAT_METRICS +\ + ENGINE_IO_DTX_CHORE_RETRY_METRICS +\ ENGINE_IO_DTX_COMMITTABLE_METRICS +\ ENGINE_IO_DTX_COMMITTED_METRICS +\ ENGINE_IO_DTX_INVALID_METRICS +\