diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py
index 57001884..a1bc5bfa 100644
--- a/engine/clients/opensearch/config.py
+++ b/engine/clients/opensearch/config.py
@@ -1,4 +1,60 @@
-OPENSEARCH_PORT = 9200
-OPENSEARCH_INDEX = "bench"
-OPENSEARCH_USER = "opensearch"
-OPENSEARCH_PASSWORD = "passwd"
+import os
+import time
+
+from opensearchpy import OpenSearch
+
+OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200))
+OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "bench")
+OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch")
+OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd")
+OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300))
+OPENSEARCH_BULK_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_BULK_INDEX_TIMEOUT", 3600))
+OPENSEARCH_FULL_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_FULL_INDEX_TIMEOUT", 3600))
+OPENSEARCH_DELETE_INDEX_TIMEOUT = int(
+    os.getenv("OPENSEARCH_DELETE_INDEX_TIMEOUT", 1200)
+)
+
+
+def get_opensearch_client(host, connection_params):
+    init_params = {
+        **{
+            "verify_certs": False,
+            "request_timeout": OPENSEARCH_TIMEOUT,
+            "retry_on_timeout": True,
+            # don't show warnings about ssl certs verification
+            "ssl_show_warn": False,
+        },
+        **connection_params,
+    }
+    # Enabling basic auth on opensearch client
+    # If the user and password are empty we use anonymous auth on opensearch client
+    if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "":
+        init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD)
+    if host.startswith("https"):
+        init_params["use_ssl"] = True
+    else:
+        init_params["use_ssl"] = False
+    if host.startswith("http"):
+        url = ""
+    else:
+        url = "http://"
+    url += f"{host}:{OPENSEARCH_PORT}"
+    client = OpenSearch(
+        url,
+        **init_params,
+    )
+    assert client.ping()
+    return client
+
+
+def _wait_for_es_status(client, status="yellow"):
+    print(f"waiting for OpenSearch cluster health {status} status...")
+    for _ in range(100):
+        try:
+            client.cluster.health(wait_for_status=status)
+            return client
+        except ConnectionError:
+            time.sleep(0.1)
+    else:
+        # timeout
+        raise Exception("OpenSearch failed to start.")
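Reviewer note: a minimal, self-contained sketch (not part of the patch) of how the new environment overrides and the shared `get_opensearch_client` factory compose. The host, port, and timeout values below are illustrative assumptions only.

```python
# Hypothetical usage sketch: the config module reads its env vars at import
# time, so overrides must be set before it is imported.
import os

os.environ["OPENSEARCH_PORT"] = "9200"      # assumed local node
os.environ["OPENSEARCH_TIMEOUT"] = "120"    # illustrative override

from engine.clients.opensearch.config import get_opensearch_client

# An "https://" host switches use_ssl on; empty OPENSEARCH_USER/PASSWORD
# would fall back to anonymous auth, mirroring the branches above.
client = get_opensearch_client("https://localhost", {"request_timeout": 30})
print(client.info())
```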
diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py
index bd550917..7fd69d5e 100644
--- a/engine/clients/opensearch/configure.py
+++ b/engine/clients/opensearch/configure.py
@@ -1,14 +1,13 @@
-from opensearchpy import NotFoundError, OpenSearch
+from opensearchpy import NotFoundError
 
 from benchmark.dataset import Dataset
 from engine.base_client import IncompatibilityError
 from engine.base_client.configure import BaseConfigurator
 from engine.base_client.distances import Distance
 from engine.clients.opensearch.config import (
+    OPENSEARCH_DELETE_INDEX_TIMEOUT,
     OPENSEARCH_INDEX,
-    OPENSEARCH_PASSWORD,
-    OPENSEARCH_PORT,
-    OPENSEARCH_USER,
+    get_opensearch_client,
 )
 
 
@@ -16,6 +15,7 @@ class OpenSearchConfigurator(BaseConfigurator):
     DISTANCE_MAPPING = {
         Distance.L2: "l2",
         Distance.COSINE: "cosinesimil",
+        # innerproduct (supported for Lucene in OpenSearch version 2.13 and later)
         Distance.DOT: "innerproduct",
     }
     INDEX_TYPE_MAPPING = {
@@ -25,44 +25,43 @@ class OpenSearchConfigurator(BaseConfigurator):
 
     def __init__(self, host, collection_params: dict, connection_params: dict):
         super().__init__(host, collection_params, connection_params)
-        init_params = {
-            **{
-                "verify_certs": False,
-                "request_timeout": 90,
-                "retry_on_timeout": True,
-            },
-            **connection_params,
-        }
-        self.client = OpenSearch(
-            f"http://{host}:{OPENSEARCH_PORT}",
-            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
-            **init_params,
-        )
+        self.client = get_opensearch_client(host, connection_params)
 
     def clean(self):
         try:
             self.client.indices.delete(
                 index=OPENSEARCH_INDEX,
                 params={
-                    "timeout": 300,
+                    "timeout": OPENSEARCH_DELETE_INDEX_TIMEOUT,
                 },
             )
         except NotFoundError:
             pass
 
     def recreate(self, dataset: Dataset, collection_params):
-        if dataset.config.distance == Distance.DOT:
-            raise IncompatibilityError
-        if dataset.config.vector_size > 1024:
+        # The knn_vector data type supports a vector of floats that can have a dimension count of up to 16,000 for the NMSLIB, Faiss, and Lucene engines, as set by the dimension mapping parameter.
+        # Source: https://opensearch.org/docs/latest/search-plugins/knn/approximate-knn/
+        if dataset.config.vector_size > 16000:
             raise IncompatibilityError
 
+        index_settings = {
+            "knn": True,
+            "number_of_replicas": 0,
+            "refresh_interval": -1,  # no refresh is required because we index all the data at once
+        }
+        index_config = collection_params.get("index", {})
+
+        # if we specify the number_of_shards on the config, enforce it. otherwise use the default
+        if "number_of_shards" in index_config:
+            index_settings["number_of_shards"] = index_config["number_of_shards"]
+
+        # Followed the below link for tuning for ingestion and querying
+        # https://opensearch.org/docs/1.1/search-plugins/knn/performance-tuning/#indexing-performance-tuning
         self.client.indices.create(
             index=OPENSEARCH_INDEX,
             body={
                 "settings": {
-                    "index": {
-                        "knn": True,
-                    }
+                    "index": index_settings,
                 },
                 "mappings": {
                     "properties": {
diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py
index a3e36058..af909f40 100644
--- a/engine/clients/opensearch/search.py
+++ b/engine/clients/opensearch/search.py
@@ -2,15 +2,16 @@
 import uuid
 from typing import List, Tuple
 
+import backoff
 from opensearchpy import OpenSearch
+from opensearchpy.exceptions import TransportError
 
 from dataset_reader.base_reader import Query
 from engine.base_client.search import BaseSearcher
 from engine.clients.opensearch.config import (
     OPENSEARCH_INDEX,
-    OPENSEARCH_PASSWORD,
-    OPENSEARCH_PORT,
-    OPENSEARCH_USER,
+    OPENSEARCH_TIMEOUT,
+    get_opensearch_client,
 )
 from engine.clients.opensearch.parser import OpenSearchConditionParser
 
@@ -31,22 +32,21 @@ def get_mp_start_method(cls):
 
     @classmethod
     def init_client(cls, host, distance, connection_params: dict, search_params: dict):
-        init_params = {
-            **{
-                "verify_certs": False,
-                "request_timeout": 90,
-                "retry_on_timeout": True,
-            },
-            **connection_params,
-        }
-        cls.client: OpenSearch = OpenSearch(
-            f"http://{host}:{OPENSEARCH_PORT}",
-            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
-            **init_params,
-        )
+        cls.client = get_opensearch_client(host, connection_params)
         cls.search_params = search_params
 
+    def _search_backoff_handler(details):
+        print(
+            f"Backing off OpenSearch query for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
+        )
+
     @classmethod
+    @backoff.on_exception(
+        backoff.expo,
+        TransportError,
+        max_time=OPENSEARCH_TIMEOUT,
+        on_backoff=_search_backoff_handler,
+    )
     def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
         opensearch_query = {
             "knn": {
@@ -73,7 +73,7 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
                 "size": top,
             },
             params={
-                "timeout": 60,
+                "timeout": OPENSEARCH_TIMEOUT,
             },
         )
         return [
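Reviewer note: the query path now retries on `TransportError` via `backoff`. Below is a small self-contained sketch of that retry policy; the flaky function, the simulated 503, and the 30-second budget are illustrative assumptions, not part of the patch.

```python
import backoff
from opensearchpy.exceptions import TransportError

attempts = {"n": 0}

def log_backoff(details):
    # Same fields the patch's handlers read: wait, tries, exception.
    print(f"retry {details['tries']} after waiting {details['wait']:.1f}s: {details['exception']}")

# Exponential backoff; backoff gives up once the total elapsed time exceeds max_time.
@backoff.on_exception(backoff.expo, TransportError, max_time=30, on_backoff=log_backoff)
def flaky_search():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransportError(503, "search_phase_execution_exception", {})  # simulated overload
    return {"hits": {"hits": []}}

print(flaky_search())
```

Since `max_time` bounds the total retry window, a persistently failing query is abandoned after roughly `OPENSEARCH_TIMEOUT` seconds rather than retrying forever.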
diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py
index 0bc2427e..ded5983e 100644
--- a/engine/clients/opensearch/upload.py
+++ b/engine/clients/opensearch/upload.py
@@ -2,15 +2,18 @@
 import uuid
 from typing import List
 
+import backoff
 from opensearchpy import OpenSearch
+from opensearchpy.exceptions import TransportError
 
 from dataset_reader.base_reader import Record
 from engine.base_client.upload import BaseUploader
 from engine.clients.opensearch.config import (
+    OPENSEARCH_BULK_INDEX_TIMEOUT,
+    OPENSEARCH_FULL_INDEX_TIMEOUT,
     OPENSEARCH_INDEX,
-    OPENSEARCH_PASSWORD,
-    OPENSEARCH_PORT,
-    OPENSEARCH_USER,
+    _wait_for_es_status,
+    get_opensearch_client,
 )
 
 
@@ -29,22 +32,26 @@ def get_mp_start_method(cls):
 
     @classmethod
     def init_client(cls, host, distance, connection_params, upload_params):
-        init_params = {
-            **{
-                "verify_certs": False,
-                "request_timeout": 90,
-                "retry_on_timeout": True,
-            },
-            **connection_params,
-        }
-        cls.client = OpenSearch(
-            f"http://{host}:{OPENSEARCH_PORT}",
-            basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
-            **init_params,
-        )
+        cls.client = get_opensearch_client(host, connection_params)
         cls.upload_params = upload_params
 
+    def _upload_backoff_handler(details):
+        print(
+            f"Backing off OpenSearch bulk upload for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
+        )
+
+    def _index_backoff_handler(details):
+        print(
+            f"Backing off OpenSearch indexing for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}"
+        )
+
     @classmethod
+    @backoff.on_exception(
+        backoff.expo,
+        TransportError,
+        max_time=OPENSEARCH_FULL_INDEX_TIMEOUT,
+        on_backoff=_upload_backoff_handler,
+    )
     def upload_batch(cls, batch: List[Record]):
         operations = []
         for record in batch:
@@ -56,16 +63,35 @@ def upload_batch(cls, batch: List[Record]):
             index=OPENSEARCH_INDEX,
             body=operations,
             params={
-                "timeout": 300,
+                "timeout": OPENSEARCH_BULK_INDEX_TIMEOUT,
             },
         )
 
     @classmethod
+    @backoff.on_exception(
+        backoff.expo,
+        TransportError,
+        max_time=OPENSEARCH_FULL_INDEX_TIMEOUT,
+        on_backoff=_index_backoff_handler,
+    )
     def post_upload(cls, _distance):
-        cls.client.indices.forcemerge(
+        print(
+            "Updated the index settings back to the default and waiting for indexing to be completed."
+        )
+        # Update the index settings back to the default
+        refresh_interval = "1s"
+        cls.client.indices.put_settings(
             index=OPENSEARCH_INDEX,
-            params={
-                "timeout": 300,
-            },
+            body={"index": {"refresh_interval": refresh_interval}},
         )
+        _wait_for_es_status(cls.client)
         return {}
+
+    @classmethod
+    def get_memory_usage(cls):
+        index_stats = cls.client.indices.stats(index=OPENSEARCH_INDEX)
+        size_in_bytes = index_stats["_all"]["primaries"]["store"]["size_in_bytes"]
+        return {
+            "size_in_bytes": size_in_bytes,
+            "index_info": index_stats,
+        }
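Reviewer note: for context on what `upload_batch` hands to `client.bulk`, below is the generic shape of an OpenSearch `_bulk` body. The loop that builds `operations` is elided from this hunk, so the field names, ids, and vectors here are placeholders, not the patch's actual payload.

```python
from opensearchpy import OpenSearch

# Assumed local node; only the call shape mirrors the patch.
client = OpenSearch("http://localhost:9200", verify_certs=False, ssl_show_warn=False)

operations = []
for doc_id, vector in [(1, [0.1, 0.2]), (2, [0.3, 0.4])]:
    operations.append({"index": {"_id": doc_id}})  # action/metadata line
    operations.append({"vector": vector})          # document source line

# With refresh_interval set to -1 during recreate(), documents only become
# searchable after post_upload() restores "1s" and the index refreshes.
client.bulk(index="bench", body=operations, params={"timeout": 3600})
```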
diff --git a/experiments/configurations/opensearch-single-node.json b/experiments/configurations/opensearch-single-node-default-index.json
similarity index 100%
rename from experiments/configurations/opensearch-single-node.json
rename to experiments/configurations/opensearch-single-node-default-index.json
diff --git a/experiments/configurations/opensearch-single-node-single-shard.json b/experiments/configurations/opensearch-single-node-single-shard.json
new file mode 100644
index 00000000..d1789dda
--- /dev/null
+++ b/experiments/configurations/opensearch-single-node-single-shard.json
@@ -0,0 +1,87 @@
+[
+  {
+    "name": "opensearch-default",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 16, "ef_construction": 100 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } }
+    ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-16-ef-128",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 16, "ef_construction": 128 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-32-ef-128",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 128 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-32-ef-256",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 256 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-32-ef-512",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 512 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-64-ef-256",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 64, "ef_construction": 256 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  },
+  {
+    "name": "opensearch-m-64-ef-512",
+    "engine": "opensearch",
+    "connection_params": {
+      "request_timeout": 10000
+    },
+    "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 64, "ef_construction": 512 } } },
+    "search_params": [
+      { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } },
+      { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ],
+    "upload_params": { "parallel": 16 }
+  }
+]
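Reviewer note: a quick trace, mirroring the logic in `OpenSearchConfigurator.recreate`, of how the new `"index"` block in these configs lands in the index settings; the values are copied from the "opensearch-default" entry above and nothing here adds behaviour beyond the patch.

```python
# Mirror of recreate(): start from the fixed settings, then merge the
# optional "index" block from collection_params.
collection_params = {
    "index": {"number_of_shards": 1},
    "method": {"parameters": {"m": 16, "ef_construction": 100}},
}

index_settings = {"knn": True, "number_of_replicas": 0, "refresh_interval": -1}
index_config = collection_params.get("index", {})
if "number_of_shards" in index_config:
    index_settings["number_of_shards"] = index_config["number_of_shards"]

print(index_settings)
# {'knn': True, 'number_of_replicas': 0, 'refresh_interval': -1, 'number_of_shards': 1}
```

The renamed `opensearch-single-node-default-index.json` is unchanged (similarity 100%), so unless it already carried an `"index"` block it keeps falling back to the engine's default shard count.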