Skip to content

Commit

Permalink
DAOS-17151 engine: RPC credits for IO chore task - b26
Browse files Browse the repository at this point in the history
We need to control the count of in-flight RPCs (via IO chore tasks) from the
current target to others. Otherwise, too many in-flight RPCs may occupy a lot
of DRAM and cause out-of-memory on the server.

Introduce new server-side environment DAOS_IO_CHORE_CREDITS to allow admin to
configure such credits. The default value is 4096; it may be optimized in the
future based on more test results.

Allow-unstable-test: true

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
Nasf-Fan committed Feb 25, 2025
1 parent 9977cb9 commit 5fe96dd
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 44 deletions.
106 changes: 86 additions & 20 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ dtx_leader_begin(daos_handle_t coh, struct dtx_id *dti, struct dtx_epoch *epoch,

D_ALLOC(dlh, sizeof(*dlh) + sizeof(struct dtx_sub_status) * tgt_cnt);
if (dlh == NULL)
return -DER_NOMEM;
D_GOTO(out, rc = -DER_NOMEM);

dlh->dlh_future = ABT_FUTURE_NULL;
dlh->dlh_coll_entry = dce;
Expand Down Expand Up @@ -1216,11 +1216,12 @@ dtx_leader_begin(daos_handle_t coh, struct dtx_id *dti, struct dtx_epoch *epoch,
if (rc == 0 && sub_modification_cnt > 0)
rc = vos_dtx_attach(dth, false, (flags & DTX_PREPARED) ? true : false);

D_DEBUG(DB_IO, "Start (%s) DTX "DF_DTI" sub modification %d, ver %u, epoch "
DF_X64", leader "DF_UOID", dti_cos_cnt %d, tgt_cnt %d, flags %x: "DF_RC"\n",
dlh->dlh_coll ? (dlh->dlh_relay ? "relay" : "collective") : "regular",
DP_DTI(dti), sub_modification_cnt, dth->dth_ver, epoch->oe_value,
DP_UOID(*leader_oid), dti_cos_cnt, tgt_cnt, flags, DP_RC(rc));
out:
DL_CDEBUG(rc != 0, DLOG_ERR, DB_IO, rc, "Start (%s) DTX " DF_DTI " sub modification %d, "
"ver %u, eph " DF_X64 ", leader " DF_UOID ", cos_cnt %d, tgt_cnt %d, flags %x: ",
flags & DTX_TGT_COLL ? (flags & DTX_RELAY ? "relay" : "collective") : "regular",
DP_DTI(dti), sub_modification_cnt, pm_ver, epoch->oe_value,
DP_UOID(*leader_oid), dti_cos_cnt, tgt_cnt, flags);

if (rc != 0) {
D_FREE(dlh);
Expand Down Expand Up @@ -2083,8 +2084,9 @@ dtx_sub_comp_cb(struct dtx_leader_handle *dlh, int idx, int rc)
sub->dss_comp = 1;
sub->dss_result = rc;

D_DEBUG(DB_TRACE, "execute from idx %d (%d:%d), flags %x: rc %d\n",
idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags, rc);
DL_CDEBUG(rc == -DER_NOMEM, DLOG_ERR, DB_TRACE, rc,
"execute from idx %d (%d:%d), flags %x",
idx, tgt->st_rank, tgt->st_tgt_idx, tgt->st_flags);
}

rc = ABT_future_set(dlh->dlh_future, dlh);
Expand Down Expand Up @@ -2189,6 +2191,7 @@ int
dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dtx_agg_cb_t agg_cb, int allow_failure, void *func_arg)
{
struct dtx_tls *tls = dtx_tls_get();
struct dtx_chore dtx_chore;
int sub_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
int rc = 0;
Expand All @@ -2199,6 +2202,9 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dtx_chore.func_arg = func_arg;
dtx_chore.dlh = dlh;

dtx_chore.chore.cho_func = dtx_leader_exec_ops_chore;
dtx_chore.chore.cho_priority = 0;

dlh->dlh_result = 0;
dlh->dlh_allow_failure = allow_failure;
dlh->dlh_normal_sub_done = 0;
Expand All @@ -2207,8 +2213,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dlh->dlh_need_agg = 0;
dlh->dlh_agg_done = 0;

if (sub_cnt > DTX_RPC_STEP_LENGTH) {
dlh->dlh_forward_cnt = DTX_RPC_STEP_LENGTH;
if (sub_cnt > DTX_REG_RPC_STEP_LENGTH) {
dlh->dlh_forward_cnt = DTX_REG_RPC_STEP_LENGTH;
} else {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
Expand All @@ -2218,7 +2224,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
if (dlh->dlh_normal_sub_cnt == 0)
goto exec;

again:
again1:
D_ASSERT(dlh->dlh_future == ABT_FUTURE_NULL);

/*
Expand All @@ -2232,12 +2238,57 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
D_GOTO(out, rc = dss_abterr2der(rc));
}

rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
again2:
dtx_chore.chore.cho_credits = dlh->dlh_forward_cnt;
dtx_chore.chore.cho_hint = NULL;
rc = dss_chore_register(&dtx_chore.chore);
if (rc != 0) {
DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx,
dlh->dlh_forward_cnt);
ABT_future_free(&dlh->dlh_future);
goto out;
if (rc != -DER_AGAIN) {
DL_ERROR(rc, "chore create failed [%u, %u] (2)",
dlh->dlh_forward_idx, dlh->dlh_forward_cnt);
ABT_future_free(&dlh->dlh_future);
goto out;
}

d_tm_inc_counter(tls->dt_chore_retry, 1);

/*
	 * Avoid splitting the whole task into too many pieces: if there are very few
	 * credits, we may prefer to wait instead of shrinking the credits requirement.
*/
if (dtx_chore.chore.cho_credits > dlh->dlh_normal_sub_cnt / 8) {
D_DEBUG(DB_TRACE, "Retry IO forward with credits from %d to %d\n",
dlh->dlh_forward_cnt, dtx_chore.chore.cho_credits);
ABT_future_free(&dlh->dlh_future);
dlh->dlh_forward_cnt = dtx_chore.chore.cho_credits;
goto again1;
}

/*
* If more than half sub-requests have been processed, let's handle the left
* part ASAP to avoid the whole task timeout. Otherwise once timeout, it may
* cause more overhead for rollback.
*/
if (dlh->dlh_forward_idx > sub_cnt / 2) {
dtx_chore.chore.cho_priority = 1;

if (dlh->dlh_forward_cnt > DTX_PRI_RPC_STEP_LENGTH) {
D_DEBUG(DB_TRACE, "Retry (prio) IO forward with credits %d => %d\n",
dlh->dlh_forward_cnt, DTX_PRI_RPC_STEP_LENGTH);
ABT_future_free(&dlh->dlh_future);
dlh->dlh_forward_cnt = DTX_PRI_RPC_STEP_LENGTH;
goto again1;
}

D_DEBUG(DB_TRACE, "Retry (prio) IO forward with credits %d\n",
dlh->dlh_forward_cnt);
goto again2;
}

D_DEBUG(DB_TRACE, "Not enough credits (%d vs %d) for IO forward, wait and retry\n",
dlh->dlh_forward_cnt, dtx_chore.chore.cho_credits);
ABT_thread_yield();
goto again2;
}

exec:
Expand All @@ -2246,8 +2297,10 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
local_rc = func(dlh, func_arg, -1, NULL);

/* Even the local request failure, we still need to wait for remote sub request. */
if (dlh->dlh_normal_sub_cnt > 0)
if (dlh->dlh_normal_sub_cnt > 0) {
remote_rc = dtx_leader_wait(dlh);
dss_chore_deregister(&dtx_chore.chore);
}

if (local_rc != 0 && local_rc != allow_failure)
D_GOTO(out, rc = local_rc);
Expand All @@ -2258,7 +2311,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
sub_cnt -= dlh->dlh_forward_cnt;
if (sub_cnt > 0) {
dlh->dlh_forward_idx += dlh->dlh_forward_cnt;
if (sub_cnt <= DTX_RPC_STEP_LENGTH) {
if (sub_cnt <= DTX_REG_RPC_STEP_LENGTH) {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
dlh->dlh_need_agg = 1;
Expand All @@ -2270,7 +2323,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dlh->dlh_delay_sub_cnt, dlh->dlh_forward_idx,
dlh->dlh_forward_cnt, allow_failure);

goto again;
dtx_chore.chore.cho_priority = 0;
goto again1;
}

dlh->dlh_normal_sub_done = 1;
Expand Down Expand Up @@ -2311,14 +2365,26 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
/* The ones without DELAY flag will be skipped when scan the targets array. */
dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;

rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
/*
* Since non-delay sub-requests have already been processed, let's use high priority
* to apply chore credits, then the left delayed sub-requests can be handled quickly
* to reduce the possibility of the whole IO timeout.
*/
if (unlikely(dlh->dlh_delay_sub_cnt < DTX_PRI_RPC_STEP_LENGTH))
D_WARN("Too many delayed sub-requests %u\n", dlh->dlh_delay_sub_cnt);

dtx_chore.chore.cho_priority = 1;
dtx_chore.chore.cho_credits = dlh->dlh_delay_sub_cnt;
dtx_chore.chore.cho_hint = NULL;
rc = dss_chore_register(&dtx_chore.chore);
if (rc != 0) {
DL_ERROR(rc, "chore create failed (4)");
ABT_future_free(&dlh->dlh_future);
goto out;
}

remote_rc = dtx_leader_wait(dlh);
dss_chore_deregister(&dtx_chore.chore);
if (remote_rc != 0 && remote_rc != allow_failure)
rc = remote_rc;

Expand Down
11 changes: 10 additions & 1 deletion src/dtx/dtx_internal.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -187,7 +188,14 @@ extern uint32_t dtx_batched_ult_max;
 * dispatch process may trigger too many in-flight or in-queued RPCs that will hold
 * too much resource, such that the server may run out of memory.
*/
#define DTX_RPC_STEP_LENGTH DTX_THRESHOLD_COUNT
#define DTX_REG_RPC_STEP_LENGTH 512

/*
* High priority (DTX) RPC may break through IO chore credit restriction temporarily.
 * To reduce the side effect on the other IO forward RPCs, use smaller step for high
* priority RPC.
*/
#define DTX_PRI_RPC_STEP_LENGTH 64

extern struct crt_corpc_ops dtx_coll_commit_co_ops;
extern struct crt_corpc_ops dtx_coll_abort_co_ops;
Expand All @@ -214,6 +222,7 @@ struct dtx_tls {
struct d_tm_node_t *dt_committable;
struct d_tm_node_t *dt_dtx_leader_total;
struct d_tm_node_t *dt_async_cmt_lat;
struct d_tm_node_t *dt_chore_retry;
uint64_t dt_agg_gen;
uint32_t dt_batched_ult_cnt;
};
Expand Down
33 changes: 25 additions & 8 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -720,6 +721,9 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
length = dca->dca_count;
}


dca->dca_chore.cho_func = dtx_rpc_helper;
dca->dca_chore.cho_priority = 1;
dca->dca_drr = d_list_entry(dca->dca_head.next, struct dtx_req_rec, drr_link);

/*
Expand All @@ -728,8 +732,8 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
* to reduce the whole network peak load and the pressure on related peers.
*/
while (length > 0) {
if (length > DTX_RPC_STEP_LENGTH && opc != DTX_CHECK)
dca->dca_steps = DTX_RPC_STEP_LENGTH;
if (length > DTX_PRI_RPC_STEP_LENGTH && opc != DTX_CHECK)
dca->dca_steps = DTX_PRI_RPC_STEP_LENGTH;
else
dca->dca_steps = length;

Expand All @@ -742,7 +746,9 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
goto out;
}

rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
dca->dca_chore.cho_credits = dca->dca_steps;
dca->dca_chore.cho_hint = NULL;
rc = dss_chore_register(&dca->dca_chore);
if (rc != 0) {
ABT_eventual_free(&dca->dca_chore_eventual);
goto out;
Expand All @@ -754,10 +760,11 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
rc = ABT_eventual_free(&dca->dca_chore_eventual);
D_ASSERTF(rc == ABT_SUCCESS, "ABT_eventual_free: %d\n", rc);
} else {
dss_chore_diy(&dca->dca_chore, dtx_rpc_helper);
dss_chore_diy(&dca->dca_chore);
}

rc = dtx_req_wait(&dca->dca_dra);
dss_chore_deregister(&dca->dca_chore);
if (rc == 0 || rc == -DER_NONEXIST)
goto next;

Expand All @@ -781,9 +788,9 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
*/
break;
case DTX_REFRESH:
D_ASSERTF(length < DTX_RPC_STEP_LENGTH,
D_ASSERTF(length < DTX_PRI_RPC_STEP_LENGTH,
"Too long list for DTX refresh: %u vs %u\n",
length, DTX_RPC_STEP_LENGTH);
length, DTX_PRI_RPC_STEP_LENGTH);
break;
default:
D_ASSERTF(0, "Invalid DTX opc %u\n", opc);
Expand Down Expand Up @@ -1580,6 +1587,9 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32
dcra->dcra_hints = dce->dce_hints;
dcra->dcra_hint_sz = dce->dce_hint_sz;

dcra->dcra_chore.cho_func = dtx_coll_rpc_helper;
dcra->dcra_chore.cho_priority = 1;

rc = ABT_future_create(1, NULL, &dcra->dcra_future);
if (rc != ABT_SUCCESS) {
D_ERROR("ABT_future_create failed for coll DTX ("DF_DTI") RPC %u: rc = %d\n",
Expand All @@ -1588,9 +1598,15 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32
}

if (dss_has_enough_helper()) {
rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper);
/* The cho_credits maybe over-estimated, no matter. */
dcra->dcra_chore.cho_credits = dcra->dcra_ranks->rl_nr < DTX_COLL_TREE_WIDTH ?
dcra->dcra_ranks->rl_nr : DTX_COLL_TREE_WIDTH;
dcra->dcra_chore.cho_hint = NULL;
rc = dss_chore_register(&dcra->dcra_chore);
if (rc != 0)
ABT_future_free(&dcra->dcra_future);
} else {
dss_chore_diy(&dcra->dcra_chore, dtx_coll_rpc_helper);
dss_chore_diy(&dcra->dcra_chore);
rc = 0;
}

Expand All @@ -1608,6 +1624,7 @@ dtx_coll_rpc_post(struct dtx_coll_rpc_args *dcra, int ret)
"Collective DTX wait req for opc %u, future %p done, rc %d, result %d\n",
dcra->dcra_opc, dcra->dcra_future, rc, dcra->dcra_result);
ABT_future_free(&dcra->dcra_future);
dss_chore_deregister(&dcra->dcra_chore);
}

return ret != 0 ? ret : dcra->dcra_result;
Expand Down
8 changes: 8 additions & 0 deletions src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -55,6 +56,13 @@ dtx_tls_init(int tags, int xs_id, int tgt_id)
D_WARN("Failed to create DTX async commit latency metric: " DF_RC"\n",
DP_RC(rc));

rc = d_tm_add_metric(&tls->dt_chore_retry, D_TM_COUNTER,
"DTX chore retry", NULL,
"io/dtx/chore_retry/tgt_%u", tgt_id);
if (rc != DER_SUCCESS)
D_WARN("Failed to create DTX chore retry metric: " DF_RC"\n",
DP_RC(rc));

return tls;
}

Expand Down
11 changes: 9 additions & 2 deletions src/dtx/tests/ult_mock.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2023-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -74,14 +75,20 @@ struct dss_chore;
typedef enum dss_chore_status (*dss_chore_func_t)(struct dss_chore *chore, bool is_reentrance);

void
dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func)
dss_chore_diy(struct dss_chore *chore)
{
assert_true(false);
}

int
dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
dss_chore_register(struct dss_chore *chore)
{
assert_true(false);
return -DER_NOMEM;
}

void
dss_chore_deregister(struct dss_chore *chore)
{
assert_true(false);
}
Loading

0 comments on commit 5fe96dd

Please sign in to comment.