From a9405287cd4e229d60665b40226a8c319aa2b0a1 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 3 Oct 2023 18:07:08 +0200 Subject: [PATCH 1/3] EJR: automatically inject `job_id` extra logging Related to #163 --- openeo_driver/jobregistry.py | 108 +++++++++++++++++------------------ tests/test_jobregistry.py | 39 +++++++++++-- 2 files changed, 89 insertions(+), 58 deletions(-) diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py index 2997d791..36fb9509 100644 --- a/openeo_driver/jobregistry.py +++ b/openeo_driver/jobregistry.py @@ -1,4 +1,5 @@ import argparse +import contextlib import datetime as dt import json import logging @@ -241,6 +242,19 @@ def set_user_agent(self): user_agent += f"/{self._backend_id}" self._session.headers["User-Agent"] = user_agent + @contextlib.contextmanager + def _with_extra_logging(self, **kwargs): + """Context manager to temporarily add extra logging fields in a context""" + orig = self.logger + extra = kwargs + if isinstance(orig, logging.LoggerAdapter): + extra = {**orig.extra, **extra} + self.logger = logging.LoggerAdapter(logger=orig, extra=extra) + try: + yield + finally: + self.logger = orig + @property def backend_id(self) -> str: assert self._backend_id @@ -282,13 +296,9 @@ def _do_request( json: Union[dict, list, None] = None, use_auth: bool = True, expected_status: int = 200, - logging_extra: Optional[dict] = None, ) -> Union[dict, list, None]: """Do an HTTP request to Elastic Job Tracker service.""" - with TimingLogger( - logger=(lambda m: self.logger.debug(m, extra=logging_extra)), - title=f"EJR Request `{method} {path}`", - ): + with TimingLogger(logger=self.logger.debug, title=f"EJR Request `{method} {path}`"): headers = {} if use_auth: access_token = self._cache.get_or_call( @@ -300,10 +310,7 @@ def _do_request( headers["Authorization"] = f"Bearer {access_token}" url = url_join(self._api_url, path) - self.logger.debug( - f"Doing EJR request `{method} {url}` {headers.keys()=}", - extra=logging_extra, - ) + self.logger.debug(f"Doing EJR request `{method} {url}` {headers.keys()=}") if self._debug_show_curl: curl_command = self._as_curl(method=method, url=url, data=json, headers=headers) self.logger.debug(f"Equivalent curl command: {curl_command}") @@ -315,10 +322,7 @@ def _do_request( headers=headers, timeout=self._REQUEST_TIMEOUT, ) - self.logger.debug( - f"EJR response on `{method} {path}`: {response.status_code!r}", - extra=logging_extra, - ) + self.logger.debug(f"EJR response on `{method} {path}`: {response.status_code!r}") if expected_status and response.status_code != expected_status: raise EjrHttpError.from_response(response=response) else: @@ -382,48 +386,46 @@ def create_job( "api_version": api_version, # TODO: additional technical metadata, see https://github.com/Open-EO/openeo-api/issues/472 } - logging_extra = {"job_id": job_id} - self.logger.info(f"EJR creating {job_id=} {created=}", extra=logging_extra) - return self._do_request( - "POST", - "/jobs", - json=job_data, - expected_status=201, - logging_extra=logging_extra, - ) + with self._with_extra_logging(job_id=job_id): + self.logger.info(f"EJR creating {job_id=} {created=}") + return self._do_request("POST", "/jobs", json=job_data, expected_status=201) def get_job(self, job_id: str, fields: Optional[List[str]] = None) -> JobDict: - query = { - "bool": { - "filter": [ - {"term": {"backend_id": self.backend_id}}, - {"term": {"job_id": job_id}}, - ] + with self._with_extra_logging(job_id=job_id): + self.logger.info(f"EJR get job data {job_id=}") + query = { + "bool": { + "filter": [ + {"term": {"backend_id": self.backend_id}}, + {"term": {"job_id": job_id}}, + ] + } } - } - # Return full document, by default - jobs = self._search(query=query, fields=fields or ["*"]) - if len(jobs) == 1: - job = jobs[0] - assert job["job_id"] == job_id, f"{job['job_id']=} != {job_id=}" - return job - elif len(jobs) == 0: - raise JobNotFoundException(job_id=job_id) - else: - summary = [{k: j.get(k) for k in ["user_id", "created"]} for j in jobs] - self.logger.error(f"Found multiple ({len(jobs)}) jobs for {job_id=}: {repr_truncate(summary, width=200)}") - raise InternalException(message=f"Found {len(jobs)} jobs for {job_id=}") + # Return full document, by default + jobs = self._search(query=query, fields=fields or ["*"]) + if len(jobs) == 1: + job = jobs[0] + assert job["job_id"] == job_id, f"{job['job_id']=} != {job_id=}" + return job + elif len(jobs) == 0: + raise JobNotFoundException(job_id=job_id) + else: + summary = [{k: j.get(k) for k in ["user_id", "created"]} for j in jobs] + self.logger.error( + f"Found multiple ({len(jobs)}) jobs for {job_id=}: {repr_truncate(summary, width=200)}" + ) + raise InternalException(message=f"Found {len(jobs)} jobs for {job_id=}") def delete_job(self, job_id: str) -> None: - try: - self._do_request(method="DELETE", path=f"/jobs/{job_id}") - logging_extra = {"job_id": job_id} - self.logger.info(f"EJR deleted {job_id=}", extra=logging_extra) - except EjrHttpError as e: - if e.status_code == 404: - raise JobNotFoundException(job_id=job_id) from e - raise e + with self._with_extra_logging(job_id=job_id): + try: + self._do_request(method="DELETE", path=f"/jobs/{job_id}") + self.logger.info(f"EJR deleted {job_id=}") + except EjrHttpError as e: + if e.status_code == 404: + raise JobNotFoundException(job_id=job_id) from e + raise e def set_status( self, @@ -446,11 +448,9 @@ def set_status( def _update(self, job_id: str, data: dict) -> JobDict: """Generic update method""" - logging_extra = {"job_id": job_id} - self.logger.info(f"EJR update {job_id=} {data=}", extra=logging_extra) - return self._do_request( - "PATCH", f"/jobs/{job_id}", json=data, logging_extra=logging_extra - ) + with self._with_extra_logging(job_id=job_id): + self.logger.info(f"EJR update {job_id=} {data=}") + return self._do_request("PATCH", f"/jobs/{job_id}", json=data) def set_dependencies( self, job_id: str, dependencies: List[Dict[str, str]] diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py index c463ce94..36484f80 100644 --- a/tests/test_jobregistry.py +++ b/tests/test_jobregistry.py @@ -606,10 +606,6 @@ def test_job_id_logging(self, requests_mock, oidc_mock, ejr, caplog): """Check that job_id logging is passed through as logging extra in appropriate places""" caplog.set_level(logging.DEBUG) - class Formatter: - def format(self, record: logging.LogRecord): - job_id = getattr(record, "job_id", None) - return f"{record.name}:{job_id}:{record.message}" job_id = "j-123" @@ -623,6 +619,12 @@ def patch_job(request, context): requests_mock.post(f"{self.EJR_API_URL}/jobs", json=post_jobs) requests_mock.patch(f"{self.EJR_API_URL}/jobs/{job_id}", json=patch_job) + requests_mock.delete(f"{self.EJR_API_URL}/jobs/{job_id}", status_code=200, content=b"") + + class Formatter: + def format(self, record: logging.LogRecord): + job_id = getattr(record, "job_id", None) + return f"{record.name}:{job_id}:{record.message}" with caplog_with_custom_formatter(caplog=caplog, format=Formatter()): with time_machine.travel("2020-01-02 03:04:05+00", tick=False): @@ -635,6 +637,9 @@ def patch_job(request, context): with time_machine.travel("2020-01-02 03:44:55+00", tick=False): ejr.set_status(job_id=job_id, status=JOB_STATUS.RUNNING) + with time_machine.travel("2020-01-03 12:00:00+00", tick=False): + ejr.delete_job(job_id=job_id) + logs = caplog.text.strip().split("\n") for expected in [ @@ -652,5 +657,31 @@ def patch_job(request, context): "openeo_driver.jobregistry.elastic:j-123:EJR update job_id='j-123' data={'status': 'running', 'updated': '2020-01-02T03:44:55Z'}", "openeo_driver.jobregistry.elastic:j-123:EJR response on `PATCH /jobs/j-123`: 200", "openeo_driver.jobregistry.elastic:j-123:EJR Request `PATCH /jobs/j-123`: end 2020-01-02 03:44:55, elapsed 0:00:00", + # delete + "openeo_driver.jobregistry.elastic:j-123:EJR Request `DELETE /jobs/j-123`: start 2020-01-03 12:00:00", + "openeo_driver.jobregistry.elastic:j-123:EJR deleted job_id='j-123'", ]: assert expected in logs + + def test_with_extra_logging(self, requests_mock, oidc_mock, ejr, caplog): + """Test that "extra logging fields" (like job_id) do not leak outside of context""" + caplog.set_level(logging.INFO) + + class Formatter: + def format(self, record: logging.LogRecord): + job_id = getattr(record, "job_id", None) + return f"{record.name} [{job_id}] {record.message}" + + with caplog_with_custom_formatter(caplog=caplog, format=Formatter()): + # Trigger failure during _with_extra_logging + requests_mock.post(f"{self.EJR_API_URL}/jobs/search", status_code=500) + with pytest.raises(EjrHttpError): + _ = ejr.get_job(job_id="job-123") + + # Health check should not be logged with job id in logs + requests_mock.get(f"{self.EJR_API_URL}/health", json={"ok": "yep"}) + ejr.health_check(use_auth=False, log=True) + + logs = caplog.text.strip().split("\n") + assert "openeo_driver.jobregistry.elastic [job-123] EJR get job data job_id='job-123'" in logs + assert "openeo_driver.jobregistry.elastic [None] EJR health check {'ok': 'yep'}" in logs From c3b6101f767a348c81fbac016a38769235f5de59 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 3 Oct 2023 17:35:17 +0200 Subject: [PATCH 2/3] Issue #163 EJR: verify (with backoff) that job was deleted/created --- openeo_driver/_version.py | 2 +- openeo_driver/jobregistry.py | 33 +++++++++++- tests/test_jobregistry.py | 102 ++++++++++++++++++++++++++++------- 3 files changed, 116 insertions(+), 21 deletions(-) diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index 2fe0e2b7..a542fd30 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.69.0a1" +__version__ = "0.69.1a1" diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py index 36fb9509..c2a86491 100644 --- a/openeo_driver/jobregistry.py +++ b/openeo_driver/jobregistry.py @@ -235,6 +235,8 @@ def __init__( self.set_user_agent() self._debug_show_curl = _debug_show_curl + # TODO: expose this as constructor arg or even config? + self._verification_backoffs = [0, 0.1, 1.0] def set_user_agent(self): user_agent = f"openeo_driver-{openeo_driver._version.__version__}/{self.__class__.__name__}" @@ -388,7 +390,9 @@ def create_job( } with self._with_extra_logging(job_id=job_id): self.logger.info(f"EJR creating {job_id=} {created=}") - return self._do_request("POST", "/jobs", json=job_data, expected_status=201) + result = self._do_request("POST", "/jobs", json=job_data, expected_status=201) + self._verify_job_existence(job_id=job_id, exists=True) + return result def get_job(self, job_id: str, fields: Optional[List[str]] = None) -> JobDict: with self._with_extra_logging(job_id=job_id): @@ -426,6 +430,33 @@ def delete_job(self, job_id: str) -> None: if e.status_code == 404: raise JobNotFoundException(job_id=job_id) from e raise e + self._verify_job_existence(job_id=job_id, exists=False) + + def _verify_job_existence(self, job_id: str, exists: bool = True): + """ + Verify that EJR committed the job creation/deletion + :param job_id: job id + :param exists: whether the job should exist (after creation) or not exist (after deletion) + :return: + """ + if not self._verification_backoffs: + return + for backoff in self._verification_backoffs: + self.logger.debug(f"_verify_job_existence {job_id=} {exists=} {backoff=}") + time.sleep(backoff) + try: + self.get_job(job_id=job_id, fields=["job_id"]) + if exists: + return + except JobNotFoundException: + if not exists: + return + except Exception as e: + # TODO: fail hard instead of just logging? + self.logger.exception(f"Unexpected error while verifying {job_id=} {exists=}: {e=}") + return + # TODO: fail hard instead of just logging? + self.logger.error(f"Failed to verify {job_id=} {exists=}") def set_status( self, diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py index 36484f80..b6051c0b 100644 --- a/tests/test_jobregistry.py +++ b/tests/test_jobregistry.py @@ -1,6 +1,6 @@ import logging import urllib.parse -from typing import Union, Optional, List +from typing import Callable, List, Optional, Sequence, Union from unittest import mock import pytest @@ -8,8 +8,8 @@ import requests_mock import time_machine from openeo.rest.auth.testing import OidcMock -from openeo_driver.errors import JobNotFoundException, InternalException +from openeo_driver.errors import InternalException, JobNotFoundException from openeo_driver.jobregistry import ( DEPENDENCY_STATUS, JOB_STATUS, @@ -18,13 +18,7 @@ ElasticJobRegistry, ElasticJobRegistryCredentials, ) -from openeo_driver.testing import ( - DictSubSet, - RegexMatcher, - ListSubSet, - IgnoreOrder, - caplog_with_custom_formatter, -) +from openeo_driver.testing import DictSubSet, IgnoreOrder, ListSubSet, RegexMatcher, caplog_with_custom_formatter DUMMY_PROCESS = { "summary": "calculate 3+5, please", @@ -316,26 +310,44 @@ def post_jobs(request, context): with pytest.raises(EjrError) as e: _ = ejr.create_job(process=DUMMY_PROCESS, user_id="john") - def test_get_job(self, requests_mock, oidc_mock, ejr): - def post_jobs_search(request, context): - """Handler of `POST /jobs/search""" + def _build_handler_post_jobs_search_single_job_lookup( + self, + *, + job_id: str = "job-123", + backend_id: str = "unittests", + user_id: str = "john", + status: str = "created", + source_fields: Sequence[str] = ("job_id", "user_id", "created", "status", "updated", "*"), + oidc_mock: OidcMock, + ) -> Callable: + """ + Build handler for 'POST /jobs/search for a single job lookup. + """ + + def handler(request, context): assert self._auth_is_valid(oidc_mock=oidc_mock, request=request) assert request.json() == { "query": { "bool": { "filter": [ - {"term": {"backend_id": "unittests"}}, - {"term": {"job_id": "job-123"}}, + {"term": {"backend_id": backend_id}}, + {"term": {"job_id": job_id}}, ] } }, - "_source": IgnoreOrder( - ["job_id", "user_id", "created", "status", "updated", "*"] - ), + "_source": IgnoreOrder(list(source_fields)), } - return [{"job_id": "job-123", "user_id": "john", "status": "created"}] + return [{"job_id": job_id, "user_id": user_id, "status": status}] - requests_mock.post(f"{self.EJR_API_URL}/jobs/search", json=post_jobs_search) + return handler + + def test_get_job(self, requests_mock, oidc_mock, ejr): + requests_mock.post( + f"{self.EJR_API_URL}/jobs/search", + json=self._build_handler_post_jobs_search_single_job_lookup( + job_id="job-123", user_id="john", status="created", oidc_mock=oidc_mock + ), + ) result = ejr.get_job(job_id="job-123") assert result == {"job_id": "job-123", "user_id": "john", "status": "created"} @@ -366,6 +378,58 @@ def test_delete_job_not_found(self, requests_mock, oidc_mock, ejr): _ = ejr.delete_job(job_id="job-123") assert delete_mock.call_count == 1 + def test_delete_job_with_verification_direct(self, requests_mock, oidc_mock, ejr, caplog): + caplog.set_level(logging.DEBUG) + + delete_mock = requests_mock.delete(f"{self.EJR_API_URL}/jobs/job-123", status_code=200, content=b"") + # Empty search response + search_mock = requests_mock.post(f"{self.EJR_API_URL}/jobs/search", json=[]) + + _ = ejr.delete_job(job_id="job-123") + assert delete_mock.call_count == 1 + assert search_mock.call_count == 1 + + log_messages = caplog.messages + for expected in [ + "EJR deleted job_id='job-123'", + "_verify_job_existence job_id='job-123' exists=False backoff=0", + ]: + assert expected in log_messages + + def test_delete_job_with_verification_backoff(self, requests_mock, oidc_mock, ejr, caplog): + caplog.set_level(logging.DEBUG) + + delete_mock = requests_mock.delete(f"{self.EJR_API_URL}/jobs/job-123", status_code=200, content=b"") + search_mock = requests_mock.post( + f"{self.EJR_API_URL}/jobs/search", + [ + # First attempt: still found + { + "json": self._build_handler_post_jobs_search_single_job_lookup( + job_id="job-123", + user_id="john", + status="created", + oidc_mock=oidc_mock, + source_fields=["job_id", "user_id", "created", "status", "updated"], + ) + }, + # Second attempt: not found anymore + {"json": []}, + ], + ) + + _ = ejr.delete_job(job_id="job-123") + assert delete_mock.call_count == 1 + assert search_mock.call_count == 2 + + log_messages = caplog.messages + for expected in [ + "EJR deleted job_id='job-123'", + "_verify_job_existence job_id='job-123' exists=False backoff=0", + "_verify_job_existence job_id='job-123' exists=False backoff=0.1", + ]: + assert expected in log_messages + def test_get_job_multiple_results(self, requests_mock, oidc_mock, ejr): def post_jobs_search(request, context): """Handler of `POST /jobs/search""" From f7c53bbba644c0f86fa6e89a52112424930fef3a Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 3 Oct 2023 22:09:15 +0200 Subject: [PATCH 3/3] Issue #163 EJR: only verify deletion creation should be covered by https://github.com/Open-EO/openeo-job-registry-elastic-api/issues/20 --- openeo_driver/jobregistry.py | 11 ++++------- tests/test_jobregistry.py | 1 - 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py index c2a86491..50662bd7 100644 --- a/openeo_driver/jobregistry.py +++ b/openeo_driver/jobregistry.py @@ -9,7 +9,7 @@ import time import typing from decimal import Decimal -from typing import Any, Dict, List, NamedTuple, Optional, Union +from typing import Any, Dict, List, NamedTuple, Optional, Union, Sequence import requests from openeo.rest.auth.oidc import OidcClientCredentialsAuthenticator, OidcClientInfo, OidcProviderInfo @@ -235,8 +235,6 @@ def __init__( self.set_user_agent() self._debug_show_curl = _debug_show_curl - # TODO: expose this as constructor arg or even config? - self._verification_backoffs = [0, 0.1, 1.0] def set_user_agent(self): user_agent = f"openeo_driver-{openeo_driver._version.__version__}/{self.__class__.__name__}" @@ -391,7 +389,6 @@ def create_job( with self._with_extra_logging(job_id=job_id): self.logger.info(f"EJR creating {job_id=} {created=}") result = self._do_request("POST", "/jobs", json=job_data, expected_status=201) - self._verify_job_existence(job_id=job_id, exists=True) return result def get_job(self, job_id: str, fields: Optional[List[str]] = None) -> JobDict: @@ -432,16 +429,16 @@ def delete_job(self, job_id: str) -> None: raise e self._verify_job_existence(job_id=job_id, exists=False) - def _verify_job_existence(self, job_id: str, exists: bool = True): + def _verify_job_existence(self, job_id: str, exists: bool = True, backoffs: Sequence[float] = (0, 0.1, 1.0)): """ Verify that EJR committed the job creation/deletion :param job_id: job id :param exists: whether the job should exist (after creation) or not exist (after deletion) :return: """ - if not self._verification_backoffs: + if not backoffs: return - for backoff in self._verification_backoffs: + for backoff in backoffs: self.logger.debug(f"_verify_job_existence {job_id=} {exists=} {backoff=}") time.sleep(backoff) try: diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py index b6051c0b..15a57a00 100644 --- a/tests/test_jobregistry.py +++ b/tests/test_jobregistry.py @@ -670,7 +670,6 @@ def test_job_id_logging(self, requests_mock, oidc_mock, ejr, caplog): """Check that job_id logging is passed through as logging extra in appropriate places""" caplog.set_level(logging.DEBUG) - job_id = "j-123" def post_jobs(request, context):