# Add runner benchmark #4210

**Open**: noklam wants to merge 30 commits into `main` from `noklam/stress-testing-runners-4127`
+339 −21
## Commits (30, all by noklam)
- `98744d5` add benchmark dependencies
- `216cc7a` add structure
- `a4ab4c4` add benchmark
- `92c7556` tmp commit
- `e1c4156` force benchmarks to be a package
- `06d178d` update config
- `783b2e8` rename folder
- `033237b` fix asv config
- `0cb6e4f` rename
- `1b8a7ab` update format
- `4341284` typo
- `bc1ec5c` update
- `58a70ff` incorrect config
- `145af85` update
- `7e00882` update
- `66ad6c5` back to kedro_benchmarks
- `985a051` rename benchmark file
- `0f5a4f0` clean up
- `ddd6a77` update asv config
- `4f905b5` update config
- `515d91c` update config
- `fdf0fe2` Merge branch 'main' into noklam/stress-testing-runners-4127
- `74d24bf` fix memory test
- `aaa9a3f` remove memory tracking since it's not meaningful
- `b733534` Merge branch 'main' into noklam/stress-testing-runners-4127
- `4c9098b` Merge branch 'main' into noklam/stress-testing-runners-4127
- `752440b` Merge branch 'noklam/stress-testing-runners-4127' of github.com:k…
- `8ee7dac` test
- `05bcd5e` commit benchmark module
- `f1a1235` ADD README
New file (+21 lines):

This is the benchmark repository of Kedro, which is mainly used internally.

# Installation
`pip install asv`

# Run the benchmark
Run this in the terminal:
`asv run`

You can also run the benchmark for specific commits or a range of commits; for details,
check out the [official documentation](https://asv.readthedocs.io/en/stable/using.html#benchmarking).

For example, `asv run main..mybranch` will run the benchmark against every commit since branching off from
`main`.

## Compare benchmarks for two commits
Run this in the terminal:
`asv compare v0.1 v0.2`

This compares the benchmark results for the two given commits.
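The commit history above shows repeated fixes to the asv configuration (`fix asv config`, `update asv config`). For orientation, a minimal `asv.conf.json` for a layout like this one might look as follows; the field values (project URL, directory names) are illustrative assumptions rather than the actual configuration from this PR, apart from the `kedro_benchmarks` directory name that appears in the commit messages:

```json
{
    "version": 1,
    "project": "kedro",
    "project_url": "https://kedro.org",
    "repo": ".",
    "branches": ["main"],
    "environment_type": "virtualenv",
    "benchmark_dir": "kedro_benchmarks",
    "env_dir": ".asv/env",
    "results_dir": ".asv/results",
    "html_dir": ".asv/html"
}
```

`benchmark_dir` is the setting that tells asv where to discover the benchmark suites, which is likely why the benchmarks directory had to be made an importable package.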
One file was deleted (its contents failed to load).
New file (+153 lines):
```python
# Write the benchmarking functions here.
# See "Writing benchmarks" in the asv docs for more information.
import time
from pathlib import Path

import yaml

from kedro.io.data_catalog import DataCatalog
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner


# Simulate an I/O-bound task
def io_bound_task(input_data):
    time.sleep(2)  # Simulate an I/O wait (e.g. reading from a file)
    return input_data


# Simulate a compute-bound task that cannot use multiple cores
# (pure-Python arithmetic, unlike pandas/numpy operations)
def compute_bound_task(input_data) -> str:
    ans = 1
    for i in range(1, 50000):
        ans = ans * i
    return "dummy"


def create_data_catalog():
    """Use the dataset factory pattern so the benchmark covers the slowest path."""
    catalog_conf = """
'output_{pattern}':
  type: pandas.CSVDataset
  filepath: benchmarks/data/'{pattern}.csv'

'numpy_{pattern}':
  type: pickle.PickleDataset
  filepath: benchmarks/data/'{pattern}.pkl'

'{catch_all_dataset_pattern}':
  type: pandas.CSVDataset
  filepath: benchmarks/data/data.csv
"""
    catalog_conf = yaml.safe_load(catalog_conf)
    return DataCatalog.from_config(catalog_conf)


def create_io_bound_node(inputs=None, outputs=None, name=None):
    return node(io_bound_task, inputs=inputs, outputs=outputs, name=name)


def create_io_bound_pipeline():
    return pipeline(
        [
            create_io_bound_node("dummy_1", "output_1"),
            create_io_bound_node("dummy_2", "output_2"),
            create_io_bound_node("dummy_3", "output_3"),
            create_io_bound_node("dummy_4", "output_4"),
            create_io_bound_node("dummy_5", "output_5"),
            create_io_bound_node("dummy_6", "output_6"),
            create_io_bound_node("dummy_7", "output_7"),
            create_io_bound_node("dummy_1", "output_8"),
            create_io_bound_node("dummy_1", "output_9"),
            create_io_bound_node("dummy_1", "output_10"),
        ]
    )


def create_compute_bound_node(inputs=None, outputs=None, name=None):
    return node(compute_bound_task, inputs=inputs, outputs=outputs, name=name)


def create_compute_bound_pipeline():
    return pipeline(
        [
            create_compute_bound_node("dummy_1", "numpy_1"),
            create_compute_bound_node("dummy_2", "numpy_2"),
            create_compute_bound_node("dummy_3", "numpy_3"),
            create_compute_bound_node("dummy_4", "numpy_4"),
            create_compute_bound_node("dummy_5", "numpy_5"),
            create_compute_bound_node("dummy_6", "numpy_6"),
            create_compute_bound_node("dummy_7", "numpy_7"),
            create_compute_bound_node("dummy_1", "numpy_8"),
            create_compute_bound_node("dummy_1", "numpy_9"),
            create_compute_bound_node("dummy_1", "numpy_10"),
        ]
    )


class RunnerMemorySuite:
    params = [SequentialRunner, ThreadRunner, ParallelRunner]
    param_names = ["runner"]

    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv for the catch-all dataset pattern
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def mem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = runner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def peakmem_runners(self, runner):
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = runner()
        runner_obj.run(test_pipeline, catalog=catalog)


class RunnerTimeSuite:
    def setup(self, *args, **kwargs):
        data_dir = Path("benchmarks/data")
        data_dir.mkdir(exist_ok=True, parents=True)

        # Create a dummy csv for the catch-all dataset pattern
        with open(data_dir / "data.csv", "w") as f:
            f.write("col1,col2\n1,2\n")

    def time_sequential_runner(self):
        """Compute-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = SequentialRunner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_parallel_runner(self):
        """Compute-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_compute_bound_pipeline()
        runner_obj = ParallelRunner()
        runner_obj.run(test_pipeline, catalog=catalog)

    def time_thread_runner(self):
        """I/O-bound pipeline."""
        catalog = create_data_catalog()
        test_pipeline = create_io_bound_pipeline()
        runner_obj = ThreadRunner()
        runner_obj.run(test_pipeline, catalog=catalog)
```
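The suite above relies on asv's naming convention: method prefixes (`time_`, `mem_`, `peakmem_`) select which metric asv records. The reason the benchmark splits pipelines into I/O-bound and compute-bound variants is the usual Python threading trade-off: threads can overlap I/O waits, but the GIL prevents them from overlapping pure-Python compute, which is why the compute-bound pipeline is exercised with `ParallelRunner` (processes) and the I/O-bound one with `ThreadRunner`. A minimal standard-library sketch of that trade-off, with no Kedro dependency (the task and timings here are illustrative, not taken from the PR):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(x):
    """Stand-in for an I/O-bound node: waits instead of computing."""
    time.sleep(0.2)
    return x


# Sequential: four 0.2 s waits happen one after another (~0.8 s total).
start = time.perf_counter()
sequential_results = [io_task(x) for x in range(4)]
sequential_elapsed = time.perf_counter() - start

# Threaded: the four waits overlap, so the total stays near 0.2 s.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded_results = list(pool.map(io_task, range(4)))
threaded_elapsed = time.perf_counter() - start

print(f"sequential: {sequential_elapsed:.2f}s, threaded: {threaded_elapsed:.2f}s")
```

For a compute-bound task like `compute_bound_task`, the threaded version would show no such speedup under the GIL, which matches the pipeline-to-runner pairing chosen in the benchmark.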
A new empty file was added.
Review comment: Need to revert before merge.