Commit
Fix storage tests, standardize filter methods & use dedicated test bucket

mihow committed Jul 10, 2024
1 parent 4c4dd29 commit 1f5e32d
Showing 3 changed files with 80 additions and 74 deletions.
1 change: 1 addition & 0 deletions .envs/.local/.django
@@ -26,3 +26,4 @@ MINIO_ROOT_USER=minioadmin
MINIO_ROOT_PASSWORD=minioadmin
MINIO_DEFAULT_BUCKET=ami
MINIO_STORAGE_USE_HTTPS=False
MINIO_TEST_BUCKET=ami-test
151 changes: 78 additions & 73 deletions ami/utils/s3.py
@@ -176,12 +176,37 @@ def count_files_paginated(config: S3Config) -> int:
    return count


def _compile_regex_filter(regex_filter: str | None) -> re.Pattern | None:
    regex = re.compile(str(regex_filter)) if regex_filter else None
    if regex:
        logger.debug(f"Interpreted regex_filter '{regex_filter}' as pattern '{regex.pattern}'")
    return regex


def _filter_single_key(key: str, regex: re.Pattern | None = None) -> bool:
    """
    Determine if a single filepath should be filtered based on a regex pattern.
    """
    if key.endswith("/"):
        logger.debug(key + " is skipped because it is a folder")
        return False
    if regex and not regex.search(key):
        logger.debug(f'{key} is skipped by regex filter: "{regex.pattern}"')
        return False
    return True
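
As a usage sketch (assuming ami.utils.s3 is importable; the sample keys are hypothetical), the two helpers compose like this:

    regex = _compile_regex_filter(r"\.jpe?g$")
    _filter_single_key("captures/IMG_0001.jpg", regex)  # True: key matches the pattern
    _filter_single_key("captures/notes.txt", regex)     # False: rejected by the regex
    _filter_single_key("captures/", regex)              # False: trailing slash marks a folder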


def list_files(
    config: S3Config,
    limit: int | None = 100000,
    subdir: str | None = None,
    regex_filter: str | None = None,
) -> typing.Generator[ObjectSummary, typing.Any, None]:
    """
    Recursively list files in a bucket, with optional limit and regex filter.
    Yields ObjectSummary objects.
    """
    # @TODO raise a warning about potential cost for large buckets
    bucket = get_bucket(config)
    full_prefix = make_full_prefix(config, subdir)
@@ -192,16 +217,60 @@ def list_files(
    else:
        objects = q.all()
    bucket_iter = objects
    regex = re.compile(str(regex_filter)) if regex_filter else None
    regex = _compile_regex_filter(regex_filter)
    for obj in bucket_iter:
        if obj.key.endswith("/"):
            logger.debug(obj.key + " is skipped because it is a folder")
            continue
        if regex and not regex.match(obj.key):
            logger.debug(obj.key + " is skipped by regex filter")
            continue
        logger.debug(f"Yielding {obj.key}")
        yield obj
        if _filter_single_key(obj.key, regex):
            logger.debug(f"Yielding {obj.key}")
            yield obj
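
This change also standardizes the filtering on regex.search via _filter_single_key (the old inline loop above used regex.match, which only matches from the start of the key). A hedged example of calling the generator, assuming an S3Config built elsewhere (obj.key and obj.size are standard boto3 ObjectSummary attributes):

    for obj in list_files(config, limit=10, subdir="captures", regex_filter=r"\.jpg$"):
        print(obj.key, obj.size)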


def list_files_paginated(
    config: S3Config,
    limit: int | None = None,
    subdir: str | None = None,
    regex_filter: str | None = None,
    **paginator_params: typing.Any,
) -> typing.Generator[ObjectTypeDef, None, None]:
    """
    List files in a bucket, with pagination to increase performance.
    Yields ObjectTypeDef dicts instead of ObjectSummary objects.
    """
    client = get_client(config)
    full_prefix = make_full_prefix(config, subdir)
    full_uri = make_full_prefix_uri(config, subdir, regex_filter)
    logger.info(f"Scanning {full_uri}")
    paginator: ListObjectsV2Paginator = client.get_paginator("list_objects_v2")

    # Prepare paginator parameters
    paginate_params: dict[str, typing.Any] = {
        "Bucket": config.bucket_name,
        "Prefix": full_prefix,
    }

    # Prepare pagination configuration
    pagination_config: PaginatorConfigTypeDef = {}
    if limit is not None:
        pagination_config["MaxItems"] = limit

    # Update with any additional paginator parameters
    paginate_params.update(paginator_params)

    # Use the Prefix parameter to filter results server-side
    page_iterator = paginator.paginate(PaginationConfig=pagination_config, **paginate_params)

    regex = _compile_regex_filter(regex_filter)

    for page in page_iterator:
        if "Contents" in page:
            for obj in page["Contents"]:
                assert "Key" in obj, f"Key is missing from object: {obj}"
                logger.debug(f"Found {obj['Key']}")
                if _filter_single_key(obj["Key"], regex):
                    logger.debug(f"Yielding {obj['Key']}")
                    yield obj
        else:
            logger.debug("No Contents in page")


def make_full_prefix(
@@ -274,70 +343,6 @@ def make_full_key_uri(config: S3Config, key: str, subdir: str | None = None, wit
    return full_key_uri


# @TODO return information about the page & files checked, even if no file match the regex or the endswith check
def filter_objects(
    page_iterator, regex_filter: str | None = None
) -> typing.Generator[ObjectTypeDef, typing.Any, None]:
    """
    Filter objects based on regex pattern and yield matching objects.
    """
    regex = re.compile(str(regex_filter)) if regex_filter else None
    logger.debug(f"Filtering objects with regex: {regex}")

    for page in page_iterator:
        if "Contents" in page:
            for obj in page["Contents"]:
                assert "Key" in obj, f"Key is missing from object: {obj}"
                logger.debug(f"Found {obj['Key']}")
                if obj["Key"].endswith("/"):
                    logger.debug(obj["Key"] + " is skipped because it is a folder")
                    continue
                if regex and not regex.search(str(obj["Key"])):
                    logger.debug(f'{obj["Key"]} is skipped by regex filter: "{regex.pattern}"')
                    continue
                logger.debug(f"Yielding {obj['Key']}")
                yield obj
        else:
            logger.debug("No Contents in page")


def list_files_paginated(
    config: S3Config,
    limit: int | None = None,
    subdir: str | None = None,
    regex_filter: str | None = None,
    **paginator_params: typing.Any,
) -> typing.Generator[ObjectTypeDef, None, None]:
    """
    List files in a bucket, with pagination to increase performance.
    """
    client = get_client(config)
    full_prefix = make_full_prefix(config, subdir)
    full_uri = make_full_prefix_uri(config, subdir, regex_filter)
    logger.info(f"Scanning {full_uri}")
    paginator: ListObjectsV2Paginator = client.get_paginator("list_objects_v2")

    # Prepare paginator parameters
    paginate_params: dict[str, typing.Any] = {
        "Bucket": config.bucket_name,
        "Prefix": full_prefix,
    }

    # Prepare pagination configuration
    pagination_config: PaginatorConfigTypeDef = {}
    if limit is not None:
        pagination_config["MaxItems"] = limit

    # Update with any additional paginator parameters
    paginate_params.update(paginator_params)

    # Use the Prefix parameter to filter results server-side
    page_iterator = paginator.paginate(PaginationConfig=pagination_config, **paginate_params)

    # Use a separate generator for regex filtering
    return filter_objects(page_iterator, regex_filter)


def _handle_boto_error(e: Exception) -> tuple[str, str]:
"""Handle different types of boto errors and return appropriate error code and message."""
if isinstance(e, botocore.exceptions.ClientError):
2 changes: 1 addition & 1 deletion config/settings/base.py
@@ -367,4 +367,4 @@
S3_TEST_ENDPOINT = env("MINIO_ENDPOINT", default="http://minio:9000") # type: ignore[no-untyped-call]
S3_TEST_KEY = env("MINIO_ROOT_USER", default=None) # type: ignore[no-untyped-call]
S3_TEST_SECRET = env("MINIO_ROOT_PASSWORD", default=None) # type: ignore[no-untyped-call]
S3_TEST_BUCKET = env("MINIO_DEFAULT_BUCKET", default="test") # type: ignore[no-untyped-call]
S3_TEST_BUCKET = env("MINIO_TEST_BUCKET", default="ami-test") # type: ignore[no-untyped-call]
