-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OpenSearch integration improvements #139
Open
filipecosta90
wants to merge
22
commits into
qdrant:master
Choose a base branch
from
redis-performance:opensearch.improvements
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
4964b7b
opensearch improvements
filipecosta90 fbe689e
Fixes per PR linter
filipecosta90 91d3dd0
Fixes per ruff linter
filipecosta90 acb11f1
Increase the vector limit to 16K given the latest docs
filipecosta90 e05c145
Removed the dotproduct incompatibility error given opensearch now sup…
filipecosta90 c82c5e9
Added source for IncompatibilityError on vector size
filipecosta90 d98196d
Only using basic_auth when we have opensearch login data (this allows…
filipecosta90 3fb15b9
Only using basic_auth when we have opensearch login data (this allows…
filipecosta90 0555aa4
Detecting ssl features from url on opensearch client
filipecosta90 fa3eb76
Fixed OpenSearch connection setup
filipecosta90 8ea777a
Waiting for yellow status at least on opensearch post upload stage
filipecosta90 aec4967
Fixes per PR pre-commit: isort
filipecosta90 e8b5764
Fixed forcemerge api usage on opensearch
filipecosta90 e500000
Renamed references to ES
filipecosta90 4f5937a
Added backoff strategy for search_one method on opensearch client
filipecosta90 f83bc75
Fixes per PR pre-commit: isort
filipecosta90 abd8637
Added backoff strategy for search_one method on opensearch client
filipecosta90 ae5b620
Improved index and search performance based uppon docs recommendation
filipecosta90 5679a19
Collecting index stats at end of ingestion
filipecosta90 276892c
Using backoff on opensearch ingestion
filipecosta90 2c54762
Included single shard experiment for opensearch. Added backoff to pos…
filipecosta90 7292e92
Fixes per PR pre-commit: isort
filipecosta90 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,60 @@ | ||
OPENSEARCH_PORT = 9200 | ||
OPENSEARCH_INDEX = "bench" | ||
OPENSEARCH_USER = "opensearch" | ||
OPENSEARCH_PASSWORD = "passwd" | ||
import os | ||
import time | ||
|
||
from opensearchpy import OpenSearch | ||
|
||
OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) | ||
OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "bench") | ||
OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch") | ||
OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd") | ||
OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300)) | ||
OPENSEARCH_BULK_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_BULK_INDEX_TIMEOUT", 3600)) | ||
OPENSEARCH_FULL_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_FULL_INDEX_TIMEOUT", 3600)) | ||
OPENSEARCH_DELETE_INDEX_TIMEOUT = int( | ||
os.getenv("OPENSEARCH_DELETE_INDEX_TIMEOUT", 1200) | ||
) | ||
|
||
|
||
def get_opensearch_client(host, connection_params): | ||
init_params = { | ||
**{ | ||
"verify_certs": False, | ||
"request_timeout": OPENSEARCH_TIMEOUT, | ||
"retry_on_timeout": True, | ||
# don't show warnings about ssl certs verification | ||
"ssl_show_warn": False, | ||
}, | ||
**connection_params, | ||
} | ||
# Enabling basic auth on opensearch client | ||
# If the user and password are empty we use anonymous auth on opensearch client | ||
if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "": | ||
init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD) | ||
if host.startswith("https"): | ||
init_params["use_ssl"] = True | ||
else: | ||
init_params["use_ssl"] = False | ||
if host.startswith("http"): | ||
url = "" | ||
else: | ||
url = "http://" | ||
url += f"{host}:{OPENSEARCH_PORT}" | ||
client = OpenSearch( | ||
url, | ||
**init_params, | ||
) | ||
assert client.ping() | ||
return client | ||
|
||
|
||
def _wait_for_es_status(client, status="yellow"): | ||
print(f"waiting for OpenSearch cluster health {status} status...") | ||
for _ in range(100): | ||
try: | ||
client.cluster.health(wait_for_status=status) | ||
return client | ||
except ConnectionError: | ||
time.sleep(0.1) | ||
else: | ||
# timeout | ||
raise Exception("OpenSearch failed to start.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,15 +2,18 @@ | |
import uuid | ||
from typing import List | ||
|
||
import backoff | ||
from opensearchpy import OpenSearch | ||
from opensearchpy.exceptions import TransportError | ||
|
||
from dataset_reader.base_reader import Record | ||
from engine.base_client.upload import BaseUploader | ||
from engine.clients.opensearch.config import ( | ||
OPENSEARCH_BULK_INDEX_TIMEOUT, | ||
OPENSEARCH_FULL_INDEX_TIMEOUT, | ||
OPENSEARCH_INDEX, | ||
OPENSEARCH_PASSWORD, | ||
OPENSEARCH_PORT, | ||
OPENSEARCH_USER, | ||
_wait_for_es_status, | ||
get_opensearch_client, | ||
) | ||
|
||
|
||
|
@@ -29,22 +32,26 @@ def get_mp_start_method(cls): | |
|
||
@classmethod | ||
def init_client(cls, host, distance, connection_params, upload_params): | ||
init_params = { | ||
**{ | ||
"verify_certs": False, | ||
"request_timeout": 90, | ||
"retry_on_timeout": True, | ||
}, | ||
**connection_params, | ||
} | ||
cls.client = OpenSearch( | ||
f"http://{host}:{OPENSEARCH_PORT}", | ||
basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), | ||
**init_params, | ||
) | ||
cls.client = get_opensearch_client(host, connection_params) | ||
cls.upload_params = upload_params | ||
|
||
def _upload_backoff_handler(details): | ||
print( | ||
f"Backing off OpenSearch bulk upload for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" | ||
) | ||
|
||
def _index_backoff_handler(details): | ||
print( | ||
f"Backing off OpenSearch indexing for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" | ||
) | ||
|
||
@classmethod | ||
@backoff.on_exception( | ||
backoff.expo, | ||
TransportError, | ||
max_time=OPENSEARCH_FULL_INDEX_TIMEOUT, | ||
on_backoff=_upload_backoff_handler, | ||
) | ||
def upload_batch(cls, batch: List[Record]): | ||
operations = [] | ||
for record in batch: | ||
|
@@ -56,16 +63,34 @@ def upload_batch(cls, batch: List[Record]): | |
index=OPENSEARCH_INDEX, | ||
body=operations, | ||
params={ | ||
"timeout": 300, | ||
"timeout": OPENSEARCH_BULK_INDEX_TIMEOUT, | ||
}, | ||
) | ||
|
||
@classmethod | ||
@backoff.on_exception( | ||
backoff.expo, | ||
TransportError, | ||
max_time=OPENSEARCH_FULL_INDEX_TIMEOUT, | ||
on_backoff=_index_backoff_handler, | ||
) | ||
def post_upload(cls, _distance): | ||
cls.client.indices.forcemerge( | ||
print( | ||
"Updated the index settings back to the default and waiting for indexing to be completed." | ||
) | ||
# Update the index settings back to the default | ||
refresh_interval = "1s" | ||
cls.client.indices.put_settings( | ||
index=OPENSEARCH_INDEX, | ||
params={ | ||
"timeout": 300, | ||
}, | ||
body={"index": {"refresh_interval": refresh_interval}}, | ||
Comment on lines
+81
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i believe it's best as is, meaning:
|
||
) | ||
_wait_for_es_status(cls.client) | ||
return {} | ||
|
||
def get_memory_usage(cls): | ||
index_stats = cls.client.indices.stats(index=OPENSEARCH_INDEX) | ||
size_in_bytes = index_stats["_all"]["primaries"]["store"]["size_in_bytes"] | ||
return { | ||
"size_in_bytes": size_in_bytes, | ||
"index_info": index_stats, | ||
} |
File renamed without changes.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Tuples don't support item assignment"
I suggest: