Skip to content

Commit

Permalink
Modify filter_memory_leak to use async indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
rlmanrique committed Dec 23, 2024
1 parent 2247b2b commit f58e2e0
Show file tree
Hide file tree
Showing 24 changed files with 172 additions and 14 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/matrix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,33 @@ jobs:
with:
lsm_access_strategy: ${{matrix.lsm_access_strategy}}
weaviate_version: ${{ github.event_name == 'schedule' && 'latest' || inputs.weaviate_version || '1.27.2' }}
async_indexing: "false"
secrets:
AWS_ACCESS_KEY: ${{secrets.AWS_ACCESS_KEY}}
AWS_SECRET_ACCESS_KEY: ${{secrets.AWS_SECRET_ACCESS_KEY}}
DOCKER_USERNAME: ${{secrets.DOCKER_USERNAME}}
DOCKER_PASSWORD: ${{secrets.DOCKER_PASSWORD}}
GCP_SERVICE_ACCOUNT_BENCHMARKS: ${{secrets.GCP_SERVICE_ACCOUNT_BENCHMARKS}}
POLARSIGNALS_TOKEN: ${{secrets.POLARSIGNALS_TOKEN}}

run-with-async-indexing:
strategy:
fail-fast: false
matrix:
lsm_access_strategy: ["mmap", "pread"]
uses: ./.github/workflows/tests.yaml
with:
lsm_access_strategy: ${{matrix.lsm_access_strategy}}
weaviate_version: ${{ github.event_name == 'schedule' && 'latest' || inputs.weaviate_version || '1.27.2' }}
async_indexing: "true"
secrets:
AWS_ACCESS_KEY: ${{secrets.AWS_ACCESS_KEY}}
AWS_SECRET_ACCESS_KEY: ${{secrets.AWS_SECRET_ACCESS_KEY}}
DOCKER_USERNAME: ${{secrets.DOCKER_USERNAME}}
DOCKER_PASSWORD: ${{secrets.DOCKER_PASSWORD}}
GCP_SERVICE_ACCOUNT_BENCHMARKS: ${{secrets.GCP_SERVICE_ACCOUNT_BENCHMARKS}}
POLARSIGNALS_TOKEN: ${{secrets.POLARSIGNALS_TOKEN}}

send-slack-message-on-failure:
needs: [weaviate-version-information, run-with-sync-indexing]
if: failure() && github.event_name == 'schedule'
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ on:
lsm_access_strategy:
required: true
type: string
async_indexing:
required: true
type: string
secrets:
AWS_ACCESS_KEY:
required: true
Expand All @@ -26,6 +29,7 @@ on:
env:
WEAVIATE_VERSION: ${{ inputs.weaviate_version }}
DISABLE_RECOVERY_ON_PANIC: true
ASYNC_INDEXING: ${{ inputs.async_indexing }}

