Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flytekit] Separate remote signal functions #2933

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,93 @@
s = resp.signals
return s

def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
"""

wf_exec_id = WorkflowExecutionIdentifier(

Check warning on line 644 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L644

Added line #L644 was not covered by tests
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)

lt = TypeEngine.to_literal_type(bool)
true_literal = TypeEngine.to_literal(self.context, True, bool, lt)

Check warning on line 649 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L648-L649

Added lines #L648 - L649 were not covered by tests

req = SignalSetRequest(

Check warning on line 651 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L651

Added line #L651 was not covered by tests
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=true_literal.to_flyte_idl()
)

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

Check warning on line 656 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L656

Added line #L656 was not covered by tests

def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
"""

wf_exec_id = WorkflowExecutionIdentifier(

Check warning on line 667 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L667

Added line #L667 was not covered by tests
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)

lt = TypeEngine.to_literal_type(bool)
false_literal = TypeEngine.to_literal(self.context, False, bool, lt)

Check warning on line 672 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L671-L672

Added lines #L671 - L672 were not covered by tests

req = SignalSetRequest(

Check warning on line 674 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L674

Added line #L674 was not covered by tests
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=false_literal.to_flyte_idl()
)

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

Check warning on line 679 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L679

Added line #L679 was not covered by tests

def set_input(
self,
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project=None,
domain=None,
python_type=None,
literal_type=None,
):
Comment on lines +681 to +690
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many parameters in set_input method

The 'set_input()' method has too many parameters (7 > 5) which makes it harder to use and maintain. Consider refactoring using a configuration object pattern.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def set_input(
self,
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project=None,
domain=None,
python_type=None,
literal_type=None,
):
@dataclass
class SetInputConfig:
signal_id: str
execution_name: str
value: typing.Union[literal_models.Literal, typing.Any]
project: typing.Optional[str] = None
domain: typing.Optional[str] = None
python_type: typing.Optional[type] = None
literal_type: typing.Optional[type_models.LiteralType] = None
def set_input(self, config: SetInputConfig) -> None:

Code Review Run #8b91e1


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param value: This is either a Literal or a Python value which FlyteRemote will invoke the TypeEngine to
convert into a Literal. This argument is only value for wait_for_input type signals.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
:param python_type: Provide a python type to help with conversion if the value you provided is not a Literal.
:param literal_type: Provide a Flyte literal type to help with conversion if the value you provided
is not a Literal
"""

wf_exec_id = WorkflowExecutionIdentifier(

Check warning on line 704 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L704

Added line #L704 was not covered by tests
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)
if isinstance(value, Literal):
logger.debug(f"Using provided {value} as existing Literal value")
lit = value

Check warning on line 709 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L708-L709

Added lines #L708 - L709 were not covered by tests
else:
lt = literal_type or (

Check warning on line 711 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L711

Added line #L711 was not covered by tests
TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value))
)
lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")

Check warning on line 715 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L714-L715

Added lines #L714 - L715 were not covered by tests

req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())

Check warning on line 717 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L717

Added line #L717 was not covered by tests

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

Check warning on line 720 in flytekit/remote/remote.py

View check run for this annotation

Codecov / codecov/patch

flytekit/remote/remote.py#L720

Added line #L720 was not covered by tests

Comment on lines +635 to +721
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider consolidating signal handling methods

Consider consolidating the signal handling methods approve(), reject(), and set_input() into a single method with a signal type parameter, as they share similar code structure and functionality. This would reduce code duplication and improve maintainability.

Code suggestion
Check the AI-generated fix before applying
 @@ -635,87 +635,108 @@
 +    def set_signal_value(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                        project: str = None, domain: str = None, python_type = None, literal_type = None):
 +        wf_exec_id = WorkflowExecutionIdentifier(
 +            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
 +        )
 +        if isinstance(value, Literal):
 +            logger.debug(f"Using provided {value} as existing Literal value")
 +            lit = value
 +        else:
 +            lt = literal_type or (TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)))
 +            lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
 +            logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")
 +
 +        req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())
 +        self.client.set_signal(req)
 +
 +    def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, True, project, domain, bool)
 +
 +    def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, False, project, domain, bool)
 +
 +    def set_input(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                  project=None, domain=None, python_type=None, literal_type=None):
 +        self.set_signal_value(signal_id, execution_name, value, project, domain, python_type, literal_type)

Code Review Run #8b91e1


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

def set_signal(
self,
signal_id: str,
Expand Down
33 changes: 33 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,3 +866,36 @@ def test_attr_access_sd():
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)

def test_signal_approve_reject(register):
from flytekit.models.types import LiteralType, SimpleType
from time import sleep

remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
conditional_wf = remote.fetch_workflow(name="basic.signal_test.signal_test_wf", version=VERSION)

execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

def retry_operation(operation):
max_retries = 10
for _ in range(max_retries):
try:
operation()
break
except Exception:
sleep(1)
Comment on lines +879 to +886
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding retry operation return value

Consider adding a return value from retry_operation to indicate success/failure. Currently, it silently continues after max retries which could mask failures.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def retry_operation(operation):
max_retries = 10
for _ in range(max_retries):
try:
operation()
break
except Exception:
sleep(1)
def retry_operation(operation) -> bool:
max_retries = 10
for _ in range(max_retries):
try:
operation()
return True
except Exception:
sleep(1)
return False

Code Review Run #79075d


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


retry_operation(lambda: remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)))
retry_operation(lambda: remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN))

remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}

with pytest.raises(FlyteAssertion, match="Outputs could not be found because the execution ended in failure"):
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

retry_operation(lambda: remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)))
retry_operation(lambda: remote.reject("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN))

remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta
from flytekit import task, workflow, wait_for_input, approve, conditional
import typing

@task
def reporting_wf(title_input: str, data: typing.List[float]) -> dict:
return {"title": title_input, "data": data}

@workflow
def signal_test_wf(data: typing.List[float]) -> dict:
title_input = wait_for_input(name="title-input", timeout=timedelta(hours=1), expected_type=str)

# Define a "review-passes" approve node so that a human can review
# the title before finalizing it.
approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1))

return reporting_wf(title_input, data)
Loading