Add runner benchmark #4210

Open

noklam wants to merge 30 commits into main from noklam/stress-testing-runners-4127

Commits (30)
98744d5
add benchmark dependencies
noklam Oct 7, 2024
216cc7a
add structure
noklam Oct 9, 2024
a4ab4c4
add benchmark
noklam Oct 9, 2024
92c7556
tmp commit
noklam Oct 14, 2024
e1c4156
force benchmarks to be a package
noklam Oct 14, 2024
06d178d
update config
noklam Oct 14, 2024
783b2e8
rename folder
noklam Oct 14, 2024
033237b
fix asv config
noklam Oct 14, 2024
0cb6e4f
rename
noklam Oct 14, 2024
1b8a7ab
update format
noklam Oct 14, 2024
4341284
typo
noklam Oct 14, 2024
bc1ec5c
update
noklam Oct 14, 2024
58a70ff
incorrect config
noklam Oct 14, 2024
145af85
update
noklam Oct 14, 2024
7e00882
update
noklam Oct 14, 2024
66ad6c5
back to kedro_benchmarks
noklam Oct 16, 2024
985a051
rename benchmark file
noklam Oct 16, 2024
0f5a4f0
clean up
noklam Oct 16, 2024
ddd6a77
update asv config
noklam Oct 16, 2024
4f905b5
update config
noklam Oct 16, 2024
515d91c
update config
noklam Oct 16, 2024
fdf0fe2
Merge branch 'main' into noklam/stress-testing-runners-4127
noklam Oct 17, 2024
74d24bf
fix memory test
noklam Oct 18, 2024
aaa9a3f
remove memory tracking since it's not meaningful
noklam Oct 21, 2024
b733534
Merge branch 'main' into noklam/stress-testing-runners-4127
noklam Oct 21, 2024
4c9098b
Merge branch 'main' into noklam/stress-testing-runners-4127
noklam Oct 21, 2024
752440b
mergMerge branch 'noklam/stress-testing-runners-4127' of github.com:k…
noklam Oct 21, 2024
8ee7dac
test
noklam Oct 21, 2024
05bcd5e
commit benchmark module
noklam Oct 21, 2024
f1a1235
ADD README
noklam Oct 21, 2024
12 changes: 8 additions & 4 deletions asv.conf.json
@@ -3,16 +3,20 @@
"project": "Kedro",
"project_url": "https://kedro.org/",
"repo": ".",
"install_command": ["pip install -e ."],
"branches": ["main"],
"install_command": [
"pip install -e . kedro-datasets[pandas-csvdataset]"
],
"branches": [
"noklam/stress-testing-runners-4127"
noklam (Contributor, Author) commented: Need to revert before merge.
],
"environment_name": "kedro",
"environment_type": "virtualenv",
"show_commit_url": "http://github.com/kedro-org/kedro/commit/",
"results_dir": ".asv/results",
"html_dir": ".asv/html",
"matrix": {
"req": {
"kedro-datasets": [],
"pandas": []
"kedro-datasets[pandas]": [],
}
}
}
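
Taken together, the config changes appear to do three things: `install_command` now also installs `kedro-datasets[pandas-csvdataset]` so the benchmarked pipelines can use `pandas.CSVDataset`; the dependency `matrix` swaps the separate `kedro-datasets`/`pandas` entries for a single `kedro-datasets[pandas]` requirement; and `branches` temporarily points at the PR branch instead of `main`, to be reverted before merge per the comment above.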
21 changes: 21 additions & 0 deletions benchmarks/README.md
@@ -0,0 +1,21 @@
This is the benchmark suite for Kedro, which is mainly used internally.

# Installation
`pip install asv`


# Run the benchmark
Run this in the terminal:
`asv run`

You can also run the benchmark for specific commits or a range of commits; for details,
check out the [official documentation](https://asv.readthedocs.io/en/stable/using.html#benchmarking).

For example, `asv run main..mybranch` will run the benchmark against every commit since branching off from
`main`.

## Compare benchmarks for two commits
Run this in the terminal:
`asv compare v0.1 v0.2`

This runs the benchmark against two different commits and compares the results.
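
As background for the conventions used in this PR: asv discovers benchmark methods by name prefix, with `time_*` measuring wall-clock time, `mem_*` the size of the returned object, and `peakmem_*` peak process memory. A minimal sketch with a hypothetical `ExampleSuite` (not part of this PR):

```python
# Hypothetical minimal asv suite, for illustration only.
class ExampleSuite:
    def setup(self):
        # asv calls setup() before each benchmark method.
        self.data = list(range(10_000))

    def time_sum(self):
        # Prefix `time_`: asv reports the wall-clock time of this call.
        sum(self.data)

    def peakmem_expand(self):
        # Prefix `peakmem_`: asv reports peak memory while this runs.
        _ = self.data * 10
```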
16 changes: 0 additions & 16 deletions benchmarks/benchmark_dummy.py

This file was deleted.

153 changes: 153 additions & 0 deletions benchmarks/benchmark_runner.py
@@ -0,0 +1,153 @@
# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.

import time
from pathlib import Path

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner


def io_bound_task(input_data):
    """Simulate an I/O-bound task."""
    time.sleep(2)  # Simulate an I/O wait (e.g. reading from a file)
    output = input_data
    return output


def compute_bound_task(input_data) -> str:
    """Simulate a compute-bound task.

    A tight pure-Python loop that holds the GIL and cannot use
    multiple cores (unlike pandas/numpy operations).
    """
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"


def create_data_catalog():
    """Use the dataset factory pattern to make sure the benchmark covers the
    slowest path, i.e. datasets are resolved through pattern matching.
    """
    catalog_conf = """
'output_{pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/{pattern}.csv

'numpy_{pattern}':
    type: pickle.PickleDataset
    filepath: benchmarks/data/{pattern}.pkl

'{catch_all_dataset_pattern}':
    type: pandas.CSVDataset
    filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    catalog = DataCatalog.from_config(catalog_conf)
    return catalog


def create_io_bound_node(inputs=None, outputs=None, name=None):
    io_node = node(io_bound_task, inputs=inputs, outputs=outputs, name=name)
    return io_node


def create_io_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )
    return dummy_pipeline


def create_compute_bound_node(inputs=None, outputs=None, name=None):
    compute_node = node(compute_bound_task, inputs=inputs, outputs=outputs, name=name)
    return compute_node


def create_compute_bound_pipeline():
    dummy_pipeline = pipeline(
        [
            create_compute_bound_node("dummy_1", "numpy_1"),
            create_compute_bound_node("dummy_2", "numpy_2"),
            create_compute_bound_node("dummy_3", "numpy_3"),
            create_compute_bound_node("dummy_4", "numpy_4"),
            create_compute_bound_node("dummy_5", "numpy_5"),
            create_compute_bound_node("dummy_6", "numpy_6"),
            create_compute_bound_node("dummy_7", "numpy_7"),
            create_compute_bound_node("dummy_1", "numpy_8"),
            create_compute_bound_node("dummy_1", "numpy_9"),
            create_compute_bound_node("dummy_1", "numpy_10"),
        ]
    )
    return dummy_pipeline


class RunnerMemorySuite:
    """Memory benchmarks, parametrised over the runner class."""

    params = [SequentialRunner, ThreadRunner, ParallelRunner]
    param_names = ["runner"]

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv for the catch-all dataset pattern.
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def mem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = runner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def peakmem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = runner()
        runner_obj.run(test_pipeline, catalog=catalog)


class RunnerTimeSuite:
    """Timing benchmarks for the runners."""

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv for the catch-all dataset pattern.
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def time_sequential_runner(self):
        """Compute-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = SequentialRunner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_parallel_runner(self):
        """Compute-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = ParallelRunner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_thread_runner(self):
        """I/O-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_io_bound_pipeline()
        runner_obj = ThreadRunner()
        runner_obj.run(test_pipeline, catalog=catalog)
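
For debugging, the suites can also be exercised directly, without asv. A minimal sketch, assuming `kedro` and `kedro-datasets[pandas]` are installed and the file is importable as `benchmarks.benchmark_runner`:

```python
# Hypothetical smoke test: run one benchmark method directly, outside asv.
from benchmarks.benchmark_runner import RunnerTimeSuite

suite = RunnerTimeSuite()
suite.setup()                   # creates benchmarks/data/data.csv
suite.time_sequential_runner()  # runs the compute-bound pipeline once
```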
Empty file added kedro_benchmarks/__init__.py
Empty file.
153 changes: 153 additions & 0 deletions kedro_benchmarks/benchmark_runner.py
@@ -0,0 +1,153 @@
(Identical in content to benchmarks/benchmark_runner.py above; the full listing is not repeated here.)
5 changes: 4 additions & 1 deletion pyproject.toml
@@ -90,7 +90,10 @@ jupyter = [
"ipylab>=1.0.0",
"notebook>=7.0.0" # requires the new share backend of notebook and labs"
]
all = [ "kedro[test,docs,jupyter]" ]
benchmark = [
"asv"
]
all = [ "kedro[test,docs,jupyter,benchmark]" ]

[project.urls]
Homepage = "https://kedro.org"
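
With this change, the benchmarking dependency can presumably be installed alongside Kedro via `pip install "kedro[benchmark]"`, and it is also pulled in by `kedro[all]`.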