add suggestion bundle support #3041
base: master
Conversation
Codecov Report
All modified and coverable lines are covered by tests ✅

@@           Coverage Diff           @@
##           master    #3041   +/-  ##
=======================================
  Coverage   82.79%   82.79%
=======================================
  Files           3        3
  Lines         186      186
=======================================
  Hits          154      154
  Misses         32       32

☔ View full report in Codecov by Sentry.
Another example:

import flytekit as fl
import numpy as np
import optuna
import sklearn.datasets
import sklearn.metrics
import xgboost as xgb
from sklearn.model_selection import train_test_split

from flytekitplugins.optuna import Optimizer, suggest

image = fl.ImageSpec(
    builder="union",
    packages=[
        "flytekit==1.15.0b0",
        "optuna>=4.0.0",
        "scikit-learn>=1.6.0",
        "xgboost>=2.1.3",
        "plotly>=5.24.1",
    ],
)


@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    # Load the dataset and hold out a validation split.
    data, target = sklearn.datasets.load_breast_cancer(return_X_y=True)
    train_x, valid_x, train_y, valid_y = train_test_split(data, target, test_size=0.25)

    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)

    bst = xgb.train(params, dtrain)

    # Round the predicted probabilities to class labels and score them.
    valid_yhat = bst.predict(dvalid)
    accuracy = sklearn.metrics.accuracy_score(valid_y, np.rint(valid_yhat))
    return accuracy


@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):
    study = optuna.create_study(direction="maximize")
    optimizer = Optimizer(objective, concurrency, n_trials, study)

    # Suggestions and fixed values are bundled into a single dictionary.
    params = {
        "lambda": suggest.float(1e-8, 1.0, log=True),
        "alpha": suggest.float(1e-8, 1.0, log=True),
        "subsample": suggest.float(0.2, 1.0),
        "colsample_bytree": suggest.float(0.2, 1.0),
        "max_depth": suggest.integer(3, 9, step=2),
        "min_child_weight": suggest.integer(2, 10),
        "eta": suggest.float(1e-8, 1.0, log=True),
        "gamma": suggest.float(1e-8, 1.0, log=True),
        "grow_policy": suggest.category(["depthwise", "lossguide"]),
        "sample_type": suggest.category(["uniform", "weighted"]),
        "normalize_type": suggest.category(["tree", "forest"]),
        "rate_drop": suggest.float(1e-8, 1.0, log=True),
        "skip_drop": suggest.float(1e-8, 1.0, log=True),
        "objective": "binary:logistic",
        "tree_method": "exact",
        "booster": "dart",
    }

    await optimizer(params=params)
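To try this one locally, the eager workflow can be driven with asyncio, mirroring the __main__ block of the next example (the trial counts here are illustrative; a cluster run would go through the usual registration flow instead):

import asyncio

if __name__ == "__main__":
    asyncio.run(train(concurrency=2, n_trials=10))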
In some extreme cases, it is useful to be able to define suggestions programmatically. However, this cannot happen within an objective task, because we need to tell the Study what each task is doing. Therefore, users should be able to define a callback that builds the parameter dictionary from the trial. This is complicated, but I think it is the best solution: it takes advantage of Flyte's static typing and enables caching too.

import asyncio
from typing import Any

import flytekit as fl
import numpy as np
import optuna
import plotly.graph_objects
import plotly.io
import sklearn.datasets
import sklearn.metrics
import xgboost as xgb
from sklearn.model_selection import train_test_split

# Local module containing the callback-aware Optimizer from this PR.
from optimizer import Optimizer

image = fl.ImageSpec(
    builder="union",
    packages=[
        "flytekit==1.15.0b0",
        "optuna>=4.0.0",
        "scikit-learn>=1.6.0",
        "xgboost>=2.1.3",
        "plotly>=5.24.1",
    ],
)


@fl.task(container_image=image)
async def objective(params: dict[str, int | float | str]) -> float:
    data, target = sklearn.datasets.load_breast_cancer(return_X_y=True)
    train_x, valid_x, train_y, valid_y = train_test_split(data, target, test_size=0.25)

    dtrain = xgb.DMatrix(train_x, label=train_y)
    dvalid = xgb.DMatrix(valid_x, label=valid_y)

    bst = xgb.train(params, dtrain)

    valid_yhat = bst.predict(dvalid)
    accuracy = sklearn.metrics.accuracy_score(valid_y, np.rint(valid_yhat))
    print(accuracy)
    return accuracy


@fl.eager(container_image=image)
async def train(concurrency: int, n_trials: int):
    study = optuna.create_study(direction="maximize")

    def callback(trial: optuna.Trial, inputs: dict[str, Any]) -> dict[str, Any]:
        params = {
            "verbosity": 0,
            "objective": "binary:logistic",
            # Use exact for a small dataset.
            "tree_method": "exact",
            # Defines the booster; gblinear for linear functions.
            "booster": trial.suggest_categorical("booster", ["gbtree", "gblinear", "dart"]),
            # L2 regularization weight.
            "lambda": trial.suggest_float("lambda", 1e-8, 1.0, log=True),
            # L1 regularization weight.
            "alpha": trial.suggest_float("alpha", 1e-8, 1.0, log=True),
            # Sampling ratio for training data.
            "subsample": trial.suggest_float("subsample", 0.2, 1.0),
            # Sampling according to each tree.
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        }

        if params["booster"] in ["gbtree", "dart"]:
            # Maximum depth of the tree; signifies the complexity of the tree.
            params["max_depth"] = trial.suggest_int("max_depth", 3, 9, step=2)
            # Minimum child weight; the larger the term, the more conservative the tree.
            params["min_child_weight"] = trial.suggest_int("min_child_weight", 2, 10)
            params["eta"] = trial.suggest_float("eta", 1e-8, 1.0, log=True)
            # Defines how selective the algorithm is.
            params["gamma"] = trial.suggest_float("gamma", 1e-8, 1.0, log=True)
            params["grow_policy"] = trial.suggest_categorical("grow_policy", ["depthwise", "lossguide"])

        if params["booster"] == "dart":
            params["sample_type"] = trial.suggest_categorical("sample_type", ["uniform", "weighted"])
            params["normalize_type"] = trial.suggest_categorical("normalize_type", ["tree", "forest"])
            params["rate_drop"] = trial.suggest_float("rate_drop", 1e-8, 1.0, log=True)
            params["skip_drop"] = trial.suggest_float("skip_drop", 1e-8, 1.0, log=True)

        return dict(params=params)

    optimizer = Optimizer(objective, concurrency, n_trials, study, callback)

    await optimizer()

    show("History", optuna.visualization.plot_optimization_history(optimizer.study))
    show("Timeline", optuna.visualization.plot_timeline(optimizer.study))
    show("Coordinates", optuna.visualization.plot_parallel_coordinate(optimizer.study))
    show("Importance", optuna.visualization.plot_param_importances(optimizer.study))


def show(name: str, fig: plotly.graph_objects.Figure):
    # Render the figure to static HTML and publish it as a Flyte Deck.
    html: str = plotly.io.to_html(
        fig=fig,
        config={
            "scrollZoom": False,
            "displayModeBar": False,
            "doubleClick": "reset",
        },
    )
    fl.Deck(name, html)


if __name__ == "__main__":
    asyncio.run(train(concurrency=1, n_trials=10))
if self.callback is not None:
    promise = self.callback(trial=trial, **inputs)
else:
    promise = self.objective(**process(trial, inputs))
Consider extracting the trial execution logic into a separate method to improve code organization and reusability. The conditional logic for handling callback vs direct objective execution could be cleaner in its own method.
Code suggestion (check the AI-generated fix before applying):
- if self.callback is not None:
-     promise = self.callback(trial=trial, **inputs)
- else:
-     promise = self.objective(**process(trial, inputs))
+ promise = await self._execute_trial(trial, inputs)
+
+ async def _execute_trial(self, trial: optuna.Trial, inputs: dict[str, Any]) -> Awaitable[Union[float, tuple[float, ...]]]:
+     if self.callback is not None:
+         return await self.callback(trial=trial, **inputs)
+     return await self.objective(**process(trial, inputs))
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
study = optuna.create_study(direction="maximize")
...
optimizer = Optimizer(concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
Consider passing the objective function directly to the Optimizer constructor. The current implementation relies on the callback to invoke the objective function, which may not be the intended pattern.
Code suggestion (check the AI-generated fix before applying):

- optimizer = Optimizer(concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
+ optimizer = Optimizer(objective=objective, concurrency=concurrency, n_trials=n_trials, study=study, callback=callback)
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
concurrency: int
n_trials: int
study: Optional[optuna.Study] = None
objective: Optional[Union[PythonFunctionTask, PythonFunctionWorkflow]] = None
The removal of the required objective field from the class definition may cause issues, since it's used in the __post_init__ method. Consider keeping it as a required field or updating the initialization logic.
Code suggestion (check the AI-generated fix before applying):

@@ -55,6 +55,7 @@
 class Optimizer:
     concurrency: int
     n_trials: int
+    objective: Union[PythonFunctionTask, PythonFunctionWorkflow]
     study: Optional[optuna.Study] = None
     callback: Optional[Callable[Concatenate[optuna.Trial, P], Awaitable[Union[float, tuple[float, ...]]]]] = None
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
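One reason such a flag can be a false positive: a dataclass field that defaults to None can still be validated at construction time. A hypothetical sketch of that pattern (the plugin's actual __post_init__ logic may differ):

from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass
class Optimizer:
    concurrency: int
    n_trials: int
    study: Optional[Any] = None
    objective: Optional[Callable] = None
    callback: Optional[Callable] = None

    def __post_init__(self):
        # An Optional field can still be enforced here: require at least
        # one entry point even though both default to None.
        if self.objective is None and self.callback is None:
            raise ValueError("either objective or callback must be provided")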
if isinstance(self.objective, PythonFunctionTask):
    func = self.objective.task_function
elif isinstance(self.objective, PythonFunctionWorkflow):
    func = self.objective._workflow_function
Consider using a public interface instead of accessing the private member '_workflow_function'.
Code suggestion (check the AI-generated fix before applying):

- func = self.objective._workflow_function
+ func = self.objective.workflow_function
Code Review Run #bf90ad
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
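For context, the usual way to expose such a value publicly is a read-only property. A hypothetical sketch only: flytekit's PythonFunctionWorkflow does not necessarily offer workflow_function today, which is presumably why the suggestion was rejected.

from typing import Callable

class PythonFunctionWorkflow:
    def __init__(self, workflow_function: Callable):
        self._workflow_function = workflow_function

    @property
    def workflow_function(self) -> Callable:
        # Read-only public view over the private attribute.
        return self._workflow_function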
Why are the changes needed?

I recently created the flytekitplugins.optuna plugin. This PR adds support for bundling all of the suggestions into any number of dictionaries.

This is for the cases in which one wants to add dozens or more hyperparameters. Adding each of them as a typed function input is pretty exhausting. In our case, we can collect all of these suggestions into any number of dictionaries at runtime.
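For instance, rather than one typed task input per hyperparameter, the suggestions can be collected into a single dictionary (a minimal sketch reusing the suggest helpers from the examples above; the values are illustrative):

from flytekitplugins.optuna import suggest

# Suggestion objects and fixed values live side by side in one bundle:
params = {
    "eta": suggest.float(1e-8, 1.0, log=True),   # searched by Optuna
    "max_depth": suggest.integer(3, 9, step=2),  # searched by Optuna
    "booster": "dart",                           # fixed value, passed through as-is
}

# The whole bundle is then passed as a single kwarg:
# await optimizer(params=params)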
What changes were proposed in this pull request?

I added support for bundled suggestions to flytekitplugins.optuna. Users can provide any kwargs of type dict[str, Suggestion | Any].

How was this patch tested?
I have added an example and unit tests for the new bundling functionality.
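As a rough picture of what such a test can look like (a hypothetical sketch, not the actual test from this PR; it assumes eager workflows execute locally via asyncio as in the examples above):

import asyncio

import flytekit as fl
import optuna
from flytekitplugins.optuna import Optimizer, suggest

@fl.task
async def objective(params: dict[str, float]) -> float:
    # A trivial quadratic objective with its minimum at x == 0.
    return params["x"] ** 2

@fl.eager
async def train(n_trials: int) -> float:
    study = optuna.create_study(direction="minimize")
    optimizer = Optimizer(objective=objective, concurrency=1, n_trials=n_trials, study=study)
    await optimizer(params={"x": suggest.float(-10.0, 10.0)})
    return study.best_value

def test_bundled_suggestions():
    best = asyncio.run(train(n_trials=20))
    assert best >= 0.0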
Summary by Bito

Enhanced the flytekit-optuna plugin with bundled hyperparameter suggestions and improved type safety. Implemented dictionary-based parameter passing and a callback mechanism for better management. Modified the Optimizer class with an instance-based suggest method and parameter organization using parent-child relationships. Added proper handling of the optimizer's best value and streamlined handling of many hyperparameters through runtime dictionary collection.

Unit tests added: True
Estimated effort to review (1-5, lower is better): 2