Add backup and restore test with multi-tenant class #202

Open · wants to merge 10 commits into base: main
20 changes: 20 additions & 0 deletions .github/workflows/tests.yaml
@@ -507,6 +507,26 @@ jobs:
          password: ${{secrets.DOCKER_PASSWORD}}
      - name: Run chaos test
        run: ./backup_and_restore_multi_node_crud.sh
  backup-restore-crud-multi-node-50k-tenants:
    name: Backup & Restore multi node with several thousand tenants
    runs-on: ubuntu-latest-4-cores
    timeout-minutes: 60
    env:
      PERSISTENCE_LSM_ACCESS_STRATEGY: ${{inputs.lsm_access_strategy}}
    steps:
      - uses: actions/checkout@v3
      # - name: Polar Signals Continuous Profiling
      #   uses: polarsignals/[email protected]
      #   with:
      #     polarsignals_cloud_token: ${{ secrets.POLARSIGNALS_TOKEN }}
      #     labels: 'job=${{ github.job }};gh_run_id=${{ github.run_id }}'
      - name: Login to Docker Hub
        uses: docker/login-action@v2
        with:
          username: ${{secrets.DOCKER_USERNAME}}
          password: ${{secrets.DOCKER_PASSWORD}}
      - name: Run chaos test multi-tenant backup & restore
        run: ./backup_and_restore_multi_node_multi_tenant.sh
  backup-restore-version-compat:
    name: Backup & Restore version compatibility
    runs-on: ubuntu-latest-8-cores
14 changes: 14 additions & 0 deletions apps/backup_and_restore_multi_tenant/Dockerfile
@@ -0,0 +1,14 @@
FROM python:3.10-slim-bullseye

WORKDIR /workdir

ARG backend
ENV BACKUP_BACKEND_PROVIDER=${backend}

ARG expected_shard_count
ENV EXPECTED_SHARD_COUNT=${expected_shard_count}

COPY requirements.txt .
RUN pip3 install -r requirements.txt

COPY backup_and_restore_multi_tenant.py ./
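
For orientation, a minimal sketch of how this image is built and run; the values simply mirror what backup_and_restore_multi_node_multi_tenant.sh (added below) passes in, and the image tag is only illustrative:

# The build args are baked into the image as the BACKUP_BACKEND_PROVIDER and EXPECTED_SHARD_COUNT env vars.
docker build -t backup_and_restore_multi_tenant \
  --build-arg backend="s3" \
  --build-arg expected_shard_count=150000 \
  apps/backup_and_restore_multi_tenant/
# --network host lets the test reach the Weaviate nodes on localhost:8080/8081.
docker run --network host -t backup_and_restore_multi_tenant \
  python3 backup_and_restore_multi_tenant.py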
257 changes: 257 additions & 0 deletions apps/backup_and_restore_multi_tenant/backup_and_restore_multi_tenant.py
@@ -0,0 +1,257 @@
import datetime
import numpy as np
import os
import random
import requests
import sys
import time
import uuid
import weaviate
from weaviate import Tenant, schema

from loguru import logger
from typing import Optional, List

WEAVIATE_PORT = 8080


def assert_expected_shard_count(client: weaviate.Client):
    expected = os.environ.get("EXPECTED_SHARD_COUNT")
    if expected is None or expected == "":
        expected = 1
    else:
        expected = int(expected)

    # named `classes` to avoid shadowing the imported weaviate `schema` module
    classes = client.schema.get()["classes"]
    class_shard_counts = [
        {"class": cls["class"], "actualCount": cls["shardingConfig"]["actualCount"]}
        for cls in classes
    ]

    logger.info(f"Expected shard count per class: {expected}")
    logger.info(f"Actual shard counts: {class_shard_counts}")

    assert len(class_shard_counts) > 0 and len(class_shard_counts) == len(classes)
    assert all(cls["actualCount"] == expected for cls in class_shard_counts)


def other_classes(all_classes, self):
    return [c for c in all_classes if c != self]


def reset_schema(client: weaviate.Client, class_names):
    client.schema.delete_all()
    for class_name in class_names:
        class_obj = {
            "vectorizer": "none",
            "vectorIndexConfig": {
                "efConstruction": 128,
                "maxConnections": 16,
                "ef": 256,
                "cleanupIntervalSeconds": 10,
            },
            "class": class_name,
            "invertedIndexConfig": {
                "indexTimestamps": False,
            },
            "multiTenancyConfig": {"enabled": True},
            "replicationConfig": {
                "factor": 2,
            },
            "properties": [
                {
                    "dataType": ["boolean"],
                    "name": "should_be_deleted",
                },
                {
                    "dataType": ["boolean"],
                    "name": "is_divisible_by_four",
                },
                {
                    "dataType": ["int"],
                    "name": "index_id",
                },
                {
                    "dataType": ["string"],
                    "name": "stage",
                },
            ],
        }

        client.schema.create_class(class_obj)

    for class_name in class_names:
        for other in other_classes(class_names, class_name):
            add_prop = {
                "dataType": [
                    other,
                ],
                "name": f"to_{other}",
            }

            client.schema.property.create(class_name, add_prop)

    # assert_expected_shard_count(client)


def handle_errors(results: Optional[dict]) -> None:
    """
    Handle error messages from batch requests by logging each error.

    Parameters
    ----------
    results : Optional[dict]
        The returned results for Batch creation.
    """
    if results is not None:
        for result in results:
            if (
                "result" in result
                and "errors" in result["result"]
                and "error" in result["result"]["errors"]
            ):
                for message in result["result"]["errors"]["error"]:
                    logger.error(message["message"])


def create_tenants(client: weaviate.Client, class_name: str, tenants: List[Tenant], stage="stage_1"):
    client.schema.add_class_tenants(class_name=class_name, tenants=tenants)

def delete_records(client: weaviate.Client, class_name):
    client.batch.delete_objects(
        class_name=class_name,
        where={"operator": "Equal", "path": ["should_be_deleted"], "valueBoolean": True},
        output="minimal",
        dry_run=False,
    )


def fatal(msg):
    logger.error(msg)
    sys.exit(1)


def success(msg):
    logger.success(msg)


def backend_provider():
    backend = os.environ.get("BACKUP_BACKEND_PROVIDER")
    if backend is None or backend == "":
        return "filesystem"
    return backend


def temp_backup_url_create():
    return f"http://localhost:{WEAVIATE_PORT}/v1/backups/{backend_provider()}"


def temp_backup_url_create_status(backup_name):
    return f"http://localhost:{WEAVIATE_PORT}/v1/backups/{backend_provider()}/{backup_name}"


def temp_backup_url_restore(backup_name):
    return f"http://localhost:{WEAVIATE_PORT}/v1/backups/{backend_provider()}/{backup_name}/restore"


def temp_backup_url_restore_status(backup_name):
    # a GET on the same /restore path returns the restore status
    return f"http://localhost:{WEAVIATE_PORT}/v1/backups/{backend_provider()}/{backup_name}/restore"


def create_backup(client: weaviate.Client, name, max_num_connection_errors: int = 120):
    create_body = {"id": name}
    res = requests.post(temp_backup_url_create(), json=create_body)
    if res.status_code > 399:
        fatal(f"Backup Create returned status code {res.status_code} with body: {res.json()}")

    num_connection_errors = 0
    while True:
        time.sleep(10)
        try:
            res = requests.get(temp_backup_url_create_status(name))
        except requests.exceptions.ConnectionError as e:
            # sometimes the server becomes unresponsive during the backup but comes back online later
            num_connection_errors += 1
            if num_connection_errors >= max_num_connection_errors:
                raise e
            logger.info(
                f"Failed to get backup status, will keep trying: {e}. "
                f"{max_num_connection_errors - num_connection_errors} tries left."
            )
            continue
        res_json = res.json()
        if res_json["status"] == "SUCCESS":
            success("Backup creation successful")
            break
        if res_json["status"] == "FAILED":
            fatal(f"Backup failed with res: {res_json}")


def restore_backup(client: weaviate.Client, name, max_num_connection_errors: int = 120):
    restore_body = {"id": name}
    res = requests.post(temp_backup_url_restore(name), json=restore_body)
    if res.status_code > 399:
        fatal(f"Backup Restore returned status code {res.status_code} with body: {res.json()}")

    num_connection_errors = 0
    while True:
        time.sleep(10)
        try:
            res = requests.get(temp_backup_url_restore_status(name))
        except requests.exceptions.ConnectionError as e:
            # sometimes the server becomes unresponsive during the restore but comes back online later
            num_connection_errors += 1
            if num_connection_errors >= max_num_connection_errors:
                raise e
            logger.info(
                f"Failed to get restore status, will keep trying: {e}. "
                f"{max_num_connection_errors - num_connection_errors} tries left."
            )
            continue

        res_json = res.json()
        if res_json["status"] == "SUCCESS":
            success("Restore succeeded")
            break
        if res_json["status"] == "FAILED":
            fatal(f"Restore failed with res: {res_json}")


client = weaviate.Client(f"http://localhost:{WEAVIATE_PORT}")

backup_name = f"{int(datetime.datetime.now().timestamp())}_stage_1"

class_names = ["Class_A", "Class_B"]
# 15k hot + 30k cold = 45k tenants per class, 90k in total across both classes
num_tenants_hot = 15_000
num_tenants_cold = num_tenants_hot * 2

logger.info("Step 0, reset everything, import schema")
reset_schema(client, class_names)

logger.info(f"Step 1, create {num_tenants_hot} hot tenants and {num_tenants_cold} cold tenants per class")
tenants = [
    Tenant(name=f"{i}_tenant", activity_status=schema.TenantActivityStatus.HOT)
    for i in range(num_tenants_hot)
] + [
    Tenant(name=f"{i + num_tenants_hot}_tenant", activity_status=schema.TenantActivityStatus.COLD)
    for i in range(num_tenants_cold)
]
for class_name in class_names:
    create_tenants(
        client,
        class_name,
        tenants=tenants,
        stage="stage_1",
    )


logger.info("Step 2, create backup of current instance including all classes")
create_backup(client, backup_name)

logger.info("Step 3, delete all classes")
client.schema.delete_all()

logger.info("Step 4, restore backup")
restore_backup(client, backup_name)

logger.info("Step 5, verify the restored tenants match the original instance")
for class_name in class_names:
    actual_tenants = client.schema.get_class_tenants(class_name)
    logger.info(f"{class_name}: {len(actual_tenants)} tenants")
    assert len(actual_tenants) == len(tenants), f"Expected {len(tenants)} tenants, but got {len(actual_tenants)}"
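
For reference, the REST calls that create_backup and restore_backup issue through the URL helpers above correspond roughly to the following curl commands; the "s3" backend name matches the CI setup and "my-backup" is an illustrative id:

# create a backup, then poll its status
curl -X POST localhost:8080/v1/backups/s3 -H 'Content-Type: application/json' -d '{"id": "my-backup"}'
curl localhost:8080/v1/backups/s3/my-backup
# trigger a restore, then poll the restore status (a GET on the same /restore path)
curl -X POST localhost:8080/v1/backups/s3/my-backup/restore -H 'Content-Type: application/json' -d '{"id": "my-backup"}'
curl localhost:8080/v1/backups/s3/my-backup/restore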

4 changes: 4 additions & 0 deletions apps/backup_and_restore_multi_tenant/requirements.txt
@@ -0,0 +1,4 @@
weaviate-client>=3.9.0
numpy==1.22.2
loguru==0.5.3

57 changes: 57 additions & 0 deletions backup_and_restore_multi_node_multi_tenant.sh
@@ -0,0 +1,57 @@
#!/bin/bash

set -e

function wait_weaviate_cluster() {
  echo "Wait for Weaviate to be ready"
  local node1_ready=false
  local node2_ready=false
  for _ in {1..120}; do
    if curl -sf -o /dev/null localhost:8080/v1/.well-known/ready; then
      echo "Weaviate node1 is ready"
      node1_ready=true
    fi

    if curl -sf -o /dev/null localhost:8081/v1/.well-known/ready; then
      echo "Weaviate node2 is ready"
      node2_ready=true
    fi

    if $node1_ready && $node2_ready; then
      return 0
    fi

    echo "Weaviate cluster is not ready, trying again in 1s"
    sleep 1
  done
  if ! $node1_ready; then
    echo "ERROR: Weaviate node1 is not ready after 120s"
  fi
  if ! $node2_ready; then
    echo "ERROR: Weaviate node2 is not ready after 120s"
  fi
  exit 1
}

echo "Building all required containers"
( cd apps/backup_and_restore_multi_tenant/ && docker build -t backup_and_restore_multi_tenant \
--build-arg backend="s3" --build-arg expected_shard_count=150000 . )

# verify WEAVIATE_VERSION is set
[ -z "$WEAVIATE_VERSION" ] && echo "ERROR: Need to set the WEAVIATE_VERSION env var to the docker tag for the image (e.g.: X.X.X-raft-XXXXXXX)" && exit 1

export WEAVIATE_NODE_1_VERSION=$WEAVIATE_VERSION
export WEAVIATE_NODE_2_VERSION=$WEAVIATE_VERSION

echo "Starting Weaviate..."
docker-compose -f apps/weaviate/docker-compose-backup.yml up -d weaviate-node-1 weaviate-node-2 backup-s3

wait_weaviate_cluster

echo "Creating S3 bucket..."
docker-compose -f apps/weaviate/docker-compose-backup.yml up create-s3-bucket

echo "Run multi-node backup and restore with Multi-tenant class"
docker run --network host -t backup_and_restore_multi_tenant python3 backup_and_restore_multi_tenant.py

echo "Passed!"