From f68d15112f2f7b91605988a03a16c12c52ca63b9 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Wed, 16 Oct 2024 08:12:05 +0800 Subject: [PATCH] DAOS-16469 engine: bind helper - b26 When a main IO XS wants to add some task to a helper XS (whether via the chore queue or by creating a new ULT), it needs to take a lock on the related helper XS. Under the dss_helper_pool mode, if there are a lot of ULTs from different main IO XS doing that concurrently, the overhead caused by lock contention may be high. To reduce such overhead, we will divide main IO XS into small groups, and bind each group to a helper XS: the ones in the same group will share one helper, different groups use different helpers. Then the above lock contention will be restricted inside the group. Depending on the system configuration, such a solution may be unnecessary, for example, when there is only a single helper per engine, or very limited targets (less than 4 or about) per engine. So this feature will be configurable via the environment variable DAOS_BIND_HELPER that is unset by default. NOTE: 1. Binding helper is only available when the count of helper XS is non-zero and smaller than the count of main IO XS. 2. If multiple sockets mode is enabled (dss_numa_nr > 1), then do not bind helper for simplification. 3. To be load balanced, if the count of main IO XS is not an integral multiple of the helper XS count, then binding helper will be disabled automatically. 
Allow-unstable-test: true Signed-off-by: Fan Yong --- src/dtx/dtx_common.c | 4 +- src/dtx/dtx_rpc.c | 4 +- src/dtx/tests/ult_mock.c | 2 +- src/engine/srv.c | 37 ++++++++++ src/engine/srv_internal.h | 6 +- src/engine/ult.c | 113 ++++++++++++++++++++--------- src/include/daos_srv/daos_engine.h | 2 +- 7 files changed, 128 insertions(+), 40 deletions(-) diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ff4f2dfe4ef0..b26163e01a47 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -2227,7 +2227,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true); if (rc != 0) { DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, dlh->dlh_forward_cnt); @@ -2306,7 +2306,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, /* The ones without DELAY flag will be skipped when scan the targets array. 
*/ dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; - rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true); if (rc != 0) { DL_ERROR(rc, "chore create failed (4)"); ABT_future_free(&dlh->dlh_future); diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index e654047a6218..daaea8f19b97 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -736,7 +736,7 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, goto out; } - rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); + rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper, false); if (rc != 0) goto out; @@ -1557,7 +1557,7 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32 } if (dss_has_enough_helper()) { - rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper); + rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper, false); } else { dss_chore_diy(&dcra->dcra_chore, dtx_coll_rpc_helper); rc = 0; diff --git a/src/dtx/tests/ult_mock.c b/src/dtx/tests/ult_mock.c index 41fc639c0346..c4d9d766cdac 100644 --- a/src/dtx/tests/ult_mock.c +++ b/src/dtx/tests/ult_mock.c @@ -80,7 +80,7 @@ dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func) } int -dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) +dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw) { assert_true(false); return -DER_NOMEM; diff --git a/src/engine/srv.c b/src/engine/srv.c index 9461a18e9d99..99bb2e8736e4 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -99,6 +99,30 @@ bool dss_helper_pool; /** Bypass for the nvme health check */ bool dss_nvme_bypass_health_check; +/** + * When a main IO XS wants to add some task to helper XS (in spite of via chore + * queue or creating new ULT), it needs to take lock on related helper XS. 
Under + * the dss_helper_pool mode, if there are a lot of ULTs from different main IO XS + * doing that concurrently, the overhead caused by lock contention may be high. To + * reduce such overhead, we will divide main IO XS into small groups, and bind each + * group to a helper XS: the ones in the same group will share one helper, different + * groups use different helpers. Then above lock contention will be restricted inside + * the group. Depends on the system configuration, such solution may be unnecessary, + * for example, there is only single helper per engine, or very limited targets (less + * than 4 or about) per engine. So this feature will be configurable via environment + * variable DAOS_BIND_HELPER that is unset by default. + * + * NOTE: + * + * 1. Binding helper is only available when the count of helper XS is non-zero and + * smaller than the count of main IO XS. + * 2. If multiple sockets mode is enabled (dss_numa_nr > 1), then do not bind helper + * for simplification. + * 3. To be load balanced, if the count of main IO XS is not the integral multiple + * of the helper XS, then binding helper will be disabled automatically. + */ +bool dss_bind_helper; + static daos_epoch_t dss_start_epoch; unsigned int @@ -814,6 +838,10 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) xs_id); } + if (dx->dx_main_xs && dss_bind_helper) + dx->dx_helper_id = dss_sys_xs_nr + dss_tgt_nr + + dx->dx_tgt_id % dss_tgt_offload_xs_nr; + /** create ABT scheduler in charge of this xstream */ rc = dss_sched_init(dx); if (rc != 0) { @@ -1082,6 +1110,15 @@ dss_xstreams_init(void) D_ASSERT(dss_tgt_nr >= 1); + if (!dss_helper_pool || dss_tgt_nr <= dss_tgt_offload_xs_nr || + dss_tgt_nr % dss_tgt_offload_xs_nr != 0 || dss_numa_nr > 1) + dss_bind_helper = false; + else + d_getenv_bool("DAOS_BIND_HELPER", &dss_bind_helper); + D_INFO("Binding helper is %s: tgt_nr %d, helper_nr %d, numa_nr %d\n", + dss_bind_helper ? 
"enabled" : "disabled", + dss_tgt_nr, dss_tgt_offload_xs_nr, dss_numa_nr); + d_getenv_bool("DAOS_SCHED_PRIO_DISABLED", &sched_prio_disabled); if (sched_prio_disabled) D_INFO("ULT prioritizing is disabled.\n"); diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 222f07e49060..d6f72031755e 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -101,6 +101,7 @@ struct dss_xstream { int dx_ctx_id; /* Cart progress timeout in micro-seconds */ unsigned int dx_timeout; + int dx_helper_id; bool dx_main_xs; /* true for main XS */ bool dx_comm; /* true with cart context */ bool dx_iofw; /* true for DSS_XS_IOFW XS */ @@ -112,7 +113,10 @@ struct dss_xstream { #endif bool dx_progress_started; /* Network poll started */ int dx_tag; /** tag for xstream */ - struct dss_chore_queue dx_chore_queue; + /* Chore queue for IO forwarding. */ + struct dss_chore_queue dx_chore_iofw_queue; + /* Chore queue for other tasks, such as DTX. */ + struct dss_chore_queue dx_chore_misc_queue; }; /** Engine module's metrics */ diff --git a/src/engine/ult.c b/src/engine/ult.c index 94a2a0f93908..1c6f6d741c1f 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -405,8 +405,13 @@ sched_ult2xs(int xs_type, int tgt_id) { uint32_t xs_id; - if (xs_type == DSS_XS_VOS || xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW) - D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr); + if (xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW) { + struct dss_xstream *dx = dss_current_xstream(); + + if (dx->dx_helper_id > 0) + return dx->dx_helper_id; + } + switch (xs_type) { case DSS_XS_SELF: return DSS_XS_SELF; @@ -476,6 +481,7 @@ sched_ult2xs(int xs_type, int tgt_id) xs_id = (DSS_MAIN_XS_ID(tgt_id) + 1) % dss_tgt_nr; break; case DSS_XS_VOS: + D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr); xs_id = DSS_MAIN_XS_ID(tgt_id); break; default: @@ -717,11 +723,12 @@ dss_chore_ult(void *arg) * * \param[in] chore address of the embedded chore object * \param[in] func function to be executed 
via \a chore + * \param[in] iofw it is for IO forward or not * * \retval -DER_CANCEL chore queue stopping */ int -dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) +dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw) { struct dss_module_info *info = dss_get_module_info(); int xs_id; @@ -747,7 +754,7 @@ dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) D_ASSERT(xs_id != -DER_INVAL); dx = dss_get_xstream(xs_id); D_ASSERT(dx != NULL); - queue = &dx->dx_chore_queue; + queue = iofw ? &dx->dx_chore_iofw_queue : &dx->dx_chore_misc_queue; D_ASSERT(queue != NULL); ABT_mutex_lock(queue->chq_mutex); @@ -848,11 +855,59 @@ dss_chore_queue_ult(void *arg) D_DEBUG(DB_TRACE, "end\n"); } +static int +chore_queue_start_one(struct dss_xstream *dx, struct dss_chore_queue *queue) +{ + int rc; + + rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], + dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, + &queue->chq_ult); + if (rc != 0) { + D_ERROR("failed to create chore queue ULT: %d\n", rc); + rc = dss_abterr2der(rc); + } + + return rc; +} + +static void +chore_queue_stop_one(struct dss_chore_queue *queue) +{ + ABT_mutex_lock(queue->chq_mutex); + queue->chq_stop = true; + ABT_cond_broadcast(queue->chq_cond); + ABT_mutex_unlock(queue->chq_mutex); + ABT_thread_free(&queue->chq_ult); +} + int -dss_chore_queue_init(struct dss_xstream *dx) +dss_chore_queue_start(struct dss_xstream *dx) +{ + int rc; + + rc = chore_queue_start_one(dx, &dx->dx_chore_iofw_queue); + if (rc != 0) + return rc; + + rc = chore_queue_start_one(dx, &dx->dx_chore_misc_queue); + if (rc != 0) + chore_queue_stop_one(&dx->dx_chore_iofw_queue); + + return rc; +} + +void +dss_chore_queue_stop(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - int rc; + chore_queue_stop_one(&dx->dx_chore_iofw_queue); + chore_queue_stop_one(&dx->dx_chore_misc_queue); +} + +static int +chore_queue_init_one(struct 
dss_chore_queue *queue) +{ + int rc; D_INIT_LIST_HEAD(&queue->chq_list); queue->chq_stop = false; @@ -873,40 +928,32 @@ dss_chore_queue_init(struct dss_xstream *dx) return 0; } -int -dss_chore_queue_start(struct dss_xstream *dx) +static void +chore_queue_fini_one(struct dss_chore_queue *queue) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - int rc; - - rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], - dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, - &queue->chq_ult); - if (rc != 0) { - D_ERROR("failed to create chore queue ULT: %d\n", rc); - return dss_abterr2der(rc); - } - - return 0; + ABT_cond_free(&queue->chq_cond); + ABT_mutex_free(&queue->chq_mutex); } -void -dss_chore_queue_stop(struct dss_xstream *dx) +int +dss_chore_queue_init(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; + int rc; - ABT_mutex_lock(queue->chq_mutex); - queue->chq_stop = true; - ABT_cond_broadcast(queue->chq_cond); - ABT_mutex_unlock(queue->chq_mutex); - ABT_thread_free(&queue->chq_ult); + rc = chore_queue_init_one(&dx->dx_chore_iofw_queue); + if (rc != 0) + return rc; + + rc = chore_queue_init_one(&dx->dx_chore_misc_queue); + if (rc != 0) + chore_queue_fini_one(&dx->dx_chore_iofw_queue); + + return rc; } void dss_chore_queue_fini(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - - ABT_cond_free(&queue->chq_cond); - ABT_mutex_free(&queue->chq_mutex); + chore_queue_fini_one(&dx->dx_chore_iofw_queue); + chore_queue_fini_one(&dx->dx_chore_misc_queue); } diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index 189f0d107532..baef3b493863 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -781,7 +781,7 @@ struct dss_chore { dss_chore_func_t cho_func; }; -int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func); +int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool 
iofw); void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func); bool engine_in_check(void);