[data] Fix map operator fusion when concurrency is set #49573

Merged: 4 commits, Jan 7, 2025
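
What this PR fixes, sketched as a minimal example (not part of the diff; the exact fused stage name and stats output are assumptions based on the new test below): two task-based map operators with the same `concurrency` setting should fuse into one stage, while mismatched settings keep them separate.

    import ray

    def fn(row):
        return row

    ds = ray.data.range(10, override_num_blocks=2)
    # Both maps run as Ray tasks with the same concurrency, so the optimizer
    # should fuse them into a single "Map(fn)->Map(fn)" stage after this fix.
    ds = ds.map(fn, num_cpus=0, concurrency=2).map(fn, num_cpus=0, concurrency=2)
    ds.take_all()
    print(ds.stats())  # the fused stage name is expected to appear here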
Changes from 2 commits
10 changes: 1 addition & 9 deletions python/ray/data/_internal/compute.py
@@ -131,7 +131,7 @@ def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
if not isinstance(compute_spec, (TaskPoolStrategy, ActorPoolStrategy)):
raise ValueError(
"In Ray 2.5, the compute spec must be either "
f"TaskPoolStrategy or ActorPoolStategy, was: {compute_spec}."
f"TaskPoolStrategy or ActorPoolStrategy, was: {compute_spec}."
)
elif not compute_spec or compute_spec == "tasks":
return TaskPoolStrategy()
@@ -141,11 +141,3 @@ def get_compute(compute_spec: Union[str, ComputeStrategy]) -> ComputeStrategy:
return compute_spec
else:
raise ValueError("compute must be one of [`tasks`, `actors`, ComputeStrategy]")


def is_task_compute(compute_spec: Union[str, ComputeStrategy]) -> bool:
return (
not compute_spec
or compute_spec == "tasks"
or isinstance(compute_spec, TaskPoolStrategy)
)
28 changes: 17 additions & 11 deletions python/ray/data/_internal/logical/operators/map_operator.py
@@ -30,6 +30,7 @@ def __init__(
min_rows_per_bundled_input: Optional[int] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
compute: Optional[ComputeStrategy] = None,
):
"""
Args:
@@ -51,6 +52,7 @@ def __init__(
self._min_rows_per_bundled_input = min_rows_per_bundled_input
self._ray_remote_args = ray_remote_args or {}
self._ray_remote_args_fn = ray_remote_args_fn
self._compute = compute or TaskPoolStrategy()


class AbstractUDFMap(AbstractMap):
@@ -68,7 +70,7 @@ def __init__(
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
min_rows_per_bundled_input: Optional[int] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
@@ -87,8 +89,8 @@ def __init__(
`fn` if `fn` is a callable class.
min_rows_per_bundled_input: The target number of rows to pass to
``MapOperator._add_bundled_input()``.
compute: The compute strategy, either ``"tasks"`` (default) to use Ray
tasks, or ``"actors"`` to use an autoscaling actor pool.
compute: The compute strategy, either ``TaskPoolStrategy`` (default) to use
Ray tasks, or ``ActorPoolStrategy`` to use an autoscaling actor pool.
ray_remote_args_fn: A function that returns a dictionary of remote args
passed to each map worker. The purpose of this argument is to generate
dynamic arguments for each actor/task, and will be called each time
@@ -103,13 +105,13 @@ def __init__(
input_op,
min_rows_per_bundled_input=min_rows_per_bundled_input,
ray_remote_args=ray_remote_args,
compute=compute,
)
self._fn = fn
self._fn_args = fn_args
self._fn_kwargs = fn_kwargs
self._fn_constructor_args = fn_constructor_args
self._fn_constructor_kwargs = fn_constructor_kwargs
self._compute = compute or TaskPoolStrategy()
self._ray_remote_args_fn = ray_remote_args_fn

def _get_operator_name(self, op_name: str, fn: UserDefinedFunction):
@@ -154,7 +156,7 @@ def __init__(
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
min_rows_per_bundled_input: Optional[int] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
@@ -191,7 +193,7 @@ def __init__(
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
@@ -221,7 +223,7 @@ def __init__(
input_op: LogicalOperator,
fn: Optional[UserDefinedFunction] = None,
filter_expr: Optional["pa.dataset.Expression"] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
@@ -252,11 +254,15 @@ def __init__(
input_op: LogicalOperator,
cols: Optional[List[str]] = None,
cols_rename: Optional[Dict[str, str]] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
super().__init__("Project", input_op=input_op, ray_remote_args=ray_remote_args)
self._compute = compute
super().__init__(
"Project",
input_op=input_op,
ray_remote_args=ray_remote_args,
compute=compute,
)
self._batch_size = DEFAULT_BATCH_SIZE
self._cols = cols or []
self._cols_rename = cols_rename or {}
@@ -287,7 +293,7 @@ def __init__(
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
compute: Optional[Union[str, ComputeStrategy]] = None,
compute: Optional[ComputeStrategy] = None,
ray_remote_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
):
91 changes: 54 additions & 37 deletions python/ray/data/_internal/logical/rules/operator_fusion.py
@@ -1,8 +1,11 @@
import itertools
from typing import List, Optional, Tuple

# TODO(Clark): Remove compute dependency once we delete the legacy compute.
from ray.data._internal.compute import get_compute, is_task_compute
from ray.data._internal.compute import (
ActorPoolStrategy,
ComputeStrategy,
TaskPoolStrategy,
)
from ray.data._internal.execution.interfaces import (
PhysicalOperator,
RefBundle,
@@ -24,7 +27,10 @@
RandomShuffle,
Repartition,
)
from ray.data._internal.logical.operators.map_operator import AbstractUDFMap
from ray.data._internal.logical.operators.map_operator import (
AbstractMap,
AbstractUDFMap,
)
from ray.data._internal.stats import StatsDict
from ray.data.context import DataContext

@@ -131,11 +137,6 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
the same class AND constructor args are the same for both.
* They have compatible remote arguments.
"""
from ray.data._internal.logical.operators.map_operator import (
AbstractMap,
AbstractUDFMap,
)

if not up_op.supports_fusion() or not down_op.supports_fusion():
return False

@@ -191,15 +192,16 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
if isinstance(down_logical_op, Repartition) and not down_logical_op._shuffle:
return False

if isinstance(down_logical_op, AbstractUDFMap) and isinstance(
up_logical_op, AbstractUDFMap
if isinstance(down_logical_op, AbstractMap) and isinstance(
up_logical_op, AbstractMap
):
# Allow fusing tasks->actors if the resources are compatible (read->map),
# but not the other way around. The latter (downstream op) will be used as
# the compute if fused.
if is_task_compute(down_logical_op._compute) and get_compute(
up_logical_op._compute
) != get_compute(down_logical_op._compute):
if (
self._fuse_compute_strategy(
up_logical_op._compute,
down_logical_op._compute,
)
is None
):
return False

# Only fuse if the ops' remote arguments are compatible.
@@ -226,6 +228,30 @@ def _can_fuse(self, down_op: PhysicalOperator, up_op: PhysicalOperator) -> bool:
# Otherwise, ops are compatible for fusion.
return True

def _fuse_compute_strategy(
self, up_compute: ComputeStrategy, down_compute: ComputeStrategy
) -> Optional[ComputeStrategy]:
"""Fuse the compute strategies of the upstream and downstream operators.
Returns None if they are not compatible.

Task->Task and Task->Actor are allowed.
Actor->Actor and Actor->Task are not allowed.
"""
if isinstance(up_compute, ActorPoolStrategy):
[Inline review comments on this line]

@gvspraveen (Contributor), Jan 6, 2025:
Any reason you are disallowing the upstream op being an Actor here?
Previous logic seems to permit Actor -> Task as long as the computes are compatible/the same:

    if is_task_compute(down_logical_op._compute) and get_compute(
        up_logical_op._compute
    ) != get_compute(down_logical_op._compute):
        return False

Author's reply:
Actor -> Task is already disallowed by this condition above.

@gvspraveen (Contributor), Jan 6, 2025:
For the future: if resource requirements are the same, should we allow an Actor -> Task fuse?

return None
assert isinstance(up_compute, TaskPoolStrategy)
if isinstance(down_compute, TaskPoolStrategy):
# For Task->Task, the sizes must match.
if up_compute.size != down_compute.size:
return None
return down_compute
else:
assert isinstance(down_compute, ActorPoolStrategy)
# For Task->Actor, Task's size must match Actor's max_size.
if up_compute.size != down_compute.max_size:
return None
return down_compute

def _can_merge_target_max_block_size(
self,
up_target_max_block_size: Optional[int],
Expand Down Expand Up @@ -263,8 +289,6 @@ def _get_merged_target_max_block_size(
def _get_fused_map_operator(
self, down_op: MapOperator, up_op: MapOperator
) -> MapOperator:
from ray.data._internal.logical.operators.map_operator import AbstractMap

assert self._can_fuse(down_op, up_op), (
"Current rule supports fusing MapOperator->MapOperator, but received: "
f"{type(up_op).__name__} -> {type(down_op).__name__}"
@@ -275,18 +299,12 @@ def _get_fused_map_operator(

down_logical_op = self._op_map.pop(down_op)
up_logical_op = self._op_map.pop(up_op)
assert isinstance(down_logical_op, AbstractMap)
assert isinstance(up_logical_op, AbstractMap)

# Merge minimum block sizes.
down_min_rows_per_bundled_input = (
down_logical_op._min_rows_per_bundled_input
if isinstance(down_logical_op, AbstractMap)
else None
)
up_min_rows_per_bundled_input = (
up_logical_op._min_rows_per_bundled_input
if isinstance(up_logical_op, AbstractMap)
else None
)
down_min_rows_per_bundled_input = down_logical_op._min_rows_per_bundled_input
up_min_rows_per_bundled_input = up_logical_op._min_rows_per_bundled_input
if (
down_min_rows_per_bundled_input is not None
and up_min_rows_per_bundled_input is not None
@@ -303,11 +321,10 @@ def _get_fused_map_operator(
up_op.target_max_block_size, down_op.target_max_block_size
)

# We take the downstream op's compute in case we're fusing upstream tasks with a
# downstream actor pool (e.g. read->map).
compute = None
if isinstance(down_logical_op, AbstractUDFMap):
compute = get_compute(down_logical_op._compute)
compute = self._fuse_compute_strategy(
up_logical_op._compute, down_logical_op._compute
)
assert compute is not None
ray_remote_args = up_logical_op._ray_remote_args
ray_remote_args_fn = (
up_logical_op._ray_remote_args_fn or down_logical_op._ray_remote_args_fn
@@ -359,8 +376,6 @@ def _get_fused_map_operator(
ray_remote_args,
)
else:
from ray.data._internal.logical.operators.map_operator import AbstractMap

# The downstream op is AbstractMap instead of AbstractUDFMap.
logical_op = AbstractMap(
name,
@@ -384,8 +399,10 @@ def _get_fused_all_to_all_operator(
# Fuse operator names.
name = up_op.name + "->" + down_op.name

down_logical_op: AbstractAllToAll = self._op_map.pop(down_op)
up_logical_op: AbstractUDFMap = self._op_map.pop(up_op)
down_logical_op = self._op_map.pop(down_op)
up_logical_op = self._op_map.pop(up_op)
assert isinstance(down_logical_op, AbstractAllToAll)
assert isinstance(up_logical_op, AbstractMap)

# Fuse transformation functions.
ray_remote_args = up_logical_op._ray_remote_args
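For readers skimming the diff above, here is a standalone sketch that mirrors the new `_fuse_compute_strategy` rule (an illustration, not part of the change; attribute names such as `size` and `max_size` are taken from the diff):

    from typing import Optional

    from ray.data._internal.compute import (
        ActorPoolStrategy,
        ComputeStrategy,
        TaskPoolStrategy,
    )

    def fuse_compute_strategy(
        up: ComputeStrategy, down: ComputeStrategy
    ) -> Optional[ComputeStrategy]:
        # An actor-pool upstream never fuses (covers Actor->Task and Actor->Actor).
        if isinstance(up, ActorPoolStrategy):
            return None
        assert isinstance(up, TaskPoolStrategy)
        if isinstance(down, TaskPoolStrategy):
            # Task->Task: the configured sizes must match.
            return down if up.size == down.size else None
        # Task->Actor: the task pool's size must equal the actor pool's max_size.
        assert isinstance(down, ActorPoolStrategy)
        return down if up.size == down.max_size else None

When fusion is allowed, the downstream operator's strategy is the one kept, which is why the new code returns `down_compute`.
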
70 changes: 70 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -9,6 +9,7 @@

import ray
from ray.data._internal.aggregate import Count
from ray.data._internal.compute import ComputeStrategy
from ray.data._internal.datasource.parquet_datasink import ParquetDatasink
from ray.data._internal.execution.interfaces import ExecutionOptions
from ray.data._internal.execution.operators.base_physical_operator import (
@@ -1097,6 +1098,75 @@ def test_write_fusion(ray_start_regular_shared, tmp_path):
_check_usage_record(["ReadRange", "WriteCSV"])


@pytest.mark.parametrize(
"up_use_actor, up_concurrency, down_use_actor, down_concurrency, should_fuse",
[
# === Task->Task cases ===
# Same concurrency set. Should fuse.
(False, 1, False, 1, True),
# Different concurrency set. Should not fuse.
(False, 1, False, 2, False),
# If one op has concurrency set, and the other doesn't, should not fuse.
(False, None, False, 1, False),
(False, 1, False, None, False),
# === Task->Actor cases ===
# When max size matches, should fuse.
(False, 2, True, 2, True),
(False, 2, True, (1, 2), True),
# When max size doesn't match, should not fuse.
(False, 1, True, 2, False),
(False, 1, True, (1, 2), False),
(False, None, True, (1, 2), False),
# === Actor->Task cases ===
# Should not fuse whatever concurrency is set.
(True, 2, False, 2, False),
# === Actor->Actor cases ===
# Should not fuse whatever concurrency is set.
(True, 2, True, 2, False),
],
)
def test_map_fusion_with_concurrency_arg(
ray_start_regular_shared,
up_use_actor,
up_concurrency,
down_use_actor,
down_concurrency,
should_fuse,
):
"""Test map operator fusion with different concurrency settings."""

class Map:
def __call__(self, row):
return row

def map(row):
return row

ds = ray.data.range(10, override_num_blocks=2)
if not up_use_actor:
ds = ds.map(map, num_cpus=0, concurrency=up_concurrency)
up_name = "Map(map)"
else:
ds = ds.map(Map, num_cpus=0, concurrency=up_concurrency)
up_name = "Map(Map)"

if not down_use_actor:
ds = ds.map(map, num_cpus=0, concurrency=down_concurrency)
down_name = "Map(map)"
else:
ds = ds.map(Map, num_cpus=0, concurrency=down_concurrency)
down_name = "Map(Map)"

assert extract_values("id", ds.take_all()) == list(range(10))

name = f"{up_name}->{down_name}"
stats = ds.stats()
if should_fuse:
assert name in stats, stats
else:
assert name not in stats, stats


def test_write_operator(ray_start_regular_shared, tmp_path):
ctx = DataContext.get_current()

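A closing note on how the test's parameters exercise the rule: the `concurrency` argument to `Dataset.map` is what selects the compute strategy being fused. Roughly, and as an assumption about current Ray Data behavior rather than something spelled out in this diff, an int on a plain function yields a sized task pool, while an int or a `(min, max)` tuple on a callable class yields an actor pool.

    import ray

    def fn(row):
        return row

    class Fn:
        def __call__(self, row):
            return row

    ds = ray.data.range(10)
    ds.map(fn, concurrency=2)       # tasks, roughly TaskPoolStrategy(size=2)
    ds.map(Fn, concurrency=2)       # actors, roughly ActorPoolStrategy(size=2)
    ds.map(Fn, concurrency=(1, 2))  # actors, roughly ActorPoolStrategy(min_size=1, max_size=2)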