jobs:
real-version-in-tag:
Expand Down Expand Up @@ -427,6 +431,7 @@ jobs:
PERSISTENCE_LSM_ACCESS_STRATEGY: ${{inputs.lsm_access_strategy}}
NUM_NODES: 1
MINIMUM_WEAVIATE_VERSION: 1.22.0
ASYNC_INDEXING: ${{inputs.async_indexing}}
steps:
- uses: actions/checkout@v3
# - name: Polar Signals Continuous Profiling
Expand Down
2 changes: 1 addition & 1 deletion ann_benchmark_aws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ ssh -i "${key_id}.pem" $ssh_addr -- "mkdir -p ~/apps/"
scp -i "${key_id}.pem" -r apps/ann-benchmarks "$ssh_addr:~/apps/"
scp -i "${key_id}.pem" -r apps/weaviate-no-restart-on-crash/ "$ssh_addr:~/apps/"
scp -i "${key_id}.pem" -r ann_benchmark.sh "$ssh_addr:~"
ssh -i "${key_id}.pem" $ssh_addr -- "DATASET=$dataset DISTANCE=$distance REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh"
ssh -i "${key_id}.pem" $ssh_addr -- "ASYNC_INDEXING=$ASYNC_INDEXING DATASET=$dataset DISTANCE=$distance REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh"
mkdir -p results
scp -i "${key_id}.pem" -r "$ssh_addr:~/results/*.json" results/
2 changes: 1 addition & 1 deletion ann_benchmark_gcp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ gcloud compute ssh --zone $ZONE $instance -- "mkdir -p ~/apps/"
gcloud compute scp --zone $ZONE --recurse apps/ann-benchmarks "$instance:~/apps/"
gcloud compute scp --zone $ZONE --recurse apps/weaviate-no-restart-on-crash/ "$instance:~/apps/"
gcloud compute scp --zone $ZONE --recurse ann_benchmark.sh "$instance:~"
gcloud compute ssh --zone $ZONE $instance -- "DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh"
gcloud compute ssh --zone $ZONE $instance -- "ASYNC_INDEXING=$ASYNC_INDEXING DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark.sh"
mkdir -p results
gcloud compute scp --zone $ZONE --recurse "$instance:~/results/*.json" results/
2 changes: 1 addition & 1 deletion ann_benchmark_quantization_aws.sh
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,6 @@ ssh -i "${key_id}.pem" $ssh_addr -- "mkdir -p ~/apps/"
scp -i "${key_id}.pem" -r apps/ann-benchmarks "$ssh_addr:~/apps/"
scp -i "${key_id}.pem" -r apps/weaviate-no-restart-on-crash/ "$ssh_addr:~/apps/"
scp -i "${key_id}.pem" -r ann_benchmark_quantization.sh "$ssh_addr:~"
ssh -i "${key_id}.pem" $ssh_addr -- "DATASET=$dataset DISTANCE=$distance REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh"
ssh -i "${key_id}.pem" $ssh_addr -- "ASYNC_INDEXING=$ASYNC_INDEXING DATASET=$dataset DISTANCE=$distance REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh"
mkdir -p results
scp -i "${key_id}.pem" -r "$ssh_addr:~/results/*.json" results/
2 changes: 1 addition & 1 deletion ann_benchmark_quantization_gcp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ gcloud compute ssh --zone $ZONE $instance -- "mkdir -p ~/apps/"
gcloud compute scp --zone $ZONE --recurse apps/ann-benchmarks "$instance:~/apps/"
gcloud compute scp --zone $ZONE --recurse apps/weaviate-no-restart-on-crash/ "$instance:~/apps/"
gcloud compute scp --zone $ZONE --recurse ann_benchmark_quantization.sh "$instance:~"
gcloud compute ssh --zone $ZONE $instance -- "DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh"
gcloud compute ssh --zone $ZONE $instance -- "ASYNC_INDEXING=$ASYNC_INDEXING DATASET=$DATASET DISTANCE=$DISTANCE REQUIRED_RECALL=$REQUIRED_RECALL QUANTIZATION=$QUANTIZATION WEAVIATE_VERSION=$WEAVIATE_VERSION MACHINE_TYPE=$MACHINE_TYPE CLOUD_PROVIDER=$CLOUD_PROVIDER OS=$OS bash ann_benchmark_quantization.sh"
mkdir -p results
gcloud compute scp --zone $ZONE --recurse "$instance:~/results/*.json" results/
5 changes: 3 additions & 2 deletions apps/ann-benchmarks/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import os
from datetime import timedelta

from weaviate_import import reset_schema, load_records
from weaviate_import import reset_schema, load_records, wait_for_all_shards_ready
from weaviate_query import query

values = {
Expand Down Expand Up @@ -107,7 +107,8 @@
)
logger.info(f"Waiting 30s for compactions to settle, etc")
time.sleep(30)

logger.info(f"Waiting for all shards to be ready")
wait_for_all_shards_ready(client)
logger.info(f"Starting querying for efC={efC}, m={m}, shards={shards}")
query(
client,
Expand Down
12 changes: 7 additions & 5 deletions apps/ann-benchmarks/weaviate_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def load_records(
)
)

