Skip to content

Commit

Permalink
DAOS-16951 Correct discard logic
Browse files Browse the repository at this point in the history
- disregard whether DAE is aborted/aborting
- discard inline records first

Priority: 2
Required-githooks: true

Signed-off-by: Jan Michalski <[email protected]>
  • Loading branch information
janekmi committed Jan 29, 2025
1 parent 032a661 commit 70de8dc
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 73 deletions.
74 changes: 28 additions & 46 deletions src/dtx/tests/dts_discard_invalid.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,39 +139,18 @@ test_missing_things(void **unused)
int discarded = 0;
int rc;

/* 1. Missing arguments. */
/* Missing arguments. */
expect_assert_failure(vos_dtx_discard_invalid(hdl_null, NULL, NULL));
expect_assert_failure(vos_dtx_discard_invalid(Coh, NULL, NULL));
expect_assert_failure(vos_dtx_discard_invalid(Coh, DTX_ID_PTR, NULL));
expect_assert_failure(vos_dtx_discard_invalid(Coh, NULL, &discarded));

/* 2. DAE not in the DTX active table. */
/* DAE not in the DTX active table. */
will_return(__wrap_dbtree_lookup, NULL);
rc = vos_dtx_discard_invalid(Coh, DTX_ID_PTR, &discarded);
assert_int_equal(rc, DBTREE_LOOKUP_ERROR_RC);
}

static void
test_aborted_dae(void **unused)
{
int discarded = 0;
int rc;

/* 3.a Aborting. */
memset(&Dae, 0, sizeof(Dae));
Dae.dae_aborting = 1;
will_return(__wrap_dbtree_lookup, &Dae);
rc = vos_dtx_discard_invalid(Coh, DTX_ID_PTR, &discarded);
assert_int_equal(rc, 0);

/* 3.b Aborted. */
memset(&Dae, 0, sizeof(Dae));
Dae.dae_aborted = 1;
will_return(__wrap_dbtree_lookup, &Dae);
rc = vos_dtx_discard_invalid(Coh, DTX_ID_PTR, &discarded);
assert_int_equal(rc, 0);
}

struct rec_valid {
enum vos_dtx_record_types type;
bool valid;
Expand Down Expand Up @@ -259,7 +238,7 @@ test_tx_begin_fail(void **unused)
int discarded = 0;
int rc;

/* 4. tx_begin() fails. */
/* tx_begin() fails. */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, TX_ERROR_RC);
rc = vos_dtx_discard_invalid(Coh, DTX_ID_PTR, &discarded);
Expand All @@ -272,7 +251,7 @@ test_tx_abort_fail(void **unused)
int discarded = 0;
int rc;

/* 5. tx_abort() (when nothing to commit) fails. */
/* tx_abort() (when nothing to commit) fails. */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
expect_value(tx_abort, error, 0);
Expand All @@ -289,7 +268,7 @@ test_tx_add_ptr_inline_fail(void **unused)
int discarded = 0;
int rc;

/* 6a. tx_add_ptr() for inline records fails. */
/* tx_add_ptr() for inline records fails. */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
prep_records_inline(One_rec, ARRAY_SIZE(One_rec));
Expand All @@ -305,7 +284,7 @@ test_tx_add_ptr_noninline_fail(void **unused)
int discarded = 0;
int rc;

/* 6b. tx_add_ptr() for non-inline records fails. */
/* tx_add_ptr() for non-inline records fails. */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
prep_records_noninline(One_rec, ARRAY_SIZE(One_rec));
Expand All @@ -321,7 +300,7 @@ test_tx_commit_fail(void **unused)
int discarded = 0;
int rc;

/* 7. tx_commit() fails. */
/* tx_commit() fails. */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
prep_records_noninline(One_rec, ARRAY_SIZE(One_rec));
Expand Down Expand Up @@ -351,7 +330,7 @@ test_discard_inline_all(void **unused)
int discarded = 0;
int rc;

/* 8. discard all inline records at once */
/* discard all inline records at once */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
prep_records_inline(recs, ARRAY_SIZE(recs));
Expand Down Expand Up @@ -414,7 +393,7 @@ discard_inline_one_execute(struct rec_valid *recs, int num)
static void
test_discard_inline_one(void **unused)
{
/* 9. discard just one inline record */
/* discard just one inline record */
prep_discard_one_common(discard_inline_one_execute);
}

Expand All @@ -431,7 +410,7 @@ test_discard_noninline_all(void **unused)
int discarded = 0;
int rc;

/* 10. discard all noninline records at once */
/* discard all noninline records at once */
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);
prep_records_noninline(recs, ARRAY_SIZE(recs));
Expand Down Expand Up @@ -465,7 +444,7 @@ discard_noninline_one_execute(struct rec_valid *recs, int num)
static void
test_discard_noninline_one(void **unused)
{
/* 11. discard just one noninline record */
/* discard just one noninline record */
prep_discard_one_common(discard_noninline_one_execute);
}

Expand Down Expand Up @@ -500,22 +479,20 @@ test_discard_rand(void **unused)
will_return(__wrap_dbtree_lookup, &Dae);
will_return(tx_begin, 0);

