Skip to content

Commit

Permalink
DAOS-14557 object: collectively query key
Browse files Browse the repository at this point in the history
Currently, get file size (query key) for large-scaled object is very slow.
Because DAOS does not has logic (metadata) center to store the file size.
The client needs to send query RPCs to all related redundancy groups, then
aggregate related query results. For EC object with parity rotation, it is
worse, the client has to send query RPCs to all shards in every redundancy
group. It will cause a lot of query RPCs. For large-scaled object (such as
the "GX" object class), current method is too heavy loaded for both client
and servers.

To resolve such bad situation, we introduce new mechanism: collective query.
The basic idea is that: before sending query RPCs to related engines, based
on the shards to be queried, the client will generate the bitmap for related
VOS targets on each involved engine. For each engine with non-empty bitmap,
the client only sends one OBJ_COLL_QUERY RPC to it, then the engine generates
collective tasks (based on the bitmap) to query related object shards on each
own local VOS targets. That can save many query RPCs if multiple VOS targets
reside on relative concentrated engines.

On the other hand, it is inefficient for single client to send out hundreds
or even thousands of query RPCs concurrently, that will cause a lot of DRAM
resource being occupied for relative long time. To speedup, once the RPCs
count exceeds some threshold, the client will ask some engine(s) to help to
forward the collective query RPC to other related engine(s), and reply the
aggregated query results to the client. From client perspective, forwarding
causes one additional RPC round-trip, but it is better than single client
handling hundreds or thousands of query RPCs by itself.

{cli,srv}_obj.c are too large to be read, split collective operation related
logic from them into {cli,srv}_coll.c for help reading.

Required-githooks: true

Signed-off-by: Fan Yong <[email protected]>
  • Loading branch information
