Skip to content
This repository has been archived by the owner on Jan 2, 2024. It is now read-only.

Improve Core Events to support additional metadata #829

Merged
merged 6 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/taipy/core/_entity/_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import List

from .._entity._reload import _get_manager
from ..notification import _publish_event
from ..notification import Notifier


class _Entity:
Expand All @@ -34,6 +34,6 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
self._properties.data.update(self._properties._pending_changes)
_get_manager(self._MANAGER_NAME)._set(self)

while self._in_context_attributes_changed_collector:
_publish_event(*self._in_context_attributes_changed_collector.pop(0))
for event in self._in_context_attributes_changed_collector:
Notifier.publish(event)
Copy link
Contributor Author

@gmarabout gmarabout Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: Adding more arguments to the Event class make the use of a arg list a bit cumbersome and very error prone (think the order being significant!).
That is why I propose to replace this list of args by a list of Event objects. Then, the logic does not change, except that it makes more sense to call Notifier.publish directly now...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's awesome!!

_get_manager(self._MANAGER_NAME)._set(self)
31 changes: 15 additions & 16 deletions src/taipy/core/_entity/_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

from collections import UserDict

from ..notification import _ENTITY_TO_EVENT_ENTITY_TYPE, EventOperation, _publish_event
from ..notification import _ENTITY_TO_EVENT_ENTITY_TYPE, EventOperation, Notifier, _make_event


class _Properties(UserDict):

__PROPERTIES_ATTRIBUTE_NAME = "properties"

def __init__(self, entity_owner, **kwargs):
Expand All @@ -29,20 +28,20 @@ def __setitem__(self, key, value):
from ... import core as tp

if hasattr(self, "_entity_owner"):
to_publish_event_parameters = [
_ENTITY_TO_EVENT_ENTITY_TYPE[self._entity_owner._MANAGER_NAME],
self._entity_owner.id,
event = _make_event(
self._entity_owner,
EventOperation.UPDATE,
self.__PROPERTIES_ATTRIBUTE_NAME,
]
attribute_name=self.__PROPERTIES_ATTRIBUTE_NAME,
attribute_value=value,
)
if not self._entity_owner._is_in_context:
tp.set(self._entity_owner)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
if key in self._pending_deletions:
self._pending_deletions.remove(key)
self._pending_changes[key] = value
self._entity_owner._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._entity_owner._in_context_attributes_changed_collector.append(event)

def __getitem__(self, key):
from taipy.config.common._template_handler import _TemplateHandler as _tpl
Expand All @@ -54,16 +53,16 @@ def __delitem__(self, key):
from ... import core as tp

if hasattr(self, "_entity_owner"):
to_publish_event_parameters = [
_ENTITY_TO_EVENT_ENTITY_TYPE[self._entity_owner._MANAGER_NAME],
self._entity_owner.id,
event = _make_event(
self._entity_owner,
EventOperation.UPDATE,
self.__PROPERTIES_ATTRIBUTE_NAME,
]
attribute_name=self.__PROPERTIES_ATTRIBUTE_NAME,
attribute_value=None,
)
if not self._entity_owner._is_in_context:
tp.set(self._entity_owner)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
self._pending_changes.pop(key, None)
self._pending_deletions.add(key)
self._entity_owner._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._entity_owner._in_context_attributes_changed_collector.append(event)
20 changes: 12 additions & 8 deletions src/taipy/core/_entity/_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import functools

from ..notification import EventOperation, _publish_event
from ..notification import EventOperation, Notifier, _make_event


class _Reloader:
Expand Down Expand Up @@ -65,19 +65,23 @@ def __set_entity(fct):
def _do_set_entity(self, *args, **kwargs):
fct(self, *args, **kwargs)
entity_manager = _get_manager(manager)
to_publish_event_parameters = [
entity_manager._EVENT_ENTITY_TYPE,
self.id,
if len(args) == 1:
value = args[0]
else:
value = args
event = _make_event(
self,
EventOperation.UPDATE,
fct.__name__,
]
attribute_name=fct.__name__,
attribute_value=value,
)
if not self._is_in_context:
entity = _Reloader()._reload(manager, self)
fct(entity, *args, **kwargs)
entity_manager._set(entity)
_publish_event(*to_publish_event_parameters)
Notifier.publish(event)
else:
self._in_context_attributes_changed_collector.append(to_publish_event_parameters)
self._in_context_attributes_changed_collector.append(event)

return _do_set_entity

Expand Down
36 changes: 31 additions & 5 deletions src/taipy/core/_manager/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@
# specific language governing permissions and limitations under the License.

import pathlib
from importlib import metadata
from typing import Dict, Generic, Iterable, List, Optional, TypeVar, Union

from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity_ids import _EntityIds
from .._repository._abstract_repository import _AbstractRepository
from ..exceptions.exceptions import ModelNotFound
from ..notification import EventOperation, _publish_event
from ..notification import Event, EventOperation, Notifier

EntityType = TypeVar("EntityType")

Expand All @@ -34,7 +35,13 @@ def _delete_all(cls):
"""
cls._repository._delete_all()
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, "all", EventOperation.DELETION, None)
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE,
EventOperation.DELETION,
metadata={"delete_all": True},
)
)

@classmethod
def _delete_many(cls, ids: Iterable):
Expand All @@ -44,7 +51,14 @@ def _delete_many(cls, ids: Iterable):
cls._repository._delete_many(ids)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
for entity_id in ids:
_publish_event(cls._EVENT_ENTITY_TYPE, entity_id, EventOperation.DELETION, None) # type: ignore
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE, # type: ignore
EventOperation.DELETION,
entity_id=entity_id,
metadata={"delete_all": True},
)
)

@classmethod
def _delete_by_version(cls, version_number: str):
Expand All @@ -53,7 +67,13 @@ def _delete_by_version(cls, version_number: str):
"""
cls._repository._delete_by(attribute="version", value=version_number)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, None, EventOperation.DELETION, None) # type: ignore
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE, # type: ignore
EventOperation.DELETION,
metadata={"delete_by_version": version_number},
)
)

@classmethod
def _delete(cls, id):
Expand All @@ -62,7 +82,13 @@ def _delete(cls, id):
"""
cls._repository._delete(id)
if hasattr(cls, "_EVENT_ENTITY_TYPE"):
_publish_event(cls._EVENT_ENTITY_TYPE, id, EventOperation.DELETION, None)
Notifier.publish(
Event(
cls._EVENT_ENTITY_TYPE,
EventOperation.DELETION,
entity_id=id,
)
)

@classmethod
def _set(cls, entity: EntityType):
Expand Down
6 changes: 5 additions & 1 deletion src/taipy/core/cycle/_cycle_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ def _create(
frequency, properties, creation_date=creation_date, start_date=start_date, end_date=end_date, name=name
)
cls._set(cycle)
_publish_event(cls._EVENT_ENTITY_TYPE, cycle.id, EventOperation.CREATION, None)
_publish_event(
cls._EVENT_ENTITY_TYPE,
EventOperation.CREATION,
entity_id=cycle.id,
)
return cycle

@classmethod
Expand Down
21 changes: 21 additions & 0 deletions src/taipy/core/cycle/cycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .._entity._properties import _Properties
from .._entity._reload import _Reloader, _self_reload, _self_setter
from ..exceptions.exceptions import _SuspiciousFileOperation
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from .cycle_id import CycleId


Expand Down Expand Up @@ -176,3 +177,23 @@ def get_simple_label(self) -> str:
The simple label of the cycle as a string.
"""
return self._get_simple_label()


