Skip to content

Commit

Permalink
Merge branch 'master' into kccain/daos_14021_phase2
Browse files Browse the repository at this point in the history
  • Loading branch information
kccain committed Jan 10, 2024
2 parents c73aef0 + 9a2a674 commit c520327
Show file tree
Hide file tree
Showing 26 changed files with 219 additions and 164 deletions.
12 changes: 12 additions & 0 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,18 @@ ds_cont_child_stop_all(struct ds_pool_child *pool_child)
}
}

void
ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child)
{
struct ds_cont_child *cont_child;

D_DEBUG(DB_MD, DF_UUID"[%d]: reset all containers EC aggregate epoch.\n",
DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id);

d_list_for_each_entry(cont_child, &pool_child->spc_cont_list, sc_link)
cont_child->sc_ec_agg_eph = cont_child->sc_ec_agg_eph_boundary;
}

static int
cont_child_start(struct ds_pool_child *pool_child, const uuid_t co_uuid,
bool *started, struct ds_cont_child **cont_out)
Expand Down
3 changes: 2 additions & 1 deletion src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ void ds_cont_child_stop_all(struct ds_pool_child *pool_child);

int ds_cont_child_lookup(uuid_t pool_uuid, uuid_t cont_uuid,
struct ds_cont_child **ds_cont);

void
ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child);
/** initialize a csummer based on container properties. Will retrieve the
* checksum related properties from IV
*/
Expand Down
133 changes: 82 additions & 51 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2020-2023 Intel Corporation.
* (C) Copyright 2020-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -89,6 +89,7 @@ struct ec_agg_par_extent {
struct ec_agg_stripe {
daos_off_t as_stripenum; /* ordinal of stripe, offset/(k*len) */
daos_epoch_t as_hi_epoch; /* highest epoch in stripe */
daos_epoch_t as_lo_epoch; /* lowest epoch in stripe */
d_list_t as_dextents; /* list of stripe's data extents */
daos_off_t as_stripe_fill; /* amount of stripe covered by data */
uint64_t as_offset; /* start offset in stripe */
Expand All @@ -114,6 +115,7 @@ struct ec_agg_entry {
struct pl_obj_layout *ae_obj_layout;
struct daos_shard_loc ae_peer_pshards[OBJ_EC_MAX_P];
uint32_t ae_grp_idx;
uint32_t ae_is_leader:1;
};

/* Parameters used to drive iterate all.
Expand All @@ -123,13 +125,13 @@ struct ec_agg_param {
struct ec_agg_entry ap_agg_entry; /* entry used for each OID */
daos_epoch_range_t ap_epr; /* hi/lo extent threshold */
daos_epoch_t ap_filter_eph; /* Aggregatable filter epoch */
daos_epoch_t ap_min_unagg_eph; /* minimum unaggregate epoch */
daos_handle_t ap_cont_handle; /* VOS container handle */
int (*ap_yield_func)(void *arg); /* yield function*/
void *ap_yield_arg; /* yield argument */
uint32_t ap_credits_max; /* # of tight loops to yield */
uint32_t ap_credits; /* # of tight loops */
uint32_t ap_initialized:1, /* initialized flag */
ap_obj_skipped:1; /* skipped obj during aggregation */
uint32_t ap_initialized:1; /* initialized flag */
};

/* Struct used to drive offloaded stripe update.
Expand Down Expand Up @@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry)
D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0);
}
entry->ae_cur_stripe.as_hi_epoch = 0UL;
entry->ae_cur_stripe.as_lo_epoch = 0UL;
entry->ae_cur_stripe.as_stripe_fill = 0;
entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false;
}
Expand Down Expand Up @@ -1858,7 +1861,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
* and all replica extents are newer than parity.
*/
if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) {
rc = agg_encode_local_parity(entry);
if (entry->ae_is_leader) {
rc = agg_encode_local_parity(entry);
} else {
update_vos = false;
agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
entry->ae_cur_stripe.as_lo_epoch);
}
goto out;
}

Expand All @@ -1868,6 +1877,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
goto out;
}

if (!entry->ae_is_leader) {
update_vos = false;
agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph,
entry->ae_cur_stripe.as_lo_epoch);
goto out;
}

/* With parity and some newer partial replicas, possibly holes */
if (ec_age_with_hole(entry))
process_holes = true;
Expand Down Expand Up @@ -1951,13 +1967,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry,
agg_in_stripe(agg_entry, recx);
}

if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 ||
extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch)
agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch;

if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch)
agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch;

D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n",
D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u"
"max/min "DF_X64"/"DF_X64"\n",
DP_RECX(extent->ae_recx),
agg_stripenum(agg_entry, extent->ae_recx.rx_idx),
agg_entry->ae_oid.id_shard);
agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch,
agg_entry->ae_cur_stripe.as_lo_epoch);
out:
return rc;
}
Expand All @@ -1973,9 +1995,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry,

D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR));

D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n",
D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n",
DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx),
entry->ie_minor_epc);
entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no");

