Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate ArgoFloat with ArgoIndex #432

Merged
merged 11 commits into from
Feb 11, 2025
Merged

Integrate ArgoFloat with ArgoIndex #432

merged 11 commits into from
Feb 11, 2025

Conversation

gmaze
Copy link
Member

@gmaze gmaze commented Jan 24, 2025

Follow-on the implementation of the ArgoFloat API:
https://gist.github.com/gmaze/6924fc26405f42aa36fd5a22a64657ea

In this PR, we provide an interation method for the ArgoIndex that return an ArgoFloat instance

# Make a search on Argo index of profiles:
idx = ArgoIndex().search_lat_lon([lon_min, lon_max, lat_min, lat_max])

# Then iterate over float matching the results:
for float in idx.iterfloats():
  float # is a ArgoFloat instance

@gmaze gmaze added the enhancement New feature or request label Jan 24, 2025
gmaze added 3 commits January 27, 2025 15:51
for dac and metadata, no need for them before requested explicitely, this make instanciation way faster ! hence integration with an ArgoIndex interator
@gmaze gmaze marked this pull request as ready for review February 3, 2025 13:34
Copy link

codecov bot commented Feb 10, 2025

❌ 32 Tests Failed:

Tests completed Failed Passed Skipped
1744 32 1712 740
View the top 3 failed test(s) by shortest run time
test_fetchers_data_gdac.py::TestBackend::test_fetching[host='s3', ds='phy', mode='research', {'profile': [13857, 90]}]
Stack Traces | 0.104s run time
func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
>               return await func(*args, **kwargs)

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiobotocore.client.S3 object at 0x000001EBD467B310>
operation_name = 'HeadObject'
api_params = {'Bucket': 'argo-gdac-sandbox', 'Key': 'pub/dac'}

    async def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.resolved_auth_type,
            'unsigned_payload': operation_model.unsigned_payload,
        }
    
        api_params = await self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = await self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = await self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = await self.meta.events.emit_until_response(
            f'before-call.{service_id}.{operation_name}',
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = await self._make_request(
                operation_model, request_dict, request_context
            )
    
        await self.meta.events.emit(
            f'after-call.{service_id}.{operation_name}',
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )
    
        if http.status_code >= 300:
            error_info = parsed_response.get("Error", {})
            error_code = error_info.get("QueryErrorCode") or error_info.get(
                "Code"
            )
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\aiobotocore\client.py:412: ClientError

The above exception was the direct cause of the following exception:

self = <argopy.tests.test_fetchers_data_gdac.TestBackend object at 0x000001EBA5B406D0>
request = <SubRequest 'fetcher' for <Function test_fetching[host='s3', ds='phy', mode='research', {'profile': [13857, 90]}]>>
mocked_httpserver = 'http://127.0.0.1:9898'

    @pytest.fixture
    def fetcher(self, request, mocked_httpserver):
        """ Fixture to create a GDAC fetcher for a given host and access point """
>       fetcher_args, access_point = self._setup_fetcher(request, cached=False)

argopy\tests\test_fetchers_data_gdac.py:188: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argopy\tests\test_fetchers_data_gdac.py:180: in _setup_fetcher
    if not check_gdac_path(fetcher_args['gdac']):
argopy\utils\checkers.py:504: in check_gdac_path
    check1 = fs.exists(fs.sep.join([path, "dac"]))
argopy\stores\implementations\http.py:64: in exists
    return super().exists(path, *args, **kwargs)
argopy\stores\spec.py:61: in exists
    return self.fs.exists(path, *args)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:118: in wrapper
    return sync(self.loop, func, *args, **kwargs)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:103: in sync
    raise return_result
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:56: in _runner
    result[0] = await coro
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1058: in _exists
    await self._info(path, bucket, key, version_id=version_id)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1371: in _info
    out = await self._call_s3(
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:362: in _call_s3
    return await _error_wrapper(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
                return await func(*args, **kwargs)
            except S3_RETRYABLE_ERRORS as e:
                err = e
                logger.debug("Retryable error: %s", e)
                await asyncio.sleep(min(1.7**i * 0.1, 15))
            except ClientError as e:
                logger.debug("Client error (maybe retryable): %s", e)
                err = e
                if "SlowDown" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                elif "XAmzContentSHA256Mismatch" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                else:
                    break
            except Exception as e:
                logger.debug("Nonretryable error: %s", e)
                err = e
                break
    
        if "'coroutine'" in str(err):
            # aiobotocore internal error - fetch original botocore error
            tb = err.__traceback__
            while tb.tb_next:
                tb = tb.tb_next
            try:
                await tb.tb_frame.f_locals["response"]
            except Exception as e:
                err = e
        err = translate_boto_error(err)
>       raise err
E       PermissionError: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:142: PermissionError
test_fetchers_data_gdac.py::TestBackend::test_fetching[host='s3', ds='phy', mode='expert', {'float': [13857]}]
Stack Traces | 0.105s run time
func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
>               return await func(*args, **kwargs)

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiobotocore.client.S3 object at 0x000001EBD467B310>
operation_name = 'HeadObject'
api_params = {'Bucket': 'argo-gdac-sandbox', 'Key': 'pub/dac'}

    async def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.resolved_auth_type,
            'unsigned_payload': operation_model.unsigned_payload,
        }
    
        api_params = await self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = await self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = await self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = await self.meta.events.emit_until_response(
            f'before-call.{service_id}.{operation_name}',
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = await self._make_request(
                operation_model, request_dict, request_context
            )
    
        await self.meta.events.emit(
            f'after-call.{service_id}.{operation_name}',
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )
    
        if http.status_code >= 300:
            error_info = parsed_response.get("Error", {})
            error_code = error_info.get("QueryErrorCode") or error_info.get(
                "Code"
            )
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\aiobotocore\client.py:412: ClientError

The above exception was the direct cause of the following exception:

self = <argopy.tests.test_fetchers_data_gdac.TestBackend object at 0x000001EBA5B405B0>
request = <SubRequest 'fetcher' for <Function test_fetching[host='s3', ds='phy', mode='expert', {'float': [13857]}]>>
mocked_httpserver = 'http://127.0.0.1:9898'

    @pytest.fixture
    def fetcher(self, request, mocked_httpserver):
        """ Fixture to create a GDAC fetcher for a given host and access point """
>       fetcher_args, access_point = self._setup_fetcher(request, cached=False)

argopy\tests\test_fetchers_data_gdac.py:188: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argopy\tests\test_fetchers_data_gdac.py:180: in _setup_fetcher
    if not check_gdac_path(fetcher_args['gdac']):
argopy\utils\checkers.py:504: in check_gdac_path
    check1 = fs.exists(fs.sep.join([path, "dac"]))
argopy\stores\implementations\http.py:64: in exists
    return super().exists(path, *args, **kwargs)
argopy\stores\spec.py:61: in exists
    return self.fs.exists(path, *args)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:118: in wrapper
    return sync(self.loop, func, *args, **kwargs)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:103: in sync
    raise return_result
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:56: in _runner
    result[0] = await coro
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1058: in _exists
    await self._info(path, bucket, key, version_id=version_id)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1371: in _info
    out = await self._call_s3(
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:362: in _call_s3
    return await _error_wrapper(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
                return await func(*args, **kwargs)
            except S3_RETRYABLE_ERRORS as e:
                err = e
                logger.debug("Retryable error: %s", e)
                await asyncio.sleep(min(1.7**i * 0.1, 15))
            except ClientError as e:
                logger.debug("Client error (maybe retryable): %s", e)
                err = e
                if "SlowDown" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                elif "XAmzContentSHA256Mismatch" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                else:
                    break
            except Exception as e:
                logger.debug("Nonretryable error: %s", e)
                err = e
                break
    
        if "'coroutine'" in str(err):
            # aiobotocore internal error - fetch original botocore error
            tb = err.__traceback__
            while tb.tb_next:
                tb = tb.tb_next
            try:
                await tb.tb_frame.f_locals["response"]
            except Exception as e:
                err = e
        err = translate_boto_error(err)
>       raise err
E       PermissionError: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:142: PermissionError
test_fetchers_data_gdac.py::TestBackend::test_fetching[host='s3', ds='phy', mode='standard', {'region': [-20, -16.0, 0, 1, 0, 100.0]}]
Stack Traces | 0.105s run time
func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
>               return await func(*args, **kwargs)

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:113: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiobotocore.client.S3 object at 0x000001EBD467B310>
operation_name = 'HeadObject'
api_params = {'Bucket': 'argo-gdac-sandbox', 'Key': 'pub/dac'}

    async def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.resolved_auth_type,
            'unsigned_payload': operation_model.unsigned_payload,
        }
    
        api_params = await self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = await self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = await self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = await self.meta.events.emit_until_response(
            f'before-call.{service_id}.{operation_name}',
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = await self._make_request(
                operation_model, request_dict, request_context
            )
    
        await self.meta.events.emit(
            f'after-call.{service_id}.{operation_name}',
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )
    
        if http.status_code >= 300:
            error_info = parsed_response.get("Error", {})
            error_code = error_info.get("QueryErrorCode") or error_info.get(
                "Code"
            )
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\aiobotocore\client.py:412: ClientError

The above exception was the direct cause of the following exception:

self = <argopy.tests.test_fetchers_data_gdac.TestBackend object at 0x000001EBA5B40A90>
request = <SubRequest 'fetcher' for <Function test_fetching[host='s3', ds='phy', mode='standard', {'region': [-20, -16.0, 0, 1, 0, 100.0]}]>>
mocked_httpserver = 'http://127.0.0.1:9898'

    @pytest.fixture
    def fetcher(self, request, mocked_httpserver):
        """ Fixture to create a GDAC fetcher for a given host and access point """
>       fetcher_args, access_point = self._setup_fetcher(request, cached=False)

argopy\tests\test_fetchers_data_gdac.py:188: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argopy\tests\test_fetchers_data_gdac.py:180: in _setup_fetcher
    if not check_gdac_path(fetcher_args['gdac']):
argopy\utils\checkers.py:504: in check_gdac_path
    check1 = fs.exists(fs.sep.join([path, "dac"]))
argopy\stores\implementations\http.py:64: in exists
    return super().exists(path, *args, **kwargs)
argopy\stores\spec.py:61: in exists
    return self.fs.exists(path, *args)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:118: in wrapper
    return sync(self.loop, func, *args, **kwargs)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:103: in sync
    raise return_result
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\fsspec\asyn.py:56: in _runner
    result[0] = await coro
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1058: in _exists
    await self._info(path, bucket, key, version_id=version_id)
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:1371: in _info
    out = await self._call_s3(
C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:362: in _call_s3
    return await _error_wrapper(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

func = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x000001EBD467B310>>

    async def _error_wrapper(func, *, args=(), kwargs=None, retries):
        if kwargs is None:
            kwargs = {}
        for i in range(retries):
            try:
                return await func(*args, **kwargs)
            except S3_RETRYABLE_ERRORS as e:
                err = e
                logger.debug("Retryable error: %s", e)
                await asyncio.sleep(min(1.7**i * 0.1, 15))
            except ClientError as e:
                logger.debug("Client error (maybe retryable): %s", e)
                err = e
                if "SlowDown" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                elif "XAmzContentSHA256Mismatch" in str(e):
                    await asyncio.sleep(min(1.7**i * 0.1, 15))
                else:
                    break
            except Exception as e:
                logger.debug("Nonretryable error: %s", e)
                err = e
                break
    
        if "'coroutine'" in str(err):
            # aiobotocore internal error - fetch original botocore error
            tb = err.__traceback__
            while tb.tb_next:
                tb = tb.tb_next
            try:
                await tb.tb_frame.f_locals["response"]
            except Exception as e:
                err = e
        err = translate_boto_error(err)
>       raise err
E       PermissionError: Forbidden

C:\Users\runneradmin\micromamba\envs\argopy-tests\lib\site-packages\s3fs\core.py:142: PermissionError

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@gmaze gmaze merged commit 44e5d8f into master Feb 11, 2025
39 checks passed
@gmaze gmaze deleted the integrate-float-store branch February 13, 2025 13:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant