From 3ab4ce9aedfcebfa700dd98699c3b684195f1440 Mon Sep 17 00:00:00 2001 From: DonHaul <ramiro.animus@gmail.com> Date: Mon, 1 Jul 2024 11:33:23 +0200 Subject: [PATCH 1/3] workflows: fixes to make author dags work workflows: added github workflows --- .github/workflow/test_and_build.yml | 99 +++++++++++++++++++ .pre-commit-config.yaml | 6 ++ README.md | 7 ++ .../author_create/author_create_approved.py | 29 +++--- .../author_create/author_create_init.py | 14 +-- .../author_create/author_create_rejected.py | 12 +-- dags/author/author_update/author_update.py | 15 +-- plugins/hooks/backoffice/base.py | 5 +- .../backoffice/workflow_management_hook.py | 6 +- .../workflow_ticket_management_hook.py | 6 +- .../inspire_http_record_management_hook.py | 2 +- setup.cfg | 8 ++ 12 files changed, 161 insertions(+), 48 deletions(-) create mode 100644 .github/workflow/test_and_build.yml create mode 100644 setup.cfg diff --git a/.github/workflow/test_and_build.yml b/.github/workflow/test_and_build.yml new file mode 100644 index 0000000..722e4f7 --- /dev/null +++ b/.github/workflow/test_and_build.yml @@ -0,0 +1,99 @@ +name: Build and Test +on: + push: + branches: + - main + pull_request_target: + branches: + - main + +env: + PYTHON_VERSION: 3.8.16 + AIRFLOW_HOME: /home/runner/work/airflow-dags/airflow-dags + REGISTRY: registry.cern.ch + IMAGE: cern-sis/airflow + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@localhost/airflow + AIRFLOW__CELERY__BROKER_URL: redis://localhost:6379/0 + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" + AIRFLOW__CORE__LOAD_EXAMPLES: "false" + AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth" + IOP_FTP_HOST: "sftp" + SPRINGER_FTP_HOST: "sftp" + OUP_FTP_HOST: "ftp" + S3_ENDPOINT: "http://localhost:9000" + SPRINGER_FTP_PORT: 22 + IOP_FTP_PORT: 22 + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true" +jobs: + build_test: + name: Build and Test + runs-on: ubuntu-latest + steps: + - name: Checkout + if: ${{ github.event_name == 'push' }} + uses: actions/checkout@v3 + + - name: Checkout PR + if: ${{ github.event_name == 'pull_request_target' }} + uses: actions/checkout@v3 + with: + ref: ${{ github.event.pull_request.head.sha }} + + - name: Install Python 3 + uses: actions/setup-python@v4 + with: + python-version: 3.8.16 + + - name: Run services on docker compose + run: docker compose up -d postgres redis sftp ftp s3 create_buckets + + - name: List services for IT Tests + run: docker ps + + - name: Build Image + id: build + uses: cern-sis/gh-workflows/.github/actions/docker-build@v6 + with: + registry: ${{ env.REGISTRY }} + image: ${{ env.IMAGE }} + tags: type=ref,event=pr + cache: false + username: ${{ secrets.HARBOR_USERNAME }} + password: ${{ secrets.HARBOR_PASSWORD }} + + + - name: Run tests with pytest and generate report + run: | + docker run \ + --name airflowdags \ + --network=host \ + -v "$(pwd)"/tests:/opt/airflow/tests \ + -v "$(pwd)"/dags:/opt/airflow/dags \ + -v "$(pwd)"/airflow.cfg:/opt/airflow/airflow.cfg \ + -v "$(pwd)"/data:/opt/airflow/data \ + $REGISTRY/$IMAGE@${{ steps.build.outputs.image-digest }} \ + bash -c "airflow db init && \ + airflow webserver -D && \ + airflow scheduler -D && \ + airflow triggerer -D && \ + airflow celery worker -D && \ + airflow celery flower -D && \ + pytest /opt/airflow/tests --cov=/opt/airflow --cov-report=xml" + + - name: Copy test coverage file to host machine + run: 
docker cp airflowdags:/opt/airflow/coverage.xml . + + - name: Upload Coverage to Codecov + uses: codecov/codecov-action@v3 + with: + verbose: true + + - name: Deploy QA + if: ${{ github.event_name == 'push' }} + uses: cern-sis/gh-workflows/.github/actions/kubernetes-project-new-images@v6 + with: + event-type: update + images: ${{ env.REGISTRY }}/${{ env.IMAGE }}@${{ steps.build.outputs.image-digest }} + token: ${{ secrets.PAT_FIRE_EVENTS_ON_CERN_SIS_KUBERNETES }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7e63cba..fee2e9a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,3 +14,9 @@ repos: hooks: - id: isort name: isort (python) + args: ["--profile", "black"] +- repo: https://github.com/pycqa/flake8 + rev: "3.9.2" + hooks: + - id: flake8 + args: ["--config=setup.cfg"] diff --git a/README.md b/README.md index 1ade69a..06da890 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,13 @@ The setup is done with docker-compose. In order to run use docker-compose up -d ``` +in the UI go to Admin>Connections and set: +- backoffice_conn: http host.docker.internal:8000 +- inspire_connection: https://inspirebeta.net +in the UI go to Admin>Variables and set: +- backoffice_token auth token from django for a superuser +- inspire_token: in `inspirehep-qa` container use the shell to generate a token + Also, for the dags there's a need to define an airflow variable `inspire_token` which is a token to inspirebeta.net. It can be added via UI (go to Admin -> Variables). diff --git a/dags/author/author_create/author_create_approved.py b/dags/author/author_create/author_create_approved.py index 49edee9..103b858 100644 --- a/dags/author/author_create/author_create_approved.py +++ b/dags/author/author_create/author_create_approved.py @@ -1,32 +1,31 @@ import datetime -import json import logging from airflow.decorators import dag, task from airflow.models.param import Param from airflow.utils.trigger_rule import TriggerRule -from hooks.backoffice import (WorkflowManagementHook, - WorkflowTicketManagementHook) +from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook from hooks.inspirehep.inspire_http_hook import InspireHttpHook -from hooks.inspirehep.inspire_http_record_management_hook import \ - InspireHTTPRecordManagementHook +from hooks.inspirehep.inspire_http_record_management_hook import ( + InspireHTTPRecordManagementHook, +) from include.utils.set_workflow_status import ( - get_wf_status_from_inspire_response, set_workflow_status_to_error) + get_wf_status_from_inspire_response, + set_workflow_status_to_error, +) logger = logging.getLogger(__name__) -default_args = { - "start_date": datetime.datetime(2021, 1, 1), - "schedule_interval": None, -} @dag( - default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), "create_ticket": Param(type="boolean", default=False), }, + start_date=datetime.datetime(2024, 5, 5), + schedule_interval=None, + catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_create_approved_dag(): @@ -66,7 +65,7 @@ def author_check_approval_branch(**context: dict) -> None: @task def create_author_create_curation_ticket(**context: dict) -> None: - endpoint = "/tickets/create-with-template" + endpoint = "api/tickets/create" request_data = { "functional_category": "", "workflow_id": context["params"]["workflow_id"], @@ -98,9 +97,7 @@ def create_author_on_inspire(**context: dict) -> str: workflow_data["data"]["control_number"] = control_number workflow_management_hook.partial_update_workflow( workflow_id=context["params"]["workflow_id"], - workflow_partial_update_data={ - "data": json.dumps(workflow_data["data"]) - }, + workflow_partial_update_data={"data": workflow_data["data"]}, ) return status @@ -119,7 +116,7 @@ def close_author_create_user_ticket(**context: dict) -> None: ticket_id = workflow_ticket_management_hook.get_ticket( workflow_id=context["params"]["workflow_id"], ticket_type=ticket_type )["ticket_id"] - endpoint = "/tickets/resolve" + endpoint = "api/tickets/resolve" request_data = {"ticket_id": ticket_id} inspire_http_hook.call_api(endpoint=endpoint, data=request_data, method="POST") diff --git a/dags/author/author_create/author_create_init.py b/dags/author/author_create/author_create_init.py index 1f409b1..ceca833 100644 --- a/dags/author/author_create/author_create_init.py +++ b/dags/author/author_create/author_create_init.py @@ -3,25 +3,21 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import (WorkflowManagementHook, - WorkflowTicketManagementHook) +from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook from hooks.inspirehep.inspire_http_hook import InspireHttpHook from include.utils.set_workflow_status import set_workflow_status_to_error logger = logging.getLogger(__name__) -default_args = { - "start_date": datetime.datetime(2021, 1, 1), - "schedule_interval": None, -} - @dag( - default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, + start_date=datetime.datetime(2024, 5, 5), + schedule_interval=None, + catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_create_initialization_dag(): @@ -46,7 +42,7 @@ def set_workflow_status_to_running(**context): @task() def create_author_create_user_ticket(**context: dict) -> None: - endpoint = "/tickets/create-with-template" + endpoint = "/api/tickets/create" request_data = { "functional_category": "Author curation", "template": "user_new_author", diff --git a/dags/author/author_create/author_create_rejected.py b/dags/author/author_create/author_create_rejected.py index 213e31c..249d3ed 100644 --- a/dags/author/author_create/author_create_rejected.py +++ b/dags/author/author_create/author_create_rejected.py @@ -2,23 +2,19 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import (WorkflowManagementHook, - WorkflowTicketManagementHook) +from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook from hooks.inspirehep.inspire_http_hook import InspireHttpHook from include.utils.set_workflow_status import set_workflow_status_to_error -default_args = { - "start_date": datetime.datetime(2021, 1, 1), - "schedule_interval": None, -} - @dag( - default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, + start_date=datetime.datetime(2024, 5, 5), + schedule_interval=None, + catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! ) def author_create_rejected_dag() -> None: diff --git a/dags/author/author_update/author_update.py b/dags/author/author_update/author_update.py index b3b4a8c..9f33725 100644 --- a/dags/author/author_update/author_update.py +++ b/dags/author/author_update/author_update.py @@ -2,22 +2,25 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import (WorkflowManagementHook, - WorkflowTicketManagementHook) +from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook from hooks.inspirehep.inspire_http_hook import InspireHttpHook -from hooks.inspirehep.inspire_http_record_management_hook import \ - InspireHTTPRecordManagementHook +from hooks.inspirehep.inspire_http_record_management_hook import ( + InspireHTTPRecordManagementHook, +) from include.utils.set_workflow_status import ( - get_wf_status_from_inspire_response, set_workflow_status_to_error) + get_wf_status_from_inspire_response, + set_workflow_status_to_error, +) @dag( - start_date=datetime.datetime(2021, 1, 1), + start_date=datetime.datetime(2024, 5, 5), schedule_interval=None, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, + catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_update_dag(): diff --git a/plugins/hooks/backoffice/base.py b/plugins/hooks/backoffice/base.py index 08d2732..5a18aad 100644 --- a/plugins/hooks/backoffice/base.py +++ b/plugins/hooks/backoffice/base.py @@ -25,6 +25,7 @@ def __init__( self.headers = headers or { "Authorization": f'Token {Variable.get("backoffice_token")}', "Accept": "application/json", + "Content-Type": "application/json", } @property @@ -51,8 +52,8 @@ def run( else: url = self.base_url + endpoint - req = requests.Request(method, url, data=data, headers=headers, params=params) + req = requests.Request(method, url, json=data, headers=headers, params=params) prepped_request = session.prepare_request(req) - self.log.info(f"Sending '%s' to url: %s", self.method, url) + self.log.info("Sending '%s' to url: %s", method, url) return self.run_and_check(session, prepped_request, extra_options) diff --git a/plugins/hooks/backoffice/workflow_management_hook.py b/plugins/hooks/backoffice/workflow_management_hook.py index fecf241..a6df67c 100644 --- a/plugins/hooks/backoffice/workflow_management_hook.py +++ b/plugins/hooks/backoffice/workflow_management_hook.py @@ -29,7 +29,7 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response: ) def get_workflow(self, workflow_id: str) -> dict: - endpoint = f"/workflows/{workflow_id}" + endpoint = f"api/workflows/{workflow_id}" response = self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint ) @@ -37,7 +37,7 @@ def get_workflow(self, workflow_id: str) -> dict: return response.json() def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response: - endpoint = f"/workflows/{workflow_id}/" + endpoint = f"api/workflows/{workflow_id}/" return self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="PUT", @@ -48,7 +48,7 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response: def partial_update_workflow( self, workflow_id: str, workflow_partial_update_data: dict ) -> Response: - endpoint = f"/workflow-update/{workflow_id}/" + endpoint = f"api/workflow-update/{workflow_id}/" return self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="PATCH", diff --git a/plugins/hooks/backoffice/workflow_ticket_management_hook.py b/plugins/hooks/backoffice/workflow_ticket_management_hook.py index 3590289..e26047a 100644 --- a/plugins/hooks/backoffice/workflow_ticket_management_hook.py +++ b/plugins/hooks/backoffice/workflow_ticket_management_hook.py @@ -19,10 +19,10 @@ def __init__( headers: dict = None, ) -> None: super().__init__(method, http_conn_id, headers) - self.endpoint = "/workflow-ticket/" + self.endpoint = "api/workflow-ticket/" def get_ticket(self, workflow_id: str, ticket_type: str) -> dict: - endpoint = f"/workflow-ticket/{workflow_id}/" + endpoint = f"api/workflow-ticket/{workflow_id}/" params = {"ticket_type": ticket_type} response = self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, @@ -35,7 +35,7 @@ def get_ticket(self, workflow_id: str, ticket_type: str) -> dict: def create_ticket_entry( self, workflow_id: str, ticket_id: str, ticket_type: str ) -> Response: - endpoint = f"/workflow-ticket/" + endpoint = "api/workflow-ticket/" data = { "ticket_type": ticket_type, "ticket_id": ticket_id, diff --git a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py index f4f54cf..a3b397d 100644 --- 
a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py +++ b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py @@ -40,5 +40,5 @@ def post_record(self, data: dict, pid_type: str) -> Response: method="POST", headers=self.headers, json=data, - endpoint=f"/{pid_type}", + endpoint=f"api/{pid_type}", ) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..f581081 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,8 @@ +# flake8 don't support pyproject.toml +# https://github.com/PyCQA/flake8/issues/234 +[flake8] +# E501 black takes care of the line size +max-line-length = 88 +exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv,.venv +ignore = D401,W503,E501,E265,E203 +max-complexity = 15 From 4105112689ef64f8786c93f1256aeeec596388a1 Mon Sep 17 00:00:00 2001 From: DonHaul <ramiro.animus@gmail.com> Date: Mon, 8 Jul 2024 17:58:42 +0200 Subject: [PATCH 2/3] removed unnecessary services bumped python version --- .github/workflows/test_and_build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_and_build.yml b/.github/workflows/test_and_build.yml index 6ed60b8..ed300da 100644 --- a/.github/workflows/test_and_build.yml +++ b/.github/workflows/test_and_build.yml @@ -47,7 +47,7 @@ jobs: python-version: 3.11.9 - name: Run services on docker compose - run: docker compose up -d postgres redis sftp ftp s3 create_buckets + run: docker compose up -d postgres redis s3 create_buckets - name: List services for IT Tests run: docker ps From e63b9dc5c4132bc711ad7c068c17cc43c7171867 Mon Sep 17 00:00:00 2001 From: DonHaul <ramiro.animus@gmail.com> Date: Mon, 8 Jul 2024 18:06:06 +0200 Subject: [PATCH 3/3] Revert "workflows: fixes to make author dags work" This reverts commit 1167c4d654882849047272a5d5dea6c065cf0781. 
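Before the revert diff itself, one of the reverted changes deserves a note: in plugins/hooks/backoffice/base.py, patch 1 replaced requests.Request(..., data=data, ...) with json=data and added an explicit Content-Type header. The distinction matters: data= form-encodes a dict, while json= serializes it to a JSON body and sets Content-Type: application/json on its own. A minimal sketch of the difference (the echo URL and payload here are hypothetical):

import requests

payload = {"workflow_id": "abc123", "status": "running"}  # hypothetical payload

# data=: the dict is form-encoded (application/x-www-form-urlencoded)
form_req = requests.Request("POST", "https://example.org/echo", data=payload).prepare()

# json=: the dict is serialized to JSON and Content-Type is set automatically
json_req = requests.Request("POST", "https://example.org/echo", json=payload).prepare()

print(form_req.body)                     # workflow_id=abc123&status=running
print(json_req.body)                     # b'{"workflow_id": "abc123", "status": "running"}'
print(json_req.headers["Content-Type"])  # application/json

A backend that parses request bodies as JSON will reject or misread the form-encoded variant, which is consistent with this being one of the "fixes to make author dags work".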
--- .github/workflow/test_and_build.yml | 99 ------------------- .pre-commit-config.yaml | 6 -- README.md | 7 -- .../author_create/author_create_approved.py | 29 +++--- .../author_create/author_create_init.py | 14 ++- .../author_create/author_create_rejected.py | 12 ++- dags/author/author_update/author_update.py | 15 ++- plugins/hooks/backoffice/base.py | 5 +- .../backoffice/workflow_management_hook.py | 6 +- .../workflow_ticket_management_hook.py | 6 +- .../inspire_http_record_management_hook.py | 2 +- setup.cfg | 8 -- 12 files changed, 48 insertions(+), 161 deletions(-) delete mode 100644 .github/workflow/test_and_build.yml delete mode 100644 setup.cfg diff --git a/.github/workflow/test_and_build.yml b/.github/workflow/test_and_build.yml deleted file mode 100644 index 722e4f7..0000000 --- a/.github/workflow/test_and_build.yml +++ /dev/null @@ -1,99 +0,0 @@ -name: Build and Test -on: - push: - branches: - - main - pull_request_target: - branches: - - main - -env: - PYTHON_VERSION: 3.8.16 - AIRFLOW_HOME: /home/runner/work/airflow-dags/airflow-dags - REGISTRY: registry.cern.ch - IMAGE: cern-sis/airflow - AIRFLOW__CORE__EXECUTOR: CeleryExecutor - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost/airflow - AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@localhost/airflow - AIRFLOW__CELERY__BROKER_URL: redis://localhost:6379/0 - AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true" - AIRFLOW__CORE__LOAD_EXAMPLES: "false" - AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth" - IOP_FTP_HOST: "sftp" - SPRINGER_FTP_HOST: "sftp" - OUP_FTP_HOST: "ftp" - S3_ENDPOINT: "http://localhost:9000" - SPRINGER_FTP_PORT: 22 - IOP_FTP_PORT: 22 - AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true" -jobs: - build_test: - name: Build and Test - runs-on: ubuntu-latest - steps: - - name: Checkout - if: ${{ github.event_name == 'push' }} - uses: actions/checkout@v3 - - - name: Checkout PR - if: ${{ github.event_name == 'pull_request_target' }} - uses: actions/checkout@v3 - with: - ref: ${{ github.event.pull_request.head.sha }} - - - name: Install Python 3 - uses: actions/setup-python@v4 - with: - python-version: 3.8.16 - - - name: Run services on docker compose - run: docker compose up -d postgres redis sftp ftp s3 create_buckets - - - name: List services for IT Tests - run: docker ps - - - name: Build Image - id: build - uses: cern-sis/gh-workflows/.github/actions/docker-build@v6 - with: - registry: ${{ env.REGISTRY }} - image: ${{ env.IMAGE }} - tags: type=ref,event=pr - cache: false - username: ${{ secrets.HARBOR_USERNAME }} - password: ${{ secrets.HARBOR_PASSWORD }} - - - - name: Run tests with pytest and generate report - run: | - docker run \ - --name airflowdags \ - --network=host \ - -v "$(pwd)"/tests:/opt/airflow/tests \ - -v "$(pwd)"/dags:/opt/airflow/dags \ - -v "$(pwd)"/airflow.cfg:/opt/airflow/airflow.cfg \ - -v "$(pwd)"/data:/opt/airflow/data \ - $REGISTRY/$IMAGE@${{ steps.build.outputs.image-digest }} \ - bash -c "airflow db init && \ - airflow webserver -D && \ - airflow scheduler -D && \ - airflow triggerer -D && \ - airflow celery worker -D && \ - airflow celery flower -D && \ - pytest /opt/airflow/tests --cov=/opt/airflow --cov-report=xml" - - - name: Copy test coverage file to host machine - run: docker cp airflowdags:/opt/airflow/coverage.xml . 
- - - name: Upload Coverage to Codecov - uses: codecov/codecov-action@v3 - with: - verbose: true - - - name: Deploy QA - if: ${{ github.event_name == 'push' }} - uses: cern-sis/gh-workflows/.github/actions/kubernetes-project-new-images@v6 - with: - event-type: update - images: ${{ env.REGISTRY }}/${{ env.IMAGE }}@${{ steps.build.outputs.image-digest }} - token: ${{ secrets.PAT_FIRE_EVENTS_ON_CERN_SIS_KUBERNETES }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fee2e9a..7e63cba 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,9 +14,3 @@ repos: hooks: - id: isort name: isort (python) - args: ["--profile", "black"] -- repo: https://github.com/pycqa/flake8 - rev: "3.9.2" - hooks: - - id: flake8 - args: ["--config=setup.cfg"] diff --git a/README.md b/README.md index 06da890..1ade69a 100644 --- a/README.md +++ b/README.md @@ -8,13 +8,6 @@ The setup is done with docker-compose. In order to run use docker-compose up -d ``` -in the UI go to Admin>Connections and set: -- backoffice_conn: http host.docker.internal:8000 -- inspire_connection: https://inspirebeta.net -in the UI go to Admin>Variables and set: -- backoffice_token auth token from django for a superuser -- inspire_token: in `inspirehep-qa` container use the shell to generate a token - Also, for the dags there's a need to define an airflow variable `inspire_token` which is a token to inspirebeta.net. It can be added via UI (go to Admin -> Variables). diff --git a/dags/author/author_create/author_create_approved.py b/dags/author/author_create/author_create_approved.py index 103b858..49edee9 100644 --- a/dags/author/author_create/author_create_approved.py +++ b/dags/author/author_create/author_create_approved.py @@ -1,31 +1,32 @@ import datetime +import json import logging from airflow.decorators import dag, task from airflow.models.param import Param from airflow.utils.trigger_rule import TriggerRule -from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook +from hooks.backoffice import (WorkflowManagementHook, + WorkflowTicketManagementHook) from hooks.inspirehep.inspire_http_hook import InspireHttpHook -from hooks.inspirehep.inspire_http_record_management_hook import ( - InspireHTTPRecordManagementHook, -) +from hooks.inspirehep.inspire_http_record_management_hook import \ + InspireHTTPRecordManagementHook from include.utils.set_workflow_status import ( - get_wf_status_from_inspire_response, - set_workflow_status_to_error, -) + get_wf_status_from_inspire_response, set_workflow_status_to_error) logger = logging.getLogger(__name__) +default_args = { + "start_date": datetime.datetime(2021, 1, 1), + "schedule_interval": None, +} @dag( + default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), "create_ticket": Param(type="boolean", default=False), }, - start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, - catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_create_approved_dag(): @@ -65,7 +66,7 @@ def author_check_approval_branch(**context: dict) -> None: @task def create_author_create_curation_ticket(**context: dict) -> None: - endpoint = "api/tickets/create" + endpoint = "/tickets/create-with-template" request_data = { "functional_category": "", "workflow_id": context["params"]["workflow_id"], @@ -97,7 +98,9 @@ def create_author_on_inspire(**context: dict) -> str: workflow_data["data"]["control_number"] = control_number workflow_management_hook.partial_update_workflow( workflow_id=context["params"]["workflow_id"], - workflow_partial_update_data={"data": workflow_data["data"]}, + workflow_partial_update_data={ + "data": json.dumps(workflow_data["data"]) + }, ) return status @@ -116,7 +119,7 @@ def close_author_create_user_ticket(**context: dict) -> None: ticket_id = workflow_ticket_management_hook.get_ticket( workflow_id=context["params"]["workflow_id"], ticket_type=ticket_type )["ticket_id"] - endpoint = "api/tickets/resolve" + endpoint = "/tickets/resolve" request_data = {"ticket_id": ticket_id} inspire_http_hook.call_api(endpoint=endpoint, data=request_data, method="POST") diff --git a/dags/author/author_create/author_create_init.py b/dags/author/author_create/author_create_init.py index ceca833..1f409b1 100644 --- a/dags/author/author_create/author_create_init.py +++ b/dags/author/author_create/author_create_init.py @@ -3,21 +3,25 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook +from hooks.backoffice import (WorkflowManagementHook, + WorkflowTicketManagementHook) from hooks.inspirehep.inspire_http_hook import InspireHttpHook from include.utils.set_workflow_status import set_workflow_status_to_error logger = logging.getLogger(__name__) +default_args = { + "start_date": datetime.datetime(2021, 1, 1), + "schedule_interval": None, +} + @dag( + default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, - start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, - catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_create_initialization_dag(): @@ -42,7 +46,7 @@ def set_workflow_status_to_running(**context): @task() def create_author_create_user_ticket(**context: dict) -> None: - endpoint = "/api/tickets/create" + endpoint = "/tickets/create-with-template" request_data = { "functional_category": "Author curation", "template": "user_new_author", diff --git a/dags/author/author_create/author_create_rejected.py b/dags/author/author_create/author_create_rejected.py index 249d3ed..213e31c 100644 --- a/dags/author/author_create/author_create_rejected.py +++ b/dags/author/author_create/author_create_rejected.py @@ -2,19 +2,23 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook +from hooks.backoffice import (WorkflowManagementHook, + WorkflowTicketManagementHook) from hooks.inspirehep.inspire_http_hook import InspireHttpHook from include.utils.set_workflow_status import set_workflow_status_to_error +default_args = { + "start_date": datetime.datetime(2021, 1, 1), + "schedule_interval": None, +} + @dag( + default_args=default_args, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, - start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, - catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! ) def author_create_rejected_dag() -> None: diff --git a/dags/author/author_update/author_update.py b/dags/author/author_update/author_update.py index 9f33725..b3b4a8c 100644 --- a/dags/author/author_update/author_update.py +++ b/dags/author/author_update/author_update.py @@ -2,25 +2,22 @@ from airflow.decorators import dag, task from airflow.models.param import Param -from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook +from hooks.backoffice import (WorkflowManagementHook, + WorkflowTicketManagementHook) from hooks.inspirehep.inspire_http_hook import InspireHttpHook -from hooks.inspirehep.inspire_http_record_management_hook import ( - InspireHTTPRecordManagementHook, -) +from hooks.inspirehep.inspire_http_record_management_hook import \ + InspireHTTPRecordManagementHook from include.utils.set_workflow_status import ( - get_wf_status_from_inspire_response, - set_workflow_status_to_error, -) + get_wf_status_from_inspire_response, set_workflow_status_to_error) @dag( - start_date=datetime.datetime(2024, 5, 5), + start_date=datetime.datetime(2021, 1, 1), schedule_interval=None, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), }, - catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! 
) def author_update_dag(): diff --git a/plugins/hooks/backoffice/base.py b/plugins/hooks/backoffice/base.py index 5a18aad..08d2732 100644 --- a/plugins/hooks/backoffice/base.py +++ b/plugins/hooks/backoffice/base.py @@ -25,7 +25,6 @@ def __init__( self.headers = headers or { "Authorization": f'Token {Variable.get("backoffice_token")}', "Accept": "application/json", - "Content-Type": "application/json", } @property @@ -52,8 +51,8 @@ def run( else: url = self.base_url + endpoint - req = requests.Request(method, url, json=data, headers=headers, params=params) + req = requests.Request(method, url, data=data, headers=headers, params=params) prepped_request = session.prepare_request(req) - self.log.info("Sending '%s' to url: %s", method, url) + self.log.info(f"Sending '%s' to url: %s", self.method, url) return self.run_and_check(session, prepped_request, extra_options) diff --git a/plugins/hooks/backoffice/workflow_management_hook.py b/plugins/hooks/backoffice/workflow_management_hook.py index a6df67c..fecf241 100644 --- a/plugins/hooks/backoffice/workflow_management_hook.py +++ b/plugins/hooks/backoffice/workflow_management_hook.py @@ -29,7 +29,7 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response: ) def get_workflow(self, workflow_id: str) -> dict: - endpoint = f"api/workflows/{workflow_id}" + endpoint = f"/workflows/{workflow_id}" response = self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint ) @@ -37,7 +37,7 @@ def get_workflow(self, workflow_id: str) -> dict: return response.json() def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response: - endpoint = f"api/workflows/{workflow_id}/" + endpoint = f"/workflows/{workflow_id}/" return self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="PUT", @@ -48,7 +48,7 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response: def partial_update_workflow( self, workflow_id: str, workflow_partial_update_data: dict ) -> Response: - endpoint = f"api/workflow-update/{workflow_id}/" + endpoint = f"/workflow-update/{workflow_id}/" return self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, method="PATCH", diff --git a/plugins/hooks/backoffice/workflow_ticket_management_hook.py b/plugins/hooks/backoffice/workflow_ticket_management_hook.py index e26047a..3590289 100644 --- a/plugins/hooks/backoffice/workflow_ticket_management_hook.py +++ b/plugins/hooks/backoffice/workflow_ticket_management_hook.py @@ -19,10 +19,10 @@ def __init__( headers: dict = None, ) -> None: super().__init__(method, http_conn_id, headers) - self.endpoint = "api/workflow-ticket/" + self.endpoint = "/workflow-ticket/" def get_ticket(self, workflow_id: str, ticket_type: str) -> dict: - endpoint = f"api/workflow-ticket/{workflow_id}/" + endpoint = f"/workflow-ticket/{workflow_id}/" params = {"ticket_type": ticket_type} response = self.run_with_advanced_retry( _retry_args=self.tenacity_retry_kwargs, @@ -35,7 +35,7 @@ def get_ticket(self, workflow_id: str, ticket_type: str) -> dict: def create_ticket_entry( self, workflow_id: str, ticket_id: str, ticket_type: str ) -> Response: - endpoint = "api/workflow-ticket/" + endpoint = f"/workflow-ticket/" data = { "ticket_type": ticket_type, "ticket_id": ticket_id, diff --git a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py index a3b397d..f4f54cf 100644 --- 
a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py +++ b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py @@ -40,5 +40,5 @@ def post_record(self, data: dict, pid_type: str) -> Response: method="POST", headers=self.headers, json=data, - endpoint=f"api/{pid_type}", + endpoint=f"/{pid_type}", ) diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index f581081..0000000 --- a/setup.cfg +++ /dev/null @@ -1,8 +0,0 @@ -# flake8 don't support pyproject.toml -# https://github.com/PyCQA/flake8/issues/234 -[flake8] -# E501 black takes care of the line size -max-line-length = 88 -exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv,.venv -ignore = D401,W503,E501,E265,E203 -max-complexity = 15
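A closing note on the DAG changes that patches 1 and 3 toggle back and forth: patch 1 moved start_date and schedule_interval out of default_args and into the @dag decorator, and added catchup=False. default_args is passed through to each task, not to the DAG itself, so a schedule_interval buried there is ignored and the DAG falls back to Airflow's default daily schedule; combined with a 2021 start_date and catchup left at its default of True, that means a large backfill the moment the DAG is unpaused. Declaring everything on the decorator avoids both problems. A minimal sketch of the pattern, assuming Airflow 2.x (the DAG and task names are illustrative, not from the repo):

import datetime

from airflow.decorators import dag, task
from airflow.models.param import Param


@dag(
    start_date=datetime.datetime(2024, 5, 5),
    schedule_interval=None,  # only runs when triggered (e.g. by the backoffice)
    catchup=False,           # never backfill runs for dates in the past
    params={"workflow_id": Param(type="string", default="")},
)
def example_author_dag():
    @task
    def log_workflow_id(**context):
        # Params arrive through the task context at run time.
        return context["params"]["workflow_id"]

    log_workflow_id()


example_author_dag()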
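The one-line logging fix in base.py is easy to miss but real: the old call mixed an f-string with %-style lazy formatting (self.log.info(f"Sending '%s' to url: %s", self.method)). The f-prefix does nothing without braces, only one value was supplied for two placeholders (so the record fails to format), and self.method is the hook's configured default rather than the per-call method argument. The corrected idiom, as a standalone sketch with hypothetical values:

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

method, url = "POST", "https://example.org/api/workflows/"  # hypothetical values

# Lazy %-style interpolation: both placeholders get a value, and the string
# is only built if this log level is actually emitted.
log.info("Sending '%s' to url: %s", method, url)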
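Finally, the endpoint strings change shape throughout these patches: "/workflows/{id}" becomes "api/workflows/{id}" with no leading slash. That interacts with how the hooks build the final URL. A small sketch that mirrors the join rule of Airflow's HttpHook.url_from_endpoint (stated here as an assumption about Airflow internals, using the inspirebeta.net host from the README as the example base):

def url_from_endpoint(base_url: str, endpoint: str) -> str:
    # Mirrors the assumed HttpHook join rule: add a "/" only when neither
    # side supplies one, otherwise concatenate as-is.
    if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
        return base_url + "/" + endpoint
    return (base_url or "") + (endpoint or "")


print(url_from_endpoint("https://inspirebeta.net", "api/workflows/123"))
# https://inspirebeta.net/api/workflows/123

Either form joins cleanly against a bare host; the functional part of the change is the added "api/" prefix, which routes the calls to the REST API rather than the site root.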