Commit e9b5312

Fix quirks related to no-name files. Add more filters.

mihow committed Jul 11, 2024
1 parent 561b766 commit e9b5312
Showing 3 changed files with 199 additions and 50 deletions.
150 changes: 116 additions & 34 deletions ami/utils/s3.py
@@ -23,7 +23,7 @@
from mypy_boto3_s3.type_defs import BucketTypeDef, ObjectTypeDef, PaginatorConfigTypeDef
from rich import print

-from .storages import ConnectionTestResult
+from .storages import IMAGE_FILE_EXTENSIONS, ConnectionTestResult

logger = logging.getLogger(__name__)

@@ -96,23 +96,22 @@ def get_session(config: S3Config) -> boto3.session.Session:
return session


-def get_client(config: S3Config) -> S3Client:
+def get_s3_client(config: S3Config) -> S3Client:
session = get_session(config)
if config.endpoint_url:
-        client: S3Client = session.client(
+        client = session.client(
service_name="s3",
endpoint_url=config.endpoint_url,
aws_access_key_id=config.access_key_id,
aws_secret_access_key=config.secret_access_key,
config=botocore.config.Config(signature_version="s3v4"),
)
else:
-        client: S3Client = session.client(
+        client = session.client(
service_name="s3",
aws_access_key_id=config.access_key_id,
aws_secret_access_key=config.secret_access_key,
)

return client
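
A minimal usage sketch of the renamed helper (illustrative, not part of the commit): it assumes S3Config can be constructed with the fields this module reads (endpoint_url, access_key_id, secret_access_key, bucket_name, prefix), and the values below are placeholders.

config = S3Config(
    endpoint_url="http://localhost:9000",  # set for MinIO/Swift; leave unset for AWS S3
    access_key_id="EXAMPLE_ACCESS_KEY",
    secret_access_key="EXAMPLE_SECRET_KEY",
    bucket_name="ami-example-bucket",
    prefix="",
)
client = get_s3_client(config)
print(client.list_buckets().get("Buckets", []))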


@@ -127,7 +126,7 @@ def get_resource(config: S3Config) -> S3ServiceResource:


def list_buckets(config: S3Config) -> list[BucketTypeDef]:
-    s3 = get_client(config)
+    s3 = get_s3_client(config)
return s3.list_buckets().get("Buckets", [])


@@ -138,14 +137,14 @@ def get_bucket(config: S3Config) -> Bucket:


def list_projects(config: S3Config):
-    client = get_client(config)
+    client = get_s3_client(config)
resp = client.list_objects_v2(Bucket=config.bucket_name, Prefix="", Delimiter="/")
prefixes = [without_trailing_slash(item["Prefix"]) for item in resp["CommonPrefixes"]] # type: ignore
return prefixes


def list_deployments(config: S3Config, project: str):
-    client = get_client(config)
+    client = get_s3_client(config)
resp = client.list_objects_v2(Bucket=config.bucket_name, Prefix=with_trailing_slash(project), Delimiter="/")
if len(resp) and "CommonPrefixes" in resp.keys():
prefixes = [without_trailing_slash(item["Prefix"]) for item in resp["CommonPrefixes"]] # type: ignore
@@ -161,7 +160,7 @@ def count_files(config: S3Config):


def count_files_paginated(config: S3Config) -> int:
-    client = get_client(config)
+    client = get_s3_client(config)
paginator: ListObjectsV2Paginator = client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(
Bucket=config.bucket_name,
@@ -183,16 +182,31 @@ def _compile_regex_filter(regex_filter: str | None) -> re.Pattern | None:
return regex


-def _filter_single_key(key: str, regex: re.Pattern | None = None) -> bool:
+def _filter_single_key(
+    key: str,
+    obj_size: int,
+    regex: re.Pattern | None = None,
+    file_extensions: list[str] = IMAGE_FILE_EXTENSIONS,
+) -> bool:
"""
Determine if a single filepath should be filtered based on a regex pattern.
"""
logger.debug(f"Checking key: {key}")
if key == "":
logger.debug("Empty key is skipped")
return False
if not any(key.lower().endswith(ext) for ext in file_extensions):
logger.debug(f"{key} is skipped because it does not have a valid file extension: {file_extensions}")
return False
if key.endswith("/"):
logger.debug(key + " is skipped because it is a folder")
return False
if regex and not regex.search(key):
logger.debug(f'{key} is skipped by regex filter: "{regex.pattern}"')
return False
+    if obj_size == 0:
+        logger.debug(f"{key} is skipped because it is an empty object")
+        return False
return True
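
To make the new checks concrete, here is an illustrative sketch (not in the commit) of how _filter_single_key treats a few made-up keys, using the default IMAGE_FILE_EXTENSIONS:

regex = _compile_regex_filter(r"deployment_1/")
examples = [
    ("", 10),                           # blank key (the MinIO/Swift quirk): skipped
    ("deployment_1/", 0),               # folder placeholder: skipped
    ("deployment_1/20240711.jpg", 0),   # zero-byte object: skipped
    ("other_dir/20240711.jpg", 42),     # fails the regex filter: skipped
    ("deployment_1/notes.txt", 42),     # not an image extension: skipped
    ("deployment_1/20240711.jpg", 42),  # passes every check: kept
    ("deployment_1/archive_jpg", 42),   # also kept: the extensions carry no leading dot, so any key ending in "jpg" matches
]
for key, size in examples:
    print(repr(key), _filter_single_key(key, obj_size=size, regex=regex))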


@@ -201,11 +215,15 @@ def list_files(
limit: int | None = 100000,
subdir: str | None = None,
regex_filter: str | None = None,
+    file_extensions: list[str] = IMAGE_FILE_EXTENSIONS,
) -> typing.Generator[tuple[ObjectSummary | None, int], typing.Any, None]:
"""
"Recursively" list files in a bucket, with optional limit and regex filter.
Returns an ObjectSummary object.
+    @TODO Consider returning just the key instead of the full object so we
+    can make list_files_paginated more consistent with list_files.
"""
# @TODO raise a warning about potential cost for large buckets
bucket = get_bucket(config)
@@ -221,7 +239,7 @@ def list_files(
num_files_checked = 0
for num_files_checked, obj in enumerate(bucket_iter):
num_files_checked += 1
-        if _filter_single_key(obj.key, regex):
+        if _filter_single_key(obj.key, obj_size=obj.size, regex=regex, file_extensions=file_extensions):
logger.debug(f"Yielding {obj.key}")
yield obj, num_files_checked
yield None, num_files_checked
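
For reference, a sketch of how this generator might be consumed (illustrative; the subdir and regex are placeholders). The trailing (None, count) sentinel reports how many keys were scanned:

matched = 0
for obj, checked in list_files(config, limit=1000, subdir="deployment_1", regex_filter=r"\.jpg$"):
    if obj is None:
        print(f"Scanned {checked} keys, matched {matched}")
        break
    matched += 1
    print(obj.key, obj.size)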
@@ -232,14 +250,18 @@ def list_files_paginated(
limit: int | None = None,
subdir: str | None = None,
regex_filter: str | None = None,
+    file_extensions: list[str] = IMAGE_FILE_EXTENSIONS,
**paginator_params: typing.Any,
) -> typing.Generator[tuple[ObjectTypeDef | None, int], typing.Any, None]:
"""
List files in a bucket, with pagination to increase performance.
Returns an ObjectTypeDef dict instead of an ObjectSummary object.
+    @TODO Consider returning just the key instead of the full object so we
+    can make list_files_paginated more consistent with list_files.
"""
-    client = get_client(config)
+    client = get_s3_client(config)
full_prefix = make_full_prefix(config, subdir)
full_uri = make_full_prefix_uri(config, subdir, regex_filter)
logger.info(f"Scanning {full_uri}")
@@ -248,17 +270,15 @@ def list_files_paginated(
# Prepare paginator parameters
paginate_params: dict[str, typing.Any] = {
"Bucket": config.bucket_name,
"Prefix": full_prefix,
}
-    if full_prefix:
-        paginate_params["Prefix"] = full_prefix

# Prepare pagination configuration
pagination_config: PaginatorConfigTypeDef = {}
if limit is not None:
pagination_config["MaxItems"] = limit

# Update with any additional paginator parameters
paginate_params.update(paginator_params)

# Use the Prefix parameter to filter results server-side
page_iterator = paginator.paginate(PaginationConfig=pagination_config, **paginate_params)

@@ -269,9 +289,9 @@ def list_files_paginated(
if "Contents" in page:
for obj in page["Contents"]:
num_files_checked += 1
assert "Key" in obj, f"Key is missing from object: {obj}"
assert "Key" in obj and "Size" in obj, f"Key or Size is missing from object: {obj}"
logger.debug(f"Found {obj['Key']}")
if _filter_single_key(obj["Key"], regex):
if _filter_single_key(obj["Key"], obj_size=obj["Size"], regex=regex, file_extensions=file_extensions):
logger.debug(f"Yielding {obj['Key']}")
yield obj, num_files_checked
else:
@@ -283,7 +303,7 @@ def make_full_prefix(
def make_full_prefix(
config: S3Config, subdir: str | None = None, with_bucket: bool = False, leading_slash: bool = False
) -> str:
-    full_prefix = pathlib.Path(config.prefix, subdir or "")
+    full_prefix = pathlib.Path(without_leading_slash(config.prefix), without_leading_slash(subdir) if subdir else "")
if with_bucket:
full_prefix = pathlib.Path(config.bucket_name, full_prefix)
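
The without_leading_slash guards are worth a quick illustration (not from the commit): pathlib.Path treats an absolute second segment as a new root and silently discards everything before it.

import pathlib

print(pathlib.Path("project_a", "/deployment_1").as_posix())  # "/deployment_1" (prefix lost)
print(pathlib.Path("project_a", "deployment_1").as_posix())   # "project_a/deployment_1"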

@@ -343,34 +363,85 @@ def make_full_key_uri(config: S3Config, key: str, subdir: str | None = None, wit
"""
key = key_with_prefix(config, key, subdir)
if with_protocol:
full_key_uri = f"s3://{pathlib.Path(config.bucket_name, key).as_posix()}"
full_key_uri = f"s3://{pathlib.Path(config.bucket_name, without_leading_slash(key)).as_posix()}"
else:
full_key_uri = f"/{pathlib.Path(config.bucket_name, key).as_posix()}"
full_key_uri = f"/{pathlib.Path(config.bucket_name, without_leading_slash(key)).as_posix()}"

return full_key_uri


-def _handle_boto_error(e: Exception) -> tuple[str, str]:
-    """Handle different types of boto errors and return appropriate error code and message."""
+def _handle_boto_error(e: Exception, operation_name: str = "unknown_operation", raise_error=False) -> tuple[str, str]:
+    """Handle different types of boto errors and return more specific exceptions."""

+    known_errors = {
+        "NoSuchKey": ("InvalidCredentials", "Invalid access key"),
+        "AccessDenied": ("InvalidCredentials", "Invalid secret key or insufficient permissions"),
+        "NoSuchBucket": ("NoSuchBucket", "Bucket does not exist"),
+    }

if isinstance(e, botocore.exceptions.ClientError):
error_code = e.response.get("Error", {}).get("Code", "UnknownBotoError")
error_message = e.response.get("Error", {}).get("Message", str(e))

+        if error_code in known_errors:
+            error_code, error_message = known_errors[error_code]

elif isinstance(e, botocore.exceptions.EndpointConnectionError):
error_code = "EndpointConnectionError"
error_message = str(e)

elif isinstance(e, botocore.exceptions.BotoCoreError):
error_code = e.__class__.__name__
error_message = str(e)

else:
error_code = "UnknownBotoError"
error_message = str(e)

logger.error(f"{error_code}: {error_message}")
-    return error_code, error_message
+    if raise_error:
+        raise botocore.exceptions.ClientError(
+            {"Error": {"Code": error_code, "Message": error_message}},
+            operation_name=operation_name,
+        )
+    else:
+        return error_code, error_message
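
A sketch of the two calling modes (illustrative; the ClientError is fabricated for the example):

import botocore.exceptions

err = botocore.exceptions.ClientError(
    {"Error": {"Code": "NoSuchBucket", "Message": "The specified bucket does not exist"}},
    "list_objects_v2",
)

# Default mode: translate to a friendlier (code, message) pair.
code, message = _handle_boto_error(err, operation_name="list_objects_v2")
print(code, message)  # NoSuchBucket Bucket does not exist

# raise_error=True: re-raise as a ClientError carrying the friendlier code and message.
try:
    _handle_boto_error(err, operation_name="list_objects_v2", raise_error=True)
except botocore.exceptions.ClientError as e:
    print(e.response["Error"]["Code"])  # NoSuchBucket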


+def test_credentials(config: S3Config) -> bool:
+    client = get_s3_client(config)
+    try:
+        client.get_bucket_location(Bucket=config.bucket_name)
+    except botocore.exceptions.ClientError as e:
+        _handle_boto_error(e, operation_name="get_bucket_location", raise_error=True)
+    return True
+
+
+def delete_blank_and_empty_key(config: S3Config, subdir: str | None = None):
+    """
+    Apparently MinIO and the Swift object store allow you to create objects with no name,
+    which breaks the list_files function.
+    Here we will delete any objects with no name if their content length is 0.
+    """
+    key = key_with_prefix(config, "", subdir)
+    if file_exists(config, key):
+        key = get_bucket(config).Object(key)
+        # Check if the object content length is 0
+        if key.get()["ContentLength"] == 0:
+            logger.warning(f"Found object with no name, deleting empty object at: {key}")
+            key.delete()
+        else:
+            logger.error(
+                f"Object with no name has content, skipping deletion of: {key}, but list_files function may break"
+            )
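
For background, the kind of object this function targets can be reproduced against MinIO with plain boto3 (illustrative sketch; the key is a placeholder): a zero-byte object whose key is just the prefix itself, which then surfaces as a blank, empty entry in listings.

client = get_s3_client(config)
client.put_object(Bucket=config.bucket_name, Key="deployment_1/", Body=b"")
# list_objects_v2 now returns an entry with Key == "deployment_1/" and Size == 0,
# exactly the kind of entry the new _filter_single_key checks screen out.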


def test_connection(
-    config: S3Config, subdir: str | None = None, regex_filter: str | None = None
+    config: S3Config,
+    subdir: str | None = None,
+    regex_filter: str | None = None,
+    file_extensions: list[str] = IMAGE_FILE_EXTENSIONS,
) -> ConnectionTestResult:
"""
Test the connection and return detailed statistics about the operation.
Expand All @@ -387,13 +458,24 @@ def test_connection(
full_uri = make_full_prefix_uri(config, subdir, regex_filter)

try:
-        # Determine max_keys based on whether a regex_filter is provided
-        limit = 1 if not regex_filter else 10000
+        # Test the access key & secret credentials by fetching the bucket location
+        test_credentials(config)
+
+        # Limit the number of files to check
+        limit = 10000
+
+        # Delete blank keys at this subdirectory level
+        # @TODO Needs further testing. This fixed the issue, but also increasing the limit fixed the issue.
+        # delete_blank_and_empty_key(config, subdir)

        # Use list_files_paginated with appropriate max_keys
        file_generator = list_files_paginated(
-            config, limit=limit, subdir=subdir, regex_filter=regex_filter
-        )  # , max_keys=max_keys)
+            config,
+            limit=limit,
+            subdir=subdir,
+            regex_filter=regex_filter,
+            file_extensions=file_extensions,
+        )

# Measure latency to first response
first_response_time = time.time()
Expand All @@ -404,17 +486,17 @@ def test_connection(
connection_successful = True
prefix_exists = first_file_found is not None # In S3, a prefix only exists if there is at least one object

# Catch "NoSuchKey" error when the access key is invalid
except (
botocore.exceptions.ClientError,
botocore.exceptions.EndpointConnectionError,
botocore.exceptions.BotoCoreError,
) as e:
-        error_code, error_message = _handle_boto_error(e)
+        error_code, error_message = _handle_boto_error(e, operation_name="list_files", raise_error=False)
except Exception as e:
error_code = "UnknownError"
error_code = str(e)
error_message = str(e)
logger.error(f"Unknown error: {error_message}")
+        raise  # Re-raise the exception for unexpected errors

total_time = time.time() - start_time
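
Downstream, the updated function might be exercised like this (illustrative; the filter values are placeholders, and nothing beyond the fields visible in this diff is assumed about the result):

result = test_connection(
    config,
    subdir="deployment_1",
    regex_filter=r"\.jpe?g$",
    file_extensions=["jpg", "jpeg"],
)
print(result)  # ConnectionTestResult with success flags, latency, and any error details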

Expand Down Expand Up @@ -508,7 +590,7 @@ def get_presigned_url(config: S3Config, key: str, expires_in: int = 60 * 60 * 24
url = cache.get(cache_key, default=None)
if not url:
logger.debug(f"Fetching new presigned URL for: {cache_key}")
-        client = get_client(config)
+        client = get_s3_client(config)
url = client.generate_presigned_url(
"get_object",
Params={"Bucket": config.bucket_name, "Key": key},
@@ -536,7 +618,7 @@ def resized_key(config: S3Config, key: str, width: int, height: int):


def resize_image(config: S3Config, key: str, width: int, height: int) -> str:
-    client = get_client(config)
+    client = get_s3_client(config)
new_key = resized_key(config, key, width, height)
client.put_object(
Bucket=config.bucket_name,
Expand Down
2 changes: 2 additions & 0 deletions ami/utils/storages.py
@@ -2,6 +2,8 @@

from storages.backends.s3boto3 import S3Boto3Storage

+IMAGE_FILE_EXTENSIONS = ["jpg", "jpeg", "png", "gif", "webp", "svg", "bmp", "ico", "tiff", "tif"]


class StaticRootS3Boto3Storage(S3Boto3Storage):
location = "static"