diff --git a/lib/charms/mongodb/v1/helpers.py b/lib/charms/mongodb/v1/helpers.py
index 937786b89..425e905a1 100644
--- a/lib/charms/mongodb/v1/helpers.py
+++ b/lib/charms/mongodb/v1/helpers.py
@@ -8,7 +8,7 @@
 import secrets
 import string
 import subprocess
-from typing import List
+from typing import List, Mapping
 
 from charms.mongodb.v1.mongodb import MongoConfiguration
 from ops.model import ActiveStatus, MaintenanceStatus, StatusBase, WaitingStatus
@@ -23,7 +23,7 @@
 
 # Increment this PATCH version before using `charmcraft publish-lib` or reset
 # to 0 if you are raising the major API version
-LIBPATCH = 8
+LIBPATCH = 9
 
 # path to store mongodb ketFile
 KEY_FILE = "keyFile"
@@ -320,3 +320,25 @@ def add_args_to_env(var: str, args: str):
 
     with open(Config.ENV_VAR_PATH, "w") as service_file:
         service_file.writelines(env_vars)
+
+
+def safe_exec(
+    command: list[str] | str,
+    env: Mapping[str, str] | None = None,
+    working_dir: str | None = None,
+) -> str:
+    """Executes a command on the workload in a safe way."""
+    try:
+        output = subprocess.check_output(
+            command,
+            stderr=subprocess.PIPE,
+            universal_newlines=True,
+            shell=isinstance(command, str),
+            env=env,
+            cwd=working_dir,
+        )
+        logger.debug(f"{output=}")
+        return output
+    except subprocess.CalledProcessError as err:
+        logger.error(f"cmd failed - {err.cmd = }, {err.stdout = }, {err.stderr = }")
+        raise
diff --git a/src/charm.py b/src/charm.py
index 29759e2e8..e969b432f 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -26,6 +26,7 @@
     generate_keyfile,
     generate_password,
     get_create_user_cmd,
+    safe_exec,
 )
 from charms.mongodb.v1.mongodb import MongoDBConnection, NotReadyError
 from charms.mongodb.v1.mongodb_backups import MongoDBBackups
@@ -46,6 +47,7 @@
     NoVersionError,
     get_charm_revision,
 )
+from ops import StorageAttachedEvent
 from ops.charm import (
     ActionEvent,
     CharmBase,
@@ -108,6 +110,7 @@ def __init__(self, *args):
         self.framework.observe(self.on.install, self._on_install)
         self.framework.observe(self.on.start, self._on_start)
         self.framework.observe(self.on.update_status, self._on_update_status)
+        self.framework.observe(self.on.mongodb_storage_attached, self._on_storage_attached)
         self.framework.observe(
             self.on[Config.Relations.PEERS].relation_joined, self._on_relation_joined
         )
@@ -148,7 +151,7 @@ def __init__(self, *args):
             ],
         )
         self.upgrade = MongoDBUpgrade(self)
-        self.config_server = ShardingProvider(self, substrate="vm")
+        self.config_server = ShardingProvider(self, substrate=Config.SUBSTRATE)
         self.cluster = ClusterProvider(self)
         self.shard = ConfigServerRequirer(self)
         self.status = MongoDBStatusHandler(self)
@@ -588,6 +591,14 @@ def _on_relation_departed(self, event: RelationDepartedEvent) -> None:
 
         self._update_hosts(event)
 
