This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

workflows: fixes to make author dags work
DonHaul committed Jul 8, 2024
1 parent c744e74 commit 5e71e8f
Showing 11 changed files with 62 additions and 48 deletions.
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -14,3 +14,9 @@ repos:
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: "3.9.2"
hooks:
- id: flake8
args: ["--config=setup.cfg"]
7 changes: 7 additions & 0 deletions README.md
@@ -8,6 +8,13 @@ The setup is done with docker-compose. In order to run use
docker-compose up -d
```

In the UI, go to Admin > Connections and set:
- backoffice_conn: an HTTP connection to host.docker.internal:8000
- inspire_connection: https://inspirebeta.net
In the UI, go to Admin > Variables and set:
- backoffice_token: an auth token from Django for a superuser
- inspire_token: generate a token from the shell in the `inspirehep-qa` container

Also, the DAGs need an Airflow variable `inspire_token`, which is a token for inspirebeta.net.
It can be added via the UI (go to Admin -> Variables).
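
For reference, here is a minimal sketch of how the DAG code reads these settings at runtime. It is not part of this commit; the connection and variable names are the ones listed above, and the calls mirror what the hooks under `plugins/hooks/` do:

```python
from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Connections are looked up by their connection id (Admin > Connections)
backoffice_conn = BaseHook.get_connection("backoffice_conn")
print(backoffice_conn.host)  # host.docker.internal, as configured above

# Variables are looked up by key (Admin > Variables)
backoffice_token = Variable.get("backoffice_token")
headers = {"Authorization": f"Token {backoffice_token}"}  # same header the backoffice hook builds
```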

29 changes: 13 additions & 16 deletions dags/author/author_create/author_create_approved.py
@@ -1,32 +1,31 @@
import datetime
import json
import logging

from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import \
InspireHTTPRecordManagementHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from include.utils.set_workflow_status import (
get_wf_status_from_inspire_response, set_workflow_status_to_error)
get_wf_status_from_inspire_response,
set_workflow_status_to_error,
)

logger = logging.getLogger(__name__)
default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
"create_ticket": Param(type="boolean", default=False),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_approved_dag():
@@ -66,7 +65,7 @@ def author_check_approval_branch(**context: dict) -> None:

@task
def create_author_create_curation_ticket(**context: dict) -> None:
endpoint = "/tickets/create-with-template"
endpoint = "api/tickets/create"
request_data = {
"functional_category": "",
"workflow_id": context["params"]["workflow_id"],
@@ -98,9 +97,7 @@ def create_author_on_inspire(**context: dict) -> str:
workflow_data["data"]["control_number"] = control_number
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={
"data": json.dumps(workflow_data["data"])
},
workflow_partial_update_data={"data": workflow_data["data"]},
)
return status

@@ -119,7 +116,7 @@ def close_author_create_user_ticket(**context: dict) -> None:
ticket_id = workflow_ticket_management_hook.get_ticket(
workflow_id=context["params"]["workflow_id"], ticket_type=ticket_type
)["ticket_id"]
endpoint = "/tickets/resolve"
endpoint = "api/tickets/resolve"
request_data = {"ticket_id": ticket_id}
inspire_http_hook.call_api(endpoint=endpoint, data=request_data, method="POST")
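
The changes to the `@dag` arguments above, and in the DAG files that follow, all follow the same pattern: scheduling options move out of `default_args` (which Airflow only applies to individual tasks) onto the decorator itself, and `catchup=False` stops Airflow from backfilling runs between `start_date` and the current date. A minimal, self-contained sketch of that pattern, assuming Airflow 2.x; the DAG and task names are hypothetical, not from the repository:

```python
import datetime

from airflow.decorators import dag, task
from airflow.models.param import Param


@dag(
    start_date=datetime.datetime(2024, 5, 5),
    schedule_interval=None,  # run only when triggered externally, never on a schedule
    catchup=False,           # no backfill runs for dates between start_date and today
    params={"workflow_id": Param(type="string", default="")},
)
def example_author_dag():
    @task
    def log_workflow_id(**context: dict) -> None:
        print(context["params"]["workflow_id"])

    log_workflow_id()


example_author_dag()
```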

14 changes: 5 additions & 9 deletions dags/author/author_create/author_create_init.py
@@ -3,25 +3,21 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from include.utils.set_workflow_status import set_workflow_status_to_error

logger = logging.getLogger(__name__)

default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_initialization_dag():
@@ -46,7 +42,7 @@ def set_workflow_status_to_running(**context):

@task()
def create_author_create_user_ticket(**context: dict) -> None:
endpoint = "/tickets/create-with-template"
endpoint = "/api/tickets/create"
request_data = {
"functional_category": "Author curation",
"template": "user_new_author",
12 changes: 4 additions & 8 deletions dags/author/author_create/author_create_rejected.py
@@ -2,23 +2,19 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from include.utils.set_workflow_status import set_workflow_status_to_error

default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_rejected_dag() -> None:
15 changes: 9 additions & 6 deletions dags/author/author_update/author_update.py
@@ -2,22 +2,25 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import \
InspireHTTPRecordManagementHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from include.utils.set_workflow_status import (
get_wf_status_from_inspire_response, set_workflow_status_to_error)
get_wf_status_from_inspire_response,
set_workflow_status_to_error,
)


@dag(
start_date=datetime.datetime(2021, 1, 1),
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_update_dag():
5 changes: 3 additions & 2 deletions plugins/hooks/backoffice/base.py
@@ -25,6 +25,7 @@ def __init__(
self.headers = headers or {
"Authorization": f'Token {Variable.get("backoffice_token")}',
"Accept": "application/json",
"Content-Type": "application/json",
}

@property
@@ -51,8 +52,8 @@ def run(
else:
url = self.base_url + endpoint

req = requests.Request(method, url, data=data, headers=headers, params=params)
req = requests.Request(method, url, json=data, headers=headers, params=params)

prepped_request = session.prepare_request(req)
self.log.info(f"Sending '%s' to url: %s", self.method, url)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)
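
The switch from `data=` to `json=` in `requests.Request`, together with the explicit `Content-Type: application/json` header added above, is also what lets `author_create_approved.py` stop calling `json.dumps` on the payload. A standalone illustration of the difference; the URL and payload here are made up, not taken from the repository:

```python
import requests

payload = {"status": "running", "workflow_id": "abc123"}  # hypothetical payload

# data= with a dict produces a form-encoded body
form_req = requests.Request("POST", "https://example.org/api/", data=payload).prepare()
print(form_req.headers["Content-Type"])  # application/x-www-form-urlencoded
print(form_req.body)                     # status=running&workflow_id=abc123

# json= serializes the dict and sets the JSON content type
json_req = requests.Request("POST", "https://example.org/api/", json=payload).prepare()
print(json_req.headers["Content-Type"])  # application/json
print(json_req.body)                     # JSON body: {"status": "running", "workflow_id": "abc123"}
```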
6 changes: 3 additions & 3 deletions plugins/hooks/backoffice/workflow_management_hook.py
@@ -29,15 +29,15 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response:
)

def get_workflow(self, workflow_id: str) -> dict:
endpoint = f"/workflows/{workflow_id}"
endpoint = f"api/workflows/{workflow_id}"
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
)
response = self.run(endpoint=endpoint, headers=self.headers)
return response.json()

def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
endpoint = f"/workflows/{workflow_id}/"
endpoint = f"api/workflows/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PUT",
@@ -48,7 +48,7 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
def partial_update_workflow(
self, workflow_id: str, workflow_partial_update_data: dict
) -> Response:
endpoint = f"/workflow-update/{workflow_id}/"
endpoint = f"api/workflow-update/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PATCH",
6 changes: 3 additions & 3 deletions plugins/hooks/backoffice/workflow_ticket_management_hook.py
@@ -19,10 +19,10 @@
headers: dict = None,
) -> None:
super().__init__(method, http_conn_id, headers)
self.endpoint = "/workflow-ticket/"
self.endpoint = "api/workflow-ticket/"

def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
endpoint = f"/workflow-ticket/{workflow_id}/"
endpoint = f"api/workflow-ticket/{workflow_id}/"
params = {"ticket_type": ticket_type}
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
@@ -35,7 +35,7 @@ def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
def create_ticket_entry(
self, workflow_id: str, ticket_id: str, ticket_type: str
) -> Response:
endpoint = f"/workflow-ticket/"
endpoint = "api/workflow-ticket/"
data = {
"ticket_type": ticket_type,
"ticket_id": ticket_id,
2 changes: 1 addition & 1 deletion plugins/hooks/inspirehep/inspire_http_record_management_hook.py
@@ -40,5 +40,5 @@ def post_record(self, data: dict, pid_type: str) -> Response:
method="POST",
headers=self.headers,
json=data,
endpoint=f"/{pid_type}",
endpoint=f"api/{pid_type}",
)
8 changes: 8 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# flake8 doesn't support pyproject.toml
# https://github.com/PyCQA/flake8/issues/234
[flake8]
# E501 black takes care of the line size
max-line-length = 88
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,venv,.venv
ignore = D401,W503,E501,E265,E203
max-complexity = 15
