[Flytekit] Separate remote signal functions #2933

Open
wants to merge 13 commits into
base: master
87 changes: 87 additions & 0 deletions flytekit/remote/remote.py
@@ -632,6 +632,93 @@ def list_signals(
        s = resp.signals
        return s

    def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
        """
        :param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
        :param execution_name: The name of the execution. This is the tail-end of the URL when looking
            at the workflow execution.
        :param project: The execution project, will default to the Remote's default project.
        :param domain: The execution domain, will default to the Remote's default domain.
        """

        wf_exec_id = WorkflowExecutionIdentifier(
            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
        )

        lt = TypeEngine.to_literal_type(bool)
        true_literal = TypeEngine.to_literal(self.context, True, bool, lt)

        req = SignalSetRequest(
            id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=true_literal.to_flyte_idl()
        )

        # Response is empty currently, nothing to give back to the user.
        self.client.set_signal(req)

    def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
        """
        :param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
        :param execution_name: The name of the execution. This is the tail-end of the URL when looking
            at the workflow execution.
        :param project: The execution project, will default to the Remote's default project.
        :param domain: The execution domain, will default to the Remote's default domain.
        """

        wf_exec_id = WorkflowExecutionIdentifier(
            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
        )

        lt = TypeEngine.to_literal_type(bool)
        false_literal = TypeEngine.to_literal(self.context, False, bool, lt)

        req = SignalSetRequest(
            id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=false_literal.to_flyte_idl()
        )

        # Response is empty currently, nothing to give back to the user.
        self.client.set_signal(req)

    def set_input(
        self,
        signal_id: str,
        execution_name: str,
        value: typing.Union[literal_models.Literal, typing.Any],
        project=None,
        domain=None,
        python_type=None,
        literal_type=None,
    ):
Comment on lines +681 to +690
Contributor

Too many parameters in set_input method

The 'set_input()' method has too many parameters (7 > 5) which makes it harder to use and maintain. Consider refactoring using a configuration object pattern.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    def set_input(
-        self,
-        signal_id: str,
-        execution_name: str,
-        value: typing.Union[literal_models.Literal, typing.Any],
-        project=None,
-        domain=None,
-        python_type=None,
-        literal_type=None,
-    ):
+    @dataclass
+    class SetInputConfig:
+        signal_id: str
+        execution_name: str
+        value: typing.Union[literal_models.Literal, typing.Any]
+        project: typing.Optional[str] = None
+        domain: typing.Optional[str] = None
+        python_type: typing.Optional[type] = None
+        literal_type: typing.Optional[type_models.LiteralType] = None
+    def set_input(self, config: SetInputConfig) -> None:

Code Review Run #8b91e1


        """
        :param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
        :param execution_name: The name of the execution. This is the tail-end of the URL when looking
            at the workflow execution.
        :param value: This is either a Literal or a Python value which FlyteRemote will invoke the TypeEngine to
            convert into a Literal. This argument is only valid for wait_for_input type signals.
        :param project: The execution project, will default to the Remote's default project.
        :param domain: The execution domain, will default to the Remote's default domain.
        :param python_type: Provide a python type to help with conversion if the value you provided is not a Literal.
        :param literal_type: Provide a Flyte literal type to help with conversion if the value you provided
            is not a Literal.
        """

        wf_exec_id = WorkflowExecutionIdentifier(
            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
        )
        if isinstance(value, Literal):
            logger.debug(f"Using provided {value} as existing Literal value")
            lit = value
        else:
            lt = literal_type or (
                TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value))
            )
            lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
            logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")

        req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())

        # Response is empty currently, nothing to give back to the user.
        self.client.set_signal(req)
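
The conversion branch of set_input resolves types in a fixed order: an explicit literal_type wins, otherwise the literal type is derived from python_type, and only as a last resort from type(value). The sketch below illustrates that precedence in isolation. The names `resolve_types` and `to_literal_type` are hypothetical stand-ins (plain strings replace Flyte's LiteralType); they are not flytekit APIs.

```python
from typing import Any, Optional, Tuple


def to_literal_type(python_type: type) -> str:
    # Hypothetical stand-in for TypeEngine.to_literal_type: returns a label
    # instead of a real Flyte LiteralType.
    return f"literal<{python_type.__name__}>"


def resolve_types(
    value: Any,
    python_type: Optional[type] = None,
    literal_type: Optional[str] = None,
) -> Tuple[type, str]:
    # The Python type falls back to type(value) when the caller gives none.
    effective_python_type = python_type or type(value)
    # An explicit literal type takes priority over any derived one.
    effective_literal_type = literal_type or to_literal_type(effective_python_type)
    return effective_python_type, effective_literal_type


if __name__ == "__main__":
    print(resolve_types(3))
    print(resolve_types(3, python_type=float))
    print(resolve_types("x", literal_type="literal<custom>"))
```

Note that type(value) for a list is the bare `list`, with no element type, which is one reason a caller may prefer to pass python_type explicitly.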

Comment on lines +635 to +721
Contributor

Consider consolidating signal handling methods

Consider consolidating the signal handling methods approve(), reject(), and set_input() into a single method with a signal type parameter, as they share similar code structure and functionality. This would reduce code duplication and improve maintainability.

Code suggestion
Check the AI-generated fix before applying
 @@ -635,87 +635,108 @@
 +    def set_signal_value(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                        project: str = None, domain: str = None, python_type = None, literal_type = None):
 +        wf_exec_id = WorkflowExecutionIdentifier(
 +            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
 +        )
 +        if isinstance(value, Literal):
 +            logger.debug(f"Using provided {value} as existing Literal value")
 +            lit = value
 +        else:
 +            lt = literal_type or (TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)))
 +            lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
 +            logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")
 +
 +        req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())
 +        self.client.set_signal(req)
 +
 +    def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, True, project, domain, bool)
 +
 +    def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, False, project, domain, bool)
 +
 +    def set_input(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                  project=None, domain=None, python_type=None, literal_type=None):
 +        self.set_signal_value(signal_id, execution_name, value, project, domain, python_type, literal_type)

Code Review Run #8b91e1


    def set_signal(
        self,
        signal_id: str,
24 changes: 24 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
@@ -804,3 +804,27 @@ def test_get_control_plane_version():
    client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
    version = client.get_control_plane_version()
    assert version == "unknown" or version.startswith("v")

def test_signal_approve_reject(register):
    from flytekit.models.types import LiteralType, SimpleType
    from time import sleep

    remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
    conditional_wf = remote.fetch_workflow(name="basic.signal_test.signal_test_wf", version=VERSION)

    execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})
    sleep(5)
    remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
    sleep(10)
    remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)
    remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
    assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}

    with pytest.raises(FlyteAssertion, match="Outputs could not be found because the execution ended in failure"):
        execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})
        sleep(5)
Contributor

Consider more reliable synchronization mechanism

Consider using a more reliable synchronization mechanism instead of hardcoded sleep() calls. The test may be flaky since it depends on timing.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-        execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})
-        sleep(5)
+        execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})
+        max_retries = 10
+        retry_delay = 1
+        for _ in range(max_retries):
+            try:
+                remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
+                break
+            except Exception:
+                sleep(retry_delay)
+                retry_delay *= 2

Code Review Run #8b91e1


        remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
        sleep(10)
        remote.reject("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)
Member

@Future-Outlier Future-Outlier Jan 3, 2025

> However, there is a 5–10-second delay between the remote execution and the signal being stored in the database. Therefore, we now introduce a 5–10-second delay before each signal operation in the integration test.

This is what @mao3267 said.

Do you think there is a better way to know that the signal is callable, meaning that `approve`, `set_input`, and `reject` can be called?

cc @pingsutw @wild-endeavor @thomasjpfan
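
One possible direction, sketched under assumptions rather than proposed as the fix: replace the fixed sleeps with a bounded poll that returns as soon as a readiness check passes. `wait_until` below is a hypothetical helper, not part of flytekit. The commented usage assumes that `remote.list_signals(...)` only returns a signal once it has been stored and that each returned signal exposes its name as `id.signal_id`; neither assumption is confirmed in this thread.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll predicate until it returns True or timeout seconds elapse.

    Returns True if the predicate succeeded, False if the deadline passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Hypothetical use in the integration test (assumptions noted above):
#
# def signal_ready(name: str) -> bool:
#     signals = remote.list_signals(execution.id.name, PROJECT, DOMAIN)
#     return any(s.id.signal_id == name for s in signals)
#
# assert wait_until(lambda: signal_ready("title-input"), timeout=60)
# remote.set_input("title-input", execution.id.name, value="my report")
```

Compared with retrying set_input inside a broad `except Exception`, polling on a read-only check keeps unrelated failures visible and separates the readiness condition from the call under test.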

        remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
        assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}
@@ -0,0 +1,19 @@
from datetime import timedelta
from flytekit import task, workflow, wait_for_input, approve, conditional
import typing

@task
def reporting_wf(title_input: str, data: typing.List[float]) -> dict:
    return {"title": title_input, "data": data}

@workflow
def signal_test_wf(data: typing.List[float]) -> dict:
    title_input = wait_for_input(name="title-input", timeout=timedelta(hours=1), expected_type=str)

    # Define a "review-passes" approve node so that a human can review
    # the title before finalizing it.
    approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1))
    title_input >> approve_node
    # This conditional returns the finalized report if the review passes,
    # otherwise it returns an invalid report output.
    return reporting_wf(title_input, data)
Contributor

Approval result not used in workflow

The approve_node is created but its result is not used in the workflow logic. Consider using the approval result in the conditional logic to determine whether to proceed with report generation.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1))
-    title_input >> approve_node
-    # This conditional returns the finalized report if the review passes,
-    # otherwise it returns an invalid report output.
-    return reporting_wf(title_input, data)
+    approve_node = approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1))
+    title_input >> approve_node
+    return conditional(approve_node).\
+        if_().\
+        then(reporting_wf(title_input=title_input, data=data)).\
+        else_().\
+        fail("Report was not approved")

Code Review Run #8b91e1