/* Note: The nonline records are processed first hence they have to be initialized first as
/* Note: The inline records are processed first hence they have to be initialized first as
* well. */
call_tx_add_ptr = prep_records_inline(recs, min(num, DTX_INLINE_REC_CNT));
if (call_tx_add_ptr) {
will_return(tx_add_ptr, 0);
}

if (num > DTX_INLINE_REC_CNT) {
call_tx_add_ptr =
prep_records_noninline(&recs[DTX_INLINE_REC_CNT], num - DTX_INLINE_REC_CNT);
if (call_tx_add_ptr) {
will_return(tx_add_ptr, 0);
}
}
call_tx_add_ptr = prep_records_inline(recs, min(num, DTX_INLINE_REC_CNT));
if (call_tx_add_ptr) {
will_return(tx_add_ptr, 0);
}

/* Account for both inline and noninline records. */
Dae.dae_base.dae_rec_cnt = num;

will_return(tx_commit, 0);
rc = vos_dtx_discard_invalid(Coh, DTX_ID_PTR, &discarded);
Expand Down Expand Up @@ -585,12 +562,17 @@ teardown_cont(void **unused)
}

static const struct CMUnitTest discard_invalid_tests_all[] = {
TEST("DTX400", missing_things), TEST("DTX401", aborted_dae),
TEST("DTX402", tx_begin_fail), TEST("DTX403", tx_abort_fail),
TEST("DTX404", tx_add_ptr_inline_fail), TEST("DTX405", tx_add_ptr_noninline_fail),
TEST("DTX406", tx_commit_fail), TEST("DTX407", discard_inline_all),
TEST("DTX408", discard_inline_one), TEST("DTX409", discard_noninline_all),
TEST("DTX410", discard_noninline_one), TEST("DTX411", discard_rand),
TEST("DTX400", missing_things),
TEST("DTX401", tx_begin_fail),
TEST("DTX402", tx_abort_fail),
TEST("DTX403", tx_add_ptr_inline_fail),
TEST("DTX404", tx_add_ptr_noninline_fail),
TEST("DTX405", tx_commit_fail),
TEST("DTX406", discard_inline_all),
TEST("DTX407", discard_inline_one),
TEST("DTX408", discard_noninline_all),
TEST("DTX409", discard_noninline_one),
TEST("DTX410", discard_rand),
};

int
Expand Down
45 changes: 18 additions & 27 deletions src/vos/vos_dtx.c
Original file line number Diff line number Diff line change
Expand Up @@ -2458,10 +2458,26 @@ vos_dtx_discard_invalid_internal(struct vos_container *cont, struct vos_dtx_act_
struct umem_instance *umm = vos_cont2umm(cont);
int discarded_noninline = 0;
int discarded_inline = 0;
int count;
int count = min(DAE_REC_CNT(dae), DTX_INLINE_REC_CNT);
int i;

/* go through the non-inlined records first if present */
/* go through the inlined records */
for (i = 0; i < count; i++) {
do_dtx_rec_discard_invalid(umm, dae, &DAE_REC_INLINE(dae)[i], &discarded_inline);
}

if (discarded_inline > 0) {
/* copy the whole array to durable format */
struct vos_dtx_act_ent_df *dae_df = umem_off2ptr(umm, dae->dae_df_off);
size_t size = sizeof(umem_off_t) * count;
int rc = umem_tx_add_ptr(umm, &dae_df->dae_rec_inline, size);
if (rc != 0) {
return rc;
}
memcpy(&dae_df->dae_rec_inline, &DAE_REC_INLINE(dae), size);
}

/* go through the non-inlined records if present */
if (dae->dae_records != NULL) {
D_ASSERT(DAE_REC_CNT(dae) > DTX_INLINE_REC_CNT);

Expand All @@ -2481,26 +2497,6 @@ vos_dtx_discard_invalid_internal(struct vos_container *cont, struct vos_dtx_act_
}
memcpy(rec_df, dae->dae_records, size);
}

count = DTX_INLINE_REC_CNT;
} else {
count = DAE_REC_CNT(dae);
}

/* go through the inlined records */
for (i = 0; i < count; i++) {
do_dtx_rec_discard_invalid(umm, dae, &DAE_REC_INLINE(dae)[i], &discarded_inline);
}

if (discarded_inline > 0) {
/* copy the whole array to durable format */
struct vos_dtx_act_ent_df *dae_df = umem_off2ptr(umm, dae->dae_df_off);
size_t size = sizeof(umem_off_t) * count;
int rc = umem_tx_add_ptr(umm, &dae_df->dae_rec_inline, size);
if (rc != 0) {
return rc;
}
memcpy(&dae_df->dae_rec_inline, &DAE_REC_INLINE(dae), size);
}

*discarded = discarded_inline + discarded_noninline;
Expand Down Expand Up @@ -2532,11 +2528,6 @@ vos_dtx_discard_invalid(daos_handle_t coh, struct dtx_id *dti, int *discarded)
}
dae = riov.iov_buf;

if (vos_dae_is_abort(dae)) {
/* Only not aborted transactions are considered. */
return 0;
}

rc = umem_tx_begin(vos_cont2umm(cont), NULL);
if (rc == 0) {
rc = vos_dtx_discard_invalid_internal(cont, dae, discarded);
Expand Down

0 comments on commit 70de8dc

Please sign in to comment.