Skip to content

Commit

Permalink
[DDPE-3501] - add restore test for sharded cluster (#359)
Browse files Browse the repository at this point in the history
## Issue
It is uncertain whether restores are supported in the current backup implementation.

## Solution
Verify their functionality via an integration test


1. write some data into a sharded collection.
2. check the amount of data on each shard
3. make a backup
4. write more data.
5. check that the amount of data has grown on each shard.
6. stop writing. make a restore
7. check the amount of data on each shard; should be equal to step 2
  • Loading branch information
MiaAltieri authored Feb 26, 2024
1 parent 0422edb commit 327458c
Show file tree
Hide file tree
Showing 3 changed files with 264 additions and 28 deletions.
9 changes: 9 additions & 0 deletions tests/integration/backup_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ async def get_leader_unit(ops_test: OpsTest, db_app_name=None) -> ops.model.Unit
return unit


async def get_backup_list(ops_test: OpsTest, db_app_name=None) -> str:
    """Run the `list-backups` action on the leader unit and return its output.

    Args:
        ops_test: The pytest-operator fixture, forwarded to `get_leader_unit`.
        db_app_name: Forwarded to `get_leader_unit` to select the application.

    Returns:
        The value stored under the "backups" key of the action results.
    """
    leader = await get_leader_unit(ops_test, db_app_name=db_app_name)
    pending_action = await leader.run_action(action_name="list-backups")
    finished_action = await pending_action.wait()
    return finished_action.results["backups"]


async def count_logical_backups(db_unit: ops.model.Unit) -> int:
"""Count the number of logical backups."""
action = await db_unit.run_action(action_name="list-backups")
Expand Down
143 changes: 115 additions & 28 deletions tests/integration/sharding_tests/test_sharding_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,41 @@

import secrets
import string
import time

import pytest
from pytest_operator.plugin import OpsTest
from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed
from tenacity import Retrying, stop_after_delay, wait_fixed

from ..backup_tests import helpers as backup_helpers

# from .writes_helpers import writes_helpers
from ..helpers import get_leader_id, get_password, set_password
from . import writes_helpers

# Application names referenced by the sharded-backup tests.
S3_APP_NAME = "s3-integrator"
SHARD_ONE_APP_NAME = "shard-one"
SHARD_TWO_APP_NAME = "shard-two"
SHARD_APPS = [SHARD_ONE_APP_NAME, SHARD_TWO_APP_NAME]
# NOTE(review): the assignment below is immediately overridden by the one that follows it, so
# the effective value of CONFIG_SERVER_APP_NAME is "config-server".
CONFIG_SERVER_APP_NAME = "config-server-one"
CONFIG_SERVER_APP_NAME = "config-server"
# Relation names -- presumably the endpoints used when integrating the applications; confirm
# against the deploy test.
SHARD_REL_NAME = "sharding"
CONFIG_SERVER_REL_NAME = "config-server"
S3_REL_NAME = "s3-credentials"
# Passed as `timeout=` to model waits; presumably seconds (i.e. 10 minutes) -- TODO confirm.
TIMEOUT = 10 * 60


@pytest.fixture()
async def add_writes_to_db(ops_test: OpsTest):
    """Seed the cluster with writes before a test and clear them afterwards.

    Continuous writes are started against `CONFIG_SERVER_APP_NAME`, left running for
    20 seconds and then stopped, so the test body starts from a fixed data set. Once the
    test has finished, the written data is removed again.
    """
    # Imported locally so this fixture does not rely on a module-level import.
    import asyncio

    await writes_helpers.start_continous_writes(
        ops_test, 1, config_server_name=CONFIG_SERVER_APP_NAME
    )
    # Give the background writer time to insert data. `asyncio.sleep` is used rather than
    # `time.sleep` so the event loop is not blocked while waiting.
    await asyncio.sleep(20)
    await writes_helpers.stop_continous_writes(ops_test, config_server_name=CONFIG_SERVER_APP_NAME)
    yield
    # Pass the config-server name explicitly, consistent with the start/stop calls above.
    await writes_helpers.clear_db_writes(ops_test, config_server_name=CONFIG_SERVER_APP_NAME)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest) -> None:
Expand Down Expand Up @@ -100,19 +116,16 @@ async def test_set_credentials_in_cluster(ops_test: OpsTest, github_secrets) ->

