diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py index 2180ee21db9..17ee8b78169 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase.py @@ -23,7 +23,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.common.result import BlockingQueryResult from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from jinja2 import BaseLoader, Environment from pydantic import StrictFloat, StrictStr @@ -348,7 +348,10 @@ def _get_columnar_cluster(config: CouchbaseColumnarOfflineStoreConfig) -> Cluste assert config.password is not None cred = Credential.from_username_and_password(config.user, config.password) - return Cluster.create_instance(config.connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + return Cluster.create_instance( + config.connection_string, cred, ClusterOptions(timeout_options=timeout_opts) + ) def _execute_query( diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py index a28f853da1e..89e4aa2332e 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/couchbase_source.py @@ -4,7 +4,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from typeguard import typechecked from feast.data_source import DataSource @@ -203,7 +203,12 @@ def get_table_column_names_and_types( cred = Credential.from_username_and_password( config.offline_store.user, config.offline_store.password ) - cluster = Cluster.create_instance(config.offline_store.connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + cluster = Cluster.create_instance( + config.offline_store.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), + ) query_context = self.get_table_query_string() query = f""" @@ -218,7 +223,7 @@ def get_table_column_names_and_types( """ result = cluster.execute_query( - query, QueryOptions(timeout=timedelta(seconds=500)) + query, QueryOptions(timeout=timedelta(seconds=config.offline_store.timeout)) ) if not result: raise ZeroColumnQueryResult(query) diff --git a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py index 413dc2e618c..157e71fe490 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/couchbase_offline_store/tests/data_source.py @@ -7,7 +7,7 @@ import pandas as pd from couchbase_columnar.cluster import Cluster from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from feast.data_source import DataSource from feast.feature_logging import LoggingDestination @@ -38,7 +38,7 @@ def __init__(self, project_name: str, *args, **kwargs): connection_string=os.environ["COUCHBASE_COLUMNAR_CONNECTION_STRING"], user=os.environ["COUCHBASE_COLUMNAR_USER"], password=os.environ["COUCHBASE_COLUMNAR_PASSWORD"], - timeout=480, + timeout=120, ) def create_data_source( @@ -67,8 +67,11 @@ def format_row(row): cred = Credential.from_username_and_password( self.offline_store_config.user, self.offline_store_config.password ) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) cluster = Cluster.create_instance( - self.offline_store_config.connection_string, cred + self.offline_store_config.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), ) create_cluster_query = f"CREATE ANALYTICS COLLECTION {COUCHBASE_COLUMNAR_DATABASE}.{COUCHBASE_COLUMNAR_SCOPE}.{collection_name} IF NOT EXISTS PRIMARY KEY(pk: UUID) AUTOGENERATED;" @@ -127,8 +130,12 @@ def teardown(self): cred = Credential.from_username_and_password( self.offline_store_config.user, self.offline_store_config.password ) + + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) cluster = Cluster.create_instance( - self.offline_store_config.connection_string, cred + self.offline_store_config.connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), ) for collection in self.collections: @@ -141,8 +148,5 @@ def teardown(self): ), ) print(f"Successfully dropped collection: {collection}") - except TimeoutError: - # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful - pass except Exception as e: print(f"Error dropping collection {collection}: {e}") diff --git a/sdk/python/feast/templates/couchbase/bootstrap.py b/sdk/python/feast/templates/couchbase/bootstrap.py index 4a13f5847a8..ce3f6442a59 100644 --- a/sdk/python/feast/templates/couchbase/bootstrap.py +++ b/sdk/python/feast/templates/couchbase/bootstrap.py @@ -2,7 +2,7 @@ from couchbase_columnar.cluster import Cluster from couchbase_columnar.common.errors import InvalidCredentialError, TimeoutError from couchbase_columnar.credential import Credential -from couchbase_columnar.options import QueryOptions +from couchbase_columnar.options import ClusterOptions, QueryOptions, TimeoutOptions from feast.file_utils import replace_str_in_file from feast.infra.offline_stores.contrib.couchbase_offline_store.couchbase import ( @@ -61,13 +61,18 @@ def bootstrap(): cred = Credential.from_username_and_password( columnar_user, columnar_password ) - cluster = Cluster.create_instance(columnar_connection_string, cred) + timeout_opts = TimeoutOptions(dispatch_timeout=timedelta(seconds=120)) + cluster = Cluster.create_instance( + columnar_connection_string, + cred, + ClusterOptions(timeout_options=timeout_opts), + ) table_name = "Default.Default.feast_driver_hourly_stats" try: cluster.execute_query( f"DROP COLLECTION {table_name} IF EXISTS", - QueryOptions(timeout=timedelta(seconds=500)), + QueryOptions(timeout=timedelta(seconds=columnar_timeout)), ) except TimeoutError: # FIXME: temp workaround, timeouts occur in Columnar SDK even when the drop was successful