diff --git a/src/taipy/core/_orchestrator/_orchestrator.py b/src/taipy/core/_orchestrator/_orchestrator.py
index 5647a53f..6c62e77f 100644
--- a/src/taipy/core/_orchestrator/_orchestrator.py
+++ b/src/taipy/core/_orchestrator/_orchestrator.py
@@ -67,7 +67,10 @@ def submit(
         Returns:
             The created Jobs.
         """
-        submission = _SubmissionManagerFactory._build_manager()._create(submittable.id)  # type: ignore
+        submission = _SubmissionManagerFactory._build_manager()._create(
+            submittable.id, submittable._ID_PREFIX  # type: ignore
+        )
+
         jobs = []
         tasks = submittable._get_sorted_tasks()
         with cls.lock:
@@ -118,7 +121,7 @@ def submit_task(
         Returns:
             The created `Job^`.
         """
-        submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+        submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
         submit_id = submission.id
         with cls.lock:
             job = cls._lock_dn_output_and_create_job(
@@ -222,7 +225,6 @@ def _on_status_change(cls, job: Job):
         if job.is_completed() or job.is_skipped():
             cls.__unblock_jobs()
         elif job.is_failed():
-            print(f"\nJob {job.id} failed, abandoning subsequent jobs.\n")
             cls._fail_subsequent_jobs(job)
 
     @classmethod
@@ -295,7 +297,6 @@ def _fail_subsequent_jobs(cls, failed_job: Job):
             cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
         )
         for job in to_fail_or_abandon_jobs:
-            print(f"Abandoning job: {job.id}")
             job.abandoned()
         to_fail_or_abandon_jobs.update([failed_job])
         cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
diff --git a/src/taipy/core/submission/_submission_converter.py b/src/taipy/core/submission/_submission_converter.py
index 003ddcbe..0e3c1925 100644
--- a/src/taipy/core/submission/_submission_converter.py
+++ b/src/taipy/core/submission/_submission_converter.py
@@ -24,6 +24,7 @@ def _entity_to_model(cls, submission: Submission) -> _SubmissionModel:
         return _SubmissionModel(
             id=submission.id,
             entity_id=submission._entity_id,
+            entity_type=submission.entity_type,
             job_ids=[job.id if isinstance(job, Job) else JobId(str(job)) for job in list(submission._jobs)],
             creation_date=submission._creation_date.isoformat(),
             submission_status=submission._submission_status,
@@ -34,6 +35,7 @@ def _model_to_entity(cls, model: _SubmissionModel) -> Submission:
         submission = Submission(
             entity_id=model.entity_id,
+            entity_type=model.entity_type,
             id=SubmissionId(model.id),
             jobs=model.job_ids,
             creation_date=datetime.fromisoformat(model.creation_date),
diff --git a/src/taipy/core/submission/_submission_manager.py b/src/taipy/core/submission/_submission_manager.py
index 1c79b118..ee1db450 100644
--- a/src/taipy/core/submission/_submission_manager.py
+++ b/src/taipy/core/submission/_submission_manager.py
@@ -35,11 +35,8 @@ def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
         return cls._repository._load_all(filters)
 
     @classmethod
-    def _create(
-        cls,
-        entity_id: str,
-    ) -> Submission:
-        submission = Submission(entity_id=entity_id)
+    def _create(cls, entity_id: str, entity_type: str) -> Submission:
+        submission = Submission(entity_id=entity_id, entity_type=entity_type)
         cls._set(submission)
 
         Notifier.publish(_make_event(submission, EventOperation.CREATION))
diff --git a/src/taipy/core/submission/_submission_model.py b/src/taipy/core/submission/_submission_model.py
index d099131c..a7466807 100644
--- a/src/taipy/core/submission/_submission_model.py
+++ b/src/taipy/core/submission/_submission_model.py
@@ -28,6 +28,7 @@ class _SubmissionModel(_BaseModel):
         mapper_registry.metadata,
         Column("id", String, primary_key=True),
         Column("entity_id", String),
+        Column("entity_type", String),
         Column("job_ids", JSON),
         Column("creation_date", String),
         Column("submission_status", Enum(SubmissionStatus)),
@@ -35,6 +36,7 @@ class _SubmissionModel(_BaseModel):
     )
     id: str
     entity_id: str
+    entity_type: str
     job_ids: Union[List[JobId], List]
     creation_date: str
     submission_status: SubmissionStatus
@@ -45,6 +47,7 @@ def from_dict(data: Dict[str, Any]):
         return _SubmissionModel(
             id=data["id"],
             entity_id=data["entity_id"],
+            entity_type=data["entity_type"],
             job_ids=_BaseModel._deserialize_attribute(data["job_ids"]),
             creation_date=data["creation_date"],
             submission_status=SubmissionStatus._from_repr(data["submission_status"]),
@@ -55,6 +58,7 @@ def to_list(self):
         return [
             self.id,
             self.entity_id,
+            self.entity_type,
             _BaseModel._serialize_attribute(self.job_ids),
             self.creation_date,
             repr(self.submission_status),
diff --git a/src/taipy/core/submission/submission.py b/src/taipy/core/submission/submission.py
index 4584df35..a94c8781 100644
--- a/src/taipy/core/submission/submission.py
+++ b/src/taipy/core/submission/submission.py
@@ -9,16 +9,17 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+import threading
 import uuid
 from datetime import datetime
-from typing import Any, List, Optional, Union
+from typing import Any, List, MutableSet, Optional, Union
 
 from .._entity._entity import _Entity
 from .._entity._labeled import _Labeled
 from .._entity._reload import _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from ..job._job_manager_factory import _JobManagerFactory
-from ..job.job import Job, JobId
+from ..job.job import Job, JobId, Status
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from .submission_id import SubmissionId
 from .submission_status import SubmissionStatus
@@ -40,10 +41,12 @@ class Submission(_Entity, _Labeled):
     _ID_PREFIX = "SUBMISSION"
     _MANAGER_NAME = "submission"
     __SEPARATOR = "_"
+    lock = threading.Lock()
 
     def __init__(
         self,
         entity_id: str,
+        entity_type: str,
         id: Optional[str] = None,
         jobs: Optional[Union[List[Job], List[JobId]]] = None,
         creation_date: Optional[datetime] = None,
@@ -51,12 +54,20 @@ def __init__(
         version: Optional[str] = None,
     ):
         self._entity_id = entity_id
+        self._entity_type = entity_type
         self.id = id or self.__new_id()
         self._jobs: Union[List[Job], List[JobId], List] = jobs or []
         self._creation_date = creation_date or datetime.now()
         self._submission_status = submission_status or SubmissionStatus.SUBMITTED
         self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
 
+        self.__abandoned = False
+        self.__completed = False
+
+        self.__running_jobs: MutableSet[str] = set()
+        self.__blocked_jobs: MutableSet[str] = set()
+        self.__pending_jobs: MutableSet[str] = set()
+
     @staticmethod
     def __new_id() -> str:
         """Generate a unique Submission identifier."""
@@ -66,6 +77,10 @@ def __new_id() -> str:
     def entity_id(self) -> str:
         return self._entity_id
 
+    @property
+    def entity_type(self) -> str:
+        return self._entity_type
+
     @property
     def creation_date(self):
         return self._creation_date
@@ -130,56 +145,46 @@ def __gt__(self, other):
 
     def __ge__(self, other):
         return self.creation_date.timestamp() >= other.creation_date.timestamp()
 
-    def _update_submission_status(self, _: Job):
-        abandoned = False
-        canceled = False
-        blocked = False
-        pending = False
-        running = False
-        completed = False
-
-        for job in self.jobs:
-            if not job:
-                continue
-            if job.is_failed():
-                self.submission_status = SubmissionStatus.FAILED  # type: ignore
-                return
-            if job.is_canceled():
-                canceled = True
-                continue
-            if job.is_blocked():
-                blocked = True
-                continue
-            if job.is_pending() or job.is_submitted():
-                pending = True
-                continue
-            if job.is_running():
-                running = True
-                continue
-            if job.is_completed() or job.is_skipped():
-                completed = True
-                continue
-            if job.is_abandoned():
-                abandoned = True
-        if canceled:
+    def _update_submission_status(self, job: Job):
+        if self._submission_status == SubmissionStatus.FAILED or self._submission_status == SubmissionStatus.CANCELED:
+            return
+
+        job_status = job.status
+
+        if job_status == Status.FAILED:
+            self.submission_status = SubmissionStatus.FAILED  # type: ignore
+            return
+        if job_status == Status.CANCELED:
             self.submission_status = SubmissionStatus.CANCELED  # type: ignore
             return
-        if abandoned:
+
+        with self.lock:
+            if job_status == Status.BLOCKED:
+                self.__blocked_jobs.add(job.id)
+            if job_status == Status.PENDING or job_status == Status.SUBMITTED:
+                self.__pending_jobs.add(job.id)
+                self.__blocked_jobs.discard(job.id)
+            if job_status == Status.RUNNING:
+                self.__running_jobs.add(job.id)
+                self.__pending_jobs.discard(job.id)
+            if job_status == Status.COMPLETED or job_status == Status.SKIPPED:
+                self.__completed = True
+                self.__running_jobs.discard(job.id)
+            if job_status == Status.ABANDONED:
+                self.__abandoned = True
+
+        if self.__abandoned:
             self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
-            return
-        if running:
+        elif self.__running_jobs:
             self.submission_status = SubmissionStatus.RUNNING  # type: ignore
-            return
-        if pending:
+        elif self.__pending_jobs:
             self.submission_status = SubmissionStatus.PENDING  # type: ignore
-            return
-        if blocked:
+        elif self.__blocked_jobs:
             self.submission_status = SubmissionStatus.BLOCKED  # type: ignore
-            return
-        if completed:
+        elif self.__completed:
             self.submission_status = SubmissionStatus.COMPLETED  # type: ignore
-            return
-        self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
+        else:
+            self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
 
 
 @_make_event.register(Submission)
diff --git a/tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py b/tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py
index 8a9cfb6f..5753cd16 100644
--- a/tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py
+++ b/tests/core/_orchestrator/_dispatcher/test_job_dispatcher.py
@@ -112,7 +112,7 @@ def test_can_execute_synchronous():
 
     task_id = TaskId("task_id1")
     task = Task(config_id="name", properties={}, input=[], function=print, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job_id = JobId("id1")
     job = Job(job_id, task, submission.id, task.id)
 
@@ -130,7 +130,7 @@ def test_exception_in_user_function():
     task_id = TaskId("task_id1")
     job_id = JobId("id1")
     task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, task.id)
 
     dispatcher = _OrchestratorFactory._dispatcher
@@ -151,7 +151,7 @@ def test_exception_in_writing_data():
     output._is_in_cache = False
     output.write.side_effect = ValueError()
     task = Task(config_id="name", properties={}, input=[], function=print, output=[output], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task_id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, task.id)
 
     dispatcher = _OrchestratorFactory._dispatcher
diff --git a/tests/core/_orchestrator/test_orchestrator.py b/tests/core/_orchestrator/test_orchestrator.py
index bac169cb..0ef09989 100644
--- a/tests/core/_orchestrator/test_orchestrator.py
+++ b/tests/core/_orchestrator/test_orchestrator.py
@@ -27,8 +27,9 @@
 from src.taipy.core.data.in_memory import InMemoryDataNode
 from src.taipy.core.scenario._scenario_manager import _ScenarioManager
 from src.taipy.core.scenario.scenario import Scenario
-from src.taipy.core.sequence._sequence_manager import _SequenceManager
 from src.taipy.core.sequence.sequence import Sequence
+from src.taipy.core.submission._submission_manager import _SubmissionManager
+from src.taipy.core.submission.submission_status import SubmissionStatus
 from src.taipy.core.task._task_manager import _TaskManager
 from src.taipy.core.task.task import Task
 from taipy.config import Config
@@ -93,6 +94,7 @@ def test_submit_task():
     assert _DataManager._get(output_dn_id).job_ids == [job.id]
     assert _DataManager._get(output_dn_id).is_ready_for_reading
     assert job.is_completed()
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_sequence_generate_unique_submit_id():
@@ -234,6 +236,7 @@ def return_2tuple():
     assert task.output[f"{task.config_id}_output0"].read() == 0
     assert job.is_failed()
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED
 
 
 def test_scenario_only_submit_same_task_once():
@@ -261,16 +264,19 @@ def test_scenario_only_submit_same_task_once():
     assert len(jobs) == 3
     assert all([job.is_completed() for job in jobs])
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
     jobs = _Orchestrator.submit(sequence_1)
     assert len(jobs) == 2
     assert all([job.is_completed() for job in jobs])
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
     jobs = _Orchestrator.submit(sequence_2)
     assert len(jobs) == 2
     assert all([job.is_completed() for job in jobs])
    assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_update_status_fail_job():
@@ -299,6 +305,7 @@ def test_update_status_fail_job():
 
     job = _Orchestrator.submit_task(task_0)
     assert job.is_failed()
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED
 
     jobs = _Orchestrator.submit(scenario_1)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -306,6 +313,7 @@ def test_update_status_fail_job():
     assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
     assert tasks_jobs["task_3"].is_completed()
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
 
     jobs = _Orchestrator.submit(scenario_2)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -313,6 +321,7 @@ def test_update_status_fail_job():
     assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
     assert tasks_jobs["task_3"].is_completed()
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
 
 
 def test_update_status_fail_job_in_parallel():
@@ -356,18 +365,25 @@ def test_update_status_fail_job_in_parallel():
 
     job = _Orchestrator.submit_task(task_0)
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
     jobs = _Orchestrator.submit(sequence_1)
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_1.sequences["sequence_1"])
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_1)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -375,6 +391,9 @@ def test_update_status_fail_job_in_parallel():
     assert_true_after_time(tasks_jobs["task_3"].is_completed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_2)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -382,6 +401,9 @@ def test_update_status_fail_job_in_parallel():
     assert_true_after_time(tasks_jobs["task_3"].is_completed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
 
 def test_submit_task_in_parallel():
@@ -399,9 +421,15 @@ def test_submit_task_in_parallel():
         job = _Orchestrator.submit_task(task)
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -422,9 +450,15 @@ def test_submit_sequence_in_parallel():
         job = _Orchestrator.submit(sequence)[0]
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -444,9 +478,15 @@ def test_submit_scenario_in_parallel():
         job = _Orchestrator.submit(scenario)[0]
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -469,6 +509,9 @@ def test_submit_task_synchronously_in_parallel():
     job = _Orchestrator.submit_task(task, wait=True)
     assert (datetime.now() - start_time).seconds >= sleep_period
    assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_sequence_synchronously_in_parallel():
@@ -483,6 +526,9 @@ def test_submit_sequence_synchronously_in_parallel():
     job = _Orchestrator.submit(sequence, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_scenario_synchronously_in_parallel():
@@ -497,6 +543,9 @@ def test_submit_scenario_synchronously_in_parallel():
     job = _Orchestrator.submit(scenario, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_fail_task_synchronously_in_parallel():
@@ -509,6 +558,7 @@ def test_submit_fail_task_synchronously_in_parallel():
     job = _Orchestrator.submit_task(task, wait=True)
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_fail_sequence_synchronously_in_parallel():
@@ -523,6 +573,7 @@ def test_submit_fail_sequence_synchronously_in_parallel():
     job = _Orchestrator.submit(sequence, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_fail_scenario_synchronously_in_parallel():
@@ -537,6 +588,7 @@ def test_submit_fail_scenario_synchronously_in_parallel():
     job = _Orchestrator.submit(scenario, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_task_synchronously_in_parallel_with_timeout():
@@ -553,6 +605,9 @@ def test_submit_task_synchronously_in_parallel_with_timeout():
 
     assert timeout_duration <= (end_time - start_time).seconds
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_task_multithreading_multiple_task():
@@ -577,17 +632,34 @@ def test_submit_task_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
 
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
+
+    assert job_2.is_completed()
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_sequence_multithreading_multiple_task():
@@ -615,17 +687,28 @@ def test_submit_sequence_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
 
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
+
+    assert job_2.is_completed()
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_scenario_multithreading_multiple_task():
@@ -653,20 +736,29 @@ def test_submit_scenario_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
 
        assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(job_2.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_status():
+    # TODO
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
     m = multiprocessing.Manager()
@@ -684,6 +776,9 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
         job_0 = _Orchestrator.submit_task(task_0)
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
         with lock_1:
             with lock_2:
                 assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
@@ -693,6 +788,15 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
                 assert_true_after_time(job_0.is_running)
                 assert_true_after_time(job_1.is_pending)
                 assert_true_after_time(job_2.is_running)
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+                )
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.PENDING
+                )
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+                )
                 assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
             assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
@@ -700,13 +804,30 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
             assert_true_after_time(job_0.is_running)
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_completed)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+            )
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
         assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
         assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
 
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(job_1.is_completed)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
+        assert job_2.is_completed()
+        assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
 
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
@@ -714,6 +835,9 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     assert job_0.is_completed()
     assert job_1.is_completed()
     assert job_2.is_completed()
+    assert _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_blocked_task():
@@ -744,6 +868,7 @@ def test_blocked_task():
     job_2 = _Orchestrator.submit_task(task_2)  # job 2 is submitted first
     assert job_2.is_blocked()  # since bar is not is_valid the job 2 is blocked
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
     assert len(_Orchestrator.blocked_jobs) == 1
     with lock_2:
         with lock_1:
@@ -754,16 +879,32 @@ def test_blocked_task():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
+            )
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_blocked_sequence():
@@ -801,16 +942,25 @@ def test_blocked_sequence():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
        assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_blocked_scenario():
@@ -848,16 +998,25 @@ def test_blocked_scenario():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_task_orchestrator_create_synchronous_dispatcher():
@@ -900,6 +1059,9 @@ def test_can_exec_task_with_modified_config():
     assert_true_after_time(
         jobs[0].is_completed
     )  # If the job is completed, that means the asserts in the task are successful
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def update_config_task(n):
@@ -940,6 +1102,9 @@ def test_cannot_exec_task_that_update_config():
 
     # The job should fail due to an exception is raised
     assert_true_after_time(jobs[0].is_failed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
 
 def test_can_execute_task_with_development_mode():
diff --git a/tests/core/data/test_parquet_data_node.py b/tests/core/data/test_parquet_data_node.py
index ffe1d98d..73548c4f 100644
--- a/tests/core/data/test_parquet_data_node.py
+++ b/tests/core/data/test_parquet_data_node.py
@@ -500,7 +500,6 @@ def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
         dn.write(df)
 
         assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
-        print(dn.read())
         assert set(dn.read().columns) == set(read_kwargs["columns"])
 
         # !!! filter doesn't work with `fastparquet` without partition_cols
diff --git a/tests/core/job/test_job.py b/tests/core/job/test_job.py
index 28f9d4d0..0c9037d2 100644
--- a/tests/core/job/test_job.py
+++ b/tests/core/job/test_job.py
@@ -119,7 +119,7 @@ def test_comparison(task):
 
 
 def test_status_job(task):
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
     submission.jobs = [job]
 
@@ -150,7 +150,7 @@ def test_status_job(task):
 
 def test_notification_job(task):
     subscribe = MagicMock()
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job("job_id", task, submission.id, "SCENARIO_scenario_config")
     submission.jobs = [job]
 
@@ -170,7 +170,7 @@ def test_notification_job(task):
 
 def test_handle_exception_in_user_function(task_id, job_id):
     task = Task(config_id="name", properties={}, input=[], function=_error, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -184,7 +184,7 @@ def test_handle_exception_in_input_data_node(task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
     task = Task(config_id="name", properties={}, input=[data_node], function=print, output=[], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -198,7 +198,7 @@ def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
     task = Task(config_id="name", properties={}, input=[], function=_foo, output=[data_node], id=task_id)
-    submission = _SubmissionManagerFactory._build_manager()._create(task.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     submission.jobs = [job]
 
@@ -213,7 +213,7 @@ def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_
 def test_auto_set_and_reload(current_datetime, job_id):
     task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
     task_2 = Task(config_id="name_2", properties={}, function=_foo, id=TaskId("task_2"))
-    submission = _SubmissionManagerFactory._build_manager()._create(task_1.id)
+    submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX)
     job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
     submission.jobs = [job_1]
 
diff --git a/tests/core/job/test_job_manager.py b/tests/core/job/test_job_manager.py
index 524ab1a5..4701d6e1 100644
--- a/tests/core/job/test_job_manager.py
+++ b/tests/core/job/test_job_manager.py
@@ -349,8 +349,8 @@ def test_cancel_subsequent_jobs():
     task_3 = Task("task_config_3", {}, print, [dn_4], id="task_3")
 
     # Can't get tasks under 1 scenario due to partial not serializable
-    submission_1 = submission_manager._create("scenario_id")
-    submission_2 = submission_manager._create("scenario_id")
+    submission_1 = submission_manager._create("scenario_id", "SCENARIO")
+    submission_2 = submission_manager._create("scenario_id", "SCENARIO")
 
     _DataManager._set(dn_1)
     _DataManager._set(dn_2)
diff --git a/tests/core/notification/test_notifier.py b/tests/core/notification/test_notifier.py
index 135a8856..f7c9ae30 100644
--- a/tests/core/notification/test_notifier.py
+++ b/tests/core/notification/test_notifier.py
@@ -109,7 +109,6 @@ def find_registration_and_topic(registration_id):
 
     Notifier.unregister(registration_id_1)
     assert len(Notifier._topics_registrations_list.keys()) == 2
-    print(Notifier._topics_registrations_list.keys())
     assert all(topic not in Notifier._topics_registrations_list.keys() for topic in [topic_0, topic_1])
 
     Notifier.unregister(registration_id_2)
diff --git a/tests/core/submission/test_submission.py b/tests/core/submission/test_submission.py
index 9ec70a6e..85f43c44 100644
--- a/tests/core/submission/test_submission.py
+++ b/tests/core/submission/test_submission.py
@@ -29,7 +29,7 @@
 
 
 def test_create_submission(scenario, job, current_datetime):
-    submission_1 = Submission(scenario.id)
+    submission_1 = Submission(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -39,7 +39,13 @@ def test_create_submission(scenario, job, current_datetime):
     assert submission_1._version is not None
 
     submission_2 = Submission(
-        scenario.id, "submission_id", [job], current_datetime, SubmissionStatus.COMPLETED, "version_id"
+        scenario.id,
+        scenario._ID_PREFIX,
+        "submission_id",
+        [job],
+        current_datetime,
+        SubmissionStatus.COMPLETED,
+        "version_id",
     )
 
     assert submission_2.id == "submission_id"
@@ -83,7 +89,7 @@ def is_submitted(self):
         return self.status == Status.SUBMITTED
 
 
-def mock_get_jobs(job_ids):
+def __test_update_submission_status(job_ids, expected_submission_status):
     jobs = {
         "job0_submitted": MockJob("job0_submitted", Status.SUBMITTED),
         "job1_failed": MockJob("job1_failed", Status.FAILED),
@@ -95,20 +101,13 @@ def mock_get_jobs(job_ids):
         "job7_skipped": MockJob("job7_skipped", Status.SKIPPED),
         "job8_abandoned": MockJob("job8_abandoned", Status.ABANDONED),
     }
-    return [jobs[job_id] for job_id in job_ids]
-
-
-def __test_update_submission_status(job_ids, expected_submission_status):
-    with (
-        patch(
-            "src.taipy.core.submission.submission.Submission.jobs",
-            new_callable=mock.PropertyMock,
-            return_value=(mock_get_jobs(job_ids)),
-        )
-    ):
-        submission = Submission("submission_id")
-        submission._update_submission_status(None)
-        assert submission.submission_status == expected_submission_status
+    submission = Submission("submission_id", "ENTITY_TYPE")
+    submission.jobs = [jobs[job_id] for job_id in job_ids]
+    for job_id in job_ids:
+        job = jobs[job_id]
+        submission._update_submission_status(job)
+    assert submission.submission_status == expected_submission_status
 
 
 @pytest.mark.parametrize(
@@ -139,7 +138,6 @@ def test_update_single_submission_status(job_ids, expected_submission_status):
         (["job1_failed", "job6_completed"], SubmissionStatus.FAILED),
         (["job1_failed", "job7_skipped"], SubmissionStatus.FAILED),
         (["job1_failed", "job8_abandoned"], SubmissionStatus.FAILED),
-        (["job2_canceled", "job1_failed"], SubmissionStatus.FAILED),
         (["job3_blocked", "job1_failed"], SubmissionStatus.FAILED),
         (["job4_pending", "job1_failed"], SubmissionStatus.FAILED),
         (["job5_running", "job1_failed"], SubmissionStatus.FAILED),
@@ -162,6 +160,7 @@ def test_update_submission_status_with_one_failed_job_in_jobs(job_ids, expected_
         (["job2_canceled", "job6_completed"], SubmissionStatus.CANCELED),
         (["job2_canceled", "job7_skipped"], SubmissionStatus.CANCELED),
         (["job2_canceled", "job8_abandoned"], SubmissionStatus.CANCELED),
+        (["job2_canceled", "job1_failed"], SubmissionStatus.CANCELED),
         (["job3_blocked", "job2_canceled"], SubmissionStatus.CANCELED),
         (["job4_pending", "job2_canceled"], SubmissionStatus.CANCELED),
         (["job5_running", "job2_canceled"], SubmissionStatus.CANCELED),
@@ -263,7 +262,7 @@ def test_update_submission_status_with_wrong_case_abandoned_without_cancel_or_fa
 
 def test_auto_set_and_reload():
     task = Task(config_id="name_1", properties={}, function=print, id=TaskId("task_1"))
-    submission_1 = Submission(task.id)
+    submission_1 = Submission(task.id, task._ID_PREFIX)
 
     job_1 = Job("job_1", task, submission_1.id, submission_1.entity_id)
     job_2 = Job("job_2", task, submission_1.id, submission_1.entity_id)
diff --git a/tests/core/submission/test_submission_manager.py b/tests/core/submission/test_submission_manager.py
index ff50e3a8..ab8ad421 100644
--- a/tests/core/submission/test_submission_manager.py
+++ b/tests/core/submission/test_submission_manager.py
@@ -20,7 +20,7 @@
 
 
 def test_create_submission(scenario):
-    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id)
+    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -34,7 +34,7 @@ def test_get_submission():
 
     assert submission_manager._get("random_submission_id") is None
 
-    submission_1 = submission_manager._create("entity_id")
+    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE")
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -69,22 +69,22 @@ def test_get_latest_submission():
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
     submission_manager = _SubmissionManagerFactory._build_manager()
-    submission_1 = submission_manager._create(task_1.id)
+    submission_1 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_2 = submission_manager._create(task_2.id)
+    submission_2 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_3 = submission_manager._create(task_1.id)
+    submission_3 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_4 = submission_manager._create(task_2.id)
+    submission_4 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_4
 
diff --git a/tests/core/submission/test_submission_manager_with_sql_repo.py b/tests/core/submission/test_submission_manager_with_sql_repo.py
index 0854301b..80617f4f 100644
--- a/tests/core/submission/test_submission_manager_with_sql_repo.py
+++ b/tests/core/submission/test_submission_manager_with_sql_repo.py
@@ -28,7 +28,7 @@ def init_managers():
 def test_create_submission(scenario, init_sql_repo):
     init_managers()
 
-    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id)
+    submission_1 = _SubmissionManagerFactory._build_manager()._create(scenario.id, scenario._ID_PREFIX)
 
     assert submission_1.id is not None
     assert submission_1.entity_id == scenario.id
@@ -42,7 +42,7 @@ def test_get_submission(init_sql_repo):
 
     submission_manager = _SubmissionManagerFactory._build_manager()
 
-    submission_1 = submission_manager._create("entity_id")
+    submission_1 = submission_manager._create("entity_id", "ENTITY_TYPE")
     submission_2 = submission_manager._get(submission_1.id)
 
     assert submission_1.id == submission_2.id
@@ -80,22 +80,22 @@ def test_get_latest_submission(init_sql_repo):
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
     submission_manager = _SubmissionManagerFactory._build_manager()
-    submission_1 = submission_manager._create(task_1.id)
+    submission_1 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) is None
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_2 = submission_manager._create(task_2.id)
+    submission_2 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_1
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_3 = submission_manager._create(task_1.id)
+    submission_3 = submission_manager._create(task_1.id, task_1._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_2
 
     sleep(0.01)  # Comparison is based on time, precision on Windows is not enough important
-    submission_4 = submission_manager._create(task_2.id)
+    submission_4 = submission_manager._create(task_2.id, task_2._ID_PREFIX)
     assert submission_manager._get_latest(task_1) == submission_3
     assert submission_manager._get_latest(task_2) == submission_4
 
diff --git a/tests/core/submission/test_submission_repositories.py b/tests/core/submission/test_submission_repositories.py
index ecb5a2db..b5e2dcad 100644
--- a/tests/core/submission/test_submission_repositories.py
+++ b/tests/core/submission/test_submission_repositories.py
@@ -43,7 +43,7 @@ def test_save_and_load(self, data_node, job, configure_repo):
         job._task = task
         _JobManagerFactory._build_manager()._repository._save(job)
 
-        submission = Submission(task.id)
+        submission = Submission(task.id, task._ID_PREFIX)
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
         submission_repository._save(submission)
         submission.jobs = [job]
@@ -55,7 +55,7 @@ def test_exists(self, configure_repo):
         configure_repo()
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
         submission_repository._save(submission)
 
@@ -67,7 +67,7 @@ def test_load_all(self, configure_repo):
         configure_repo()
         repository = _SubmissionManagerFactory._build_manager()._repository
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         for i in range(10):
             submission.id = f"submission-{i}"
             repository._save(submission)
@@ -81,7 +81,7 @@ def test_delete(self, configure_repo):
 
         repository = _SubmissionManagerFactory._build_manager()._repository
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         repository._save(submission)
 
         repository._delete(submission.id)
@@ -94,7 +94,7 @@ def test_delete_all(self, configure_repo):
         configure_repo()
 
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
 
         for i in range(10):
             submission.id = f"submission-{i}"
@@ -110,7 +110,7 @@ def test_delete_many(self, configure_repo):
         configure_repo()
 
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
 
         for i in range(10):
@@ -130,7 +130,7 @@ def test_delete_by(self, configure_repo):
 
         # Create 5 entities with version 1.0 and 5 entities with version 2.0
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
 
         for i in range(10):
             submission.id = f"submission-{i}"
@@ -148,7 +148,7 @@ def test_search(self, configure_repo):
         configure_repo()
 
         submission_repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id", version="random_version_number")
+        submission = Submission("entity_id", "ENTITY_TYPE", version="random_version_number")
         for i in range(10):
             submission.id = f"submission-{i}"
             submission_repository._save(submission)
@@ -170,7 +170,7 @@ def test_export(self, tmpdir, configure_repo):
         configure_repo()
 
         repository = _SubmissionManagerFactory._build_manager()._repository
-        submission = Submission("entity_id")
+        submission = Submission("entity_id", "ENTITY_TYPE")
         repository._save(submission)
 
         repository._export(submission.id, tmpdir.strpath)
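
Reviewer note (not part of the patch): the sketch below is an illustrative, self-contained model of the incremental roll-up that the reworked Submission._update_submission_status performs. The class and method names here (SubmissionRollup, on_job_status_change) and the standalone enums are simplified stand-ins, not the taipy API; it only mirrors the precedence and latching behavior visible in the diff: FAILED/CANCELED are terminal, then ABANDONED -> UNDEFINED, then RUNNING, PENDING, BLOCKED, COMPLETED, else UNDEFINED.

from enum import Enum

# Simplified stand-ins for taipy's Status and SubmissionStatus enums.
JobStatus = Enum("JobStatus", "SUBMITTED BLOCKED PENDING RUNNING COMPLETED SKIPPED FAILED CANCELED ABANDONED")
SubmissionStatus = Enum("SubmissionStatus", "SUBMITTED BLOCKED PENDING RUNNING COMPLETED FAILED CANCELED UNDEFINED")


class SubmissionRollup:
    """Fold individual job status changes into one submission status, one event at a time."""

    def __init__(self):
        self.status = SubmissionStatus.SUBMITTED
        self._blocked, self._pending, self._running = set(), set(), set()
        self._completed = False
        self._abandoned = False

    def on_job_status_change(self, job_id, job_status):
        # FAILED and CANCELED are terminal: later job events no longer change the submission.
        if self.status in (SubmissionStatus.FAILED, SubmissionStatus.CANCELED):
            return
        if job_status is JobStatus.FAILED:
            self.status = SubmissionStatus.FAILED
            return
        if job_status is JobStatus.CANCELED:
            self.status = SubmissionStatus.CANCELED
            return

        # Track which bucket this job currently sits in (moving it out of the previous one).
        if job_status is JobStatus.BLOCKED:
            self._blocked.add(job_id)
        if job_status in (JobStatus.PENDING, JobStatus.SUBMITTED):
            self._pending.add(job_id)
            self._blocked.discard(job_id)
        if job_status is JobStatus.RUNNING:
            self._running.add(job_id)
            self._pending.discard(job_id)
        if job_status in (JobStatus.COMPLETED, JobStatus.SKIPPED):
            self._completed = True
            self._running.discard(job_id)
        if job_status is JobStatus.ABANDONED:
            self._abandoned = True

        # Fixed precedence, re-derived from the buckets instead of re-scanning every job.
        if self._abandoned:
            self.status = SubmissionStatus.UNDEFINED
        elif self._running:
            self.status = SubmissionStatus.RUNNING
        elif self._pending:
            self.status = SubmissionStatus.PENDING
        elif self._blocked:
            self.status = SubmissionStatus.BLOCKED
        elif self._completed:
            self.status = SubmissionStatus.COMPLETED
        else:
            self.status = SubmissionStatus.UNDEFINED


# Usage: a running job keeps the submission RUNNING even while another job is blocked;
# once the running job completes, the remaining blocked job drives the status.
rollup = SubmissionRollup()
rollup.on_job_status_change("job-1", JobStatus.RUNNING)
rollup.on_job_status_change("job-2", JobStatus.BLOCKED)
assert rollup.status is SubmissionStatus.RUNNING
rollup.on_job_status_change("job-1", JobStatus.COMPLETED)
assert rollup.status is SubmissionStatus.BLOCKED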