
[Edge] Edge worker supports capacity handling instead of concurrency #43737

Open

wants to merge 15 commits into main from feature/edge-worker-supports-concurrency-slots-handling
Conversation

AutomationDev85 (Contributor)

Description

Until now the Edge worker only supported concurrency: every job reduced the available concurrency by one, so a worker with a concurrency of 8 could run 8 jobs in parallel. Concurrency is now replaced by capacity.

The idea behind this is that not all jobs running in parallel on a worker consume the same amount of resources. It is now possible to set a need_capacity value for a job that needs more resources; by default, need_capacity is 1 for each job. If a worker has a capacity of 2 and executes one job with need_capacity 2, no additional job is picked up by that worker. If it executes a job with need_capacity 1, a second job with need_capacity 1 can run in parallel.

For detailed information, see the edge_executor.rst changes in this PR.
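To illustrate, a DAG task could request more capacity roughly like this. This is a sketch only; the exact executor_config key name is an assumption based on the description above and should be checked against edge_executor.rst:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="edge_capacity_example", start_date=datetime(2024, 1, 1), schedule=None):
    BashOperator(
        task_id="resource_hungry_job",
        bash_command="echo heavy work",
        # Assumed key name: reserves 2 units of the worker's capacity
        # instead of the default 1.
        executor_config={"need_capacity": 2},
    )
```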

Details about changes

  • Concurrency is renamed to capacity
  • A job can use the executor_config to define its need_capacity
  • The edge_job table includes a new column need_capacity
  • The free_capacity of a worker is exported as a metric (see the sketch after this list)
  • As Alembic provider migrations will only be supported in Airflow 3 and the Edge package is not fully released yet, a workaround is implemented that drops the old edge_job table and creates the new one.
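A minimal sketch of the capacity bookkeeping described above (the names worker_capacity, running_jobs and need_capacity are placeholders, not the provider's actual attributes):

```python
def free_capacity(worker_capacity: int, running_jobs: list[dict]) -> int:
    """Capacity left on a worker after subtracting each running job's need_capacity."""
    used = sum(job.get("need_capacity", 1) for job in running_jobs)
    return worker_capacity - used


# A worker with capacity 2 running one job that needs 2 has no room left...
assert free_capacity(2, [{"need_capacity": 2}]) == 0
# ...while two jobs needing 1 each can run in parallel.
assert free_capacity(2, [{"need_capacity": 1}]) == 1
```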

@jscheffl added the labels "AIP-69 Edge Executor" and "provider:edge (Edge Executor / Worker, AIP-69)" on Nov 6, 2024
potiuk (Member) commented Nov 6, 2024

Shouldn't that capacity be a task parameter rather than an executor config parameter at the DAG level? We have a similar concept with pool_slots, and there they are "per task" and part of the BaseOperator. It seems much more flexible to specify it this way (additionally, it could then be renamed to "task_slots" to match "pool_slots"), or maybe we should even combine the two. This way it would also potentially be usable by other executors.
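For reference, pool_slots is already a per-task BaseOperator argument in standard Airflow; a heavier task simply claims more slots of its pool. Shown only to illustrate the analogy:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="pool_slots_example", start_date=datetime(2024, 1, 1), schedule=None):
    BashOperator(
        task_id="heavy",
        bash_command="echo heavy",
        pool="default_pool",
        pool_slots=2,  # consumes 2 of the pool's slots while the task runs
    )
```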

jscheffl (Contributor) left a comment

Some small comments - but alongside Jarek's comments I also had the same thought. It seems a small discussion is needed on this.

Review comments on files (outdated, resolved):
  • providers/src/airflow/providers/edge/provider.yaml
  • docs/apache-airflow-providers-edge/edge_executor.rst
  • providers/src/airflow/providers/edge/cli/edge_command.py
AutomationDev85 (Contributor, Author) commented Nov 7, 2024

> Shouldn't that capacity be a task parameter rather than an executor config parameter at the DAG level? We have a similar concept with pool_slots, and there they are "per task" and part of the BaseOperator. It seems much more flexible to specify it this way (additionally, it could then be renamed to "task_slots" to match "pool_slots"), or maybe we should even combine the two. This way it would also potentially be usable by other executors.

I was thinking along the same lines, but during coding I saw that the need_capacity parameter is not easy to get into the executor. We would have to touch core code like the TaskInstanceKey class to get the information into the executor. I borrowed the idea from the KubernetesExecutor of passing additional data into the executor, which is why I used the executor_config parameter. My main idea was to touch only the Edge package and then make a later PR that adds these changes to the core, because the Edge package is the only one that will support this feature for now and it is not released yet.
So what is your opinion about that?
Shall we change this in this PR or in a separate one?
While writing these lines I also have the feeling that we should keep the term concurrency instead of capacity. Then it would be easier to adapt this to already existing executor code in the future.

@AutomationDev85 force-pushed the feature/edge-worker-supports-concurrency-slots-handling branch from 6fbca24 to aee1de7 on November 7, 2024 09:41
jscheffl (Contributor) left a comment

Another round of review.

Review comments on files (outdated, resolved):
  • providers/src/airflow/providers/edge/provider.yaml
  • docs/apache-airflow-providers-edge/edge_executor.rst
  • providers/src/airflow/providers/edge/models/edge_job.py
jscheffl (Contributor) commented Nov 7, 2024

> Shouldn't that capacity be a task parameter rather than an executor config parameter at the DAG level? We have a similar concept with pool_slots, and there they are "per task" and part of the BaseOperator. It seems much more flexible to specify it this way (additionally, it could then be renamed to "task_slots" to match "pool_slots"), or maybe we should even combine the two. This way it would also potentially be usable by other executors.

> I was thinking along the same lines, but during coding I saw that the need_capacity parameter is not easy to get into the executor. We would have to touch core code like the TaskInstanceKey class to get the information into the executor. I borrowed the idea from the KubernetesExecutor of passing additional data into the executor, which is why I used the executor_config parameter. My main idea was to touch only the Edge package and then make a later PR that adds these changes to the core, because the Edge package is the only one that will support this feature for now and it is not released yet. So what is your opinion about that? Shall we change this in this PR or in a separate one? While writing these lines I also have the feeling that we should keep the term concurrency instead of capacity. Then it would be easier to adapt this to already existing executor code in the future.

@potiuk I also had a longer talk with @AutomationDev85 about this today. Reading the docs at https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/kubernetes_executor.html#pod-override, this field is used (and is only used there today) to carry a dict that can include a pod_override element defining extra details of the pod to run for the task execution. That can be used to add volume mounts, request resources, add sidecars... whatsoever.
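For context, the pod_override usage from the linked Kubernetes executor docs looks roughly like this (abridged sketch, shown only to illustrate how executor_config is used there):

```python
from kubernetes.client import models as k8s

from airflow.operators.python import PythonOperator

task_with_more_memory = PythonOperator(
    task_id="task_with_more_memory",
    python_callable=lambda: print("running"),
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # the container that runs the Airflow task
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "1", "memory": "2Gi"}
                        ),
                    )
                ]
            )
        )
    },
)
```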

With this interface, more parameters can be carried generically; an additional field in the dict would not do any harm. I am thinking that maybe instead of "need_capacity" we should name it pool_slots, matching the task instance parameter. With this PR you would need to define it as an extra field in the executor_config... but with a small additional PR we could bring the pool_slots from the task instance in there by default for future leverage... though then this intrinsic is a bit confusing.

As @AutomationDev85 said, we could also bring the pool_slots field directly from the task instance into the executor, but today the interface in the scheduler, airflow/executors/base_executor.py:execute_async(), only carries the TaskInstanceKey, command, queue and executor_config. Adding the full TaskInstance or the pool_slots here would be a breaking change in the interface, or the executor would need to query the DB additionally to get the pool_slots (which the scheduler obviously already has, because it allocated the pool slots before scheduling... the calling method _process_tasks() has the TaskInstance object).
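For reference, the boundary in question looks roughly like this (paraphrased from airflow/executors/base_executor.py, not a verbatim copy):

```python
from __future__ import annotations

from typing import Any

from airflow.executors.base_executor import CommandType
from airflow.models.taskinstancekey import TaskInstanceKey


class SomeExecutor:
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: Any | None = None,
    ) -> None:
        # Only the key, command, queue and executor_config cross this boundary;
        # pool_slots would need a DB lookup or an interface change.
        raise NotImplementedError()
```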

@potiuk Do you think we need/should make a breaking change in the scheduler/executor interface, or add an intrinsic?
@ashb as being the Scheduler expert, would you have an opinion on this?

@AutomationDev85 force-pushed the feature/edge-worker-supports-concurrency-slots-handling branch from c3f8a6c to 2819a00 on November 11, 2024 09:22
jscheffl (Contributor) left a comment

Looks good to me.

Acknowledging that the scheduler interface cannot easily be adjusted without breaking all existing executors... so using the executor_config is the second-best option possible without breaking the API.

@potiuk / @ashb Okay to merge or any objections from you?

potiuk (Member) left a comment

I am still not convinced. I would rather see a single way to specify the "weight" of a task and then pass it to the executor so that the executor understands it and uses it in whatever way it wants.

Adding "executor config" as dag parameter feels wrong. It assumes that the dag and tasks will be executed via edge executor. What happens if that task will be executed via another executor ? it means that we should change DAG definition for that including "executor config".

While I understand it is done this way for the K8S executor configuration, there it passes K8S-specific parameters; in this case what we are passing is the "weight" of the task, which is a much more abstract thing.

Say we have two tasks - one with weight 1 and one with weight 2 - eventually we would like all (or at least most) of our executors to understand this and map it to the executor's workers.

I want to hear others' opinions on that @ashb @o-nikolas - and I am marking this as "request changes" for now until we agree whether it is fine to keep this as "executor-specific" config.

But my intuition tells me that we should use an "abstract" weight of the task here, and it should be mapped to both the "pool slots" and the "task weight" used by the executor. I do not particularly like "concurrency slots" as a name - it's quite misleading; what we are really talking about is how heavy the task is.

The benefit of that approach is that it could be used in the future by any executor. Executors could also choose whether they want to use "concurrency" for it or other mechanisms. I can, for example, imagine an executor that runs a task on a "small", "medium" or "big" node and maps the "weight" to choose "small" when it is 1, "medium" when it is 2 or 3, and "big" when it is 4+ or something similar.
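A tiny illustration of that mapping idea (purely hypothetical; no such executor exists):

```python
def node_size_for_weight(weight: int) -> str:
    """Map an abstract task weight to a node size class (illustration only)."""
    if weight <= 1:
        return "small"
    if weight <= 3:
        return "medium"
    return "big"
```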

potiuk (Member) commented Nov 11, 2024

> As @AutomationDev85 said, we could also bring the pool_slots field directly from the task instance into the executor, but today the interface in the scheduler, airflow/executors/base_executor.py:execute_async(), only carries the TaskInstanceKey, command, queue and executor_config. Adding the full TaskInstance or the pool_slots here would be a breaking change in the interface, or the executor would need to query the DB additionally to get the pool_slots (which the scheduler obviously already has, because it allocated the pool slots before scheduling... the calling method _process_tasks() has the TaskInstance object).

And a comment on that - again, I am not really trying to block/veto it, but I don't think breaking or not breaking matters here when we go to Airflow 3. We will already have breaking changes for executors, I believe, and we will have to handle them. For me it is much more important to deliberate, discuss and come up with a good future executor interface than to merge a PR for the Edge executor only. This can wait. There is absolutely no hurry with it (unless I am mistaken) - we can even keep rebasing it until we come to a conclusion on what's best.

I think it is more important to think forward and decide whether we want to address an improvement need of one executor (which is not even production-ready yet) or whether we want to think of future common "executor" scenarios. If we do not consider the future now, we might be building up technical debt before we even release Airflow 3.

For example, when I think about a YuniKorn executor in the future (cc: @amoghrajesh), I think we would like some way of passing metadata from tasks to the YuniKorn executor - to allow various ways of scheduling them. I think we need to discuss how "executor specific" vs. "Airflow abstract" that metadata should be. I.e. do we have some properties of a task that can be mapped from "Airflow abstract" terms to specific executor metadata? This could help in the future to freely move tasks between executors without having to rewrite their metadata.

And I do not think we will be able to do it for all parameters, but there are certain properties that could be "abstract":

  • task weight
  • task group the task belongs to
  • task labels

All of those seem to be "abstract" enough to be shared between different executors - unlike the Kubernetes executor config, which is really a pod-template override, they could cover the vast majority of cases where we want to attach common properties to tasks that have a similar meaning even if they are run by different executors.

Maybe we want this, maybe not - maybe it's the wrong abstraction. But at least it's worth discussing rather than merging the PR without discussing it.

jscheffl (Contributor)

> Maybe we want this, maybe not - maybe it's the wrong abstraction. But at least it's worth discussing rather than merging the PR without discussing it.

Fair. And it is a bit of a pity that the TaskInstance object is not passed into the executor interface; the calling method has all the fields available. So if the TaskInstance were included in the execute_async signature, everything would be there.

For sure, don't misunderstand: we don't want to build an EdgeWorker "balcony" here. It would be great if other executors had this as well. I once looked into whether this could be done for Celery too, but could not find an entry point without patching deep into Celery :-D

If you dislike the parameter - I assume this can be changed... The main point is how more task context can be made available. For example, "some" fields like priority, pool slots, labels, max_tries, pool and queue could generically be added into the executor_config dict...
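A sketch of that idea (a hypothetical helper, not part of any existing interface): the scheduler could merge a few generic TaskInstance fields into the executor_config dict before handing it over.

```python
from __future__ import annotations

from airflow.models.taskinstance import TaskInstance


def enrich_executor_config(ti: TaskInstance) -> dict:
    """Merge generic task properties into the executor_config dict (illustration only)."""
    config = dict(ti.executor_config or {})
    config.setdefault("priority_weight", ti.priority_weight)
    config.setdefault("pool_slots", ti.pool_slots)
    config.setdefault("pool", ti.pool)
    config.setdefault("queue", ti.queue)
    config.setdefault("max_tries", ti.max_tries)
    return config
```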

o-nikolas (Contributor)

> I want to hear others' opinions on that @ashb @o-nikolas - and I am marking this as "request changes" for now until we agree whether it is fine to keep this as "executor-specific" config.

Here are my thoughts:

1) Can we use executor_config in this way?

The executor_config field is there to pass any key/value configuration to the executor for a specific task. The K8s executor uses it as already discussed; the ECS executor treats it as AWS parameters which are passed along to the boto3 call to ECS run_task (so the user can configure more memory for a particular task, send it to a different ECS cluster, or simply add tags, etc.). So it is free to be used in any way an executor sees fit, and it's a nice mechanism to customize execution for a task.

2) Should we use executor_config in this way for capacity management?

I think this idea of capacity management is interesting, and I agree we have to make a call on whether or not to adopt it as a first-class feature within the Airflow executor interface. I agree with @potiuk that there is no harm in updating the executor interface to support it. We have a major release coming up, and that's what they're for, so if we want to update the execute_async method now to include more parameters (or even pass along the entire TI as @jscheffl suggested), that's perfectly doable.
But whether we should is another story. Here I think it's less obvious. Many executors don't even have a notion of long-lived "workers" (they are completely ephemeral and map 1:1 with a single task for any containerized executor like K8s and AWS ECS). But they don't have to make use of it, or you could imagine it as a way to map to a larger container configuration for executors like this (i.e. a two-slot task would get a larger container with more memory or what have you, pre-defined by the user).

One last thing that I do feel strongly about (and I think this also agrees with what @potiuk is saying) is that Airflow already has slots, and executors also already have a notion of their own slots (kind of like their internal capacity), which they use to manage how many tasks they can run concurrently/in parallel. I think we should build on this existing mechanism (i.e. a larger task should consume more than one executor slot on the executor that receives it), and then within the executor, knowing that a task consumes a particular number of slots, the executor can handle that task specially/differently (or do nothing at all if it doesn't care about task size). This makes it a Task Instance property, which I think makes the most sense, because it is the task that is taking up more capacity and the executor just needs to account for that and react appropriately.
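As an illustration of that slot-based accounting (a standalone sketch, not Airflow code): a task's weight is simply subtracted from the executor's open slots.

```python
from dataclasses import dataclass, field


@dataclass
class SlotAwareExecutor:
    parallelism: int                       # total executor slots
    used_slots: int = 0
    running: dict[str, int] = field(default_factory=dict)

    @property
    def open_slots(self) -> int:
        return self.parallelism - self.used_slots

    def try_queue(self, task_id: str, weight: int = 1) -> bool:
        """Accept a task only if its weight fits into the remaining slots."""
        if weight > self.open_slots:
            return False
        self.used_slots += weight
        self.running[task_id] = weight
        return True

    def finish(self, task_id: str) -> None:
        self.used_slots -= self.running.pop(task_id)


executor = SlotAwareExecutor(parallelism=8)
assert executor.try_queue("light", weight=1)
assert executor.try_queue("heavy", weight=6)
assert not executor.try_queue("another_heavy", weight=6)  # only 1 slot left
```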

potiuk (Member) commented Nov 12, 2024

Should we turn this into a devlist discussion? It seems this is a decision that should be made about now - i.e. what we really want to do with executors and "task properties".

I think we have two options:

  1. we make any metadata we want to attach to tasks "executor specific" - like the Kubernetes executor so far
  2. we try to define some common (well-defined) "properties" of the task, exposed to executors so that they can adapt how they run tasks. Task "weight" seems to be a good candidate that we could also use elsewhere (i.e. pool slots).

And indeed, we are at the right time to introduce breaking changes (or variations) in the executor interface to accommodate (2) if we choose to go in this direction.

jscheffl (Contributor) commented Nov 13, 2024

> Should we turn this into a devlist discussion? It seems this is a decision that should be made about now - i.e. what we really want to do with executors and "task properties".

Yeah, I would support a devlist discussion... @AutomationDev85, will you take this?

@potiuk would this only be a discussion, or would it need a formal vote or lazy consensus? I believe we did not vote on breaking changes in the past... the discussion would mainly be to attract opinions and feedback on the PR?

I'd propose to go with option (2), and (while on my way back from work) I had an idea for how the change could be only softly breaking:

Besides today's (interface-limited) execute_async() we add a new <name-tbd>() method (e.g. execute(), because the existing method is not really async and the name is a bit misleading). The existing execute_async() gets a deprecation warning and calls the new method, so the old and new interfaces can both be supported.
We could add this in 2.10.4, at the latest in 2.11. As it is non-breaking, I'd say the earlier the better. The docs would need to be updated as well, highlighting the deprecation.

Then, starting from 2.10.4, we could slowly migrate the existing executors to the new interface while still supporting the old signature so as not to break Airflow compatibility... and drop the old method in 3.0. (We still do not force anything and could keep the old signature; it does not do any harm.)

One option for (2) could be a specific, well-defined interface... but, much easier, we could directly pass the TaskInstance object, as the caller already has it. Then all details like priority, pool slots and executor_config are directly accessible, and we could implement pool_slots awareness in the LocalExecutor as a first-class citizen as well. And if TaskInstance is extended in the future, no interface changes are needed.
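One possible shape of that transition, sketched under heavy assumptions (the method name, delegation direction and defaults are placeholders, not merged Airflow code):

```python
from __future__ import annotations

import warnings
from typing import Any

from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstance


class TransitionalExecutor(BaseExecutor):
    def execute(
        self,
        task_instance: TaskInstance,
        command: CommandType,
        queue: str | None = None,
        executor_config: Any | None = None,
    ) -> None:
        """New-style entry point: the full TaskInstance exposes pool_slots, priority, etc."""
        # Default implementation bridges to the legacy method so that executors
        # which have not migrated yet keep working; the bridge emits a
        # deprecation warning during the transition period.
        warnings.warn(
            "execute_async() is deprecated; override execute() instead",
            DeprecationWarning,
            stacklevel=2,
        )
        self.execute_async(task_instance.key, command, queue, executor_config)
```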
