Skip to content
This repository has been archived by the owner on Jul 26, 2024. It is now read-only.

Commit

Permalink
Revert "workflows: fixes to make author dags work"
Browse files Browse the repository at this point in the history
This reverts commit 1167c4d.
  • Loading branch information
DonHaul committed Jul 8, 2024
1 parent 4105112 commit e63b9dc
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 161 deletions.
99 changes: 0 additions & 99 deletions .github/workflow/test_and_build.yml

This file was deleted.

6 changes: 0 additions & 6 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ repos:
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
- repo: https://github.com/pycqa/flake8
rev: "3.9.2"
hooks:
- id: flake8
args: ["--config=setup.cfg"]
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@ The setup is done with docker-compose. In order to run use
docker-compose up -d
```

in the UI go to Admin>Connections and set:
- backoffice_conn: http host.docker.internal:8000
- inspire_connection: https://inspirebeta.net
in the UI go to Admin>Variables and set:
- backoffice_token auth token from django for a superuser
- inspire_token: in `inspirehep-qa` container use the shell to generate a token

Also, for the dags there's a need to define an airflow variable `inspire_token` which is a token to inspirebeta.net.
It can be added via UI (go to Admin -> Variables).

Expand Down
29 changes: 16 additions & 13 deletions dags/author/author_create/author_create_approved.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
import datetime
import json
import logging

from airflow.decorators import dag, task
from airflow.models.param import Param
from airflow.utils.trigger_rule import TriggerRule
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from hooks.inspirehep.inspire_http_record_management_hook import \
InspireHTTPRecordManagementHook
from include.utils.set_workflow_status import (
get_wf_status_from_inspire_response,
set_workflow_status_to_error,
)
get_wf_status_from_inspire_response, set_workflow_status_to_error)

logger = logging.getLogger(__name__)
default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
"create_ticket": Param(type="boolean", default=False),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_approved_dag():
Expand Down Expand Up @@ -65,7 +66,7 @@ def author_check_approval_branch(**context: dict) -> None:

@task
def create_author_create_curation_ticket(**context: dict) -> None:
endpoint = "api/tickets/create"
endpoint = "/tickets/create-with-template"
request_data = {
"functional_category": "",
"workflow_id": context["params"]["workflow_id"],
Expand Down Expand Up @@ -97,7 +98,9 @@ def create_author_on_inspire(**context: dict) -> str:
workflow_data["data"]["control_number"] = control_number
workflow_management_hook.partial_update_workflow(
workflow_id=context["params"]["workflow_id"],
workflow_partial_update_data={"data": workflow_data["data"]},
workflow_partial_update_data={
"data": json.dumps(workflow_data["data"])
},
)
return status

Expand All @@ -116,7 +119,7 @@ def close_author_create_user_ticket(**context: dict) -> None:
ticket_id = workflow_ticket_management_hook.get_ticket(
workflow_id=context["params"]["workflow_id"], ticket_type=ticket_type
)["ticket_id"]
endpoint = "api/tickets/resolve"
endpoint = "/tickets/resolve"
request_data = {"ticket_id": ticket_id}
inspire_http_hook.call_api(endpoint=endpoint, data=request_data, method="POST")

Expand Down
14 changes: 9 additions & 5 deletions dags/author/author_create/author_create_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from include.utils.set_workflow_status import set_workflow_status_to_error

logger = logging.getLogger(__name__)

default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_initialization_dag():
Expand All @@ -42,7 +46,7 @@ def set_workflow_status_to_running(**context):

@task()
def create_author_create_user_ticket(**context: dict) -> None:
endpoint = "/api/tickets/create"
endpoint = "/tickets/create-with-template"
request_data = {
"functional_category": "Author curation",
"template": "user_new_author",
Expand Down
12 changes: 8 additions & 4 deletions dags/author/author_create/author_create_rejected.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from include.utils.set_workflow_status import set_workflow_status_to_error

default_args = {
"start_date": datetime.datetime(2021, 1, 1),
"schedule_interval": None,
}


@dag(
default_args=default_args,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
schedule_interval=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_create_rejected_dag() -> None:
Expand Down
15 changes: 6 additions & 9 deletions dags/author/author_update/author_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,22 @@

from airflow.decorators import dag, task
from airflow.models.param import Param
from hooks.backoffice import WorkflowManagementHook, WorkflowTicketManagementHook
from hooks.backoffice import (WorkflowManagementHook,
WorkflowTicketManagementHook)
from hooks.inspirehep.inspire_http_hook import InspireHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)
from hooks.inspirehep.inspire_http_record_management_hook import \
InspireHTTPRecordManagementHook
from include.utils.set_workflow_status import (
get_wf_status_from_inspire_response,
set_workflow_status_to_error,
)
get_wf_status_from_inspire_response, set_workflow_status_to_error)


@dag(
start_date=datetime.datetime(2024, 5, 5),
start_date=datetime.datetime(2021, 1, 1),
schedule_interval=None,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
},
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
def author_update_dag():
Expand Down
5 changes: 2 additions & 3 deletions plugins/hooks/backoffice/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ def __init__(
self.headers = headers or {
"Authorization": f'Token {Variable.get("backoffice_token")}',
"Accept": "application/json",
"Content-Type": "application/json",
}

@property
Expand All @@ -52,8 +51,8 @@ def run(
else:
url = self.base_url + endpoint

req = requests.Request(method, url, json=data, headers=headers, params=params)
req = requests.Request(method, url, data=data, headers=headers, params=params)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
self.log.info(f"Sending '%s' to url: %s", self.method, url)
return self.run_and_check(session, prepped_request, extra_options)
6 changes: 3 additions & 3 deletions plugins/hooks/backoffice/workflow_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ def set_workflow_status(self, status_name: str, workflow_id: str) -> Response:
)

def get_workflow(self, workflow_id: str) -> dict:
endpoint = f"api/workflows/{workflow_id}"
endpoint = f"/workflows/{workflow_id}"
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs, method="GET", endpoint=endpoint
)
response = self.run(endpoint=endpoint, headers=self.headers)
return response.json()

def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
endpoint = f"api/workflows/{workflow_id}/"
endpoint = f"/workflows/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PUT",
Expand All @@ -48,7 +48,7 @@ def update_workflow(self, workflow_id: str, workflow_data: dict) -> Response:
def partial_update_workflow(
self, workflow_id: str, workflow_partial_update_data: dict
) -> Response:
endpoint = f"api/workflow-update/{workflow_id}/"
endpoint = f"/workflow-update/{workflow_id}/"
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
method="PATCH",
Expand Down
6 changes: 3 additions & 3 deletions plugins/hooks/backoffice/workflow_ticket_management_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ def __init__(
headers: dict = None,
) -> None:
super().__init__(method, http_conn_id, headers)
self.endpoint = "api/workflow-ticket/"
self.endpoint = "/workflow-ticket/"

def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
endpoint = f"api/workflow-ticket/{workflow_id}/"
endpoint = f"/workflow-ticket/{workflow_id}/"
params = {"ticket_type": ticket_type}
response = self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
Expand All @@ -35,7 +35,7 @@ def get_ticket(self, workflow_id: str, ticket_type: str) -> dict:
def create_ticket_entry(
self, workflow_id: str, ticket_id: str, ticket_type: str
) -> Response:
endpoint = "api/workflow-ticket/"
endpoint = f"/workflow-ticket/"
data = {
"ticket_type": ticket_type,
"ticket_id": ticket_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ def post_record(self, data: dict, pid_type: str) -> Response:
method="POST",
headers=self.headers,
json=data,
endpoint=f"api/{pid_type}",
endpoint=f"/{pid_type}",
)
8 changes: 0 additions & 8 deletions setup.cfg

This file was deleted.

0 comments on commit e63b9dc

Please sign in to comment.