Use multiprocessing for WOFOST
burggraaff committed Mar 18, 2024
1 parent ec3e2f1 commit 22006a2
Showing 8 changed files with 188 additions and 179 deletions.
2 changes: 1 addition & 1 deletion fpcup/__init__.py
@@ -1,5 +1,5 @@
from . import aggregate, constants, geo, io, plotting, settings
from . import agro, crop, site, soil, weather
from .model import run_pcse_single, run_pcse_ensemble
from .model import run_pcse_single
from .settings import DEFAULT_DATA, DEFAULT_OUTPUT, DEFAULT_RESULTS
from .tools import RUNNING_IN_IPYTHON
57 changes: 57 additions & 0 deletions fpcup/_bundle.py
@@ -0,0 +1,57 @@
"""
Legacy code for bundling parameters.
Probably won't be re-used.
"""
from datetime import date, datetime
from itertools import product
from multiprocessing import Pool
from pathlib import Path

import geopandas as gpd
gpd.options.io_engine = "pyogrio"
from pyogrio import read_dataframe as read_geodataframe, write_dataframe as write_geodataframe
import pandas as pd
import shapely
from tqdm import tqdm

from pcse.base import MultiCropDataProvider, ParameterProvider, WeatherDataProvider
from pcse.exceptions import WeatherDataProviderError
from pcse.fileinput import CABOFileReader
from pcse.models import Engine, Wofost72_WLP_FD
from pcse.util import _GenericSiteDataProvider as PCSESiteDataProvider

from ._typing import Callable, Coordinates, Iterable, Optional, PathOrStr
from .agro import AgromanagementData
from .constants import CRS_AMERSFOORT
from .model import RunData, RunDataBRP
from .soil import SoilType
from .tools import make_iterable


def bundle_parameters(sitedata: PCSESiteDataProvider | Iterable[PCSESiteDataProvider],
                      soildata: CABOFileReader | Iterable[CABOFileReader],
                      cropdata: MultiCropDataProvider | Iterable[MultiCropDataProvider],
                      weatherdata: WeatherDataProvider | Iterable[WeatherDataProvider],
                      agromanagementdata: AgromanagementData | Iterable[AgromanagementData]) -> tuple[Iterable[RunData], int | None]:
    """
    Bundle the site, soil, and crop parameters into PCSE ParameterProvider objects.
    For the main parameters, a Cartesian product is used to get all their combinations.
    """
    # Make sure the data are iterable
    sitedata_iter = make_iterable(sitedata, exclude=[PCSESiteDataProvider])
    soildata_iter = make_iterable(soildata, exclude=[CABOFileReader])
    cropdata_iter = make_iterable(cropdata, exclude=[MultiCropDataProvider])
    weatherdata_iter = make_iterable(weatherdata, exclude=[WeatherDataProvider])
    agromanagementdata_iter = make_iterable(agromanagementdata, exclude=[AgromanagementData])

    # Determine the total number of parameter combinations, if possible
    try:
        n = len(sitedata_iter) * len(soildata_iter) * len(cropdata_iter) * len(weatherdata_iter) * len(agromanagementdata_iter)
    except TypeError:
        n = None

    # Combine everything
    combined_parameters = product(sitedata_iter, soildata_iter, cropdata_iter, weatherdata_iter, agromanagementdata_iter)
    rundata = (RunData(*params) for params in combined_parameters)

    return rundata, n
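
The helper pairs a lazy Cartesian product with an up-front combination count so callers can size a progress bar. A standalone sketch of that pattern, with toy inputs in place of the PCSE data providers (none of these names are fpcup API):

from itertools import product

def product_with_count(*iterables):
    """Cartesian product of the inputs, plus the number of combinations,
    or None if any input has no len() (e.g. a generator)."""
    try:
        n = 1
        for iterable in iterables:
            n *= len(iterable)
    except TypeError:
        n = None
    return product(*iterables), n

# Toy stand-ins for the site/soil/crop/weather/agromanagement iterables:
combinations, n = product_with_count(["siteA", "siteB"], ["sand", "clay"], ["barley"])
print(n)                  # 4
print(next(combinations)) # ('siteA', 'sand', 'barley')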
1 change: 1 addition & 0 deletions fpcup/_typing.py
@@ -6,6 +6,7 @@
from typing import Callable, Iterable, Optional, Type

from geopandas import GeoSeries
from pandas import Series
from shapely import Point, Polygon

# Combinations of built-in types
5 changes: 5 additions & 0 deletions fpcup/io.py
@@ -2,6 +2,7 @@
Functions for file input and output.
"""
from functools import cache
from multiprocessing import Pool
from os import makedirs
from pathlib import Path

@@ -124,6 +125,10 @@ def load_ensemble_results_from_folder(folder: PathOrStr, run_ids: Optional[Itera

    # Load the files with an optional progressbar
    filenames = tqdm(filenames, total=n_results, desc="Loading outputs", unit="files", disable=not progressbar, leave=leave_progressbar)
    # if n_results < 1000:
    results = [Result.from_file(filename) for filename in filenames]
    # else:
    #     with Pool() as p:
    #         results = list(p.imap_unordered(Result.from_file, filenames, chunksize=100))

    return results
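
The commented-out branch above gestures at threshold-gated loading, mirroring the dispatch logic introduced elsewhere in this commit. One way the enabled version might look — the cutoff value and helper name here are assumptions, not part of the commit:

from multiprocessing import Pool

_THRESHOLD_PARALLEL_LOAD = 1000  # assumed cutoff, by analogy with the commented-out `n_results < 1000`

def _load_results(load_one, filenames, n_results):
    """Load serially for small batches; use a worker pool once the Pool
    start-up cost is likely to pay off. `load_one` stands in for Result.from_file."""
    if n_results is not None and n_results >= _THRESHOLD_PARALLEL_LOAD:
        with Pool() as p:
            # Unordered iteration is fine here: each Result carries its own run_id
            return list(p.imap_unordered(load_one, filenames, chunksize=100))
    return [load_one(filename) for filename in filenames]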
128 changes: 45 additions & 83 deletions fpcup/model.py
@@ -1,7 +1,8 @@
"""
Functions that are useful
Classes, functions, constants relating to running the WOFOST model and processing its inputs/outputs.
"""
from datetime import date, datetime
from functools import partial
from itertools import product
from multiprocessing import Pool
from pathlib import Path
@@ -25,6 +26,9 @@
from .soil import SoilType
from .tools import make_iterable

# Constants
_THRESHOLD_PARALLEL_PCSE = 200

# Parameter names are from "A gentle introduction to WOFOST", De Wit & Boogaard 2021
parameter_names = {"DVS": "Crop development stage",
"LAI": "Leaf area index [ha/ha]",
@@ -261,6 +265,7 @@ def to_file(self, filename: PathOrStr, **kwargs) -> None:
        write_geodataframe(self, filename, driver="GeoJSON", **kwargs)
        self.set_index("run_id", inplace=True)


class Result(pd.DataFrame):
    """
    Stores the results from a single PCSE run.
@@ -356,34 +361,6 @@ def to_file(self, output_directory: PathOrStr, *,
        self.summary.to_file(filename_summary, **kwargs)


def bundle_parameters(sitedata: PCSESiteDataProvider | Iterable[PCSESiteDataProvider],
                      soildata: CABOFileReader | Iterable[CABOFileReader],
                      cropdata: MultiCropDataProvider | Iterable[MultiCropDataProvider],
                      weatherdata: WeatherDataProvider | Iterable[WeatherDataProvider],
                      agromanagementdata: AgromanagementData | Iterable[AgromanagementData]) -> tuple[Iterable[RunData], int | None]:
    """
    Bundle the site, soil, and crop parameters into PCSE ParameterProvider objects.
    For the main parameters, a Cartesian product is used to get all their combinations.
    """
    # Make sure the data are iterable
    sitedata_iter = make_iterable(sitedata, exclude=[PCSESiteDataProvider])
    soildata_iter = make_iterable(soildata, exclude=[CABOFileReader])
    cropdata_iter = make_iterable(cropdata, exclude=[MultiCropDataProvider])
    weatherdata_iter = make_iterable(weatherdata, exclude=[WeatherDataProvider])
    agromanagementdata_iter = make_iterable(agromanagementdata, exclude=[AgromanagementData])

    # Determine the total number of parameter combinations, if possible
    try:
        n = len(sitedata_iter) * len(soildata_iter) * len(cropdata_iter) * len(weatherdata_iter) * len(agromanagementdata_iter)
    except TypeError:
        n = None

    # Combine everything
    combined_parameters = product(sitedata_iter, soildata_iter, cropdata_iter, weatherdata_iter, agromanagementdata_iter)
    rundata = (RunData(*params) for params in combined_parameters)

    return rundata, n

def run_pcse_single(run_data: RunData, *, model: Engine=Wofost72_WLP_FD) -> Result | None:
"""
Start a new PCSE model with the given inputs and run it until it terminates.
@@ -401,65 +378,50 @@ def run_pcse_single(run_data: RunData, *, model: Engine=Wofost72_WLP_FD) -> Resu

    return output

def filter_ensemble_outputs(outputs: Iterable[Result | None], summary: Iterable[dict | None]) -> tuple[list[Result], list[dict], int]:
    """
    Filter None and other incorrect entries.
    """
    # Find entries that are None
    valid_entries = [s is not None and o is not None for s, o in zip(summary, outputs)]
    n_filtered_out = len(valid_entries) - sum(valid_entries)

    # Apply the filter
    outputs_filtered = [o for o, v in zip(outputs, valid_entries) if v]
    summary_filtered = [s for s, v in zip(summary, valid_entries) if v]

    return outputs_filtered, summary_filtered, n_filtered_out

def run_pcse_ensemble(all_runs: Iterable[RunData], nr_runs: Optional[int]=None, progressbar=True, leave_progressbar=True) -> tuple[list[Result], Summary]:
def run_pcse_ensemble(func_pcse: Callable, data_pcse: Iterable, *,
                      n: Optional[int]=None, chunksize: int=10,
                      unit: str="run",
                      verbose: bool=False) -> Iterable[RunData]:
    """
    Run an entire PCSE ensemble.
    all_runs is an iterator that zips the three model inputs (parameters, weatherdata, agromanagement) together, e.g.:
    all_runs = product(parameters_combined, weatherdata, agromanagementdata)
    `func_pcse` is a function that runs PCSE for a single element of `data_pcse`.
    For example, `data_pcse` might be a list of coordinates, for which `func_pcse` retrieves the corresponding site/weather data and then runs WOFOST.
    Models are run in parallel if there are at least _THRESHOLD_PARALLEL_PCSE of them; otherwise they run serially.
    """
    # Run the models
    outputs = [run_pcse_single(run_data) for run_data in tqdm(all_runs, total=nr_runs, desc="Running PCSE models", unit="runs", disable=not progressbar, leave=leave_progressbar)]

    # Get the summaries
    summaries_individual = [o.summary for o in outputs]

    # Clean up the results
    outputs, summaries_individual, n_filtered_out = filter_ensemble_outputs(outputs, summaries_individual)
    if n_filtered_out > 0:
        print(f"{n_filtered_out} runs failed.")

    # Convert the summary to a Summary object (modified DataFrame)
    summary = Summary.from_ensemble(summaries_individual)

    return outputs, summary

def run_pcse_ensemble_parallel(all_runs: Iterable[RunData], nr_runs: Optional[int]=None, progressbar=True, leave_progressbar=True) -> tuple[list[Result], Summary]:
"""
Note: Very unstable!
Parallelised version of run_pcse_ensemble.
Run an entire PCSE ensemble at once.
all_runs is an iterator that zips the three model inputs (parameters, weatherdata, agromanagement) together, e.g.:
all_runs = product(parameters_combined, weatherdata, agromanagementdata)
"""
# Run the models
with Pool() as p:
# outputs = tqdm(p.map(run_pcse_single, all_runs, chunksize=3), total=nr_runs, desc="Running PCSE models", unit="runs", disable=not progressbar, leave=leave_progressbar)
outputs = p.map(run_pcse_single, all_runs, chunksize=3)
    # Determine the number of inputs and set up a progressbar
    if n is None:
        try:
            n = len(data_pcse)
        except TypeError:
            n = None
    _tqdm_here = partial(tqdm, total=n, desc="Running models", unit=unit, leave=verbose)

    # Determine whether to use multiprocessing
    if n is None:
        USE_MULTIPROCESSING = False
    elif n < _THRESHOLD_PARALLEL_PCSE:
        USE_MULTIPROCESSING = False
    else:
        USE_MULTIPROCESSING = True

    # Get the summaries
    summaries_individual = [o.summary for o in outputs]
    # Actually run the model
    if USE_MULTIPROCESSING:
        with Pool() as p:
            outputs = list(_tqdm_here(p.imap_unordered(func_pcse, data_pcse, chunksize=chunksize)))
    else:
        outputs = list(map(func_pcse, _tqdm_here(data_pcse)))

    # Clean up the results
    outputs, summaries_individual, n_filtered_out = filter_ensemble_outputs(outputs, summaries_individual)
    if n_filtered_out > 0:
        print(f"{n_filtered_out} runs failed.")
    # Determine which runs failed / were skipped
    failed_runs = [o for o in outputs if isinstance(o, RunData)]
    if len(failed_runs) > 0:
        print(f"Number of failed runs: {len(failed_runs)}/{n}")
    else:
        if verbose:
            print("No runs failed.")

    # Convert the summary to a Summary object (modified DataFrame)
    summary = Summary.from_ensemble(summaries_individual)
    skipped_runs = [o for o in outputs if o is False]
    if verbose:
        print(f"Number of skipped runs: {len(skipped_runs)}/{n}")

    return outputs, summary
    return failed_runs
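
A sketch of a call site for the new dispatcher, following the worker contract from the docstring (True on success, the RunData on failure, False for a skipped run); `run_pcse` and `coords` here refer to the placeholders used by test/batch_locations.py further down, not new API:

import fpcup

# `coords` is an iterable of coordinates; `run_pcse` is a worker like the one
# defined in test/batch_locations.py below.
failed_runs = fpcup.model.run_pcse_ensemble(run_pcse, coords, unit="site",
                                            chunksize=25, verbose=True)
for run in failed_runs:
    print("Failed:", run.run_id)  # RunData exposes run_id, as used elsewhere in the repo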
5 changes: 4 additions & 1 deletion fpcup/site.py
@@ -17,6 +17,9 @@
from .constants import CRS_AMERSFOORT, WGS84
from .geo import area, _generate_random_point_in_geometry, _generate_random_point_in_geometry_batch, coverage_of_bounding_box, transform_geometry

# Constants
_THRESHOLD_PARALLEL_SITES = 1000


def example(*args, **kwargs) -> PCSESiteDataProvider:
"""
@@ -113,7 +116,7 @@ def generate_sites_in_province(province: str, n: int, *,
    geometry_iterable = tqdm([geometry] * n, total=n, desc="Generating sites", unit="site", disable=not progressbar, leave=leave_progressbar)

    # Generate points - single process for small batches, multiprocessing for large batches
    if n < 1000:
    if n < _THRESHOLD_PARALLEL_SITES:
        points = map(_generate_random_point_in_geometry, geometry_iterable)
    else:
        with Pool() as p:
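
For context, the per-call job that _generate_random_point_in_geometry performs can be approximated with a simple rejection sampler. A standalone sketch using shapely (already a dependency of this repo); the actual fpcup implementation may differ:

import random
from shapely.geometry import Point, Polygon

def random_point_in(geometry: Polygon) -> Point:
    """Draw uniform points in the bounding box until one lands inside
    the geometry (rejection sampling)."""
    minx, miny, maxx, maxy = geometry.bounds
    while True:
        candidate = Point(random.uniform(minx, maxx), random.uniform(miny, maxy))
        if geometry.contains(candidate):
            return candidate

square = Polygon([(0, 0), (1, 0), (1, 1), (0, 1)])
print(random_point_in(square))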
30 changes: 8 additions & 22 deletions test/batch_locations.py
@@ -4,25 +4,21 @@
Example:
%run test/batch_locations.py -v -n 1000 -p Zeeland
"""
import argparse
from multiprocessing import Pool
from pathlib import Path

from tqdm import tqdm

import fpcup

### Parse command line arguments
import argparse
parser = argparse.ArgumentParser(description="Run a PCSE ensemble for multiple locations with one sowing date.")
parser.add_argument("-d", "--data_dir", help="folder to load PCSE data from", type=Path, default=fpcup.settings.DEFAULT_DATA)
parser.add_argument("-o", "--output_dir", help="folder to save PCSE outputs to", type=Path, default=fpcup.settings.DEFAULT_OUTPUT / "locations")
parser.add_argument("-d", "--data_dir", help="folder to load PCSE data from", type=fpcup.io.Path, default=fpcup.settings.DEFAULT_DATA)
parser.add_argument("-o", "--output_dir", help="folder to save PCSE outputs to", type=fpcup.io.Path, default=fpcup.settings.DEFAULT_OUTPUT / "locations")
parser.add_argument("-n", "--number", help="number of locations; result may be lower due to rounding", type=int, default=400)
parser.add_argument("-p", "--province", help="province to simulate plots in (or all)", default="Netherlands", choices=fpcup.geo.NAMES, type=fpcup.geo.process_input_province)
parser.add_argument("-v", "--verbose", help="increase output verbosity", action="store_true")
args = parser.parse_args()

args.SINGLE_PROVINCE = (args.province != "Netherlands")


### Load constants
crs = fpcup.constants.WGS84
soildata = fpcup.soil.soil_types["ec3"]
@@ -31,11 +27,11 @@


### Worker function; this runs PCSE once for one site
def run_pcse(coordinates: fpcup._typing.Coordinates) -> bool | str:
def run_pcse(coordinates: fpcup._typing.Coordinates) -> bool | fpcup.model.RunData:
"""
For a single site, get the site-dependent data, wrap it into a RunData object, and run PCSE on it.
Returns True if the results were succesfully written to file.
Returns a str (the run_id) if the run failed.
Returns the corresponding RunData if a run failed.
"""
    # Get site data
    sitedata = fpcup.site.example(coordinates)
@@ -54,7 +50,7 @@ def run_pcse(coordinates: fpcup._typing.Coordinates) -> bool | str:
        output.to_file(args.output_dir)
    # If the run failed, saving to file will also fail, so we instead note that this run failed
    except AttributeError:
        return run.run_id
        return run
    else:
        return True

@@ -79,17 +75,7 @@ def run_pcse(coordinates: fpcup._typing.Coordinates) -> bool | str:
print(f"Generated {len(coords)} coordinates")

### Run the model
with Pool() as p:
    runs_successful = list(tqdm(p.imap(run_pcse, coords, chunksize=25), total=len(coords), desc="Running models", unit="site"))

# Feedback on failed runs: if any failed, let the user know. If none failed, only let the user know in verbose mode.
failed_runs = [run_id for run_id in runs_successful if isinstance(run_id, str)]
print()
if len(failed_runs) > 0:
    print(f"Number of failed runs: {len(failed_runs)}/{len(coords)}")
else:
    if args.verbose:
        print("No runs failed.")
failed_runs = fpcup.model.run_pcse_ensemble(run_pcse, coords, unit="site", chunksize=25, verbose=args.verbose)

# Save an ensemble summary
fpcup.io.save_ensemble_summary(args.output_dir, verbose=args.verbose)