@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_create_and_list_backups_in_cluster(ops_test: OpsTest, github_secrets) -> None:
async def test_create_and_list_backups_in_cluster(ops_test: OpsTest) -> None:
"""Tests that sharded cluster can successfully create and list backups."""
leader_unit = await backup_helpers.get_leader_unit(
ops_test, db_app_name=CONFIG_SERVER_APP_NAME
)
await backup_helpers.set_credentials(ops_test, github_secrets, cloud="AWS")
# verify backup list works
action = await leader_unit.run_action(action_name="list-backups")
list_result = await action.wait()
backups = list_result.results["backups"]
backups = await backup_helpers.get_backup_list(ops_test, db_app_name=CONFIG_SERVER_APP_NAME)
assert backups, "backups not outputted"

# verify backup is started
leader_unit = await backup_helpers.get_leader_unit(
ops_test, db_app_name=CONFIG_SERVER_APP_NAME
)
action = await leader_unit.run_action(action_name="create-backup")
backup_result = await action.wait()
assert "backup started" in backup_result.results["backup-status"], "backup didn't start"
Expand All @@ -121,13 +134,10 @@ async def test_create_and_list_backups_in_cluster(ops_test: OpsTest, github_secr
# the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a
# backup can take a lot of time so this function returns once the command was successfully
# sent to pbm. Therefore we should retry listing the backup several times
try:
for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)):
with attempt:
backups = await backup_helpers.count_logical_backups(leader_unit)
assert backups == 1
except RetryError:
assert backups == 1, "Backup not created."
for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3), reraise=True):
with attempt:
backups = await backup_helpers.count_logical_backups(leader_unit)
assert backups == 1


@pytest.mark.group(1)
Expand All @@ -154,12 +164,16 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
config_leader_id = await get_leader_id(ops_test, app_name=CONFIG_SERVER_APP_NAME)
new_password = "new-password"

shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_ONE_APP_NAME)
shard_backup_password = await get_password(
ops_test, username="backup", app_name=SHARD_ONE_APP_NAME
)
assert (
shard_backup_password != new_password
), "shard-one is incorrectly already set to the new password."

shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_TWO_APP_NAME)
shard_backup_password = await get_password(
ops_test, username="backup", app_name=SHARD_TWO_APP_NAME
)
assert (
shard_backup_password != new_password
), "shard-two is incorrectly already set to the new password."
Expand All @@ -173,10 +187,14 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
timeout=TIMEOUT,
)

shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_ONE_APP_NAME)
shard_backup_password = await get_password(
ops_test, username="backup", app_name=SHARD_ONE_APP_NAME
)
assert shard_backup_password != new_password, "Application shard-one did not rotate password"

shard_backup_password = get_password(ops_test, username="backup", app_name=SHARD_TWO_APP_NAME)
shard_backup_password = await get_password(
ops_test, username="backup", app_name=SHARD_TWO_APP_NAME
)
assert shard_backup_password != new_password, "Application shard-two did not rotate password"

# verify backup actions work after password rotation
Expand All @@ -193,10 +211,79 @@ async def test_rotate_backup_password(ops_test: OpsTest) -> None:
# the action `create-backup` only confirms that the command was sent to the `pbm`. Creating a
# backup can take a lot of time so this function returns once the command was successfully
# sent to pbm. Therefore we should retry listing the backup several times
try:
for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3)):
with attempt:
backups = await backup_helpers.count_logical_backups(leader_unit)
assert backups == 2
except RetryError:
assert backups == 2, "Backup not created after password rotation."
for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(3), reraise=True):
with attempt:
backups = await backup_helpers.count_logical_backups(leader_unit)
assert backups == 2, "Backup not created after password rotation."


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_restore_backup(ops_test: OpsTest, add_writes_to_db) -> None:
    """Tests that sharded Charmed MongoDB cluster supports restores.

    Flow:
        1. Record the number of writes on every shard (seeded by `add_writes_to_db`).
        2. Create a backup and wait until it shows up in the backup count.
        3. Insert extra ("unwanted") documents into the same collection.
        4. Restore the most recently listed backup.
        5. Verify that the per-shard and total write counts match step 1 again.
    """
    # count total writes
    cluster_writes = await writes_helpers.get_cluster_writes_count(
        ops_test, shard_app_names=SHARD_APPS
    )
    assert cluster_writes["total_writes"] > 0, "no writes to backup"

    leader_unit = await backup_helpers.get_leader_unit(
        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
    )
    prev_backups = await backup_helpers.count_logical_backups(leader_unit)
    await ops_test.model.wait_for_idle(
        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
    )
    action = await leader_unit.run_action(action_name="create-backup")
    first_backup = await action.wait()
    assert first_backup.status == "completed", "First backup not started."

    # verify that backup was made on the bucket. The stop delay must exceed the wait interval,
    # otherwise the loop gives up after (at most) one retry.
    for attempt in Retrying(stop=stop_after_delay(20), wait=wait_fixed(5), reraise=True):
        with attempt:
            backups = await backup_helpers.count_logical_backups(leader_unit)
            assert backups == prev_backups + 1, "Backup not created."

    await ops_test.model.wait_for_idle(
        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
    )

    # add writes to be cleared after restoring the backup. Note these are written to the same
    # collection that was backed up.
    await writes_helpers.insert_unwanted_data(ops_test)
    new_total_writes = await writes_helpers.get_cluster_writes_count(
        ops_test, shard_app_names=SHARD_APPS
    )
    assert (
        new_total_writes["total_writes"] > cluster_writes["total_writes"]
    ), "No writes to be cleared after restoring."

    # find most recent backup id and restore
    list_result = await backup_helpers.get_backup_list(
        ops_test, db_app_name=CONFIG_SERVER_APP_NAME
    )
    # Strip first so a trailing newline cannot yield an empty "last line".
    backup_lines = list_result.strip().splitlines()
    assert backup_lines, "no backups listed"
    # The backup id is the first whitespace-separated field of the last listed line.
    backup_id = backup_lines[-1].split()[0]
    action = await leader_unit.run_action(action_name="restore", **{"backup-id": backup_id})
    restore = await action.wait()
    assert restore.results["restore-status"] == "restore started", "restore not successful"

    await ops_test.model.wait_for_idle(
        apps=[CONFIG_SERVER_APP_NAME], status="active", idle_period=20
    )

    # verify all writes are present. The action only reports "restore started", so the counts
    # are polled; the stop delay is longer than the wait interval to allow several attempts.
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(20), reraise=True):
        with attempt:
            restored_total_writes = await writes_helpers.get_cluster_writes_count(
                ops_test, shard_app_names=SHARD_APPS
            )
            assert (
                restored_total_writes["total_writes"] == cluster_writes["total_writes"]
            ), "writes not correctly restored to whole cluster"
            assert (
                restored_total_writes[SHARD_ONE_APP_NAME] == cluster_writes[SHARD_ONE_APP_NAME]
            ), f"writes not correctly restored to {SHARD_ONE_APP_NAME}"
            assert (
                restored_total_writes[SHARD_TWO_APP_NAME] == cluster_writes[SHARD_TWO_APP_NAME]
            ), f"writes not correctly restored to {SHARD_TWO_APP_NAME}"
140 changes: 140 additions & 0 deletions tests/integration/sharding_tests/writes_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import subprocess
from pathlib import Path
from typing import Dict, List, Optional

import yaml
from pymongo import MongoClient
from pytest_operator.plugin import OpsTest

from ..helpers import get_password