@_make_event.register(Cycle)
def _make_event_for_cycle(
cycle: Cycle,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {**kwargs}
return Event(
entity_type=EventEntityType.CYCLE,
entity_id=cycle.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata=metadata,
)
9 changes: 5 additions & 4 deletions src/taipy/core/data/_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ..config.data_node_config import DataNodeConfig
from ..cycle.cycle_id import CycleId
from ..exceptions.exceptions import InvalidDataNodeType
from ..notification import EventEntityType, EventOperation, _publish_event
from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
from ..scenario.scenario_id import ScenarioId
from ..sequence.sequence_id import SequenceId
from ._abstract_file import _AbstractFileDataNode
Expand All @@ -33,7 +33,6 @@


class _DataManager(_Manager[DataNode], _VersionMixin):

__DATA_NODE_CLASS_MAP = DataNode._class_map() # type: ignore
_ENTITY_NAME = DataNode.__name__
_EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
Expand Down Expand Up @@ -77,7 +76,7 @@ def _create_and_set(
cls._set(data_node)
if isinstance(data_node, _AbstractFileDataNode):
_append_to_backup_file(new_file_path=data_node._path)
_publish_event(cls._EVENT_ENTITY_TYPE, data_node.id, EventOperation.CREATION, None)
Notifier.publish(_make_event(data_node, EventOperation.CREATION))
return data_node

@classmethod
Expand Down Expand Up @@ -166,7 +165,9 @@ def _delete_by_version(cls, version_number: str):
cls._clean_pickle_files(data_nodes)
cls._remove_dn_file_paths_in_backup_file(data_nodes)
cls._repository._delete_by(attribute="version", value=version_number)
_publish_event(cls._EVENT_ENTITY_TYPE, None, EventOperation.DELETION, None)
Notifier.publish(
Event(EventEntityType.DATA_NODE, EventOperation.DELETION, metadata={"delete_by_version": version_number})
)

@classmethod
def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[DataNode]:
Expand Down
21 changes: 21 additions & 0 deletions src/taipy/core/data/data_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from ..common._warnings import _warn_deprecated
from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
from ..job.job_id import JobId
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ._filter import _FilterDataNode
from .data_node_id import DataNodeId, Edit
from .operator import JoinOperator
Expand Down Expand Up @@ -555,3 +556,23 @@ def get_simple_label(self) -> str:
The simple label of the data node as a string.
"""
return self._get_simple_label()


@_make_event.register(DataNode)
def make_event_for_datanode(
data_node: DataNode,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {"config_id": data_node.config_id, **kwargs}
return Event(
entity_type=EventEntityType.DATA_NODE,
entity_id=data_node.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata=metadata,
)
5 changes: 2 additions & 3 deletions src/taipy/core/job/_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
from .._version._version_manager_factory import _VersionManagerFactory
from .._version._version_mixin import _VersionMixin
from ..exceptions.exceptions import JobNotDeletedException
from ..notification import EventEntityType, EventOperation, _publish_event
from ..notification import EventEntityType, EventOperation, Notifier, _make_event
from ..task.task import Task
from .job import Job
from .job_id import JobId


class _JobManager(_Manager[Job], _VersionMixin):

_ENTITY_NAME = Job.__name__
_ID_PREFIX = "JOB_"
_repository: _AbstractRepository
Expand Down Expand Up @@ -52,7 +51,7 @@ def _create(
version=version,
)
cls._set(job)
_publish_event(cls._EVENT_ENTITY_TYPE, job.id, EventOperation.CREATION, None)
Notifier.publish(_make_event(job, EventOperation.CREATION))
job._on_status_change(*callbacks)
return job

Expand Down
26 changes: 25 additions & 1 deletion src/taipy/core/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import traceback
from datetime import datetime
from typing import Callable, List
from typing import Any, Callable, List, Optional

from taipy.logger._taipy_logger import _TaipyLogger

Expand All @@ -22,6 +22,7 @@
from .._entity._reload import _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..common._utils import _fcts_to_dict
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..task.task import Task
from .job_id import JobId
from .status import Status
Expand Down Expand Up @@ -70,6 +71,9 @@ def __init__(self, id: JobId, task: Task, submit_id: str, submit_entity_id: str,
self.__logger = _TaipyLogger._get_logger()
self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()

def get_event_context(self):
return {"task_config_id": self._task.config_id}

@property # type: ignore
@_self_reload(_MANAGER_NAME)
def task(self):
Expand Down Expand Up @@ -351,3 +355,23 @@ def is_deletable(self) -> bool:
from ... import core as tp

return tp.is_deletable(self)


@_make_event.register(Job)
def _make_event_for_job(
job: Job,
operation: EventOperation,
/,
attribute_name: Optional[str] = None,
attribute_value: Optional[Any] = None,
**kwargs,
) -> Event:
metadata = {"creation_date": job.creation_date, "task_config_id": job._task.config_id}
return Event(
entity_type=EventEntityType.JOB,
entity_id=job.id,
operation=operation,
attribute_name=attribute_name,
attribute_value=attribute_value,
metadata={**metadata, **kwargs},
)
2 changes: 1 addition & 1 deletion src/taipy/core/notification/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@
from ._registration import _Registration
from ._topic import _Topic
from .core_event_consumer import CoreEventConsumerBase
from .event import _ENTITY_TO_EVENT_ENTITY_TYPE, Event, EventEntityType, EventOperation
from .event import _ENTITY_TO_EVENT_ENTITY_TYPE, Event, EventEntityType, EventOperation, _make_event
from .notifier import Notifier, _publish_event
from .registration_id import RegistrationId
Loading
Loading