diff --git a/.github/workflow/test_and_build.yml b/.github/workflow/test_and_build.yml
deleted file mode 100644
index 722e4f7..0000000
--- a/.github/workflow/test_and_build.yml
+++ /dev/null
@@ -1,99 +0,0 @@
-name: Build and Test
-on:
-  push:
-    branches:
-      - main
-  pull_request_target:
-    branches:
-      - main
-
-env:
-  PYTHON_VERSION: 3.8.16
-  AIRFLOW_HOME: /home/runner/work/airflow-dags/airflow-dags
-  REGISTRY: registry.cern.ch
-  IMAGE: cern-sis/airflow
-  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
-  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@localhost/airflow
-  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@localhost/airflow
-  AIRFLOW__CELERY__BROKER_URL: redis://localhost:6379/0
-  AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
-  AIRFLOW__CORE__LOAD_EXAMPLES: "false"
-  AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
-  IOP_FTP_HOST: "sftp"
-  SPRINGER_FTP_HOST: "sftp"
-  OUP_FTP_HOST: "ftp"
-  S3_ENDPOINT: "http://localhost:9000"
-  SPRINGER_FTP_PORT: 22
-  IOP_FTP_PORT: 22
-  AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
-jobs:
-  build_test:
-    name: Build and Test
-    runs-on: ubuntu-latest
-    steps:
-      - name: Checkout
-        if: ${{ github.event_name == 'push' }}
-        uses: actions/checkout@v3
-
-      - name: Checkout PR
-        if: ${{ github.event_name == 'pull_request_target' }}
-        uses: actions/checkout@v3
-        with:
-          ref: ${{ github.event.pull_request.head.sha }}
-
-      - name: Install Python 3
-        uses: actions/setup-python@v4
-        with:
-          python-version: 3.8.16
-
-      - name: Run services on docker compose
-        run: docker compose up -d postgres redis sftp ftp s3 create_buckets
-
-      - name: List services for IT Tests
-        run: docker ps
-
-      - name: Build Image
-        id: build
-        uses: cern-sis/gh-workflows/.github/actions/docker-build@v6
-        with:
-          registry: ${{ env.REGISTRY }}
-          image: ${{ env.IMAGE }}
-          tags: type=ref,event=pr
-          cache: false
-          username: ${{ secrets.HARBOR_USERNAME }}
-          password: ${{ secrets.HARBOR_PASSWORD }}
-
-
-      - name: Run tests with pytest and generate report
-        run: |
-          docker run \
-            --name airflowdags \
-            --network=host \
-            -v "$(pwd)"/tests:/opt/airflow/tests \
-            -v "$(pwd)"/dags:/opt/airflow/dags \
-            -v "$(pwd)"/airflow.cfg:/opt/airflow/airflow.cfg \
-            -v "$(pwd)"/data:/opt/airflow/data \
-            $REGISTRY/$IMAGE@${{ steps.build.outputs.image-digest }} \
-            bash -c "airflow db init && \
-              airflow webserver -D && \
-              airflow scheduler -D && \
-              airflow triggerer -D && \
-              airflow celery worker -D && \
-              airflow celery flower -D && \
-              pytest /opt/airflow/tests --cov=/opt/airflow --cov-report=xml"
-
-      - name: Copy test coverage file to host machine
-        run: docker cp airflowdags:/opt/airflow/coverage.xml .
-
-      - name: Upload Coverage to Codecov
-        uses: codecov/codecov-action@v3
-        with:
-          verbose: true
-
-      - name: Deploy QA
-        if: ${{ github.event_name == 'push' }}
-        uses: cern-sis/gh-workflows/.github/actions/kubernetes-project-new-images@v6
-        with:
-          event-type: update
-          images: ${{ env.REGISTRY }}/${{ env.IMAGE }}@${{ steps.build.outputs.image-digest }}
-          token: ${{ secrets.PAT_FIRE_EVENTS_ON_CERN_SIS_KUBERNETES }}
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fee2e9a..7e63cba 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -14,9 +14,3 @@ repos:
     hooks:
       - id: isort
         name: isort (python)
-        args: ["--profile", "black"]
-- repo: https://github.com/pycqa/flake8
-  rev: "3.9.2"
-  hooks:
-    - id: flake8
-      args: ["--config=setup.cfg"]
diff --git a/README.md b/README.md
index 06da890..1ade69a 100644
--- a/README.md
+++ b/README.md
@@ -8,13 +8,6 @@ The setup is done with docker-compose. In order to run use
 docker-compose up -d
 ```
 
-in the UI go to Admin>Connections and set:
-- backoffice_conn: http host.docker.internal:8000
-- inspire_connection: https://inspirebeta.net
-in the UI go to Admin>Variables and set:
-- backoffice_token auth token from django for a superuser
-- inspire_token: in `inspirehep-qa` container use the shell to generate a token
-
 Also, for the dags there's a need to define an airflow variable `inspire_token`
 which is a token to inspirebeta.net. It can be added via UI (go to Admin -> Variables).
diff --git a/dags/author/author_create/author_create_approved.py b/dags/author/author_create/author_create_approved.py
index 103b858..49edee9 100644
--- a/dags/author/author_create/author_create_approved.py
+++ b/dags/author/author_create/author_create_approved.py
@@ -1,31 +1,32 @@
 import datetime
+import json
 import logging
 
 from airflow.decorators import dag, task
 from airflow.models.param import Param
 from airflow.utils.trigger_rule import TriggerRule
-from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
+from hooks.backoffice import (WorkflowManagementHook,
+                              WorkflowTicketManagementHook)
 from hooks.inspirehep.inspire_http_hook import InspireHttpHook
-from hooks.inspirehep.inspire_http_record_management_hook import (
-    InspireHTTPRecordManagementHook,
-)
+from hooks.inspirehep.inspire_http_record_management_hook import \
+    InspireHTTPRecordManagementHook
 from include.utils.set_workflow_status import (
-    get_wf_status_from_inspire_response,
-    set_workflow_status_to_error,
-)
+    get_wf_status_from_inspire_response, set_workflow_status_to_error)
 
 logger = logging.getLogger(__name__)
 
+default_args = {
+    "start_date": datetime.datetime(2021, 1, 1),
+    "schedule_interval": None,
+}
 
 @dag(
+    default_args=default_args,
     params={
         "workflow_id": Param(type="string", default=""),
         "data": Param(type="object", default={}),
         "create_ticket": Param(type="boolean", default=False),
     },
-    start_date=datetime.datetime(2024, 5, 5),
-    schedule_interval=None,
-    catchup=False,
     on_failure_callback=set_workflow_status_to_error,  # TODO: what if callback fails? Data in backoffice not up to date!
 )
 def author_create_approved_dag():
@@ -65,7 +66,7 @@ def author_check_approval_branch(**context: dict) -> None:
 
     @task
     def create_author_create_curation_ticket(**context: dict) -> None:
-        endpoint = "api/tickets/create"
+        endpoint = "/tickets/create-with-template"
         request_data = {
             "functional_category": "",
             "workflow_id": context["params"]["workflow_id"],
@@ -97,7 +98,9 @@ def create_author_on_inspire(**context: dict) -> str:
             workflow_data["data"]["control_number"] = control_number
             workflow_management_hook.partial_update_workflow(
                 workflow_id=context["params"]["workflow_id"],
-                workflow_partial_update_data={"data": workflow_data["data"]},
+                workflow_partial_update_data={
+                    "data": json.dumps(workflow_data["data"])
+                },
             )
         return status
@@ -116,7 +119,7 @@ def close_author_create_user_ticket(**context: dict) -> None:
         ticket_id = workflow_ticket_management_hook.get_ticket(
             workflow_id=context["params"]["workflow_id"], ticket_type=ticket_type
         )["ticket_id"]
-        endpoint = "api/tickets/resolve"
+        endpoint = "/tickets/resolve"
         request_data = {"ticket_id": ticket_id}
 
         inspire_http_hook.call_api(endpoint=endpoint, data=request_data, method="POST")
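Note on the hunk above: the nested payload is now passed through json.dumps because the backoffice base hook (see the plugins/hooks/backoffice/base.py hunk further down) switched from requests' json= to data=, which no longer serializes dicts to JSON. A minimal sketch of the resulting call shape — the hook import path is from this repo, while the workflow id and payload are illustrative:

```python
import json

from hooks.backoffice import WorkflowManagementHook

workflow_management_hook = WorkflowManagementHook()

# Hypothetical payload; in the DAG it comes from the task context.
workflow_data = {"data": {"control_number": 12345}}

workflow_management_hook.partial_update_workflow(
    workflow_id="some-workflow-id",  # illustrative
    # Serialized explicitly because the hook now posts via `data=`,
    # not `json=`.
    workflow_partial_update_data={"data": json.dumps(workflow_data["data"])},
)
```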
diff --git a/dags/author/author_create/author_create_init.py b/dags/author/author_create/author_create_init.py
index ceca833..1f409b1 100644
--- a/dags/author/author_create/author_create_init.py
+++ b/dags/author/author_create/author_create_init.py
@@ -3,21 +3,25 @@
 
 from airflow.decorators import dag, task
 from airflow.models.param import Param
-from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
+from hooks.backoffice import (WorkflowManagementHook,
+                              WorkflowTicketManagementHook)
 from hooks.inspirehep.inspire_http_hook import InspireHttpHook
 from include.utils.set_workflow_status import set_workflow_status_to_error
 
 logger = logging.getLogger(__name__)
 
+default_args = {
+    "start_date": datetime.datetime(2021, 1, 1),
+    "schedule_interval": None,
+}
+
 
 @dag(
+    default_args=default_args,
     params={
         "workflow_id": Param(type="string", default=""),
         "data": Param(type="object", default={}),
     },
-    start_date=datetime.datetime(2024, 5, 5),
-    schedule_interval=None,
-    catchup=False,
     on_failure_callback=set_workflow_status_to_error,  # TODO: what if callback fails? Data in backoffice not up to date!
 )
 def author_create_initialization_dag():
@@ -42,7 +46,7 @@ def set_workflow_status_to_running(**context):
 
     @task()
     def create_author_create_user_ticket(**context: dict) -> None:
-        endpoint = "/api/tickets/create"
+        endpoint = "/tickets/create-with-template"
         request_data = {
             "functional_category": "Author curation",
             "template": "user_new_author",
diff --git a/dags/author/author_create/author_create_rejected.py b/dags/author/author_create/author_create_rejected.py
index 249d3ed..213e31c 100644
--- a/dags/author/author_create/author_create_rejected.py
+++ b/dags/author/author_create/author_create_rejected.py
@@ -2,19 +2,23 @@
 
 from airflow.decorators import dag, task
 from airflow.models.param import Param
-from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
+from hooks.backoffice import (WorkflowManagementHook,
+                              WorkflowTicketManagementHook)
 from hooks.inspirehep.inspire_http_hook import InspireHttpHook
 from include.utils.set_workflow_status import set_workflow_status_to_error
 
+default_args = {
+    "start_date": datetime.datetime(2021, 1, 1),
+    "schedule_interval": None,
+}
+
 
 @dag(
+    default_args=default_args,
     params={
         "workflow_id": Param(type="string", default=""),
         "data": Param(type="object", default={}),
     },
-    start_date=datetime.datetime(2024, 5, 5),
-    schedule_interval=None,
-    catchup=False,
     on_failure_callback=set_workflow_status_to_error,  # TODO: what if callback fails? Data in backoffice not up to date!
 )
 def author_create_rejected_dag() -> None:
diff --git a/dags/author/author_update/author_update.py b/dags/author/author_update/author_update.py
index 9f33725..b3b4a8c 100644
--- a/dags/author/author_update/author_update.py
+++ b/dags/author/author_update/author_update.py
@@ -2,25 +2,22 @@
 
 from airflow.decorators import dag, task
 from airflow.models.param import Param
-from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
+from hooks.backoffice import (WorkflowManagementHook,
+                              WorkflowTicketManagementHook)
 from hooks.inspirehep.inspire_http_hook import InspireHttpHook
-from hooks.inspirehep.inspire_http_record_management_hook import (
-    InspireHTTPRecordManagementHook,
-)
+from hooks.inspirehep.inspire_http_record_management_hook import \
+    InspireHTTPRecordManagementHook
 from include.utils.set_workflow_status import (
-    get_wf_status_from_inspire_response,
-    set_workflow_status_to_error,
-)
+    get_wf_status_from_inspire_response, set_workflow_status_to_error)
 
 
 @dag(
-    start_date=datetime.datetime(2024, 5, 5),
+    start_date=datetime.datetime(2021, 1, 1),
     schedule_interval=None,
     params={
         "workflow_id": Param(type="string", default=""),
         "data": Param(type="object", default={}),
     },
-    catchup=False,
     on_failure_callback=set_workflow_status_to_error,  # TODO: what if callback fails? Data in backoffice not up to date!
 )
 def author_update_dag():
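The three author_create DAGs above now share a module-level default_args dict instead of passing start_date / schedule_interval to each @dag decorator (author_update keeps them inline). A minimal sketch of the pattern, with one hedge: Airflow forwards default_args to operators, and schedule_interval is conventionally a DAG-level argument, so its placement inside default_args here mirrors the diff rather than documented Airflow behavior:

```python
import datetime

from airflow.decorators import dag, task

default_args = {
    "start_date": datetime.datetime(2021, 1, 1),
    # Mirrors the diff; schedule_interval is normally passed to @dag
    # directly and may be ignored inside default_args.
    "schedule_interval": None,
}


@dag(default_args=default_args)
def example_dag():  # hypothetical DAG, not part of this diff
    @task
    def noop():
        pass

    noop()


example_dag()
```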
diff --git a/plugins/hooks/backoffice/base.py b/plugins/hooks/backoffice/base.py
index 5a18aad..08d2732 100644
--- a/plugins/hooks/backoffice/base.py
+++ b/plugins/hooks/backoffice/base.py
@@ -25,7 +25,6 @@ def __init__(
         self.headers = headers or {
             "Authorization": f'Token {Variable.get("backoffice_token")}',
             "Accept": "application/json",
-            "Content-Type": "application/json",
         }
 
     @property
@@ -52,8 +51,8 @@ def run(
         else:
             url = self.base_url + endpoint
 
-        req = requests.Request(method, url, json=data, headers=headers, params=params)
+        req = requests.Request(method, url, data=data, headers=headers, params=params)
 
         prepped_request = session.prepare_request(req)
-        self.log.info("Sending '%s' to url: %s", method, url)
+        self.log.info("Sending '%s' to url: %s", self.method, url)
 
         return self.run_and_check(session, prepped_request, extra_options)
diff --git a/plugins/hooks/backoffice/workflow_management_hook.py b/plugins/hooks/backoffice/workflow_management_hook.py
index a6df67c..fecf241 100644
--- a/plugins/hooks/backoffice/workflow_management_hook.py
+++ b/plugins/hooks/backoffice/workflow_management_hook.py
@@ -29,7 +29,7 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response:
         )
 
     def get_workflow(self, workflow_id: str) -> dict:
-        endpoint = f"api/workflows/{workflow_id}"
+        endpoint = f"/workflows/{workflow_id}"
         response = self.run_with_advanced_retry(
             _retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
         )
@@ -37,7 +37,7 @@ def get_workflow(self, workflow_id: str) -> dict:
         return response.json()
 
     def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
-        endpoint = f"api/workflows/{workflow_id}/"
+        endpoint = f"/workflows/{workflow_id}/"
         return self.run_with_advanced_retry(
             _retry_args=self.tenacity_retry_kwargs,
             method="PUT",
@@ -48,7 +48,7 @@ def partial_update_workflow(
         self, workflow_id: str, workflow_partial_update_data: dict
     ) -> Response:
-        endpoint = f"api/workflow-update/{workflow_id}/"
+        endpoint = f"/workflow-update/{workflow_id}/"
         return self.run_with_advanced_retry(
             _retry_args=self.tenacity_retry_kwargs,
             method="PATCH",
diff --git a/plugins/hooks/backoffice/workflow_ticket_management_hook.py b/plugins/hooks/backoffice/workflow_ticket_management_hook.py
index e26047a..3590289 100644
--- a/plugins/hooks/backoffice/workflow_ticket_management_hook.py
+++ b/plugins/hooks/backoffice/workflow_ticket_management_hook.py
@@ -19,10 +19,10 @@ def __init__(
         headers: dict = None,
     ) -> None:
         super().__init__(method, http_conn_id, headers)
-        self.endpoint = "api/workflow-ticket/"
+        self.endpoint = "/workflow-ticket/"
 
     def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
-        endpoint = f"api/workflow-ticket/{workflow_id}/"
+        endpoint = f"/workflow-ticket/{workflow_id}/"
         params = {"ticket_type": ticket_type}
         response = self.run_with_advanced_retry(
             _retry_args=self.tenacity_retry_kwargs,
@@ -35,7 +35,7 @@ def create_ticket_entry(
         self, workflow_id: str, ticket_id: str, ticket_type: str
     ) -> Response:
-        endpoint = "api/workflow-ticket/"
+        endpoint = "/workflow-ticket/"
         data = {
             "ticket_type": ticket_type,
             "ticket_id": ticket_id,
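The base.py hunk above is the pivot for the json.dumps changes in the DAGs: requests.Request(..., data=data) form-encodes a plain dict and sends a string verbatim, whereas the old json=data serialized the dict and set the Content-Type header (also removed from the hook's default headers). A sketch of the difference using requests directly, with a hypothetical URL:

```python
import json

import requests

payload = {"data": {"control_number": 12345}}
url = "http://backoffice.example/api/workflow-update/some-id/"  # illustrative

# Old behavior: requests serializes the dict and sets
# Content-Type: application/json.
requests.Request("PATCH", url, json=payload)

# New behavior with a plain dict: form-encoded
# (application/x-www-form-urlencoded); nested dicts don't survive.
requests.Request("PATCH", url, data=payload)

# New behavior with a pre-serialized value: sent verbatim, which is why
# the author_create_approved DAG wraps the nested payload in json.dumps.
requests.Request("PATCH", url, data={"data": json.dumps(payload["data"])})
```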
diff --git a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py
index a3b397d..f4f54cf 100644
--- a/plugins/hooks/inspirehep/inspire_http_record_management_hook.py
+++ b/plugins/hooks/inspirehep/inspire_http_record_management_hook.py
@@ -40,5 +40,5 @@ def post_record(self, data: dict, pid_type: str) -> Response:
             method="POST",
             headers=self.headers,
             json=data,
-            endpoint=f"api/{pid_type}",
+            endpoint=f"/{pid_type}",
         )
diff --git a/setup.cfg b/setup.cfg
deleted file mode 100644
index f581081..0000000
--- a/setup.cfg
+++ /dev/null
@@ -1,8 +0,0 @@
-# flake8 don't support pyproject.toml
-# https://github.com/PyCQA/flake8/issues/234
-[flake8]
-# E501 black takes care of the line size
-max-line-length = 88
-exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv,.venv
-ignore = D401,W503,E501,E265,E203
-max-complexity = 15
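A pattern that runs through the whole diff: endpoints lose their api/ prefix (api/workflows/... becomes /workflows/..., api/tickets/create becomes /tickets/create-with-template, and so on). Since the base hook builds URLs as self.base_url + endpoint, this only resolves correctly if the /api prefix now lives in the connection's base URL. A sketch of the concatenation, with an illustrative host taken from the removed README instructions:

```python
# base_url comes from the Airflow HTTP connection (Admin > Connections);
# after this diff it is assumed to carry the /api prefix.
base_url = "http://host.docker.internal:8000/api"
endpoint = "/workflows/123"  # illustrative workflow id

url = base_url + endpoint
# -> http://host.docker.internal:8000/api/workflows/123
```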