-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Swap Dag Parsing to use the TaskSDK machinery. #44972
Conversation
The tests aren't 100% finished yet. And this change is larger than I would have liked, but at least it's a net-negative change |
32176b9
to
1215213
Compare
1215213
to
9f04c14
Compare
I don't expect tests to pass yet, but I want to give people the chance to see this PR, and I know @jedcunningham is waiting on this for some of his DAG versioning work. |
I think that is unavoidable, as user code will come from Task SDK
Right, I think processor will have to depend on both Task SDK (user-facing code) + Base Executor dist -- after that separation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review, will be coming back to it in an hour
|
||
|
||
@attrs.define | ||
class DagFileProcessorManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class DagFileProcessorManager: | |
class FileParsingSupervisor: |
and let's rename the file appropriately.
|
||
class DagFileProcessor(LoggingMixin): | ||
@attrs.define() | ||
class DagFileProcessorProcess(WatchedSubprocess): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class DagFileProcessorProcess(WatchedSubprocess): | |
class FileParserProcess(WatchedSubprocess): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to do these rename as a separate PR right after this one -- the diff is big enough as it is.
93b8d53
to
553049e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mainly reviewed the processor and manager files. The other changes seem mostly reactive. Overall, I like the improvements, especially the reuse of the execution time machinery here. Few initial comments, nothing serious but mostly nits.
|
||
class DagFileProcessor(LoggingMixin): | ||
@attrs.define() | ||
class DagFileProcessorProcess(WatchedSubprocess): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah sounds good
e61c595
to
f6487c9
Compare
Right, I think this should now pass the tests, the only thing I'm not sure about this is the xfail I've put for the "simple ti roundtrip exec config tests" -- Either we should remove it or make it work, but I'm not sure if we need to pass down executor config via TI anymore @kaxil Any ideas the best plan for that one? |
(I still need to rename a class and file, but that is a non-meaningful/non-review-impacting change. |
Since we are planning to handle callbacks via Executor/worker interface too -- don't think we need to pass it explicitly from TI, instead just handle it on the server side before sending TI/request to the worker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments and the renaming of file & classes can happen in a separate PR too
I'm going to run this with full tests, I want the kube tests to see if there is something broken not covered by unit tests |
As part of Airflow 3 DAG definition files will have to use the Task SDK for all their classes, and anything involving running user code will need to be de-coupled from the database in the user-code process. This change moves all of the "serialization" change up to the DagFileProcessorManager, using the new function introduced in apache#44898 and the "subprocess" machinery introduced in apache#44874. **Important Note**: this change does not remove the ability for dag processes to access the DB for Variables etc. That will come in a future change. Some key parts of this change: - It builds upon the WatchedSubprocess from the TaskSDK. Right now this puts a nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK. This will be addressed before release (we have talked about introducing a new "apache-airflow-base-executor" dist where this subprocess+supervisor could live, as the "execution_time" folder in the Task SDK is more a feature of the executor, not of the TaskSDK itself.) - A number of classes that we need to send between processes have been converted to Pydantic for ease of serialization. - In order to not have to serialize everything in the subprocess and deserialize everything in the parent Manager process, we have created a `LazyDeserializedDAG` class that provides lazy access to much of the properties needed to create update the DAG related DB objects, without needing to fully deserialize the entire DAG structure. - Classes switched to attrs based for less boilerplate in constructors. - Internal timers convert to `time.monotonic` where possible, and `time.time` where not, we only need second diff between two points, not datetime objects. - With the earlier removal of "sync mode" for SQLite in apache#44839 the need for separate TERMIANTE and END messages over the control socket can go. Co-authored-by: Jed Cunningham <[email protected]> Co-authored-by: Daniel Imberman <[email protected]>
f6487c9
to
b22af18
Compare
assert "a.py" in resp.import_errors | ||
|
||
|
||
# @conf_vars({("logging", "dag_processor_log_target"): "stdout"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving these comments out for now as I want to overhaul the logging in #45072
As part of Airflow 3 DAG definition files will have to use the Task SDK for
all their classes, and anything involving running user code will need to be
de-coupled from the database in the user-code process.
This change moves all of the "serialization" change up to the
DagFileProcessorManager, using the new function introduced in #44898 and the
"subprocess" machinery introduced in #44874.
Important Note: this change does not remove the ability for dag processes
to access the DB for Variables etc. That will come in a future change.
Some key parts of this change:
nasty/unwanted depenednecy between the Dag Parsing code upon the TaskSDK.
This will be addressed before release (we have talked about introducing a
new "apache-airflow-base-executor" dist where this subprocess+supervisor
could live, as the "execution_time" folder in the Task SDK is more a feature
of the executor, not of the TaskSDK itself)
converted to Pydantic for ease of serialization.
in the parent Manager process, we have created a
LazyDeserializedDAG
classthat provides lazy access to much of the properties needed to create update
the DAG related DB objects, without needing to fully deserialize the entire
DAG structure.
time.monotonic
where possible, andtime.time
where not, we only need second diff between two points, not datetime objects
separate TERMIANTE and END messages over the control socket can go
Co-authored-by: Jed Cunningham [email protected]
Co-authored-by: Daniel Imberman [email protected]
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.