"already registered for DAG" when dynamically generate tasks #20693
-
The error is correct: a decorated callable is a single "task", and you are trying to add it to the DAG multiple times. If you are adding it in a loop, you are adding the same task (named after the function) multiple times. You cannot make multiple tasks with a single decorator this way. There is AIP-42 in progress, https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42+Dynamic+Task+Mapping, which adds even more flexibility: you will be able to fan out and fan in a variable number of similar tasks. Currently, though, I do not think there is an easy way to use TaskFlow for that. I think classic operators are better for that purpose for now, and also when you have an essentially "fixed" set of tasks.
-
Just wanted to chime in: I was investigating something similar and trying to cut things down significantly to investigate further on 2.2.3. Using an example straight from the docs, I am able to get these warnings to show up without even generating a task group or dynamically generating anything:
Running a simple test:
If I don't reference something upstream (eg pass I'll note that this doesn't prevent the DAGs from functioning.
-
It's likely that the example is somewhat misleading, @turbaszek @dstandish @josh-fell? The examples here look a bit out of context, and I believe the way @adh-wonolo added them is wrong. Maybe it's worth adding more complete (full DAG) examples to show how it could look.
-
I think I found the root cause of this WARNING message. It is only printed when an XComArg is consumed as an argument by a traditional operator (e.g. PythonOperator). For example:

```python
from airflow.decorators import task
from airflow.operators.python import PythonOperator

# Assumed to be defined inside a `with DAG(...)` block.
@task
def get_a_number():
    return 123

PythonOperator(
    task_id="print_a_number",
    python_callable=lambda x: print(x),
    op_args=(get_a_number(),),
)
```

The WARNING message is printed when the mini scheduler makes a partial deep copy of the DAG. Tasks are deep-copied one by one, and when a task is deep-copied its downstream relationship is rebuilt. Because the relationship already exists, the WARNING message is printed.

Why doesn't a TaskFlow API task (a.k.a. DecoratedOperator) print the WARNING?

```python
@task
def get_a_number():
    return 123

@task
def print_a_number(x):
    print(x)

print_a_number(get_a_number())
```

This is because a TaskFlow API task generates a new unique task_id when it detects a duplicate task_id. This avoids the WARNING, as no relationship exists yet for the new task_id.

How to fix this issue

This is just a WARNING message, so we could ignore it if it does not cause any trouble. However, it has been reported that in a relatively complicated DAG on AWS MWAA the volume of these WARNING messages is too large to ignore: a large number of them can put a significant workload on Workers and CloudWatch. Therefore, it is worth fixing. I don't know the best way to fix it permanently in Airflow; here are some of my ideas:
Ways to avoid it when authoring the DAG
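To make the root-cause explanation in the comment above concrete, here is a toy model in plain Python. It is NOT Airflow's code: `ToyTask`, `partial_copy`, `WARNINGS`, and the message text are hypothetical stand-ins that only mimic the behavior the commenter describes, namely a dependency that already exists being set again on deep-copied tasks, which takes a warn-and-skip path instead of failing.

```python
import copy
import logging

log = logging.getLogger("toy_dag")

# Collected warning messages, so the behavior can be inspected without log capture.
WARNINGS = []


class ToyTask:
    """Hypothetical stand-in for an operator; this is NOT Airflow's BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_task_ids = set()
        self.downstream_task_ids = set()

    def _add_only_new(self, id_set, other_id):
        # Re-adding an edge that already exists is not an error: it only warns.
        if other_id in id_set:
            msg = f"Dependency {self.task_id}, {other_id} already registered"
            WARNINGS.append(msg)
            log.warning(msg)
        else:
            id_set.add(other_id)

    def set_downstream(self, other):
        # The edge is recorded on both ends, so a duplicate edge warns twice.
        self._add_only_new(self.downstream_task_ids, other.task_id)
        other._add_only_new(other.upstream_task_ids, self.task_id)


def partial_copy(tasks, edges_to_rebuild):
    """Deep-copy tasks one by one, then set some edges again on the copies.

    Hypothetical: models the description that an XComArg-derived edge is
    set again while the mini scheduler copies the DAG.
    """
    copies = {t.task_id: copy.deepcopy(t) for t in tasks}
    for upstream_id, downstream_id in edges_to_rebuild:
        # The copies already carry the edge ids, so this hits the warning path.
        copies[upstream_id].set_downstream(copies[downstream_id])
    return copies
```

For example, wiring `ToyTask("get_a_number")` to `ToyTask("print_a_number")` once records no warning, while `partial_copy` with that same edge records two, one per end of the edge. Since only the warning path fires, the copied graph is still correct, which is consistent with the reports in this thread that the DAGs keep working.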
-
Has this issue been resolved? I am experiencing it when using TaskGroups in Airflow 2.2.2.
-
I have the same problem on Airflow 2.5.0; the log never appeared on 2.3.2. In my case, the DAGs are generated dynamically, without using dynamic task mapping for tasks. FYI, PythonParallelOperator is a custom operator I wrote that inherits from PythonOperator.
-
I'm also experiencing this issue when dynamically generating GlueCrawlerOperators and other custom operators after bumping from version 2.3.0 to 2.4.1.
-
Apache Airflow version
2.1.4
What happened
Recently I started to use the TaskFlow API in some of my DAG files where the tasks are generated dynamically, and I started to notice (a lot of) warning messages in the logs. The messages look like this:
These messages keep appearing even when the DAG is not running, and also when it's paused, so I guess this happens when the DAG is parsed.
What you expected to happen
I'm not sure if I'm doing something wrong, but these messages should not appear.
How to reproduce
Here is an example DAG to reproduce this issue:
Operating System
Cloud Composer
Versions of Apache Airflow Providers
No response
Deployment
Composer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct