diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ff4f2dfe4ef0..b26163e01a47 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -2227,7 +2227,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, D_GOTO(out, rc = dss_abterr2der(rc)); } - rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true); if (rc != 0) { DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx, dlh->dlh_forward_cnt); @@ -2306,7 +2306,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func, /* The ones without DELAY flag will be skipped when scan the targets array. */ dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt; - rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore); + rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true); if (rc != 0) { DL_ERROR(rc, "chore create failed (4)"); ABT_future_free(&dlh->dlh_future); diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index e654047a6218..daaea8f19b97 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -736,7 +736,7 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, goto out; } - rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper); + rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper, false); if (rc != 0) goto out; @@ -1557,7 +1557,7 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32 } if (dss_has_enough_helper()) { - rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper); + rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper, false); } else { dss_chore_diy(&dcra->dcra_chore, dtx_coll_rpc_helper); rc = 0; diff --git a/src/dtx/tests/ult_mock.c b/src/dtx/tests/ult_mock.c index 41fc639c0346..c4d9d766cdac 100644 --- a/src/dtx/tests/ult_mock.c +++ b/src/dtx/tests/ult_mock.c @@ -80,7 +80,7 @@ dss_chore_diy(struct dss_chore 
*chore, dss_chore_func_t func) } int -dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) +dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw) { assert_true(false); return -DER_NOMEM; diff --git a/src/engine/srv.c b/src/engine/srv.c index 9461a18e9d99..99bb2e8736e4 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -99,6 +99,30 @@ bool dss_helper_pool; /** Bypass for the nvme health check */ bool dss_nvme_bypass_health_check; +/** + * When a main IO XS wants to add a task to a helper XS (whether via the chore + * queue or by creating a new ULT), it needs to take a lock on the related helper XS. + * Under the dss_helper_pool mode, if many ULTs from different main IO XS do that + * concurrently, the overhead caused by lock contention may be high. To + * reduce such overhead, we divide the main IO XS into small groups and bind each + * group to a helper XS: the ones in the same group share one helper, and different + * groups use different helpers. The lock contention above is then restricted to + * within the group. Depending on the system configuration, such a solution may be + * unnecessary, for example when there is only a single helper per engine, or very + * few targets (fewer than about 4) per engine. So this feature is configurable via the + * environment variable DAOS_BIND_HELPER, which is unset by default. + * + * NOTE: + * + * 1. Binding helper is only available when the count of helper XS is non-zero and + * smaller than the count of main IO XS. + * 2. If multiple sockets mode is enabled (dss_numa_nr > 1), then helpers are not + * bound, for simplicity. + * 3. For load balancing, if the count of main IO XS is not an integral multiple + * of the count of helper XS, then binding helper is disabled automatically. 
+ */ +bool dss_bind_helper; + static daos_epoch_t dss_start_epoch; unsigned int @@ -814,6 +838,10 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id) xs_id); } + if (dx->dx_main_xs && dss_bind_helper) + dx->dx_helper_id = dss_sys_xs_nr + dss_tgt_nr + + dx->dx_tgt_id % dss_tgt_offload_xs_nr; + /** create ABT scheduler in charge of this xstream */ rc = dss_sched_init(dx); if (rc != 0) { @@ -1082,6 +1110,15 @@ dss_xstreams_init(void) D_ASSERT(dss_tgt_nr >= 1); + if (!dss_helper_pool || dss_tgt_nr <= dss_tgt_offload_xs_nr || + dss_tgt_nr % dss_tgt_offload_xs_nr != 0 || dss_numa_nr > 1) + dss_bind_helper = false; + else + d_getenv_bool("DAOS_BIND_HELPER", &dss_bind_helper); + D_INFO("Binding helper is %s: tgt_nr %d, helper_nr %d, numa_nr %d\n", + dss_bind_helper ? "enabled" : "disabled", + dss_tgt_nr, dss_tgt_offload_xs_nr, dss_numa_nr); + d_getenv_bool("DAOS_SCHED_PRIO_DISABLED", &sched_prio_disabled); if (sched_prio_disabled) D_INFO("ULT prioritizing is disabled.\n"); diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 222f07e49060..d6f72031755e 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -101,6 +101,7 @@ struct dss_xstream { int dx_ctx_id; /* Cart progress timeout in micro-seconds */ unsigned int dx_timeout; + int dx_helper_id; bool dx_main_xs; /* true for main XS */ bool dx_comm; /* true with cart context */ bool dx_iofw; /* true for DSS_XS_IOFW XS */ @@ -112,7 +113,10 @@ struct dss_xstream { #endif bool dx_progress_started; /* Network poll started */ int dx_tag; /** tag for xstream */ - struct dss_chore_queue dx_chore_queue; + /* Chore queue for IO forwarding. */ + struct dss_chore_queue dx_chore_iofw_queue; + /* Chore queue for other tasks, such as DTX. 
*/ + struct dss_chore_queue dx_chore_misc_queue; }; /** Engine module's metrics */ diff --git a/src/engine/ult.c b/src/engine/ult.c index 94a2a0f93908..1c6f6d741c1f 100644 --- a/src/engine/ult.c +++ b/src/engine/ult.c @@ -405,8 +405,13 @@ sched_ult2xs(int xs_type, int tgt_id) { uint32_t xs_id; - if (xs_type == DSS_XS_VOS || xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW) - D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr); + if (xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW) { + struct dss_xstream *dx = dss_current_xstream(); + + if (dx->dx_helper_id > 0) + return dx->dx_helper_id; + } + switch (xs_type) { case DSS_XS_SELF: return DSS_XS_SELF; @@ -476,6 +481,7 @@ sched_ult2xs(int xs_type, int tgt_id) xs_id = (DSS_MAIN_XS_ID(tgt_id) + 1) % dss_tgt_nr; break; case DSS_XS_VOS: + D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr); xs_id = DSS_MAIN_XS_ID(tgt_id); break; default: @@ -717,11 +723,12 @@ dss_chore_ult(void *arg) * * \param[in] chore address of the embedded chore object * \param[in] func function to be executed via \a chore + * \param[in] iofw it is for IO forward or not * * \retval -DER_CANCEL chore queue stopping */ int -dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) +dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw) { struct dss_module_info *info = dss_get_module_info(); int xs_id; @@ -747,7 +754,7 @@ dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func) D_ASSERT(xs_id != -DER_INVAL); dx = dss_get_xstream(xs_id); D_ASSERT(dx != NULL); - queue = &dx->dx_chore_queue; + queue = iofw ? 
&dx->dx_chore_iofw_queue : &dx->dx_chore_misc_queue; D_ASSERT(queue != NULL); ABT_mutex_lock(queue->chq_mutex); @@ -848,11 +855,59 @@ dss_chore_queue_ult(void *arg) D_DEBUG(DB_TRACE, "end\n"); } +static int +chore_queue_start_one(struct dss_xstream *dx, struct dss_chore_queue *queue) +{ + int rc; + + rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], + dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, + &queue->chq_ult); + if (rc != 0) { + D_ERROR("failed to create chore queue ULT: %d\n", rc); + rc = dss_abterr2der(rc); + } + + return rc; +} + +static void +chore_queue_stop_one(struct dss_chore_queue *queue) +{ + ABT_mutex_lock(queue->chq_mutex); + queue->chq_stop = true; + ABT_cond_broadcast(queue->chq_cond); + ABT_mutex_unlock(queue->chq_mutex); + ABT_thread_free(&queue->chq_ult); +} + int -dss_chore_queue_init(struct dss_xstream *dx) +dss_chore_queue_start(struct dss_xstream *dx) +{ + int rc; + + rc = chore_queue_start_one(dx, &dx->dx_chore_iofw_queue); + if (rc != 0) + return rc; + + rc = chore_queue_start_one(dx, &dx->dx_chore_misc_queue); + if (rc != 0) + chore_queue_stop_one(&dx->dx_chore_iofw_queue); + + return rc; +} + +void +dss_chore_queue_stop(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - int rc; + chore_queue_stop_one(&dx->dx_chore_iofw_queue); + chore_queue_stop_one(&dx->dx_chore_misc_queue); +} + +static int +chore_queue_init_one(struct dss_chore_queue *queue) +{ + int rc; D_INIT_LIST_HEAD(&queue->chq_list); queue->chq_stop = false; @@ -873,40 +928,32 @@ dss_chore_queue_init(struct dss_xstream *dx) return 0; } -int -dss_chore_queue_start(struct dss_xstream *dx) +static void +chore_queue_fini_one(struct dss_chore_queue *queue) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - int rc; - - rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC], - dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL, - &queue->chq_ult); - if (rc != 0) { - 
D_ERROR("failed to create chore queue ULT: %d\n", rc); - return dss_abterr2der(rc); - } - - return 0; + ABT_cond_free(&queue->chq_cond); + ABT_mutex_free(&queue->chq_mutex); } -void -dss_chore_queue_stop(struct dss_xstream *dx) +int +dss_chore_queue_init(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; + int rc; - ABT_mutex_lock(queue->chq_mutex); - queue->chq_stop = true; - ABT_cond_broadcast(queue->chq_cond); - ABT_mutex_unlock(queue->chq_mutex); - ABT_thread_free(&queue->chq_ult); + rc = chore_queue_init_one(&dx->dx_chore_iofw_queue); + if (rc != 0) + return rc; + + rc = chore_queue_init_one(&dx->dx_chore_misc_queue); + if (rc != 0) + chore_queue_fini_one(&dx->dx_chore_iofw_queue); + + return rc; } void dss_chore_queue_fini(struct dss_xstream *dx) { - struct dss_chore_queue *queue = &dx->dx_chore_queue; - - ABT_cond_free(&queue->chq_cond); - ABT_mutex_free(&queue->chq_mutex); + chore_queue_fini_one(&dx->dx_chore_iofw_queue); + chore_queue_fini_one(&dx->dx_chore_misc_queue); } diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index 189f0d107532..baef3b493863 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -781,7 +781,7 @@ struct dss_chore { dss_chore_func_t cho_func; }; -int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func); +int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw); void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func); bool engine_in_check(void);