Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-72: Gracefully handle "not-found" XCOMs in task sdk API client #45341

Closed
1 task done
amoghrajesh opened this issue Jan 2, 2025 · 0 comments · Fixed by #45344
Closed
1 task done

AIP-72: Gracefully handle "not-found" XCOMs in task sdk API client #45341

amoghrajesh opened this issue Jan 2, 2025 · 0 comments · Fixed by #45344
Assignees
Labels
area:API Airflow's REST/HTTP API area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK kind:bug This is a clearly a bug

Comments

@amoghrajesh
Copy link
Contributor

Body

Example DAG:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = "Hello, XCom!"
    return value


def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids='invalid_id')
    print(f"Retrieved XCom Value: {xcom_value}")


with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )

    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )

    push_xcom_task >> pull_xcom_task

Here the invalid_id task id doesn't exist. So the XCOM pull should fail gracefully.
Instead, the executor just crashed:

[2024-12-31T06:53:15.741+0000] {_client.py:1026} INFO - HTTP Request: GET http://localhost:9091/execution/xcoms/xcom_example/manual__2024-12-31T06:53:14.523233+00:00/invalid_id/return_value?map_index=-1 "HTTP/1.1 404 Not Found"
2024-12-31 06:53:15 [warning  ] Server error                   [airflow.sdk.api.client] detail={'detail': {'reason': 'not_found', 'message': "XCom with key 'return_value' not found for task 'invalid_id' in DAG 'xcom_example'"}}
[2024-12-31T06:53:15.742+0000] {local_executor.py:96} ERROR - uhoh
Traceback (most recent call last):
  File "/opt/airflow/airflow/executors/local_executor.py", line 92, in _run_worker
    _execute_work(log, workload)
  File "/opt/airflow/airflow/executors/local_executor.py", line 113, in _execute_work
    supervise(
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 898, in supervise
    exit_code = process.wait()
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 512, in wait
    self._monitor_subprocess()
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 554, in _monitor_subprocess
    alive = self._service_subprocess(max_wait_time=max_wait_time) is None
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 598, in _service_subprocess
    need_more = socket_handler(key.fileobj)
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 785, in cb
    gen.send(line)
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 708, in handle_requests
    self._handle_request(msg, log)
  File "/opt/airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py", line 728, in _handle_request
    xcom = self.client.xcoms.get(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
  File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 222, in get
    resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1054, in get
    return self.request(
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 336, in wrapped_f
    return copy(f, *args, **kw)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 475, in __call__
    do = self.iter(retry_state=retry_state)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 376, in iter
    result = action(retry_state)
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 398, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/tenacity/__init__.py", line 478, in __call__
    result = fn(*args, **kwargs)
  File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 317, in request
    return super().request(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 827, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 999, in _send_handling_redirects
    raise exc
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 982, in _send_handling_redirects
    hook(response)
  File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 93, in raise_on_4xx_5xx
    return get_json_error(response) or response.raise_for_status()
  File "/opt/airflow/task_sdk/src/airflow/sdk/api/client.py", line 89, in get_json_error
    raise err
airflow.sdk.api.client.ServerResponseError: Server returned error

Legacy Airflow just ignores such cases and moves on with a return.
image (16)

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Jan 2, 2025
@amoghrajesh amoghrajesh self-assigned this Jan 2, 2025
@dosubot dosubot bot added area:API Airflow's REST/HTTP API kind:bug This is a clearly a bug labels Jan 2, 2025
@amoghrajesh amoghrajesh changed the title Gracefully handle "not-found" XCOMs in task sdk API client AIP-72: Gracefully handle "not-found" XCOMs in task sdk API client Jan 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:API Airflow's REST/HTTP API area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK kind:bug This is a clearly a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant