diff --git a/.github/workflows/code.yml b/.github/workflows/code.yml index 4414029d..ef74d363 100644 --- a/.github/workflows/code.yml +++ b/.github/workflows/code.yml @@ -24,8 +24,8 @@ jobs: install_options: -e .[dev] - name: Lint - run: tox -e pre-commit #, mypy - + run: tox -e pre-commit,mypy + test: if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name != github.repository strategy: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index aa2a4cb2..136b6801 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,3 +21,10 @@ repos: language: system entry: flake8 types: [python] + + - id: mypy + name: Run mypy + stages: [commit] + language: system + entry: mypy --install-types --non-interactive + types: [python] diff --git a/event_model/__init__.py b/event_model/__init__.py index 5f363add..8552f2c9 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -11,16 +11,47 @@ import uuid import warnings import weakref -from collections import defaultdict, deque, namedtuple +from collections import defaultdict, deque from dataclasses import dataclass from enum import Enum from functools import partial -from typing import Optional +from typing import ( + Any, + Callable, + Dict, + Generator, + Iterable, + Iterator, + List, + Optional, + Tuple, + Type, + Union, + cast, + no_type_check, +) import jsonschema import numpy from packaging import version from pkg_resources import resource_filename as rs_fn +from typing_extensions import Literal + +from .documents.datum import Datum +from .documents.datum_page import DatumPage +from .documents.event import Event +from .documents.event_descriptor import ( + Configuration, + DataKey, + EventDescriptor, + PerObjectHint, +) +from .documents.event_page import EventPage +from .documents.resource import Resource +from .documents.run_start import RunStart +from .documents.run_stop import RunStop +from .documents.stream_datum import StreamDatum +from .documents.stream_resource import StreamResource if sys.version_info < (3, 8): from importlib_metadata import metadata @@ -84,9 +115,12 @@ class DocumentRouter: Expected signature ``f(name, doc)`` """ - def __init__(self, *, emit=None): + def __init__(self, *, emit: Optional[Callable] = None) -> None: # Put in some extra effort to validate `emit` carefully, because if # this is used incorrectly the resultant errors can be confusing. + + self._emit_ref: Optional[Callable] = None + if emit is not None: if not callable(emit): raise ValueError("emit must be a callable") @@ -103,10 +137,8 @@ def __init__(self, *, emit=None): self._emit_ref = weakref.WeakMethod(emit) else: self._emit_ref = weakref.ref(emit) - else: - self._emit_ref = None - def emit(self, name, doc): + def emit(self, name: str, doc: dict) -> None: """ Emit to the callable provided an instantiation time, if any. """ @@ -116,7 +148,9 @@ def emit(self, name, doc): if emit is not None: emit(name, doc) - def __call__(self, name, doc, validate=False): + def __call__( + self, name: str, doc: dict, validate: bool = False + ) -> Tuple[str, dict]: """ Process a document. @@ -136,7 +170,7 @@ def __call__(self, name, doc, validate=False): """ return self._dispatch(name, doc, validate) - def _dispatch(self, name, doc, validate): + def _dispatch(self, name: str, doc: dict, validate: bool) -> Tuple[str, dict]: """ Dispatch to the method corresponding to the `name`. @@ -148,7 +182,7 @@ def _dispatch(self, name, doc, validate): # vice versa, use that. And the same for 'datum_page' / 'datum. 
if output_doc is NotImplemented: if name == "event": - event_page = pack_event_page(doc) + event_page = pack_event_page(cast(Event, doc)) # Subclass' implementation of event_page may return a valid # EventPage or None or NotImplemented. output_event_page = self.event_page(event_page) @@ -158,7 +192,7 @@ def _dispatch(self, name, doc, validate): if output_event_page is not NotImplemented: (output_doc,) = unpack_event_page(output_event_page) elif name == "datum": - datum_page = pack_datum_page(doc) + datum_page = pack_datum_page(cast(Datum, doc)) # Subclass' implementation of datum_page may return a valid # DatumPage or None or NotImplemented. output_datum_page = self.datum_page(datum_page) @@ -169,7 +203,7 @@ def _dispatch(self, name, doc, validate): (output_doc,) = unpack_datum_page(output_datum_page) elif name == "event_page": output_events = [] - for event in unpack_event_page(doc): + for event in unpack_event_page(cast(EventPage, doc)): # Subclass' implementation of event may return a valid # Event or None or NotImplemented. output_event = self.event(event) @@ -181,7 +215,7 @@ def _dispatch(self, name, doc, validate): output_doc = pack_event_page(*output_events) elif name == "datum_page": output_datums = [] - for datum in unpack_datum_page(doc): + for datum in unpack_datum_page(cast(DatumPage, doc)): # Subclass' implementation of datum may return a valid # Datum or None or NotImplemented. output_datum = self.datum(datum) @@ -207,37 +241,37 @@ def _dispatch(self, name, doc, validate): # to how Python uses NotImplemented in arithmetic operations, as described # in the documentation. - def start(self, doc): + def start(self, doc: RunStart) -> Optional[RunStart]: return NotImplemented - def stop(self, doc): + def stop(self, doc: RunStop) -> Optional[RunStop]: return NotImplemented - def descriptor(self, doc): + def descriptor(self, doc: EventDescriptor) -> Optional[EventDescriptor]: return NotImplemented - def resource(self, doc): + def resource(self, doc: Resource) -> Optional[Resource]: return NotImplemented - def event(self, doc): + def event(self, doc: Event) -> Event: return NotImplemented - def datum(self, doc): + def datum(self, doc: Datum) -> Datum: return NotImplemented - def event_page(self, doc): + def event_page(self, doc: EventPage) -> EventPage: return NotImplemented - def datum_page(self, doc): + def datum_page(self, doc: DatumPage) -> Optional[DatumPage]: return NotImplemented - def stream_datum(self, doc): + def stream_datum(self, doc: StreamDatum) -> Optional[StreamDatum]: return NotImplemented - def stream_resource(self, doc): + def stream_resource(self, doc: StreamResource) -> Optional[StreamResource]: return NotImplemented - def bulk_events(self, doc): + def bulk_events(self, doc: dict) -> None: # Do not modify this in a subclass. Use event_page. warnings.warn( "The document type 'bulk_events' has been deprecated in favor of " @@ -246,7 +280,7 @@ def bulk_events(self, doc): for page in bulk_events_to_event_pages(doc): self.event_page(page) - def bulk_datum(self, doc): + def bulk_datum(self, doc: dict) -> None: # Do not modify this in a subclass. Use event_page. warnings.warn( "The document type 'bulk_datum' has been deprecated in favor of " @@ -260,12 +294,14 @@ class SingleRunDocumentRouter(DocumentRouter): A DocumentRouter intended to process events from exactly one run. 
""" - def __init__(self): + def __init__(self) -> None: super().__init__() - self._start_doc = None - self._descriptors = dict() + self._start_doc: Optional[dict] = None + self._descriptors: dict = dict() - def __call__(self, name, doc, validate=False): + def __call__( + self, name: str, doc: dict, validate: bool = False + ) -> Tuple[str, dict]: """ Process a document. @@ -296,6 +332,7 @@ def __call__(self, name, doc, validate=False): f'received a second start document with uid {doc["uid"]}' ) elif name == "descriptor": + assert isinstance(self._start_doc, dict) if doc["run_start"] == self._start_doc["uid"]: self._descriptors[doc["uid"]] = doc else: @@ -306,9 +343,9 @@ def __call__(self, name, doc, validate=False): f'start document {doc["run_start"]}' ) # Defer to superclass for dispatch/processing. - return super().__call__(name, doc, validate) + return super().__call__(name, doc, validate=validate) - def get_start(self): + def get_start(self) -> dict: """Convenience method returning the start document for the associated run. If no start document has been processed EventModelError will be raised. @@ -324,7 +361,7 @@ def get_start(self): return self._start_doc - def get_descriptor(self, doc): + def get_descriptor(self, doc: dict) -> EventDescriptor: """Convenience method returning the descriptor associated with the specified document. @@ -335,7 +372,7 @@ def get_descriptor(self, doc): Returns ------- - descriptor document : dict + descriptor document : EventDescriptor """ if "descriptor" not in doc: raise EventModelValueError( @@ -349,7 +386,7 @@ def get_descriptor(self, doc): return self._descriptors[doc["descriptor"]] - def get_stream_name(self, doc): + def get_stream_name(self, doc: dict) -> str: """Convenience method returning the name of the stream for the specified document. @@ -362,32 +399,32 @@ def get_stream_name(self, doc): ------- stream name : str """ - return self.get_descriptor(doc).get("name") + return str(self.get_descriptor(doc).get("name")) class HandlerRegistryView(collections.abc.Mapping): - def __init__(self, handler_registry): + def __init__(self, handler_registry: dict) -> None: self._handler_registry = handler_registry - def __repr__(self): + def __repr__(self) -> str: return f"HandlerRegistryView({self._handler_registry!r})" - def __getitem__(self, key): + def __getitem__(self, key: str) -> str: return self._handler_registry[key] - def __iter__(self): + def __iter__(self) -> Generator: yield from self._handler_registry - def __len__(self): + def __len__(self) -> int: return len(self._handler_registry) - def __setitem__(self, key, value): + def __setitem__(self, key: str, val: Any) -> None: raise EventModelTypeError( "The handler registry cannot be edited directly. " "Instead, use the method Filler.register_handler." ) - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: raise EventModelTypeError( "The handler registry cannot be edited directly. " "Instead, use the method Filler.deregister_handler." @@ -423,12 +460,13 @@ def __delitem__(self, key): # delayed-computation framework (e.g. dask) in event-model itself. -def as_is(handler_class, filler_state): +def as_is(handler_class, filler_state) -> Type: "A no-op coercion function that returns handler_class unchanged." return handler_class -def force_numpy(handler_class, filler_state): +@no_type_check +def force_numpy(handler_class: Type, filler_state) -> Any: "A coercion that makes handler_class.__call__ return actual numpy.ndarray." 
class Subclass(handler_class): @@ -446,7 +484,7 @@ def __call__(self, *args, **kwargs): _coercion_registry = {"as_is": as_is, "force_numpy": force_numpy} -def register_coercion(name, func, overwrite=False): +def register_coercion(name: str, func: Callable, overwrite: bool = False) -> None: """ Register a new option for :class:`Filler`'s ``coerce`` argument. @@ -582,20 +620,20 @@ class Filler(DocumentRouter): def __init__( self, - handler_registry, + handler_registry: dict, *, - include=None, - exclude=None, - root_map=None, - coerce="as_is", - handler_cache=None, - resource_cache=None, - datum_cache=None, - descriptor_cache=None, - stream_resource_cache=None, - stream_datum_cache=None, - inplace=None, - retry_intervals=( + include: Optional[Iterable] = None, + exclude: Optional[Iterable] = None, + root_map: Optional[dict] = None, + coerce: str = "as_is", + handler_cache: Optional[dict] = None, + resource_cache: Optional[dict] = None, + datum_cache: Optional[dict] = None, + descriptor_cache: Optional[dict] = None, + stream_resource_cache: Optional[dict] = None, + stream_datum_cache: Optional[dict] = None, + inplace: Optional[bool] = None, + retry_intervals: List = [ 0.001, 0.002, 0.004, @@ -607,8 +645,8 @@ def __init__( 0.256, 0.512, 1.024, - ), - ): + ], + ) -> None: if inplace is None: self._inplace = True warnings.warn( @@ -638,8 +676,8 @@ def __init__( # _current_state, which is passed to coercion functions' `filler_state` # parameter. self._current_state = threading.local() - self._unpatched_handler_registry = {} - self._handler_registry = {} + self._unpatched_handler_registry: dict = {} + self._handler_registry: dict = {} for spec, handler_class in handler_registry.items(): self.register_handler(spec, handler_class) self.handler_registry = HandlerRegistryView(self._handler_registry) @@ -681,7 +719,7 @@ def __init__( self.retry_intervals = retry_intervals self._closed = False - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: return ( type(self) is type(other) and self.inplace == other.inplace @@ -698,7 +736,7 @@ def __eq__(self, other): and self.retry_intervals == other.retry_intervals ) - def __getstate__(self): + def __getstate__(self) -> dict: return dict( inplace=self._inplace, coercion_func=self._coerce, @@ -715,7 +753,7 @@ def __getstate__(self): retry_intervals=self.retry_intervals, ) - def __setstate__(self, d): + def __setstate__(self, d: dict) -> None: self._inplace = d["inplace"] self._coerce = d["coercion_func"] @@ -744,59 +782,59 @@ def __setstate__(self, d): self._closed = False @property - def retry_intervals(self): + def retry_intervals(self) -> List: return self._retry_intervals @retry_intervals.setter - def retry_intervals(self, value): + def retry_intervals(self, value: Any) -> None: self._retry_intervals = list(value) - def __repr__(self): + def __repr__(self) -> str: return "" if not self._closed else "" @staticmethod - def get_default_resource_cache(): + def get_default_resource_cache() -> dict: return {} @staticmethod - def get_default_descriptor_cache(): + def get_default_descriptor_cache() -> dict: return {} @staticmethod - def get_default_datum_cache(): + def get_default_datum_cache() -> dict: return {} @staticmethod - def get_default_handler_cache(): + def get_default_handler_cache() -> dict: return {} @staticmethod - def get_default_stream_datum_cache(): + def get_default_stream_datum_cache() -> dict: return {} @staticmethod - def get_default_stream_resource_cache(): + def get_default_stream_resource_cache() -> dict: return {} @property - 
def inplace(self): + def inplace(self) -> bool: return self._inplace def clone( self, - handler_registry=None, + handler_registry: Optional[dict] = None, *, - root_map=None, - coerce=None, - handler_cache=None, - resource_cache=None, - datum_cache=None, - descriptor_cache=None, - stream_resource_cache=None, - stream_datum_cache=None, - inplace=None, - retry_intervals=None, - ): + root_map: Optional[dict] = None, + coerce: Optional[str] = None, + handler_cache: Optional[dict] = None, + resource_cache: Optional[dict] = None, + datum_cache: Optional[dict] = None, + descriptor_cache: Optional[dict] = None, + stream_resource_cache: Optional[dict] = None, + stream_datum_cache: Optional[dict] = None, + inplace: Optional[bool] = None, + retry_intervals: Optional[List] = None, + ) -> "Filler": """ Create a new Filler instance from this one. @@ -830,7 +868,9 @@ def clone( retry_intervals=retry_intervals, ) - def register_handler(self, spec, handler, overwrite=False): + def register_handler( + self, spec: str, handler: Any, overwrite: bool = False + ) -> None: """ Register a handler. @@ -866,7 +906,7 @@ def register_handler(self, spec, handler, overwrite=False): # wants to. self._handler_registry[spec] = self._coercion_func(handler, self._current_state) - def deregister_handler(self, spec): + def deregister_handler(self, spec: str) -> Any: """ Deregister a handler. @@ -892,7 +932,7 @@ def deregister_handler(self, spec): del self._handler_cache[key] return handler - def resource(self, doc): + def resource(self, doc: Resource) -> Resource: # Defer creating the handler instance until we actually need it, when # we fill the first Event field that requires this Resource. self._resource_cache[doc["uid"]] = doc @@ -901,24 +941,25 @@ def resource(self, doc): # Handlers operate document-wise, so we'll explode pages into individual # documents. - def datum_page(self, doc): + def datum_page(self, doc: DatumPage) -> DatumPage: datum = self.datum # Avoid attribute lookup in hot loop. for datum_doc in unpack_datum_page(doc): datum(datum_doc) return doc - def datum(self, doc): + def datum(self, doc: Datum) -> Datum: self._datum_cache[doc["datum_id"]] = doc return doc - def stream_resource(self, doc): + def stream_resource(self, doc: StreamResource) -> StreamResource: self._stream_resource_cache[doc["uid"]] = doc return doc - def stream_datum(self, doc): + def stream_datum(self, doc: StreamDatum) -> StreamDatum: self._stream_datum_cache[doc["uid"]] = doc + return doc - def event_page(self, doc): + def event_page(self, doc: EventPage) -> EventPage: # TODO We may be able to fill a page in place, and that may be more # efficient than unpacking the page in to Events, filling them, and the # re-packing a new page. 
But that seems tricky in general since the @@ -928,11 +969,17 @@ def event_page(self, doc): ) return filled_doc - def event(self, doc): + def event(self, doc: Event) -> Event: filled_doc = self.fill_event(doc, include=self.include, exclude=self.exclude) return filled_doc - def fill_event_page(self, doc, include=None, exclude=None, inplace=None): + def fill_event_page( + self, + doc: EventPage, + include: Optional[Iterable] = None, + exclude: Optional[Iterable] = None, + inplace: Optional[bool] = None, + ) -> EventPage: filled_events = [] for event_doc in unpack_event_page(doc): filled_events.append( @@ -950,13 +997,13 @@ def fill_event_page(self, doc, include=None, exclude=None, inplace=None): else: return filled_doc - def get_handler(self, resource): + def get_handler(self, resource: Resource) -> Any: """ Return a new Handler instance for this Resource. Parameters ---------- - resource: dict + resource: Resource Returns ------- @@ -1006,7 +1053,7 @@ def get_handler(self, resource): ) return handler - def _get_handler_maybe_cached(self, resource): + def _get_handler_maybe_cached(self, resource: Resource) -> Any: "Get a cached handler for this resource or make one and cache it." key = (resource["uid"], resource["spec"]) try: @@ -1016,7 +1063,13 @@ def _get_handler_maybe_cached(self, resource): self._handler_cache[key] = handler return handler - def fill_event(self, doc, include=None, exclude=None, inplace=None): + def fill_event( + self, + doc, + include: Optional[Iterable] = None, + exclude: Optional[Iterable] = None, + inplace: Optional[bool] = None, + ) -> Any: if inplace is None: inplace = self._inplace if inplace: @@ -1110,14 +1163,15 @@ def fill_event(self, doc, include=None, exclude=None, inplace=None): self._current_state.datum = None return filled_doc - def descriptor(self, doc): + def descriptor(self, doc: EventDescriptor) -> EventDescriptor: self._descriptor_cache[doc["uid"]] = doc return doc def __enter__(self): return self - def close(self): + @no_type_check + def close(self) -> None: """ Drop cached documents and handlers. @@ -1135,10 +1189,10 @@ def close(self): self._descriptor_cache = None @property - def closed(self): + def closed(self) -> bool: return self._closed - def clear_handler_cache(self): + def clear_handler_cache(self) -> None: """ Clear any cached handler instances. @@ -1147,7 +1201,7 @@ def clear_handler_cache(self): """ self._handler_cache.clear() - def clear_document_caches(self): + def clear_document_caches(self) -> None: """ Clear any cached documents. """ @@ -1155,10 +1209,12 @@ def clear_document_caches(self): self._descriptor_cache.clear() self._datum_cache.clear() - def __exit__(self, *exc_details): + def __exit__(self, *exc_details) -> None: self.close() - def __call__(self, name, doc, validate=False): + def __call__( + self, name: str, doc: dict, validate: bool = False + ) -> Tuple[str, dict]: if self._closed: raise EventModelRuntimeError( "This Filler has been closed and is no longer usable." @@ -1166,9 +1222,18 @@ def __call__(self, name, doc, validate=False): return super().__call__(name, doc, validate) +class EventModelError(Exception): + ... + + def _attempt_with_retries( - func, args, kwargs, intervals, error_to_catch, error_to_raise -): + func, + args, + kwargs, + intervals: Iterable, + error_to_catch: Type[OSError], + error_to_raise: EventModelError, +) -> Any: """ Return func(*args, **kwargs), using a retry loop. @@ -1208,13 +1273,19 @@ class NoFiller(Filler): the information that we will need when that computation occurs. 
""" - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: # Do not make Filler make copies because we are not going to alter the # documents anyway. kwargs.setdefault("inplace", True) super().__init__(*args, **kwargs) - def fill_event_page(self, doc, include=None, exclude=None): + def fill_event_page( + self, + doc: EventPage, + include: Optional[Iterable] = None, + exclude: Optional[Iterable] = None, + *kwargs, + ) -> EventPage: filled_events = [] for event_doc in unpack_event_page(doc): filled_events.append( @@ -1225,7 +1296,13 @@ def fill_event_page(self, doc, include=None, exclude=None): filled_doc = pack_event_page(*filled_events) return filled_doc - def fill_event(self, doc, include=None, exclude=None, inplace=None): + def fill_event( + self, + doc: Event, + include: Optional[Iterable] = None, + exclude: Optional[Iterable] = None, + inplace: Optional[bool] = None, + ) -> Event: descriptor = self._descriptor_cache[doc["descriptor"]] from_datakeys = False try: @@ -1383,12 +1460,12 @@ class RunRouter(DocumentRouter): def __init__( self, factories, - handler_registry=None, + handler_registry: Optional[dict] = None, *, - root_map=None, - filler_class=Filler, - fill_or_fail=False, - ): + root_map: Optional[dict] = None, + filler_class: Type[Filler] = Filler, + fill_or_fail: bool = False, + ) -> None: self.factories = factories self.handler_registry = handler_registry or {} self.filler_class = filler_class @@ -1397,45 +1474,45 @@ def __init__( # Map RunStart UID to "subfactory" functions that want all # EventDescriptors from that run. - self._subfactories = defaultdict(list) + self._subfactories: defaultdict = defaultdict(list) # Callbacks that want all the documents from a given run, keyed on # RunStart UID. - self._factory_cbs_by_start = defaultdict(list) + self._factory_cbs_by_start: defaultdict = defaultdict(list) # Callbacks that want all the documents from a given run, keyed on # each EventDescriptor UID in the run. - self._factory_cbs_by_descriptor = defaultdict(list) + self._factory_cbs_by_descriptor: defaultdict = defaultdict(list) # Callbacks that want documents related to a given EventDescriptor, # keyed on EventDescriptor UID. - self._subfactory_cbs_by_descriptor = defaultdict(list) + self._subfactory_cbs_by_descriptor: defaultdict = defaultdict(list) # Callbacks that want documents related to a given EventDescriptor, # keyed on the RunStart UID referenced by that EventDescriptor. - self._subfactory_cbs_by_start = defaultdict(list) + self._subfactory_cbs_by_start: defaultdict = defaultdict(list) # Map RunStart UID to RunStart document. This is used to send # RunStart documents to subfactory callbacks. - self._start_to_start_doc = dict() + self._start_to_start_doc: dict = dict() # Map RunStart UID to the list EventDescriptor. This is used to # facilitate efficient cleanup of the caches above. - self._start_to_descriptors = defaultdict(list) + self._start_to_descriptors: defaultdict = defaultdict(list) # Map EventDescriptor UID to RunStart UID. This is used for looking up # Fillers. - self._descriptor_to_start = {} + self._descriptor_to_start: dict = {} # Map Resource UID to RunStart UID. - self._resources = {} - self._stream_resources = {} + self._resources: dict = {} + self._stream_resources: dict = {} # Old-style Resources that do not have a RunStart UID - self._unlabeled_resources = deque(maxlen=10000) + self._unlabeled_resources: deque = deque(maxlen=10000) # Map Runstart UID to instances of self.filler_class. 
- self._fillers = {} + self._fillers: dict = {} def __repr__(self): return ( @@ -1444,7 +1521,7 @@ def __repr__(self): + "])" ) - def start(self, start_doc): + def start(self, start_doc: RunStart) -> None: uid = start_doc["uid"] # If we get the same uid twice, weird things will happen, so check for # that and give a nice error message. @@ -1483,7 +1560,7 @@ def start(self, start_doc): self._factory_cbs_by_start[uid].extend(callbacks) self._subfactories[uid].extend(subfactories) - def descriptor(self, descriptor_doc): + def descriptor(self, descriptor_doc: EventDescriptor) -> None: descriptor_uid = descriptor_doc["uid"] start_uid = descriptor_doc["run_start"] @@ -1526,7 +1603,7 @@ def descriptor(self, descriptor_doc): ) raise err - def event_page(self, doc): + def event_page(self, doc: EventPage): descriptor_uid = doc["descriptor"] start_uid = self._descriptor_to_start[descriptor_uid] try: @@ -1539,7 +1616,7 @@ def event_page(self, doc): for callback in self._subfactory_cbs_by_descriptor[descriptor_uid]: callback("event_page", doc) - def datum_page(self, doc): + def datum_page(self, doc: DatumPage) -> None: resource_uid = doc["resource"] try: start_uid = self._resources[resource_uid] @@ -1570,7 +1647,7 @@ def datum_page(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback("datum_page", doc) - def stream_datum(self, doc): + def stream_datum(self, doc: StreamDatum) -> None: resource_uid = doc["stream_resource"] start_uid = self._stream_resources[resource_uid] self._fillers[start_uid].stream_datum(doc) @@ -1579,7 +1656,7 @@ def stream_datum(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback("stream_datum", doc) - def resource(self, doc): + def resource(self, doc: Resource) -> None: try: start_uid = doc["run_start"] except KeyError: @@ -1604,7 +1681,7 @@ def resource(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback("resource", doc) - def stream_resource(self, doc): + def stream_resource(self, doc: StreamResource) -> None: start_uid = doc["run_start"] # No need for Try self._fillers[start_uid].stream_resource(doc) self._stream_resources[doc["uid"]] = doc["run_start"] @@ -1613,7 +1690,7 @@ def stream_resource(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback("stream_resource", doc) - def stop(self, doc): + def stop(self, doc: RunStop) -> None: start_uid = doc["run_start"] for callback in self._factory_cbs_by_start[start_uid]: callback("stop", doc) @@ -1632,10 +1709,6 @@ def stop(self, doc): self._start_to_start_doc.pop(start_uid, None) -class EventModelError(Exception): - ... - - # Here we define subclasses of all of the built-in Python exception types (as # needed, not a comprehensive list) so that all errors raised *directly* by # event_model also inhereit from EventModelError as well as the appropriate @@ -1685,7 +1758,7 @@ class DataNotAccessible(EventModelError, IOError): class UnresolvableForeignKeyError(EventModelValueError): """when we see a foreign before we see the thing to which it refers""" - def __init__(self, key, message): + def __init__(self, key: Any, message: str) -> None: self.key = key self.message = message @@ -1736,6 +1809,7 @@ class MismatchedDataKeys(InvalidData): # We pin jsonschema >=3.0.0 in requirements.txt but due to pip's dependency # resolution it is easy to end up with an environment where that pin is not # respected. Thus, we maintain best-effort support for 2.x. 
+ if version.parse(metadata("jsonschema")["version"]) >= version.parse("3.0.0"): def _is_array(checker, instance): @@ -1780,13 +1854,14 @@ class ComposeRunBundle: (i.e. compose_stream_resource). """ - start_doc: dict - compose_descriptor: callable - compose_resource: callable - compose_stop: callable - compose_stream_resource: Optional[callable] = None + start_doc: RunStart + compose_descriptor: Callable + compose_resource: Callable + compose_stop: Callable + compose_stream_resource: Optional[Callable] = None - def __iter__(self): + # iter for backwards compatibility + def __iter__(self) -> Iterator: return iter( ( self.start_doc, @@ -1797,353 +1872,635 @@ def __iter__(self): ) -ComposeDescriptorBundle = namedtuple( - "ComposeDescriptorBundle", "descriptor_doc compose_event compose_event_page" -) -ComposeResourceBundle = namedtuple( - "ComposeResourceBundle", "resource_doc compose_datum compose_datum_page" -) -ComposeStreamResourceBundle = namedtuple( - "ComposeStreamResourceBundle", "stream_resource_doc compose_stream_data" -) +@dataclass +class ComposeResourceBundle: + resource_doc: Resource + compose_datum: Callable + compose_datum_page: Callable + # iter for backwards compatibility + def __iter__(self) -> Iterator: + return iter( + ( + self.resource_doc, + self.compose_datum, + self.compose_datum_page, + ) + ) -def compose_datum(*, resource, counter, datum_kwargs, validate=True): - resource_uid = resource["uid"] - doc = { - "resource": resource_uid, - "datum_kwargs": datum_kwargs, - "datum_id": "{}/{}".format(resource_uid, next(counter)), - } - if validate: - schema_validators[DocumentNames.datum].validate(doc) - return doc - - -def compose_datum_page(*, resource, counter, datum_kwargs, validate=True): - resource_uid = resource["uid"] - any_column, *_ = datum_kwargs.values() - N = len(any_column) - doc = { - "resource": resource_uid, - "datum_kwargs": datum_kwargs, - "datum_id": ["{}/{}".format(resource_uid, next(counter)) for _ in range(N)], - } - if validate: - schema_validators[DocumentNames.datum_page].validate(doc) - return doc + +@dataclass +class ComposeStreamResourceBundle: + stream_resource_doc: StreamResource + compose_stream_data: List[Callable] + + # iter for backwards compatibility + def __iter__(self) -> Iterator: + return iter( + ( + self.stream_resource_doc, + self.compose_stream_data, + ) + ) -default_path_semantics = {"posix": "posix", "nt": "windows"}[os.name] +@dataclass +class ComposeDatum: + resource: Resource + counter: Iterator + + def __call__(self, datum_kwargs: Dict[str, Any], validate: bool = True) -> Datum: + resource_uid = self.resource["uid"] + doc = Datum( + resource=resource_uid, + datum_kwargs=datum_kwargs, + datum_id="{}/{}".format(resource_uid, next(self.counter)), + ) + if validate: + schema_validators[DocumentNames.datum].validate(doc) + return doc -def compose_resource( +def compose_datum( *, - spec, - root, - resource_path, - resource_kwargs, - path_semantics=default_path_semantics, - start=None, - uid=None, - validate=True, -): - if uid is None: - uid = str(uuid.uuid4()) - counter = itertools.count() - doc = { - "uid": uid, - "spec": spec, - "root": root, - "resource_path": resource_path, - "resource_kwargs": resource_kwargs, - "path_semantics": path_semantics, - } - if start: - doc["run_start"] = start["uid"] + resource: Resource, + counter: Iterator, + datum_kwargs: Dict[str, Any], + validate: bool = True, +) -> Datum: + """ + Here for backwards compatibility, the Compose class is prefered. 
+ """ + return ComposeDatum(resource, counter)(datum_kwargs, validate=validate) - if validate: - schema_validators[DocumentNames.resource].validate(doc) - return ComposeResourceBundle( - doc, - partial(compose_datum, resource=doc, counter=counter), - partial(compose_datum_page, resource=doc, counter=counter), +@dataclass +class ComposeDatumPage: + resource: Resource + counter: Iterator + + def __call__(self, datum_kwargs: dict, validate: bool = True) -> DatumPage: + resource_uid = self.resource["uid"] + any_column, *_ = datum_kwargs.values() + N = len(any_column) + doc = DatumPage( + resource=resource_uid, + datum_kwargs=datum_kwargs, + datum_id=[ + "{}/{}".format(resource_uid, next(self.counter)) for _ in range(N) + ], + ) + if validate: + schema_validators[DocumentNames.datum_page].validate(doc) + return doc + + +def compose_datum_page( + *, + resource: Resource, + counter: Iterator, + datum_kwargs: Dict[str, List[Any]], + validate: bool = True, +) -> DatumPage: + """ + Here for backwards compatibility, the Compose class is prefered. + """ + return ComposeDatumPage(resource, counter)(datum_kwargs, validate=validate) + + +PATH_SEMANTICS: Dict[str, Literal["posix", "windows"]] = { + "posix": "posix", + "nt": "windows", +} +default_path_semantics: Literal["posix", "windows"] = PATH_SEMANTICS[os.name] + + +@dataclass +class ComposeResource: + start: Optional[RunStart] + + def __call__( + self, + spec: str, + root: str, + resource_path: str, + resource_kwargs: Dict[str, Any], + path_semantics: Literal["posix", "windows"] = default_path_semantics, + uid: Optional[str] = None, + validate: bool = True, + ) -> ComposeResourceBundle: + if uid is None: + uid = str(uuid.uuid4()) + + doc = Resource( + path_semantics=path_semantics, + uid=uid, + spec=spec, + root=root, + resource_kwargs=resource_kwargs, + resource_path=resource_path, + ) + if validate: + schema_validators[DocumentNames.resource].validate(doc) + + if self.start: + doc["run_start"] = self.start["uid"] + + counter = itertools.count() + return ComposeResourceBundle( + doc, + ComposeDatum(resource=doc, counter=counter), + ComposeDatumPage(resource=doc, counter=counter), + ) + + +def compose_resource( + *, + spec: str, + root: str, + resource_path: str, + resource_kwargs: Dict[str, Any], + path_semantics: Literal["posix", "windows"] = default_path_semantics, + start: Optional[RunStart] = None, + uid: Optional[str] = None, + validate: bool = True, +) -> ComposeResourceBundle: + """ + Here for backwards compatibility, the Compose class is prefered. 
+ """ + return ComposeResource(start)( + spec, + root, + resource_path, + resource_kwargs, + path_semantics=path_semantics, + uid=uid, + validate=validate, ) +@dataclass +class ComposeStreamDatum: + stream_resource: StreamResource + counter: Iterator + + def __call__( + self, + data_keys: List[str], + seq_nums: Dict[str, int], + indices: Dict[str, int], + event_count: int = 1, + event_offset: int = 0, + validate: bool = True, + ) -> StreamDatum: + resource_uid = self.stream_resource["uid"] + block_idx = next(self.counter) + doc = StreamDatum( + stream_resource=resource_uid, + uid=f"{resource_uid}/{block_idx}", + block_idx=block_idx, + event_count=event_count, + event_offset=event_offset, + data_keys=data_keys, + seq_nums=seq_nums, + indices=indices, + ) + if validate: + schema_validators[DocumentNames.stream_datum].validate(doc) + + return doc + + def compose_stream_datum( *, - stream_resource, - stream_name, - counter, - datum_kwargs, - event_count=1, - event_offset=0, - validate=True, -): - resource_uid = stream_resource["uid"] - if stream_name not in stream_resource["stream_names"]: - raise EventModelKeyError( - "Attempt to create stream_datum with name not included in stream_resource" - ) - block_idx = next(counter) - doc = dict( - stream_resource=resource_uid, - datum_kwargs=datum_kwargs, - uid=f"{resource_uid}/{stream_name}/{block_idx}", - stream_name=stream_name, - block_idx=block_idx, + stream_resource: StreamResource, + counter: Iterator, + data_keys: List[str], + seq_nums: Dict[str, int], + indices: Dict[str, int], + event_count: int = 1, + event_offset: int = 0, + validate: bool = True, +) -> StreamDatum: + """ + Here for backwards compatibility, the Compose class is prefered. + """ + return ComposeStreamDatum(stream_resource, counter)( + data_keys, + seq_nums, + indices, event_count=event_count, event_offset=event_offset, + validate=validate, ) - if validate: - schema_validators[DocumentNames.stream_datum].validate(doc) - return doc -def compose_stream_resource( - *, - spec, - root, - resource_path, - resource_kwargs, - stream_names, - counters=(), - path_semantics=default_path_semantics, - start=None, - uid=None, - validate=True, -): - if uid is None: - uid = str(uuid.uuid4()) - if isinstance(stream_names, str): - stream_names = [ - stream_names, - ] - if len(counters) == 0: - counters = [itertools.count() for _ in stream_names] - elif len(counters) > len(stream_names): - raise ValueError( - f"Insufficient number of counters {len(counters)} for " - f"stream names: {stream_names}" +@dataclass +class ComposeStreamResource: + start: Optional[RunStart] = None + + def __call__( + self, + spec: str, + root: str, + resource_path: str, + resource_kwargs: Dict[str, Any], + stream_names: List[str], + counters: List[Iterator] = [], + path_semantics: Literal["posix", "windows"] = default_path_semantics, + uid: Optional[str] = None, + validate: bool = True, + ) -> ComposeStreamResourceBundle: + if uid is None: + uid = str(uuid.uuid4()) + + doc = StreamResource( + uid=uid, + spec=spec, + root=root, + stream_names=stream_names, + resource_path=resource_path, + resource_kwargs=resource_kwargs, + path_semantics=path_semantics, ) + if self.start: + doc["run_start"] = self.start["uid"] - doc = dict( - uid=uid, - spec=spec, - root=root, - resource_path=resource_path, - resource_kwargs=resource_kwargs, - stream_names=stream_names, + if validate: + schema_validators[DocumentNames.stream_resource].validate(doc) + + return ComposeStreamResourceBundle( + doc, + [ + ComposeStreamDatum( + 
stream_resource=doc, + counter=counter, + ) + for counter in counters + ], + ) + + +def compose_stream_resource( + *, + spec: str, + root: str, + resource_path: str, + resource_kwargs: Dict[str, Any], + stream_names: List[str], + counters: List[Iterator] = [], + path_semantics: Literal["posix", "windows"] = default_path_semantics, + start: Optional[RunStart] = None, + uid: Optional[str] = None, + validate: bool = True, +) -> ComposeStreamResourceBundle: + """ + Here for backwards compatibility, the Compose class is prefered. + """ + return ComposeStreamResource(start=start)( + spec, + root, + resource_path, + resource_kwargs, + stream_names, + counters=counters, path_semantics=path_semantics, + uid=uid, + validate=validate, ) - if start: - doc["run_start"] = start["uid"] - if validate: - schema_validators[DocumentNames.stream_resource].validate(doc) - return ComposeStreamResourceBundle( - doc, - [ - partial( - compose_stream_datum, - stream_resource=doc, - stream_name=stream_name, - counter=counter, +@dataclass +class ComposeStop: + start: RunStart + event_counters: dict + poison_pill: List + + def __call__( + self, + exit_status: Literal["success", "abort", "fail"] = "success", + reason: str = "", + uid: Optional[str] = None, + time: Optional[float] = None, + validate: bool = True, + ) -> RunStop: + if self.poison_pill: + raise EventModelError( + "Already composed a RunStop document for run " + "{!r}.".format(self.start["uid"]) ) - for stream_name, counter in zip(stream_names, counters) - ], - ) + self.poison_pill.append(object()) + if uid is None: + uid = str(uuid.uuid4()) + if time is None: + time = ttime.time() + doc = RunStop( + uid=uid, + time=time, + run_start=self.start["uid"], + exit_status=exit_status, + reason=reason, + num_events={k: v - 1 for k, v in self.event_counters.items()}, + ) + if validate: + schema_validators[DocumentNames.stop].validate(doc) + return doc def compose_stop( *, - start, - event_counters, - poison_pill, - exit_status="success", - reason="", - uid=None, - time=None, - validate=True, -): - if poison_pill: - raise EventModelError( - "Already composed a RunStop document for run " "{!r}.".format(start["uid"]) + start: RunStart, + event_counters: dict, + poison_pill: List, + exit_status: Literal["success", "abort", "fail"] = "success", + reason: str = "", + uid: Optional[str] = None, + time: Optional[float] = None, + validate: bool = True, +) -> RunStop: + """ + Here for backwards compatibility, the Compose class is prefered. 
+ """ + return ComposeStop( + start=start, + event_counters=event_counters, + poison_pill=poison_pill, + )(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate) + + +@dataclass +class ComposeEventPage: + descriptor: EventDescriptor + event_counters: dict + + def __call__( + self, + data: dict, + timestamps: dict, + seq_num: List, + filled: Optional[Dict[str, List[Union[bool, str]]]] = None, + uid: Optional[List] = None, + time: Optional[List] = None, + validate: bool = True, + ) -> EventPage: + N = len(seq_num) + if uid is None: + uid = [str(uuid.uuid4()) for _ in range(N)] + if time is None: + time = [ttime.time()] * N + if filled is None: + filled = {} + doc = EventPage( + uid=uid, + time=time, + data=data, + timestamps=timestamps, + seq_num=seq_num, + filled=filled, + descriptor=self.descriptor["uid"], ) - poison_pill.append(object()) - if uid is None: - uid = str(uuid.uuid4()) - if time is None: - time = ttime.time() - doc = { - "uid": uid, - "time": time, - "run_start": start["uid"], - "exit_status": exit_status, - "reason": reason, - "num_events": {k: v - 1 for k, v in event_counters.items()}, - } - if validate: - schema_validators[DocumentNames.stop].validate(doc) - return doc + if validate: + schema_validators[DocumentNames.event_page].validate(doc) + if not ( + self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys() + ): + raise EventModelValidationError( + "These sets of keys must match:\n" + "event['data'].keys(): {}\n" + "event['timestamps'].keys(): {}\n" + "descriptor['data_keys'].keys(): {}\n".format( + data.keys(), + timestamps.keys(), + self.descriptor["data_keys"].keys(), + ) + ) + if set(filled) - set(data): + raise EventModelValidationError( + "Keys in event['filled'] {} must be a subset of those in " + "event['data'] {}".format(filled.keys(), data.keys()) + ) + self.event_counters[self.descriptor["name"]] += len(data) + return doc def compose_event_page( *, - descriptor, - event_counters, - data, - timestamps, - seq_num, - filled=None, - uid=None, - time=None, - validate=True, -): - N = len(seq_num) - if uid is None: - uid = [str(uuid.uuid4()) for _ in range(N)] - if time is None: - time = [ttime.time()] * N - if filled is None: - filled = {} - doc = { - "uid": uid, - "time": time, - "data": data, - "timestamps": timestamps, - "seq_num": seq_num, - "filled": filled, - "descriptor": descriptor["uid"], - } - if validate: - schema_validators[DocumentNames.event_page].validate(doc) - if not (descriptor["data_keys"].keys() == data.keys() == timestamps.keys()): - raise EventModelValidationError( - "These sets of keys must match:\n" - "event['data'].keys(): {}\n" - "event['timestamps'].keys(): {}\n" - "descriptor['data_keys'].keys(): {}\n".format( - data.keys(), timestamps.keys(), descriptor["data_keys"].keys() + descriptor: EventDescriptor, + event_counters: dict, + data: dict, + timestamps: dict, + seq_num: List, + filled: Optional[Dict[str, List[Union[bool, str]]]] = None, + uid: Optional[List] = None, + time: Optional[List] = None, + validate: bool = True, +) -> EventPage: + """ + Here for backwards compatibility, the Compose class is prefered. 
+ """ + return ComposeEventPage(descriptor, event_counters)( + data, timestamps, seq_num, filled, uid=uid, time=time, validate=validate + ) + + +@dataclass +class ComposeEvent: + descriptor: EventDescriptor + event_counters: Dict[str, int] + + def __call__( + self, + data: dict, + timestamps: dict, + seq_num: Optional[int] = None, + filled: Optional[Dict[str, Union[bool, str]]] = None, + uid: Optional[str] = None, + time: Optional[float] = None, + validate: bool = True, + ) -> Event: + if seq_num is None: + seq_num = self.event_counters[self.descriptor["name"]] + if uid is None: + uid = str(uuid.uuid4()) + if time is None: + time = ttime.time() + if filled is None: + filled = {} + doc = Event( + uid=uid, + time=time, + data=data, + timestamps=timestamps, + seq_num=seq_num, + filled=filled, + descriptor=self.descriptor["uid"], + ) + if validate: + schema_validators[DocumentNames.event].validate(doc) + if not ( + self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys() + ): + raise EventModelValidationError( + "These sets of keys must match:\n" + "event['data'].keys(): {}\n" + "event['timestamps'].keys(): {}\n" + "descriptor['data_keys'].keys(): {}\n".format( + data.keys(), + timestamps.keys(), + self.descriptor["data_keys"].keys(), + ) ) - ) - if set(filled) - set(data): - raise EventModelValidationError( - "Keys in event['filled'] {} must be a subset of those in " - "event['data'] {}".format(filled.keys(), data.keys()) - ) - event_counters[descriptor["name"]] += len(data) - return doc + if set(filled) - set(data): + raise EventModelValidationError( + "Keys in event['filled'] {} must be a subset of those in " + "event['data'] {}".format(filled.keys(), data.keys()) + ) + self.event_counters[self.descriptor["name"]] += 1 + return doc def compose_event( *, - descriptor, - event_counters, - data, - timestamps, - seq_num=None, - filled=None, - uid=None, - time=None, - validate=True, -): - if seq_num is None: - seq_num = event_counters[descriptor["name"]] - if uid is None: - uid = str(uuid.uuid4()) - if time is None: - time = ttime.time() - if filled is None: - filled = {} - doc = { - "uid": uid, - "time": time, - "data": data, - "timestamps": timestamps, - "seq_num": seq_num, - "filled": filled, - "descriptor": descriptor["uid"], - } - if validate: - schema_validators[DocumentNames.event].validate(doc) - if not (descriptor["data_keys"].keys() == data.keys() == timestamps.keys()): - raise EventModelValidationError( - "These sets of keys must match:\n" - "event['data'].keys(): {}\n" - "event['timestamps'].keys(): {}\n" - "descriptor['data_keys'].keys(): {}\n".format( - data.keys(), timestamps.keys(), descriptor["data_keys"].keys() - ) - ) - if set(filled) - set(data): - raise EventModelValidationError( - "Keys in event['filled'] {} must be a subset of those in " - "event['data'] {}".format(filled.keys(), data.keys()) + descriptor: EventDescriptor, + event_counters: dict, + data: Dict[str, Any], + timestamps: Dict[str, Any], + seq_num: int, + filled: Optional[dict] = None, + uid: Optional[str] = None, + time: Optional[float] = None, + validate: bool = True, +) -> Event: + """ + Here for backwards compatibility, the Compose class is prefered. 
+ """ + return ComposeEvent(descriptor, event_counters)( + data, + timestamps, + seq_num=seq_num, + filled=filled, + uid=uid, + time=time, + validate=validate, + ) + + +@dataclass +class ComposeDescriptorBundle: + descriptor_doc: EventDescriptor + compose_event: ComposeEvent + compose_event_page: ComposeEventPage + + def __iter__(self) -> Iterator: + return iter( + ( + self.descriptor_doc, + self.compose_event, + self.compose_event_page, ) - event_counters[descriptor["name"]] += 1 - return doc + ) + + +@dataclass +class ComposeDescriptor: + start: RunStart + streams: dict + event_counters: dict + + def __call__( + self, + name, + data_keys, + hints=None, + configuration=None, + object_keys=None, + time=None, + uid=None, + validate: bool = True, + ) -> ComposeDescriptorBundle: + if time is None: + time = ttime.time() + if uid is None: + uid = str(uuid.uuid4()) + if hints is None: + hints = {} + if configuration is None: + configuration = {} + if object_keys is None: + object_keys = {} + + doc = EventDescriptor( + configuration=configuration, + data_keys=data_keys, + name=name, + object_keys=object_keys, + run_start=self.start["uid"], + time=time, + uid=uid, + hints=hints, + ) + if validate: + if name in self.streams and self.streams[name] != set(data_keys): + raise EventModelValidationError( + "A descriptor with the name {} has already been composed with " + "data_keys {}. The requested data_keys were {}. All " + "descriptors in a given stream must have the same " + "data_keys.".format(name, self.streams[name], set(data_keys)) + ) + schema_validators[DocumentNames.descriptor].validate(doc) + + if name not in self.streams: + self.streams[name] = set(data_keys) + self.event_counters[name] = 1 + + return ComposeDescriptorBundle( + descriptor_doc=doc, + compose_event=ComposeEvent( + descriptor=doc, event_counters=self.event_counters + ), + compose_event_page=ComposeEventPage( + descriptor=doc, event_counters=self.event_counters + ), + ) def compose_descriptor( *, - start, - streams, - event_counters, - name, - data_keys, - uid=None, - time=None, - object_keys=None, - configuration=None, - hints=None, - validate=True, -): - if uid is None: - uid = str(uuid.uuid4()) - if time is None: - time = ttime.time() - if object_keys is None: - object_keys = {} - if configuration is None: - configuration = {} - if hints is None: - hints = {} - doc = { - "uid": uid, - "time": time, - "run_start": start["uid"], - "name": name, - "data_keys": data_keys, - "object_keys": object_keys, - "hints": hints, - "configuration": configuration, - } - if validate: - if name in streams and streams[name] != set(data_keys): - raise EventModelValidationError( - "A descriptor with the name {} has already been composed with " - "data_keys {}. The requested data_keys were {}. 
All " - "descriptors in a given stream must have the same " - "data_keys.".format(name, streams[name], set(data_keys)) - ) - schema_validators[DocumentNames.descriptor].validate(doc) - if name not in streams: - streams[name] = set(data_keys) - event_counters[name] = 1 - return ComposeDescriptorBundle( - doc, - partial(compose_event, descriptor=doc, event_counters=event_counters), - partial(compose_event_page, descriptor=doc, event_counters=event_counters), + start: RunStart, + streams: dict, + event_counters: dict, + name: str, + data_keys: Dict[str, DataKey], + uid: Optional[str] = None, + time: Optional[float] = None, + object_keys: Optional[Dict[str, Any]] = None, + configuration: Optional[Dict[str, Configuration]] = None, + hints: Optional[PerObjectHint] = None, + validate: bool = True, +) -> ComposeDescriptorBundle: + """ + Here for backwards compatibility, the Compose class is prefered. + """ + return ComposeDescriptor(start, streams, event_counters)( + name, + data_keys, + hints=hints, + configuration=configuration, + object_keys=object_keys, + time=time, + uid=uid, + validate=validate, ) def compose_run( - *, uid=None, time=None, metadata=None, validate=True, event_counters=None -): + *, + uid: Optional[str] = None, + time: Optional[float] = None, + metadata: Optional[Dict] = None, + validate: bool = True, + event_counters: Optional[Dict[str, int]] = None, +) -> ComposeRunBundle: """ Compose a RunStart document and factory functions for related documents. @@ -2174,36 +2531,38 @@ def compose_run( time = ttime.time() if metadata is None: metadata = {} - doc = dict(uid=uid, time=time, **metadata) + # Define some mutable state to be shared internally by the closures composed # below. - streams = {} + streams: dict = {} if event_counters is None: event_counters = {} - poison_pill = [] + poison_pill: list = [] + + doc = dict( + **metadata, + uid=uid, + time=time, + ) if validate: schema_validators[DocumentNames.start].validate(doc) return ComposeRunBundle( - doc, - partial( - compose_descriptor, - start=doc, - streams=streams, - event_counters=event_counters, + cast(RunStart, doc), + ComposeDescriptor( + start=cast(RunStart, doc), streams=streams, event_counters=event_counters ), - partial(compose_resource, start=doc), - partial( - compose_stop, - start=doc, + ComposeResource(start=cast(RunStart, doc)), + ComposeStop( + start=cast(RunStart, doc), event_counters=event_counters, poison_pill=poison_pill, ), - compose_stream_resource=partial(compose_stream_resource, start=doc), + compose_stream_resource=ComposeStreamResource(start=cast(RunStart, doc)), ) -def pack_event_page(*events): +def pack_event_page(*events: Event) -> EventPage: """ Transform one or more Event documents into an EventPage document. 
@@ -2235,29 +2594,29 @@ def pack_event_page(*events): filled_list.append(event.get("filled", {})) data_list.append(event["data"]) timestamps_list.append(event["timestamps"]) - event_page = { - "time": time_list, - "uid": uid_list, - "seq_num": seq_num_list, - "descriptor": event["descriptor"], - "filled": _transpose_list_of_dicts(filled_list), - "data": _transpose_list_of_dicts(data_list), - "timestamps": _transpose_list_of_dicts(timestamps_list), - } + event_page = EventPage( + time=time_list, + uid=uid_list, + seq_num=seq_num_list, + descriptor=event["descriptor"], + filled=_transpose_list_of_dicts(filled_list), + data=_transpose_list_of_dicts(data_list), + timestamps=_transpose_list_of_dicts(timestamps_list), + ) return event_page -def unpack_event_page(event_page): +def unpack_event_page(event_page: EventPage) -> Generator: """ Transform an EventPage document into individual Event documents. Parameters ---------- - event_page : dict + event_page : EventPage Yields ------ - event : dict + event : Event """ descriptor = event_page["descriptor"] data_list = _transpose_dict_of_lists(event_page["data"]) @@ -2272,19 +2631,18 @@ def unpack_event_page(event_page): filled_list, fillvalue={}, ): - event = { - "descriptor": descriptor, - "uid": uid, - "time": time, - "seq_num": seq_num, - "data": data, - "timestamps": timestamps, - "filled": filled, - } - yield event - - -def pack_datum_page(*datum): + yield Event( + descriptor=descriptor, + uid=uid, + time=time, + seq_num=seq_num, + data=data, + timestamps=timestamps, + filled=filled, + ) + + +def pack_datum_page(*datum: Datum) -> DatumPage: """ Transform one or more Datum documents into a DatumPage document. @@ -2308,40 +2666,37 @@ def pack_datum_page(*datum): for datum_ in datum: datum_id_list.append(datum_["datum_id"]) datum_kwarg_list.append(datum_["datum_kwargs"]) - datum_page = { - "resource": datum_["resource"], - "datum_id": datum_id_list, - "datum_kwargs": _transpose_list_of_dicts(datum_kwarg_list), - } + datum_page = DatumPage( + resource=datum_["resource"], + datum_id=datum_id_list, + datum_kwargs=_transpose_list_of_dicts(datum_kwarg_list), + ) return datum_page -def unpack_datum_page(datum_page): +def unpack_datum_page(datum_page: DatumPage) -> Generator: """ Transform a DatumPage document into individual Datum documents. Parameters ---------- - datum_page : dict + datum_page : DatumPage Yields ------ - datum : dict + datum : Datum """ resource = datum_page["resource"] datum_kwarg_list = _transpose_dict_of_lists(datum_page["datum_kwargs"]) + datum_id: Any + datum_kwargs: Any for datum_id, datum_kwargs in itertools.zip_longest( datum_page["datum_id"], datum_kwarg_list, fillvalue={} ): - datum = { - "datum_id": datum_id, - "datum_kwargs": datum_kwargs, - "resource": resource, - } - yield datum + yield Datum(datum_id=datum_id, datum_kwargs=datum_kwargs, resource=resource) -def rechunk_event_pages(event_pages, chunk_size): +def rechunk_event_pages(event_pages: Iterable, chunk_size: int) -> Generator: """ Resizes the event_pages in a iterable of event_pages. @@ -2359,7 +2714,7 @@ def rechunk_event_pages(event_pages, chunk_size): remainder = chunk_size chunk_list = [] - def page_chunks(page, chunk_size, remainder): + def page_chunks(page: dict, chunk_size: int, remainder: int) -> Generator: """ Yields chunks of a event_page. 
The first chunk will be of size remainder, the following chunks will be @@ -2403,7 +2758,7 @@ def page_chunks(page, chunk_size, remainder): yield merge_event_pages(chunk_list) -def merge_event_pages(event_pages): +def merge_event_pages(event_pages: Iterable[EventPage]) -> EventPage: """ Combines a iterable of event_pages to a single event_page. @@ -2420,21 +2775,20 @@ def merge_event_pages(event_pages): if len(pages) == 1: return pages[0] - array_keys = ["seq_num", "time", "uid"] - - return { - "descriptor": pages[0]["descriptor"], - **{ - key: list(itertools.chain.from_iterable([page[key] for page in pages])) - for key in array_keys - }, - "data": { + doc = dict( + descriptor=pages[0]["descriptor"], + seq_num=list( + itertools.chain.from_iterable([page["seq_num"] for page in pages]) + ), + time=list(itertools.chain.from_iterable([page["time"] for page in pages])), + uid=list(itertools.chain.from_iterable([page["uid"] for page in pages])), + data={ key: list( itertools.chain.from_iterable([page["data"][key] for page in pages]) ) for key in pages[0]["data"].keys() }, - "timestamps": { + timestamps={ key: list( itertools.chain.from_iterable( [page["timestamps"][key] for page in pages] @@ -2442,16 +2796,17 @@ def merge_event_pages(event_pages): ) for key in pages[0]["data"].keys() }, - "filled": { + filled={ key: list( itertools.chain.from_iterable([page["filled"][key] for page in pages]) ) for key in pages[0]["data"].keys() }, - } + ) + return cast(EventPage, doc) -def rechunk_datum_pages(datum_pages, chunk_size): +def rechunk_datum_pages(datum_pages: Iterable, chunk_size: int) -> Generator: """ Resizes the datum_pages in a iterable of event_pages. @@ -2469,7 +2824,7 @@ def rechunk_datum_pages(datum_pages, chunk_size): remainder = chunk_size chunk_list = [] - def page_chunks(page, chunk_size, remainder): + def page_chunks(page: dict, chunk_size: int, remainder: int) -> Generator: """ Yields chunks of a datum_page. The first chunk will be of size remainder, the following chunks will be @@ -2508,7 +2863,7 @@ def page_chunks(page, chunk_size, remainder): yield merge_datum_pages(chunk_list) -def merge_datum_pages(datum_pages): +def merge_datum_pages(datum_pages: Iterable) -> DatumPage: """ Combines a iterable of datum_pages to a single datum_page. @@ -2527,13 +2882,13 @@ def merge_datum_pages(datum_pages): array_keys = ["datum_id"] - return { - "resource": pages[0]["resource"], + doc = dict( + resource=pages[0]["resource"], **{ key: list(itertools.chain.from_iterable([page[key] for page in pages])) for key in array_keys }, - "datum_kwargs": { + datum_kwargs={ key: list( itertools.chain.from_iterable( [page["datum_kwargs"][key] for page in pages] @@ -2541,10 +2896,11 @@ def merge_datum_pages(datum_pages): ) for key in pages[0]["datum_kwargs"].keys() }, - } + ) + return cast(DatumPage, doc) -def bulk_events_to_event_pages(bulk_events): +def bulk_events_to_event_pages(bulk_events: dict) -> list: """ Transform a BulkEvents document into a list of EventPage documents. @@ -2560,7 +2916,7 @@ def bulk_events_to_event_pages(bulk_events): """ # This is for a deprecated document type, so we are not being fussy # about efficiency/laziness here. 
- event_pages = {} # descriptor uid mapped to page + event_pages: dict = {} # descriptor uid mapped to page for events in bulk_events.values(): for event in events: descriptor = event["descriptor"] @@ -2587,22 +2943,22 @@ def bulk_events_to_event_pages(bulk_events): return list(event_pages.values()) -def bulk_datum_to_datum_page(bulk_datum): +def bulk_datum_to_datum_page(bulk_datum: dict) -> DatumPage: """ Transform one BulkDatum into one DatumPage. Note: There is only one known usage of BulkDatum "in the wild", and the BulkDatum layout has been deprecated in favor of DatumPage. """ - datum_page = { - "datum_id": bulk_datum["datum_ids"], - "resource": bulk_datum["resource"], - "datum_kwargs": _transpose_list_of_dicts(bulk_datum["datum_kwarg_list"]), - } + datum_page = DatumPage( + datum_id=bulk_datum["datum_ids"], + resource=bulk_datum["resource"], + datum_kwargs=_transpose_list_of_dicts(bulk_datum["datum_kwarg_list"]), + ) return datum_page -def _transpose_list_of_dicts(list_of_dicts): +def _transpose_list_of_dicts(list_of_dicts: list) -> dict: "Transform list-of-dicts into dict-of-lists (i.e. DataFrame-like)." dict_of_lists = defaultdict(list) for row in list_of_dicts: @@ -2611,7 +2967,7 @@ def _transpose_list_of_dicts(list_of_dicts): return dict(dict_of_lists) -def _transpose_dict_of_lists(dict_of_lists): +def _transpose_dict_of_lists(dict_of_lists: dict) -> list: "Transform dict-of-lists (i.e. DataFrame-like) into list-of-dicts." list_of_dicts = [] keys = list(dict_of_lists) @@ -2620,7 +2976,7 @@ def _transpose_dict_of_lists(dict_of_lists): return list_of_dicts -def verify_filled(event_page): +def verify_filled(event_page: dict) -> None: """Take an event_page document and verify that it is completely filled. Parameters @@ -2647,7 +3003,7 @@ def verify_filled(event_page): ) -def sanitize_doc(doc): +def sanitize_doc(doc: dict) -> dict: """Return a copy with any numpy objects converted to built-in Python types. 
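# Sketch of the two private transpose helpers annotated above, which convert
# between row-wise and column-wise ("DataFrame-like") layouts (illustrative values).
from event_model import _transpose_dict_of_lists, _transpose_list_of_dicts

rows = [{"x": 1, "y": 10}, {"x": 2, "y": 20}]
columns = _transpose_list_of_dicts(rows)
assert columns == {"x": [1, 2], "y": [10, 20]}
assert _transpose_dict_of_lists(columns) == rows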
This function takes in an event-model document and returns a copy with any @@ -2682,7 +3038,8 @@ class NumpyEncoder(json.JSONEncoder): """ # Credit: https://stackoverflow.com/a/47626762/1221924 - def default(self, obj): + @no_type_check + def default(self, obj: object) -> Any: try: import dask.array diff --git a/event_model/documents/__init__.py b/event_model/documents/__init__.py new file mode 100644 index 00000000..9e1c7f41 --- /dev/null +++ b/event_model/documents/__init__.py @@ -0,0 +1,11 @@ +# flake8: noqa +from .datum import Datum +from .datum_page import DatumPage +from .event import Event +from .event_descriptor import EventDescriptor +from .event_page import EventPage +from .resource import Resource +from .run_start import RunStart +from .run_stop import RunStop +from .stream_datum import StreamDatum +from .stream_resource import StreamResource diff --git a/event_model/documents/datum.py b/event_model/documents/datum.py new file mode 100644 index 00000000..335f1247 --- /dev/null +++ b/event_model/documents/datum.py @@ -0,0 +1,30 @@ +from typing import Any, Dict + +from typing_extensions import Annotated, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + +DATUM_EXTRA_SCHEMA = {"additionalProperties": False} + + +@add_extra_schema(DATUM_EXTRA_SCHEMA) +class Datum(TypedDict): + """Document to reference a quanta of externally-stored data""" + + datum_id: Annotated[ + str, + Field( + description="Globally unique identifier for this Datum (akin to 'uid' " + "for other Document types), typically formatted as '/'" + ), + ] + datum_kwargs: Annotated[ + Dict[str, Any], + Field( + description="Arguments to pass to the Handler to " + "retrieve one quanta of data", + ), + ] + resource: Annotated[ + str, Field(description="The UID of the Resource to which this Datum belongs") + ] diff --git a/event_model/documents/datum_page.py b/event_model/documents/datum_page.py new file mode 100644 index 00000000..95890cd7 --- /dev/null +++ b/event_model/documents/datum_page.py @@ -0,0 +1,34 @@ +from typing import Any, Dict, List + +from typing_extensions import Annotated, TypedDict + +from .generate.type_wrapper import AsRef, Field, add_extra_schema + +DATUM_PAGE_EXTRA_SCHEMA = {"additionalProperties": False} + + +@add_extra_schema(DATUM_PAGE_EXTRA_SCHEMA) +class DatumPage(TypedDict): + """Page of documents to reference a quanta of externally-stored data""" + + datum_id: Annotated[ + List[str], + AsRef("Dataframe"), + Field( + description="Array unique identifiers for each Datum (akin to 'uid' for " + "other Document types), typically formatted as '/'" + ), + ] + datum_kwargs: Annotated[ + Dict[str, List[Any]], + Field( + description="Array of arguments to pass to the Handler to " + "retrieve one quanta of data" + ), + ] + resource: Annotated[ + str, + Field( + description="The UID of the Resource to which all Datums in the page belong" + ), + ] diff --git a/event_model/documents/event.py b/event_model/documents/event.py new file mode 100644 index 00000000..e3cfcb38 --- /dev/null +++ b/event_model/documents/event.py @@ -0,0 +1,55 @@ +from typing import Any, Dict, Union + +from typing_extensions import Annotated, NotRequired, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + +EVENT_EXTRA_SCHEMA = {"additionalProperties": False} + + +class PartialEvent(TypedDict): + """ + Fields seperated from the complete Event for use by the protocols. 
+ """ + + filled: NotRequired[ + Annotated[ + Dict[str, Union[bool, str]], + Field( + description="Mapping each of the keys of externally-stored data to the " + "boolean False, indicating that the data has not been loaded, or to " + "foreign keys (moved here from 'data' when the data was loaded)" + ), + ] + ] + data: Annotated[Dict[str, Any], Field(description="The actual measurement data")] + time: Annotated[ + float, + Field( + description="The event time. This maybe different than the timestamps on " + "each of the data entries.", + ), + ] + timestamps: Annotated[ + Dict[str, Any], + Field(description="The timestamps of the individual measurement data"), + ] + + +@add_extra_schema(EVENT_EXTRA_SCHEMA) +class Event(PartialEvent): + """Document to record a quanta of collected data""" + + descriptor: Annotated[ + str, Field(description="UID of the EventDescriptor to which this Event belongs") + ] + + seq_num: Annotated[ + int, + Field( + description="Sequence number to identify the location of this Event in the " + "Event stream", + ), + ] + + uid: Annotated[str, Field(description="Globally unique identifier for this Event")] diff --git a/event_model/documents/event_descriptor.py b/event_model/documents/event_descriptor.py new file mode 100644 index 00000000..85191139 --- /dev/null +++ b/event_model/documents/event_descriptor.py @@ -0,0 +1,155 @@ +from typing import Any, Dict, List + +from typing_extensions import Annotated, Literal, NotRequired, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + + +class DataKey(TypedDict): + """Describes the objects in the data property of Event documents""" + + dims: NotRequired[ + Annotated[ + List[str], + Field( + description="The names for dimensions of the data. Null or empty list " + "if scalar data", + ), + ] + ] + dtype: Annotated[ + Literal["string", "number", "array", "boolean", "integer"], + Field(description="The type of the data in the event."), + ] + external: NotRequired[ + Annotated[ + str, + Field( + description="Where the data is stored if it is stored external " + "to the events", + regex=r"^[A-Z]+:?", + ), + ] + ] + object_name: NotRequired[ + Annotated[ + str, + Field(description="The name of the object this key was pulled from."), + ] + ] + precision: NotRequired[ + Annotated[ + int, + Field( + description="Number of digits after decimal place if " + "a floating point number" + ), + ] + ] + shape: Annotated[ + List[int], + Field(description="The shape of the data. Empty list indicates scalar data."), + ] + source: Annotated[ + str, Field(description="The source (ex piece of hardware) of the data.") + ] + units: NotRequired[ + Annotated[str, Field(description="Engineering units of the value")] + ] + + +class PerObjectHint(TypedDict): + """The 'interesting' data keys for this device.""" + + fields: NotRequired[ + Annotated[ + List[str], + Field(description="The 'interesting' data keys for this device."), + ] + ] + + +class Configuration(TypedDict): + data: NotRequired[ + Annotated[Dict[str, Any], Field(description="The actual measurement data")] + ] + data_keys: NotRequired[ + Annotated[ + Dict[str, DataKey], + Field( + description="This describes the data stored alongside it in this " + "configuration object." 
+ ), + ] + ] + timestamps: NotRequired[ + Annotated[ + Dict[str, Any], + Field(description="The timestamps of the individual measurement data"), + ] + ] + + +EVENT_DESCRIPTOR_EXTRA_SCHEMA = { + "patternProperties": {"^([^./]+)$": {"$ref": "#/definitions/DataType"}}, + "definitions": { + "DataType": { + "title": "DataType", + "patternProperties": {"^([^./]+)$": {"$ref": "#/definitions/DataType"}}, + "additionalProperties": False, + } + }, + "additionalProperties": False, +} + + +@add_extra_schema(EVENT_DESCRIPTOR_EXTRA_SCHEMA) +class EventDescriptor(TypedDict): + """Document to describe the data captured in the associated event + documents""" + + configuration: NotRequired[ + Annotated[ + Dict[str, Configuration], + Field( + description="Readings of configurational fields necessary for " + "interpreting data in the Events.", + ), + ] + ] + data_keys: Annotated[ + Dict[str, DataKey], + Field( + description="This describes the data in the Event Documents.", + title="data_keys", + ), + ] + hints: NotRequired[PerObjectHint] + name: NotRequired[ + Annotated[ + str, + Field( + description="A human-friendly name for this data stream, such as " + "'primary' or 'baseline'.", + ), + ] + ] + object_keys: NotRequired[ + Annotated[ + Dict[str, Any], + Field( + description="Maps a Device/Signal name to the names of the entries " + "it produces in data_keys.", + ), + ] + ] + run_start: Annotated[ + str, Field(description="Globally unique ID of this run's 'start' document.") + ] + time: Annotated[ + float, Field(description="Creation time of the document as unix epoch time.") + ] + uid: Annotated[ + str, + Field(description="Globally unique ID for this event descriptor.", title="uid"), + ] diff --git a/event_model/documents/event_page.py b/event_model/documents/event_page.py new file mode 100644 index 00000000..d17f621d --- /dev/null +++ b/event_model/documents/event_page.py @@ -0,0 +1,67 @@ +from typing import Any, Dict, List, Union + +from typing_extensions import Annotated, NotRequired, TypedDict + +from .generate.type_wrapper import AsRef, Field, add_extra_schema + +EVENT_PAGE_EXTRA_SCHEMA = {"additionalProperties": False} + + +class PartialEventPage(TypedDict): + """ + Fields seperated from the complete EventPage for use by the protocols. + """ + + data: Annotated[ + Dict[str, List[Any]], + AsRef("Dataframe"), + Field(description="The actual measurement data"), + ] + time: Annotated[ + List[float], + Field( + description="Array of Event times. 
This maybe different than the " +                "timestamps on each of the data entries" +            ), +        ] +        timestamps: Annotated[ +            Dict[str, List[Any]], +            AsRef("Dataframe"), +            Field(description="The timestamps of the individual measurement data"), +        ] + + +@add_extra_schema(EVENT_PAGE_EXTRA_SCHEMA) +class EventPage(PartialEventPage): +    """Page of documents to record a quanta of collected data""" + +    descriptor: Annotated[ +        str, +        Field( +            description="The UID of the EventDescriptor to which all of the Events in " +            "this page belong", +        ), +    ] +    filled: NotRequired[ +        Annotated[ +            Dict[str, List[Union[bool, str]]], +            AsRef("DataframeForFilled"), +            Field( +                description="Mapping each of the keys of externally-stored data to an " +                "array containing the boolean False, indicating that the data has not " +                "been loaded, or to foreign keys (moved here from 'data' when the data " +                "was loaded)" +            ), +        ] +    ] +    seq_num: Annotated[ +        List[int], +        Field( +            description="Array of sequence numbers to identify the location of each " +            "Event in the Event stream", +        ), +    ] +    uid: Annotated[ +        List[str], +        Field(description="Array of globally unique identifiers for each Event"), +    ] diff --git a/event_model/documents/generate/__init__.py b/event_model/documents/generate/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/event_model/documents/generate/type_wrapper.py b/event_model/documents/generate/type_wrapper.py new file mode 100644 index 00000000..156853a7 --- /dev/null +++ b/event_model/documents/generate/type_wrapper.py @@ -0,0 +1,40 @@ +""" +A wrapper used to patch out schema generation utilities. +""" + + +from dataclasses import dataclass + +try: +    import pydantic + +    Field = pydantic.Field +    FieldInfo = pydantic.fields.FieldInfo +except ModuleNotFoundError: + +    def Field(*args, **kwargs):  # type: ignore +        ... + +    class FieldInfo:  # type: ignore +        ... + + +extra_schema = {} + + +def add_extra_schema(schema: dict): +    def inner(cls): +        extra_schema[cls] = schema +        return cls + +    return inner + + +@dataclass +class AsRef: +    ref_name: str + + +# We need to check that only one element in an annotation is the type of +# the field, the others have to be instances of classes in this tuple +ALLOWED_ANNOTATION_ELEMENTS = (AsRef, FieldInfo) diff --git a/event_model/documents/generate/typeddict_to_schema.py b/event_model/documents/generate/typeddict_to_schema.py new file mode 100644 index 00000000..616a3fcd --- /dev/null +++ b/event_model/documents/generate/typeddict_to_schema.py @@ -0,0 +1,563 @@ +# type: ignore + +import json +import re +import sys +from collections import OrderedDict +from pathlib import Path +from typing import Dict, Optional, Tuple, Type, Union + +from pydantic import BaseConfig, BaseModel, Field, create_model +from pydantic.fields import FieldInfo +from typing_extensions import ( +    Annotated, +    NotRequired, +    _TypedDictMeta, +    get_args, +    get_origin, +) + +from event_model import SCHEMA_PATH +from event_model.documents import ( +    Datum, +    DatumPage, +    Event, +    EventDescriptor, +    EventPage, +    Resource, +    RunStart, +    RunStop, +    StreamDatum, +    StreamResource, +) +from event_model.documents.generate.type_wrapper import ( +    ALLOWED_ANNOTATION_ELEMENTS, +    AsRef, +    extra_schema, +) + +# The hacky indexing on types isn't possible with python < 3.9 +if sys.version_info[:2] < (3, 9): +    raise EnvironmentError("schema generation requires python 3.9 or higher") + + +SCHEMA_OUT_DIR = Path("event_model") / SCHEMA_PATH + + +# Used to add user-written schema to autogenerated schema. 
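# Sketch of how the wrapper above is used (illustrative; `Example` is a made-up
# document type): @add_extra_schema only records the hand-written JSON fragment
# in `extra_schema`, keyed by the TypedDict class, for the generator to merge in later.
from typing_extensions import TypedDict

from event_model.documents.generate.type_wrapper import add_extra_schema, extra_schema

@add_extra_schema({"additionalProperties": False})
class Example(TypedDict):
    uid: str

assert extra_schema[Example] == {"additionalProperties": False}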
+def merge_dicts(dict1: dict, dict2: dict) -> dict: + """ + Takes two dictionaries with subdirectories and returns a new dictionary of + the two merged: + dict1 = { + "x1": { + "y1": 0, "y3": {"z1" : [1, 2], "z2": 1} + }, + "x2" : 0, + "x3": 1 + } + and + dict2 = { + "x1": { + "y2" : 0, "y3": {"z1": [3, 4], "z3": 5} + }, + "x3" : 0 + } + returns + { + "x1": { + "y1": 0, "y2": 0, "y3": {"z1": [1, 2, 3, 4], "z2": 1, "z3": 5} + }, + "x2": 0 + "x3": 1 + } + """ + + return_dict = dict2.copy() + + for key in dict1: + if key not in dict2: + return_dict[key] = dict1[key] + + elif not isinstance(dict1[key], type(dict2[key])): + return_dict[key] = dict1[key] + + elif isinstance(dict1[key], dict): + return_dict[key] = merge_dicts(dict1[key], dict2[key]) + + elif isinstance(dict1[key], list): + return_dict[key] = dict1[key] + dict2[key] + + return return_dict + + +def format_all_annotations( + typed_dict: _TypedDictMeta, new_basemodel_classes: Dict[str, BaseModel] = None +) -> _TypedDictMeta: + """Goes through all field type annotations and formats them to be acceptable + to pydantic. + + It formats annotations in the following way: + * NotRequired[Annotated[X, FieldInfo, ...] -> Annotated[Optional[X], + FieldInfo, ...] + * Annotated[X, AsRef("ref_name"), FieldInfo] -> + Annotated[ref_name_BaseModel_containing_X, FieldInfo] + * If X is also a typeddict then Annotated[X, FieldInfo] -> + Annotated[parse_typeddict_to_schema(X), FieldInfo] + + Parameters + ---------- + typed_dict : dict + TypedDict to change annotations of prior to being converted to BaseModel. + + new_basemodel_classes : Optional[Dict[str, BaseModel] + Dict where keys are names of _TypedDictMeta classes referenced inside + the TypedDict being converted to a BaseModel, and values are BaseModel + classes that have already been generated from those TypedDicts - + pydantic will throw an error if we try to generate the same BaseModel + twice so we just reference the one already created if multiple + annotations have the same TypedDict. + + Returns + ------- + typed_dict : dict + The same dict with field annotations adjusted to be pydantic friendly. + + """ + new_annotations = {} + + if new_basemodel_classes is None: + new_basemodel_classes = {} + + for field_name, field_type in typed_dict.__annotations__.items(): + new_annotations[field_name] = field_parser( + field_name, field_type, new_basemodel_classes + ) + + typed_dict.__annotations__ = new_annotations + + return typed_dict + + +def field_is_annotated(field: type) -> bool: + """Returns True if a field is of type Annotated[X].""" + return get_origin(field) == Annotated + + +def field_is_not_required( + field_type: type, remove_origin_if_NotRequired: bool = False +) -> Tuple[type, bool]: + """Checks if a field is of type is NotRequired[X]. + + Parameters + ---------- + field_type : dict + Field type taken from an annotation. + remove_origin_if_NotRequired : bool + If True returns the inputted field_type with NotRequired stripped off. + + Returns + ------- + (field_type, is_not_required) : Tuple[type, bool] + is_not_required is True if the field is of type NotRequired[X]. + field_type is the same as the inputted field_type however if + remove_origin_if_NotRequired is true it will strip NotRequired. 
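# Sketch of the NotRequired handling described above (illustrative; this module
# needs pydantic installed and Python >= 3.9 to import).
from typing_extensions import NotRequired

from event_model.documents.generate.typeddict_to_schema import field_is_not_required

stripped, optional = field_is_not_required(
    NotRequired[int], remove_origin_if_NotRequired=True
)
assert optional is True and stripped is int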
+ """ + is_not_required = get_origin(field_type) == NotRequired + if is_not_required and remove_origin_if_NotRequired: + args = get_args(field_type) + assert len(args) == 1 + field_type = args[0] + return field_type, is_not_required + + +def get_field_type( + field_name: str, + field_type: type, + new_basemodel_classes: Dict[str, BaseModel], +) -> type: + """Goes through an annotation and finds the type it represents without the + additional context - e.g int inside Annotated[int, Field(description=""), + AsRef("blah")]. + It is assumed that either the field origin is Annotated, or the field has no + origin. + + Parameters + ---------- + field_name : str + Name of the field being parsed. + field_type : type + Annotation to be parsed. + new_basemodel_classes : dict + Dict where keys are names of _TypedDictMeta classes referenced inside + the TypedDict being converted to a BaseModel, and values are BaseModel + classes that have already been generated from those TypedDicts - + pydantic will throw an error if we try to generate the same BaseModel + twice so we just reference the one already created if multiple + annotations have the same TypedDict. + + Returns + ------- + field_type : type + The data type inside the annotation seperate from the other context in + the annotation. + """ + args = get_args(field_type) + if args: + field_type = [ + x + for x in args + if True not in [isinstance(x, y) for y in ALLOWED_ANNOTATION_ELEMENTS] + ] + assert len(field_type) == 1, ( + f'Field "{field_name}" has multiple types: ' + f'{"and ".join([x.__class__.__name__ for x in field_type])}' + ) + field_type = field_type[0] + + # If the TypedDict references another TypedDict then another + # BaseModel is recursively generated from that TypedDict, + # and the field annotation is swapped to that BaseModel. + field_type = change_sub_typed_dicts_to_basemodels(field_type, new_basemodel_classes) + + return field_type + + +def get_annotation_contents(field_type: type) -> Tuple[Optional[AsRef], FieldInfo]: + """Goes through the args of an Annotation and parses out any AsRef(), or + FieldInfo. + + Parameters + ---------- + field_type : type + Annotation to be parsed. + + Returns + ------- + (as_ref, field_info) : Tuple[AsRef, FieldInfo] + as_ref is the AsRef tag in the annotation, or None if + there is no AsRef tag. field_info is the FieldInfo class returned from + the Field() call in the Annotation, or an empty Field() call if none is found. + """ + + args = get_args(field_type) + as_ref = None + field_info = Field() + if args: + for arg in args: + if isinstance(arg, AsRef): + as_ref = arg + elif isinstance(arg, FieldInfo): + field_info = arg + + return as_ref, field_info + + +def parse_AsRef( + field_type: type, + field_info: FieldInfo, + as_ref: AsRef, + new_basemodel_classes: Dict[str, BaseModel], +) -> type: + """Parses the AsRef tag and makes a new BaseModel class containing a + __root__ of the field with the AsRef. + + Parameters + ----------------- + field_type : type + Datatype to put in the __root__ field of the generated basemodel. + field_info : FieldInfo + Info included in the Field() of the annotation. + as_ref : AsRef + AsRef tag in the annotation. + new_basemodel_classes : dict + Dict where keys are names of a basemodel class generated by this + function, and values are the class itseldf. If another field has the + same AsRef the already generated baseclass in this dict is used as the + new field value. + + Returns + ----------------- + field_type : type + The generated basemodel. 
+ """ + ref_field_info = Field() + # Regex is extracted from the field info and placed in the new field info + if field_info.regex: + ref_field_info.regex = field_info.regex + ref_field_type = Annotated[field_type, ref_field_info] + + if as_ref.ref_name not in new_basemodel_classes: + field_type = create_model( + as_ref.ref_name, + __config__=Config, + __root__=(ref_field_type, None), + ) + new_basemodel_classes[as_ref.ref_name] = field_type + else: + generated_basemodel_type = new_basemodel_classes[ + as_ref.ref_name + ].__annotations__["__root__"] + assert get_args(ref_field_type)[0] == get_args(generated_basemodel_type)[0], ( + f'Fields with type AsRef("{as_ref.ref_name}") have differing types: ' + f"{generated_basemodel_type} and {ref_field_type}" + ) + field_type = new_basemodel_classes[as_ref.ref_name] + + return field_type + + +def change_sub_typed_dicts_to_basemodels( + field_type: type, new_basemodel_classes: Dict[str, BaseModel] +) -> dict: + """Checks for any TypedDicts in the field_type and converts them to + basemodels. + + Parameters + ----------------- + field_type : type + Annotation to be parsed. + new_basemodel_classes : dict + Dict where keys are names of _TypedDictMeta classes referenced inside + the TypedDict being converted to a BaseModel, and values are BaseModel + classes that have already been generated from those TypedDicts - + pydantic will throw an error if we try to generate the same BaseModel + twice so we just reference the one already created if multiple + annotations have the same TypedDict. + + Returns + ----------------- + field_type : type + New field type with TypedDicts swapped to basemodels + """ + + if isinstance(field_type, _TypedDictMeta): + if field_type.__name__ not in new_basemodel_classes: + field_type = parse_typeddict_to_schema( + field_type, + return_basemodel=True, + new_basemodel_classes=new_basemodel_classes, + ) + new_basemodel_classes[field_type.__name__] = field_type + else: + field_type = new_basemodel_classes[field_type.__name__] + # It's still possible there's a TypedDict in the args to be converted - e.g + # Dict[str, SomeTypedDict] + else: + origin = get_origin(field_type) + args = get_args(field_type) + if origin and args: + field_type = origin[ + tuple( + change_sub_typed_dicts_to_basemodels(arg, new_basemodel_classes) + for arg in args + ) + ] + return field_type + + +def field_parser( + field_name: str, field_type: type, new_basemodel_classes: Dict[str, BaseModel] +): + """Parses a field annotation and generates a pydantic friendly one by + extracting relevant information. + + Parameters + ---------------- + field_name : str + Name of the field being parsed. + field_type : type + Annotation to be parsed. + new_basemodel_classes : dict + Dict where keys are names of _TypedDictMeta classes referenced inside + the TypedDict being converted to a BaseModel, and values are BaseModel + classes that have already been generated from those TypedDicts - + pydantic will throw an error if we try to generate the same BaseModel + twice so we just reference the one already created if multiple + annotations have the same TypedDict. 
+ +    Returns +    ----------------- +    field_info : type +        The field_type inputted, but parsed to be pydantic friendly +    """ +    field_type, is_not_required = field_is_not_required( +        field_type, remove_origin_if_NotRequired=True +    ) + +    as_ref, field_info = get_annotation_contents(field_type) + +    field_type = get_field_type( +        field_name, +        field_type, +        new_basemodel_classes, +    ) + +    if as_ref: +        field_type = parse_AsRef(field_type, field_info, as_ref, new_basemodel_classes) +        field_info.regex = None + +    if is_not_required: +        field_type = Optional[field_type] + +    return Annotated[field_type, field_info] + + +class Config(BaseConfig): +    """Config for generated BaseModel.""" + +    def alias_generator(string_to_be_aliased): +        """Alias in snake case""" +        return re.sub(r"(?<!^)(?=[A-Z])", "_", string_to_be_aliased) + + +def sort_alphabetically(schema: dict) -> dict: +    """Sorts the schema alphabetically by key name, exchanging the +    properties dicts for OrderedDicts""" +    schema = OrderedDict(sorted(list(schema.items()), key=lambda x: x[0])) + +    return schema + + +def sort_schema(schema: dict) -> dict: +    if isinstance(schema, dict): +        schema = OrderedDict( +            sorted( +                list(schema.items()), key=lambda x: SortOrder.get(x[0], len(SortOrder)) +            ) +        ) + +        for key in schema: +            if key in ("definitions", "properties", "required"): +                if isinstance(schema[key], dict): +                    schema[key] = sort_alphabetically(schema[key]) +                    for key2 in schema[key]: +                        if isinstance(schema[key][key2], dict): +                            schema[key][key2] = sort_schema(schema[key][key2]) +                elif isinstance(schema[key], list): +                    schema[key].sort() + +    return schema + + +SortOrder = { +    "title": 0, +    "description": 1, +    "type": 2, +    "definitions": 3, +    "properties": 4, +    "required": 5, +    "additionalProperties": 6, +    "patternProperties": 7, +} + + +# From https://github.com/pydantic/pydantic/issues/760#issuecomment-589708485 +def parse_typeddict_to_schema( +    typed_dict: _TypedDictMeta, +    out_dir: Optional[Path] = None, +    return_basemodel: bool = False, +    new_basemodel_classes: Optional[Dict[str, BaseModel]] = None, +    sort: bool = True, +) -> Union[Type[BaseModel], Dict[str, type]]: +    """Takes a TypedDict and generates a jsonschema from it. + +    Parameters +    ---------- +    typed_dict : _TypedDictMeta +        The typeddict to be converted to a pydantic basemodel. +    out_dir: Optional[Path] +        Optionally provide a directory to store the generated json schema from +        the basemodel, if None then the dictionary schema won't be saved +        to disk. +    return_basemodel : bool +        Optionally return the basemodel as soon as it's generated, rather than +        converting it to a dictionary. Required for converting TypedDicts +        within documents. +    new_basemodel_classes : Dict[str, BaseModel] +        Optionally provide basemodel classes already generated during a +        conversion. Required for when the function is called recursively. +    sort : bool +        If true, sort the properties keys in the outputted schema. + +    Returns +    ------- +    Either the generated BaseModel or the schema dictionary generated from it, +    depending on if return_basemodel is True. +    """ + +    if new_basemodel_classes is None: +        new_basemodel_classes = {} +    annotations: Dict[str, type] = {} + +    typed_dict = format_all_annotations( +        typed_dict, new_basemodel_classes=new_basemodel_classes +    ) + +    for name, field in typed_dict.__annotations__.items(): +        default_value = getattr(typed_dict, name, ...) +        annotations[name] = (field, default_value) + +    model = create_model(typed_dict.__name__, __config__=Config, **annotations) + +    # Docstring is used as the description field. 
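# End-to-end sketch of the generator above (illustrative; requires pydantic and
# Python >= 3.9). With no out_dir the schema dict is returned but not written to disk.
from event_model.documents import Datum
from event_model.documents.generate.typeddict_to_schema import parse_typeddict_to_schema

schema = parse_typeddict_to_schema(Datum)
assert schema["title"] == "datum"
assert schema["required"] == ["datum_id", "datum_kwargs", "resource"]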
+ model.__doc__ = typed_dict.__doc__ + + if return_basemodel: + return model + + # title goes to snake_case + model.__name__ = Config.alias_generator(typed_dict.__name__).lower() + + model_schema = model.schema(by_alias=True) + model_schema = strip_newline_literal(model_schema) + + # Add the manually defined extra stuff + if typed_dict in extra_schema: + model_schema = merge_dicts(extra_schema[typed_dict], model_schema) + + if sort: + model_schema = sort_schema(model_schema) + + if out_dir: + with open(out_dir / f'{model_schema["title"]}.json', "w+") as f: + json.dump(model_schema, f, indent=3) + + return model_schema + + +def generate_all_schema(schema_out_dir: Path = SCHEMA_OUT_DIR) -> None: + """Generates all schema in the documents directory.""" + parse_typeddict_to_schema(DatumPage, out_dir=schema_out_dir) + parse_typeddict_to_schema(Datum, out_dir=schema_out_dir) + parse_typeddict_to_schema(EventDescriptor, out_dir=schema_out_dir) + parse_typeddict_to_schema(EventPage, out_dir=schema_out_dir) + parse_typeddict_to_schema(Event, out_dir=schema_out_dir) + parse_typeddict_to_schema(Resource, out_dir=schema_out_dir) + parse_typeddict_to_schema(RunStart, out_dir=schema_out_dir) + parse_typeddict_to_schema(RunStop, out_dir=schema_out_dir) + parse_typeddict_to_schema(StreamDatum, out_dir=schema_out_dir) + parse_typeddict_to_schema(StreamResource, out_dir=schema_out_dir) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--schema_out_directory", default=SCHEMA_OUT_DIR, nargs="?") + args = parser.parse_args() + generate_all_schema(schema_out_dir=args.schema_out_directory) diff --git a/event_model/documents/resource.py b/event_model/documents/resource.py new file mode 100644 index 00000000..0c8efa63 --- /dev/null +++ b/event_model/documents/resource.py @@ -0,0 +1,63 @@ +from typing import Any, Dict + +from typing_extensions import Annotated, Literal, NotRequired, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + +RESOURCE_EXTRA_SCHEMA = {"additionalProperties": False} + + +class PartialResource(TypedDict): + """ + Fields seperated from the complete Resource for use by the protocols. + """ + + path_semantics: NotRequired[ + Annotated[ + Literal["posix", "windows"], + Field(description="Rules for joining paths"), + ] + ] + resource_kwargs: Annotated[ + Dict[str, Any], + Field( + description="Additional argument to pass to the Handler to read a Resource" + ), + ] + root: Annotated[ + str, + Field( + description="Subset of resource_path that is a local detail, not semantic." + ), + ] + spec: Annotated[ + str, + Field( + description="String identifying the format/type of this Resource, used to " + "identify a compatible Handler", + ), + ] + resource_path: Annotated[ + str, Field(description="Filepath or URI for locating this resource") + ] + uid: Annotated[ + str, Field(description="Globally unique identifier for this Resource") + ] + + +@add_extra_schema(RESOURCE_EXTRA_SCHEMA) +class Resource(PartialResource): + """ + Document to reference a collection (e.g. 
file or group of files) of + externally-stored data + """ + + run_start: NotRequired[ + Annotated[ + str, + Field( + description="Globally unique ID to the run_start document this " + "resource is associated with.", + ), + ] + ] diff --git a/event_model/documents/run_start.py b/event_model/documents/run_start.py new file mode 100644 index 00000000..e6dca71d --- /dev/null +++ b/event_model/documents/run_start.py @@ -0,0 +1,213 @@ +from typing import Any, Dict, List, Union + +from typing_extensions import Annotated, Literal, NotRequired, TypedDict + +from event_model.documents.generate.type_wrapper import Field, add_extra_schema + +from .generate.type_wrapper import AsRef + + +class Hints(TypedDict): + """Start-level hints""" + + dimensions: NotRequired[ + Annotated[ + List[List[Union[List[str], str]]], + Field( + description="The independent axes of the experiment. " + "Ordered slow to fast", + ), + ] + ] + + +class Calculation(TypedDict): + args: NotRequired[List] + callable: Annotated[ + str, Field(description="callable function to perform calculation") + ] + kwargs: NotRequired[ + Annotated[Dict[str, Any], Field(description="kwargs for calcalation callable")] + ] + + +class Projection(TypedDict): + """Where to get the data from""" + + calculation: NotRequired[ + Annotated[ + Calculation, + Field( + description="required fields if type is calculated", + title="calculation properties", + ), + ] + ] + config_index: NotRequired[int] + config_device: NotRequired[str] + field: NotRequired[str] + location: NotRequired[ + Annotated[ + Literal["start", "event", "configuration"], + Field( + description="start comes from metadata fields in the start document, " + "event comes from event, configuration comes from configuration " + "fields in the event_descriptor document" + ), + ] + ] + stream: NotRequired[str] + type: NotRequired[ + Annotated[ + Literal["linked", "calculated", "static"], + Field( + description="linked: a value linked from the data set, " + "calculated: a value that requires calculation, " + "static: a value defined here in the projection ", + ), + ] + ] + value: NotRequired[ + Annotated[ + Any, + Field( + description="value explicitely defined in the projection " + "when type==static." 
+ ), + ] + ] + + +RUN_START_EXTRA_SCHEMA = { + "definitions": { + "DataType": { + "patternProperties": {"^([^./]+)$": {"$ref": "#/definitions/DataType"}}, + "additionalProperties": False, + }, + "Projection": { + "allOf": [ + { + "if": { + "allOf": [ + {"properties": {"location": {"enum": ["configuration"]}}}, + {"properties": {"type": {"enum": "linked"}}}, + ] + }, + "then": { + "required": [ + "type", + "location", + "config_index", + "config_device", + "field", + "stream", + ] + }, + }, + { + "if": { + "allOf": [ + {"properties": {"location": {"enum": "event"}}}, + {"properties": {"type": {"enum": "linked"}}}, + ] + }, + "then": {"required": ["type", "location", "field", "stream"]}, + }, + { + "if": { + "allOf": [ + {"properties": {"location": {"enum": "event"}}}, + {"properties": {"type": {"enum": "calculated"}}}, + ] + }, + "then": {"required": ["type", "field", "stream", "calculation"]}, + }, + { + "if": {"properties": {"type": {"enum": "static"}}}, + "then": {"required": ["type", "value"]}, + }, + ], + }, + }, + "properties": { + "hints": { + "additionalProperties": False, + "patternProperties": {"^([^.]+)$": {"$ref": "#/definitions/DataType"}}, + }, + }, + "patternProperties": {"^([^./]+)$": {"$ref": "#/definitions/DataType"}}, + "additionalProperties": False, +} + + +class Projections(TypedDict): + """Describe how to interperet this run as the given projection""" + + configuration: Annotated[ + Dict[str, Any], Field(description="Static information about projection") + ] + name: NotRequired[Annotated[str, Field(description="The name of the projection")]] + projection: Annotated[Dict[Any, Projection], Field(description="")] + version: Annotated[ + str, + Field( + description="The version of the projection spec. Can specify the version " + "of an external specification.", + ), + ] + + +@add_extra_schema(RUN_START_EXTRA_SCHEMA) +class RunStart(TypedDict): + """ + Document created at the start of run. Provides a seach target and + later documents link to it + """ + + data_groups: NotRequired[ + Annotated[ + List[str], + Field( + description="An optional list of data access groups that have meaning " + "to some external system. Examples might include facility, beamline, " + "end stations, proposal, safety form.", + ), + ] + ] + data_session: NotRequired[ + Annotated[ + str, + Field( + description="An optional field for grouping runs. The meaning is " + "not mandated, but this is a data management grouping and not a " + "scientific grouping. It is intended to group runs in a visit or " + "set of trials.", + ), + ] + ] + data_type: NotRequired[Annotated[Any, Field(description=""), AsRef("DataType")]] + group: NotRequired[ + Annotated[str, Field(description="Unix group to associate this data with")] + ] + hints: NotRequired[Annotated[Hints, Field(description="Start-level hints")]] + owner: NotRequired[ + Annotated[str, Field(description="Unix owner to associate this data with")] + ] + project: NotRequired[ + Annotated[str, Field(description="Name of project that this run is part of")] + ] + projections: NotRequired[Annotated[List[Projections], Field(description="")]] + sample: NotRequired[ + Annotated[ + Union[Dict[str, Any], str], + Field( + description="Information about the sample, may be a UID to " + "another collection" + ), + ] + ] + scan_id: NotRequired[ + Annotated[int, Field(description="Scan ID number, not globally unique")] + ] + time: Annotated[float, Field(description="Time the run started. 
Unix epoch time")] + uid: Annotated[str, Field(description="Globally unique ID for this run")] diff --git a/event_model/documents/run_stop.py b/event_model/documents/run_stop.py new file mode 100644 index 00000000..25bda130 --- /dev/null +++ b/event_model/documents/run_stop.py @@ -0,0 +1,46 @@ +from typing import Any, Dict + +from typing_extensions import Annotated, Literal, NotRequired, TypedDict + +from .generate.type_wrapper import AsRef, Field, add_extra_schema + +RUN_STOP_EXTRA_SCHEMA = { + "patternProperties": {"^([^./]+)$": {"$ref": "#/definitions/DataType"}}, + "additionalProperties": False, +} + + +@add_extra_schema(RUN_STOP_EXTRA_SCHEMA) +class RunStop(TypedDict): + """ + Document for the end of a run indicating the success/fail state of the + run and the end time + """ + + data_type: NotRequired[ + Annotated[Any, Field(description="data_type"), AsRef("DataType")] + ] + exit_status: Annotated[ + Literal["success", "abort", "fail"], + Field(description="State of the run when it ended"), + ] + num_events: NotRequired[ + Annotated[ + Dict[str, int], + Field( + description="Number of Events per named stream", + ), + ] + ] + reason: NotRequired[ + Annotated[str, Field(description="Long-form description of why the run ended")] + ] + run_start: Annotated[ + str, + Field( + description="Reference back to the run_start document that this document " + "is paired with.", + ), + ] + time: Annotated[float, Field(description="The time the run ended. Unix epoch")] + uid: Annotated[str, Field(description="Globally unique ID for this document")] diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py new file mode 100644 index 00000000..43d7acc1 --- /dev/null +++ b/event_model/documents/stream_datum.py @@ -0,0 +1,65 @@ +from typing import Dict, List + +from typing_extensions import Annotated, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + +STREAM_DATUM_EXTRA_SCHEMA = {"additionalProperties": False} + + +@add_extra_schema(STREAM_DATUM_EXTRA_SCHEMA) +class StreamDatum(TypedDict): + """Document to reference a quanta of an externally-stored stream of data.""" + + block_idx: Annotated[ + int, + Field( + description="The order in the stream of this block of data. This must " + "be contiguous for a given stream.", + ), + ] + event_count: Annotated[ + int, Field(description="The number of events in this datum.") + ] + event_offset: Annotated[ + int, + Field( + description="The sequence number of the first event in this block. This " + "increasing value allows the presence of gaps.", + ), + ] + stream_resource: Annotated[ + str, + Field( + description="The UID of the Stream Resource to which this Datum belongs." + ), + ] + uid: Annotated[ + str, + Field( + description="Globally unique identifier for this Datum. 
A suggested " + "formatting being '//", + ), + ] + + data_keys: Annotated[ + List[str], + Field( + description="A list to show which data_keys of the " + "Descriptor are being streamed" + ), + ] + seq_nums: Annotated[ + Dict[str, int], + Field( + description="A slice object showing the Event numbers the " + "resource corresponds to" + ), + ] + indices: Annotated[ + Dict[str, int], + Field( + description="A slice object passed to the StreamResource " + "handler so it can hand back data and timestamps" + ), + ] diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py new file mode 100644 index 00000000..c40ea74b --- /dev/null +++ b/event_model/documents/stream_resource.py @@ -0,0 +1,59 @@ +from typing import Any, Dict, List + +from typing_extensions import Annotated, Literal, NotRequired, TypedDict + +from .generate.type_wrapper import Field, add_extra_schema + +STREAM_RESOURCE_EXTRA_SCHEMA = {"additionalProperties": False} + + +@add_extra_schema(STREAM_RESOURCE_EXTRA_SCHEMA) +class StreamResource(TypedDict): + """ + Document to reference a collection (e.g. file or group of files) of + externally-stored data streams + """ + + stream_names: Annotated[List[str], Field(description="List of datum names")] + + path_semantics: NotRequired[ + Annotated[ + Literal["posix", "windows"], + Field(description="Rules for joining paths"), + ] + ] + resource_kwargs: Annotated[ + Dict[str, Any], + Field( + description="Additional argument to pass to the Handler to read a " + "Stream Resource", + ), + ] + resource_path: Annotated[ + str, Field(description="Filepath or URI for locating this resource") + ] + root: Annotated[ + str, + Field( + description="Subset of resource_path that is a local detail, not semantic" + ), + ] + run_start: NotRequired[ + Annotated[ + str, + Field( + description="Globally unique ID to the run_start document " + "this Stream Resource is associated with", + ), + ] + ] + spec: Annotated[ + str, + Field( + description="String identifying the format/type of this Stream Resource, " + "used to identify a compatible Handler", + ), + ] + uid: Annotated[ + str, Field(description="Globally unique identifier for this Stream Resource") + ] diff --git a/event_model/schemas/datum.json b/event_model/schemas/datum.json index 22e04769..8d1bd088 100644 --- a/event_model/schemas/datum.json +++ b/event_model/schemas/datum.json @@ -4,22 +4,25 @@ "type": "object", "properties": { "datum_id": { + "title": "Datum Id", "description": "Globally unique identifier for this Datum (akin to 'uid' for other Document types), typically formatted as '/'", "type": "string" }, "datum_kwargs": { + "title": "Datum Kwargs", "description": "Arguments to pass to the Handler to retrieve one quanta of data", "type": "object" }, "resource": { + "title": "Resource", "description": "The UID of the Resource to which this Datum belongs", "type": "string" } }, "required": [ + "datum_id", "datum_kwargs", - "resource", - "datum_id" + "resource" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/schemas/datum_page.json b/event_model/schemas/datum_page.json index 8584b1b0..2ed83ad4 100644 --- a/event_model/schemas/datum_page.json +++ b/event_model/schemas/datum_page.json @@ -1,38 +1,45 @@ { - "title": "datum", + "title": "datum_page", "description": "Page of documents to reference a quanta of externally-stored data", "type": "object", "definitions": { - "dataframe": { - "title": "dataframe", - "description": "A DataFrame-like object", - "type": "object", - 
"additionalProperties": { - "type": "array" + "Dataframe": { + "title": "Dataframe", + "type": "array", + "items": { + "type": "string" } } }, "properties": { "datum_id": { + "title": "Datum Id", "description": "Array unique identifiers for each Datum (akin to 'uid' for other Document types), typically formatted as '/'", - "type": "array", - "items": { - "type": "string" - } + "allOf": [ + { + "$ref": "#/definitions/Dataframe" + } + ] }, "datum_kwargs": { + "title": "Datum Kwargs", "description": "Array of arguments to pass to the Handler to retrieve one quanta of data", - "$ref": "#/definitions/dataframe" + "type": "object", + "additionalProperties": { + "type": "array", + "items": {} + } }, "resource": { + "title": "Resource", "description": "The UID of the Resource to which all Datums in the page belong", "type": "string" } }, "required": [ - "resource", + "datum_id", "datum_kwargs", - "datum_id" + "resource" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/schemas/event.json b/event_model/schemas/event.json index 901af7c5..ab07d9df 100644 --- a/event_model/schemas/event.json +++ b/event_model/schemas/event.json @@ -4,47 +4,58 @@ "type": "object", "properties": { "data": { + "title": "Data", "description": "The actual measurement data", "type": "object" }, "descriptor": { + "title": "Descriptor", "description": "UID of the EventDescriptor to which this Event belongs", "type": "string" }, "filled": { + "title": "Filled", "description": "Mapping each of the keys of externally-stored data to the boolean False, indicating that the data has not been loaded, or to foreign keys (moved here from 'data' when the data was loaded)", "type": "object", "additionalProperties": { - "type": [ - "boolean", - "string" + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "string" + } ] } }, "seq_num": { + "title": "Seq Num", "description": "Sequence number to identify the location of this Event in the Event stream", "type": "integer" }, "time": { + "title": "Time", "description": "The event time. 
This maybe different than the timestamps on each of the data entries.", "type": "number" }, "timestamps": { + "title": "Timestamps", "description": "The timestamps of the individual measurement data", "type": "object" }, "uid": { + "title": "Uid", "description": "Globally unique identifier for this Event", "type": "string" } }, "required": [ - "uid", "data", - "timestamps", - "time", "descriptor", - "seq_num" + "seq_num", + "time", + "timestamps", + "uid" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/schemas/event_descriptor.json b/event_model/schemas/event_descriptor.json index e3efa770..c6269b5d 100644 --- a/event_model/schemas/event_descriptor.json +++ b/event_model/schemas/event_descriptor.json @@ -3,32 +3,37 @@ "description": "Document to describe the data captured in the associated event documents", "type": "object", "definitions": { - "configuration": { + "Configuration": { + "title": "Configuration", "type": "object", "properties": { "data": { + "title": "Data", "description": "The actual measurement data", "type": "object" }, "data_keys": { + "title": "Data Keys", "description": "This describes the data stored alongside it in this configuration object.", "type": "object", "additionalProperties": { - "$ref": "#/definitions/data_key" + "$ref": "#/definitions/DataKey" } }, "timestamps": { + "title": "Timestamps", "description": "The timestamps of the individual measurement data", "type": "object" } } }, - "data_key": { - "title": "data_key", + "DataKey": { + "title": "DataKey", "description": "Describes the objects in the data property of Event documents", "type": "object", "properties": { "dims": { + "title": "Dims", "description": "The names for dimensions of the data. Null or empty list if scalar data", "type": "array", "items": { @@ -36,6 +41,7 @@ } }, "dtype": { + "title": "Dtype", "description": "The type of the data in the event.", "type": "string", "enum": [ @@ -47,15 +53,23 @@ ] }, "external": { - "description": "Where the data is stored if it is stored external to the events.", + "title": "External", + "description": "Where the data is stored if it is stored external to the events", "type": "string", "pattern": "^[A-Z]+:?" }, "object_name": { + "title": "Object Name", "description": "The name of the object this key was pulled from.", "type": "string" }, + "precision": { + "title": "Precision", + "description": "Number of digits after decimal place if a floating point number", + "type": "integer" + }, "shape": { + "title": "Shape", "description": "The shape of the data. 
Empty list indicates scalar data.", "type": "array", "items": { @@ -63,38 +77,38 @@ } }, "source": { + "title": "Source", "description": "The source (ex piece of hardware) of the data.", "type": "string" + }, + "units": { + "title": "Units", + "description": "Engineering units of the value", + "type": "string" } }, "required": [ - "source", "dtype", - "shape" + "shape", + "source" ] }, - "data_type": { - "title": "data_type", - "additionalProperties": false, - "patternProperties": { - "^([^./]+)$": { - "$ref": "#/definitions/data_type" - } - } - }, - "object_hints": { - "title": "Object Hints", + "DataType": { + "title": "DataType", "additionalProperties": false, "patternProperties": { "^([^./]+)$": { - "$ref": "#/definitions/per_object_hint" + "$ref": "#/definitions/DataType" } } }, - "per_object_hint": { + "PerObjectHint": { + "title": "PerObjectHint", + "description": "The 'interesting' data keys for this device.", "type": "object", "properties": { "fields": { + "title": "Fields", "description": "The 'interesting' data keys for this device.", "type": "array", "items": { @@ -106,10 +120,11 @@ }, "properties": { "configuration": { + "title": "Configuration", "description": "Readings of configurational fields necessary for interpreting data in the Events.", "type": "object", "additionalProperties": { - "$ref": "#/definitions/configuration" + "$ref": "#/definitions/Configuration" } }, "data_keys": { @@ -117,25 +132,29 @@ "description": "This describes the data in the Event Documents.", "type": "object", "additionalProperties": { - "$ref": "#/definitions/data_key" + "$ref": "#/definitions/DataKey" } }, "hints": { - "$ref": "#/definitions/object_hints" + "$ref": "#/definitions/PerObjectHint" }, "name": { + "title": "Name", "description": "A human-friendly name for this data stream, such as 'primary' or 'baseline'.", "type": "string" }, "object_keys": { + "title": "Object Keys", "description": "Maps a Device/Signal name to the names of the entries it produces in data_keys.", "type": "object" }, "run_start": { + "title": "Run Start", "description": "Globally unique ID of this run's 'start' document.", "type": "string" }, "time": { + "title": "Time", "description": "Creation time of the document as unix epoch time.", "type": "number" }, @@ -146,15 +165,15 @@ } }, "required": [ - "uid", "data_keys", "run_start", - "time" + "time", + "uid" ], "additionalProperties": false, "patternProperties": { "^([^./]+)$": { - "$ref": "#/definitions/data_type" + "$ref": "#/definitions/DataType" } } } \ No newline at end of file diff --git a/event_model/schemas/event_page.json b/event_model/schemas/event_page.json index 6cbbcdd1..328ea66e 100644 --- a/event_model/schemas/event_page.json +++ b/event_model/schemas/event_page.json @@ -3,24 +3,27 @@ "description": "Page of documents to record a quanta of collected data", "type": "object", "definitions": { - "dataframe": { - "title": "dataframe", - "description": "A DataFrame-like object", + "Dataframe": { + "title": "Dataframe", "type": "object", "additionalProperties": { - "type": "array" + "type": "array", + "items": {} } }, - "dataframe_for_filled": { - "title": "dataframe_for_filled", - "description": "A DataFrame-like object with boolean or string entries", + "DataframeForFilled": { + "title": "DataframeForFilled", "type": "object", "additionalProperties": { "type": "array", "items": { - "type": [ - "boolean", - "string" + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "string" + } ] } } @@ -28,18 +31,30 @@ }, "properties": { "data": { + "title": "Data", 
"description": "The actual measurement data", - "$ref": "#/definitions/dataframe" + "allOf": [ + { + "$ref": "#/definitions/Dataframe" + } + ] }, "descriptor": { + "title": "Descriptor", "description": "The UID of the EventDescriptor to which all of the Events in this page belong", "type": "string" }, "filled": { + "title": "Filled", "description": "Mapping each of the keys of externally-stored data to an array containing the boolean False, indicating that the data has not been loaded, or to foreign keys (moved here from 'data' when the data was loaded)", - "$ref": "#/definitions/dataframe_for_filled" + "allOf": [ + { + "$ref": "#/definitions/DataframeForFilled" + } + ] }, "seq_num": { + "title": "Seq Num", "description": "Array of sequence numbers to identify the location of each Event in the Event stream", "type": "array", "items": { @@ -47,6 +62,7 @@ } }, "time": { + "title": "Time", "description": "Array of Event times. This maybe different than the timestamps on each of the data entries", "type": "array", "items": { @@ -54,10 +70,16 @@ } }, "timestamps": { + "title": "Timestamps", "description": "The timestamps of the individual measurement data", - "$ref": "#/definitions/dataframe" + "allOf": [ + { + "$ref": "#/definitions/Dataframe" + } + ] }, "uid": { + "title": "Uid", "description": "Array of globally unique identifiers for each Event", "type": "array", "items": { @@ -66,12 +88,12 @@ } }, "required": [ - "descriptor", - "uid", "data", - "timestamps", + "descriptor", + "seq_num", "time", - "seq_num" + "timestamps", + "uid" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/schemas/resource.json b/event_model/schemas/resource.json index 051e7f85..5421b607 100644 --- a/event_model/schemas/resource.json +++ b/event_model/schemas/resource.json @@ -4,6 +4,7 @@ "type": "object", "properties": { "path_semantics": { + "title": "Path Semantics", "description": "Rules for joining paths", "type": "string", "enum": [ @@ -12,35 +13,41 @@ ] }, "resource_kwargs": { + "title": "Resource Kwargs", "description": "Additional argument to pass to the Handler to read a Resource", "type": "object" }, "resource_path": { + "title": "Resource Path", "description": "Filepath or URI for locating this resource", "type": "string" }, "root": { + "title": "Root", "description": "Subset of resource_path that is a local detail, not semantic.", "type": "string" }, "run_start": { + "title": "Run Start", "description": "Globally unique ID to the run_start document this resource is associated with.", "type": "string" }, "spec": { + "title": "Spec", "description": "String identifying the format/type of this Resource, used to identify a compatible Handler", "type": "string" }, "uid": { + "title": "Uid", "description": "Globally unique identifier for this Resource", "type": "string" } }, "required": [ - "spec", - "resource_path", "resource_kwargs", + "resource_path", "root", + "spec", "uid" ], "additionalProperties": false diff --git a/event_model/schemas/run_start.json b/event_model/schemas/run_start.json index fe74d4ce..3fada997 100644 --- a/event_model/schemas/run_start.json +++ b/event_model/schemas/run_start.json @@ -1,52 +1,99 @@ { - "description": "Document created at the start of run. Provides a search target and later documents link to it", + "title": "run_start", + "description": "Document created at the start of run. 
Provides a seach target and later documents link to it", "type": "object", "definitions": { - "data_type": { - "title": "data_type", + "Calculation": { + "title": "Calculation", + "type": "object", + "properties": { + "args": { + "title": "Args", + "type": "array", + "items": {} + }, + "callable": { + "title": "Callable", + "description": "callable function to perform calculation", + "type": "string" + }, + "kwargs": { + "title": "Kwargs", + "description": "kwargs for calcalation callable", + "type": "object" + } + }, + "required": [ + "callable" + ] + }, + "DataType": { + "title": "DataType", "additionalProperties": false, "patternProperties": { "^([^./]+)$": { - "$ref": "#/definitions/data_type" + "$ref": "#/definitions/DataType" } } }, - "projection": { + "Hints": { + "title": "Hints", + "description": "Start-level hints", + "type": "object", + "properties": { + "dimensions": { + "title": "Dimensions", + "description": "The independent axes of the experiment. Ordered slow to fast", + "type": "array", + "items": { + "type": "array", + "items": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "string" + } + }, + { + "type": "string" + } + ] + } + } + } + } + }, + "Projection": { + "title": "Projection", "description": "Where to get the data from", "type": "object", "properties": { "calculation": { "title": "calculation properties", "description": "required fields if type is calculated", - "properties": { - "args": { - "description": "args for calculation callable", - "type": "array" - }, - "callable": { - "description": "callable function to perform calculation", - "type": "string" - }, - "kwargs": { - "description": "kwargs for calculation callable", - "type": "object" + "allOf": [ + { + "$ref": "#/definitions/Calculation" } - }, - "required": [ - "callable" ] }, "config_device": { + "title": "Config Device", "type": "string" }, "config_index": { + "title": "Config Index", "type": "integer" }, "field": { + "title": "Field", "type": "string" }, "location": { + "title": "Location", "description": "start comes from metadata fields in the start document, event comes from event, configuration comes from configuration fields in the event_descriptor document", + "type": "string", "enum": [ "start", "event", @@ -54,10 +101,13 @@ ] }, "stream": { + "title": "Stream", "type": "string" }, "type": { + "title": "Type", "description": "linked: a value linked from the data set, calculated: a value that requires calculation, static: a value defined here in the projection ", + "type": "string", "enum": [ "linked", "calculated", @@ -65,10 +115,10 @@ ] }, "value": { - "description": "value explicitly defined in the projection when type==static." + "title": "Value", + "description": "value explicitely defined in the projection when type==static." 
} }, - "additionalProperties": false, "allOf": [ { "if": { @@ -85,9 +135,7 @@ { "properties": { "type": { - "enum": [ - "linked" - ] + "enum": "linked" } } } @@ -110,18 +158,14 @@ { "properties": { "location": { - "enum": [ - "event" - ] + "enum": "event" } } }, { "properties": { "type": { - "enum": [ - "linked" - ] + "enum": "linked" } } } @@ -142,18 +186,14 @@ { "properties": { "location": { - "enum": [ - "event" - ] + "enum": "event" } } }, { "properties": { "type": { - "enum": [ - "calculated" - ] + "enum": "calculated" } } } @@ -172,9 +212,7 @@ "if": { "properties": { "type": { - "enum": [ - "static" - ] + "enum": "static" } } }, @@ -187,41 +225,44 @@ } ] }, - "projections": { - "title": "Describe how to interpret this run as the given projection", + "Projections": { + "title": "Projections", + "description": "Describe how to interpret this run as the given projection", + "type": "object", "properties": { "configuration": { + "title": "Configuration", "description": "Static information about projection", "type": "object" }, "name": { + "title": "Name", "description": "The name of the projection", "type": "string" }, "projection": { + "title": "Projection", "type": "object", - "additionalProperties": false, - "patternProperties": { - ".": { - "$ref": "#/definitions/projection" - } + "additionalProperties": { + "$ref": "#/definitions/Projection" } }, "version": { + "title": "Version", "description": "The version of the projection spec. Can specify the version of an external specification.", "type": "string" } }, "required": [ + "configuration", "projection", - "version", - "configuration" - ], - "additionalProperties": false + "version" ] } }, "properties": { "data_groups": { + "title": "Data Groups", "description": "An optional list of data access groups that have meaning to some external system. Examples might include facility, beamline, end stations, proposal, safety form.", "type": "array", "items": { @@ -229,90 +270,86 @@ } }, "data_session": { + "title": "Data Session", "description": "An optional field for grouping runs. The meaning is not mandated, but this is a data management grouping and not a scientific grouping. It is intended to group runs in a visit or set of trials.", "type": "string" }, + "data_type": { + "$ref": "#/definitions/DataType" + }, "group": { + "title": "Group", "description": "Unix group to associate this data with", "type": "string" }, "hints": { + "title": "Hints", "description": "Start-level hints", - "type": "object", - "properties": { - "dimensions": { - "description": "The independent axes of the experiment. Ordered slow to fast", - "type": "array", - "items": { - "type": "array", - "description": "Each entry in this list is of the from ([, ...], ). A 1d scan will have 1 such entry, a scan with 3 independent entries would have 3", - "items": [ - { - "type": "array", - "description": "The data key(s) for the given dimension.", - "items": { - "type": "string" - } - }, - { - "type": "string", - "description": "The stream to find the datakeys in."
- } - ], - "additionalItems": false, - "minItems": 2 - } - } - }, "additionalProperties": false, "patternProperties": { "^([^.]+)$": { - "$ref": "#/definitions/data_type" + "$ref": "#/definitions/DataType" } - } + }, + "allOf": [ + { + "$ref": "#/definitions/Hints" + } + ] }, "owner": { + "title": "Owner", "description": "Unix owner to associate this data with", "type": "string" }, "project": { + "title": "Project", "description": "Name of project that this run is part of", "type": "string" }, "projections": { + "title": "Projections", "type": "array", "items": { - "$ref": "#/definitions/projections" + "$ref": "#/definitions/Projections" } }, "sample": { + "title": "Sample", "description": "Information about the sample, may be a UID to another collection", - "type": [ - "object", - "string" + "anyOf": [ + { + "type": "object" + }, + { + "type": "string" + } ] }, "scan_id": { + "title": "Scan Id", "description": "Scan ID number, not globally unique", "type": "integer" }, "time": { + "title": "Time", "description": "Time the run started. Unix epoch time", "type": "number" }, "uid": { + "title": "Uid", "description": "Globally unique ID for this run", "type": "string" } }, "required": [ - "uid", - "time" + "time", + "uid" ], "additionalProperties": false, "patternProperties": { "^([^./]+)$": { - "$ref": "#/definitions/data_type" + "$ref": "#/definitions/DataType" } } } \ No newline at end of file diff --git a/event_model/schemas/run_stop.json b/event_model/schemas/run_stop.json index 64e2e959..236ffab8 100644 --- a/event_model/schemas/run_stop.json +++ b/event_model/schemas/run_stop.json @@ -1,19 +1,24 @@ { + "title": "run_stop", "description": "Document for the end of a run indicating the success/fail state of the run and the end time", "type": "object", "definitions": { - "data_type": { - "title": "data_type", - "additionalProperties": false, - "patternProperties": { - "^([^./]+)$": { - "$ref": "#/definitions/data_type" - } - } + "DataType": { + "title": "DataType" } }, "properties": { + "data_type": { + "title": "Data Type", + "description": "data_type", + "allOf": [ + { + "$ref": "#/definitions/DataType" + } + ] + }, "exit_status": { + "title": "Exit Status", "description": "State of the run when it ended", "type": "string", "enum": [ @@ -23,40 +28,44 @@ ] }, "num_events": { + "title": "Num Events", "description": "Number of Events per named stream", "type": "object", - "properties": {}, "additionalProperties": { "type": "integer" } }, "reason": { + "title": "Reason", "description": "Long-form description of why the run ended", "type": "string" }, "run_start": { + "title": "Run Start", "description": "Reference back to the run_start document that this document is paired with.", "type": "string" }, "time": { + "title": "Time", "description": "The time the run ended. 
Unix epoch", "type": "number" }, "uid": { + "title": "Uid", "description": "Globally unique ID for this document", "type": "string" } }, "required": [ - "uid", + "exit_status", "run_start", "time", - "exit_status" + "uid" ], "additionalProperties": false, "patternProperties": { "^([^./]+)$": { - "$ref": "#/definitions/data_type" + "$ref": "#/definitions/DataType" } } } \ No newline at end of file diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 898e8310..e6848417 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -4,42 +4,64 @@ "type": "object", "properties": { "block_idx": { + "title": "Block Idx", "description": "The order in the stream of this block of data. This must be contiguous for a given stream.", "type": "integer" }, - "datum_kwargs": { - "description": "Arguments to pass to the Handler to retrieve one quanta of data", - "type": "object" + "data_keys": { + "title": "Data Keys", + "description": "A list to show which data_keys of the Descriptor are being streamed", + "type": "array", + "items": { + "type": "string" + } }, "event_count": { + "title": "Event Count", "description": "The number of events in this datum.", "type": "integer" }, "event_offset": { + "title": "Event Offset", "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps.", "type": "integer" }, - "stream_name": { - "description": "The name of the stream that this Datum is providing a block of.", - "type": "string" + "indices": { + "title": "Indices", + "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "seq_nums": { + "title": "Seq Nums", + "description": "A slice object showing the Event numbers the resource corresponds to", + "type": "object", + "additionalProperties": { + "type": "integer" + } }, "stream_resource": { - "description": "The UID of the Stream Resource to which this Datum belongs", + "title": "Stream Resource", + "description": "The UID of the Stream Resource to which this Datum belongs.", "type": "string" }, "uid": { + "title": "Uid", "description": "Globally unique identifier for this Datum. 
A suggested formatting being '//", "type": "string" } }, "required": [ - "datum_kwargs", - "stream_resource", - "uid", - "stream_name", "block_idx", + "data_keys", "event_count", - "event_offset" + "event_offset", + "indices", + "seq_nums", + "stream_resource", + "uid" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index a4432c2a..cfcba1a5 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -4,6 +4,7 @@ "type": "object", "properties": { "path_semantics": { + "title": "Path Semantics", "description": "Rules for joining paths", "type": "string", "enum": [ @@ -12,46 +13,51 @@ ] }, "resource_kwargs": { + "title": "Resource Kwargs", "description": "Additional argument to pass to the Handler to read a Stream Resource", "type": "object" }, "resource_path": { + "title": "Resource Path", "description": "Filepath or URI for locating this resource", "type": "string" }, "root": { - "description": "Subset of resource_path that is a local detail, not semantic.", + "title": "Root", + "description": "Subset of resource_path that is a local detail, not semantic", "type": "string" }, "run_start": { - "description": "Globally unique ID to the run_start document this Stream Resource is associated with.", + "title": "Run Start", + "description": "Globally unique ID to the run_start document this Stream Resource is associated with", "type": "string" }, "spec": { + "title": "Spec", "description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler", "type": "string" }, "stream_names": { - "description": "List of the stream names this resource provides", + "title": "Stream Names", + "description": "List of datum names", "type": "array", "items": { "type": "string" - }, - "minItems": 1, - "uniqueItems": true + } }, "uid": { + "title": "Uid", "description": "Globally unique identifier for this Stream Resource", "type": "string" } }, "required": [ - "spec", - "resource_path", "resource_kwargs", + "resource_path", "root", - "uid", - "stream_names" + "spec", + "stream_names", + "uid" ], "additionalProperties": false } \ No newline at end of file diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 899d9d47..ab747108 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -1,6 +1,7 @@ import json import pickle from distutils.version import LooseVersion +from itertools import count import jsonschema import numpy @@ -109,38 +110,26 @@ def test_compose_stream_resource(tmp_path): bundle = event_model.compose_run() compose_stream_resource = bundle.compose_stream_resource assert bundle.compose_stream_resource is compose_stream_resource - stream_names = ["stream_1", "stream_2"] bundle = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, + counters=[count(1), count(1)], + stream_names=["primary"], ) resource_doc, compose_stream_data = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_data assert compose_stream_data[0] is not compose_stream_data[1] datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) # Ensure independent counters assert datum_doc_0["block_idx"] == 
datum_doc_1["block_idx"] - datum_doc_1a = compose_stream_data[1](datum_kwargs={}) + datum_doc_1a = compose_stream_data[1]([], {}, {}) assert datum_doc_1a["block_idx"] != datum_doc_1["block_idx"] - # Ensure safety check - from itertools import count - - with pytest.raises(KeyError): - event_model.compose_stream_datum( - stream_resource=resource_doc, - stream_name="stream_NA", - counter=count(), - datum_kwargs={}, - ) - def test_round_trip_pagination(): run_bundle = event_model.compose_run() @@ -410,18 +399,17 @@ def test_document_router_streams_smoke_test(tmp_path): compose_stream_resource = run_bundle.compose_stream_resource start = run_bundle.start_doc dr("start", start) - stream_names = ["stream_1", "stream_2"] stream_resource_doc, compose_stream_data = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), + counters=[count(1), count(6)], resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, + stream_names=["primary"], ) dr("stream_resource", stream_resource_doc) datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) dr("stream_datum", datum_doc_0) dr("stream_datum", datum_doc_1) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 0143b74e..10d3bf10 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -1,4 +1,5 @@ from collections import defaultdict +from itertools import count import numpy import pytest @@ -215,18 +216,17 @@ def test_run_router_streams(tmp_path): bundle.compose_stop(), ) docs.append(("start", start_doc)) - stream_names = ["stream_1", "stream_2"] stream_resource_doc, compose_stream_data = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, + counters=[count(3), count(2)], + stream_names=["primary"], ) docs.append(("stream_resource", stream_resource_doc)) datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) docs.append(("stream_datum", datum_doc_0)) docs.append(("stream_datum", datum_doc_1)) diff --git a/event_model/tests/test_schema_generation.py b/event_model/tests/test_schema_generation.py new file mode 100644 index 00000000..5d2f4b39 --- /dev/null +++ b/event_model/tests/test_schema_generation.py @@ -0,0 +1,70 @@ +# type: ignore + +import sys + +import pytest + +if sys.version_info[:2] >= (3, 9): + # Test schema generation + import json + import os + + from event_model import SCHEMA_PATH + from event_model.documents import ( + Datum, + DatumPage, + Event, + EventDescriptor, + EventPage, + Resource, + RunStart, + RunStop, + StreamDatum, + StreamResource, + ) + from event_model.documents.generate.typeddict_to_schema import ( + parse_typeddict_to_schema, + ) + + typed_dict_class_list = [ + DatumPage, + Datum, + EventDescriptor, + EventPage, + Event, + Resource, + RunStart, + RunStop, + StreamDatum, + StreamResource, + ] + + SCHEMA_PATH = "event_model/" + SCHEMA_PATH + + @pytest.mark.parametrize("typed_dict_class", typed_dict_class_list) + def test_generated_json_matches_typed_dict(typed_dict_class, tmpdir): + parse_typeddict_to_schema(typed_dict_class, out_dir=tmpdir) + file_name = os.listdir(tmpdir)[0] + generated_file_path = os.path.join(tmpdir, file_name) + 
old_file_path = os.path.join(SCHEMA_PATH, file_name) + + with open(generated_file_path) as generated_file, open( + old_file_path + ) as old_file: + try: + assert json.load(generated_file) == json.load(old_file) + except AssertionError: + raise Exception( + f"`{typed_dict_class.__name__}` can generate a json schema, but " + f"it doesn't match the schema in `{SCHEMA_PATH}`. Did you forget " + "to run `python event_model/typeddict_to_schema.py` after changes " + f"to `{typed_dict_class.__name__}`?" + ) + +else: + # Test an error is thrown for python <= 3.8 + def test_schema_generation_import_throws_error(): + with pytest.raises(EnvironmentError): + from event_model.documents.generate import typeddict_to_schema + + typeddict_to_schema diff --git a/pyproject.toml b/pyproject.toml index f052dc5e..48a3dcb6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,8 @@ dependencies = [ "importlib-metadata", "jsonschema", "numpy", - "packaging" + "packaging", + "typing_extensions" ] dynamic = ["version"] license.file = "LICENSE" @@ -48,6 +49,9 @@ dev = [ "ipython", "matplotlib", "numpydoc", + + # For schema generation. + "pydantic", ] [project.scripts]
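
Note on trying the schema generation locally: the sketch below mirrors the new event_model/tests/test_schema_generation.py. It regenerates the resource schema from the Resource TypedDict, checks it against the committed copy, and validates a minimal Resource document with jsonschema. This is a minimal sketch, not part of the change set; it assumes this branch is installed on Python >= 3.9 with the pydantic dev extra, that it runs from the repository root, and that the resource values shown are illustrative placeholders.

# Minimal sketch: exercise the TypedDict -> JSON schema round trip added above.
# Assumes Python >= 3.9, the pydantic dev extra, and the repository root as the
# working directory; the resource field values are placeholders.
import json
import os
import tempfile

import jsonschema

from event_model import SCHEMA_PATH
from event_model.documents import Resource
from event_model.documents.generate.typeddict_to_schema import (
    parse_typeddict_to_schema,
)

with tempfile.TemporaryDirectory() as tmpdir:
    # Regenerate the schema for one document type into a scratch directory,
    # as the new test does for every TypedDict in typed_dict_class_list.
    parse_typeddict_to_schema(Resource, out_dir=tmpdir)
    file_name = os.listdir(tmpdir)[0]
    with open(os.path.join(tmpdir, file_name)) as generated:
        schema = json.load(generated)

# Compare with the committed schema, the same check the new test performs.
with open(os.path.join("event_model", SCHEMA_PATH, file_name)) as committed:
    assert schema == json.load(committed)

# A document carrying only the required keys (resource_kwargs, resource_path,
# root, spec, uid) should validate against the regenerated schema.
jsonschema.validate(
    {
        "resource_kwargs": {},
        "resource_path": "test_streams",
        "root": "/tmp",
        "spec": "TIFF_STREAM",
        "uid": "placeholder-resource-uid",
    },
    schema,
)

The same pattern applies to any of the TypedDicts imported in the new test (RunStart, EventDescriptor, StreamDatum, and so on). On Python 3.8 and older, importing event_model.documents.generate raises EnvironmentError instead, which is what test_schema_generation_import_throws_error covers.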