# TODO move these to a separate file for constants \ config
# Parsed contents of ./metadata.yaml; the path is relative to the current working directory.
METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
# Port used when building mongos connection strings (see `mongos_uri`).
MONGOS_PORT = 27018
# Port used when connecting directly to an application's units (see `count_shard_writes`).
MONGOD_PORT = 27017
# Default application name for the helpers in this module.
APP_NAME = "config-server"

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Signals that a process did not complete successfully."""


class ProcessRunningError(Exception):
    """Signals that a process was found running although it should not be."""


async def mongos_uri(ops_test: OpsTest, config_server_name=APP_NAME) -> str:
    """Build the connection URI for the mongos routers of an application.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        config_server_name: Name of the application whose units are used as hosts.

    Returns:
        A `mongodb://` URI for the `operator` user that lists every unit's public address
        on `MONGOS_PORT` and targets the `admin` database.
    """
    operator_password = await get_password(ops_test, app_name=config_server_name)
    app_units = ops_test.model.applications[config_server_name].units
    host_list = ",".join(f"{unit.public_address}:{MONGOS_PORT}" for unit in app_units)
    return f"mongodb://operator:{operator_password}@{host_list}/admin"


async def clear_db_writes(ops_test: OpsTest, config_server_name=APP_NAME) -> None:
    """Stop the continuous-writes process and drop the test collection.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        config_server_name: Application used to build the mongos connection string; it is
            also forwarded when stopping the continuous writes.
    """
    await stop_continous_writes(ops_test, config_server_name=config_server_name)

    # remove collection from database
    connection_string = await mongos_uri(ops_test, config_server_name)
    client = MongoClient(connection_string)
    try:
        # collection for continuous writes
        client["new-db"]["test_collection"].drop()
    finally:
        # Always release the connection, even if the drop fails.
        client.close()


async def start_continous_writes(
    ops_test: OpsTest, starting_number: int, config_server_name=APP_NAME
) -> None:
    """Launch the continuous-writes script as a background process.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        starting_number: Passed to the script (as a string) as its second argument.
        config_server_name: Application used to build the mongos connection string.
    """
    uri = await mongos_uri(ops_test, config_server_name)
    writer_command = [
        "python3",
        "tests/integration/ha_tests/continuous_writes.py",
        uri,
        str(starting_number),
    ]

    # The process handle is intentionally not kept: the writer runs in the background and is
    # terminated later by `stop_continous_writes`.
    subprocess.Popen(writer_command)


async def stop_continous_writes(
    ops_test: OpsTest, config_server_name=APP_NAME
) -> Optional[Dict]:
    """Stop continuous writes to MongoDB and return the last written document.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        config_server_name: Application used to build the mongos connection string.

    Returns:
        The document of the test collection with the highest "number" field, as returned by
        `find_one`, or None when the collection holds no documents.
    """
    # Kill the writer; `run` returns only after pkill itself has exited. The exit status is
    # deliberately ignored because there may be no writer process left to kill.
    subprocess.run(["pkill", "-9", "-f", "continuous_writes.py"])

    connection_string = await mongos_uri(ops_test, config_server_name)
    client = MongoClient(connection_string)
    try:
        client.admin.command("enableSharding", "new-db")
        # last written value should be the highest number in the database.
        return client["new-db"]["test_collection"].find_one(sort=[("number", -1)])
    finally:
        # Always release the connection, even if a command fails.
        client.close()


async def count_shard_writes(ops_test: OpsTest, shard_app_name=APP_NAME) -> int:
    """Count the documents of the test collection held by one application.

    The connection goes straight to the application's units on `MONGOD_PORT` rather than
    through mongos, so the count reflects what this application itself stores.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        shard_app_name: Name of the application whose units are queried.

    Returns:
        The number of documents in `new-db.test_collection`.
    """
    password = await get_password(ops_test, app_name=shard_app_name)
    hosts = ",".join(
        f"{unit.public_address}:{MONGOD_PORT}"
        for unit in ops_test.model.applications[shard_app_name].units
    )
    connection_string = f"mongodb://operator:{password}@{hosts}/admin"

    client = MongoClient(connection_string)
    try:
        # New versions of pymongo no longer support the count operation, so
        # `count_documents` with an empty filter is used instead.
        return client["new-db"]["test_collection"].count_documents({})
    finally:
        # Always release the connection, even if the count fails.
        client.close()


async def get_cluster_writes_count(ops_test, shard_app_names: List[str]) -> Dict:
    """Collect the write count of every listed application plus the overall total.

    Args:
        ops_test: The pytest-operator fixture, forwarded to `count_shard_writes`.
        shard_app_names: Applications to count, queried one after another in order.

    Returns:
        A dictionary mapping each application name to its count, with the sum of all
        counts stored under the extra key "total_writes".
    """
    # Awaited sequentially, in the order given.
    per_app_counts = [await count_shard_writes(ops_test, name) for name in shard_app_names]

    writes_by_component = dict(zip(shard_app_names, per_app_counts))
    writes_by_component["total_writes"] = sum(per_app_counts)
    return writes_by_component


async def insert_unwanted_data(ops_test: OpsTest, config_server_name=APP_NAME) -> None:
    """Insert three "unwanted" documents into the test collection through mongos.

    Args:
        ops_test: The pytest-operator fixture giving access to the model.
        config_server_name: Application used to build the mongos connection string.
    """
    connection_string = await mongos_uri(ops_test, config_server_name)

    client = MongoClient(connection_string)
    try:
        test_collection = client["new-db"]["test_collection"]
        # One insert per document: "bad data 1" .. "bad data 3".
        for index in range(1, 4):
            test_collection.insert_one({"unwanted_data": f"bad data {index}"})
    finally:
        # Always release the connection, even if an insert fails.
        client.close()

0 comments on commit 327458c

Please sign in to comment.