Skip to content

Commit

Permalink
DAOS-16469 engine: bind helper - b26
Browse files Browse the repository at this point in the history
When a main IO XS wants to hand a task to a helper XS (whether via the chore
queue or by creating a new ULT), it needs to take a lock on the related helper XS.
Under the dss_helper_pool mode, if there are a lot of ULTs from different main IO
XS doing that concurrently, the overhead caused by lock contention may be high. To
reduce such overhead, we divide the main IO XS into small groups and bind each
group to a helper XS: the ones in the same group share one helper, and different
groups use different helpers. The above lock contention is then restricted to
within the group. Depending on the system configuration, such a solution may be
unnecessary — for example, when there is only a single helper per engine, or very
few targets (about 4 or fewer) per engine. So this feature is configurable via the
environment variable DAOS_BIND_HELPER, which is unset by default.

NOTE:

1. Binding helper is only available when the count of helper XS is non-zero and
   smaller than the count of main IO XS.
2. If multiple sockets mode is enabled (dss_numa_nr > 1), then do not bind helper
   for simplification.
3. To be load balanced, if the count of main IO XS is not an integral multiple
   of the count of helper XS, then binding helper will be disabled automatically.

Allow-unstable-test: true

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
Nasf-Fan committed Oct 21, 2024
1 parent f8682fb commit 54e3af4
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -2227,7 +2227,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
D_GOTO(out, rc = dss_abterr2der(rc));
}

rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true);
if (rc != 0) {
DL_ERROR(rc, "chore create failed [%u, %u] (2)", dlh->dlh_forward_idx,
dlh->dlh_forward_cnt);
Expand Down Expand Up @@ -2306,7 +2306,7 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
/* The ones without DELAY flag will be skipped when scan the targets array. */
dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;

rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore);
rc = dss_chore_delegate(&dtx_chore.chore, dtx_leader_exec_ops_chore, true);
if (rc != 0) {
DL_ERROR(rc, "chore create failed (4)");
ABT_future_free(&dlh->dlh_future);
Expand Down
4 changes: 2 additions & 2 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
goto out;
}

rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper);
rc = dss_chore_delegate(&dca->dca_chore, dtx_rpc_helper, false);
if (rc != 0)
goto out;

Expand Down Expand Up @@ -1557,7 +1557,7 @@ dtx_coll_rpc_prep(struct ds_cont_child *cont, struct dtx_coll_entry *dce, uint32
}

if (dss_has_enough_helper()) {
rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper);
rc = dss_chore_delegate(&dcra->dcra_chore, dtx_coll_rpc_helper, false);
} else {
dss_chore_diy(&dcra->dcra_chore, dtx_coll_rpc_helper);
rc = 0;
Expand Down
37 changes: 37 additions & 0 deletions src/engine/srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,30 @@ bool dss_helper_pool;
/** Bypass for the nvme health check */
bool dss_nvme_bypass_health_check;

/**
* When a main IO XS wants to add some task to helper XS (in spite of via chore
* queue or creating new ULT), it needs to take lock on related helper XS. Under
* the dss_helper_pool mode, if there are a lot of ULTs from different main IO XS
* doing that concurrently, the overhead caused by lock contention may be high. To
* reduce such overhead, we will divide main IO XS into small groups, and bind each
* group to a helper XS: the ones in the same group will share one helper, different
* groups use different helpers. Then above lock contention will be restricted inside
* the group. Depends on the system configuration, such solution may be unnecessary,
* for example, there is only single helper per engine, or very limited targets (less
* than 4 or about) per engine. So this feature will be configurable via environment
* variable DAOS_BIND_HELPER that is unset by default.
*
* NOTE:
*
* 1. Binding helper is only available when the count of helper XS is non-zero and
* smaller than the count of main IO XS.
* 2. If multiple sockets mode is enabled (dss_numa_nr > 1), then do not bind helper
* for simplification.
* 3. To be load balanced, if the count of main IO XS is not the integral multiple
* of the helper XS, then binding helper will be disabled automatically.
*/
bool dss_bind_helper;

static daos_epoch_t dss_start_epoch;

unsigned int
Expand Down Expand Up @@ -814,6 +838,10 @@ dss_start_one_xstream(hwloc_cpuset_t cpus, int tag, int xs_id)
xs_id);
}

if (dx->dx_main_xs && dss_bind_helper)
dx->dx_helper_id = dss_sys_xs_nr + dss_tgt_nr +
dx->dx_tgt_id % dss_tgt_offload_xs_nr;

