This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Feature/new update submission status algorithm #832

Closed
9 changes: 5 additions & 4 deletions src/taipy/core/_orchestrator/_orchestrator.py
@@ -67,7 +67,10 @@ def submit(
Returns:
The created Jobs.
"""
submission = _SubmissionManagerFactory._build_manager()._create(submittable.id) # type: ignore
submission = _SubmissionManagerFactory._build_manager()._create(
submittable.id, submittable._ID_PREFIX # type: ignore
)

jobs = []
tasks = submittable._get_sorted_tasks()
with cls.lock:
@@ -118,7 +121,7 @@ def submit_task(
Returns:
The created `Job^`.
"""
submission = _SubmissionManagerFactory._build_manager()._create(task.id)
submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
submit_id = submission.id
with cls.lock:
job = cls._lock_dn_output_and_create_job(
@@ -222,7 +225,6 @@ def _on_status_change(cls, job: Job):
if job.is_completed() or job.is_skipped():
cls.__unblock_jobs()
elif job.is_failed():
print(f"\nJob {job.id} failed, abandoning subsequent jobs.\n")
cls._fail_subsequent_jobs(job)

@classmethod
@@ -295,7 +297,6 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
)
for job in to_fail_or_abandon_jobs:
print(f"Abandoning job: {job.id}")
job.abandoned()
to_fail_or_abandon_jobs.update([failed_job])
cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
2 changes: 2 additions & 0 deletions src/taipy/core/submission/_submission_converter.py
@@ -24,6 +24,7 @@ def _entity_to_model(cls, submission: Submission) -> _SubmissionModel:
return _SubmissionModel(
id=submission.id,
entity_id=submission._entity_id,
entity_type=submission.entity_type,
job_ids=[job.id if isinstance(job, Job) else JobId(str(job)) for job in list(submission._jobs)],
creation_date=submission._creation_date.isoformat(),
submission_status=submission._submission_status,
@@ -34,6 +35,7 @@ def _entity_to_model(cls, submission: Submission) -> _SubmissionModel:
def _model_to_entity(cls, model: _SubmissionModel) -> Submission:
submission = Submission(
entity_id=model.entity_id,
entity_type=model.entity_type,
id=SubmissionId(model.id),
jobs=model.job_ids,
creation_date=datetime.fromisoformat(model.creation_date),
7 changes: 2 additions & 5 deletions src/taipy/core/submission/_submission_manager.py
@@ -35,11 +35,8 @@ def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
return cls._repository._load_all(filters)

@classmethod
def _create(
cls,
entity_id: str,
) -> Submission:
submission = Submission(entity_id=entity_id)
def _create(cls, entity_id: str, entity_type: str) -> Submission:
submission = Submission(entity_id=entity_id, entity_type=entity_type)
cls._set(submission)

Notifier.publish(_make_event(submission, EventOperation.CREATION))
4 changes: 4 additions & 0 deletions src/taipy/core/submission/_submission_model.py
@@ -28,13 +28,15 @@ class _SubmissionModel(_BaseModel):
mapper_registry.metadata,
Column("id", String, primary_key=True),
Column("entity_id", String),
Column("entity_type", String),
Column("job_ids", JSON),
Column("creation_date", String),
Column("submission_status", Enum(SubmissionStatus)),
Column("version", String),
)
id: str
entity_id: str
entity_type: str
job_ids: Union[List[JobId], List]
creation_date: str
submission_status: SubmissionStatus
@@ -45,6 +47,7 @@ def from_dict(data: Dict[str, Any]):
return _SubmissionModel(
id=data["id"],
entity_id=data["entity_id"],
entity_type=data["entity_type"],
job_ids=_BaseModel._deserialize_attribute(data["job_ids"]),
creation_date=data["creation_date"],
submission_status=SubmissionStatus._from_repr(data["submission_status"]),
@@ -55,6 +58,7 @@ def to_list(self):
return [
self.id,
self.entity_id,
self.entity_type,
_BaseModel._serialize_attribute(self.job_ids),
self.creation_date,
repr(self.submission_status),
95 changes: 50 additions & 45 deletions src/taipy/core/submission/submission.py
@@ -9,16 +9,17 @@
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import threading
import uuid
from datetime import datetime
from typing import Any, List, Optional, Union
from typing import Any, List, MutableSet, Optional, Union

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
from .._entity._reload import _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..job._job_manager_factory import _JobManagerFactory
from ..job.job import Job, JobId
from ..job.job import Job, JobId, Status
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from .submission_id import SubmissionId
from .submission_status import SubmissionStatus
@@ -40,23 +41,33 @@ class Submission(_Entity, _Labeled):
_ID_PREFIX = "SUBMISSION"
_MANAGER_NAME = "submission"
__SEPARATOR = "_"
lock = threading.Lock()

def __init__(
self,
entity_id: str,
entity_type: str,
id: Optional[str] = None,
jobs: Optional[Union[List[Job], List[JobId]]] = None,
creation_date: Optional[datetime] = None,
submission_status: Optional[SubmissionStatus] = None,
version: Optional[str] = None,
):
self._entity_id = entity_id
self._entity_type = entity_type
self.id = id or self.__new_id()
self._jobs: Union[List[Job], List[JobId], List] = jobs or []
self._creation_date = creation_date or datetime.now()
self._submission_status = submission_status or SubmissionStatus.SUBMITTED
self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()

self.__abandoned = False
self.__completed = False

self.__running_jobs: MutableSet[str] = set()
self.__blocked_jobs: MutableSet[str] = set()
self.__pending_jobs: MutableSet[str] = set()

Contributor
❓ What is the purpose of these additional set fields? As far as I can see, we add/remove job ids from them in _update_submission_status, but we never read from them!
If it is just to track the statuses of the jobs, why not just use a single Dict[JobId, Status]?

Member Author
Oh, it's just something to track whether we still have jobs running, blocked, or pending. As we still need access to the other job statuses, using a dict would force us to loop through all job statuses to do this check, whereas keeping each status in its own set lets us detect the existence of "running jobs" or "blocked jobs", etc. in O(1) time. Does that make sense to you?
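
A minimal sketch of the trade-off described above (illustrative only; the variable names and job ids are hypothetical, not taken from the PR):

```python
from typing import Dict, MutableSet

# Single-dict approach: answering "is any job still running?" scans
# every value on each status change -- O(n) per check.
statuses: Dict[str, str] = {"job_1": "RUNNING", "job_2": "BLOCKED"}
any_running = any(s == "RUNNING" for s in statuses.values())  # O(n)

# Per-status sets: the same question is an O(1) emptiness check, and a
# status change just moves the job id from one set to another.
running_jobs: MutableSet[str] = {"job_1"}
blocked_jobs: MutableSet[str] = {"job_2"}
any_running = bool(running_jobs)  # O(1)

running_jobs.discard("job_1")  # the job left the RUNNING state
```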

Contributor
Ok, but is it used anywhere in the code? I can't find any...
If it is not used, why do we need those sets?

Contributor @gmarabout (Nov 27, 2023)
Sorry, but I have concerns about this implementation...
Let me elaborate, so you can tell me I shouldn't actually be worried :-)

Keeping such "state" in the object without any DB persistence makes any Taipy-core service stateful. Example: a pod running in Kubernetes.

I see at least two problems now:

  • If we need to "scale up" the pod (because we need more throughput/horsepower), then the state remains in one pod while the others have an inconsistent state. The behaviour might become erratic.
  • If the pod dies for some reason, then the state/data is lost forever.

So either this state is important, and thus we need to somehow persist it, or it's not useful and we should just get rid of it.

Let me know if we need to discuss it in more detail.

Member Author @toan-quach (Nov 28, 2023)
Hmmm, for now, from what I understand, as long as the orchestrator and dispatcher are in the same application, sharing the same memory pool, this shouldn't be an issue. Previously we also relied on this fact when initiating the callback from the job to update the submission status.

But you made a very valid point: in the scenario where we want to scale up by separating the dispatcher into a separate application or even a separate machine, this won't work anymore. I think we should discuss this point further :D and we should also involve @jrobinAV, as I think this topic would also be interesting for this ticket: https://app.zenhub.com/workspaces/taipy-613ef328affde6000ff77fb3/issues/gh/avaiga/taipy-enterprise/268

Member
Isn't it the case that the dispatcher and orchestrator should always be on the same machine, and we only split the jobs?

Contributor
@trgiangdo In theory, if you consider Kubernetes as the deployment platform, you shouldn't make such assumptions. Services running on K8s must natively be stateless and restartable without breaking the app (the so-called "cloud native").
And if it can only run once (i.e. a single process that won't scale), then it must also be ready to be restarted on other nodes without notice (K8s does that). It means the state must be easily restored or recomputed. In this case, it would just be emptied...
I don't want to block the PR, but design choices imply things that go beyond the scope of the code.
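
One way to read "restored or recomputed" here: rebuild the transient per-status sets from the persisted jobs on startup rather than keeping them only in process memory. The sketch below is a hypothetical illustration of that option; the function is not part of this PR and only assumes the Job predicates (is_running, is_pending, is_submitted, is_blocked) already used elsewhere in this diff:

```python
from typing import Iterable, MutableSet, Tuple

def rebuild_transient_sets(jobs: Iterable) -> Tuple[MutableSet[str], MutableSet[str], MutableSet[str]]:
    """Recompute the running/pending/blocked job-id sets from persisted jobs.

    Illustrative sketch only: `jobs` is assumed to yield Job-like objects
    exposing an `id` attribute and the usual status predicates.
    """
    running: MutableSet[str] = set()
    pending: MutableSet[str] = set()
    blocked: MutableSet[str] = set()
    for job in jobs:
        if job.is_running():
            running.add(job.id)
        elif job.is_pending() or job.is_submitted():
            pending.add(job.id)
        elif job.is_blocked():
            blocked.add(job.id)
    return running, pending, blocked
```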

Member Author
I believe we can close this topic for now.
To summarize:

  • We acknowledge that there are several stateful places that we don't yet have an approach for serializing across multiple machines (including job callbacks, the state of the submission entity, the event notification, etc.)
  • We will not address these issues right now and will review them again later

Contributor
Yes, we can close the topic.

@staticmethod
def __new_id() -> str:
"""Generate a unique Submission identifier."""
@@ -66,6 +77,10 @@ def __new_id() -> str:
def entity_id(self) -> str:
return self._entity_id

@property
def entity_type(self) -> str:
return self._entity_type

@property
def creation_date(self):
return self._creation_date
@@ -130,56 +145,46 @@ def __gt__(self, other):
def __ge__(self, other):
return self.creation_date.timestamp() >= other.creation_date.timestamp()

def _update_submission_status(self, _: Job):
abandoned = False
canceled = False
blocked = False
pending = False
running = False
completed = False

for job in self.jobs:
if not job:
continue
if job.is_failed():
self.submission_status = SubmissionStatus.FAILED # type: ignore
return
if job.is_canceled():
canceled = True
continue
if job.is_blocked():
blocked = True
continue
if job.is_pending() or job.is_submitted():
pending = True
continue
if job.is_running():
running = True
continue
if job.is_completed() or job.is_skipped():
completed = True
continue
if job.is_abandoned():
abandoned = True
if canceled:
def _update_submission_status(self, job: Job):
Member
I am concerned about the possibility of calling this method for the same job for 2 (or more) status changes in the wrong order.

Member Author
As discussed, the status of a job will have finished updating in the orchestrator before the job is handed over to the dispatcher, so no update to a job's status should create a collision. The only scenario where this might happen is when we try to cancel a job, as that action is triggered directly from the main thread while the job could be running in a subthread. But we have various conditions checking whether we can cancel a job: if it's running, we can't; if it's not running, I think this process can still be triggered safely when we update the job's status to canceled.
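
A toy illustration (hypothetical class, not the PR's code) of why the early-return guard at the top of the new method tolerates out-of-order updates: once a terminal submission status is recorded, later job-status callbacks become no-ops.

```python
from enum import Enum

class SubStatus(Enum):
    RUNNING = "RUNNING"
    FAILED = "FAILED"
    CANCELED = "CANCELED"

class MiniSubmission:
    """Hypothetical stand-in for Submission, reduced to the guard logic."""

    def __init__(self) -> None:
        self.status = SubStatus.RUNNING

    def update(self, new_status: SubStatus) -> None:
        # Terminal states win, regardless of callback arrival order.
        if self.status in (SubStatus.FAILED, SubStatus.CANCELED):
            return
        self.status = new_status

s = MiniSubmission()
s.update(SubStatus.FAILED)   # terminal status arrives first
s.update(SubStatus.RUNNING)  # late, out-of-order update: ignored
assert s.status is SubStatus.FAILED
```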

if self._submission_status == SubmissionStatus.FAILED or self._submission_status == SubmissionStatus.CANCELED:
return

job_status = job.status

if job_status == Status.FAILED:
self.submission_status = SubmissionStatus.FAILED # type: ignore
return
if job_status == Status.CANCELED:
self.submission_status = SubmissionStatus.CANCELED # type: ignore
return
if abandoned:

with self.lock:
if job_status == Status.BLOCKED:
self.__blocked_jobs.add(job.id)
if job_status == Status.PENDING or job_status == Status.SUBMITTED:
self.__pending_jobs.add(job.id)
self.__blocked_jobs.discard(job.id)
if job_status == Status.RUNNING:
self.__running_jobs.add(job.id)
self.__pending_jobs.discard(job.id)
if job_status == Status.COMPLETED or job_status == Status.SKIPPED:
self.__completed = True
self.__running_jobs.discard(job.id)
if job_status == Status.ABANDONED:
self.__abandoned = True

if self.__abandoned:
self.submission_status = SubmissionStatus.UNDEFINED # type: ignore
return
if running:
elif self.__running_jobs:
self.submission_status = SubmissionStatus.RUNNING # type: ignore
return
if pending:
elif self.__pending_jobs:
self.submission_status = SubmissionStatus.PENDING # type: ignore
return
if blocked:
elif self.__blocked_jobs:
self.submission_status = SubmissionStatus.BLOCKED # type: ignore
return
if completed:
elif self.__completed:
self.submission_status = SubmissionStatus.COMPLETED # type: ignore
return
self.submission_status = SubmissionStatus.UNDEFINED # type: ignore
else:
self.submission_status = SubmissionStatus.UNDEFINED # type: ignore


@_make_event.register(Submission)
6 changes: 3 additions & 3 deletions tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py
@@ -112,7 +112,7 @@ def test_can_execute_synchronous():

task_id = TaskId("task_id1")
task = Task(config_id="name", properties={}, input=[], function=print, output=[], id=task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
job_id = JobId("id1")
job = Job(job_id, task, submission.id, task.id)

Expand All @@ -130,7 +130,7 @@ def test_exception_in_user_function():
task_id = TaskId("task_id1")
job_id = JobId("id1")
task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
job = Job(job_id, task, submission.id, task.id)

dispatcher = _OrchestratorFactory._dispatcher
Expand All @@ -151,7 +151,7 @@ def test_exception_in_writing_data():
output._is_in_cache = False
output.write.side_effect = ValueError()
task = Task(config_id="name", properties={}, input=[], function=print, output=[output], id=task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id)
submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
job = Job(job_id, task, submission.id, task.id)

dispatcher = _OrchestratorFactory._dispatcher