Nasf-Fan committed Jan 21, 2024
1 parent be4402b commit 3dab142
Show file tree
Hide file tree
Showing 22 changed files with 2,612 additions and 1,207 deletions.
4 changes: 1 addition & 3 deletions src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2023 Intel Corporation.
* (C) Copyright 2023-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -161,7 +161,6 @@ dtx_coll_prep(uuid_t po_uuid, daos_unit_oid_t oid, struct dtx_id *xid, struct dt
/* Skip non-healthy one. */
if (target->ta_comp.co_status != PO_COMP_ST_UP &&
target->ta_comp.co_status != PO_COMP_ST_UPIN &&
target->ta_comp.co_status != PO_COMP_ST_NEW &&
target->ta_comp.co_status != PO_COMP_ST_DRAIN)
continue;

Expand Down Expand Up @@ -238,7 +237,6 @@ dtx_coll_prep(uuid_t po_uuid, daos_unit_oid_t oid, struct dtx_id *xid, struct dt
/* Skip non-healthy one. */
if (target->ta_comp.co_status != PO_COMP_ST_UP &&
target->ta_comp.co_status != PO_COMP_ST_UPIN &&
target->ta_comp.co_status != PO_COMP_ST_NEW &&
target->ta_comp.co_status != PO_COMP_ST_DRAIN)
continue;

Expand Down
54 changes: 33 additions & 21 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -1959,9 +1959,7 @@ dtx_comp_cb(void **arg)
uint32_t i;
uint32_t j;

if (dlh->dlh_agg_cb != NULL) {
dlh->dlh_result = dlh->dlh_agg_cb(dlh, dlh->dlh_allow_failure);
} else {
if (!dlh->dlh_need_agg) {
for (i = dlh->dlh_forward_idx, j = 0; j < dlh->dlh_forward_cnt; i++, j++) {
sub = &dlh->dlh_subs[i];

Expand Down Expand Up @@ -2098,16 +2096,15 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dlh->dlh_normal_sub_done = 0;
dlh->dlh_drop_cond = 0;
dlh->dlh_forward_idx = 0;
dlh->dlh_need_agg = 0;
dlh->dlh_agg_done = 0;

if (sub_cnt > DTX_EXEC_STEP_LENGTH) {
dlh->dlh_forward_cnt = DTX_EXEC_STEP_LENGTH;
dlh->dlh_agg_cb = NULL;
} else {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0))
dlh->dlh_agg_cb = agg_cb;
else
dlh->dlh_agg_cb = NULL;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
dlh->dlh_need_agg = 1;
}

if (dlh->dlh_normal_sub_cnt == 0)
Expand Down Expand Up @@ -2162,8 +2159,8 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
dlh->dlh_forward_idx += dlh->dlh_forward_cnt;
if (sub_cnt <= DTX_EXEC_STEP_LENGTH) {
dlh->dlh_forward_cnt = sub_cnt;
if (likely(dlh->dlh_delay_sub_cnt == 0))
dlh->dlh_agg_cb = agg_cb;
if (likely(dlh->dlh_delay_sub_cnt == 0) && agg_cb != NULL)
dlh->dlh_need_agg = 1;
}

D_DEBUG(DB_IO, "More dispatch sub-requests for "DF_DTI", normal %u, "
Expand All @@ -2176,18 +2173,25 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
}

dlh->dlh_normal_sub_done = 1;

if (likely(dlh->dlh_delay_sub_cnt == 0))
goto out;

dlh->dlh_drop_cond = 1;

if (agg_cb != 0 && allow_failure != 0) {
rc = agg_cb(dlh, allow_failure);
if (rc == allow_failure)
if (agg_cb != NULL) {
remote_rc = agg_cb(dlh, func_arg);
dlh->dlh_agg_done = 1;
if (remote_rc == allow_failure)
dlh->dlh_drop_cond = 0;
else if (remote_rc != 0)
D_GOTO(out, rc = remote_rc);
}

if (likely(dlh->dlh_delay_sub_cnt == 0))
goto out;

/* Need more aggregation for delayed sub-requests. */
dlh->dlh_agg_done = 0;
if (agg_cb != NULL)
dlh->dlh_need_agg = 1;

D_ASSERT(dlh->dlh_future == ABT_FUTURE_NULL);

/*
Expand All @@ -2200,7 +2204,6 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
D_GOTO(out, rc = dss_abterr2der(rc));
}

dlh->dlh_agg_cb = agg_cb;
dlh->dlh_forward_idx = 0;
/* The ones without DELAY flag will be skipped when scan the targets array. */
dlh->dlh_forward_cnt = dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt;
Expand All @@ -2224,6 +2227,15 @@ dtx_leader_exec_ops(struct dtx_leader_handle *dlh, dtx_sub_func_t func,
allow_failure, local_rc, remote_rc);

out:
/* The agg_cb may contain cleanup, let's do it even if hit failure at some former step. */
if (agg_cb != NULL && !dlh->dlh_agg_done) {
remote_rc = agg_cb(dlh, func_arg);
dlh->dlh_agg_done = 1;
if (remote_rc != 0 && remote_rc != allow_failure &&
(rc == 0 || rc == allow_failure))
rc = remote_rc;
}

if (rc == 0 && local_rc == allow_failure &&
(dlh->dlh_normal_sub_cnt + dlh->dlh_delay_sub_cnt == 0 || remote_rc == allow_failure))
rc = allow_failure;
Expand Down Expand Up @@ -2335,7 +2347,7 @@ dtx_leader_get(struct ds_pool *pool, struct dtx_memberships *mbs, daos_unit_oid_
D_GOTO(out, rc);

/* The target that (re-)joined the system after DTX cannot be the leader. */
if (rc == 1 && (*p_tgt)->ta_comp.co_ver <= version)
if (rc == 1 && (*p_tgt)->ta_comp.co_in_ver <= version)
D_GOTO(out, rc = 0);
}

Expand Down Expand Up @@ -2371,7 +2383,7 @@ dtx_leader_get(struct ds_pool *pool, struct dtx_memberships *mbs, daos_unit_oid_
D_ASSERT(rc == 1);

/* The target that (re-)joined the system after DTX cannot be the leader. */
if ((*p_tgt)->ta_comp.co_ver <= version)
if ((*p_tgt)->ta_comp.co_in_ver <= version)
D_GOTO(out, rc = 0);
}

Expand Down
9 changes: 7 additions & 2 deletions src/dtx/dtx_internal.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -180,7 +180,12 @@ extern uint32_t dtx_batched_ult_max;
*/
#define DTX_INLINE_MBS_SIZE 512

#define DTX_COLL_TREE_WIDTH 16
/*
* The branch ratio for the KNOMIAL tree when bcast collective DTX RPC (commit/abort/check)
* to related engines. Based on the experience, the value which is not less than 4 may give
* relative better performance, cannot be too large (such as more than 10).
*/
#define DTX_COLL_TREE_WIDTH 8

extern struct crt_corpc_ops dtx_coll_commit_co_ops;
extern struct crt_corpc_ops dtx_coll_abort_co_ops;
Expand Down
3 changes: 1 addition & 2 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -517,7 +517,6 @@ dtx_classify_one(struct ds_pool *pool, daos_handle_t tree, d_list_t *head, int *
/* Skip non-healthy one. */
if (target->ta_comp.co_status != PO_COMP_ST_UP &&
target->ta_comp.co_status != PO_COMP_ST_UPIN &&
target->ta_comp.co_status != PO_COMP_ST_NEW &&
target->ta_comp.co_status != PO_COMP_ST_DRAIN)
continue;

Expand Down
8 changes: 5 additions & 3 deletions src/include/daos_srv/dtx_srv.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -143,6 +143,7 @@ struct dtx_sub_status {
int dss_result;
uint32_t dss_version;
uint32_t dss_comp:1;
void *dss_data;
};

struct dtx_coll_entry {
Expand All @@ -157,7 +158,7 @@ struct dtx_coll_entry {
};

struct dtx_leader_handle;
typedef int (*dtx_agg_cb_t)(struct dtx_leader_handle *dlh, int allow_failure);
typedef int (*dtx_agg_cb_t)(struct dtx_leader_handle *dlh, void *arg);

/* Transaction handle on the leader node to manage the transaction */
struct dtx_leader_handle {
Expand All @@ -176,10 +177,11 @@ struct dtx_leader_handle {
/* The future to wait for sub requests to finish. */
ABT_future dlh_future;

dtx_agg_cb_t dlh_agg_cb;
int32_t dlh_allow_failure;
/* Normal sub requests have been processed. */
uint32_t dlh_normal_sub_done:1,
dlh_need_agg:1,
dlh_agg_done:1,
/* For collective DTX. */
dlh_coll:1,
/* Only forward RPC, but neither commit nor abort DTX. */
Expand Down
4 changes: 2 additions & 2 deletions src/object/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scons():
'obj_enum.c', 'obj_class_def.c', "obj_layout.c"])

# Object client library
dc_obj_tgts = denv.SharedObject(['cli_obj.c', 'cli_shard.c',
dc_obj_tgts = denv.SharedObject(['cli_obj.c', 'cli_shard.c', 'cli_coll.c',
'cli_mod.c', 'cli_ec.c', 'cli_csum.c',
'obj_verify.c'])
libdaos_tgts.extend(dc_obj_tgts + common_tgts)
Expand All @@ -32,7 +32,7 @@ def scons():

senv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD'])
srv = senv.d_library('obj',
common_tgts + ['srv_obj.c', 'srv_mod.c',
common_tgts + ['srv_obj.c', 'srv_coll.c', 'srv_mod.c',
'srv_obj_remote.c', 'srv_ec.c',
'srv_obj_migrate.c', 'srv_enum.c',
'srv_cli.c', 'srv_ec_aggregate.c',
Expand Down
Loading

0 comments on commit 3dab142

Please sign in to comment.