Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flytekit] Separate remote signal functions #2933

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,93 @@ def list_signals(
s = resp.signals
return s

def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
"""

wf_exec_id = WorkflowExecutionIdentifier(
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)

lt = TypeEngine.to_literal_type(bool)
true_literal = TypeEngine.to_literal(self.context, True, bool, lt)

req = SignalSetRequest(
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=true_literal.to_flyte_idl()
)

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
"""

wf_exec_id = WorkflowExecutionIdentifier(
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)

lt = TypeEngine.to_literal_type(bool)
false_literal = TypeEngine.to_literal(self.context, False, bool, lt)

req = SignalSetRequest(
id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=false_literal.to_flyte_idl()
)

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

def set_input(
self,
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project=None,
domain=None,
python_type=None,
literal_type=None,
):
Comment on lines +681 to +690
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many parameters in set_input method

The 'set_input()' method has too many parameters (7 > 5) which makes it harder to use and maintain. Consider refactoring using a configuration object pattern.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def set_input(
self,
signal_id: str,
execution_name: str,
value: typing.Union[literal_models.Literal, typing.Any],
project=None,
domain=None,
python_type=None,
literal_type=None,
):
@dataclass
class SetInputConfig:
signal_id: str
execution_name: str
value: typing.Union[literal_models.Literal, typing.Any]
project: typing.Optional[str] = None
domain: typing.Optional[str] = None
python_type: typing.Optional[type] = None
literal_type: typing.Optional[type_models.LiteralType] = None
def set_input(self, config: SetInputConfig) -> None:

Code Review Run #8b91e1


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

"""
:param signal_id: The name of the signal, this is the key used in the approve() or wait_for_input() call.
:param execution_name: The name of the execution. This is the tail-end of the URL when looking
at the workflow execution.
:param value: This is either a Literal or a Python value which FlyteRemote will invoke the TypeEngine to
convert into a Literal. This argument is only value for wait_for_input type signals.
:param project: The execution project, will default to the Remote's default project.
:param domain: The execution domain, will default to the Remote's default domain.
:param python_type: Provide a python type to help with conversion if the value you provided is not a Literal.
:param literal_type: Provide a Flyte literal type to help with conversion if the value you provided
is not a Literal
"""

wf_exec_id = WorkflowExecutionIdentifier(
project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
)
if isinstance(value, Literal):
logger.debug(f"Using provided {value} as existing Literal value")
lit = value
else:
lt = literal_type or (
TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value))
)
lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")

req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())

# Response is empty currently, nothing to give back to the user.
self.client.set_signal(req)

Comment on lines +635 to +721
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider consolidating signal handling methods

Consider consolidating the signal handling methods approve(), reject(), and set_input() into a single method with a signal type parameter, as they share similar code structure and functionality. This would reduce code duplication and improve maintainability.

Code suggestion
Check the AI-generated fix before applying
 @@ -635,87 +635,108 @@
 +    def set_signal_value(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                        project: str = None, domain: str = None, python_type = None, literal_type = None):
 +        wf_exec_id = WorkflowExecutionIdentifier(
 +            project=project or self.default_project, domain=domain or self.default_domain, name=execution_name
 +        )
 +        if isinstance(value, Literal):
 +            logger.debug(f"Using provided {value} as existing Literal value")
 +            lit = value
 +        else:
 +            lt = literal_type or (TypeEngine.to_literal_type(python_type) if python_type else TypeEngine.to_literal_type(type(value)))
 +            lit = TypeEngine.to_literal(self.context, value, python_type or type(value), lt)
 +            logger.debug(f"Converted {value} to literal {lit} using literal type {lt}")
 +
 +        req = SignalSetRequest(id=SignalIdentifier(signal_id, wf_exec_id).to_flyte_idl(), value=lit.to_flyte_idl())
 +        self.client.set_signal(req)
 +
 +    def approve(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, True, project, domain, bool)
 +
 +    def reject(self, signal_id: str, execution_name: str, project: str = None, domain: str = None):
 +        self.set_signal_value(signal_id, execution_name, False, project, domain, bool)
 +
 +    def set_input(self, signal_id: str, execution_name: str, value: typing.Union[literal_models.Literal, typing.Any],
 +                  project=None, domain=None, python_type=None, literal_type=None):
 +        self.set_signal_value(signal_id, execution_name, value, project, domain, python_type, literal_type)

Code Review Run #8b91e1


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

def set_signal(
self,
signal_id: str,
Expand Down
50 changes: 50 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,3 +866,53 @@ def test_attr_access_sd():
url = urlparse(remote_file_path)
bucket, key = url.netloc, url.path.lstrip("/")
file_transfer.delete_file(bucket=bucket, key=key)

def test_signal_approve_reject(register):
from flytekit.models.types import LiteralType, SimpleType
from time import sleep

remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
conditional_wf = remote.fetch_workflow(name="basic.signal_test.signal_test_wf", version=VERSION)

execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

max_retries = 10

for _ in range(max_retries):
try:
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
break
except Exception as e:
sleep(1)
continue
for _ in range(max_retries):
try:
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)
break
except Exception as e:
sleep(1)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting duplicated retry logic

Consider extracting the retry logic into a reusable helper function since the same pattern is repeated multiple times. This could help reduce code duplication and improve maintainability.

Code suggestion
Check the AI-generated fix before applying
Suggested change
max_retries = 10
for _ in range(max_retries):
try:
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
break
except Exception as e:
sleep(1)
continue
for _ in range(max_retries):
try:
remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)
break
except Exception as e:
sleep(1)
continue
def retry_operation(operation):
max_retries = 10
for _ in range(max_retries):
try:
operation()
break
except Exception:
sleep(1)
retry_operation(lambda: remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING)))
retry_operation(lambda: remote.approve("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN))

Code Review Run #ebca5e


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}

with pytest.raises(FlyteAssertion, match="Outputs could not be found because the execution ended in failure"):
execution = remote.execute(conditional_wf, inputs={"data": [1.0, 2.0, 3.0, 4.0, 5.0]})

for _ in range(max_retries):
try:
remote.set_input("title-input", execution.id.name, value="my report", project=PROJECT, domain=DOMAIN, python_type=str, literal_type=LiteralType(simple=SimpleType.STRING))
break
except Exception as e:
sleep(1)
continue
for _ in range(max_retries):
try:
remote.reject("review-passes", execution.id.name, project=PROJECT, domain=DOMAIN)
break
except Exception as e:
sleep(1)
continue

remote.wait(execution=execution, timeout=datetime.timedelta(minutes=5))
assert execution.outputs["o0"] == {"title": "my report", "data": [1.0, 2.0, 3.0, 4.0, 5.0]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from datetime import timedelta
from flytekit import task, workflow, wait_for_input, approve, conditional
import typing

@task
def reporting_wf(title_input: str, data: typing.List[float]) -> dict:
return {"title": title_input, "data": data}

@workflow
def signal_test_wf(data: typing.List[float]) -> dict:
title_input = wait_for_input(name="title-input", timeout=timedelta(hours=1), expected_type=str)

# Define a "review-passes" approve node so that a human can review
# the title before finalizing it.
approve(upstream_item=title_input, name="review-passes", timeout=timedelta(hours=1))

return reporting_wf(title_input, data)
Loading