Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Identify s3/remote uri path correctly #5076

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,9 @@ def persist(
# Check if the specified location already exists.
if not allow_overwrite and os.path.exists(storage.file_options.uri):
raise SavedDatasetLocationAlreadyExists(location=storage.file_options.uri)

if not Path(storage.file_options.uri).is_absolute():
absolute_path = Path(self.repo_path) / storage.file_options.uri
else:
absolute_path = Path(storage.file_options.uri)
absolute_path = FileSource.get_uri_for_file_path(
repo_path=self.repo_path, uri=storage.file_options.uri
)

filesystem, path = FileSource.create_filesystem_and_path(
str(absolute_path),
Expand Down
7 changes: 3 additions & 4 deletions sdk/python/feast/infra/offline_stores/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ def _write_data_source(

file_options = data_source.file_options

if not Path(file_options.uri).is_absolute():
absolute_path = Path(repo_path) / file_options.uri
else:
absolute_path = Path(file_options.uri)
absolute_path = FileSource.get_uri_for_file_path(
repo_path=repo_path, uri=file_options.uri
)

if (
mode == "overwrite"
Expand Down
23 changes: 14 additions & 9 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
from urllib.parse import urlparse

import pyarrow
from packaging import version
Expand Down Expand Up @@ -154,17 +155,21 @@ def validate(self, config: RepoConfig):
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.pa_to_feast_value_type

@staticmethod
def get_uri_for_file_path(repo_path: Union[Path, str, None], uri: str) -> str:
parsed_uri = urlparse(uri)
if parsed_uri.scheme and parsed_uri.netloc:
return uri # Keep remote URIs as they are
if repo_path is not None and not Path(uri).is_absolute():
return str(Path(repo_path) / uri)
return str(Path(uri))

def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
if (
config.repo_path is not None
and not Path(self.file_options.uri).is_absolute()
):
absolute_path = config.repo_path / self.file_options.uri
else:
absolute_path = Path(self.file_options.uri)

absolute_path = self.get_uri_for_file_path(
repo_path=config.repo_path, uri=self.file_options.uri
)
filesystem, path = FileSource.create_filesystem_and_path(
str(absolute_path), self.file_options.s3_endpoint_override
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TrinoRetrievalJob,
)
from feast.infra.offline_stores.dask import DaskRetrievalJob
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.offline_store import RetrievalJob, RetrievalMetadata
from feast.infra.offline_stores.redshift import (
RedshiftOfflineStoreConfig,
Expand Down Expand Up @@ -246,3 +247,28 @@ def test_to_arrow_timeout(retrieval_job, timeout: Optional[int]):
with patch.object(retrieval_job, "_to_arrow_internal") as mock_to_arrow_internal:
retrieval_job.to_arrow(timeout=timeout)
mock_to_arrow_internal.assert_called_once_with(timeout=timeout)


@pytest.mark.parametrize(
"repo_path, uri, expected",
[
# Remote URI - Should return as-is
(
"/some/repo",
"s3://bucket-name/file.parquet",
"s3://bucket-name/file.parquet",
),
# Absolute Path - Should return as-is
("/some/repo", "/abs/path/file.parquet", "/abs/path/file.parquet"),
# Relative Path with repo_path - Should combine
("/some/repo", "data/output.parquet", "/some/repo/data/output.parquet"),
# Relative Path without repo_path - Should return absolute path
(None, "C:/path/to/file.parquet", "C:/path/to/file.parquet"),
],
ids=["s3_uri", "absolute_path", "relative_path", "windows_path"],
)
def test_get_uri_for_file_path(
repo_path: Optional[str], uri: str, expected: str
) -> None:
result = FileSource.get_uri_for_file_path(repo_path=repo_path, uri=uri)
assert result == expected, f"Expected {expected}, but got {result}"
Loading