diff --git a/src/container/srv_target.c b/src/container/srv_target.c index 80f924327b8..83b1846bead 100644 --- a/src/container/srv_target.c +++ b/src/container/srv_target.c @@ -850,6 +850,18 @@ ds_cont_child_stop_all(struct ds_pool_child *pool_child) } } +void +ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child) +{ + struct ds_cont_child *cont_child; + + D_DEBUG(DB_MD, DF_UUID"[%d]: reset all containers EC aggregate epoch.\n", + DP_UUID(pool_child->spc_uuid), dss_get_module_info()->dmi_tgt_id); + + d_list_for_each_entry(cont_child, &pool_child->spc_cont_list, sc_link) + cont_child->sc_ec_agg_eph = cont_child->sc_ec_agg_eph_boundary; +} + static int cont_child_start(struct ds_pool_child *pool_child, const uuid_t co_uuid, bool *started, struct ds_cont_child **cont_out) diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index bbbb848d444..daa9184e707 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -190,7 +190,8 @@ void ds_cont_child_stop_all(struct ds_pool_child *pool_child); int ds_cont_child_lookup(uuid_t pool_uuid, uuid_t cont_uuid, struct ds_cont_child **ds_cont); - +void +ds_cont_child_reset_ec_agg_eph_all(struct ds_pool_child *pool_child); /** initialize a csummer based on container properties. Will retrieve the * checksum related properties from IV */ diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c index 1c603297b51..71aedeca895 100644 --- a/src/object/srv_ec_aggregate.c +++ b/src/object/srv_ec_aggregate.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -89,6 +89,7 @@ struct ec_agg_par_extent { struct ec_agg_stripe { daos_off_t as_stripenum; /* ordinal of stripe, offset/(k*len) */ daos_epoch_t as_hi_epoch; /* highest epoch in stripe */ + daos_epoch_t as_lo_epoch; /* lowest epoch in stripe */ d_list_t as_dextents; /* list of stripe's data extents */ daos_off_t as_stripe_fill; /* amount of stripe covered by data */ uint64_t as_offset; /* start offset in stripe */ @@ -114,6 +115,7 @@ struct ec_agg_entry { struct pl_obj_layout *ae_obj_layout; struct daos_shard_loc ae_peer_pshards[OBJ_EC_MAX_P]; uint32_t ae_grp_idx; + uint32_t ae_is_leader:1; }; /* Parameters used to drive iterate all. @@ -123,13 +125,13 @@ struct ec_agg_param { struct ec_agg_entry ap_agg_entry; /* entry used for each OID */ daos_epoch_range_t ap_epr; /* hi/lo extent threshold */ daos_epoch_t ap_filter_eph; /* Aggregatable filter epoch */ + daos_epoch_t ap_min_unagg_eph; /* minimum unaggregate epoch */ daos_handle_t ap_cont_handle; /* VOS container handle */ int (*ap_yield_func)(void *arg); /* yield function*/ void *ap_yield_arg; /* yield argument */ uint32_t ap_credits_max; /* # of tight loops to yield */ uint32_t ap_credits; /* # of tight loops */ - uint32_t ap_initialized:1, /* initialized flag */ - ap_obj_skipped:1; /* skipped obj during aggregation */ + uint32_t ap_initialized:1; /* initialized flag */ }; /* Struct used to drive offloaded stripe update. @@ -324,6 +326,7 @@ agg_clear_extents(struct ec_agg_entry *entry) D_ASSERT(entry->ae_cur_stripe.as_extent_cnt == 0); } entry->ae_cur_stripe.as_hi_epoch = 0UL; + entry->ae_cur_stripe.as_lo_epoch = 0UL; entry->ae_cur_stripe.as_stripe_fill = 0; entry->ae_cur_stripe.as_has_holes = carry_is_hole ? true : false; } @@ -1858,7 +1861,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry) * and all replica extents are newer than parity. */ if (ec_age_stripe_full(entry, ec_age_with_parity(entry))) { - rc = agg_encode_local_parity(entry); + if (entry->ae_is_leader) { + rc = agg_encode_local_parity(entry); + } else { + update_vos = false; + agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph, + entry->ae_cur_stripe.as_lo_epoch); + } goto out; } @@ -1868,6 +1877,13 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry) goto out; } + if (!entry->ae_is_leader) { + update_vos = false; + agg_param->ap_min_unagg_eph = min(agg_param->ap_min_unagg_eph, + entry->ae_cur_stripe.as_lo_epoch); + goto out; + } + /* With parity and some newer partial replicas, possibly holes */ if (ec_age_with_hole(entry)) process_holes = true; @@ -1951,13 +1967,19 @@ agg_extent_add(struct ec_agg_entry *agg_entry, vos_iter_entry_t *entry, agg_in_stripe(agg_entry, recx); } + if (agg_entry->ae_cur_stripe.as_lo_epoch == 0 || + extent->ae_epoch < agg_entry->ae_cur_stripe.as_lo_epoch) + agg_entry->ae_cur_stripe.as_lo_epoch = extent->ae_epoch; + if (extent->ae_epoch > agg_entry->ae_cur_stripe.as_hi_epoch) agg_entry->ae_cur_stripe.as_hi_epoch = extent->ae_epoch; - D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u\n", + D_DEBUG(DB_TRACE, "adding extent "DF_RECX", to stripe %lu, shard: %u" + "max/min "DF_X64"/"DF_X64"\n", DP_RECX(extent->ae_recx), agg_stripenum(agg_entry, extent->ae_recx.rx_idx), - agg_entry->ae_oid.id_shard); + agg_entry->ae_oid.id_shard, agg_entry->ae_cur_stripe.as_hi_epoch, + agg_entry->ae_cur_stripe.as_lo_epoch); out: return rc; } @@ -1973,9 +1995,9 @@ agg_data_extent(struct ec_agg_param *agg_param, vos_iter_entry_t *entry, D_ASSERT(!(entry->ie_recx.rx_idx & PARITY_INDICATOR)); - D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", %u\n", + D_DEBUG(DB_IO, DF_UOID" get recx "DF_RECX", "DF_X64"/%u leader %s\n", DP_UOID(agg_entry->ae_oid), DP_RECX(entry->ie_recx), - entry->ie_minor_epc); + entry->ie_epoch, entry->ie_minor_epc, agg_entry->ae_is_leader ? "yes" : "no"); while (offset < end) { daos_off_t this_stripenum; @@ -2038,6 +2060,7 @@ agg_akey_post(daos_handle_t ih, struct ec_agg_param *agg_param, agg_entry->ae_cur_stripe.as_stripenum = 0UL; agg_entry->ae_cur_stripe.as_hi_epoch = 0UL; + agg_entry->ae_cur_stripe.as_lo_epoch = 0UL; agg_entry->ae_cur_stripe.as_stripe_fill = 0UL; agg_entry->ae_cur_stripe.as_offset = 0U; } @@ -2073,39 +2096,57 @@ agg_reset_pos(vos_iter_type_t type, struct ec_agg_entry *agg_entry) } } -static int -agg_shard_is_leader(struct ds_pool *pool, struct ec_agg_entry *agg_entry) +static bool +agg_shard_is_parity(struct ds_pool *pool, struct ec_agg_entry *agg_entry) { - struct pl_obj_shard *shard; struct daos_oclass_attr *oca; uint32_t grp_idx; uint32_t grp_start; - uint32_t ec_tgt_idx; - int shard_idx; - int rc; + uint32_t min_fseq = -1; + int leader_shard = -1; + int i; oca = &agg_entry->ae_oca; + if (is_ec_data_shard_by_layout_ver(agg_entry->ae_oid.id_layout_ver, + agg_entry->ae_dkey_hash, oca, + agg_entry->ae_oid.id_shard)) { + agg_entry->ae_is_leader = 0; + return false; + } + grp_idx = agg_entry->ae_oid.id_shard / daos_oclass_grp_size(oca); - grp_start = grp_idx * daos_oclass_grp_size(oca); - ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver, - agg_entry->ae_dkey_hash, oca, - daos_oclass_grp_size(oca) - 1); - /** - * FIXME: only the last parity shard can be the EC agg leader. What about - * Degraded mode? - */ - if (agg_entry->ae_oid.id_shard != ec_tgt_idx + grp_start) - return 0; + grp_start = grp_idx * agg_entry->ae_obj_layout->ol_grp_size; + for (i = 0; i < obj_ec_parity_tgt_nr(oca); i++) { + uint32_t ec_tgt_idx; + uint32_t shard_idx; + struct pl_obj_shard *shard; + + ec_tgt_idx = obj_ec_shard_idx_by_layout_ver(agg_entry->ae_oid.id_layout_ver, + agg_entry->ae_dkey_hash, oca, + daos_oclass_grp_size(oca) - i - 1); + + shard_idx = grp_start + ec_tgt_idx; + shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx); - /* If last parity unavailable, then skip the object via returning -DER_STALE. */ - shard_idx = grp_idx * agg_entry->ae_obj_layout->ol_grp_size + ec_tgt_idx; - shard = pl_obj_get_shard(agg_entry->ae_obj_layout, shard_idx); - if (shard->po_target != -1 && shard->po_shard != -1 && !shard->po_rebuilding) - rc = (agg_entry->ae_oid.id_shard == shard->po_shard) ? 1 : 0; + if (shard->po_target == -1 || shard->po_shard == -1 || shard->po_rebuilding) + continue; + + if (min_fseq == -1 || min_fseq > shard->po_fseq) { + leader_shard = shard_idx; + min_fseq = shard->po_fseq; + } + } + + /* No parity shard is available */ + if (leader_shard == -1) + return false; + + if (agg_entry->ae_oid.id_shard == leader_shard) + agg_entry->ae_is_leader = 1; else - rc = -DER_STALE; + agg_entry->ae_is_leader = 0; - return rc; + return true; } /* Initializes the struct holding the iteration state (ec_agg_entry). */ @@ -2129,8 +2170,6 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry, struct ec_agg_param *agg_param, struct ec_agg_entry *agg_entry, unsigned int *acts) { - int rc; - if (!agg_key_compare(agg_entry->ae_dkey, entry->ie_key)) { D_DEBUG(DB_EPC, "Skip dkey: "DF_KEY" ec agg on re-probe\n", DP_KEY(&entry->ie_key)); @@ -2144,24 +2183,16 @@ agg_dkey(daos_handle_t ih, vos_iter_entry_t *entry, agg_entry->ae_dkey_hash = obj_dkey2hash(agg_entry->ae_oid.id_pub, &agg_entry->ae_dkey); agg_reset_pos(VOS_ITER_AKEY, agg_entry); - rc = agg_shard_is_leader(agg_param->ap_pool_info.api_pool, agg_entry); - if (rc == 1) { - D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting\n", - DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey)); + if(agg_shard_is_parity(agg_param->ap_pool_info.api_pool, agg_entry)) { + D_DEBUG(DB_EPC, "oid:"DF_UOID":"DF_KEY" ec agg starting leader %s\n", + DP_UOID(agg_entry->ae_oid), DP_KEY(&agg_entry->ae_dkey), + agg_entry->ae_is_leader ? "yes" : "no"); agg_reset_dkey_entry(&agg_param->ap_agg_entry, entry); - rc = 0; } else { - if (rc < 0) { - D_ERROR("oid:"DF_UOID" ds_pool_check_leader failed " - DF_RC"\n", DP_UOID(entry->ie_oid), DP_RC(rc)); - if (rc == -DER_STALE) - agg_param->ap_obj_skipped = 1; - rc = 0; - } *acts |= VOS_ITER_CB_SKIP; } - return rc; + return 0; } /* Handles akeys returned by the iterator. */ @@ -2625,7 +2656,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL); - ec_agg_param->ap_obj_skipped = 0; + ec_agg_param->ap_min_unagg_eph = DAOS_EPOCH_MAX; rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb, agg_iterate_post_cb, ec_agg_param, NULL); @@ -2637,8 +2668,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, ec_agg_param->ap_agg_entry.ae_obj_hdl = DAOS_HDL_INVAL; } - if (ec_agg_param->ap_obj_skipped && !cont->sc_stopping) { - D_DEBUG(DB_EPC, "with skipped obj during aggregation.\n"); + if (cont->sc_pool->spc_pool->sp_rebuilding > 0 && !cont->sc_stopping) { /* There is rebuild going on, and we can't proceed EC aggregate boundary, * Let's wait for 5 seconds for another EC aggregation. */ @@ -2649,7 +2679,7 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, vos_aggregate_exit(cont->sc_hdl); update_hae: - if (rc == 0 && ec_agg_param->ap_obj_skipped == 0) { + if (rc == 0) { cont->sc_ec_agg_eph = max(cont->sc_ec_agg_eph, epr->epr_hi); if (!cont->sc_stopping && cont->sc_ec_query_agg_eph) { uint64_t orig, cur; @@ -2662,7 +2692,8 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, DP_CONT(cont->sc_pool_uuid, cont->sc_uuid), orig, cur, cur - orig); - *cont->sc_ec_query_agg_eph = cont->sc_ec_agg_eph; + *cont->sc_ec_query_agg_eph = min(ec_agg_param->ap_min_unagg_eph, + cont->sc_ec_agg_eph); } } diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 70e75b15ad5..70ce7973553 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -996,7 +996,12 @@ __migrate_fetch_update_parity(struct migrate_one *mrone, daos_handle_t oh, offset = iods[i].iod_recxs[0].rx_idx; size = iods[i].iod_recxs[0].rx_nr; - parity_eph = ephs[i][0]; + /* Use stable epoch for partial parity update to make sure + * these partial updates are not below stable epoch boundary, + * otherwise both EC and VOS aggregation might operate on + * the same recxs. + */ + parity_eph = encode ? ephs[i][0] : mrone->mo_epoch; tmp_iod = iods[i]; ptr = iov[i].iov_buf; for (j = 1; j < iods[i].iod_nr; j++) { diff --git a/src/pool/srv_target.c b/src/pool/srv_target.c index 4d459405008..8dce5ae8661 100644 --- a/src/pool/srv_target.c +++ b/src/pool/srv_target.c @@ -1614,6 +1614,7 @@ update_child_map(void *data) return 0; } + ds_cont_child_reset_ec_agg_eph_all(child); child->spc_map_version = pool->sp_map_version; ds_pool_child_put(child); return 0; diff --git a/src/tests/ftest/daos_test/suite.yaml b/src/tests/ftest/daos_test/suite.yaml index f641b3a621d..c1bc395c2b0 100644 --- a/src/tests/ftest/daos_test/suite.yaml +++ b/src/tests/ftest/daos_test/suite.yaml @@ -27,7 +27,7 @@ timeouts: test_daos_extend_simple: 3600 test_daos_oid_allocator: 640 test_daos_checksum: 500 - test_daos_rebuild_ec: 6400 + test_daos_rebuild_ec: 7200 test_daos_aggregate_ec: 200 test_daos_degraded_ec: 1900 test_daos_dedup: 220 diff --git a/src/tests/ftest/deployment/ior_per_rank.py b/src/tests/ftest/deployment/ior_per_rank.py index f914216f326..5a8463cb940 100644 --- a/src/tests/ftest/deployment/ior_per_rank.py +++ b/src/tests/ftest/deployment/ior_per_rank.py @@ -5,6 +5,7 @@ """ from avocado.core.exceptions import TestFail +from ClusterShell.NodeSet import NodeSet from general_utils import DaosTestError, percent_change from ior_test_base import IorTestBase from ior_utils import IorCommand, IorMetrics @@ -32,53 +33,52 @@ def execute_ior_per_rank(self, rank): self.log.info("Running Test on rank: %s", rank) # create the pool on specified rank. self.add_pool(connect=False, target_list=[rank]) + self.container = self.get_container(self.pool) + + host = self.server_managers[0].get_host(rank) + + # execute ior on given rank and collect the results + try: + self.ior_cmd.flags.update(self.write_flags) + dfs_out = self.run_ior_with_pool(create_cont=False, fail_on_warning=self.log.info) + dfs_perf_write = IorCommand.get_ior_metrics(dfs_out) + self.ior_cmd.flags.update(self.read_flags) + dfs_out = self.run_ior_with_pool(create_cont=False, fail_on_warning=self.log.info) + dfs_perf_read = IorCommand.get_ior_metrics(dfs_out) + + # Destroy container, to be sure we use newly created container in next iteration + self.container.destroy() + self.container = None + + # gather actual and expected perf data to be compared + dfs_max_write = float(dfs_perf_write[0][IorMetrics.MAX_MIB]) + dfs_max_read = float(dfs_perf_read[0][IorMetrics.MAX_MIB]) + actual_write_x = abs(percent_change(self.expected_bw, dfs_max_write)) + actual_read_x = abs(percent_change(self.expected_bw, dfs_max_read)) + + # verify write performance + if actual_write_x > self.write_x: + if host not in self.failed_nodes: + self.failed_nodes[host] = [] + self.failed_nodes[host].append( + f"rank {rank} low write perf. " + f"BW: {dfs_max_write:.2f}/{self.expected_bw:.2f}; " + f"percent diff: {actual_write_x:.2f}/{self.write_x:.2f}") + + # verify read performance + if actual_read_x > self.read_x: + if host not in self.failed_nodes: + self.failed_nodes[host] = [] + self.failed_nodes[host].append( + f"rank {rank} low read perf. " + f"BW: {dfs_max_read:.2f}/{self.expected_bw:.2f}; " + f"percent diff: {actual_read_x:.2f}/{self.read_x:.2f}") + + except (TestFail, DaosTestError) as error: + if host not in self.failed_nodes: + self.failed_nodes[host] = [] + self.failed_nodes[host].append(str(error)) - # execute ior on given rank for different transfer sizes and collect the results - for idx, transfer_size in enumerate(self.transfer_sizes): - try: - self.ior_cmd.transfer_size.update(transfer_size) - self.ior_cmd.flags.update(self.write_flags) - dfs_out = self.run_ior_with_pool(fail_on_warning=self.log.info) - dfs_perf_write = IorCommand.get_ior_metrics(dfs_out) - self.ior_cmd.flags.update(self.read_flags) - dfs_out = self.run_ior_with_pool(create_cont=False, fail_on_warning=self.log.info) - dfs_perf_read = IorCommand.get_ior_metrics(dfs_out) - - # Destroy container, to be sure we use newly created container in next iteration - self.container.destroy() - self.container = None - - # gather actual and expected perf data to be compared - if idx == 0: - dfs_max_write = float(dfs_perf_write[0][IorMetrics.MAX_MIB]) - dfs_max_read = float(dfs_perf_read[0][IorMetrics.MAX_MIB]) - actual_write_x = percent_change(dfs_max_write, self.expected_bw) - actual_read_x = percent_change(dfs_max_read, self.expected_bw) - else: - dfs_max_write = float(dfs_perf_write[0][IorMetrics.MAX_OPS]) - dfs_max_read = float(dfs_perf_read[0][IorMetrics.MAX_OPS]) - actual_write_x = percent_change(dfs_max_write, self.expected_iops) - actual_read_x = percent_change(dfs_max_read, self.expected_iops) - - # compare actual and expected perf data - self.assertLessEqual(abs(actual_write_x), self.write_x, - "Max Write Diff too large for rank: {}".format(rank)) - self.assertLessEqual(abs(actual_read_x), self.read_x, - "Max Read Diff too large for rank: {}".format(rank)) - # collect list of good nodes - good_node = self.server_managers[0].get_host(rank) - if ((good_node not in self.good_nodes) - and (good_node not in self.failed_nodes)): - self.good_nodes.append(good_node) - except (TestFail, DaosTestError): - # collect bad nodes - failed_node = self.server_managers[0].get_host(rank) - if failed_node not in self.failed_nodes: - self.failed_nodes[failed_node] = [rank] - else: - self.failed_nodes[failed_node].append(rank) - if failed_node in self.good_nodes: - self.good_nodes.remove(failed_node) # Destroy pool, to be sure we use newly created pool in next iteration self.pool.destroy() self.pool = None @@ -100,8 +100,6 @@ def test_ior_per_rank(self): # test params self.failed_nodes = {} - self.good_nodes = [] - self.transfer_sizes = self.params.get("transfer_sizes", self.ior_cmd.namespace) self.write_flags = self.params.get("write_flags", self.ior_cmd.namespace) self.read_flags = self.params.get("read_flags", self.ior_cmd.namespace) @@ -122,13 +120,15 @@ def test_ior_per_rank(self): for rank in rank_list: self.execute_ior_per_rank(rank) - # list of good nodes - if self.good_nodes: - self.log.info("List of good nodes: %s", self.good_nodes) + # the good nodes are any that did not fail + good_nodes = self.hostlist_servers - NodeSet.fromlist(self.failed_nodes.keys()) + if good_nodes: + self.log.info("Good nodes: %s", good_nodes) # list the failed node and the rank number associated with that node if self.failed_nodes: - self.log.info("List of failed ranks with corresponding nodes") - for node, rank in self.failed_nodes.items(): - self.log.info("Node: %s, Rank: %s", node, rank) + self.log.info("List of failed nodes with corresponding ranks") + for node, reason_list in self.failed_nodes.items(): + for reason in reason_list: + self.log.info("%s: %s", node, reason) self.fail("Performance check failed for one or more nodes") diff --git a/src/tests/ftest/deployment/ior_per_rank.yaml b/src/tests/ftest/deployment/ior_per_rank.yaml index 406ef6dfff9..3a45226b5ca 100644 --- a/src/tests/ftest/deployment/ior_per_rank.yaml +++ b/src/tests/ftest/deployment/ior_per_rank.yaml @@ -23,12 +23,12 @@ server_config: pool: mode: 146 size: 750G # Cannot use percentage, as it does not work when using pool create for per rank. - control_method: dmg properties: ec_cell_sz:128KiB container: type: POSIX properties: cksum:crc16,cksum_size:16384,srv_cksum:on control_method: daos + oclass: SX ior: client_processes: ppn: 32 @@ -36,13 +36,11 @@ ior: test_file: testFile write_flags: "-w -C -e -g -G 27 -k -Q 1" read_flags: "-r -R -C -e -g -G 27 -k -Q 1" - sw_deadline: 30 + sw_deadline: 15 sw_wearout: 1 sw_status_file: "/var/tmp/daos_testing/stoneWallingStatusFile" - dfs_oclass: 'SX' - transfer_sizes: - - 1M - - 256B + dfs_oclass: SX + transfer_size: 1M block_size: 150G # 0.5 only for CI, due to the varying nature of different clusters in CI. # Change it to 15% (0.15) for Aurora. diff --git a/src/tests/ftest/nvme/fragmentation.py b/src/tests/ftest/nvme/fragmentation.py index 65dd3406ef6..fe09ef50f72 100644 --- a/src/tests/ftest/nvme/fragmentation.py +++ b/src/tests/ftest/nvme/fragmentation.py @@ -35,7 +35,7 @@ def setUp(self): self.ior_transfer_size = self.params.get("transfer_block_size", '/run/ior/iorflags/*') self.ior_dfs_oclass = self.params.get("obj_class", '/run/ior/iorflags/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.pool = None self.out_queue = queue.Queue() diff --git a/src/tests/ftest/nvme/pool_capacity.py b/src/tests/ftest/nvme/pool_capacity.py index 2f88d012092..4c417aa83a9 100644 --- a/src/tests/ftest/nvme/pool_capacity.py +++ b/src/tests/ftest/nvme/pool_capacity.py @@ -32,7 +32,7 @@ def setUp(self): self.ior_test_sequence = self.params.get("ior_test_sequence", '/run/ior/iorflags/*') self.ior_dfs_oclass = self.params.get("obj_class", '/run/ior/iorflags/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.out_queue = queue.Queue() def ior_thread(self, pool, oclass, api, test, flags, results): diff --git a/src/tests/ftest/nvme/pool_exclude.py b/src/tests/ftest/nvme/pool_exclude.py index 578070c1c47..72a6c9b50ad 100644 --- a/src/tests/ftest/nvme/pool_exclude.py +++ b/src/tests/ftest/nvme/pool_exclude.py @@ -32,8 +32,7 @@ def setUp(self): self.dmg_command = self.get_dmg_command() self.ior_test_sequence = self.params.get("ior_test_sequence", "/run/ior/iorflags/*") # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file( - self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.pool = None self.cont_list = [] self.dmg_command.exit_status_exception = True diff --git a/src/tests/ftest/nvme/pool_extend.py b/src/tests/ftest/nvme/pool_extend.py index 18e48ced8a1..876050a8561 100644 --- a/src/tests/ftest/nvme/pool_extend.py +++ b/src/tests/ftest/nvme/pool_extend.py @@ -30,7 +30,7 @@ def setUp(self): super().setUp() # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.dmg_command.exit_status_exception = True def run_nvme_pool_extend(self, num_pool, oclass=None): diff --git a/src/tests/ftest/osa/offline_drain.py b/src/tests/ftest/osa/offline_drain.py index 26be76d95a8..ea31e86cf93 100644 --- a/src/tests/ftest/osa/offline_drain.py +++ b/src/tests/ftest/osa/offline_drain.py @@ -29,8 +29,7 @@ def setUp(self): self.ior_test_sequence = self.params.get( "ior_test_sequence", '/run/ior/iorflags/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file( - self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) def run_offline_drain_test(self, num_pool, data=False, oclass=None, pool_fillup=0): """Run the offline drain without data. diff --git a/src/tests/ftest/osa/offline_reintegration.py b/src/tests/ftest/osa/offline_reintegration.py index 6491a477d7a..d88a2ee2f9c 100644 --- a/src/tests/ftest/osa/offline_reintegration.py +++ b/src/tests/ftest/osa/offline_reintegration.py @@ -30,8 +30,7 @@ def setUp(self): self.ior_test_repetitions = self.params.get("pool_test_repetitions", '/run/pool_capacity/*') self.loop_test_cnt = 1 # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file( - self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.dmg_command.exit_status_exception = True def run_offline_reintegration_test(self, num_pool, data=False, server_boot=False, oclass=None, diff --git a/src/tests/ftest/osa/online_drain.py b/src/tests/ftest/osa/online_drain.py index bf5804c3767..6bef36ead89 100644 --- a/src/tests/ftest/osa/online_drain.py +++ b/src/tests/ftest/osa/online_drain.py @@ -28,8 +28,7 @@ def setUp(self): "ior_test_sequence", '/run/ior/iorflags/*') self.test_oclass = self.params.get("oclass", '/run/test_obj_class/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file( - self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.dmg_command.exit_status_exception = True self.pool = None diff --git a/src/tests/ftest/osa/online_extend.py b/src/tests/ftest/osa/online_extend.py index b1ae7c247f3..d8c8b18e607 100644 --- a/src/tests/ftest/osa/online_extend.py +++ b/src/tests/ftest/osa/online_extend.py @@ -33,7 +33,7 @@ def setUp(self): self.extra_servers = self.get_hosts_from_yaml( "test_servers", "server_partition", "server_reservation", "/run/extra_servers/*") # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.pool = None self.dmg_command.exit_status_exception = True self.daos_racer = None diff --git a/src/tests/ftest/osa/online_parallel_test.py b/src/tests/ftest/osa/online_parallel_test.py index c99a45dbd66..23addc6c392 100644 --- a/src/tests/ftest/osa/online_parallel_test.py +++ b/src/tests/ftest/osa/online_parallel_test.py @@ -35,7 +35,7 @@ def setUp(self): self.ior_test_sequence = self.params.get("ior_test_sequence", '/run/ior/iorflags/*') self.ior_dfs_oclass = self.params.get("obj_class", '/run/ior/iorflags/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.pool = None self.out_queue = queue.Queue() self.ds_racer_queue = queue.Queue() diff --git a/src/tests/ftest/osa/online_reintegration.py b/src/tests/ftest/osa/online_reintegration.py index 3e4cda5eb3c..3fcc74e7629 100644 --- a/src/tests/ftest/osa/online_reintegration.py +++ b/src/tests/ftest/osa/online_reintegration.py @@ -31,7 +31,7 @@ def setUp(self): self.ior_test_sequence = self.params.get("ior_test_sequence", '/run/ior/iorflags/*') self.test_oclass = self.params.get("oclass", '/run/test_obj_class/*') # Recreate the client hostfile without slots defined - self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir, None) + self.hostfile_clients = write_host_file(self.hostlist_clients, self.workdir) self.pool = None self.ds_racer_queue = queue.Queue() self.daos_racer = None diff --git a/src/tests/ftest/util/write_host_file.py b/src/tests/ftest/util/write_host_file.py index 8a6e810c6ba..a967c4f3232 100644 --- a/src/tests/ftest/util/write_host_file.py +++ b/src/tests/ftest/util/write_host_file.py @@ -4,18 +4,18 @@ SPDX-License-Identifier: BSD-2-Clause-Patent """ import os -import random from logging import getLogger +from tempfile import mkstemp -def write_host_file(hosts, path='/tmp', slots=1): +def write_host_file(hosts, path='/tmp', slots=None): """Write out a hostfile suitable for orterun. Args: hosts (NodeSet): hosts to write to the hostfile path (str, optional): where to write the hostfile. Defaults to '/tmp'. slots (int, optional): slots per host to specify in the hostfile. - Defaults to 1. + Defaults to None. Raises: ValueError: if no hosts have been specified @@ -24,23 +24,19 @@ def write_host_file(hosts, path='/tmp', slots=1): str: the full path of the written hostfile """ - log = getLogger() - unique = random.randint(1, 100000) # nosec + if not hosts: + raise ValueError("hosts parameter must be provided.") - if not os.path.exists(path): - os.makedirs(path) - hostfile = os.path.join(path, "".join(["hostfile", str(unique)])) + log = getLogger() + os.makedirs(path, exist_ok=True) - if not hosts: - raise ValueError("host list parameter must be provided.") + _, hostfile = mkstemp(dir=path, prefix='hostfile_') log.debug("Writing hostfile: %s (hosts=%s, slots=%s)", hostfile, hosts, slots) with open(hostfile, "w") as hostfile_handle: - for host in hosts: - hostfile_line = [host] - if slots: - hostfile_line.append(f"slots={slots}") - hostfile_handle.write(f"{' '.join(hostfile_line)}\n") - log.debug(" %s", " ".join(hostfile_line)) + if slots: + hostfile_handle.writelines(f'{host} slots={slots}\n' for host in sorted(hosts)) + else: + hostfile_handle.writelines(f'{host}\n' for host in sorted(hosts)) return hostfile diff --git a/src/tests/suite/daos_rebuild_ec.c b/src/tests/suite/daos_rebuild_ec.c index 6e3ac373690..b5018e50111 100644 --- a/src/tests/suite/daos_rebuild_ec.c +++ b/src/tests/suite/daos_rebuild_ec.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -1304,6 +1304,9 @@ rebuild_ec_parity_overwrite_fail_parity_internal(void **state, int *kill_shards, parity_rank = get_rank_by_oid_shard(arg, oid, shard_idx); rebuild_single_pool_rank(arg, parity_rank, true); + print_message("sleep 60 seconds for aggregation\n"); + sleep(60); + /* fail data shard */ for (i = 0; i < nr; i++) { shard_idx = (dkey_hash % 6 + kill_shards[i]) % 6; @@ -1487,7 +1490,7 @@ static const struct CMUnitTest rebuild_tests[] = { {"REBUILD46: fail parity shard and data shards after overwrite", rebuild_ec_overwrite_fail_parity_data, rebuild_ec_8nodes_setup, test_teardown}, - {"REBUILD46: fail parity shard and data shards after overwrite with aggregation", + {"REBUILD47: fail parity shard and data shards after overwrite with aggregation", rebuild_ec_overwrite_fail_parity_data_with_parity, rebuild_ec_8nodes_setup, test_teardown}, }; diff --git a/src/vos/ilog.c b/src/vos/ilog.c index 08528e40c42..1d1d6508087 100644 --- a/src/vos/ilog.c +++ b/src/vos/ilog.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -81,6 +81,8 @@ struct ilog_context { bool ic_in_txn; /** version needs incrementing */ bool ic_ver_inc; + /** For operation with fixed epoch (rebuild, ec_agg, ec_rep, etc.) */ + bool ic_fixed_epoch; }; D_CASSERT(sizeof(struct ilog_id) == sizeof(struct ilog_tree)); @@ -401,7 +403,7 @@ ilog_create(struct umem_instance *umm, struct ilog_df *root) int ilog_open(struct umem_instance *umm, struct ilog_df *root, - const struct ilog_desc_cbs *cbs, daos_handle_t *loh) + const struct ilog_desc_cbs *cbs, bool fixed_epoch, daos_handle_t *loh) { struct ilog_context *lctx; int rc; @@ -412,6 +414,7 @@ ilog_open(struct umem_instance *umm, struct ilog_df *root, if (rc != 0) return rc; + lctx->ic_fixed_epoch = fixed_epoch; *loh = ilog_lctx2hdl(lctx); return 0; @@ -603,7 +606,15 @@ check_equal(struct ilog_context *lctx, struct ilog_id *id_out, const struct ilog id_out->id_update_minor_eph > id_out->id_punch_minor_eph) return -DER_ALREADY; + } else if (lctx->ic_fixed_epoch) { + /* + * For operation with fixed epoch, when update existing ilog entry, + * regard them as the same and use minor epoch for further handling. + */ + *is_equal = true; + return 0; } + D_DEBUG(DB_IO, "Access of incarnation log from multiple DTX" " at same time is not allowed: rc=DER_TX_RESTART\n"); return -DER_TX_RESTART; diff --git a/src/vos/ilog.h b/src/vos/ilog.h index 767064022dd..0cc7ceb5c4f 100644 --- a/src/vos/ilog.h +++ b/src/vos/ilog.h @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -88,14 +88,15 @@ ilog_create(struct umem_instance *umm, struct ilog_df *root); * * \param umm[IN] The umem instance * \param root[IN] A pointer to the allocated root - * \param cbs[in] Incarnation log transaction log callbacks + * \param cbs[IN] Incarnation log transaction log callbacks + * \param fixed_epoch[IN] It is for operation with fixed epoch or not * \param loh[OUT] Returned open log handle * * \return 0 on success, error code on failure */ int ilog_open(struct umem_instance *umm, struct ilog_df *root, - const struct ilog_desc_cbs *cbs, daos_handle_t *loh); + const struct ilog_desc_cbs *cbs, bool fixed_epoch, daos_handle_t *loh); /** Close an open incarnation log handle * diff --git a/src/vos/tests/vts_ilog.c b/src/vos/tests/vts_ilog.c index 39caad1113c..c696ff0b487 100644 --- a/src/vos/tests/vts_ilog.c +++ b/src/vos/tests/vts_ilog.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -504,7 +504,7 @@ ilog_test_update(void **state) rc = ilog_create(umm, ilog); LOG_FAIL(rc, 0, "Failed to create a new incarnation log\n"); - rc = ilog_open(umm, ilog, &ilog_callbacks, &loh); + rc = ilog_open(umm, ilog, &ilog_callbacks, false, &loh); LOG_FAIL(rc, 0, "Failed to open incarnation log\n"); version_cache_fetch(&version_cache, loh, true); @@ -653,7 +653,7 @@ ilog_test_abort(void **state) rc = ilog_create(umm, ilog); LOG_FAIL(rc, 0, "Failed to create a new incarnation log\n"); - rc = ilog_open(umm, ilog, &ilog_callbacks, &loh); + rc = ilog_open(umm, ilog, &ilog_callbacks, false, &loh); LOG_FAIL(rc, 0, "Failed to open new incarnation log\n"); version_cache_fetch(&version_cache, loh, true); @@ -765,7 +765,7 @@ ilog_test_persist(void **state) rc = ilog_create(umm, ilog); LOG_FAIL(rc, 0, "Failed to create a new incarnation log\n"); - rc = ilog_open(umm, ilog, &ilog_callbacks, &loh); + rc = ilog_open(umm, ilog, &ilog_callbacks, false, &loh); LOG_FAIL(rc, 0, "Failed to open incarnation log\n"); version_cache_fetch(&version_cache, loh, true); @@ -849,7 +849,7 @@ ilog_test_aggregate(void **state) rc = ilog_create(umm, ilog); LOG_FAIL(rc, 0, "Failed to create a new incarnation log\n"); - rc = ilog_open(umm, ilog, &ilog_callbacks, &loh); + rc = ilog_open(umm, ilog, &ilog_callbacks, false, &loh); LOG_FAIL(rc, 0, "Failed to open incarnation log\n"); id.id_epoch = 1; @@ -965,7 +965,7 @@ ilog_test_discard(void **state) rc = ilog_create(umm, ilog); LOG_FAIL(rc, 0, "Failed to create a new incarnation log\n"); - rc = ilog_open(umm, ilog, &ilog_callbacks, &loh); + rc = ilog_open(umm, ilog, &ilog_callbacks, false, &loh); LOG_FAIL(rc, 0, "Failed to open incarnation log\n"); id.id_epoch = 1; diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index a598c943689..ff3fc187bfa 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -529,7 +529,7 @@ dtx_ilog_rec_release(struct umem_instance *umm, struct vos_container *cont, ilog = umem_off2ptr(umm, umem_off2offset(rec)); vos_ilog_desc_cbs_init(&cbs, vos_cont2hdl(cont)); - rc = ilog_open(umm, ilog, &cbs, &loh); + rc = ilog_open(umm, ilog, &cbs, false, &loh); if (rc != 0) return rc; diff --git a/src/vos/vos_ilog.c b/src/vos/vos_ilog.c index dea2a22f28f..1a13e4d6a28 100644 --- a/src/vos/vos_ilog.c +++ b/src/vos/vos_ilog.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2019-2023 Intel Corporation. + * (C) Copyright 2019-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -428,7 +428,7 @@ int vos_ilog_update_(struct vos_container *cont, struct ilog_df *ilog, } vos_ilog_desc_cbs_init(&cbs, vos_cont2hdl(cont)); - rc = ilog_open(vos_cont2umm(cont), ilog, &cbs, &loh); + rc = ilog_open(vos_cont2umm(cont), ilog, &cbs, dth == NULL, &loh); if (rc != 0) { D_ERROR("Could not open incarnation log: "DF_RC"\n", DP_RC(rc)); return rc; @@ -525,7 +525,7 @@ vos_ilog_punch_(struct vos_container *cont, struct ilog_df *ilog, punch_log: vos_ilog_desc_cbs_init(&cbs, vos_cont2hdl(cont)); - rc = ilog_open(vos_cont2umm(cont), ilog, &cbs, &loh); + rc = ilog_open(vos_cont2umm(cont), ilog, &cbs, dth == NULL, &loh); if (rc != 0) { D_ERROR("Could not open incarnation log: "DF_RC"\n", DP_RC(rc)); return rc; diff --git a/src/vos/vos_obj_index.c b/src/vos/vos_obj_index.c index 58d1dba1021..d1b2a713d03 100644 --- a/src/vos/vos_obj_index.c +++ b/src/vos/vos_obj_index.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2023 Intel Corporation. + * (C) Copyright 2016-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -288,7 +288,7 @@ vos_oi_find_alloc(struct vos_container *cont, daos_unit_oid_t oid, if (!log) goto skip_log; vos_ilog_desc_cbs_init(&cbs, vos_cont2hdl(cont)); - rc = ilog_open(vos_cont2umm(cont), &obj->vo_ilog, &cbs, &loh); + rc = ilog_open(vos_cont2umm(cont), &obj->vo_ilog, &cbs, dth == NULL, &loh); if (rc != 0) return rc;