-
Notifications
You must be signed in to change notification settings - Fork 302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Flytekit] Separate remote signal functions #2933
base: master
Are you sure you want to change the base?
Changes from 7 commits
6bda0f8
fca748d
a615263
a7dc1fb
7c37562
779e3de
590a054
997f918
11b746e
aba3a64
97a0921
58c2e91
f167305
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -632,6 +632,93 @@ def list_signals( | |
s = resp.signals | ||
return s | ||
|
||
def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
|
||
lt = TypeEngine.to_literal_type(bool) | ||
true_literal = TypeEngine.to_literal(self.context, True, bool, lt) | ||
|
||
req = SignalSetRequest( | ||
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=true_literal.to_flyte_idl() | ||
) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
||
def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
|
||
lt = TypeEngine.to_literal_type(bool) | ||
false_literal = TypeEngine.to_literal(self.context, False, bool, lt) | ||
|
||
req = SignalSetRequest( | ||
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=false_literal.to_flyte_idl() | ||
) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
||
def set_input( | ||
self, | ||
signal_id: str, | ||
execution_name: str, | ||
value: typing.Union[literal_models.Literal, typing.Any], | ||
project=None, | ||
domain=None, | ||
python_type=None, | ||
literal_type=None, | ||
): | ||
""" | ||
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call. | ||
:param execution_name: The name of the execution. This is the tail-end of the URL when looking | ||
at the workflow execution. | ||
:param value: This is either a Literal or a Python value which FlyteRemote will invoke the TypeEngine to | ||
convert into a Literal. This argument is only value for wait_for_input type signals. | ||
:param project: The execution project, will default to the Remote's default project. | ||
:param domain: The execution domain, will default to the Remote's default domain. | ||
:param python_type: Provide a python type to help with conversion if the value you provided is not a Literal. | ||
:param literal_type: Provide a Flyte literal type to help with conversion if the value you provided | ||
is not a Literal | ||
""" | ||
|
||
wf_exec_id = WorkflowExecutionIdentifier( | ||
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name | ||
) | ||
if isinstance(value, Literal): | ||
logger.debug(f"Using provided {value} as existing Literal value") | ||
lit = value | ||
else: | ||
lt = literal_type or ( | ||
TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)) | ||
) | ||
lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt) | ||
logger.debug(f"Converted {value} to literal {lit} using literal type {lt}") | ||
|
||
req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl()) | ||
|
||
# Response is empty currently, nothing to give back to the user. | ||
self.client.set_signal(req) | ||
|
||
Comment on lines
+635
to
+721
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider consolidating signal handling methods
Consider consolidating the signal handling methods Code suggestionCheck the AI-generated fix before applying
Code Review Run #8b91e1 Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||
def set_signal( | ||
self, | ||
signal_id: str, | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -804,3 +804,27 @@ def test_get_control_plane_version(): | |||||||||||||||||||||||||
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True)) | ||||||||||||||||||||||||||
version = client.get_control_plane_version() | ||||||||||||||||||||||||||
assert version == "unknown" or version.startswith("v") | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
def test_signal_approve_reject(register): | ||||||||||||||||||||||||||
from flytekit.models.types import LiteralType, SimpleType | ||||||||||||||||||||||||||
from time import sleep | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN) | ||||||||||||||||||||||||||
conditional_wf = remote.fetch_workflow(name="basic.signal_test.signal_test_wf", version=VERSION) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | ||||||||||||||||||||||||||
sleep(5) | ||||||||||||||||||||||||||
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | ||||||||||||||||||||||||||
sleep(10) | ||||||||||||||||||||||||||
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) | ||||||||||||||||||||||||||
remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | ||||||||||||||||||||||||||
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
with pytest.raises(FlyteAssertion, match="Outputs could not be found because the execution ended in failure"): | ||||||||||||||||||||||||||
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]}) | ||||||||||||||||||||||||||
sleep(5) | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider more reliable synchronization mechanism
Consider using a more reliable synchronization mechanism instead of hardcoded Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #8b91e1 Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||||||||||||||||||||||
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)) | ||||||||||||||||||||||||||
sleep(10) | ||||||||||||||||||||||||||
remote.reject("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN) | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
this is what @mao3267 said. Do you think there's a better way to know the signal is callable, which means ' approve |
||||||||||||||||||||||||||
remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5)) | ||||||||||||||||||||||||||
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,19 @@ | ||||||||||||||||||||||||||
from datetime import timedelta | ||||||||||||||||||||||||||
from flytekit import task, workflow, wait_for_input, approve, conditional | ||||||||||||||||||||||||||
import typing | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@task | ||||||||||||||||||||||||||
def reporting_wf(title_input: str, data: typing.List[float]) -> dict: | ||||||||||||||||||||||||||
return {"title": title_input, "data": data} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@workflow | ||||||||||||||||||||||||||
def signal_test_wf(data: typing.List[float]) -> dict: | ||||||||||||||||||||||||||
title_input = wait_for_input(name="title-input", timeout=timedelta(hours=1), expected_type=str) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
# Define a "review-passes" approve node so that a human can review | ||||||||||||||||||||||||||
# the title before finalizing it. | ||||||||||||||||||||||||||
approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1)) | ||||||||||||||||||||||||||
title_input >> approve_node | ||||||||||||||||||||||||||
# This conditional returns the finalized report if the review passes, | ||||||||||||||||||||||||||
# otherwise it returns an invalid report output. | ||||||||||||||||||||||||||
return reporting_wf(title_input, data) | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Approval result not used in workflow
The Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #8b91e1 Is this a valid issue, or was it incorrectly flagged by the Agent?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The 'set_input()' method has too many parameters (7 > 5) which makes it harder to use and maintain. Consider refactoring using a configuration object pattern.
Code suggestion
Code Review Run #8b91e1
Is this a valid issue, or was it incorrectly flagged by the Agent?