-
Notifications
You must be signed in to change notification settings - Fork 230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add multi-storage-client backend for file open #1455
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
import yaml | ||
from packaging.version import parse as parse_version | ||
|
||
from lhotse.utils import Pathlike, Pipe, SmartOpen, is_module_available, is_valid_url | ||
from lhotse.utils import Pathlike, Pipe, SmartOpen, is_module_available, is_valid_url, replace_bucket_with_profile_name | ||
from lhotse.workarounds import gzip_open_robust | ||
|
||
# TODO: figure out how to use some sort of typing stubs | ||
|
@@ -815,6 +815,82 @@ def handles_special_case(self, identifier: Pathlike) -> bool: | |
def is_applicable(self, identifier: Pathlike) -> bool: | ||
return is_valid_url(identifier) | ||
|
||
|
||
@lru_cache(maxsize=1)
def get_lhotse_msc_override_protocols() -> Any:
    """
    Read the ``LHOTSE_MSC_OVERRIDE_PROTOCOLS`` environment variable.

    The value is looked up once and memoized by ``lru_cache``; later changes to
    the environment are not observed until ``cache_clear()`` is called on this
    function.

    :return: the raw string value of the variable, or ``None`` when it is unset.
    """
    return os.environ.get("LHOTSE_MSC_OVERRIDE_PROTOCOLS")
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please document these environment variables in Lhotse's top-level |
||
|
||
|
||
@lru_cache(maxsize=1)
def get_lhotse_msc_profile() -> Any:
    """
    Read the ``LHOTSE_MSC_PROFILE`` environment variable.

    The value is looked up once and memoized by ``lru_cache``; later changes to
    the environment are not observed until ``cache_clear()`` is called on this
    function.

    :return: the raw string value of the variable, or ``None`` when it is unset.
    """
    return os.environ.get("LHOTSE_MSC_PROFILE")
|
||
|
||
@lru_cache(maxsize=1)
def get_lhotse_io_backend() -> Any:
    """
    Read the ``LHOTSE_IO_BACKEND`` environment variable.

    The value is looked up once and memoized by ``lru_cache``; later changes to
    the environment are not observed until ``cache_clear()`` is called on this
    function.

    :return: the raw string value of the variable, or ``None`` when it is unset.
    """
    return os.environ.get("LHOTSE_IO_BACKEND")
|
||
|
||
# URL scheme identifying paths that are already addressed to multi-storage client.
MSC_PREFIX = "msc"


class MSCIOBackend(IOBackend):
    """
    Opens files through multi-storage client (MSC).

    MSC is a Python client that exposes a single API over several object and
    file stores; storage locations are addressed with URLs of the form
    ``msc://<profile>/<path/to/object>``.

    See: https://github.com/NVIDIA/multi-storage-client

    URLs that are not yet in the ``msc://`` form can be redirected to MSC with
    two environment variables (each is read once and cached):

    * ``LHOTSE_MSC_OVERRIDE_PROTOCOLS`` -- comma-separated protocol names
      (e.g. ``s3,gs``) whose scheme is rewritten to ``msc``.
    * ``LHOTSE_MSC_PROFILE`` -- MSC profile name to substitute for the bucket
      name in the URL.
    """

    def open(self, identifier: str, mode: str):
        """
        Open ``identifier`` with ``msc.open``, converting it to an MSC URL first if needed.

        Paths already prefixed with msc, e.g. ``msc://profile/path/to/my/object1``,
        are passed to ``msc.open`` unchanged.

        Paths that have not yet migrated to an msc-compatible URL,
        e.g. ``protocol://bucket/path/to/my/object2``, are converted in two steps:

        1. If ``protocol`` is listed in env ``LHOTSE_MSC_OVERRIDE_PROTOCOLS``, the scheme
           is replaced with msc: ``msc://bucket/path/to/my/object2``.
        2. If env ``LHOTSE_MSC_PROFILE`` is set, the bucket name is replaced with that
           profile: ``msc://profile/path/to/my/object2``. Otherwise the MSC profile
           name is expected to match the bucket name.

        :param identifier: URL or path of the object to open.
        :param mode: file mode forwarded to ``msc.open``.
        :return: the file object returned by ``msc.open``.
        :raises RuntimeError: when the ``multistorageclient`` module is not installed.
        """
        if not is_module_available("multistorageclient"):
            raise RuntimeError(
                "Please run 'pip install multi-storage-client' in order to use MSCIOBackend."
            )

        import logging

        import multistorageclient as msc

        # handles_special_case() accepts any Pathlike, so normalize before string ops.
        identifier = str(identifier)
        msc_scheme = f"{MSC_PREFIX}://"

        # Already an MSC URL: nothing to rewrite.
        if identifier.startswith(msc_scheme):
            return msc.open(identifier, mode)

        # Override the protocol if requested.
        override_protocols = get_lhotse_msc_override_protocols()
        if override_protocols:
            for protocol in override_protocols.split(","):
                protocol = protocol.strip()
                if not protocol:
                    continue
                # Match the full "<protocol>://" scheme so that e.g. "s3" does not
                # capture "s3a://...", and rewrite only the scheme, not later
                # occurrences of the same text inside the object path.
                scheme = f"{protocol}://"
                if identifier.startswith(scheme):
                    identifier = msc_scheme + identifier[len(scheme):]
                    break

        # Override the bucket with the configured profile name if requested.
        msc_profile = get_lhotse_msc_profile()
        if msc_profile:
            identifier = replace_bucket_with_profile_name(identifier, msc_profile)

        try:
            return msc.open(identifier, mode)
        except Exception as e:
            # Report the rewritten identifier, which may differ from what the caller
            # passed, then re-raise with the original traceback intact.
            logging.getLogger(__name__).error(
                f"exception: {e}, identifier: {identifier}"
            )
            raise

    @classmethod
    def is_available(cls) -> bool:
        """Return True when the ``multistorageclient`` module can be imported."""
        return is_module_available("multistorageclient")

    def handles_special_case(self, identifier: Pathlike) -> bool:
        """Return True when ``identifier`` is already an ``msc://`` URL."""
        return str(identifier).startswith(f"{MSC_PREFIX}://")

    def is_applicable(self, identifier: Pathlike) -> bool:
        """Return True when ``identifier`` is a valid URL according to ``is_valid_url``."""
        return is_valid_url(identifier)
|
||
|
||
class CompositeIOBackend(IOBackend): | ||
""" | ||
|
@@ -938,6 +1014,8 @@ def get_default_io_backend() -> "IOBackend": | |
RedirectIOBackend(), | ||
PipeIOBackend(), | ||
] | ||
if MSCIOBackend.is_available(): | ||
backends.append(MSCIOBackend()) | ||
if AIStoreIOBackend.is_available(): | ||
# Try AIStore before other generalist backends, | ||
# but only if it's installed and enabled via AIS_ENDPOINT env var. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove the lru cache decorator since these environ lookups should be cheap - it looks like that would simplify the tests.