-
Notifications
You must be signed in to change notification settings - Fork 302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Flytekit] Separate remote signal functions #2933
base: master
Are you sure you want to change the base?
[Flytekit] Separate remote signal functions #2933
Conversation
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2933 +/- ##
==========================================
- Coverage 82.79% 74.79% -8.01%
==========================================
Files 3 202 +199
Lines 186 21377 +21191
Branches 0 2745 +2745
==========================================
+ Hits 154 15989 +15835
- Misses 32 4615 +4583
- Partials 0 773 +773 ☔ View full report in Codecov by Sentry. |
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
…t/#3459-flyteremote-signal-enhance
…t/#3459-flyteremote-signal-enhance
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Code Review Agent Run Status
|
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | ||
sleep(5) | ||
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | ||
sleep(10) | ||
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) | ||
remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | ||
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]} | ||
|
||
with pytest.raises(FlyteAssertion, match="Outputs could not be found because the execution ended in failure"): | ||
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | ||
sleep(5) | ||
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | ||
sleep(10) | ||
remote.reject("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, there is a 5–10-second delay between the remote execution and the signal being stored in the database. Therefore, we now introduce a 5–10-second delay before each signal operation in the integration test.
this is what @mao3267 said.
Do you think there's a better way to know the signal is callable, which means ' approve,
set_input, and
reject` can be called?
…t/#3459-flyteremote-signal-enhance Signed-off-by: mao3267 <chenvincent610@gmail.com>
Code Review Agent Run Status
|
Code Review Agent Run #8b91e1Actionable Suggestions - 4
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
|
||
lt = TypeEngine.to_literal_type(bool) | ||
true_literal = TypeEngine.to_literal(self.context, True, bool, lt) | ||
|
||
req = SignalSetRequest( | ||
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=true_literal.to_flyte_idl() | ||
) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
||
def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
|
||
lt = TypeEngine.to_literal_type(bool) | ||
false_literal = TypeEngine.to_literal(self.context, False, bool, lt) | ||
|
||
req = SignalSetRequest( | ||
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=false_literal.to_flyte_idl() | ||
) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
||
def set_input( | ||
self, | ||
signal_id: str, | ||
execution_name: str, | ||
value: typing.Union[literal_models.Literal, typing.Any], | ||
project=None, | ||
domain=None, | ||
python_type=None, | ||
literal_type=None, | ||
): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param value: This is either a Literal or a Python value which FlyteRemote will invoke the TypeEngine to | ||
convert into a Literal. This argument is only value for wait_for_input type signals. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
:param python_type: Provide a python type to help with conversion if the value you provided is not a Literal. | ||
:param literal_type: Provide a Flyte literal type to help with conversion if the value you provided | ||
is not a Literal | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
if isinstance(value, Literal): | ||
logger.debug(f"Using provided {value} as existing Literal value") | ||
lit = value | ||
else: | ||
lt = literal_type or ( | ||
TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)) | ||
) | ||
lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt) | ||
logger.debug(f"Converted {value} to literal {lit} using literal type {lt}") | ||
|
||
req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl()) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider consolidating the signal handling methods approve()
, reject()
, and set_input()
into a single method with a signal type parameter, as they share similar code structure and functionality. This would reduce code duplication and improve maintainability.
Code suggestion
Check the AI-generated fix before applying
@@ -635,87 +635,108 @@
+ def set_signal_value(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
+ project: str = None, domain: str = None, python_type = None, literal_type = None):
+ wf_exec_id = WorkflowExecutionIdentifier(
+ project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
+ )
+ if isinstance(value, Literal):
+ logger.debug(f"Using provided {value} as existing Literal value")
+ lit = value
+ else:
+ lt = literal_type or (TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)))
+ lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
+ logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")
+
+ req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())
+ self.client.set_signal(req)
+
+ def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
+ self.set_signal_value(signal_id, execution_name, True, project, domain, bool)
+
+ def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
+ self.set_signal_value(signal_id, execution_name, False, project, domain, bool)
+
+ def set_input(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
+ project=None, domain=None, python_type=None, literal_type=None):
+ self.set_signal_value(signal_id, execution_name, value, project, domain, python_type, literal_type)
Code Review Run #8b91e1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
def set_input( | ||
self, | ||
signal_id: str, | ||
execution_name: str, | ||
value: typing.Union[literal_models.Literal, typing.Any], | ||
project=None, | ||
domain=None, | ||
python_type=None, | ||
literal_type=None, | ||
): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 'set_input()' method has too many parameters (7 > 5) which makes it harder to use and maintain. Consider refactoring using a configuration object pattern.
Code suggestion
Check the AI-generated fix before applying
def set_input( | |
self, | |
signal_id: str, | |
execution_name: str, | |
value: typing.Union[literal_models.Literal, typing.Any], | |
project=None, | |
domain=None, | |
python_type=None, | |
literal_type=None, | |
): | |
@dataclass | |
class SetInputConfig: | |
signal_id: str | |
execution_name: str | |
value: typing.Union[literal_models.Literal, typing.Any] | |
project: typing.Optional[str] = None | |
domain: typing.Optional[str] = None | |
python_type: typing.Optional[type] = None | |
literal_type: typing.Optional[type_models.LiteralType] = None | |
def set_input(self, config: SetInputConfig) -> None: |
Code Review Run #8b91e1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | ||
sleep(5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a more reliable synchronization mechanism instead of hardcoded sleep()
calls. The test may be flaky since it depends on timing.
Code suggestion
Check the AI-generated fix before applying
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | |
sleep(5) | |
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | |
max_retries = 10 | |
retry_delay = 1 | |
for _ in range(max_retries): | |
try: | |
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | |
break | |
except Exception: | |
sleep(retry_delay) | |
retry_delay *= 2 |
Code Review Run #8b91e1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1)) | ||
title_input >> approve_node | ||
# This conditional returns the finalized report if the review passes, | ||
# otherwise it returns an invalid report output. | ||
return reporting_wf(title_input, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The approve_node
is created but its result is not used in the workflow logic. Consider using the approval result in the conditional logic to determine whether to proceed with report generation.
Code suggestion
Check the AI-generated fix before applying
approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1)) | |
title_input >> approve_node | |
# This conditional returns the finalized report if the review passes, | |
# otherwise it returns an invalid report output. | |
return reporting_wf(title_input, data) | |
approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1)) | |
title_input >> approve_node | |
return conditional(approve_node).\ | |
if_().\ | |
then(reporting_wf(title_input=title_input, data=data)).\ | |
else_().\ | |
fail("Report was not approved") |
Code Review Run #8b91e1
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
…t/#3459-flyteremote-signal-enhance Signed-off-by: mao3267 <chenvincent610@gmail.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Code Review Agent Run #994a15Actionable Suggestions - 0Additional Suggestions - 10
Review Details
|
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Code Review Agent Run #ebca5eActionable Suggestions - 1
Review Details
|
max_retries = 10 | ||
|
||
for _ in range(max_retries): | ||
try: | ||
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | ||
break | ||
except Exception as e: | ||
sleep(1) | ||
continue | ||
for _ in range(max_retries): | ||
try: | ||
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) | ||
break | ||
except Exception as e: | ||
sleep(1) | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider extracting the retry logic into a reusable helper function since the same pattern is repeated multiple times. This could help reduce code duplication and improve maintainability.
Code suggestion
Check the AI-generated fix before applying
max_retries = 10 | |
for _ in range(max_retries): | |
try: | |
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | |
break | |
except Exception as e: | |
sleep(1) | |
continue | |
for _ in range(max_retries): | |
try: | |
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) | |
break | |
except Exception as e: | |
sleep(1) | |
continue | |
def retry_operation(operation): | |
max_retries = 10 | |
for _ in range(max_retries): | |
try: | |
operation() | |
break | |
except Exception: | |
sleep(1) | |
retry_operation(lambda: remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))) | |
retry_operation(lambda: remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)) |
Code Review Run #ebca5e
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Signed-off-by: mao3267 <chenvincent610@gmail.com>
Code Review Agent Run #79075dActionable Suggestions - 1
Review Details
|
def retry_operation(operation): | ||
max_retries = 10 | ||
for _ in range(max_retries): | ||
try: | ||
operation() | ||
break | ||
except Exception: | ||
sleep(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding a return value from retry_operation
to indicate success/failure. Currently, it silently continues after max retries which could mask failures.
Code suggestion
Check the AI-generated fix before applying
def retry_operation(operation): | |
max_retries = 10 | |
for _ in range(max_retries): | |
try: | |
operation() | |
break | |
except Exception: | |
sleep(1) | |
def retry_operation(operation) -> bool: | |
max_retries = 10 | |
for _ in range(max_retries): | |
try: | |
operation() | |
return True | |
except Exception: | |
sleep(1) | |
return False |
Code Review Run #79075d
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Tracking issue
Closes flyteorg/flyte#3459
Why are the changes needed?
Currently, the
set_signal
function can be used to handle three distinct actions: approving, rejecting, and providing input for gates. However, this implementation is not intuitive for users. To enhance clarity and usability, we propose splitting theset_signal
function into three separate functions:approve
,reject
, andset_input
. This change will make the functionality more straightforward and user-friendly during coding.What changes were proposed in this pull request?
approve
,reject
, andset_input
, providing certain signals or values for gates.set_input
,approve
,reject
) in the integration test.How was this patch tested?
test_signal_approve_reject
are added.Setup process
Screenshots
tests/flytekit/integration/remote/test_remote.py
Check all the applicable boxes
Related PRs
None
Docs link
TODO
Summary by Bito
This PR enhances FlyteRemote with improved signal handling and retry mechanisms. It implements a configurable retry mechanism with exponential backoff, replacing fixed delays and consolidating duplicate retry loops into a single retry_operation function. The changes include VLLM integration for HuggingFace model serving, Optuna integration for hyperparameter optimization, and a new Environment class. These improvements enhance code maintainability and system robustness.Unit tests added: True
Estimated effort to review (1-5, lower is better): 5