[Edge] Edge worker supports capacity handling instead of concurrency #43737
Conversation
Shouldn't that capacity be a task parameter rather than an executor config parameter at the DAG level? We have a similar concept with …
Some small comments, but alongside Jarek's comments I had the same thought. It seems a small discussion is needed on this.
I was thinking along the same lines, but during coding I saw that the need_capacity parameter is not easy to get into the executor. We would have to touch core code like the TaskInstanceKey class to get the info into the executor. I just reused the idea of the KubernetesExecutor to pass additional data into the executor, and that is the reason why I started using the executor_config parameter. My main idea is to touch only the Edge package and then make a later PR which adds these changes to core, because the Edge package is the only one that will support this feature for the moment and it is not released yet.
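For illustration, here is a minimal sketch of that pattern, assuming a `need_capacity` key inside `executor_config`; the key name and the `_enqueue` helper are hypothetical, not necessarily the exact names used in this PR:

```python
# Sketch only: the "need_capacity" key and the _enqueue helper are assumptions used to
# illustrate the executor_config pattern borrowed from the KubernetesExecutor.
from airflow.decorators import task
from airflow.executors.base_executor import BaseExecutor


@task(executor_config={"need_capacity": 2})  # hint that this task is "heavier"
def heavy_transform():
    ...


class SketchEdgeExecutor(BaseExecutor):
    def execute_async(self, key, command, queue=None, executor_config=None):
        # The dict attached to the task arrives here without touching core code
        # such as TaskInstanceKey, which is the point made in the comment above.
        needed = (executor_config or {}).get("need_capacity", 1)
        self._enqueue(key, command, needed)

    def _enqueue(self, key, command, needed_capacity):
        ...  # hand the job to an Edge worker together with its capacity demand
```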
Force-pushed from 6fbca24 to aee1de7.
Another round of review.
@potiuk I also had a longer talk with @AutomationDev85 about this today. Reading the docs at https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/kubernetes_executor.html#pod-override, this field is used (and today is only used there) to carry a dict with a potential pod override included. With this interface, more parameters can be carried generically; an additional field in the dict would not harm. I am thinking that maybe instead of "needs capacity" we should name it "concurrency_slots". As @AutomationDev85 said, we could also bring the change into the core later. @potiuk Do you think we need/should make a breaking change in the scheduler/executor interface, or add an intrinsic?
Force-pushed from c3f8a6c to 2819a00.
I am still not convinced. I would rather see a single way to specify the "weight of the task" and then pass it to the executor, so that the executor understands it and uses it in the way that it wants.
Adding "executor config" as a DAG parameter feels wrong. It assumes that the DAG and its tasks will be executed via the Edge executor. What happens if that task is executed via another executor? It means we would have to change the DAG definition, including the "executor config", for that.
While I understand it is done this way for the K8S executor configuration, that is passing k8s-specific parameters; in this case what we are passing is the "weight" of the task, which is a much more abstract thing.
Say we have two tasks, one with weight 1 and one with weight 2 - eventually we would like all (or at least most) of our executors to understand this and map it to the executor's workers.
I want to hear others' opinions on that @ashb @o-nikolas, and I will mark it as "request changes" for now until we agree whether it is fine to keep this as executor-specific config.
But my intuition tells me that we should use an "abstract" weight of the task here, and it should be mapped to both the "pool slots" and the "task weight" used by the executor. I do not particularly like "concurrency slots" as the name - it's quite misleading; what we really talk about is how heavy the task is.
The benefit of that approach is that it might be used in the future by any executor. Executors might also choose whether they want to use "concurrency" for that or other ways. I can for example imagine an executor that runs a task on a "small", "medium" or "big" node, and it could map the weight to choose "small" when it is 1, "medium" when it is 2 or 3, and "big" when it is 4+, or something similar.
And a comment on that - again, I am not really trying to block/veto it, but I don't think breaking or not breaking matters here when we go to Airflow 3. We will already have breaking changes for executors, I believe, and we will have to handle them. For me it is much more important to deliberate, discuss and come up with a good future executor interface than to merge a PR for the Edge executor only. This can wait; there is absolutely no hurry with it (unless I am mistaken) - we can even keep rebasing it until we come to a conclusion on what's best.

I think it is more important to think forward and decide whether we want to address an improvement need of one executor (which is not even production ready yet) or whether we want to think about future common "executor" scenarios. If we do not consider the future now, we might be building technical debt before we even release Airflow 3. For example, when I think about a YuniKorn executor in the future (cc: @amoghrajesh), I think we would like some way of passing metadata from tasks to the YuniKorn executor, to allow various ways of scheduling them.

I think we need to discuss how "executor-specific" vs. "Airflow-abstract" the metadata should be. I.e. do we have some properties of the task that can be mapped from "Airflow-abstract" terms to specific executor metadata? This could help in the future to freely move tasks between executors without specifically rewriting their metadata. I do not think we will be able to do it for all parameters, but there are certain properties that could be "abstract": …
All of those seem "abstract" enough to be shared between different executors. Unlike the Kubernetes executor config, which is really a pod-template override, they could cover the vast majority of cases where we want to attach common properties to tasks that have a similar meaning even if the tasks are run by different executors. Maybe we want this, maybe not - maybe it's the wrong abstraction. But at least it's worth discussing rather than merging the PR without discussing it.
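A short sketch of how an executor could interpret such an abstract weight; all names here are hypothetical and only illustrate the mapping idea, they are not an existing API:

```python
# Hedged sketch of the "abstract weight" idea discussed above: the executor, not the
# DAG author, decides how to map a task's weight onto its own resources.


def node_size_for_weight(task_weight: int) -> str:
    """Map an abstract task weight to an executor-specific node size."""
    if task_weight <= 1:
        return "small"
    if task_weight <= 3:
        return "medium"
    return "big"


# The Edge executor could instead treat the same weight as occupied worker capacity,
# while a Kubernetes-style executor could use it to pick a pod or node size.
```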
Fair. And it is a bit of a pity that the TaskInstance object is not passed into the executor interface; the calling method has all the fields available, so it would help if the TaskInstance were included there.

For sure, don't misunderstand: we don't want to build an EdgeWorker "balcony" here. It would be great if other executors had this as well. I once looked at whether this could be done for Celery too, but could not find an entry point without patching deep in Celery :-D

If you dislike the parameter, I assume this can be changed... The main point is how more task context can be made available. For example, "some" fields like priority, pool slots, labels, max_tries, pool and queue could generically be added into the dict of executor_config.
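As a rough illustration of that generic task context idea (the helper name and field selection are assumptions, not an agreed interface):

```python
# Sketch: pack a few TaskInstance fields into a plain dict that any executor could
# consume. The field selection mirrors the examples given above; nothing here is an
# existing Airflow API beyond the TaskInstance attributes themselves.
from airflow.models.taskinstance import TaskInstance


def generic_task_context(ti: TaskInstance) -> dict:
    return {
        "priority_weight": ti.priority_weight,
        "pool": ti.pool,
        "pool_slots": ti.pool_slots,
        "queue": ti.queue,
        "max_tries": ti.max_tries,
    }
```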
Here are my thoughts:

1) Can we use executor_config in this way? …

2) Should we use …
Should we turn it into a devlist discussion? It seems that is a decision that should be made about now, i.e. what we really want to do with the executor and "task properties". I think we have two options:
And indeed, we are at the right time to introduce breaking changes (or variations) in the executor interface to accommodate option 2), if we choose to go in this direction.
Yeah, I would support a devlist discussion... @AutomationDev85 will you take this? @potiuk would it be only a discussion, or does this then need a formal vote or lazy consensus? I believe we did not vote on breaking changes in the past... The discussion would mainly be to attract opinions and feedback on the PR?

I'd propose to go with option (2), and (while on my way back from work) I had an idea for making the change only softly breaking: besides today's (interface-limited) method we would add a new one that carries more context.

Then, starting from 2.10.4, we could slowly start migrating existing executors to the new interface, while still supporting the old signature so as not to break Airflow compatibility... and drop the old method in 3.0. (We still do not force anything and could keep the old signature; it does not harm.)

One option for (2) could be a specific, well-defined interface... but actually, much easier, we could directly pass the TaskInstance.
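To make the "softly breaking" idea concrete, here is a sketch assuming the limited method in question is BaseExecutor.execute_async; the new method name and its extra parameter are made-up names for illustration only:

```python
# Hedged sketch of a softly breaking executor interface: a richer method with a default
# implementation that falls back to today's execute_async, so executors that only
# implement the old signature keep working. execute_async_with_context and the
# task_instance parameter are hypothetical, not an agreed interface.
from airflow.executors.base_executor import BaseExecutor


class FutureBaseExecutor(BaseExecutor):
    def execute_async_with_context(
        self, key, command, queue=None, executor_config=None, task_instance=None
    ):
        # Default behaviour: ignore the extra context and delegate to the existing
        # method, preserving compatibility with current executors.
        self.execute_async(
            key=key, command=command, queue=queue, executor_config=executor_config
        )
```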
Description
Until now, the Edge worker supported only concurrency: every job reduced the available concurrency by one, so a worker with a concurrency of 8 could run 8 jobs in parallel. Concurrency is now switched to capacity.
The idea behind this is that not all jobs running in parallel on a worker consume the same amount of resources. It is now possible to set a need_capacity value for a job that needs more resources; by default, need_capacity is 1 for each job. If a worker has a capacity of 2 and executes one job with need_capacity 2, no new job is executed by that worker. But if a job with need_capacity of 1 is running, a second job with need_capacity of 1 can be executed in parallel.
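A minimal sketch of that bookkeeping, mirroring the numbers in the description; the class and method names are illustrative, not the PR's actual worker code:

```python
# Illustrative capacity tracking as described above: each job consumes its
# need_capacity (default 1) out of the worker's total capacity.


class CapacityTracker:
    def __init__(self, worker_capacity: int):
        self.capacity = worker_capacity
        self.used = 0

    def can_accept(self, need_capacity: int = 1) -> bool:
        return self.used + need_capacity <= self.capacity

    def start(self, need_capacity: int = 1) -> None:
        self.used += need_capacity

    def finish(self, need_capacity: int = 1) -> None:
        self.used -= need_capacity


# Worker with capacity 2: one job with need_capacity 2 blocks anything else,
# while two jobs with need_capacity 1 could run in parallel.
tracker = CapacityTracker(worker_capacity=2)
tracker.start(2)
assert not tracker.can_accept(1)
```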
For detailed information check the edge_executor.rst changes in this PR.
Details about changes