wait_for_all_shards_ready(collection)
wait_for_all_shards_ready(client)

i = 100000
with client.batch.fixed_size(batch_size=batch_size) as batch:
Expand All @@ -110,12 +110,11 @@ def load_records(
logger.info(f"Finished writing {len_objects} records")


def wait_for_all_shards_ready(collection: weaviate.collections.Collection):
def wait_for_all_shards_ready(client: weaviate.WeaviateClient, timeout=600):
collection = client.collections.get(class_name)
status = [s.status for s in collection.config.get_shards()]
if not all(s == "READONLY" for s in status):
raise Exception(f"shards are not READONLY at beginning: {status}")

max_wait = 300
max_wait = timeout
before = time.time()

while True:
Expand All @@ -126,4 +125,7 @@ def wait_for_all_shards_ready(collection: weaviate.collections.Collection):
return

if time.time() - before > max_wait:
logger.error(f"Shards not ready. Timeout reached")
raise Exception(f"after {max_wait}s not all shards READY: {status}")

logger.info(f"Shards not ready. Waiting for {time.time() - before}s")
1 change: 1 addition & 0 deletions apps/geo-crash/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def load_records(client: weaviate.Client, start=0, end=100_000):
class_name="GeoClass",
uuid=uuid.UUID(int=i),
)
batch.wait_for_vector_indexing()
logger.info(f"Finished writing {end-start} records")


Expand Down
25 changes: 25 additions & 0 deletions apps/get_env_var.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import docker

def get_env_variables(partial_image_name):
client = docker.from_env()
# List all running containers
containers = client.containers.list()

# Filter containers where the image name matches the partial name
matching_containers = [
container for container in containers if partial_image_name in container.image.tags[0]
]

if not matching_containers:
print(f"No running containers found matching image name: {partial_image_name}")
return

# Retrieve and print environment variables for each matching container
for container in matching_containers:
print(f"Environment variables for container {container.short_id}:")
exec_result = container.exec_run("env", stdout=True, stderr=True)
print(exec_result.output.decode())
print("--------------------------------------------")

