Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-17089 cart: simplify progress callback registration #15867

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 22 additions & 107 deletions src/cart/crt_context.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -1252,7 +1253,7 @@ crt_context_timeout_check(struct crt_context *crt_ctx)
D_ASSERTF(d_list_empty(&rpc_priv->crp_tmp_link_timeout),
"already on timeout list\n");
d_list_add_tail(&rpc_priv->crp_tmp_link_timeout, &timeout_list);
};
}
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);

/* handle the timeout RPCs */
Expand Down Expand Up @@ -1776,39 +1777,6 @@ crt_context_empty(crt_provider_t provider, int locked)
return rc;
}

static int64_t
crt_exec_progress_cb(struct crt_context *ctx, int64_t timeout)
{
struct crt_prog_cb_priv *cbs_prog;
crt_progress_cb cb_func;
void *cb_args;
size_t cbs_size, i;
int ctx_idx;
int rc;

if (unlikely(crt_plugin_gdata.cpg_inited == 0 || ctx == NULL))
return timeout;

rc = crt_context_idx(ctx, &ctx_idx);
if (unlikely(rc)) {
D_ERROR("crt_context_idx() failed, rc: %d.\n", rc);
return timeout;
}

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
cb_func = cbs_prog[i].cpcp_func;
cb_args = cbs_prog[i].cpcp_args;
/* check for and execute progress callbacks here */
if (cb_func != NULL)
timeout = cb_func(ctx, timeout, cb_args);
}

return timeout;
}

int
crt_progress_cond(crt_context_t crt_ctx, int64_t timeout,
crt_progress_cond_cb_t cond_cb, void *arg)
Expand Down Expand Up @@ -1858,7 +1826,8 @@ crt_progress_cond(crt_context_t crt_ctx, int64_t timeout,
/** loop until callback returns non-null value */
while ((rc = cond_cb(arg)) == 0) {
crt_context_timeout_check(ctx);
timeout = crt_exec_progress_cb(ctx, timeout);
if (ctx->cc_prog_cb != NULL)
timeout = ctx->cc_prog_cb(ctx, timeout, ctx->cc_prog_cb_arg);

if (timeout < 0) {
/**
Expand Down Expand Up @@ -1930,7 +1899,8 @@ crt_progress(crt_context_t crt_ctx, int64_t timeout)
* progress
*/
crt_context_timeout_check(ctx);
timeout = crt_exec_progress_cb(ctx, timeout);
if (ctx->cc_prog_cb != NULL)
timeout = ctx->cc_prog_cb(ctx, timeout, ctx->cc_prog_cb_arg);

if (timeout != 0 && (rc == 0 || rc == -DER_TIMEDOUT)) {
/** call progress once again with the real timeout */
Expand All @@ -1950,93 +1920,38 @@ crt_progress(crt_context_t crt_ctx, int64_t timeout)
int
crt_register_progress_cb(crt_progress_cb func, int ctx_idx, void *args)
{
struct crt_prog_cb_priv *cbs_prog;
size_t i, cbs_size;
int rc = 0;
struct crt_context *ctx;
int rc;

if (ctx_idx >= CRT_SRV_CONTEXT_NUM) {
D_ERROR("ctx_idx %d >= %d\n", ctx_idx, CRT_SRV_CONTEXT_NUM);
D_GOTO(out, rc = -DER_INVAL);
}

D_MUTEX_LOCK(&crt_plugin_gdata.cpg_mutex);

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == func &&
cbs_prog[i].cpcp_args == args) {
D_GOTO(out_unlock, rc = -DER_EXIST);
}
}

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == NULL) {
cbs_prog[i].cpcp_args = args;
cbs_prog[i].cpcp_func = func;
D_GOTO(out_unlock, rc = 0);
}
D_GOTO(error, rc = -DER_INVAL);
}

D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx]);

crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx] = cbs_prog;
cbs_size += CRT_CALLBACKS_NUM;

D_ALLOC_ARRAY(cbs_prog, cbs_size);
if (cbs_prog == NULL) {
crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx] = NULL;
D_GOTO(out_unlock, rc = -DER_NOMEM);
ctx = crt_context_lookup(ctx_idx);
if (ctx == NULL) {
D_ERROR("crt_context_lookup(%d) failed.\n", ctx_idx);
D_GOTO(error, rc = -DER_NONEXIST);
}

if (i > 0)
memcpy(cbs_prog, crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx],
i * sizeof(*cbs_prog));
cbs_prog[i].cpcp_args = args;
cbs_prog[i].cpcp_func = func;
D_MUTEX_LOCK(&ctx->cc_mutex);
ctx->cc_prog_cb = func;
ctx->cc_prog_cb_arg = args;
D_MUTEX_UNLOCK(&ctx->cc_mutex);

crt_plugin_gdata.cpg_prog_cbs[ctx_idx] = cbs_prog;
crt_plugin_gdata.cpg_prog_size[ctx_idx] = cbs_size;
return 0;

out_unlock:
D_MUTEX_UNLOCK(&crt_plugin_gdata.cpg_mutex);
out:
error:
return rc;
}