/** create ABT scheduler in charge of this xstream */
rc = dss_sched_init(dx);
if (rc != 0) {
Expand Down Expand Up @@ -1082,6 +1110,15 @@ dss_xstreams_init(void)

D_ASSERT(dss_tgt_nr >= 1);

if (!dss_helper_pool || dss_tgt_nr <= dss_tgt_offload_xs_nr ||
dss_tgt_nr % dss_tgt_offload_xs_nr != 0 || dss_numa_nr > 1)
dss_bind_helper = false;
else
d_getenv_bool("DAOS_BIND_HELPER", &dss_bind_helper);
D_INFO("Binding helper is %s: tgt_nr %d, helper_nr %d, numa_nr %d\n",
dss_bind_helper ? "enabled" : "disabled",
dss_tgt_nr, dss_tgt_offload_xs_nr, dss_numa_nr);

d_getenv_bool("DAOS_SCHED_PRIO_DISABLED", &sched_prio_disabled);
if (sched_prio_disabled)
D_INFO("ULT prioritizing is disabled.\n");
Expand Down
6 changes: 5 additions & 1 deletion src/engine/srv_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ struct dss_xstream {
int dx_ctx_id;
/* Cart progress timeout in micro-seconds */
unsigned int dx_timeout;
int dx_helper_id;
bool dx_main_xs; /* true for main XS */
bool dx_comm; /* true with cart context */
bool dx_iofw; /* true for DSS_XS_IOFW XS */
Expand All @@ -112,7 +113,10 @@ struct dss_xstream {
#endif
bool dx_progress_started; /* Network poll started */
int dx_tag; /** tag for xstream */
struct dss_chore_queue dx_chore_queue;
/* Chore queue for IO forwarding. */
struct dss_chore_queue dx_chore_iofw_queue;
/* Chore queue for other tasks, such as DTX. */
struct dss_chore_queue dx_chore_misc_queue;
};

/** Engine module's metrics */
Expand Down
117 changes: 82 additions & 35 deletions src/engine/ult.c
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,13 @@ sched_ult2xs(int xs_type, int tgt_id)
{
uint32_t xs_id;

if (xs_type == DSS_XS_VOS || xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW)
D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr);
if (xs_type == DSS_XS_OFFLOAD || xs_type == DSS_XS_IOFW) {
struct dss_xstream *dx = dss_current_xstream();

if (dx->dx_helper_id > 0)
return dx->dx_helper_id;
}

switch (xs_type) {
case DSS_XS_SELF:
return DSS_XS_SELF;
Expand Down Expand Up @@ -476,6 +481,7 @@ sched_ult2xs(int xs_type, int tgt_id)
xs_id = (DSS_MAIN_XS_ID(tgt_id) + 1) % dss_tgt_nr;
break;
case DSS_XS_VOS:
D_ASSERT(tgt_id >= 0 && tgt_id < dss_tgt_nr);
xs_id = DSS_MAIN_XS_ID(tgt_id);
break;
default:
Expand Down Expand Up @@ -717,11 +723,12 @@ dss_chore_ult(void *arg)
*
* \param[in] chore address of the embedded chore object
* \param[in] func function to be executed via \a chore
* \param[in] iofw it is for IO forward or not
*
* \retval -DER_CANCEL chore queue stopping
*/
int
dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw)
{
struct dss_module_info *info = dss_get_module_info();
int xs_id;
Expand All @@ -747,17 +754,17 @@ dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func)
D_ASSERT(xs_id != -DER_INVAL);
dx = dss_get_xstream(xs_id);
D_ASSERT(dx != NULL);
queue = &dx->dx_chore_queue;
queue = iofw ? &dx->dx_chore_iofw_queue : &dx->dx_chore_misc_queue;
D_ASSERT(queue != NULL);

ABT_mutex_lock(queue->chq_mutex);
ABT_mutex_spinlock(queue->chq_mutex);
if (queue->chq_stop) {
ABT_mutex_unlock(queue->chq_mutex);
return -DER_CANCELED;
}
d_list_add_tail(&chore->cho_link, &queue->chq_list);
ABT_cond_broadcast(queue->chq_cond);
ABT_mutex_unlock(queue->chq_mutex);
ABT_cond_broadcast(queue->chq_cond);

D_DEBUG(DB_TRACE, "%p: tgt_id=%d -> xs_id=%d dx.tgt_id=%d\n", chore, info->dmi_tgt_id,
xs_id, dx->dx_tgt_id);
Expand Down Expand Up @@ -848,11 +855,59 @@ dss_chore_queue_ult(void *arg)
D_DEBUG(DB_TRACE, "end\n");
}

static int
chore_queue_start_one(struct dss_xstream *dx, struct dss_chore_queue *queue)
{
int rc;

rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC],
dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL,
&queue->chq_ult);
if (rc != 0) {
D_ERROR("failed to create chore queue ULT: %d\n", rc);
rc = dss_abterr2der(rc);
}