# Replace with a partial image name
get_env_variables("weaviate")
1 change: 1 addition & 0 deletions apps/upgrade-journey/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (c *cluster) startWeaviateNode(ctx context.Context, nodeId int, version str
"RAFT_JOIN": fmt.Sprintf("%s:8300", c.hostname(nodeId)),
"RAFT_BOOTSTRAP_EXPECT": "1",
"PERSISTENCE_LSM_ACCESS_STRATEGY": os.Getenv("PERSISTENCE_LSM_ACCESS_STRATEGY"),
"ASYNC_INDEXING": os.Getenv("ASYNC_INDEXING"),
},
Mounts: testcontainers.Mounts(testcontainers.BindMount(
c.volumePath(nodeId), "/var/lib/weaviate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data:/var/lib/weaviate"
environment:
ASYNC_INDEXING: ${ASYNC_INDEXING}
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
PERSISTENCE_FLUSH_IDLE_MEMTABLES_AFTER: 2
QUERY_DEFAULTS_LIMIT: 25
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate-no-restart-on-crash/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data:/var/lib/weaviate"
environment:
- ASYNC_INDEXING
- PERSISTENCE_DATA_PATH
- PERSISTENCE_MEMTABLES_FLUSH_IDLE_AFTER_SECONDS
- QUERY_DEFAULTS_LIMIT
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose-backup.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ services:
- 6060:6060
restart: on-failure:0
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
LOG_LEVEL: 'debug'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose-c11y.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data:/var/lib/weaviate"
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
CONTEXTIONARY_URL: contextionary:9999
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose-cpu-constrained.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ services:
- 50051:50051
restart: on-failure:0
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
LOG_LEVEL: 'debug'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose-replication.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data-node-1:/var/lib/weaviate"
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
LOG_LEVEL: 'debug'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose-replication_single_voter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data-node-1:/var/lib/weaviate"
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
LOG_LEVEL: 'debug'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data:/var/lib/weaviate"
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
LOG_LEVEL: 'debug'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
Expand Down
1 change: 1 addition & 0 deletions apps/weaviate/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ services:
volumes:
- "$PWD/apps/weaviate/data:/var/lib/weaviate"
environment:
ASYNC_INDEXING: '${ASYNC_INDEXING}'
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
Expand Down
73 changes: 72 additions & 1 deletion common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,75 @@ function shutdown() {

trap 'logs; shutdown; exit 1' SIGINT ERR

trap '[[ $? -eq 1 ]] && logs; shutdown' EXIT
trap '[[ $? -eq 1 ]] && logs; shutdown' EXIT


function wait_for_indexing() {
local weaviate_url="$1"
local timeout="${2:-300}"
local interval="${3:-5}"
local check_after="${4:-0}"

local nodes_url="${weaviate_url}/v1/nodes?output=verbose"

if [[ $check_after -gt 0 ]]; then
echo "Waiting ${check_after} seconds before starting checks..."
sleep "$check_after"
fi

local start_time=$(date +%s)

while true; do
response=$(curl -s -f "$nodes_url")
if [[ $? -ne 0 ]]; then
echo "Error: Failed to fetch nodes data from $nodes_url"
return 1
fi

# Check if all shards have vectorIndexingStatus set to READY
all_ready=true
shards_status=$(echo "$response" | jq -c '.nodes[]?.shards[]? | .vectorIndexingStatus' 2>/dev/null)

if [[ $? -ne 0 || -z "$shards_status" ]]; then
echo "Error: Unable to process shards data."
return 1
fi

while read -r status; do
if [[ "$status" != "\"READY\"" ]]; then
all_ready=false
echo "Shard with status $status is not ready yet."
break
fi
done <<< "$shards_status"

if $all_ready; then
echo "All shards are READY."
return 0
fi

current_time=$(date +%s)
if (( current_time - start_time > timeout )); then
echo "Timeout reached before all shards were READY."
return 1
fi

echo "Some shards are not READY. Retrying in ${interval} seconds..."
sleep "$interval"
done
}


function get_env_vars() {
PARTIAL_IMAGE_NAME="weaviate"
CONTAINER_IDS=$(docker ps --filter "name=$PARTIAL_IMAGE_NAME" --format "{{.ID}}")
if [ -z "$CONTAINER_IDS" ]; then
echo "No running containers found matching image name: $PARTIAL_IMAGE_NAME"
return 1
fi
for CONTAINER_ID in $CONTAINER_IDS; do
echo "Fetching environment variables for container ID: $CONTAINER_ID"
docker exec -i "$CONTAINER_ID" env
done
return 0
}
23 changes: 23 additions & 0 deletions get_env_var.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import docker


def get_env_variables(partial_image_name):
client = docker.from_env()
containers = client.containers.list()

matching_containers = [
container for container in containers if partial_image_name in container.image.tags[0]
]

if not matching_containers:
print(f"No running containers found matching image name: {partial_image_name}")
return

for container in matching_containers:
print(f"Environment variables for container {container.short_id}:")
exec_result = container.exec_run("env", stdout=True, stderr=True)
print(exec_result.output.decode())
print("--------------------------------------------")


get_env_variables("weaviate")
2 changes: 1 addition & 1 deletion update_stability.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ docker run --network host -t -v "$PWD/results:/app/results" -v "$PWD/datasets:/a
-e "UPDATE_ITERATIONS=$UPDATE_ITERATIONS" \
-e "REQUIRED_RECALL=$REQUIRED_RECALL" \
-e "WEAVIATE_URL=localhost" \
semitechnologies/weaviate-benchmarker:v2.0.0 /app/scripts/shell/update_stability.sh
rlmanriqueatweaviate/benchmarker:async /app/scripts/shell/update_stability.sh

echo "Stopping Weaviate..."
docker compose -f $COMPOSE down
Expand Down

0 comments on commit f58e2e0

Please sign in to comment.