Skip to content

Commit

Permalink
Merge pull request #87 from dbmi-pitt/tjmadonna/add-session-to-es-upd…
Browse files Browse the repository at this point in the history
…ates

Adding session to es bulk_update and upsert funcs
  • Loading branch information
maxsibilla authored Nov 12, 2024
2 parents 5240e7b + 5de73b7 commit 604ec8d
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/opensearch_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ def check_response_payload_size(response_text):
internal_server_error(msg)


def upsert(doc: dict, index: str, es_url: str, headers: Optional[dict] = None, verify: bool = False):
""" Update or insert a document in the index.
def upsert(doc: dict, index: str, es_url: str,
headers: Optional[dict] = None, verify: bool = False, session: requests.Session = None):
"""Update or insert a document in the index.
Parameters
----------
Expand All @@ -238,6 +239,8 @@ def upsert(doc: dict, index: str, es_url: str, headers: Optional[dict] = None, v
The headers to be included in the request.
verify: bool, optional
Whether to verify the SSL certificate.
session: requests.Session, optional
The requests session to be used for the request.
Returns
-------
Expand All @@ -249,6 +252,9 @@ def upsert(doc: dict, index: str, es_url: str, headers: Optional[dict] = None, v
"doc": doc,
"doc_as_upsert": True
}

if session is not None:
return session.post(url, json=body, headers=headers, verify=verify)
return requests.post(url, json=body, headers=headers, verify=verify)


Expand All @@ -267,7 +273,8 @@ class BulkUpdate:
deletes: list[str] = field(default_factory=list)


def bulk_update(bulk_update: BulkUpdate, index: str, es_url: str, headers: Optional[dict] = None, verify: bool = False):
def bulk_update(bulk_update: BulkUpdate, index: str, es_url: str,
headers: Optional[dict] = None, verify: bool = False, session: requests.Session = None):
"""Upsert (update or insert) or delete multiple documents in the index.
Parameters
Expand All @@ -282,6 +289,8 @@ def bulk_update(bulk_update: BulkUpdate, index: str, es_url: str, headers: Optio
The headers to be included in the request.
verify: bool, optional
Whether to verify the SSL certificate.
session: requests.Session, optional
The requests session to be used for the request.
Returns
-------
Expand Down Expand Up @@ -310,4 +319,7 @@ def bulk_update(bulk_update: BulkUpdate, index: str, es_url: str, headers: Optio
deletes = [f'{{ "delete": {{ "_id": "{delete_uuid}" }} }}' for delete_uuid in bulk_update.deletes]

body = "\n".join(upserts + deletes) + "\n"

if session is not None:
return session.post(url, headers=headers, data=body, verify=verify)
return requests.post(url, headers=headers, data=body, verify=verify)

0 comments on commit 604ec8d

Please sign in to comment.