Job is deleted due to an error triggered by chunk.decode("utf8"). #97

Open
kuixiang opened this issue Jul 19, 2024 · 1 comment
Labels
bug Something isn't working

Comments

Phenomenon

When a user submits a job through Airflow, it runs for a while and then encounters the following error:

[2024-07-19, 12:46:11 CST] {taskinstance.py:1112} DEBUG - <TaskInstance: asset_map.job_identifier manual__2024-06-01T16:12:23+08:00 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Streaming
[2024-07-19, 12:46:11 CST] {operations.py:67} INFO - [user-jobs/jobs/job_identifier] deleted
[2024-07-19, 12:46:11 CST] {client.py:170} DEBUG - [DeleteNamespaceResource][264c][/apis/batch/v1/namespaces/user-jobs/jobs/job_identifier] State: Disconnected
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Job deleted
[2024-07-19, 12:46:11 CST] {job_runner.py:286} INFO - {job-runner}: Client stopped, execution aborted.
[2024-07-19, 12:46:11 CST] {taskinstance.py:1824} ERROR - Task failed with exception
airflow_kubernetes_job_operator.kube_api.exceptions.KubeApiException: Error while executing query
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/path/to/airflow_kubernetes_job_operator/kubernetes_job_operator.py", line 463, in execute
    rslt = self.job_runner.execute_job(
  File "/path/to/airflow_kubernetes_job_operator/job_runner.py", line 451, in execute_job
    raise ex
  File "/path/to/zthreading/tasks.py", line 176, in _run_as_thread
    rslt = self.action(*args, **kwargs)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 261, in _execute_query
    raise ex
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 257, in _execute_query
    self.query_loop(client)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 466, in query_loop
    raise ex
  File "/path/to/airflow_kubernetes_job_operator/kube_api/queries.py", line 459, in query_loop
    return super().query_loop(client)
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 446, in query_loop
    raise ex from KubeApiException("Error while executing query")
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 386, in query_loop
    for line in self._read_response_stream_lines(response):
  File "/path/to/airflow_kubernetes_job_operator/kube_api/client.py", line 212, in _read_response_stream_lines
    chunk = chunk.decode("utf8")
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe6 in position 16375: unexpected end of data

The job gets deleted, and it's impossible to check the failure details later using commands like kubectl describe job or pod.

Cause of the Issue

The original code for retrieving the job context (shown below) reads the response in chunks, decodes each chunk separately, and then reassembles the lines:

  • If the job context description contains Chinese characters, such as “开始” (start), they are encoded as the UTF-8 bytes \xe5\xbc\x80\xe5\xa7\x8b. A chunk boundary can fall inside a character and split the bytes into \xe5\xbc\x80\xe5 and \xa7\x8b; the trailing \xe5 is the first byte of the second character.
  • Decoding \xe5\xbc\x80\xe5 on its own as UTF-8 raises UnicodeDecodeError ("unexpected end of data").
  • The exception is not handled, so the Airflow task fails and the job is abruptly deleted.

    @classmethod
    def _read_response_stream_lines(cls, response: HTTPResponse):
        """INTERNAL. Helper yield method. Parses the streaming http response
        to lines (can be async!)

        Yields:
            str: The line
        """
        prev = ""
        for chunk in response.stream(decode_content=False):
            if isinstance(chunk, bytes):
                chunk = chunk.decode("utf8")
            chunk = prev + chunk
            lines = chunk.split("\n")
            if not chunk.endswith("\n"):
                prev = lines[-1]
                lines = lines[:-1]
            else:
                prev = ""
            for line in lines:
                if line:
                    yield line
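
The truncation described above can be reproduced in isolation. The following is a minimal sketch, assuming the chunk boundary falls after the fourth byte of “开始”; the variable names are illustrative and not taken from the operator:

```python
# Encode the two-character string; each character takes 3 bytes in UTF-8.
data = "开始".encode("utf8")

# Simulate a chunk boundary that falls inside the second character.
first, second = data[:4], data[4:]

error = None
try:
    first.decode("utf8")
except UnicodeDecodeError as e:
    error = e

# The error span starts at the lone lead byte and runs to the end of the chunk.
print(error.reason, error.start, error.end)

# Joining the chunks before decoding succeeds.
joined = (first + second).decode("utf8")
```

Decoding the two chunks separately fails, while decoding the joined bytes does not, which is why the fix has to carry the incomplete bytes over to the next chunk.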

Solutions

  • Solution 1: Avoid Chinese characters (or any other non-ASCII characters) in the job configuration context for jobs submitted through Airflow.
  • Solution 2: Modify how the operator decodes the job context stream.
# The idea is to catch the UnicodeDecodeError, check whether it occurred at the end of the chunk
# (which indicates a truncated character), decode only the complete part, and keep the truncated bytes.
# The truncated bytes are then prepended to the next chunk before decoding.

@classmethod
def _read_response_stream_lines(cls, response: HTTPResponse):
    """INTERNAL. Helper yield method. Parses the streaming http response
    to lines (can be async!)

    Yields:
        str: The line
    """
    prev = ""
    prev_binary_chunk = b""
    for chunk in response.stream(decode_content=False):
        if isinstance(chunk, bytes):
            chunk = prev_binary_chunk + chunk
            prev_binary_chunk = b""
            try:
                chunk = chunk.decode("utf8")
            except UnicodeDecodeError as e:
                if e.end != len(chunk):
                    raise
                prev_binary_chunk = chunk[e.start:]
                chunk = chunk[0:e.start].decode("utf8")
        chunk = prev + chunk
        lines = chunk.split("\n")
        if not chunk.endswith("\n"):
            prev = lines[-1]
            lines = lines[:-1]
        else:
            prev = ""
        for line in lines:
            if line:
                yield line
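
To illustrate the `e.end != len(chunk)` check in the code above: when a multi-byte character is cut off at the end of a buffer, the `UnicodeDecodeError` span runs to the end of the data, whereas an invalid byte in the middle of the buffer produces a span that ends earlier. A minimal sketch of that distinction follows; the helper name `split_decodable` is hypothetical and not part of the operator:

```python
def split_decodable(data: bytes):
    """Return (decoded_text, leftover_bytes) for a possibly truncated buffer."""
    try:
        return data.decode("utf8"), b""
    except UnicodeDecodeError as e:
        # An error that does not reach the end of the buffer is a genuinely
        # invalid byte, not a truncated character, so it is re-raised.
        if e.end != len(data):
            raise
        # Decode the complete prefix; keep the incomplete tail for later.
        return data[: e.start].decode("utf8"), data[e.start :]


encoded = "开始".encode("utf8")

# First chunk ends one byte into the second character.
text, rest = split_decodable(encoded[:4])

# The leftover byte is prepended to the next chunk before decoding.
text2, rest2 = split_decodable(rest + encoded[4:])

# An invalid byte in the middle of the buffer is not treated as truncation.
try:
    split_decodable(b"ab\xffcd")
    raised = False
except UnicodeDecodeError:
    raised = True
```

Note that an invalid byte sitting exactly at the end of a chunk is also carried over by this check; it then fails when the next chunk is decoded.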


@LamaAni LamaAni added the bug Something isn't working label Jul 19, 2024
LamaAni (Owner) commented Jul 19, 2024

Hi, nice catch!

I'd definitely accept a PR!

Looks like your solution works! If I understand correctly, the error happens because bytes are missing at decode time (e.g. we received 3 bytes out of 4 for a char).

One small thing though: please raise a proper error (with text; the error type should be the KubernetesJobOperator error or some ParseError, see the example in errors).

Your code, with the corrections:

        prev = ""
        prev_binary_chunk = b""
        for chunk in response.stream(decode_content=False):
            if isinstance(chunk, bytes):
                chunk = prev_binary_chunk + chunk
                prev_binary_chunk = b""
                try:
                    chunk = chunk.decode("utf8")
                except UnicodeDecodeError as e:
                    # This may happen when a character that takes more than one byte
                    # is split across chunks (say, the char has 4 bytes, but we received 3).

                    # This check needs an explanation as well (thanks!)
                    if e.end != len(chunk):
                        raise KubeApiException(
                            "Error when parsing api response stream"
                        ) from e
                    prev_binary_chunk = chunk[e.start :]
                    chunk = chunk[0 : e.start].decode("utf8")

            chunk = prev + chunk
            lines = chunk.split("\n")

            if not chunk.endswith("\n"):
                prev = lines[-1]
                lines = lines[:-1]
            else:
                prev = ""
            for line in lines:
                if line:
                    yield line
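
For comparison, here is a sketch of a different technique, not the fix proposed in this issue: Python's standard `codecs` incremental decoder buffers incomplete multi-byte sequences between calls, so no manual carry-over of bytes is needed. The function name `iter_lines` and the plain iterable of byte chunks are assumptions made for illustration; in the operator, the chunks would come from `response.stream(decode_content=False)`.

```python
import codecs
from typing import Iterable, Iterator


def iter_lines(chunks: Iterable[bytes]) -> Iterator[str]:
    """Yield non-empty lines from byte chunks that may split UTF-8 characters."""
    # The incremental decoder keeps incomplete trailing bytes internally and
    # raises UnicodeDecodeError for bytes that are actually invalid.
    decoder = codecs.getincrementaldecoder("utf8")()
    prev = ""
    for chunk in chunks:
        text = prev + decoder.decode(chunk)
        lines = text.split("\n")
        # The last element is an incomplete line, or "" if text ended in "\n".
        prev = lines.pop()
        for line in lines:
            if line:
                yield line
    # As in the code above, text after the last newline is not yielded.


# Usage: the second character of "开始" is split across the first two chunks.
lines = list(iter_lines([b"a\xe5\xbc\x80\xe5", b"\xa7\x8b\nb", b"c\n"]))
```

With this approach an invalid byte would still surface as a `UnicodeDecodeError`, so wrapping it in a `KubeApiException`, as requested above, would still apply.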