+    def _on_storage_attached(self, event: StorageAttachedEvent) -> None:
+        """Handler for the `storage_attached` event.
+
+        Fixes the permissions of the MongoDB data directory.
+        """
+        safe_exec(f"chmod -R 770 {Config.MONGODB_COMMON_PATH}".split())
+        safe_exec(f"chown -R {Config.SNAP_USER}:root {Config.MONGODB_COMMON_PATH}".split())
+
     def _on_storage_detaching(self, event: StorageDetachingEvent) -> None:
         """Before storage detaches, allow removing unit to remove itself from the set.
 
diff --git a/src/config.py b/src/config.py
index fa8473eb1..d430e5073 100644
--- a/src/config.py
+++ b/src/config.py
@@ -3,6 +3,7 @@
 # Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
 
+from pathlib import Path
 from typing import Literal, TypeAlias
 
 from ops.model import BlockedStatus
@@ -21,7 +22,13 @@ class Config:
     MONGOD_CONF_DIR = f"{MONGODB_SNAP_DATA_DIR}/etc/mongod"
     MONGOD_CONF_FILE_PATH = f"{MONGOD_CONF_DIR}/mongod.conf"
     CHARM_INTERNAL_VERSION_FILE = "charm_internal_version"
-    SNAP_PACKAGES = [("charmed-mongodb", "6/edge", 118)]
+    SNAP_PACKAGES = [("charmed-mongodb", "6/edge", 121)]
+
+    MONGODB_COMMON_PATH = Path("/var/snap/charmed-mongodb/common")
+
+    # This is the snap_daemon user, which does not exist on the VM before the
+    # snap install, so we refer to it by UID
+    SNAP_USER = 584788
 
     # Keep these alphabetically sorted
     class Actions:
diff --git a/src/upgrades/upgrade.py b/src/upgrades/upgrade.py
index 9e39a6ac2..9aea3699f 100644
--- a/src/upgrades/upgrade.py
+++ b/src/upgrades/upgrade.py
@@ -178,7 +178,7 @@ def app_status(self) -> typing.Optional[ops.StatusBase]:
         resume_string = ""
         if len(self._sorted_units) > 1:
             resume_string = (
-                "Verify highest unit is healthy & run `{RESUME_ACTION_NAME}` action. "
+                f"Verify highest unit is healthy & run `{RESUME_ACTION_NAME}` action. "
             )
         return ops.BlockedStatus(
             f"Upgrading. {resume_string}To rollback, `juju refresh` to last revision"
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 4a2d7754f..5bf36642b 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -1,10 +1,10 @@
 # Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
 
+import calendar
 import json
 import logging
 import subprocess
-import time
 from datetime import datetime
 from pathlib import Path
 from subprocess import PIPE, check_output
@@ -400,34 +400,7 @@ def storage_id(ops_test, unit_name):
             return line.split()[1]
 
 
-async def add_unit_with_storage(ops_test, app_name, storage):
-    """Adds unit with storage.
-
-    Note: this function exists as a temporary solution until this issue is resolved:
-    https://github.com/juju/python-libjuju/issues/695
-    """
-    expected_units = len(ops_test.model.applications[app_name].units) + 1
-    prev_units = [unit.name for unit in ops_test.model.applications[app_name].units]
-    model_name = ops_test.model.info.name
-    add_unit_cmd = f"add-unit {app_name} --model={model_name} --attach-storage={storage}".split()
-    await ops_test.juju(*add_unit_cmd)
-    await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
-    assert (
-        len(ops_test.model.applications[app_name].units) == expected_units
-    ), "New unit not added to model"
-
-    # verify storage attached
-    curr_units = [unit.name for unit in ops_test.model.applications[app_name].units]
-    new_unit = list(set(curr_units) - set(prev_units))[0]
-    assert storage_id(ops_test, new_unit) == storage, "unit added with incorrect storage"
-
-    # return a reference to newly added unit
-    for unit in ops_test.model.applications[app_name].units:
-        if unit.name == new_unit:
-            return unit
-
-
-async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool:
+async def reused_storage(ops_test: OpsTest, unit_name: str, removal_time: float) -> bool:
     """Returns True if storage provided to mongod has been reused.
 
     MongoDB startup message indicates storage reuse:
@@ -450,11 +423,17 @@ async def reused_storage(ops_test: OpsTest, unit_name, removal_time) -> bool:
         item = json.loads(line)
 
-        if "msg" not in item:
+        # "attr" holds the structured log attributes, including member state changes
+        if "attr" not in item:
             continue
 
+        # Compute reuse time
         re_use_time = convert_time(item["t"]["$date"])
-        if '"newState": "STARTUP2", "oldState": "REMOVED"' in line and re_use_time > removal_time:
+
+        # Get the new and old member states if present
+        newstate = item["attr"].get("newState", "")
+        oldstate = item["attr"].get("oldState", "")
+        if newstate == "STARTUP2" and oldstate == "REMOVED" and re_use_time > removal_time:
             return True
 
     return False
@@ -641,10 +620,10 @@ async def verify_replica_set_configuration(ops_test: OpsTest, app_name=None) ->
 
 
 def convert_time(time_as_str: str) -> int:
-    """Converts a string time representation to an integer time representation."""
+    """Converts a string time representation to an integer time representation, in UTC."""
     # parse time representation, provided in this format: 'YYYY-MM-DDTHH:MM:SS.MMM+00:00'
     d = datetime.strptime(time_as_str, "%Y-%m-%dT%H:%M:%S.%f%z")
-    return time.mktime(d.timetuple())
+    return calendar.timegm(d.timetuple())
 
 
 def cut_network_from_unit(machine_name: str) -> None:
diff --git a/tests/integration/ha_tests/test_ha.py b/tests/integration/ha_tests/test_ha.py
index 26b9c50d7..7d303e0b1 100644
--- a/tests/integration/ha_tests/test_ha.py
+++ b/tests/integration/ha_tests/test_ha.py
@@ -7,6 +7,7 @@
 import time
 
 import pytest
+from juju import tag
 from pymongo import MongoClient
 from pytest_operator.plugin import OpsTest
 from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed
@@ -20,7 +21,6 @@
     unit_uri,
 )
 from .helpers import (
-    add_unit_with_storage,
    all_db_processes_down,
     clear_db_writes,
     count_primaries,
@@ -77,12 +77,16 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
         return
 
     my_charm = await ops_test.build_charm(".")
-    await ops_test.model.deploy(my_charm, num_units=required_units)
+
+    storage = {"mongodb": {"pool": "lxd", "size": 2048}}
+
+    await ops_test.model.deploy(my_charm, num_units=required_units, storage=storage)
     await ops_test.model.wait_for_idle()
 
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_storage_re_use(ops_test, continuous_writes):
     """Verifies that database units with attached storage correctly repurpose storage.
@@ -110,10 +114,16 @@
     await ops_test.model.wait_for_idle(
         apps=[app_name], status="active", timeout=1000, wait_for_exact_units=expected_units
     )
-    new_unit = await add_unit_with_storage(ops_test, app_name, unit_storage_id)
+    new_unit = (
+        await ops_test.model.applications[app_name].add_unit(
+            count=1, attach_storage=[tag.storage(unit_storage_id)]
+        )
+    )[0]
+
+    await ops_test.model.wait_for_idle(apps=[app_name], status="active", timeout=1000)
 
     assert await reused_storage(
-        ops_test, new_unit.public_address, removal_time
+        ops_test, new_unit.name, removal_time
     ), "attached storage not properly reused by MongoDB."
 
     # verify that the no writes were skipped
@@ -122,6 +132,71 @@
     assert total_expected_writes["number"] == actual_writes
 
 
+@pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
+@pytest.mark.group(1)
+@pytest.mark.skip("This is currently unsupported on MongoDB charm.")
+@pytest.mark.abort_on_fail
+async def test_storage_re_use_different_cluster(ops_test: OpsTest, continuous_writes):
+    """Tests that we can reuse storage from a different cluster.
+
+    For that, we completely remove the application while keeping its storage volumes,
+    and then we deploy a new application with storage reuse and check that the
+    storage has been reused.
+    """
+    app_name = await get_app_name(ops_test)
+    if storage_type(ops_test, app_name) == "rootfs":
+        pytest.skip(
+            "reuse of storage can only be used on deployments with persistent storage not on rootfs deployments"
+        )
+
+    writes_results = await stop_continous_writes(ops_test, app_name=app_name)
+    unit_ids = [unit.name for unit in ops_test.model.applications[app_name].units]
+    storage_ids = {}
+
+    remaining_units = len(unit_ids)
+    for unit_id in unit_ids:
+        storage_ids[unit_id] = storage_id(ops_test, unit_id)
+        await ops_test.model.applications[app_name].destroy_unit(unit_id)
+        # Give some time to remove the unit. We don't use asyncio.sleep here;
+        # instead we wait for each unit to be removed before removing the next one.
+        # time.sleep(60)
+        remaining_units -= 1
+        await ops_test.model.wait_for_idle(
+            apps=[app_name],
+            status="active",
+            timeout=1000,
+            idle_period=20,
+            wait_for_exact_units=remaining_units,
+        )
+
+    # Wait until all units are removed
+    await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, wait_for_exact_units=0)
+
+    for unit_id in unit_ids:
+        n_units = len(ops_test.model.applications[app_name].units)
+        await ops_test.model.applications[app_name].add_unit(
+            count=1, attach_storage=[tag.storage(storage_ids[unit_id])]
+        )
+        await ops_test.model.wait_for_idle(
+            apps=[app_name],
+            status="active",
+            timeout=1000,
+            idle_period=20,
+            wait_for_exact_units=n_units + 1,
+        )
+
+    await ops_test.model.wait_for_idle(
+        apps=[app_name],
+        status="active",
+        timeout=1000,
+        idle_period=20,
+        wait_for_exact_units=len(unit_ids),
+    )
+
+    actual_writes = await count_writes(ops_test, app_name=app_name)
+    assert writes_results["number"] == actual_writes
+
+
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
 @pytest.mark.abort_on_fail
@@ -235,6 +310,7 @@ async def test_scale_down_capablities(ops_test: OpsTest, continuous_writes) -> N
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_replication_across_members(ops_test: OpsTest, continuous_writes) -> None:
     """Check consistency, ie write to primary, read data from secondaries."""
     # first find primary, write to primary, then read from each unit
@@ -263,6 +339,7 @@ async def test_replication_across_members(ops_test: OpsTest, continuous_writes)
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
     """Verify unique clusters do not share DBs."""
     # first find primary, write to primary,
@@ -313,6 +390,7 @@ async def test_unique_cluster_dbs(ops_test: OpsTest, continuous_writes) -> None:
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes) -> None:
     """Verify newly added and newly removed members properly replica data.
 
@@ -360,6 +438,7 @@ async def test_replication_member_scaling(ops_test: OpsTest, continuous_writes)
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_kill_db_process(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
@@ -398,6 +477,7 @@ async def test_kill_db_process(ops_test, continuous_writes):
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_freeze_db_process(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
@@ -453,6 +533,7 @@ async def test_freeze_db_process(ops_test, continuous_writes):
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_restart_db_process(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
@@ -501,6 +582,7 @@ async def test_restart_db_process(ops_test, continuous_writes):
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_restart_delay):
     app_name = await get_app_name(ops_test)
 
@@ -553,6 +635,7 @@ async def test_full_cluster_crash(ops_test: OpsTest, continuous_writes, reset_re
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_restart_delay):
     app_name = await get_app_name(ops_test)
 
@@ -603,6 +686,7 @@ async def test_full_cluster_restart(ops_test: OpsTest, continuous_writes, reset_
 
 @pytest.mark.runner(["self-hosted", "linux", "X64", "jammy", "large"])
 @pytest.mark.group(1)
+@pytest.mark.abort_on_fail
 async def test_network_cut(ops_test, continuous_writes):
     # locate primary unit
     app_name = await get_app_name(ops_test)
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 80b7767bb..057197219 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -75,11 +75,13 @@ def dump(self) -> Dict[str, Any]:
         return result
 
 
-async def destroy_cluster(ops_test: OpsTest, applications: list[str]) -> None:
+async def destroy_cluster(
+    ops_test: OpsTest, applications: list[str], destroy_storage: bool = False
+) -> None:
     """Destroy cluster in a forceful way."""
     for app in applications:
         await ops_test.model.applications[app].destroy(
-            destroy_storage=True, force=True, no_wait=False
+            destroy_storage=destroy_storage, force=True, no_wait=False
         )
 
     # destroy does not wait for applications to be removed, perform this check manually
diff --git a/tests/integration/upgrade/test_sharding_upgrade.py b/tests/integration/upgrade/test_sharding_upgrade.py
index 1356519e5..4caf4be22 100644
--- a/tests/integration/upgrade/test_sharding_upgrade.py
+++ b/tests/integration/upgrade/test_sharding_upgrade.py
@@ -2,7 +2,9 @@
 # Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
 
+import logging
 import time
+from pathlib import Path
 
 import pytest
 from pytest_operator.plugin import OpsTest
@@ -21,6 +23,8 @@
     stop_continous_writes,
 )
 
+logger = logging.getLogger(__name__)
+
 MONGOD_SERVICE = "snap.charmed-mongodb.mongod.service"
 MONGOS_SERVICE = "snap.charmed-mongodb.mongos.service"
 SHARD_ONE_APP_NAME = "shard-one"
@@ -75,7 +79,7 @@ async def test_upgrade(
 
     # We want to be sure that everything is settled down
     await ops_test.model.wait_for_idle(
-        CLUSTER_COMPONENTS, status="active", idle_period=20, timeout=TIMEOUT
+        CLUSTER_COMPONENTS, status="active", idle_period=20, timeout=20 * 60
     )
     # verify no writes were skipped during upgrade process
     shard_one_expected_writes = await stop_continous_writes(
@@ -145,7 +149,7 @@ async def test_pre_upgrade_check_failure(ops_test: OpsTest) -> None:
 
 # TODO Future PR: Add more cases for failing pre-upgrade-check
 
-async def run_upgrade_sequence(ops_test: OpsTest, app_name: str, new_charm) -> None:
+async def run_upgrade_sequence(ops_test: OpsTest, app_name: str, new_charm: Path) -> None:
     """Runs the upgrade sequence on a given app."""
     leader_unit = await find_unit(ops_test, leader=True, app_name=app_name)
     action = await leader_unit.run_action("pre-upgrade-check")
@@ -153,17 +157,18 @@ async def run_upgrade_sequence(ops_test: OpsTest, app_name: str, new_charm) -> N
     assert action.status == "completed", "pre-upgrade-check failed, expected to succeed."
 
     await ops_test.model.applications[app_name].refresh(path=new_charm)
-    await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, idle_period=30)
+    await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, idle_period=120)
 
     # resume upgrade only needs to be ran when:
     # 1. there are more than one units in the application
     # 2. AND the underlying workload was updated
-    if not len(ops_test.model.applications[app_name].units) > 1:
+    if len(ops_test.model.applications[app_name].units) < 2:
         return
 
     if "resume-upgrade" not in ops_test.model.applications[app_name].status_message:
         return
 
+    logger.info(f"Calling resume-upgrade for {app_name}")
     action = await leader_unit.run_action("resume-upgrade")
     await action.wait()
     assert action.status == "completed", "resume-upgrade failed, expected to succeed."
diff --git a/tests/integration/upgrade/test_upgrade.py b/tests/integration/upgrade/test_upgrade.py
index 9486cfd5f..60d28eefc 100644
--- a/tests/integration/upgrade/test_upgrade.py
+++ b/tests/integration/upgrade/test_upgrade.py
@@ -62,16 +62,17 @@ async def test_upgrade(ops_test: OpsTest, continuous_writes) -> None:
     new_charm = await ops_test.build_charm(".")
     app_name = await get_app_name(ops_test)
     await ops_test.model.applications[app_name].refresh(path=new_charm)
-    await ops_test.model.wait_for_idle(
-        apps=[app_name], status="active", timeout=1000, idle_period=120
-    )
+    await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, idle_period=120)
 
     if "resume-upgrade" in ops_test.model.applications[app_name].status_message:
+        logger.info("Calling resume upgrade")
         action = await leader_unit.run_action("resume-upgrade")
         await action.wait()
         assert action.status == "completed", "resume-upgrade failed, expected to succeed"
 
-    await ops_test.model.wait_for_idle(apps=[app_name], timeout=1000, idle_period=120)
+    await ops_test.model.wait_for_idle(
+        apps=[app_name], status="active", timeout=1000, idle_period=120
+    )
 
     # verify that the no writes were skipped
     total_expected_writes = await ha_helpers.stop_continous_writes(ops_test, app_name=app_name)