while (offset < end) {
daos_off_t this_stripenum;
Expand Down Expand Up @@ -2038,6 +2060,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param,

agg_entry->ae_cur_stripe.as_stripenum = 0UL;
agg_entry->ae_cur_stripe.as_hi_epoch = 0UL;
agg_entry->ae_cur_stripe.as_lo_epoch = 0UL;
agg_entry->ae_cur_stripe.as_stripe_fill = 0UL;
agg_entry->ae_cur_stripe.as_offset = 0U;
}
Expand Down Expand Up @@ -2073,39 +2096,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry)
}
}

static int
agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
static bool
agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry)
{
struct pl_obj_shard *shard;
struct daos_oclass_attr *oca;
uint32_t grp_idx;
uint32_t grp_start;
uint32_t ec_tgt_idx;
int shard_idx;
int rc;
uint32_t min_fseq = -1;
int leader_shard = -1;
int i;

oca = &agg_entry->ae_oca;
if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
agg_entry->ae_oid.id_shard)) {
agg_entry->ae_is_leader = 0;
return false;
}

grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca);
grp_start = grp_idx * daos_oclass_grp_size(oca);
ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
daos_oclass_grp_size(oca) - 1);
/**
* FIXME: only the last parity shard can be the EC agg leader. What about
* Degraded mode?
*/
if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start)
return 0;
grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size;
for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) {
uint32_t ec_tgt_idx;
uint32_t shard_idx;
struct pl_obj_shard *shard;

ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver,
agg_entry->ae_dkey_hash, oca,
daos_oclass_grp_size(oca) - i - 1);

shard_idx = grp_start + ec_tgt_idx;
shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);

/* If last parity unavailable, then skip the object via returning -DER_STALE. */
shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx;
shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx);
if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding)
rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0;
if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding)
continue;

if (min_fseq == -1 || min_fseq > shard->po_fseq) {
leader_shard = shard_idx;
min_fseq = shard->po_fseq;
}
}

/* No parity shard is available */
if (leader_shard == -1)
return false;

if (agg_entry->ae_oid.id_shard == leader_shard)
agg_entry->ae_is_leader = 1;
else
rc = -DER_STALE;
agg_entry->ae_is_leader = 0;

return rc;
return true;
}

/* Initializes the struct holding the iteration state (ec_agg_entry). */
Expand All @@ -2129,8 +2170,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry,
unsigned int *acts)
{
int rc;

if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) {
D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n",
DP_KEY(&entry->ie_key));
Expand All @@ -2144,24 +2183,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry,
agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub,
&agg_entry->ae_dkey);
agg_reset_pos(VOS_ITER_AKEY, agg_entry);
rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry);
if (rc == 1) {
D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n",
DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey));
if(agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) {
D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n",
DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey),
agg_entry->ae_is_leader ? "yes" : "no");
agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry);
rc = 0;
} else {
if (rc < 0) {
D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed "
DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc));
if (rc == -DER_STALE)
agg_param->ap_obj_skipped = 1;
rc = 0;
}
*acts |= VOS_ITER_CB_SKIP;
}

return rc;
return 0;
}

/* Handles akeys returned by the iterator. */
Expand Down Expand Up @@ -2625,7 +2656,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,

agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

ec_agg_param->ap_obj_skipped = 0;
ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX;
rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors,
agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL);

Expand All @@ -2637,8 +2668,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL;
}

if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) {
D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n");
if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) {
/* There is rebuild going on, and we can't proceed EC aggregate boundary,
* Let's wait for 5 seconds for another EC aggregation.
*/
Expand All @@ -2649,7 +2679,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
vos_aggregate_exit(cont->sc_hdl);

update_hae:
if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) {
if (rc == 0) {
cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi);
if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) {
uint64_t orig, cur;
Expand All @@ -2662,7 +2692,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
DP_CONT(cont->sc_pool_uuid, cont->sc_uuid),
orig, cur, cur - orig);

*cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph;
*cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph,
cont->sc_ec_agg_eph);
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,12 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh,

offset = iods[i].iod_recxs[0].rx_idx;
size = iods[i].iod_recxs[0].rx_nr;
parity_eph = ephs[i][0];
/* Use stable epoch for partial parity update to make sure
* these partial updates are not below stable epoch boundary,
* otherwise both EC and VOS aggregation might operate on
* the same recxs.
*/
parity_eph = encode ? ephs[i][0] : mrone->mo_epoch;
tmp_iod = iods[i];
ptr = iov[i].iov_buf;
for (j = 1; j < iods[i].iod_nr; j++) {
Expand Down
1 change: 1 addition & 0 deletions src/pool/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,7 @@ update_child_map(void *data)
return 0;
}

ds_cont_child_reset_ec_agg_eph_all(child);
child->spc_map_version = pool->sp_map_version;
ds_pool_child_put(child);
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/tests/ftest/daos_test/suite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ timeouts:
test_daos_extend_simple: 3600
test_daos_oid_allocator: 640
test_daos_checksum: 500
test_daos_rebuild_ec: 6400
test_daos_rebuild_ec: 7200
test_daos_aggregate_ec: 200
test_daos_degraded_ec: 1900
test_daos_dedup: 220
Expand Down
Loading

0 comments on commit c520327

Please sign in to comment.