diff --git a/README.md b/README.md
index 56ae8875..624d4a59 100644
--- a/README.md
+++ b/README.md
@@ -153,6 +153,18 @@ Options:
                                   with-gt]
   --help                          Show this message and exit.
 ```
+#### OpenSearch run command example
+
+```shell
+vectordbbench awsopensearch --db-label awsopensearch \
+--m 16 --ef-construction 256 \
+--host search-vector-db-prod-h4f6m4of6x7yp2rz7gdmots7w4.us-west-2.es.amazonaws.com --port 443 \
+--user vector --password '' \
+--case-type Performance1536D5M \
+--skip-load --num-concurrency 75 \
+--number-of-replicas 2 --index-thread-qty 4 --number-of-shards 3
+```
+
 #### Using a configuration file.
 
 The vectordbbench command can optionally read some or all the options from a yaml formatted configuration file.
diff --git a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
index 234014f1..de928c36 100644
--- a/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
+++ b/vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
@@ -52,10 +52,27 @@ def case_config_cls(cls, index_type: IndexType | None = None) -> AWSOpenSearchIn
         return AWSOpenSearchIndexConfig
 
     def _create_index(self, client: OpenSearch):
+        cluster_settings_body = {
+            "persistent": {
+                "knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
+                "knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
+            }
+        }
+        client.cluster.put_settings(cluster_settings_body)
         settings = {
             "index": {
                 "knn": True,
+                "number_of_shards": self.case_config.number_of_shards,
+                "number_of_replicas": 0,
+                # Set the translog flush threshold (5 GB by default) to reduce flushes during bulk load.
+                "translog.flush_threshold_size": self.case_config.flush_threshold_size,
+                **(
+                    {"knn.algo_param.ef_search": self.case_config.ef_search}
+                    if self.case_config.engine == AWSOS_Engine.nmslib
+                    else {}
+                ),
             },
+            "refresh_interval": self.case_config.refresh_interval,
         }
         mappings = {
             "properties": {
@@ -145,9 +162,9 @@ def search_embedding(
                 docvalue_fields=[self.id_col_name],
                 stored_fields="_none_",
             )
-            log.info(f"Search took: {resp['took']}")
-            log.info(f"Search shards: {resp['_shards']}")
-            log.info(f"Search hits total: {resp['hits']['total']}")
+            log.debug(f"Search took: {resp['took']}")
+            log.debug(f"Search shards: {resp['_shards']}")
+            log.debug(f"Search hits total: {resp['hits']['total']}")
             return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
         except Exception as e:
             log.warning(f"Failed to search: {self.index_name} error: {e!s}")
@@ -157,12 +174,39 @@ def optimize(self, data_size: int | None = None):
         """optimize will be called between insertion and search in performance cases."""
         # Call refresh first to ensure that all segments are created
         self._refresh_index()
-        self._do_force_merge()
+        if self.case_config.force_merge_enabled:
+            self._do_force_merge()
+            self._refresh_index()
+        self._update_replicas()
         # Call refresh again to ensure that the index is ready after force merge.
         self._refresh_index()
         # ensure that all graphs are loaded in memory and ready for search
         self._load_graphs_to_memory()
 
+    def _update_replicas(self):
+        index_settings = self.client.indices.get_settings(index=self.index_name)
+        current_number_of_replicas = int(index_settings[self.index_name]["settings"]["index"]["number_of_replicas"])
+        log.info(
+            f"Current number of replicas is {current_number_of_replicas}; changing it to {self.case_config.number_of_replicas}"
+        )
+        settings_body = {"index": {"number_of_replicas": self.case_config.number_of_replicas}}
+        self.client.indices.put_settings(index=self.index_name, body=settings_body)
+        self._wait_till_green()
+
+    def _wait_till_green(self):
+        log.info("Waiting for the index to become green...")
+        SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC = 30
+        while True:
+            res = self.client.cat.indices(index=self.index_name, h="health", format="json")
+            health = res[0]["health"]
+            if health == "green":
+                break
+            else:
+                log.info(f"Index {self.index_name} health is {health}, not green yet. Retrying")
+                time.sleep(SECONDS_WAITING_FOR_REPLICAS_TO_BE_ENABLED_SEC)
+                continue
+        log.info(f"Index {self.index_name} is green.")
+
     def _refresh_index(self):
         log.debug(f"Starting refresh for index {self.index_name}")
         while True:
@@ -179,6 +223,12 @@ def _refresh_index(self):
         log.debug(f"Completed refresh for index {self.index_name}")
 
     def _do_force_merge(self):
+        log.info(f"Updating the index thread qty to {self.case_config.index_thread_qty_during_force_merge} for force merge.")
+
+        cluster_settings_body = {
+            "persistent": {"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty_during_force_merge}
+        }
+        self.client.cluster.put_settings(cluster_settings_body)
         log.debug(f"Starting force merge for index {self.index_name}")
         force_merge_endpoint = f"/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false"
         force_merge_task_id = self.client.transport.perform_request("POST", force_merge_endpoint)["task"]
diff --git a/vectordb_bench/backend/clients/aws_opensearch/cli.py b/vectordb_bench/backend/clients/aws_opensearch/cli.py
index bb0c2450..826e465c 100644
--- a/vectordb_bench/backend/clients/aws_opensearch/cli.py
+++ b/vectordb_bench/backend/clients/aws_opensearch/cli.py
@@ -18,6 +18,73 @@ class AWSOpenSearchTypedDict(TypedDict):
     port: Annotated[int, click.option("--port", type=int, default=443, help="Db Port")]
     user: Annotated[str, click.option("--user", type=str, default="admin", help="Db User")]
     password: Annotated[str, click.option("--password", type=str, help="Db password")]
+    number_of_shards: Annotated[
+        int,
+        click.option("--number-of-shards", type=int, help="Number of shards", default=1),
+    ]
+    number_of_replicas: Annotated[
+        int,
+        click.option("--number-of-replicas", type=int, help="Number of replicas", default=1),
+    ]
+    index_thread_qty: Annotated[
+        int,
+        click.option(
+            "--index-thread-qty",
+            type=int,
+            help="Thread count for native engine indexing",
+            default=4,
+        ),
+    ]
+
+    index_thread_qty_during_force_merge: Annotated[
+        int,
+        click.option(
+            "--index-thread-qty-during-force-merge",
+            type=int,
+            help="Thread count for native engine indexing used during force merge",
+            default=4,
+        ),
+    ]
+
+    number_of_segments: Annotated[
+        int,
+        click.option("--number-of-segments", type=int, help="Number of segments", default=1),
+    ]
+
+    refresh_interval: Annotated[
+        str,
+        click.option("--refresh-interval", type=str, help="Index refresh interval (e.g. 60s)", default="60s"),
+    ]
+
+    force_merge_enabled: Annotated[
+        bool,
+        click.option("--force-merge-enabled", type=bool, help="Whether to force merge the index after ingestion", default=True),
+    ]
+
+    flush_threshold_size: Annotated[
+        str,
+        click.option("--flush-threshold-size", type=str, help="Translog flush threshold size", default="5120mb"),
+    ]
+
+    number_of_indexing_clients: Annotated[
+        int,
+        click.option(
+            "--number-of-indexing-clients",
+            type=int,
+            help="Number of indexing clients used to ingest the data",
+            default=1,
+        ),
+    ]
+
+    cb_threshold: Annotated[
+        str,
+        click.option(
+            "--cb-threshold",
+            type=str,
+            help="k-NN memory circuit breaker threshold",
+            default="50%",
+        ),
+    ]
 
 
 class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor2): ...
@@ -36,6 +103,17 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
             user=parameters["user"],
             password=SecretStr(parameters["password"]),
         ),
-        db_case_config=AWSOpenSearchIndexConfig(),
+        db_case_config=AWSOpenSearchIndexConfig(
+            number_of_shards=parameters["number_of_shards"],
+            number_of_replicas=parameters["number_of_replicas"],
+            index_thread_qty=parameters["index_thread_qty"],
+            number_of_segments=parameters["number_of_segments"],
+            refresh_interval=parameters["refresh_interval"],
+            force_merge_enabled=parameters["force_merge_enabled"],
+            flush_threshold_size=parameters["flush_threshold_size"],
+            number_of_indexing_clients=parameters["number_of_indexing_clients"],
+            index_thread_qty_during_force_merge=parameters["index_thread_qty_during_force_merge"],
+            cb_threshold=parameters["cb_threshold"],
+        ),
         **parameters,
     )
diff --git a/vectordb_bench/backend/clients/aws_opensearch/config.py b/vectordb_bench/backend/clients/aws_opensearch/config.py
index e9ccc727..dd51b266 100644
--- a/vectordb_bench/backend/clients/aws_opensearch/config.py
+++ b/vectordb_bench/backend/clients/aws_opensearch/config.py
@@ -39,6 +39,16 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
     efConstruction: int = 256
     efSearch: int = 256
     M: int = 16
+    index_thread_qty: int | None = 4
+    number_of_shards: int | None = 1
+    number_of_replicas: int | None = 0
+    number_of_segments: int | None = 1
+    refresh_interval: str | None = "60s"
+    force_merge_enabled: bool | None = True
+    flush_threshold_size: str | None = "5120mb"
+    number_of_indexing_clients: int | None = 1
+    index_thread_qty_during_force_merge: int = 4
+    cb_threshold: str | None = "50%"
 
     def parse_metric(self) -> str:
         if self.metric_type == MetricType.IP: