-
I think such a discussion should happen on the devlist, not in a GitHub issue. I believe that very soon we are going to have a proposal (or a few proposals) on the devlist as a follow-up to the original AIP-48: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling. This is far more than a feature request and touches a lot more than a simple "feature" discussion, as it changes the architecture of a core Airflow feature.

I will convert it into a discussion, and maybe some people will continue chiming in here, but at the very least you should join the devlist @mrn-aglic and raise your proposal there - maybe pointing to this discussion, digesting the "gist" of it, or maybe continuing it on the devlist in full - up to you. But generally speaking, as in all ASF projects, "If it did not happen on the devlist - it did not happen" - so any discussions and agreements here will have to be brought back to the devlist, an Airflow Improvement Proposal will likely have to be written, and it should be formally voted on.

You can read more at our community page https://airflow.apache.org/community/ and at https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals for the process of discussing and proposing a new AIP.
-
Doc from devlist: https://docs.google.com/document/d/1Apgrlylk410sStf9AS-Yc2RaIOGu8noBl8pHyBfmE08/edit#heading=h.wix0jhslclfw

Quick summary of the problem based on the doc: the data interval values that we set for downstream/consumer DAGs are non-deterministic. If there is more than one event in the queue, the data interval is calculated by looking at all the source DAG runs and setting the consumer's interval to span from the minimum start to the maximum end across those runs.
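To make the collapse concrete, here is a minimal sketch (plain Python, not the actual scheduler code, with made-up interval values): two queued producer runs are merged into one consumer interval spanning both.

```python
from datetime import datetime

# Data intervals of two queued producer runs (hypothetical values).
producer_intervals = [
    (datetime(2023, 1, 1), datetime(2023, 1, 2)),
    (datetime(2023, 1, 2), datetime(2023, 1, 3)),
]

# The single consumer run gets min(start)..max(end) across all queued
# events, so both producer runs collapse into one consumer interval.
start = min(s for s, _ in producer_intervals)
end = max(e for _, e in producer_intervals)
print(start, end)  # 2023-01-01 00:00:00 2023-01-03 00:00:00
```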
-
Hello, thanks!
-
Description
Airflow should support defining different types of dataset consumer DAGs for dataset events. For example, if a producer DAG has created multiple dataset events, the consumer DAG should be able to choose whether to digest these dataset events one by one or as a whole (for now).
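Purely as an illustration of the idea (none of this API exists - the `ConsumeMode` enum and the `consume_mode` parameter are invented names), the choice could be exposed on the consumer side roughly like this:

```python
from enum import Enum

from airflow.datasets import Dataset

# Hypothetical sketch: ConsumeMode and consume_mode are invented names
# used only to illustrate the proposal; Airflow has no such API.
class ConsumeMode(Enum):
    AS_A_WHOLE = "as_a_whole"  # current behaviour: one run for all queued events
    ONE_BY_ONE = "one_by_one"  # proposed: one consumer run per producer run

example = Dataset("s3://bucket/example.csv")

# A consumer DAG declaring how to digest queued dataset events:
#
# with DAG(
#     dag_id="consumer",
#     schedule=[example],
#     consume_mode=ConsumeMode.ONE_BY_ONE,  # hypothetical parameter
# ):
#     ...
```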
The current implementation is non-deterministic. The scheduler job will pick up the dataset events, which may have been stored in the database by one or more DAG runs, and will create a single DAG run. Sometimes the data intervals will correspond between the producer and consumer, and sometimes they will not.
It would be nice if different types of consumers were also supported. For example, in our use case, we would need the consumer to replicate the data intervals of the producer DAG. I have taken a look at the source code and can propose a simple solution for this (more on that later).
However, this also poses some issues/questions.
Let's break it down into pros and cons.

Pros:

- the producer's data intervals can be replicated across the entire pipeline

Cons:

- what happens when multiple DAGs produce the same dataset? These DAGs may have different data intervals, so replicating one doesn't make (exactly) sense. Different types of consuming modes could be proposed for these cases.
- the `dataset_dag_run_queue` model in the database may not reflect what will actually happen, i.e. the actual DAG runs that will be queued. I'm not sure.

Here are some of my findings on what it would mean to add support for a consumer to replicate the data interval of the producer (again, this makes sense only if a single producer is updating the dataset):
- define a mode for the consumer (e.g. keep the current behaviour as `default`, or name it `simple`)
- add an `__init__` method to the `DatasetTimetable` class to pass the mode for the consumer
- change the scheduler job logic to iterate over a list instead of a single value (sketched below):
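The code block from the original post did not survive here, so the following is only a rough, self-contained sketch of the idea, using invented names (`SourceRun`, `plan_consumer_runs`) rather than Airflow's real scheduler internals:

```python
from dataclasses import dataclass
from datetime import datetime

# Hypothetical sketch: these names do not exist in Airflow's scheduler;
# they only illustrate iterating over producer runs instead of
# collapsing all queued events into a single consumer run.

@dataclass
class SourceRun:
    data_interval_start: datetime
    data_interval_end: datetime

def plan_consumer_runs(queued_events_by_source_run):
    """Yield one (data_interval, events) pair per producer run, so the
    scheduler would create one consumer DAG run per producer DAG run,
    replicating each producer's data interval."""
    for source_run, events in queued_events_by_source_run.items():
        interval = (source_run.data_interval_start,
                    source_run.data_interval_end)
        yield interval, events
```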
In the future, one could separate different types of dataset timetables.
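For example (again hypothetical - the class names below are invented), the current collapsing behaviour and the proposed replicating behaviour could become separate timetable classes:

```python
# Hypothetical sketch: splitting the consumer behaviours into separate
# timetable classes instead of a single class with a mode flag.

class DatasetTimetableBase:
    def data_intervals_for(self, source_runs):
        raise NotImplementedError

class CollapsingDatasetTimetable(DatasetTimetableBase):
    """Current behaviour: one interval spanning all queued producer runs."""
    def data_intervals_for(self, source_runs):
        starts = [r.data_interval_start for r in source_runs]
        ends = [r.data_interval_end for r in source_runs]
        return [(min(starts), max(ends))]

class ReplicatingDatasetTimetable(DatasetTimetableBase):
    """Proposed behaviour: one interval per producer run."""
    def data_intervals_for(self, source_runs):
        return [(r.data_interval_start, r.data_interval_end)
                for r in source_runs]
```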
Now, I have never submitted anything to Airflow, so I don't know the source code, and there may be issues with this approach that I am not aware of.
Thoughts?
Use case/motivation
While working with Data-aware scheduling in Airflow, there is an "issue" where multiple dataset events trigger a single DAG run of the consumer DAG. This presents a problem when using the `data_interval_start` and `data_interval_end` variables in DAG execution, as well as in Jinja-templated queries. The problem presents itself when the producer DAG is running during either catchup or backfill.
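To see why this matters, here is an illustrative incremental-load task (the connection id and table name are made up) that templates the data interval; if the consumer's interval silently spans several producer runs, it processes a different slice than each producer run did:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# Illustration only: the conn_id and the events table are hypothetical.
load_increment = SQLExecuteQueryOperator(
    task_id="load_increment",
    conn_id="warehouse",
    sql="""
        SELECT *
        FROM events
        WHERE event_time >= '{{ data_interval_start }}'
          AND event_time <  '{{ data_interval_end }}'
    """,
)
```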
It seems like the culprit is that multiple dataset events are used to create a single `DataInterval`, where upon creation the min and max across all of the dataset events are taken (see the `DatasetTriggeredTimetable` implementation).

It would be nice if different types of consumers were supported. For example, in our use case, we would need the consumer to replicate the data intervals of the producer DAG. I have taken a look at the source code and can propose a simple solution for this (more on that later).
This would allow breaking a complex pipeline up into multiple DAGs and using backfill or catchup with more certainty about the end result.
Related issues
No response
Are you willing to submit a PR?
Code of Conduct