DAOS-15540 object: add bulk transfer timeout
Add a timeout mechanism for bulk transfers.

During a bulk transfer, wait on the eventual only until the RPC timeout is
reached; once the timeout expires, abort the in-flight bulk transfers.

Signed-off-by: Di Wang <[email protected]>
wangdi1 committed Jan 27, 2025
1 parent c5f248b commit ec0e9f0
Showing 6 changed files with 223 additions and 4 deletions.
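
In outline, the patch bounds the server-side wait for bulk completions by the RPC timeout and aborts whatever is still in flight once that deadline passes. The sketch below condenses the obj_bulk_transfer() changes in src/object/srv_obj.c further down; the wrapper function itself is hypothetical and only illustrates the intended control flow.

/* Illustrative only: wait for all bulk completions up to the RPC timeout,
 * then abort whatever is still in flight and let the completion callbacks
 * drain bulks_inflight before returning.  The caller still owns and frees
 * p_arg->eventual.
 */
static int
bulk_wait_or_abort(crt_rpc_t *rpc, struct obj_bulk_args *p_arg)
{
	uint32_t	 timeout = 0;
	int		*status;
	int		 ret;

	crt_req_get_timeout(rpc, &timeout);
	ret = dss_eventual_timeout_wait(p_arg->eventual, (void **)&status, timeout, 1);
	if (ret == -DER_TIMEDOUT) {
		obj_bulk_args_bulks_abort(rpc, p_arg);
		while (p_arg->bulks_inflight > 0)
			dss_sleep(1);
	}
	return ret ? ret : *status;
}
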
37 changes: 37 additions & 0 deletions src/engine/srv.c
@@ -214,6 +214,43 @@ dss_sleep(uint64_t msec)
return 0;
}

/**
* Wait for the eventual to become ready, up to a timeout.
*
* \param[in] eventual eventual to wait on.
* \param[in] status eventual status.
* \param[in] timeout timeout in seconds.
* \param[in] step sleep interval between checks while it is not ready.
*
* \return status if the eventual becomes ready (or fails) within the timeout.
* -DER_TIMEDOUT if it does not become ready within the timeout.
*/
int
dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout, uint32_t step)
{
ABT_bool is_ready;
uint64_t start = daos_gettime_coarse();
int rc;

while (1) {
rc = ABT_eventual_test(eventual, status, &is_ready);
if (rc != ABT_SUCCESS || is_ready == ABT_TRUE) {
if (rc != ABT_SUCCESS)
rc = dss_abterr2der(rc);
break;
}

if (daos_gettime_coarse() - start >= timeout) {
D_ERROR("Started from "DF_U64", timeout after %u\n", start, timeout);
rc = -DER_TIMEDOUT;
break;
}
dss_sleep(step);
}

return rc;
}

struct dss_rpc_cntr *
dss_rpc_cntr_get(enum dss_rpc_cntr_id id)
{
3 changes: 3 additions & 0 deletions src/include/daos/common.h
@@ -906,6 +906,9 @@ enum {
#define DAOS_POOL_EVICT_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa0)
#define DAOS_POOL_RFCHECK_FAIL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa1)

#define DAOS_OBJ_FAIL_BULK_TIMEOUT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa3)
#define DAOS_OBJ_FAIL_BULK_ABORT (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xa4)

#define DAOS_CHK_CONT_ORPHAN (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb0)
#define DAOS_CHK_CONT_BAD_LABEL (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb1)
#define DAOS_CHK_LEADER_BLOCK (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0xb2)
3 changes: 3 additions & 0 deletions src/include/daos_srv/daos_engine.h
@@ -455,6 +455,9 @@ int dss_ult_periodic(void (*func)(void *), void *arg, int xs_type, int tgt_id,

int dss_sleep(uint64_t ms);

int dss_eventual_timeout_wait(ABT_eventual eventual, void *status, uint32_t timeout,
uint32_t step);

/* Pack return codes with additional argument to reduce */
struct dss_stream_arg_type {
/** return value */
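
The helper declared above is not bulk-specific. Below is a minimal, hypothetical usage sketch, assuming some worker ULT sets the eventual; only dss_eventual_timeout_wait(), dss_abterr2der(), dss_sleep() semantics and the Argobots calls come from existing code, the rest is illustrative.

/* Hypothetical example: wait up to 30 seconds for a worker to set the
 * eventual, polling with a short step as obj_bulk_transfer() does.
 */
static int
wait_for_worker_example(void)
{
	ABT_eventual	 ev;
	int		*status;
	int		 rc;

	rc = ABT_eventual_create(sizeof(int), &ev);
	if (rc != ABT_SUCCESS)
		return dss_abterr2der(rc);

	/* ... launch a worker ULT that eventually calls
	 * ABT_eventual_set(ev, &result, sizeof(result)) ...
	 */

	rc = dss_eventual_timeout_wait(ev, (void **)&status, 30 /* sec */, 1);
	if (rc == 0)
		rc = *status;	/* worker's result */
	/* rc == -DER_TIMEDOUT: the worker did not finish in time */

	/* In real code, make sure the worker can no longer touch "ev" before
	 * freeing it (cf. the bulks_inflight drain in obj_bulk_transfer()).
	 */
	ABT_eventual_free(&ev);
	return rc;
}
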
4 changes: 4 additions & 0 deletions src/object/srv_internal.h
@@ -119,6 +119,10 @@ struct migrate_cont_hdl {
struct obj_bulk_args {
ABT_eventual eventual;
uint64_t bulk_size;
crt_bulk_t *bulk_hdls;
crt_bulk_opid_t *bulk_opids;
uint16_t bulk_hdls_max_size;
uint16_t bulk_hdls_size;
int bulks_inflight;
int result;
bool inited;
120 changes: 116 additions & 4 deletions src/object/srv_obj.c
@@ -235,6 +235,63 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch, bool release_input,
}
}

#define BULK_HANDLE_MIN_SIZE 5
static int
obj_bulk_args_insert_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl,
crt_bulk_opid_t bulk_opid)
{
D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
if (arg->bulk_hdls_size == arg->bulk_hdls_max_size - 1) {
crt_bulk_t *new_hdls;
crt_bulk_opid_t *new_opids;

D_ALLOC_ARRAY(new_hdls, arg->bulk_hdls_size + BULK_HANDLE_MIN_SIZE);
if (new_hdls == NULL)
return -DER_NOMEM;

memcpy(new_hdls, arg->bulk_hdls, arg->bulk_hdls_size * sizeof(*arg->bulk_hdls));
if (arg->bulk_hdls_size > BULK_HANDLE_MIN_SIZE)
D_FREE(arg->bulk_hdls);
arg->bulk_hdls = new_hdls;

D_ALLOC_ARRAY(new_opids, arg->bulk_hdls_size + BULK_HANDLE_MIN_SIZE);
if (new_opids == NULL)
return -DER_NOMEM;

memcpy(new_opids, arg->bulk_opids, arg->bulk_hdls_size * sizeof(*arg->bulk_opids));
if (arg->bulk_hdls_size > BULK_HANDLE_MIN_SIZE)
D_FREE(arg->bulk_opids);

arg->bulk_opids = new_opids;
arg->bulk_hdls_max_size = arg->bulk_hdls_size + BULK_HANDLE_MIN_SIZE;
}

/* Insert bulk_hdl (key) and bulk_opid (value) into obj_bulk_args. When a bulk transfer
* completes, its hdl and opid are removed; otherwise the bulk is aborted if the transfer
* cannot finish in time.
*/
arg->bulk_hdls[arg->bulk_hdls_size] = bulk_hdl;
arg->bulk_opids[arg->bulk_hdls_size] = bulk_opid;
arg->bulk_hdls_size++;

return 0;
}

static void
obj_bulk_args_delete_hdl_opid(struct obj_bulk_args *arg, crt_bulk_t bulk_hdl)
{
int i;

D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
for (i = 0; i < arg->bulk_hdls_size; i++) {
if (arg->bulk_hdls[i] == bulk_hdl) {
arg->bulk_hdls[i] = NULL;
arg->bulk_opids[i] = NULL;
return;
}
}
}

static int
obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)
{
@@ -257,30 +314,53 @@ obj_bulk_comp_cb(const struct crt_bulk_cb_info *cb_info)

D_ASSERT(arg->bulks_inflight > 0);
arg->bulks_inflight--;
if (arg->bulks_inflight == 0)
if (arg->bulks_inflight == 0 &&
!DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_TIMEOUT))
ABT_eventual_set(arg->eventual, &arg->result,
sizeof(arg->result));

obj_bulk_args_delete_hdl_opid((struct obj_bulk_args *)cb_info->bci_arg,
bulk_desc->bd_local_hdl);
crt_req_decref(rpc);
return cb_info->bci_rc;
}

static void
obj_bulk_args_bulks_abort(crt_rpc_t *rpc, struct obj_bulk_args *arg)
{
int i;

D_ASSERT(arg->bulk_hdls_size <= arg->bulk_hdls_max_size);
for (i = 0; i < arg->bulk_hdls_size; i++) {
if (arg->bulk_opids[i] != NULL) {
D_DEBUG(DB_IO, "abort bulk %p/%p\n", arg->bulk_opids[i],
arg->bulk_hdls[i]);
D_ASSERT(arg->bulk_hdls[i] != NULL);
crt_bulk_abort(rpc->cr_ctx, arg->bulk_opids[i]);
}
}
}

static inline int
bulk_cp(const struct crt_bulk_cb_info *cb_info)
{
struct crt_bulk_desc *bulk_desc;
int rc;

rc = obj_bulk_comp_cb(cb_info);

bulk_desc = cb_info->bci_bulk_desc;
D_ASSERT(bulk_desc->bd_local_hdl != CRT_BULK_NULL);
crt_bulk_free(bulk_desc->bd_local_hdl);
bulk_desc->bd_local_hdl = CRT_BULK_NULL;

return obj_bulk_comp_cb(cb_info);
return rc;
}

static inline int
cached_bulk_cp(const struct crt_bulk_cb_info *cb_info)
{

return obj_bulk_comp_cb(cb_info);
}

@@ -444,6 +524,9 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk,
D_ASSERT(remote_size > remote_off);
if (length > (remote_size - remote_off)) {
rc = -DER_OVERFLOW;
if (!cached_bulk)
crt_bulk_free(local_bulk);

D_ERROR("Remote bulk isn't large enough. local_sz:%zu, remote_sz:%zu, "
"remote_off:%zu, "DF_RC"\n", length, remote_size, remote_off,
DP_RC(rc));
@@ -480,6 +563,10 @@
}
remote_off += length;

/* No progress happens between crt_bulk_transfer() and this point, so cached_bulk_cp
* or bulk_cp cannot have been called yet.
*/
obj_bulk_args_insert_hdl_opid(p_arg, local_bulk, bulk_opid);
/* Give cart progress a chance to complete some in-flight bulk transfers */
if (bulk_iovs >= MAX_BULK_IOVS) {
bulk_iovs = 0;
Expand All @@ -496,10 +583,13 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh)
{
struct obj_bulk_args arg = { 0 };
crt_bulk_t bulk_hdls[BULK_HANDLE_MIN_SIZE];
crt_bulk_opid_t bulk_opids[BULK_HANDLE_MIN_SIZE];
int i, rc, *status, ret;
int skip_nr = 0;
bool async = true;
uint64_t time = daos_get_ntime();
uint32_t timeout;

if (unlikely(sgl_nr > bulk_nr)) {
D_ERROR("Invalid sgl_nr vs bulk_nr: %d/%d\n", sgl_nr, bulk_nr);
@@ -520,6 +610,10 @@
if (rc != 0)
return dss_abterr2der(rc);

p_arg->bulk_hdls = bulk_hdls;
p_arg->bulk_opids = bulk_opids;
p_arg->bulk_hdls_max_size = BULK_HANDLE_MIN_SIZE;
p_arg->bulk_hdls_size = 0;
p_arg->inited = true;
D_DEBUG(DB_IO, "bulk_op %d, sgl_nr %d, bulk_nr %d\n", bulk_op, sgl_nr, bulk_nr);

@@ -594,9 +688,23 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
if (async)
return rc;

ret = ABT_eventual_wait(p_arg->eventual, (void **)&status);
if (DAOS_FAIL_CHECK(DAOS_OBJ_FAIL_BULK_ABORT))
timeout = 0;
else
crt_req_get_timeout(rpc, &timeout);

ret = dss_eventual_timeout_wait(p_arg->eventual, (void **)&status, timeout, 1);
if (ret == -DER_TIMEDOUT) {
obj_bulk_args_bulks_abort(rpc, p_arg);
while (p_arg->bulks_inflight > 0) {
D_INFO("Wait for comp callback to be executed inflight %d\n",
p_arg->bulks_inflight);
dss_sleep(1);
}
}

if (rc == 0)
rc = ret ? dss_abterr2der(ret) : *status;
rc = ret ? ret : *status;

ABT_eventual_free(&p_arg->eventual);

@@ -625,6 +733,10 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul
*fbuffer += 0x2;
d_sgl_fini(&fsgl, false);
}
if (p_arg->bulk_hdls != bulk_hdls)
D_FREE(p_arg->bulk_hdls);
if (p_arg->bulk_opids != bulk_opids)
D_FREE(p_arg->bulk_opids);
return rc;
}

60 changes: 60 additions & 0 deletions src/tests/suite/daos_obj.c
@@ -5367,6 +5367,64 @@ io_57(void **state)
reintegrate_single_pool_rank(arg, 0, false);
}

void
io_bulk_timeout(void **state)
{
daos_obj_id_t oid;
test_arg_t *arg = *state;
struct ioreq req;
char *fetch_buf;
char *update_buf;
int size = 1048576;

oid = daos_test_oid_gen(arg->coh, dts_obj_class, 0, 0, arg->myrank);
ioreq_init(&req, arg->coh, oid, DAOS_IOD_ARRAY, arg);

D_ALLOC(fetch_buf, size);
assert_non_null(fetch_buf);
D_ALLOC(update_buf, size);
assert_non_null(update_buf);
dts_buf_render(update_buf, size);

print_message("timeout bulk\n");
if (arg->myrank == 0)
daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC,
DAOS_OBJ_FAIL_BULK_TIMEOUT | DAOS_FAIL_ONCE,
0, NULL);

/** Insert */
insert_single("dkey", "akey", 0, update_buf, size, DAOS_TX_NONE, &req);

/** Lookup */
memset(fetch_buf, 0, size);
lookup_single("dkey", "akey", 0, fetch_buf, size, DAOS_TX_NONE, &req);

/** Verify data consistency */
assert_int_equal(req.iod[0].iod_size, size);
assert_memory_equal(update_buf, fetch_buf, size);

print_message("abort bulk\n");
if (arg->myrank == 0)
daos_debug_set_params(arg->group, -1, DMG_KEY_FAIL_LOC,
DAOS_OBJ_FAIL_BULK_ABORT | DAOS_FAIL_ONCE,
0, NULL);

/** Insert */
insert_single("dkey1", "akey", 0, update_buf, size, DAOS_TX_NONE, &req);

/** Lookup */
memset(fetch_buf, 0, size);
lookup_single("dkey1", "akey", 0, fetch_buf, size, DAOS_TX_NONE, &req);

/** Verify data consistency */
assert_int_equal(req.iod[0].iod_size, size);
assert_memory_equal(update_buf, fetch_buf, size);

D_FREE(update_buf);
D_FREE(fetch_buf);
ioreq_fini(&req);
}

static const struct CMUnitTest io_tests[] = {
{ "IO1: simple update/fetch/verify",
io_simple, async_disable, test_case_teardown},
@@ -5481,6 +5539,8 @@ static const struct CMUnitTest io_tests[] = {
io_56, async_disable, test_case_teardown},
{ "IO57: collective object query with rank_0 excluded",
io_57, rebuild_sub_rf1_setup, test_teardown},
{ "IO58: IO bulk timeout",
io_bulk_timeout, async_disable, test_case_teardown},
};

int