return rc;
}

static void
chore_queue_stop_one(struct dss_chore_queue *queue)
{
ABT_mutex_spinlock(queue->chq_mutex);
queue->chq_stop = true;
ABT_mutex_unlock(queue->chq_mutex);
ABT_cond_broadcast(queue->chq_cond);
ABT_thread_free(&queue->chq_ult);
}

int
dss_chore_queue_init(struct dss_xstream *dx)
dss_chore_queue_start(struct dss_xstream *dx)
{
int rc;

rc = chore_queue_start_one(dx, &dx->dx_chore_iofw_queue);
if (rc != 0)
return rc;

rc = chore_queue_start_one(dx, &dx->dx_chore_misc_queue);
if (rc != 0)
chore_queue_stop_one(&dx->dx_chore_iofw_queue);

return rc;
}

void
dss_chore_queue_stop(struct dss_xstream *dx)
{
struct dss_chore_queue *queue = &dx->dx_chore_queue;
int rc;
chore_queue_stop_one(&dx->dx_chore_iofw_queue);
chore_queue_stop_one(&dx->dx_chore_misc_queue);
}

static int
chore_queue_init_one(struct dss_chore_queue *queue)
{
int rc;

D_INIT_LIST_HEAD(&queue->chq_list);
queue->chq_stop = false;
Expand All @@ -873,40 +928,32 @@ dss_chore_queue_init(struct dss_xstream *dx)
return 0;
}

int
dss_chore_queue_start(struct dss_xstream *dx)
static void
chore_queue_fini_one(struct dss_chore_queue *queue)
{
struct dss_chore_queue *queue = &dx->dx_chore_queue;
int rc;

rc = daos_abt_thread_create(dx->dx_sp, dss_free_stack_cb, dx->dx_pools[DSS_POOL_GENERIC],
dss_chore_queue_ult, queue, ABT_THREAD_ATTR_NULL,
&queue->chq_ult);
if (rc != 0) {
D_ERROR("failed to create chore queue ULT: %d\n", rc);
return dss_abterr2der(rc);
}

return 0;
ABT_cond_free(&queue->chq_cond);
ABT_mutex_free(&queue->chq_mutex);
}

void
dss_chore_queue_stop(struct dss_xstream *dx)
int
dss_chore_queue_init(struct dss_xstream *dx)
{
struct dss_chore_queue *queue = &dx->dx_chore_queue;
int rc;

ABT_mutex_lock(queue->chq_mutex);
queue->chq_stop = true;
ABT_cond_broadcast(queue->chq_cond);
ABT_mutex_unlock(queue->chq_mutex);
ABT_thread_free(&queue->chq_ult);
rc = chore_queue_init_one(&dx->dx_chore_iofw_queue);
if (rc != 0)
return rc;

rc = chore_queue_init_one(&dx->dx_chore_misc_queue);
if (rc != 0)
chore_queue_fini_one(&dx->dx_chore_iofw_queue);

return rc;
}

void
dss_chore_queue_fini(struct dss_xstream *dx)
{
struct dss_chore_queue *queue = &dx->dx_chore_queue;

ABT_cond_free(&queue->chq_cond);
ABT_mutex_free(&queue->chq_mutex);
chore_queue_fini_one(&dx->dx_chore_iofw_queue);
chore_queue_fini_one(&dx->dx_chore_misc_queue);
}
2 changes: 1 addition & 1 deletion src/include/daos_srv/daos_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ struct dss_chore {
dss_chore_func_t cho_func;
};

int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func);
int dss_chore_delegate(struct dss_chore *chore, dss_chore_func_t func, bool iofw);
void dss_chore_diy(struct dss_chore *chore, dss_chore_func_t func);

bool engine_in_check(void);
Expand Down

0 comments on commit 54e3af4

Please sign in to comment.