Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Opensearch configurations #461

Closed
wants to merge 9 commits into from
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ Options:
with-gt]
--help Show this message and exit.
```
#### OpenSearch run command example

```shell
vectordbbench awsopensearch --db-label awsopensearch \
--m 16 --ef-construction 256 \
--host search-vector-db-prod-h4f6m4of6x7yp2rz7gdmots7w4.us-west-2.es.amazonaws.com --port 443 \
--user vector --password '<password>' \
--case-type Performance1536D5M \
--skip-load --num-concurrency 75 \
--number-of-replicas 2 --index-thread-qty 4 --number-of-shards 3
```

#### Using a configuration file.

The vectordbbench command can optionally read some or all of its options from a YAML-formatted configuration file.
Expand Down
58 changes: 54 additions & 4 deletions vectordb_bench/backend/clients/aws_opensearch/aws_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,27 @@ def case_config_cls(cls, index_type: IndexType | None = None) -> AWSOpenSearchIn
return AWSOpenSearchIndexConfig

def _create_index(self, client: OpenSearch):
cluster_settings_body = {
"persistent": {
"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
"knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
}
}
client.cluster.put_settings(cluster_settings_body)
settings = {
"index": {
"knn": True,
"number_of_shards": self.case_config.number_of_shards,
"number_of_replicas": 0,
"translog.flush_threshold_size": self.case_config.flush_threshold_size,
# Setting trans log threshold to 5GB
**(
{"knn.algo_param.ef_search": self.case_config.ef_search}
if self.case_config.engine == AWSOS_Engine.nmslib
else {}
),
},
"refresh_interval": self.case_config.refresh_interval,
}
mappings = {
"properties": {
Expand Down Expand Up @@ -145,9 +162,9 @@ def search_embedding(
docvalue_fields=[self.id_col_name],
stored_fields="_none_",
)
log.info(f"Search took: {resp['took']}")
log.info(f"Search shards: {resp['_shards']}")
log.info(f"Search hits total: {resp['hits']['total']}")
log.debug(f"Search took: {resp['took']}")
log.debug(f"Search shards: {resp['_shards']}")
log.debug(f"Search hits total: {resp['hits']['total']}")
return [int(h["fields"][self.id_col_name][0]) for h in resp["hits"]["hits"]]
except Exception as e:
log.warning(f"Failed to search: {self.index_name} error: {e!s}")
Expand All @@ -157,12 +174,39 @@ def optimize(self, data_size: int | None = None):
"""optimize will be called between insertion and search in performance cases."""
# Call refresh first to ensure that all segments are created
self._refresh_index()
self._do_force_merge()
if self.case_config.force_merge_enabled:
self._do_force_merge()
self._refresh_index()
self._update_replicas()
# Call refresh again to ensure that the index is ready after force merge.
self._refresh_index()
# ensure that all graphs are loaded in memory and ready for search
self._load_graphs_to_memory()

def _update_replicas(self):
    """Raise the index's replica count to the configured value, then block until green.

    Reads the live replica setting first (for logging), applies
    ``case_config.number_of_replicas`` via the index-settings API, and waits
    for the cluster to finish allocating the new replica shards.
    """
    live_settings = self.client.indices.get_settings(index=self.index_name)
    replicas_now = int(live_settings[self.index_name]["settings"]["index"]["number_of_replicas"])
    log.info(
        f"Current Number of replicas are {replicas_now} and changing the replicas to {self.case_config.number_of_replicas}"
    )
    self.client.indices.put_settings(
        index=self.index_name,
        body={"index": {"number_of_replicas": self.case_config.number_of_replicas}},
    )
    # Replica allocation is asynchronous; don't return until the index is green.
    self._wait_till_green()

def _wait_till_green(self):
    """Poll ``_cat/indices`` until the target index reports 'green' health.

    NOTE(review): there is no timeout — if the cluster can never reach green
    (e.g. not enough data nodes for the replica count) this loops forever.
    """
    log.info("Wait for index to become green..")
    poll_interval_sec = 30  # seconds between health polls while replicas allocate
    while True:
        rows = self.client.cat.indices(index=self.index_name, h="health", format="json")
        status = rows[0]["health"]
        if status == "green":
            break
        log.info(f"The index {self.index_name} has health : {status} and is not green. Retrying")
        time.sleep(poll_interval_sec)
    log.info(f"Index {self.index_name} is green..")

def _refresh_index(self):
log.debug(f"Starting refresh for index {self.index_name}")
while True:
Expand All @@ -179,6 +223,12 @@ def _refresh_index(self):
log.debug(f"Completed refresh for index {self.index_name}")

def _do_force_merge(self):
log.info(f"Updating the Index thread qty to {self.case_config.index_thread_qty_during_force_merge}.")

cluster_settings_body = {
"persistent": {"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty_during_force_merge}
}
self.client.cluster.put_settings(cluster_settings_body)
log.debug(f"Starting force merge for index {self.index_name}")
force_merge_endpoint = f"/{self.index_name}/_forcemerge?max_num_segments=1&wait_for_completion=false"
force_merge_task_id = self.client.transport.perform_request("POST", force_merge_endpoint)["task"]
Expand Down
80 changes: 79 additions & 1 deletion vectordb_bench/backend/clients/aws_opensearch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,73 @@ class AWSOpenSearchTypedDict(TypedDict):
port: Annotated[int, click.option("--port", type=int, default=443, help="Db Port")]
user: Annotated[str, click.option("--user", type=str, default="admin", help="Db User")]
password: Annotated[str, click.option("--password", type=str, help="Db password")]
number_of_shards: Annotated[
    int,
    click.option("--number-of-shards", type=int, help="Number of shards", default=1),
]
number_of_replicas: Annotated[
    int,
    # FIX: help text read "Number of replica" (typo).
    click.option("--number-of-replicas", type=int, help="Number of replicas", default=1),
]
index_thread_qty: Annotated[
    int,
    click.option(
        "--index-thread-qty",
        type=int,
        help="Thread count for native engine indexing",
        default=4,
    ),
]

index_thread_qty_during_force_merge: Annotated[
    int,
    click.option(
        "--index-thread-qty-during-force-merge",
        type=int,
        help="Thread count for native engine indexing used during force merge",
        default=4,
    ),
]

number_of_segments: Annotated[
    int,
    click.option("--number-of-segments", type=int, help="Number of segments", default=1),
]

# FIX: annotation was `int` but the option is a string (e.g. "60s").
refresh_interval: Annotated[
    str,
    click.option("--refresh-interval", type=str, help="refresh-interval", default="60s"),
]

# FIX: annotation was `int` but the option is a boolean toggle.
force_merge_enabled: Annotated[
    bool,
    click.option("--force-merge-enabled", type=bool, help="If we need to do force merge or not", default=True),
]

# FIX: annotation was `int` but the option is a size string (e.g. "5120mb").
flush_threshold_size: Annotated[
    str,
    click.option("--flush-threshold-size", type=str, help="Threshold for flushing translog", default="5120mb"),
]

number_of_indexing_clients: Annotated[
    int,
    click.option(
        "--number-of-indexing-clients",
        type=int,
        help="Number of indexing clients that should be used for indexing the data",
        default=1,
    ),
]

# FIX: annotation was `int` but the option is a percentage string (e.g. "50%").
cb_threshold: Annotated[
    str,
    click.option(
        "--cb-threshold",
        type=str,
        help="k-NN Memory circuit breaker threshold",
        default="50%",
    ),
]


# Combined CLI option schema for AWS OpenSearch with the HNSW index flavor:
# merges the common benchmark options, the OpenSearch-specific options, and
# HNSW tuning flags (CommonTypedDict / HNSWFlavor2 are defined elsewhere —
# presumably shared CLI mixins; confirm against their module).
class AWSOpenSearchHNSWTypedDict(CommonTypedDict, AWSOpenSearchTypedDict, HNSWFlavor2): ...
Expand All @@ -36,6 +103,17 @@ def AWSOpenSearch(**parameters: Unpack[AWSOpenSearchHNSWTypedDict]):
user=parameters["user"],
password=SecretStr(parameters["password"]),
),
db_case_config=AWSOpenSearchIndexConfig(),
db_case_config=AWSOpenSearchIndexConfig(
number_of_shards=parameters["number_of_shards"],
number_of_replicas=parameters["number_of_replicas"],
index_thread_qty=parameters["index_thread_qty"],
number_of_segments=parameters["number_of_segments"],
refresh_interval=parameters["refresh_interval"],
force_merge_enabled=parameters["force_merge_enabled"],
flush_threshold_size=parameters["flush_threshold_size"],
number_of_indexing_clients=parameters["number_of_indexing_clients"],
index_thread_qty_during_force_merge=parameters["index_thread_qty_during_force_merge"],
cb_threshold=parameters["cb_threshold"],
),
**parameters,
)
10 changes: 10 additions & 0 deletions vectordb_bench/backend/clients/aws_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class AWSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
efConstruction: int = 256
efSearch: int = 256
M: int = 16
index_thread_qty: int | None = 4
number_of_shards: int | None = 1
number_of_replicas: int | None = 0
number_of_segments: int | None = 1
refresh_interval: str | None = "60s"
force_merge_enabled: bool | None = True
flush_threshold_size: str | None = "5120mb"
number_of_indexing_clients: int | None = 1
index_thread_qty_during_force_merge: int
cb_threshold: str | None = "50%"

def parse_metric(self) -> str:
if self.metric_type == MetricType.IP:
Expand Down
Loading