diff --git a/README.md b/README.md index 5375b46..4505510 100644 --- a/README.md +++ b/README.md @@ -29,13 +29,12 @@ interop and upgrades. To work with Harmony, services must: supported way to receive messages, though HTTP is likely to follow. `harmony.cli` provides helpers for setting up CLI parsing while being unobtrusive to non-Harmony CLIs that may also need to exist. -2. Extend `harmony.BaseHarmonyAdapter` and implement the `#invoke` to -adapt the incoming Harmony message to a service call and adapt the service -result to call to one of the adapter's `#completed_with_*` methods. The adapter -class provides helper methods for retrieving data, staging results, and cleaning -up temporary files, though these can be overridden or ignored if a service -needs different behavior, e.g. if it operates on data in situ and does not -want to download the remote file. +2. Extend `harmony.BaseHarmonyAdapter` and either override `#invoke` to process +the message or override `#process_item` to process each individual STAC item +provided in the input STAC catalog. The adapter class provides helper methods +for retrieving data, staging results, and cleaning up temporary files, though +these can be overridden or ignored if a service needs different behavior, e.g. +if it operates on data in situ and does not want to download the remote file. A full example of these two requirements with use of helpers can be found in [example/example_service.py](example/example_service.py). Also see @@ -86,16 +85,10 @@ OPTIONAL: * `USE_LOCALSTACK`: (Development) If 'true' will perform S3 calls against localstack rather than AWS * `LOCALSTACK_HOST`: (Development) If `USE_LOCALSTACK` `true` and this is set, will - establish `boto` client connections for S3 & SQS operations using this hostname. + establish `boto` client connections for S3 operations using this hostname. * `TEXT_LOGGER`: (Default: True) Setting this to true will cause all log messages to use a text string format. By default log messages will be formatted as JSON. -* `HEALTH_CHECK_PATH`: Set this to the path where the health check file should be stored. This - file's mtime is set to the current time whenever a successful attempt is made to to read the - message queue (whether or not a message is retrieved). This file can be used by a container's - health check command. The container is considered unhealthy if the mtime of the file is old - - where 'old' is configurable in the service container. If this variable is not set the path - defaults to '/tmp/health.txt'. * `MAX_DOWNLOAD_RETRIES`: Number of times to retry HTTP download calls that fail due to transient errors. OPTIONAL -- Use with CAUTION: diff --git a/dev-requirements.txt b/dev-requirements.txt index a966da2..9a3da72 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -12,5 +12,10 @@ pytest-mock ~=3.5 python-language-server ~= 0.35 responses ~=0.22.0 pycodestyle >= 2.9.1 -safety ~= 3.2.7 setuptools == 70.0.0 + +# Pin safety down to work around the compatibility issue with Python 3.13 +# See: https://github.com/pyupio/safety/issues/620 +# Update safety version when the issue is resolved. +safety-schemas == 0.0.5 +safety == 3.2.3 diff --git a/harmony_service_lib/adapter.py b/harmony_service_lib/adapter.py index e86e2fa..c36957d 100644 --- a/harmony_service_lib/adapter.py +++ b/harmony_service_lib/adapter.py @@ -8,23 +8,13 @@ between service results and Harmony callbacks. """ -import shutil -import os -import urllib import logging import uuid from abc import ABC -from tempfile import mkdtemp -from warnings import warn +from pystac import Catalog, read_file -from deprecation import deprecated -from pystac import Catalog, Item, Asset, read_file - -from harmony_service_lib.exceptions import CanceledException from harmony_service_lib.http import request_context from harmony_service_lib.logging import build_logger -from harmony_service_lib.message import Temporal -from harmony_service_lib.util import touch_health_check_file from . import util @@ -41,17 +31,8 @@ class BaseHarmonyAdapter(ABC): ---------- message : harmony_service_lib.Message The Harmony input which needs acting upon - temp_paths : list - A list of string paths that should be cleaned up on exit - is_complete : boolean - True if the service has provided a result to Harmony (and therefore must - not provide another) - is_canceled: boolean - True if the request has been canceled by a Harmony user or operator logger: Logger Logger specific to this request - is_failed: boolean - True if the request failed to execute successfully """ def __init__(self, message, catalog=None, config=None): @@ -67,10 +48,6 @@ def __init__(self, message, catalog=None, config=None): config : harmony_service_lib.util.Config The configuration values for this runtime environment. """ - if catalog is None: - warn('Invoking adapter.BaseHarmonyAdapter without a STAC catalog is deprecated', - DeprecationWarning, stacklevel=2) - # set the request ID in the global context so we can use it in other places request_id = message.requestId if hasattr(message, 'requestId') else None request_context['request_id'] = request_id @@ -84,12 +61,6 @@ def __init__(self, message, catalog=None, config=None): else: self.logger = logging.getLogger() - # Properties that will be deprecated - self.temp_paths = [] - self.is_complete = False - self.is_canceled = False - self.is_failed = False - def set_config(self, config): self.config = config if self.config is not None: @@ -119,9 +90,8 @@ def invoke(self): # New-style processing using STAC if self.catalog: return (self.message, self._process_catalog_recursive(self.catalog)) - - # Deprecated, processing using callbacks - self._process_with_callbacks() + else: + raise RuntimeError("The service should override the invoke function when no STAC catalog is provided.") def get_all_catalog_items(self, catalog: Catalog, follow_page_links=True): """ @@ -203,42 +173,6 @@ def _process_catalog_recursive(self, catalog): return result - def _process_with_callbacks(self): - """ - Method for backward compatibility with non-chaining workflows. Takes an incoming message - containing granules, translates the granules into STAC items, and passes them individually - to process_item - """ - item_count = sum([len(source.granules) for source in self.message.sources]) - completed = 0 - for source in self.message.sources: - for granule in source.granules: - item = Item(granule.id, util.bbox_to_geometry(granule.bbox), granule.bbox, None, { - 'start_datetime': granule.temporal.start, - 'end_datetime': granule.temporal.end - }) - item.add_asset('data', Asset(granule.url, granule.name, roles=['data'])) - result = self.process_item(item, source) - if not result: - continue - assets = [v for k, v in result.assets.items() if 'data' in (v.roles or [])] - completed += 1 - progress = int(100 * completed / item_count) - for asset in assets: - temporal = Temporal({}, result.properties['start_datetime'], result.properties['end_datetime']) - common_args = dict( - title=asset.title, - mime=asset.media_type, - source_granule=granule, - temporal=temporal, - bbox=result.bbox - ) - if self.message.isSynchronous: - self.completed_with_redirect(asset.href, **common_args) - return - self.async_add_url_partial_result(asset.href, progress=progress, **common_args) - self.async_completed_successfully() - def process_item(self, item, source): """ Given a pystac.Item and a message.Source (collection and variables to subset), processes the @@ -299,496 +233,3 @@ def _get_item_source(self, item): href = sources[0].target collection = href.split('/').pop() return next(source for source in self.message.sources if source.collection == collection) - - # All methods below are deprecated as we move to STAC-based chaining workflows without callbacks - - @deprecated(details='Services must update to process and output STAC catalogs') - def cleanup(self): - """ - Removes temporary files produced during execution - """ - for temp_path in self.temp_paths: - if os.path.isfile(temp_path): - os.remove(temp_path) # remove the file - elif os.path.isdir(temp_path): - shutil.rmtree(temp_path) # remove dir and all contents - self.temp_paths = [] - - @deprecated(details='Services must update to process and output STAC catalogs') - def download_granules(self, granules=None): - """ - Downloads all of the granules contained in the message to the given temp directory, giving each - a unique filename. - - Parameters - ---------- - granules : list - A list of harmony_service_lib.message.Granule objects corresponding to the granules to download. Default: - all granules in the incoming message - """ - temp_dir = mkdtemp() - self.temp_paths += [temp_dir] - - granules = granules or self.message.granules - - # Download the remote file - for granule in granules: - granule.local_filename = util.download(granule.url, temp_dir, logger=self.logger, - access_token=self.message.accessToken, cfg=self.config) - - @deprecated(details='Services must update to process and output STAC catalogs') - def stage(self, local_file, source_granule=None, remote_filename=None, is_variable_subset=False, - is_regridded=False, is_subsetted=False, mime=None): - """ - Stages a file on the local filesystem to S3 with the given remote filename and mime type for - user access. - - Parameters - ---------- - local_file : string - The path and name of the file to stage - source_granule : message.Granule, optional - The granule from which the file was derived, if it was derived from a single granule. This - will be used to produce a canonical filename - remote_filename : string, optional - The name of the file when staged, which will be visible to the user requesting data. - Specify this if not providing a source granule. If neither remote_filename nor source_granule - is provided, the output file will use the file's basename - is_variable_subset : bool, optional - True if a variable subset operation has been performed (default: False) - is_regridded : bool, optional - True if a regridding operation has been performed (default: False) - is_subsetted : bool, optional - True if a subsetting operation has been performed (default: False) - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - - Returns - ------- - string - A URI to the staged file - """ - if remote_filename is None: - if source_granule: - remote_filename = self.filename_for_granule(source_granule, os.path.splitext( - local_file)[1], is_variable_subset, is_regridded, is_subsetted) - else: - remote_filename = os.path.basename(local_file) - - if mime is None: - mime = self.message.format.mime - - return util.stage(local_file, remote_filename, mime, location=self.message.stagingLocation, - logger=self.logger, cfg=self.config) - - @deprecated(details='Services must update to process and output STAC catalogs') - def completed_with_error(self, error_message): - """ - Performs a callback instructing Harmony that there has been an error and providing a - message to send back to the service user - - Parameters - ---------- - error_message : string - The error message to pass on to the service user - - Raises - ------ - Exception - If a callback has already been performed - """ - self.is_failed = True - if self.is_complete and not self.is_canceled: - raise Exception( - 'Attempted to error an already-complete service call with message ' + error_message) - self._callback_response({'error': error_message}) - self.is_complete = True - - @deprecated(details='Services must update to process and output STAC catalogs') - def completed_with_redirect( - self, - url, - title=None, - mime=None, - source_granule=None, - temporal=None, - bbox=None): - """ - Performs a callback instructing Harmony to redirect the service user to the given URL - - Parameters - ---------- - url : string - The URL where the service user should be redirected - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - title : string, optional - Textual information to provide users along with the link - temporal : harmony_service_lib.message.Temporal, optional - The temporal extent of the provided file. If not provided, the source granule's - temporal will be used when a source granule is provided - bbox : list, optional - List of [West, South, East, North] for the MBR of the provided result. If not provided, - the source granule's bbox will be used when a source granule is provided - - Raises - ------ - Exception - If a callback has already been performed - """ - - if self.is_complete: - raise Exception( - 'Attempted to redirect an already-complete service call to ' + url) - params = self._build_callback_item_params(url, mime=mime, source_granule=source_granule) - params['status'] = 'successful' - self._callback_response(params) - self.is_complete = True - - @deprecated(details='Services must update to process and output STAC catalogs') - def completed_with_local_file( - self, - filename, - source_granule=None, - remote_filename=None, - is_variable_subset=False, - is_regridded=False, - is_subsetted=False, - mime=None, - title=None, - temporal=None, - bbox=None): - """ - Indicates that the service has completed with the given file as its result. Stages the - provided local file to a user-accessible S3 location and instructs Harmony to redirect - to that location. - - Parameters - ---------- - filename : string - The path and name of the local file - source_granule : message.Granule, optional - The granule from which the file was derived, if it was derived from a single granule. This - will be used to produce a canonical filename - remote_filename : string, optional - The name of the file when staged, which will be visible to the user requesting data. - Specify this if not providing a source granule. If neither remote_filename nor source_granule - is provided, the output file will use the file's basename - is_variable_subset : bool, optional - True if a variable subset operation has been performed (default: False) - is_regridded : bool, optional - True if a regridding operation has been performed (default: False) - is_subsetted : bool, optional - True if a subsetting operation has been performed (default: False) - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - title : string, optional - Textual information to provide users along with the link - temporal : harmony_service_lib.message.Temporal, optional - The temporal extent of the provided file. If not provided, the source granule's - temporal will be used when a source granule is provided - bbox : list, optional - List of [West, South, East, North] for the MBR of the provided result. If not provided, - the source granule's bbox will be used when a source granule is provided - - Raises - ------ - Exception - If a callback has already been performed - """ - url = self.stage(filename, source_granule, remote_filename, - is_variable_subset, is_regridded, is_subsetted, mime) - self.completed_with_redirect(url, title, mime, source_granule, temporal, bbox) - - @deprecated(details='Services must update to process and output STAC catalogs') - def async_add_local_file_partial_result( - self, - filename, - source_granule=None, - remote_filename=None, - is_variable_subset=False, - is_regridded=False, - is_subsetted=False, - title=None, - mime=None, - progress=None, - temporal=None, - bbox=None): - """ - For service requests that are asynchronous, stages the given filename and sends the staged - URL as a progress update to Harmony. Optionally also provides a numeric progress indicator. - Synchronous requests may not call this method and will throw an exeception. - - Parameters - ---------- - filename : string - The path and name of the local file - source_granule : message.Granule, optional - The granule from which the file was derived, if it was derived from a single granule. This - will be used to produce a canonical filename and assist when temporal and bbox are not specified - remote_filename : string, optional - The name of the file when staged, which will be visible to the user requesting data. - Specify this if not providing a source granule. If neither remote_filename nor source_granule - is provided, the output file will use the file's basename - is_variable_subset : bool, optional - True if a variable subset operation has been performed (default: False) - is_regridded : bool, optional - True if a regridding operation has been performed (default: False) - is_subsetted : bool, optional - True if a subsetting operation has been performed (default: False) - title : string, optional - Textual information to provide users along with the link - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - progress : integer, optional - Numeric progress of the total request, 0-100 - temporal : harmony_service_lib.message.Temporal, optional - The temporal extent of the provided file. If not provided, the source granule's - temporal will be used when a source granule is provided - bbox : list, optional - List of [West, South, East, North] for the MBR of the provided result. If not provided, - the source granule's bbox will be used when a source granule is provided - - Raises - ------ - Exception - If the request is synchronous or the request has already been marked complete - """ - url = self.stage(filename, source_granule, remote_filename, - is_variable_subset, is_regridded, is_subsetted, mime) - self.async_add_url_partial_result(url, title, mime, progress, source_granule, - temporal, bbox) - - @deprecated(details='Services must update to process and output STAC catalogs') - def async_add_url_partial_result(self, url, title=None, mime=None, progress=None, source_granule=None, - temporal=None, bbox=None): - """ - For service requests that are asynchronous, stages the provides the given URL as a partial result. - Optionally also provides a numeric progress indicator. - Synchronous requests may not call this method and will throw an exeception. - - Parameters - ---------- - url : string - The URL where the service user should be redirected - title : string, optional - Textual information to provide users along with the link - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - progress : integer, optional - Numeric progress of the total request, 0-100 - source_granule : message.Granule, optional - The granule from which the file was derived, if it was derived from a single granule. This - will be used to produce a canonical filename and assist when temporal and bbox are not specified - temporal : harmony_service_lib.message.Temporal, optional - The temporal extent of the provided file. If not provided, the source granule's - temporal will be used when a source granule is provided - bbox : list, optional - List of [West, South, East, North] for the MBR of the provided result. If not provided, - the source granule's bbox will be used when a source granule is provided - - Raises - ------ - Exception - If the request is synchronous or the request has already been marked complete - """ - if self.message.isSynchronous: - raise Exception( - 'Attempted to call back asynchronously to a synchronous request') - if self.is_complete: - raise Exception( - 'Attempted to add a result to an already-completed request: ' + url) - params = self._build_callback_item_params(url, title, mime, source_granule, temporal, bbox) - if progress is not None: - params['progress'] = progress - - self._callback_response(params) - - @deprecated(details='Services must update to process and output STAC catalogs') - def async_completed_successfully(self): - """ - For service requests that are asynchronous, sends a progress update indicating - that a service request is complete. - Synchronous requests may not call this method and will throw an exeception. - - Raises - ------ - Exception - If the request is synchronous or the request has already been marked complete - """ - if self.message.isSynchronous: - raise Exception( - 'Attempted to call back asynchronously to a synchronous request') - if self.is_complete: - raise Exception( - 'Attempted to call back for an already-completed request.') - self._callback_response({'status': 'successful'}) - self.is_complete = True - - @deprecated(details='Services must update to process and output STAC catalogs') - def filename_for_granule(self, granule, ext, is_variable_subset=False, is_regridded=False, is_subsetted=False): - """ - Return an output filename for the given granules according to our naming conventions: - {original filename without suffix}(_{single var})?(_regridded)?(_subsetted)?. - - Parameters - ---------- - granule : message.Granule - The source granule for the output file - ext: string - The destination file extension - is_variable_subset : bool, optional - True if a variable subset operation has been performed (default: False) - is_regridded : bool, optional - True if a regridding operation has been performed (default: False) - is_subsetted : bool, optional - True if a subsetting operation has been performed (default: False) - - Returns - ------- - string - The output filename - """ - url = granule.url - # Get everything between the last non-trailing '/' before the query and the first '?' - # Do this instead of using a URL parser, because our URLs are not complex in practice and - # it is useful to allow relative file paths to work for local testing. - original_filename = url.split('?')[0].rstrip('/').split('/')[-1] - original_basename = os.path.splitext(original_filename)[0] - if not ext.startswith('.'): - ext = '.' + ext - - suffixes = [] - if is_variable_subset and len(granule.variables) == 1: - suffixes.append('_' + granule.variables[0].name.replace('/', '_')) - if is_regridded: - suffixes.append('_regridded') - if is_subsetted: - suffixes.append('_subsetted') - suffixes.append(ext) - - result = original_basename - # Iterate suffixes in reverse, removing them from the result if they're at the end of the string - # This supports the case of chaining where one service regrids and another subsets but we don't - # want names to get mangled - for suffix in suffixes[::-1]: - if result.endswith(suffix): - result = result[:-len(suffix)] - - return result + "".join(suffixes) - - # Deprecated internal methods below - - def _build_callback_item_params( - self, - url, - title=None, - mime=None, - source_granule=None, - temporal=None, - bbox=None): - """ - Builds the "item[...]" parameters required for a callback to Harmony for the given - params, returning them as a string param / string value dict. - - Parameters - ---------- - url : string - The URL where the service user should be redirected - title : string, optional - Textual information to provide users along with the link - mime : string, optional - The mime type of the file, by default the output mime type requested by Harmony - source_granule : message.Granule, optional - The granule from which the file was derived, if it was derived from a single granule. This - will be used to produce a canonical filename and assist when temporal and bbox are not specified - temporal : harmony_service_lib.message.Temporal, optional - The temporal extent of the provided file. If not provided, the source granule's - temporal will be used when a source granule is provided - bbox : list, optional - List of [West, South, East, North] for the MBR of the provided result. If not provided, - the source granule's bbox will be used when a source granule is provided - - Returns - ------- - dict - A dictionary containing a mapping of query parameters to value for the given params - """ - if mime is None: - mime = self.message.format.mime - if source_granule is not None: - temporal = temporal or source_granule.temporal - bbox = bbox or source_granule.bbox - - params = {'item[href]': url, 'item[type]': mime} - if title is not None: - params['item[title]'] = title - if temporal is not None: - params['item[temporal]'] = ','.join([temporal.start, temporal.end]) - if bbox is not None: - params['item[bbox]'] = ','.join([str(c) for c in bbox]) - return params - - def _callback_response(self, query_params): - """ - POSTs to the Harmony callback URL at the given path with the given params - - Parameters - ---------- - query_params : dict - A mapping of string key to string value query params to send to the callback - - Returns - ------- - None - """ - - param_strs = ['%s=%s' % (k, urllib.parse.quote(str(v))) - for k, v in query_params.items()] - self._callback_post('/response?' + '&'.join(param_strs)) - - def _callback_post(self, path): - """ - POSTs to the Harmony callback URL at the given path, which may include query params - - Parameters - ---------- - path : string - The URL path relative to the Harmony callback URL which should be POSTed to - - Returns - ------- - None - """ - - url = self.message.callback + path - touch_health_check_file(self.config.health_check_path) - if os.environ.get('ENV') in ['dev', 'test']: - self.logger.warning( - 'ENV=' + os.environ['ENV'] + ' so we will not reply to Harmony with POST ' + url) - elif self.is_canceled: - msg = 'Ignoring making callback request because the request has been canceled.' - self.logger.info(msg) - else: - self.logger.info('Starting response: %s', url) - request = urllib.request.Request(url, method='POST') - try: - response = \ - urllib.request.urlopen(request).read().decode('utf-8') - self.logger.info('Remote response: %s', response) - self.logger.info('Completed response: %s', url) - except urllib.error.HTTPError as e: - self.is_failed = True - body = e.read().decode() - msg = f'Harmony returned an error when updating the job: {body}' - self.logger.error(msg, exc_info=e) - if e.code == 409: - self.logger.warning('Harmony request was canceled.') - self.is_canceled = True - self.is_complete = True - raise CanceledException - raise - except Exception as e: - self.is_failed = True - self.logger.error('Error when updating the job', exc_info=e) - raise diff --git a/harmony_service_lib/aws.py b/harmony_service_lib/aws.py index 1400f68..42e056f 100644 --- a/harmony_service_lib/aws.py +++ b/harmony_service_lib/aws.py @@ -1,6 +1,5 @@ """ -This module includes various AWS-specific functions to stage data in S3 and deal with -messages in SQS queues. +This module includes various AWS-specific functions to stage data in S3. This module relies on the harmony_service_lib.util.config and its environment variables to be set for correct operation. See that module and the project README for details. @@ -76,7 +75,7 @@ def _get_aws_client(config, service, user_agent=None): config : harmony_service_lib.util.Config The configuration for the current runtime environment. service : string - The AWS service name for which to construct a client, e.g. "s3" or "sqs" + The AWS service name for which to construct a client, e.g. "s3" user_agent : string The user agent that is requesting the aws service. E.g. harmony/0.0.0 (harmony-sit) harmony-service-lib/4.0 (gdal-subsetter) @@ -162,85 +161,3 @@ def stage(config, local_filename, remote_filename, mime, logger, location=None): s3.upload_file(local_filename, staging_bucket, key, ExtraArgs={'ContentType': mime}) return 's3://%s/%s' % (staging_bucket, key) - - -def receive_messages(config, queue_url, visibility_timeout_s, logger): - """ - Generates successive messages from reading the queue. The caller - is responsible for deleting or returning each message to the queue - - Parameters - ---------- - config : harmony_service_lib.util.Config - The configuration for the current runtime environment. - queue_url : string - The URL of the queue to receive messages on - visibility_timeout_s : int - The number of seconds to wait for a received message to be deleted - before it is returned to the queue - - Yields - ------ - receiptHandle, body : string, string - A tuple of the receipt handle, used to delete or update messages, - and the contents of the message - """ - if visibility_timeout_s is None: - visibility_timeout_s = 600 - - sqs = _get_aws_client(config, 'sqs') - logger.info('Listening on %s' % (queue_url,)) - while True: - receive_params = dict( - QueueUrl=queue_url, - VisibilityTimeout=visibility_timeout_s, - WaitTimeSeconds=20, - MaxNumberOfMessages=1 - ) - response = sqs.receive_message(**receive_params) - messages = response.get('Messages') or [] - if len(messages) == 1: - yield (messages[0]['ReceiptHandle'], messages[0]['Body']) - else: - logger.info('No messages received. Retrying.') - - -def delete_message(config, queue_url, receipt_handle): - """ - Deletes the message with the given receipt handle from the provided queue URL, - indicating successful processing - - Parameters - ---------- - config : harmony_service_lib.util.Config - The configuration for the current runtime environment. - queue_url : string - The queue from which the message originated - receipt_handle : string - The receipt handle of the message, as yielded by `receive_messages` - """ - sqs = _get_aws_client(config, 'sqs') - sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) - - -def change_message_visibility(config, queue_url, receipt_handle, visibility_timeout_s): - """ - Updates the message visibility timeout of the message with the given receipt handle - - Parameters - ---------- - config : harmony_service_lib.util.Config - The configuration for the current runtime environment. - queue_url : string - The queue from which the message originated - receipt_handle : string - The receipt handle of the message, as yielded by `receive_messages` - visibility_timeout_s : int - The number of additional seconds to wait for a received message to be deleted - before it is returned to the queue - """ - sqs = _get_aws_client(config, 'sqs') - sqs.change_message_visibility( - QueueUrl=queue_url, - ReceiptHandle=receipt_handle, - VisibilityTimeout=visibility_timeout_s) diff --git a/harmony_service_lib/cli.py b/harmony_service_lib/cli.py index 041b86c..b6a8900 100644 --- a/harmony_service_lib/cli.py +++ b/harmony_service_lib/cli.py @@ -14,11 +14,10 @@ from pystac import Catalog, CatalogType from pystac.layout import BestPracticesLayoutStrategy -from harmony_service_lib.exceptions import CanceledException, HarmonyException +from harmony_service_lib.exceptions import HarmonyException from harmony_service_lib.message import Message from harmony_service_lib.logging import setup_stdout_log_formatting, build_logger -from harmony_service_lib.util import (receive_messages, delete_message, change_message_visibility, - config, create_decrypter) +from harmony_service_lib.util import (config, create_decrypter) from harmony_service_lib.version import get_version from harmony_service_lib.aws import is_s3, write_s3 from harmony_service_lib.s3_stac_io import S3StacIO @@ -59,9 +58,8 @@ def setup_cli(parser): The parser being used to parse CLI arguments """ parser.add_argument('--harmony-action', - choices=['invoke', 'start'], - help=('the action Harmony needs to perform, "invoke" to run once and quit, ' - '"start" to listen to a queue')) + choices=['invoke'], + help=('the action Harmony needs to perform, "invoke" to run once and quit')) parser.add_argument('--harmony-input', help=('the input data for the action provided by Harmony, required for ' '--harmony-action=invoke')) @@ -69,8 +67,7 @@ def setup_cli(parser): help=('the optional path to the input data for the action provided by Harmony')) parser.add_argument('--harmony-sources', help=('file path that contains a STAC catalog with items and metadata to ' - 'be processed by the service. Required for non-deprecated ' - 'invocations ')) + 'be processed by the service. Required for --harmony-action=invoke')) parser.add_argument('--harmony-metadata-dir', help=('file path where output metadata should be written. The resulting ' 'STAC catalog will be written to catalog.json in the supplied dir ' @@ -81,8 +78,6 @@ def setup_cli(parser): parser.add_argument('--harmony-data-location', help=('the location where output data should be written, either a directory ' 'or S3 URI prefix. If set, overrides any value set by the message')) - parser.add_argument('--harmony-queue-url', - help='the queue URL to listen on, required for --harmony-action=start') parser.add_argument('--harmony-visibility-timeout', type=int, default=600, @@ -111,55 +106,6 @@ def is_harmony_cli(args): return args.harmony_action is not None -def _invoke_deprecated(AdapterClass, message_string, config): - """ - Handles --harmony-action=invoke by invoking the adapter for the given input message - - Parameters - ---------- - AdapterClass : class - The BaseHarmonyAdapter subclass to use to handle service invocations - message_string : string - The Harmony input message - config : harmony_service_lib.util.Config - A configuration instance for this service - Returns - ------- - True if the operation completed successfully, False otherwise - """ - - secret_key = config.shared_secret_key - decrypter = create_decrypter(bytes(secret_key, 'utf-8')) - - message_data = json.loads(message_string) - adapter = AdapterClass(Message(message_data, decrypter)) - adapter.set_config(config) - - try: - adapter.invoke() - if not adapter.is_complete: - adapter.completed_with_error('The backend service did not respond') - - except CanceledException: - # If we see the request has been canceled do not try calling back to harmony again - # Enable this logging after fixing HARMONY-410 - # logging.error('Service request canceled by Harmony, exiting') - pass - except HarmonyException as e: - logging.error(e, exc_info=1) - if not adapter.is_complete: - adapter.completed_with_error(str(e)) - except BaseException as e: - # Make sure we always call back if the error is in a Harmony invocation and we have - # successfully parsed enough that we know where to call back to - logging.error(e, exc_info=1) - if not adapter.is_complete: - msg = 'Service request failed with an unknown error' - adapter.completed_with_error(msg) - raise - return not adapter.is_failed - - def _write_error(metadata_dir, message, category='Unknown'): """ Writes the given error message to error.json in the provided metadata dir @@ -203,7 +149,8 @@ def _build_adapter(AdapterClass, message_string, sources_path, data_location, co BaseHarmonyAdapter subclass instance The adapter to be invoked """ - catalog = Catalog.from_file(sources_path) + catalog = Catalog.from_file(sources_path) if bool(sources_path) else None + secret_key = config.shared_secret_key if bool(secret_key): @@ -265,47 +212,6 @@ def _invoke(adapter, metadata_dir): raise -def _start(AdapterClass, queue_url, visibility_timeout_s, config): - """ - Handles --harmony-action=start by listening to the given queue_url and invoking the - AdapterClass on any received messages - - Parameters - ---------- - AdapterClass : class - The BaseHarmonyAdapter subclass to use to handle service invocations - queue_url : string - The SQS queue to listen on - visibility_timeout_s : int - The time interval during which the message can't be picked up by other - listeners on the queue. - config : harmony_service_lib.util.Config - A configuration instance for this service - """ - for receipt, message in receive_messages(queue_url, visibility_timeout_s, cfg=config): - # Behavior here is slightly different than _invoke. Whereas _invoke ensures - # that the backend receives a callback whenever possible in the case of an - # exception, the message queue listener prefers to let the message become - # visibile again and let retry and dead letter queue policies determine visibility - adapter = AdapterClass(Message(message)) - adapter.set_config(config) - - try: - adapter.invoke() - except Exception: - logging.error('Adapter threw an exception', exc_info=True) - finally: - if adapter.is_complete: - delete_message(queue_url, receipt, cfg=config) - else: - change_message_visibility(queue_url, receipt, 0, cfg=config) - try: - adapter.cleanup() - except Exception: - logging.error( - 'Adapter threw an exception on cleanup', exc_info=True) - - def run_cli(parser, args, AdapterClass, cfg=None): """ Runs the Harmony CLI invocation captured by the given args @@ -336,10 +242,9 @@ def run_cli(parser, args, AdapterClass, cfg=None): if not bool(args.harmony_input): parser.error( '--harmony-input or --harmony-input-file must be provided for --harmony-action=invoke') - elif not bool(args.harmony_sources): - successful = _invoke_deprecated(AdapterClass, args.harmony_input, cfg) - if not successful: - raise Exception('Service operation failed') + elif not bool(args.harmony_metadata_dir): + parser.error( + '--harmony-metadata-dir must be provided for --harmony-action=invoke') else: adapter = None try: @@ -355,15 +260,16 @@ def run_cli(parser, args, AdapterClass, cfg=None): duration_ms = int(round(time_diff.total_seconds() * 1000)) duration_logger = build_logger(cfg) extra_fields = { - 'user': adapter.message.user if adapter else '', - 'requestId': adapter.message.requestId if adapter else '', + 'user': ( + adapter.message.user + if adapter and adapter.message and hasattr(adapter.message, "user") + else '' + ), + 'requestId': ( + adapter.message.requestId + if adapter and adapter.message and hasattr(adapter.message, "requestId") + else '' + ), 'durationMs': duration_ms } duration_logger.info(f'timing.{cfg.app_name}.end', extra=extra_fields) - - if args.harmony_action == 'start': - if not bool(args.harmony_queue_url): - parser.error( - '--harmony-queue-url must be provided for --harmony-action=start') - else: - return _start(AdapterClass, args.harmony_queue_url, args.harmony_visibility_timeout, cfg) diff --git a/harmony_service_lib/message.py b/harmony_service_lib/message.py index a1e308f..b66720f 100644 --- a/harmony_service_lib/message.py +++ b/harmony_service_lib/message.py @@ -582,9 +582,7 @@ class Message(JsonObject): version : string The semantic version of the Harmony message contained in the provided JSON callback : string - The URL that services must POST to when their execution is complete. Services - should use the `completed_with_*` methods of a Harmony Adapter to perform - callbacks to ensure compatibility, rather than directly using this URL + (Deprecated) The URL that services must POST to when their execution is complete. stagingLocation : string An object store (S3) URL prefix under which services may elect to put their output. Services must have write access to the Harmony staging bucket for the deployed diff --git a/harmony_service_lib/util.py b/harmony_service_lib/util.py index 7e466ee..f1d145e 100644 --- a/harmony_service_lib/util.py +++ b/harmony_service_lib/util.py @@ -45,8 +45,6 @@ APP_NAME: A name for the service that will appear in log entries. ENV: The application environment. One of: dev, test. Used for local development. TEXT_LOGGER: Whether to log in plaintext or JSON. Default: True (plaintext). - HEALTH_CHECK_PATH: The filesystem path that should be `touch`ed to indicate the service is - alive. MAX_DOWNLOAD_RETRIES: Number of times to retry HTTP download calls that fail due to transient errors. """ @@ -96,7 +94,6 @@ 'staging_bucket', 'env', 'text_logger', - 'health_check_path', 'shared_secret_key', 'user_agent', 'max_download_retries' @@ -193,7 +190,6 @@ def int_envvar(name: str, default: int) -> int: staging_bucket=str_envvar('STAGING_BUCKET', None), env=str_envvar('ENV', ''), text_logger=bool_envvar('TEXT_LOGGER', False), - health_check_path=str_envvar('HEALTH_CHECK_PATH', '/tmp/health.txt'), shared_secret_key=str_envvar('SHARED_SECRET_KEY', DEFAULT_SHARED_SECRET_KEY), user_agent=str_envvar('USER_AGENT', 'harmony (unknown version)'), max_download_retries=int_envvar('MAX_DOWNLOAD_RETRIES', 0) @@ -377,89 +373,6 @@ def stage(local_filename, remote_filename, mime, logger=None, location=None, cfg return aws.stage(cfg, local_filename, remote_filename, mime, logger, location) -def receive_messages(queue_url, visibility_timeout_s=600, logger=None, cfg=None): - """ - Generates successive messages from reading the queue. The caller - is responsible for deleting or returning each message to the queue - - Parameters - ---------- - queue_url : string - The URL of the queue to receive messages on - visibility_timeout_s : int - The number of seconds to wait for a received message to be deleted - before it is returned to the queue - cfg : harmony_service_lib.util.Config - The configuration values for this runtime environment. - - Yields - ------ - receiptHandle, body : string, string - A tuple of the receipt handle, used to delete or update messages, - and the contents of the message - """ - # The implementation of this function has been moved to the - # harmony_service_lib.aws module. - if cfg is None: - cfg = config() - if logger is None: - logger = build_logger(cfg) - - touch_health_check_file(cfg.health_check_path) - return aws.receive_messages(cfg, queue_url, visibility_timeout_s, logger) - - -def delete_message(queue_url, receipt_handle, cfg=None): - """ - Deletes the message with the given receipt handle from the provided queue URL, - indicating successful processing - - Parameters - ---------- - queue_url : string - The queue from which the message originated - receipt_handle : string - The receipt handle of the message, as yielded by `receive_messages` - cfg : harmony_service_lib.util.Config - The configuration values for this runtime environment. - """ - # The implementation of this function has been moved to the - # harmony_service_lib.aws module. - if cfg is None: - cfg = config() - return aws.delete_message(cfg, queue_url, receipt_handle) - - -def change_message_visibility(queue_url, receipt_handle, visibility_timeout_s, cfg=None): - """ - Updates the message visibility timeout of the message with the given receipt handle - - Parameters - ---------- - queue_url : string - The queue from which the message originated - receipt_handle : string - The receipt handle of the message, as yielded by `receive_messages` - visibility_timeout_s : int - The number of additional seconds to wait for a received message to be deleted - before it is returned to the queue - cfg : harmony_service_lib.util.Config - The configuration values for this runtime environment. - """ - # The implementation of this function has been moved to the - # harmony_service_lib.aws module. - if cfg is None: - cfg = config() - return aws.change_message_visibility(cfg, queue_url, receipt_handle, visibility_timeout_s) - - -def touch_health_check_file(health_check_path): - """ - Updates the mtime of the health check file. - """ - Path(health_check_path).touch() - - def create_decrypter(key=b'_THIS_IS_MY_32_CHARS_SECRET_KEY_'): """Creates a function that will decrypt cyphertext using a shared secret (symmetric) 32-byte key. diff --git a/tests/test_adapter_deprecated.py b/tests/test_adapter_deprecated.py deleted file mode 100644 index 1d362fa..0000000 --- a/tests/test_adapter_deprecated.py +++ /dev/null @@ -1,357 +0,0 @@ -""" -Tests deprecated methods and non-STAC invocation styles on BaseHarmonyAdapter -""" - -import unittest -from unittest.mock import patch, MagicMock -from tempfile import NamedTemporaryFile, mkdtemp -import os -from os import path, remove -import pathlib -from shutil import rmtree -from urllib.error import HTTPError - -from harmony_service_lib.adapter import BaseHarmonyAdapter -from harmony_service_lib.message import Message, Granule, Variable, Temporal -import harmony_service_lib.util -from harmony_service_lib.exceptions import CanceledException -from .example_messages import minimal_message, full_message - - -# BaseHarmonyAdapter is abstract, so tests need a minimal concrete class -class AdapterTester(BaseHarmonyAdapter): - def __init__(self, message_str, config): - super().__init__(Message(message_str), config=config) - - def invoke(self): - pass - - -class MockHTTPError(HTTPError): - def __init__(self, url='http://example.com', code=409, msg='Harmony canceled request', hdrs=[], fp=None): - super().__init__(url, code, msg, hdrs, fp) - - def read(self): - return MagicMock(return_value='request body') - - -class TestBaseHarmonyAdapter(unittest.TestCase): - def setUp(self): - self.config = harmony_service_lib.util.config(validate=False) - - def test_cleanup_deletes_temporary_file_paths(self): - adapter = AdapterTester(minimal_message, self.config) - f = NamedTemporaryFile(delete=False) - try: - f.close() - adapter.temp_paths += [f.name] - self.assertTrue(path.exists(f.name)) - - adapter.cleanup() - - self.assertFalse(path.exists(f.name)) - finally: - if path.exists(f.name): - remove(f.name) - - def test_cleanup_deletes_temporary_directory_paths(self): - adapter = AdapterTester(minimal_message, self.config) - dirname = mkdtemp() - try: - adapter.temp_paths += [dirname] - self.assertTrue(path.exists(dirname)) - - adapter.cleanup() - - self.assertFalse(path.exists(dirname)) - finally: - if path.exists(dirname): - rmtree(dirname) - - def test_download_granules_fetches_remote_granules_and_stores_their_path(self): - adapter = AdapterTester(full_message, self.config) - try: - adapter.download_granules() - granules = adapter.message.granules - self.assertEqual(granules[0].local_filename, 'example/example_granule_1.txt') - self.assertEqual(granules[1].local_filename, 'example/example_granule_2.txt') - self.assertEqual(granules[2].local_filename, 'example/example_granule_3.txt') - self.assertEqual(granules[3].local_filename, 'example/example_granule_4.txt') - finally: - adapter.cleanup() - - def test_download_granules_adds_granule_temp_dir_to_temp_paths(self): - adapter = AdapterTester(full_message, self.config) - try: - self.assertEqual(len(adapter.temp_paths), 0) - adapter.download_granules() - self.assertEqual(len(adapter.temp_paths), 1) - finally: - adapter.cleanup() - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_completed_with_error_when_no_callback_has_been_made_it_posts_the_error(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_error('ohai there') - _callback_post.assert_called_with('/response?error=ohai%20there') - - def test_completed_with_error_when_a_callback_has_been_made_it_throws_an_exception(self): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_error('ohai there') - self.assertRaises(Exception, adapter.completed_with_error, 'ohai there again') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_completed_with_redirect_when_no_callback_has_been_made_it_posts_the_redirect(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_redirect('https://example.com') - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com' - '&item[type]=image/tiff' - '&status=successful') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_completed_with_redirect_accepts_item_metadata(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - granule = adapter.message.sources[0].granules[0] - temporal = Temporal(start='2011-11-11T11:11:11Z', end='2011-11-11T11:11:12Z') - adapter.completed_with_redirect('https://example.com', title='hi', mime='image/tiff', source_granule=granule, - temporal=temporal, bbox=[1.1, 2.2, 3.3, 4.4]) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com' - '&item[type]=image/tiff' - '&item[temporal]=2001-01-01T01%3A01%3A01Z%2C2002-02-02T02%3A02%3A02Z' - '&item[bbox]=-1%2C-2%2C3%2C4' - '&status=successful') - - def test_completed_with_redirect_when_a_callback_has_been_made_it_throws_an_exception(self): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_redirect('https://example.com/1') - self.assertRaises(Exception, adapter.completed_with_error, 'https://example.com/2') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - @patch.object(harmony_service_lib.util, 'stage', return_value='https://example.com/out') - def test_completed_with_local_file_stages_the_local_file_and_redirects_to_it(self, stage, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_local_file('tmp/output.tif', remote_filename='out.tif') - stage.assert_called_with('tmp/output.tif', - 'out.tif', - 'image/tiff', - location='s3://example-bucket/public/some-org/some-service/some-uuid/', - logger=adapter.logger, - cfg=self.config) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com/out' - '&item[type]=image/tiff' - '&status=successful') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - @patch.object(harmony_service_lib.util, 'stage', return_value='https://example.com/out') - def test_completed_with_local_file_uses_granule_file_naming(self, stage, _callback_post): - adapter = AdapterTester(full_message, self.config) - granule = adapter.message.sources[0].granules[0] - adapter.completed_with_local_file('tmp/output.tif', source_granule=granule, - is_variable_subset=True, is_regridded=True, is_subsetted=True) - stage.assert_called_with('tmp/output.tif', - 'example_granule_1_ExampleVar1_regridded_subsetted.tif', - 'image/tiff', - location='s3://example-bucket/public/some-org/some-service/some-uuid/', - logger=adapter.logger, - cfg=self.config) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com/out' - '&item[type]=image/tiff' - '&item[temporal]=2001-01-01T01%3A01%3A01Z%2C2002-02-02T02%3A02%3A02Z' - '&item[bbox]=-1%2C-2%2C3%2C4' - '&status=successful') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_async_add_url_partial_result_for_async_incomplete_requests_posts_the_url(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - adapter.async_add_url_partial_result('https://example.com') - _callback_post.assert_called_with('/response?item[href]=https%3A//example.com&item[type]=image/tiff') - - def test_async_add_url_partial_result_for_sync_requests_throws_an_error(self): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = True - self.assertRaises(Exception, adapter.async_add_url_partial_result, 'https://example.com/2') - - def test_async_add_url_partial_result_for_complete_requests_throws_an_error(self): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - adapter.completed_with_redirect('https://example.com/1') - self.assertRaises(Exception, adapter.async_add_url_partial_result, 'https://example.com/2') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_async_add_url_partial_result_supplies_bbox_and_temporal_when_provided(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - temporal = Temporal(start='2011-11-11T11:11:11Z', end='2011-11-11T11:11:12Z') - adapter.async_add_url_partial_result('https://example.com', temporal=temporal, bbox=[1.1, 2.2, 3.3, 4.4]) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com' - '&item[type]=image/tiff' - '&item[temporal]=2011-11-11T11%3A11%3A11Z%2C2011-11-11T11%3A11%3A12Z' - '&item[bbox]=1.1%2C2.2%2C3.3%2C4.4') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_async_add_url_partial_result_uses_granule_bbox_and_temporal_when_provided(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - granule = adapter.message.sources[0].granules[0] - adapter.async_add_url_partial_result('https://example.com', source_granule=granule) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com' - '&item[type]=image/tiff' - '&item[temporal]=2001-01-01T01%3A01%3A01Z%2C2002-02-02T02%3A02%3A02Z' - '&item[bbox]=-1%2C-2%2C3%2C4') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_async_add_url_partial_result_prefers_explicit_bbox_and_temporal_over_granule_values(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - granule = adapter.message.sources[0].granules[0] - temporal = Temporal(start='2011-11-11T11:11:11Z', end='2011-11-11T11:11:12Z') - adapter.async_add_url_partial_result('https://example.com', source_granule=granule, - temporal=temporal, bbox=[1.1, 2.2, 3.3, 4.4]) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com' - '&item[type]=image/tiff' - '&item[temporal]=2011-11-11T11%3A11%3A11Z%2C2011-11-11T11%3A11%3A12Z' - '&item[bbox]=1.1%2C2.2%2C3.3%2C4.4') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - def test_async_completed_successfully_for_async_incomplete_requests_posts_the_completion_status(self, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - adapter.async_completed_successfully() - _callback_post.assert_called_with('/response?status=successful') - - def test_async_completed_successfully_for_sync_requests_throws_an_error(self): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = True - self.assertRaises(Exception, adapter.async_completed_successfully) - - def test_async_completed_successfully_for_complete_requests_throws_an_error(self): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - adapter.async_completed_successfully() - self.assertRaises(Exception, adapter.async_completed_successfully) - - @patch.object(BaseHarmonyAdapter, '_callback_post') - @patch.object(harmony_service_lib.util, 'stage', return_value='https://example.com/out') - def test_async_add_local_file_partial_result_stages_the_local_file_and_updates_progress(self, stage, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - adapter.async_add_local_file_partial_result('tmp/output.tif', remote_filename='out.tif', title='my file', - progress=50) - stage.assert_called_with('tmp/output.tif', - 'out.tif', - 'image/tiff', - location='s3://example-bucket/public/some-org/some-service/some-uuid/', - logger=adapter.logger, - cfg=self.config) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com/out' - '&item[type]=image/tiff' - '&item[title]=my%20file' - '&progress=50') - - @patch.object(BaseHarmonyAdapter, '_callback_post') - @patch.object(harmony_service_lib.util, 'stage', return_value='https://example.com/out') - def test_async_add_local_file_partial_result_uses_granule_file_naming(self, stage, _callback_post): - adapter = AdapterTester(full_message, self.config) - adapter.message.isSynchronous = False - granule = adapter.message.sources[0].granules[0] - adapter.async_add_local_file_partial_result('tmp/output.tif', source_granule=granule, - is_variable_subset=True, is_regridded=True, is_subsetted=True, - title='my file', progress=50) - stage.assert_called_with('tmp/output.tif', - 'example_granule_1_ExampleVar1_regridded_subsetted.tif', - 'image/tiff', - location='s3://example-bucket/public/some-org/some-service/some-uuid/', - logger=adapter.logger, - cfg=self.config) - _callback_post.assert_called_with('/response' - '?item[href]=https%3A//example.com/out' - '&item[type]=image/tiff' - '&item[title]=my%20file' - '&item[temporal]=2001-01-01T01%3A01%3A01Z%2C2002-02-02T02%3A02%3A02Z' - '&item[bbox]=-1%2C-2%2C3%2C4' - '&progress=50') - - def test_filename_for_granule(self): - adapter = AdapterTester(minimal_message, self.config) - granule = Granule({'url': 'https://example.com/fake-path/abc.123.nc/?query=true'}) - ext = 'zarr' - - # Basic cases - self.assertEqual(adapter.filename_for_granule(granule, ext), 'abc.123.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_subsetted=True), 'abc.123_subsetted.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_regridded=True), 'abc.123_regridded.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_subsetted=True, is_regridded=True), - 'abc.123_regridded_subsetted.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123_regridded_subsetted.zarr') - - # Variable name contains full path with '/' ('/' replaced with '_') - granule.variables.append(Variable({'name': '/path/to/VarB'})) - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123__path_to_VarB_regridded_subsetted.zarr') - granule.variables.pop() - - # Single variable cases - granule.variables.append(Variable({'name': 'VarA'})) - self.assertEqual(adapter.filename_for_granule(granule, ext), 'abc.123.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_subsetted=True, is_regridded=True), - 'abc.123_regridded_subsetted.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True), 'abc.123_VarA.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123_VarA_regridded_subsetted.zarr') - - # Multiple variable cases (no variable name in suffix) - granule.variables.append(Variable({'name': 'VarB'})) - self.assertEqual(adapter.filename_for_granule(granule, ext, is_subsetted=True, is_regridded=True), - 'abc.123_regridded_subsetted.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123_regridded_subsetted.zarr') - granule.variables.pop() - - # URL already containing a suffix - granule.url = 'https://example.com/fake-path/abc.123_regridded.zarr' - self.assertEqual(adapter.filename_for_granule(granule, ext, is_subsetted=True), - 'abc.123_regridded_subsetted.zarr') - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123_VarA_regridded_subsetted.zarr') - - # URL already containing all suffixes - granule.url = 'https://example.com/fake-path/abc.123_VarA_regridded_subsetted.zarr' - self.assertEqual(adapter.filename_for_granule(granule, ext, is_variable_subset=True, is_subsetted=True, - is_regridded=True), - 'abc.123_VarA_regridded_subsetted.zarr') - - @patch('urllib.request.urlopen') - @patch.dict(os.environ, {'ENV': 'not_test_we_swear'}) - def test_cancel_request(self, urlopen): - adapter = AdapterTester(minimal_message, self.config) - urlopen.side_effect = MockHTTPError(url='http://example.com', code=409, msg='Harmony canceled request', hdrs=[], fp=None) - self.assertRaises(CanceledException, adapter.async_add_url_partial_result, 'https://example.com/2') - self.assertTrue(adapter.is_canceled) - self.assertTrue(adapter.is_complete) - self.assertEqual(adapter.is_failed, 1) - - -class TestCallbackPostHealthUpdate(unittest.TestCase): - def setUp(self): - self.config = harmony_service_lib.util.config(validate=False) - - @patch.object(pathlib.Path, '__new__') - def test_callback_post_updates_health_check_file(self, mock_path): - adapter = AdapterTester(full_message, self.config) - adapter.completed_with_error('ohai there') - mock_path.return_value.touch.assert_called() diff --git a/tests/test_adapter_stac.py b/tests/test_adapter_stac.py index d7db4fb..d31942d 100644 --- a/tests/test_adapter_stac.py +++ b/tests/test_adapter_stac.py @@ -198,60 +198,3 @@ def test_altered_ids_are_retained(self): (message, out_catalog) = adapter.invoke() out_items = [item for item in out_catalog.get_items()] self.assertEqual(out_items[0].id, 'i-mutated-you') - - def test_legacy_invocations_create_stac_catalogs(self): - message = Message(full_message) - message.isSynchronous = False - adapter = AdapterTester(message, config=self.config) - adapter.invoke() - self.assertEqual(len(AdapterTester.process_args), 4) - self.assertEqual(AdapterTester.process_args[0][1], message.sources[0]) - self.assertEqual(AdapterTester.process_args[2][1], message.sources[1]) - self.assertEqual(AdapterTester.process_args[0][0].to_dict(), { - 'type': 'Feature', - 'stac_version': '1.0.0', - 'stac_extensions': [], - 'id': 'G0001-EXAMPLE', - 'properties': { - 'start_datetime': '2001-01-01T01:01:01Z', - 'end_datetime': '2002-02-02T02:02:02Z', - 'datetime': None - }, - 'geometry': { - 'coordinates': [[[-1, -2], [-1, 4], [3, 4], [3, -2], [-1, -2]]], - 'type': 'Polygon' - }, - 'links': [], - 'assets': { - 'data': { - 'href': 'file://example/example_granule_1.txt', - 'title': 'Example1', - 'roles': ['data'] - } - }, - 'bbox': [-1, -2, 3, 4] - }) - self.assertEqual(AdapterTester.process_args[1][0].to_dict(), { - 'type': 'Feature', - 'stac_version': '1.0.0', - 'stac_extensions': [], - 'id': 'G0002-EXAMPLE', - 'properties': { - 'start_datetime': '2003-03-03T03:03:03Z', - 'end_datetime': '2004-04-04T04:04:04Z', - 'datetime': None - }, - 'geometry': { - 'coordinates': [[[-5, -6], [-5, 8], [7, 8], [7, -6], [-5, -6]]], - 'type': 'Polygon' - }, - 'links': [], - 'assets': { - 'data': { - 'href': 'file://example/example_granule_2.txt', - 'title': 'Example2', - 'roles': ['data'] - } - }, - 'bbox': [-5, -6, 7, 8] - }) diff --git a/tests/test_cli.py b/tests/test_cli.py index 6e8e10e..6545b02 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,7 +4,8 @@ import harmony_service_lib.util from harmony_service_lib import cli, BaseHarmonyAdapter -from tests.util import mock_receive, cli_test +from pystac import Catalog +from tests.util import cli_test class MockAdapter(BaseHarmonyAdapter): @@ -14,17 +15,18 @@ class MockAdapter(BaseHarmonyAdapter): messages = [] errors = [] cleaned_up = [] - - def __init__(self, message): - super().__init__(self, message) + result_catalog = Catalog( + id='example id', + description='An empty STAC catalog', + stac_extensions=[] + ) + + def __init__(self, message, catalog=None): + super().__init__(message, catalog) MockAdapter.messages.append(message.data) def invoke(self): - self.is_complete = True - self.is_failed = False - - def completed_with_error(self, error): - MockAdapter.errors.append(error) + return (self.message, self.result_catalog) def cleanup(self): MockAdapter.cleaned_up.append(True) @@ -69,52 +71,30 @@ def test_when_harmony_input_is_not_provided_it_terminates_with_error(self, parse '--harmony-input or --harmony-input-file must be provided for --harmony-action=invoke') @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}') + def test_when_harmony_metadata_dir_is_not_provided_it_terminates_with_error(self, parser): + with patch.object(parser, 'error') as error_method: + args = parser.parse_args() + cli.run_cli(parser, args, MockAdapter, self.config) + error_method.assert_called_once_with( + '--harmony-metadata-dir must be provided for --harmony-action=invoke') + + @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}', '--harmony-metadata-dir', '/tmp') def test_when_harmony_input_is_provided_it_creates_and_invokes_an_adapter(self, parser): args = parser.parse_args() cli.run_cli(parser, args, MockAdapter, self.config) self.assertListEqual([{'test': 'input'}], MockAdapter.messages) - @cli_test('--harmony-action', 'invoke', '--harmony-input-file', '/tmp/operation.json') + @cli_test('--harmony-action', 'invoke', '--harmony-input-file', '/tmp/operation.json', '--harmony-metadata-dir', '/tmp') def test_when_harmony_input_file_is_provided_it_creates_and_invokes_an_adapter(self, parser): args = parser.parse_args() cli.run_cli(parser, args, MockAdapter, self.config) self.assertListEqual([{'test': 'input'}], MockAdapter.messages) - @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}') - def test_when_the_backend_service_doesnt_respond_it_responds_with_an_error(self, parser): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - - args = parser.parse_args() - try: - cli.run_cli(parser, args, MockImpl, self.config) - except Exception: - pass - self.assertListEqual( - MockImpl.errors, ['The backend service did not respond']) - - @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}') - def test_when_the_backend_service_throws_an_exception_before_response_it_responds_with_an_error(self, parser): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - raise Exception('Something bad happened') - - args = parser.parse_args() - with self.assertRaises(Exception) as context: - cli.run_cli(parser, args, MockImpl, self.config) - - self.assertTrue('Something bad happened' in str(context.exception)) - self.assertListEqual( - MockImpl.errors, ['Service request failed with an unknown error']) - - @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}') - def test_when_the_backend_service_throws_an_exception_afterresponse_it_does_not_respond_again(self, parser): + @cli_test('--harmony-action', 'invoke', '--harmony-input', '{"test": "input"}', '--harmony-metadata-dir', '/tmp') + def test_when_the_backend_service_throws_an_exception_after_response_it_does_not_respond_again(self, parser): class MockImpl(MockAdapter): def invoke(self): - self.is_complete = True raise Exception('Something bad happened') args = parser.parse_args() @@ -124,156 +104,5 @@ def invoke(self): pass self.assertListEqual(MockImpl.errors, []) - -class TestCliStartAction(unittest.TestCase): - def setUp(self): - self.config = harmony_service_lib.util.config(validate=False) - - def tearDown(self): - MockAdapter.messages = [] - MockAdapter.errors = [] - MockAdapter.cleaned_up = [] - - @cli_test('--harmony-action', 'start') - def test_when_queue_url_is_not_provided_it_terminates_with_error(self, parser): - with patch.object(parser, 'error') as error_method: - args = parser.parse_args() - cli.run_cli(parser, args, MockAdapter, self.config) - error_method.assert_called_once_with( - '--harmony-queue-url must be provided for --harmony-action=start') - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_listens_on_the_provided_queue(self, parser, client): - sqs = mock_receive(self.config, client, parser, MockAdapter) - sqs.receive_message.assert_called_with( - QueueUrl='test-queue-url', - VisibilityTimeout=600, - WaitTimeSeconds=20, - MaxNumberOfMessages=1) - self.assertListEqual(MockAdapter.messages, []) - self.assertListEqual(MockAdapter.errors, []) - - @cli_test('--harmony-action', 'start', - '--harmony-queue-url', 'test-queue-url', - '--harmony-visibility-timeout', '100') - @patch('boto3.client') - def test_uses_optional_visibility_timeouts_from_the_command_line(self, parser, client): - sqs = mock_receive(self.config, client, parser, MockAdapter) - sqs.receive_message.assert_called_with( - QueueUrl='test-queue-url', - VisibilityTimeout=100, - WaitTimeSeconds=20, - MaxNumberOfMessages=1) - self.assertListEqual(MockAdapter.messages, []) - self.assertListEqual(MockAdapter.errors, []) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_sends_queue_messages_to_the_adapter(self, parser, client): - mock_receive(self.config, client, parser, MockAdapter, - '{"test": "a"}', None, '{"test": "b"}') - self.assertEqual(MockAdapter.messages, [{'test': 'a'}, {'test': 'b'}]) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_completes_the_request_it_deletes_the_queue_message(self, parser, client): - sqs = mock_receive(self.config, client, parser, MockAdapter, - '{"test": "a"}', None, '{"test": "b"}') - sqs.delete_message.assert_called_with( - QueueUrl='test-queue-url', - ReceiptHandle=2) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_completes_the_request_it_calls_cleanup_on_the_adapter(self, parser, client): - mock_receive(self.config, client, parser, MockAdapter, - '{"test": "a"}', None, '{"test": "b"}') - self.assertListEqual(MockAdapter.cleaned_up, [True, True]) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_runs_without_completing_the_request_it_returns_the_message_to_the_queue( - self, parser, client - ): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - - sqs = mock_receive(self.config, client, parser, MockImpl, - '{"test": "a"}', None, '{"test": "b"}') - sqs.delete_message.assert_not_called() - sqs.change_message_visibility.assert_called_with( - QueueUrl='test-queue-url', - VisibilityTimeout=0, - ReceiptHandle=2) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_runs_without_completing_the_request_it_calls_cleanup_on_the_adapter(self, parser, client): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - - mock_receive(self.config, client, parser, MockImpl, '{"test": "a"}') - self.assertListEqual(MockImpl.cleaned_up, [True]) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_throws_before_completing_the_request_it_returns_the_message_to_the_queue( - self, parser, client - ): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - raise Exception('Something bad happened') - - sqs = mock_receive(self.config, client, parser, MockImpl, - '{"test": "a"}', None, '{"test": "b"}') - sqs.delete_message.assert_not_called() - sqs.change_message_visibility.assert_called_with( - QueueUrl='test-queue-url', - VisibilityTimeout=0, - ReceiptHandle=2) - - self.assertListEqual(MockImpl.cleaned_up, [True, True]) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_throws_after_completing_the_request_it_deletes_the_queue_message(self, parser, client): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = True - raise Exception('Something bad happened') - - sqs = mock_receive(self.config, client, parser, MockImpl, - '{"test": "a"}', None, '{"test": "b"}') - sqs.delete_message.assert_called_with( - QueueUrl='test-queue-url', - ReceiptHandle=2) - sqs.change_message_visibility.assert_not_called() - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_the_adapter_throws_it_calls_cleanup_on_the_adapter(self, parser, client): - class MockImpl(MockAdapter): - def invoke(self): - self.is_complete = False - raise Exception('Something bad happened') - - mock_receive(self.config, client, parser, MockImpl, '{"test": "a"}', None, '{"test": "b"}') - self.assertListEqual(MockImpl.cleaned_up, [True, True]) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - def test_when_cleanup_throws_it_continues_processing_queue_messages(self, parser, client): - class MockImpl(MockAdapter): - def cleanup(self): - raise Exception('Something bad happened') - - mock_receive(self.config, client, parser, MockImpl, '{"test": "a"}', None, '{"test": "b"}') - self.assertEqual(MockAdapter.messages, [{'test': 'a'}, {'test': 'b'}]) - - if __name__ == '__main__': unittest.main() diff --git a/tests/test_cli_stac.py b/tests/test_cli_stac.py index 6a365d3..2fffee7 100644 --- a/tests/test_cli_stac.py +++ b/tests/test_cli_stac.py @@ -20,7 +20,7 @@ class MockAdapter(BaseHarmonyAdapter): def invoke(self): MockAdapter.message = self.message return (self.message, self.catalog) - + class MockMultiCatalogOutputAdapter(BaseHarmonyAdapter): message = None """ @@ -98,7 +98,6 @@ def test_when_the_backend_service_throws_a_known_error_it_writes_the_error_to_th class MockImpl(MockAdapter): def invoke(self): - self.is_complete = False raise ForbiddenException('Something bad happened') args = parser.parse_args() @@ -118,7 +117,6 @@ def test_when_the_backend_service_throws_an_unknown_error_it_writes_a_generic_er class MockImpl(MockAdapter): def invoke(self): - self.is_complete = False raise Exception('Something bad happened') args = parser.parse_args() @@ -155,7 +153,7 @@ def test_when_multi_catalog_output_it_saves_with_particular_layout(self): with open(os.path.join(self.workdir, 'batch-catalogs.json')) as file: self.assertEqual(json.loads(file.read()), ["catalog0.json", - "catalog1.json", + "catalog1.json", "catalog2.json"]) if __name__ == '__main__': diff --git a/tests/test_util.py b/tests/test_util.py index 97e66fc..dc3261b 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,15 +1,12 @@ -import pathlib from requests import Session import unittest from unittest.mock import patch, MagicMock, mock_open, ANY -from urllib.error import HTTPError from harmony_service_lib import aws from harmony_service_lib import util from harmony_service_lib.http import request_context from harmony_service_lib.message import Variable -from tests.test_cli import MockAdapter, cli_test -from tests.util import mock_receive, config_fixture +from tests.util import config_fixture class TestDownload(unittest.TestCase): @@ -192,33 +189,6 @@ def test_when_not_using_localstack_it_ignores_localstack_host(self): self.assertDictEqual(expected, actual) -class TestSQSReadHealthUpdate(unittest.TestCase): - def setUp(self): - self.config = util.config(validate=False) - - @cli_test('--harmony-action', 'start', '--harmony-queue-url', 'test-queue-url') - @patch('boto3.client') - @patch.object(pathlib.Path, '__new__') - def test_when_reading_from_queue_health_update_happens(self, parser, mock_path, client): - all_test_cases = [ - # message received - ['{"test": "a"}'], - - # no message received - [None], - - # error receiving message - [Exception()] - ] - for messages in all_test_cases: - with self.subTest(messages=messages): - try: - mock_receive(self.config, client, parser, MockAdapter, *messages) - except Exception: - pass - mock_path.return_value.touch.assert_called() - - class TestGenerateOutputFilename(unittest.TestCase): def test_includes_provided_suffixes_ext(self): """Ensure the correct combinations of regridded, subsetted and diff --git a/tests/util.py b/tests/util.py index 0e8c109..d978d99 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,44 +1,10 @@ import argparse import sys -from unittest.mock import patch, MagicMock +from unittest.mock import patch from contextlib import contextmanager from harmony_service_lib import cli, util - -def mock_receive(cfg, client, parser, AdapterClass, *messages): - """ - Mocks an sqs receive call - """ - sqs = MagicMock() - side_effects = [] - - for i, message in enumerate(messages): - contents = [] - if message: - contents.append({'Body': message, 'ReceiptHandle': i}) - # this allows us to test what happens when receiving a message from the queue fails - if isinstance(message, Exception): - side_effects = message - break - else: - side_effects.append({'Messages': contents}) - - print(side_effects) - sqs.receive_message.side_effect = side_effects - client.return_value = sqs - args = parser.parse_args() - try: - cli.run_cli(parser, args, AdapterClass, cfg=cfg) - except RuntimeError as e: - if str(e) == 'generator raised StopIteration': - # Expection. Happens every call when messages are exhausted, allowing us to stop iterating. - pass - else: - raise - return sqs - - def cli_test(*cli_args): """ Decorator that takes a list of CLI parameters, patches them into @@ -102,7 +68,6 @@ def config_fixture(fallback_authn_enabled=False, backend_host=c.backend_host, localstack_host=c.localstack_host, aws_default_region=c.aws_default_region, - health_check_path=c.health_check_path, shared_secret_key=c.shared_secret_key, # Override if provided, else default user_agent=c.user_agent if user_agent is None else user_agent