int
crt_unregister_progress_cb(crt_progress_cb func, int ctx_idx, void *args)
{
struct crt_prog_cb_priv *cbs_prog;
size_t i, cbs_size;
int rc = -DER_NONEXIST;

if (ctx_idx >= CRT_SRV_CONTEXT_NUM) {
D_ERROR("ctx_idx %d >= %d\n", ctx_idx, CRT_SRV_CONTEXT_NUM);
D_GOTO(out, rc = -DER_INVAL);
}

D_MUTEX_LOCK(&crt_plugin_gdata.cpg_mutex);
(void)func;
(void)args;

cbs_size = crt_plugin_gdata.cpg_prog_size[ctx_idx];
cbs_prog = crt_plugin_gdata.cpg_prog_cbs[ctx_idx];

for (i = 0; i < cbs_size; i++) {
if (cbs_prog[i].cpcp_func == func &&
cbs_prog[i].cpcp_args == args) {
cbs_prog[i].cpcp_func = NULL;
cbs_prog[i].cpcp_args = NULL;
D_GOTO(out_unlock, rc = 0);
}
}

out_unlock:
D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[ctx_idx]);

D_MUTEX_UNLOCK(&crt_plugin_gdata.cpg_mutex);
out:
return rc;
return crt_register_progress_cb(NULL, ctx_idx, NULL);
}

int
Expand Down
27 changes: 2 additions & 25 deletions src/cart/crt_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -364,29 +364,16 @@ data_init(int server, crt_init_options_t *opt)
static int
crt_plugin_init(void)
{
struct crt_prog_cb_priv *cbs_prog;
struct crt_event_cb_priv *cbs_event;
size_t cbs_size = CRT_CALLBACKS_NUM;
int i, rc;
int rc;

D_ASSERT(crt_plugin_gdata.cpg_inited == 0);

for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++) {
crt_plugin_gdata.cpg_prog_cbs_old[i] = NULL;
D_ALLOC_ARRAY(cbs_prog, cbs_size);
if (cbs_prog == NULL) {
for (i--; i >= 0; i--)
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
D_GOTO(out, rc = -DER_NOMEM);
}
crt_plugin_gdata.cpg_prog_size[i] = cbs_size;
crt_plugin_gdata.cpg_prog_cbs[i] = cbs_prog;
}

crt_plugin_gdata.cpg_event_cbs_old = NULL;
D_ALLOC_ARRAY(cbs_event, cbs_size);
if (cbs_event == NULL) {
D_GOTO(out_destroy_prog, rc = -DER_NOMEM);
D_GOTO(out, rc = -DER_NOMEM);
}
crt_plugin_gdata.cpg_event_size = cbs_size;
crt_plugin_gdata.cpg_event_cbs = cbs_event;
Expand All @@ -400,27 +387,17 @@ crt_plugin_init(void)

out_destroy_event:
D_FREE(crt_plugin_gdata.cpg_event_cbs);
out_destroy_prog:
for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++)
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
out:
return rc;
}

static void
crt_plugin_fini(void)
{
int i;

D_ASSERT(crt_plugin_gdata.cpg_inited == 1);

crt_plugin_gdata.cpg_inited = 0;

for (i = 0; i < CRT_SRV_CONTEXT_NUM; i++) {
D_FREE(crt_plugin_gdata.cpg_prog_cbs[i]);
D_FREE(crt_plugin_gdata.cpg_prog_cbs_old[i]);
}

D_FREE(crt_plugin_gdata.cpg_event_cbs);
D_FREE(crt_plugin_gdata.cpg_event_cbs_old);

Expand Down
13 changes: 4 additions & 9 deletions src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,6 @@ struct crt_gdata {

extern struct crt_gdata crt_gdata;

struct crt_prog_cb_priv {
crt_progress_cb cpcp_func;
void *cpcp_args;
};

struct crt_event_cb_priv {
crt_event_cb cecp_func;
void *cecp_args;
Expand Down Expand Up @@ -355,10 +350,6 @@ crt_env_dump(void)

/* structure of global fault tolerance data */
struct crt_plugin_gdata {
/* list of progress callbacks */
size_t cpg_prog_size[CRT_SRV_CONTEXT_NUM];
struct crt_prog_cb_priv *cpg_prog_cbs[CRT_SRV_CONTEXT_NUM];
struct crt_prog_cb_priv *cpg_prog_cbs_old[CRT_SRV_CONTEXT_NUM];
/* list of event notification callbacks */
size_t cpg_event_size;
struct crt_event_cb_priv *cpg_event_cbs;
Expand Down Expand Up @@ -403,6 +394,10 @@ struct crt_context {
crt_rpc_task_t cc_rpc_cb; /** rpc callback */
crt_rpc_task_t cc_iv_resp_cb;

/* progress callback */
void *cc_prog_cb_arg;
crt_progress_cb cc_prog_cb;

/** RPC tracking */
/** in-flight endpoint tracking hash table */
struct d_hash_table cc_epi_table;
Expand Down
9 changes: 8 additions & 1 deletion src/tests/ftest/cart/utest/utest_swim.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2020-2022 Intel Corporation.
* (C) Copyright 2020-2024 Intel Corporation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Intel copyright intentionally updated?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I had worked on it while we were at Intel so I guess it would count :D

* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand All @@ -22,11 +23,15 @@
static void
test_swim(void **state)
{
crt_context_t crt_ctx;
int rc;

rc = crt_init(NULL, CRT_FLAG_BIT_SERVER | CRT_FLAG_BIT_AUTO_SWIM_DISABLE);
assert_int_equal(rc, 0);

rc = crt_context_create(&crt_ctx);
assert_int_equal(rc, 0);

rc = crt_swim_init(0);
assert_int_equal(rc, 0);

Expand All @@ -46,6 +51,8 @@ test_swim(void **state)
assert_int_equal(rc, -DER_ALREADY);

crt_swim_fini();
rc = crt_context_destroy(crt_ctx, 0);
assert_int_equal(rc, 0);
rc = crt_finalize();
assert_int_equal(rc, 0);
}
Expand Down