From 459d8cb7ebec71debc416fd7bc486f53cd236448 Mon Sep 17 00:00:00 2001 From: maffettone Date: Wed, 30 Mar 2022 16:43:08 -0400 Subject: [PATCH 01/21] Doc typo --- docs/source/use-cases.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/use-cases.rst b/docs/source/use-cases.rst index a75cdc72..3eede931 100644 --- a/docs/source/use-cases.rst +++ b/docs/source/use-cases.rst @@ -3,7 +3,7 @@ Event Model Patterns ============================ When implementing a system with the Event Model for a particular use case (technique, scan type, etc.), many design choices can be made. For example: how many streams you define (through Event Descriptor documents), what events you put into those streams, and how data points are stored in an event. Here, we present a use case and a potential design, discussing the pros and cons of different options. -To further complicate things, we can consider that a document stream might be be optimized for very different scenarios within the same technique. For example, for the sample scan of the same sample, a document stream might read as the scan as it is being run for the purpose of providing in-scan feedback to a user or beamline analysis tool. For the same scan, a docstream might be serialized into a Mongo database and read back out by analysis and visualiztion tools. In these different uses of data from the same scan, the level of granularity that one puts intto an Event Document might be very different. The streaming consumer might require a large number of very small granular events in order to quickly make decisions that affect the course of the scan. On the other hand, MongoDB document retrieval is much more efficient with a small number of larger documents, and a small number of events that each contain data from multiple time steps might be preferrable. +To further complicate things, we can consider that a document stream might be optimized for very different scenarios within the same technique. For example, for the same scan of the same sample, a document stream might be read as the scan is being run, for the purpose of providing in-scan feedback to a user or beamline analysis tool. For the same scan, a document stream might be serialized into a Mongo database and read back out by analysis and visualization tools. In these different uses of data from the same scan, the level of granularity that one puts into an Event Document might be very different. The streaming consumer might require a large number of very small, granular events in order to quickly make decisions that affect the course of the scan. On the other hand, MongoDB document retrieval is much more efficient with a small number of larger documents, and a small number of events that each contain data from multiple time steps might be preferable. 
Use Case - Tomography Tiling and MongoDB Serialization From bfe0558c5452f86aa590d8341aeed7ab0cc4b60d Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 31 Mar 2022 14:08:07 -0400 Subject: [PATCH 02/21] Add stream resource json schema --- event_model/schemas/stream_resource.json | 55 ++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 event_model/schemas/stream_resource.json diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json new file mode 100644 index 00000000..f7accbed --- /dev/null +++ b/event_model/schemas/stream_resource.json @@ -0,0 +1,55 @@ +{ + "properties": { + "spec": { + "type": "string", + "description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler" + }, + "resource_path": { + "type": "string", + "description": "Filepath or URI for locating this resource" + }, + "resource_kwargs": { + "type": "object", + "description": "Additional arguments to pass to the Handler to read a Stream Resource" + }, + "root": { + "type": "string", + "description": "Subset of resource_path that is a local detail, not semantic." + }, + "path_semantics": { + "type": "string", + "description": "Rules for joining paths", + "enum": ["posix", "windows"] + }, + "uid": { + "type": "string", + "description": "Globally unique identifier for this Stream Resource" + }, + "run_start": { + "type": "string", + "description": "Globally unique ID of the run_start document this Stream Resource is associated with." + }, + "stream_names": { + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "uniqueItems": true, + "description": "List of the stream names this resource provides" + + } + }, + "required": [ + "spec", + "resource_path", + "resource_kwargs", + "root", + "uid", + "stream_names" + ], + "additionalProperties": false, + "type": "object", + "title": "stream_resource", + "description": "Document to reference a collection (e.g. file or group of files) of externally-stored data streams" +} \ No newline at end of file From 0300dff998a3a8039d81892cbb0d200e9cfa298c Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 31 Mar 2022 14:20:54 -0400 Subject: [PATCH 03/21] Add stream datum json schema --- event_model/schemas/stream_datum.json | 40 +++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 event_model/schemas/stream_datum.json diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json new file mode 100644 index 00000000..dc8fc7ee --- /dev/null +++ b/event_model/schemas/stream_datum.json @@ -0,0 +1,40 @@ +{ + "properties": { + "datum_kwargs": { + "type": "object", + "description": "Arguments to pass to the Handler to retrieve one quantum of data" + }, + "stream_resource": { + "type": "string", + "description": "The UID of the Stream Resource to which this Datum belongs" + }, + "uid": { + "type": "string", + "description": "Globally unique identifier for this Datum. A suggested formatting being '<stream_resource>/<stream_name>/<block_id>'" + }, + "stream_name": { + "type": "string", + "description": "The name of the stream that this Datum is providing a block of." + }, + "block_id": { + "type": "int", + "description": "The order in the stream of this block of data. This must be contiguous for a given stream." + }, + "event_offset": { + "type": "int", + "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps." 
+ } + + }, + "required": [ + "datum_kwargs", + "resource", + "uid", + "stream_name", + "block_id" + ], + "additionalProperties": false, + "type": "object", + "title": "stream_datum", + "description": "Document to reference a quantum of an externally-stored stream of data." +} From ebb35bb6b62e5c53a8f68adaa8bcbd7cac2b87ba Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 31 Mar 2022 14:40:46 -0400 Subject: [PATCH 04/21] Extend DocumentNames and SCHEMA_NAMES --- event_model/__init__.py | 4 ++++ event_model/tests/test_em.py | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 457f09dc..9daa4c73 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -33,6 +33,8 @@ class DocumentNames(Enum): resource = 'resource' event_page = 'event_page' datum_page = 'datum_page' + stream_resource = 'stream_resource' + stream_datum = 'stream_datum' bulk_datum = 'bulk_datum' # deprecated bulk_events = 'bulk_events' # deprecated @@ -1523,6 +1525,8 @@ class MismatchedDataKeys(InvalidData): DocumentNames.datum: 'schemas/datum.json', DocumentNames.datum_page: 'schemas/datum_page.json', DocumentNames.resource: 'schemas/resource.json', + DocumentNames.stream_datum: 'schemas/stream_datum.json', + DocumentNames.stream_resource: 'schemas/stream_resource.json', # DEPRECATED: DocumentNames.bulk_events: 'schemas/bulk_events.json', DocumentNames.bulk_datum: 'schemas/bulk_datum.json'} diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 383e60d2..8e19f0a0 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -15,12 +15,13 @@ def test_documents(): dn = event_model.DocumentNames for k in ('stop', 'start', 'descriptor', 'event', 'bulk_events', 'datum', - 'resource', 'bulk_datum', 'event_page', 'datum_page'): + 'resource', 'bulk_datum', 'event_page', 'datum_page', + 'stream_resource', 'stream_datum'): assert dn(k) == getattr(dn, k) def test_len(): - assert 10 == len(event_model.DocumentNames) + assert 12 == len(event_model.DocumentNames) def test_schemas(): From 269d58fc77c5cb461298268475d3834caf8296fa Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 31 Mar 2022 15:41:24 -0400 Subject: [PATCH 05/21] Composition of basic bundles without complete run bundle. This includes a substantial change to the previous ComposeRunBundle named tuple. It was changed to an extended data class following discussion here: https://github.com/bluesky/event-model/issues/219 It continues to pass all tests in this new form. --- event_model/__init__.py | 63 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 9daa4c73..51e27b7d 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1,5 +1,7 @@ from collections import defaultdict, deque, namedtuple import collections.abc +from dataclasses import dataclass +from typing import Union from distutils.version import LooseVersion import copy import json @@ -1567,13 +1569,28 @@ def _is_array(checker, instance): __version__ = get_versions()['version'] del get_versions -ComposeRunBundle = namedtuple('ComposeRunBundle', - 'start_doc compose_descriptor compose_resource ' - 'compose_stop') +@dataclass +class ComposeRunBundle: + """Extensible compose run bundle. This maintains backward compatibility by unpacking into a basic + run bundle (start, compose_descriptor, compose_resource, stop). 
Further extensions are optional and + require keyword referencing (e.g. compose_stream_resource). + """ + start_doc: dict + compose_descriptor: callable + compose_resource: callable + compose_stop: callable + compose_stream_resource: Union[callable, None] = None + + def __iter__(self): + return iter((self.start_doc, self.compose_descriptor, self.compose_resource, self.compose_stop)) + + ComposeDescriptorBundle = namedtuple('ComposeDescriptorBundle', 'descriptor_doc compose_event compose_event_page') ComposeResourceBundle = namedtuple('ComposeResourceBundle', 'resource_doc compose_datum compose_datum_page') +ComposeStreamResourceBundle = namedtuple('ComposeStreamResourceBundle', + 'stream_resource_doc compose_stream_datum') def compose_datum(*, resource, counter, datum_kwargs, validate=True): @@ -1624,6 +1641,46 @@ def compose_resource(*, spec, root, resource_path, resource_kwargs, partial(compose_datum_page, resource=doc, counter=counter)) +def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, event_offset=0, validate=True): + resource_uid = stream_resource['uid'] + block_id = next(counter) + doc = dict(stream_resource=resource_uid, + datum_kwargs=datum_kwargs, + uid=f"{resource_uid}/{stream_name}/{block_id}", + stream_name=stream_name, + block_id=block_id, + event_offset=event_offset, + ) + if validate: + schema_validators[DocumentNames.stream_datum].validate(doc) + return doc + + +def compose_stream_resource(*, spec, root, resource_path, resource_kwargs, stream_names, counter=None, + path_semantics=default_path_semantics, start=None, uid=None, validate=True): + if uid is None: + uid = str(uuid.uuid4()) + if counter is None: + counter = itertools.count() + doc = dict(uid=uid, + spec=spec, + root=root, + resource_path=resource_path, + resource_kwargs=resource_kwargs, + stream_names=stream_names, + path_semantics=path_semantics) + if start: + doc["run_start"] = start["uid"] + + if validate: + schema_validators[DocumentNames.stream_resource].validate(doc) + + return ComposeStreamResourceBundle( + doc, + partial(compose_stream_datum, stream_resource=doc, counter=counter) + ) + + def compose_stop(*, start, event_counter, poison_pill, exit_status='success', reason='', uid=None, time=None, From 3975bdf36ac63423c6a89cad0464946867e11b0c Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 31 Mar 2022 16:14:17 -0400 Subject: [PATCH 06/21] Set up pre-commit --- .gitignore | 5 ++++- event_model/__init__.py | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 52943ad4..7418a9a9 100644 --- a/.gitignore +++ b/.gitignore @@ -79,4 +79,7 @@ target/ .vscode/* #Ipython Notebook -.ipynb_checkpoints \ No newline at end of file +.ipynb_checkpoints + +# pre-commit +.pre-commit-config.yaml \ No newline at end of file diff --git a/event_model/__init__.py b/event_model/__init__.py index 51e27b7d..788641b4 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1569,6 +1569,7 @@ def _is_array(checker, instance): __version__ = get_versions()['version'] del get_versions + @dataclass class ComposeRunBundle: """Extensible compose run bundle.
This maintains backward compatibility by unpacking into a basic From 2e96ae24a9c7e02b758b3954d2dd1cf8bad13355 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 1 Apr 2022 10:00:36 -0400 Subject: [PATCH 07/21] Add compose_stream_resource kwarg to compose_run_bundle --- event_model/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 788641b4..4f54ab42 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1853,7 +1853,9 @@ def compose_run(*, uid=None, time=None, metadata=None, validate=True): event_counter=event_counter), partial(compose_resource, start=doc), partial(compose_stop, start=doc, event_counter=event_counter, - poison_pill=poison_pill)) + poison_pill=poison_pill), + compose_stream_resource=partial(compose_stream_resource, start=doc) + ) def pack_event_page(*events): From f1cd8aa4efdbb59e87ab70198325c34c27047233 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 1 Apr 2022 10:07:03 -0400 Subject: [PATCH 08/21] Fix JSON typos --- event_model/schemas/stream_datum.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index dc8fc7ee..36ef9ebc 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -17,18 +17,18 @@ "description": "The name of the stream that this Datum is providing a block of." }, "block_id": { - "type": "int", + "type": "integer", "description": "The order in the stream of this block of data. This must be contiguous for a given stream." }, "event_offset": { - "type": "int", + "type": "integer", "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps." } }, "required": [ "datum_kwargs", - "resource", + "stream_resource", "uid", "stream_name", "block_id" From e03422749d0e1e283288aa8b49430c3193922ed7 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 1 Apr 2022 12:53:12 -0400 Subject: [PATCH 09/21] Update stream resource to expect multiple streams. Include unit test for composing the stream resource. 
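A minimal usage sketch of the API after this patch (the "TIFF_STREAM" spec string and the paths mirror the unit test and are illustrative placeholders, not a registered handler):

.. code-block:: python

    import event_model

    run = event_model.compose_run()
    stream_bundle = run.compose_stream_resource(
        spec="TIFF_STREAM",      # illustrative; any spec with a registered Handler works
        root="/tmp",
        resource_path="test_streams",
        resource_kwargs={},
        stream_names=["stream_1", "stream_2"],
    )
    # One compose_stream_datum partial per stream, each bound to its own counter.
    for compose_stream_datum in stream_bundle.compose_stream_data:
        stream_datum_doc = compose_stream_datum(datum_kwargs={})
    stop_doc = run.compose_stop()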
--- event_model/__init__.py | 16 +++++++++++----- event_model/tests/test_em.py | 23 +++++++++++++++++++++++ 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 4f54ab42..1a7e3031 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1591,7 +1591,7 @@ ComposeResourceBundle = namedtuple('ComposeResourceBundle', 'resource_doc compose_datum compose_datum_page') ComposeStreamResourceBundle = namedtuple('ComposeStreamResourceBundle', - 'stream_resource_doc compose_stream_datum') + 'stream_resource_doc compose_stream_data') def compose_datum(*, resource, counter, datum_kwargs, validate=True): @@ -1657,12 +1657,17 @@ def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, return doc -def compose_stream_resource(*, spec, root, resource_path, resource_kwargs, stream_names, counter=None, +def compose_stream_resource(*, spec, root, resource_path, resource_kwargs, stream_names, counters=(), path_semantics=default_path_semantics, start=None, uid=None, validate=True): if uid is None: uid = str(uuid.uuid4()) - if counter is None: - counter = itertools.count() + if isinstance(stream_names, str): + stream_names = [stream_names, ] + if len(counters) == 0: + counters = [itertools.count() for _ in stream_names] + elif len(counters) != len(stream_names): + raise ValueError(f"Number of counters ({len(counters)}) does not match the number of stream names: {stream_names}") + doc = dict(uid=uid, spec=spec, root=root, resource_path=resource_path, resource_kwargs=resource_kwargs, stream_names=stream_names, path_semantics=path_semantics) @@ -1678,7 +1683,8 @@ def compose_stream_resource(*, spec, root, resource_path, resource_kwargs, strea return ComposeStreamResourceBundle( doc, - partial(compose_stream_datum, stream_resource=doc, counter=counter) + [partial(compose_stream_datum, stream_resource=doc, stream_name=stream_name, counter=counter) for + stream_name, counter in zip(stream_names, counters)] ) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 8e19f0a0..f7ff91c6 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -82,6 +82,29 @@ def test_compose_run(): assert stop_doc['num_events']['primary'] == 3 +def test_compose_stream_resource(): + """ + Following the example of test_compose_run, focus only on the stream resource and + datum functionality + """ + bundle = event_model.compose_run() + compose_stream_resource = bundle.compose_stream_resource + assert bundle.compose_stream_resource is compose_stream_resource + stream_names = ["stream_1", "stream_2"] + bundle = compose_stream_resource(spec="TIFF_STREAM", root="/tmp", resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + resource_doc, compose_stream_data = bundle + assert bundle.stream_resource_doc is resource_doc + assert bundle.compose_stream_data is compose_stream_data + assert compose_stream_data[0] is not compose_stream_data[1] + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + # Ensure independent counters + assert datum_doc_0['block_id'] == datum_doc_1["block_id"] + datum_doc_1a = compose_stream_data[1](datum_kwargs={}) + assert datum_doc_1a["block_id"] != datum_doc_1["block_id"] + + def test_round_trip_pagination(): run_bundle = event_model.compose_run() desc_bundle = run_bundle.compose_descriptor( From 9c6b8dad3ddbb92db47e12d6fd6a4a14fc281078 Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 2 Jun 2022 09:37:30 -0400 Subject: [PATCH 10/21] Add Event count to stream --- event_model/__init__.py | 4 +++-
event_model/schemas/stream_datum.json | 4 ++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 1a7e3031..78219722 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1642,7 +1642,8 @@ def compose_resource(*, spec, root, resource_path, resource_kwargs, partial(compose_datum_page, resource=doc, counter=counter)) -def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, event_offset=0, validate=True): +def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, + event_count=1, event_offset=0, validate=True): resource_uid = stream_resource['uid'] block_id = next(counter) doc = dict(stream_resource=resource_uid, @@ -1650,6 +1651,7 @@ def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, uid=f"{resource_uid}/{stream_name}/{block_id}", stream_name=stream_name, block_id=block_id, + event_count=event_count, event_offset=event_offset, ) if validate: diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 36ef9ebc..f02cb1a8 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -20,6 +20,10 @@ "type": "integer", "description": "The order in the stream of this block of data. This must be contiguous for a given stream." }, + "event_count": { + "type": "integer", + "description": "The number of events in this datum." + }, "event_offset": { "type": "integer", "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps." From 1c26df6ea0ab87b04d8ff6f3e26b1961cd8db09c Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 2 Jun 2022 10:55:25 -0400 Subject: [PATCH 11/21] Extend DocumentRouter, RunRouter, and Filler for new documents. For Filler (more extensive), added: - caches in __init__ - caches in __eq__ - caches in get and set state - calls for updating caches --- event_model/__init__.py | 64 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 2 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 78219722..37d83728 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -207,6 +207,12 @@ def event_page(self, doc): def datum_page(self, doc): return NotImplemented + def stream_datum(self, doc): + return NotImplemented + + def stream_resource(self, doc): + return NotImplemented + def bulk_events(self, doc): # Do not modify this in a subclass. Use event_page. warnings.warn( @@ -490,6 +496,10 @@ class Filler(DocumentRouter): A cache of Datum documents. If None, a dict is used. descriptor_cache : dict, optional A cache of EventDescriptor documents. If None, a dict is used. + stream_resource_cache : dict, optional + A cache of StreamResource documents. If None, a dict is used. + stream_datum_cache : dict, optional + A cache of StreamDatum documents. If None, a dict is used. 
retry_intervals : Iterable, optional If data is not found on the first try, there may be a race between the I/O systems creating the external data and this stream of Documents that reference it. If Filler encounters an IOError it will wait a bit and retry until the data is found or the retry_intervals are exhausted. Each entry in this iterable gives the time to wait in seconds. @@ -530,7 +540,8 @@ class Filler(DocumentRouter): def __init__(self, handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, - descriptor_cache=None, inplace=None, + descriptor_cache=None, stream_resource_cache=None, + stream_datum_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024)): if inplace is None: @@ -583,10 +594,16 @@ def __init__(self, handler_registry, *, datum_cache = self.get_default_datum_cache() if descriptor_cache is None: descriptor_cache = self.get_default_descriptor_cache() + if stream_resource_cache is None: + stream_resource_cache = self.get_default_stream_resource_cache() + if stream_datum_cache is None: + stream_datum_cache = self.get_default_stream_datum_cache() self._handler_cache = handler_cache self._resource_cache = resource_cache self._datum_cache = datum_cache self._descriptor_cache = descriptor_cache + self._stream_resource_cache = stream_resource_cache + self._stream_datum_cache = stream_datum_cache if retry_intervals is None: retry_intervals = [] self.retry_intervals = retry_intervals @@ -604,6 +621,8 @@ def __eq__(self, other): type(self._resource_cache) is type(other._resource_cache) and type(self._datum_cache) is type(other._datum_cache) and type(self._descriptor_cache) is type(other._descriptor_cache) and + type(self._stream_resource_cache) is type(other._stream_resource_cache) and + type(self._stream_datum_cache) is type(other._stream_datum_cache) and self.retry_intervals == other.retry_intervals ) @@ -619,6 +638,8 @@ def __getstate__(self): resource_cache=self._resource_cache, datum_cache=self._datum_cache, descriptor_cache=self._descriptor_cache, + stream_resource_cache=self._stream_resource_cache, + stream_datum_cache=self._stream_datum_cache, retry_intervals=self.retry_intervals) def __setstate__(self, d): @@ -641,6 +662,8 @@ def __setstate__(self, d): self._resource_cache = d['resource_cache'] self._datum_cache = d['datum_cache'] self._descriptor_cache = d['descriptor_cache'] + self._stream_resource_cache = d['stream_resource_cache'] + self._stream_datum_cache = d['stream_datum_cache'] retry_intervals = d['retry_intervals'] if retry_intervals is None: retry_intervals = [] @@ -674,6 +697,14 @@ def get_default_datum_cache(): def get_default_handler_cache(): return {} + @staticmethod + def get_default_stream_datum_cache(): + return {} + + @staticmethod + def get_default_stream_resource_cache(): + return {} + @property def inplace(self): return self._inplace def clone(self, handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, - descriptor_cache=None, inplace=None, + descriptor_cache=None, stream_resource_cache=None, + stream_datum_cache=None, inplace=None, retry_intervals=None): """ Create a new Filler instance from this one. 
@@ -708,6 +740,8 @@ def clone(self, handler_registry=None, *, resource_cache=resource_cache, datum_cache=datum_cache, descriptor_cache=descriptor_cache, + stream_resource_cache=stream_resource_cache, + stream_datum_cache=stream_datum_cache, inplace=inplace, retry_intervals=retry_intervals) @@ -792,6 +826,13 @@ def datum(self, doc): self._datum_cache[doc['datum_id']] = doc return doc + def stream_resource(self, doc): + self._stream_resource_cache[doc["uid"]] = doc + return doc + + def stream_datum(self, doc): + self._stream_datum_cache[doc["uid"]] = doc + return doc + def event_page(self, doc): # TODO We may be able to fill a page in place, and that may be more # efficient than unpacking the page in to Events, filling them, and the @@ -1272,6 +1313,7 @@ def __init__(self, factories, handler_registry=None, *, # Map Resource UID to RunStart UID. self._resources = {} + self._stream_resources = {} # Old-style Resources that do not have a RunStart UID self._unlabeled_resources = deque(maxlen=10000) @@ -1402,6 +1444,15 @@ def datum_page(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback('datum_page', doc) + def stream_datum(self, doc): + resource_uid = doc["stream_resource"] + start_uid = self._stream_resources[resource_uid] + self._fillers[start_uid].stream_datum(doc) + for callback in self._factory_cbs_by_start[start_uid]: + callback("stream_datum", doc) + for callback in self._subfactory_cbs_by_start[start_uid]: + callback("stream_datum", doc) + def resource(self, doc): try: start_uid = doc['run_start'] @@ -1427,6 +1478,15 @@ def resource(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback('resource', doc) + def stream_resource(self, doc): + start_uid = doc["run_start"] # Stream Resources always carry run_start, so no try/except as in resource() + self._fillers[start_uid].stream_resource(doc) + self._stream_resources[doc["uid"]] = doc["run_start"] + for callback in self._factory_cbs_by_start[start_uid]: + callback("stream_resource", doc) + for callback in self._subfactory_cbs_by_start[start_uid]: + callback("stream_resource", doc) + def stop(self, doc): start_uid = doc['run_start'] for callback in self._factory_cbs_by_start[start_uid]: From 5c8a629b0a9d97ec39432dbd94140da1988e2a5b Mon Sep 17 00:00:00 2001 From: "Dr. Phil Maffettone" <43007690+maffettone@users.noreply.github.com> Date: Thu, 2 Jun 2022 11:19:17 -0400 Subject: [PATCH 12/21] Require event_count in stream_datum Co-authored-by: Thomas A Caswell --- event_model/schemas/stream_datum.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index f02cb1a8..5aabab01 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -35,7 +35,8 @@ "stream_resource", "uid", "stream_name", - "block_id" + "block_id", + "event_count" ], "additionalProperties": false, "type": "object", "title": "stream_datum", From b6b8fe24627644907828d1eedf160ff69b20c0f3 Mon Sep 17 00:00:00 2001 From: "Dr.
Phil Maffettone" <43007690+maffettone@users.noreply.github.com> Date: Thu, 2 Jun 2022 11:19:54 -0400 Subject: [PATCH 13/21] Cleaner typing Co-authored-by: Thomas A Caswell --- event_model/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 37d83728..610198a0 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1,7 +1,7 @@ from collections import defaultdict, deque, namedtuple import collections.abc from dataclasses import dataclass -from typing import Union +from typing import Optional from distutils.version import LooseVersion import copy import json From 54d11a389d4add05a22b544b67a74b76799e4269 Mon Sep 17 00:00:00 2001 From: "Dr. Phil Maffettone" <43007690+maffettone@users.noreply.github.com> Date: Thu, 2 Jun 2022 11:20:01 -0400 Subject: [PATCH 14/21] Cleaner typing Co-authored-by: Thomas A Caswell --- event_model/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 610198a0..6210941c 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1640,7 +1640,7 @@ class ComposeRunBundle: compose_descriptor: callable compose_resource: callable compose_stop: callable - compose_stream_resource: Union[callable, None] = None + compose_stream_resource: Optional[callable] = None def __iter__(self): return iter((self.start_doc, self.compose_descriptor, self.compose_resource, self.compose_stop)) From 9082b82c92e0c7833555dadb12c3efaf17b478fe Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 2 Jun 2022 12:11:47 -0400 Subject: [PATCH 15/21] Sanity test to ensure new stream_datum name is included in stream_resource --- event_model/__init__.py | 2 ++ event_model/tests/test_em.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/event_model/__init__.py b/event_model/__init__.py index 37d83728..879cacc4 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1705,6 +1705,8 @@ def compose_resource(*, spec, root, resource_path, resource_kwargs, def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, event_count=1, event_offset=0, validate=True): resource_uid = stream_resource['uid'] + if stream_name not in stream_resource["stream_names"]: + raise EventModelKeyError("Attempt to create stream_datum with name not included in stream_resource") block_id = next(counter) doc = dict(stream_resource=resource_uid, datum_kwargs=datum_kwargs, diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index f7ff91c6..2e58bd2d 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -104,6 +104,12 @@ def test_compose_stream_resource(): datum_doc_1a = compose_stream_data[1](datum_kwargs={}) assert datum_doc_1a["block_id"] != datum_doc_1["block_id"] + # Ensure safety check + from itertools import count + with pytest.raises(KeyError): + event_model.compose_stream_datum(stream_resource=resource_doc, stream_name="stream_NA", + counter=count(), datum_kwargs={}) + def test_round_trip_pagination(): run_bundle = event_model.compose_run() From 1bb8681d3cb2448b00f5b4c1070b7b02ab60c5b4 Mon Sep 17 00:00:00 2001 From: maffettone Date: Thu, 2 Jun 2022 12:29:50 -0400 Subject: [PATCH 16/21] Use block_idx instead of block_id IDX infers an integer ID could really be anything (uuid, string name, etc) --- event_model/__init__.py | 6 +++--- event_model/schemas/stream_datum.json | 4 ++-- event_model/tests/test_em.py | 4 ++-- 3 files changed, 7 insertions(+), 7 
deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 1cadd213..b610ed0c 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1707,12 +1707,12 @@ def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, resource_uid = stream_resource['uid'] if stream_name not in stream_resource["stream_names"]: raise EventModelKeyError("Attempt to create stream_datum with name not included in stream_resource") - block_id = next(counter) + block_idx = next(counter) doc = dict(stream_resource=resource_uid, datum_kwargs=datum_kwargs, - uid=f"{resource_uid}/{stream_name}/{block_id}", + uid=f"{resource_uid}/{stream_name}/{block_idx}", stream_name=stream_name, - block_id=block_id, + block_idx=block_idx, event_count=event_count, event_offset=event_offset, ) diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 5aabab01..1c5b30cd 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -16,7 +16,7 @@ "type": "string", "description": "The name of the stream that this Datum is providing a block of." }, - "block_id": { + "block_idx": { "type": "integer", "description": "The order in the stream of this block of data. This must be contiguous for a given stream." }, @@ -35,7 +35,7 @@ "stream_resource", "uid", "stream_name", - "block_id", + "block_idx", "event_count" ], "additionalProperties": false, diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 2e58bd2d..0343902e 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -100,9 +100,9 @@ def test_compose_stream_resource(): datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) for compose_stream_datum in compose_stream_data) # Ensure independent counters - assert datum_doc_0['block_id'] == datum_doc_1["block_id"] + assert datum_doc_0['block_idx'] == datum_doc_1["block_idx"] datum_doc_1a = compose_stream_data[1](datum_kwargs={}) - assert datum_doc_1a["block_id"] != datum_doc_1["block_id"] + assert datum_doc_1a["block_idx"] != datum_doc_1["block_idx"] # Ensure safety check from itertools import count From 5cb52f270b19ec02fc4bb73e95f454ec6fff6549 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Jun 2022 08:38:10 -0400 Subject: [PATCH 17/21] Add run router unit test test_document_router_streams_smoke_test --- event_model/tests/test_em.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 0343902e..6d6bf7a9 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -336,6 +336,24 @@ def test_document_router_smoke_test(): dr('stop', run_bundle.compose_stop()) +def test_document_router_streams_smoke_test(): + dr = event_model.DocumentRouter() + run_bundle = event_model.compose_run() + compose_stream_resource = run_bundle.compose_stream_resource + start = run_bundle.start_doc + dr("start", start) + stream_names = ["stream_1", "stream_2"] + stream_resource_doc, compose_stream_data = \ + compose_stream_resource(spec="TIFF_STREAM", root="/tmp", resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + dr("stream_resource", stream_resource_doc) + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + dr("stream_datum", datum_doc_0) + dr("stream_datum", datum_doc_1) + dr('stop', run_bundle.compose_stop()) + + def test_document_router_with_validation(): dr = 
event_model.DocumentRouter() run_bundle = event_model.compose_run() From 9f2020822c47b1e87435e6a05c5f9e22b751386c Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Jun 2022 08:43:30 -0400 Subject: [PATCH 18/21] Use tmp_path in tests --- event_model/tests/test_em.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 6d6bf7a9..753aa7ee 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -82,7 +82,7 @@ def test_compose_run(): assert stop_doc['num_events']['primary'] == 3 -def test_compose_stream_resource(): +def test_compose_stream_resource(tmp_path): """ Following the example of test_compose_run, focus only on the stream resource and datum functionality @@ -91,7 +91,7 @@ def test_compose_stream_resource(): compose_stream_resource = bundle.compose_stream_resource assert bundle.compose_stream_resource is compose_stream_resource stream_names = ["stream_1", "stream_2"] - bundle = compose_stream_resource(spec="TIFF_STREAM", root="/tmp", resource_path="test_streams", + bundle = compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, stream_names=stream_names) resource_doc, compose_stream_data = bundle assert bundle.stream_resource_doc is resource_doc @@ -336,7 +336,7 @@ def test_document_router_smoke_test(): dr('stop', run_bundle.compose_stop()) -def test_document_router_streams_smoke_test(): +def test_document_router_streams_smoke_test(tmp_path): dr = event_model.DocumentRouter() run_bundle = event_model.compose_run() compose_stream_resource = run_bundle.compose_stream_resource @@ -344,7 +344,7 @@ def test_document_router_streams_smoke_test(): dr("start", start) stream_names = ["stream_1", "stream_2"] stream_resource_doc, compose_stream_data = \ - compose_stream_resource(spec="TIFF_STREAM", root="/tmp", resource_path="test_streams", + compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, stream_names=stream_names) dr("stream_resource", stream_resource_doc) datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) From 937bb9d36772b73458a97cd8eabf2fca402d745b Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Jun 2022 08:56:51 -0400 Subject: [PATCH 19/21] Test run router for stream datum and resource test_run_router_streams --- event_model/tests/test_run_router.py | 42 ++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index e6c16e4f..3a3171ea 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -196,6 +196,48 @@ def check_not_filled_factory(name, doc): rr(name, doc) +def test_run_router_streams(tmp_path): + bundle = event_model.compose_run() + docs = [] + start_doc, compose_stream_resource, stop_doc = \ + bundle.start_doc, bundle.compose_stream_resource, bundle.compose_stop() + docs.append(("start", start_doc)) + stream_names = ["stream_1", "stream_2"] + stream_resource_doc, compose_stream_data = \ + compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + docs.append(("stream_resource", stream_resource_doc)) + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + docs.append(("stream_datum", datum_doc_0)) + docs.append(("stream_datum", datum_doc_1)) + 
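+    # Each stream_datum carries its stream_resource uid as a foreign key;
+    # the RunRouter uses that key to route these documents to the owning run.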
docs.append(("stop", stop_doc)) + + # Empty list of factories. Just make sure nothing blows up. + rr = event_model.RunRouter([]) + for name, doc in docs: + rr(name, doc) + + # Collector factory for stream resource and data + resource_list = [] + data_list = [] + + def collector(name, doc): + if name == "stream_resource": + resource_list.append((name, doc)) + elif name == "stream_datum": + data_list.append((name, doc)) + + def all_factory(name, doc): + return [collector], [] + + rr = event_model.RunRouter([all_factory]) + for name, doc in docs: + rr(name, doc) + assert len(resource_list) == 1 + assert len(data_list) == 2 + + def test_subfactory(): # this test targeted the bug described in issue #170 factory_documents = defaultdict(list) From 30514e8e25ebebc41bfb16a399ebc80bc3563b22 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Jun 2022 10:57:56 -0400 Subject: [PATCH 20/21] Changes to data-model docs. --- docs/source/data-model.rst | 90 +++++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 2 deletions(-) diff --git a/docs/source/data-model.rst b/docs/source/data-model.rst index 873c2c2e..3b8dc1bf 100644 --- a/docs/source/data-model.rst +++ b/docs/source/data-model.rst @@ -24,7 +24,7 @@ with useful components. Overview ======== -The data model is composed of six types of Documents, which in Python are +The data model is composed of eight types of Documents, which in Python are represented as dictionaries but could be represented as nested mappings (e.g. JSON) in any language. Each document class has a defined, but flexible, schema. @@ -35,8 +35,13 @@ JSON) in any language. Each document class has a defined, but flexible, schema. * Event Descriptor --- Metadata about a series of Events. Envision richly-detail column headings in a table, encompassing physical units, hardware configuration information, etc. -* Resource --- A pointer to an external file (or resource in general). +* Resource --- A pointer to an external file (or resource in general) that has + predictable and fixed dimensionality. * Datum --- A pointer to a specific slice of data within a Resource. +* Stream Resource (Experimental) --- A pointer to an external resource that contains a stream + of data without restriction on the length of the stream. This resources with + know 'column' dimension without a known number of rows (e.g., time series, point detectors). +* Stream Datum (Experimental) --- A pointer to a specific slice of data within a Stream Resource. * Run Stop Document --- Everything that we can only know at the very end, such as the time it ended and the exit status (succeeded, aborted, failed due to error). @@ -463,6 +468,87 @@ Formal Datum Page schema: .. literalinclude:: ../../event_model/schemas/datum_page.json +.. _stream_resource: + +Stream Resource Document (Experimental) +--------------------------------------- + +See :doc:`external` for details on the role Stream Resource documents play in +referencing external assets that are natively ragged, such as single-photon detectors, +or assets where there are many relatively small data sets (e.g. scanned fluorescence data). + +Minimal nontrivial valid example: + +.. code-block:: python + + # 'Stream Resource' document + {'path_semantics': 'posix', + 'resource_kwargs': {}, + 'resource_path': '/local/path/subdirectory/data_file', + 'root': '/local/path/', + 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd', + 'spec': 'SOME_SPEC', + 'stream_names': ['point_det'], + 'uid': '272132cf-564f-428f-bf6b-149ee4287024'} + +Typical example: + +.. 
code-block:: python + + # 'Stream Resource' document + {'spec': 'AD_HDF5', + 'root': '/GPFS/DATA/Andor/', + 'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5', + 'resource_kwargs': {'frame_per_point': 1}, + 'path_semantics': 'posix', + 'stream_names': ['point_det'], + 'uid': '3b300e6f-b431-4750-a635-5630d15c81a8', + 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'} + +Formal schema: + +.. literalinclude:: ../../event_model/schemas/stream_resource.json + +.. _stream_datum: + +Stream Datum Document (Experimental) +------------------------------------ + +See :doc:`external` for details on the role Stream Datum documents play in referencing +external assets that are natively ragged, such as single-photon detectors, +or assets where there are many relatively small data sets (e.g. scanned fluorescence data). + +Minimal nontrivial valid example: + +.. code-block:: python + + # 'Stream Datum' document + {'stream_resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key + 'datum_kwargs': {}, # format-specific parameters + 'uid': '272132cf-564f-428f-bf6b-149ee4287024/point_det/0', + 'stream_name': 'point_det', + 'block_idx': 0, + 'event_count': 1, + 'event_offset': 0 + } + +Typical example: + +.. code-block:: python + + # 'Stream Datum' document + {'stream_resource': '3b300e6f-b431-4750-a635-5630d15c81a8', + 'datum_kwargs': {'index': 3}, + 'uid': '3b300e6f-b431-4750-a635-5630d15c81a8/point_det/3', + 'stream_name': 'point_det', + 'block_idx': 3, + 'event_count': 5, + 'event_offset': 14} + +It is an implementation detail that ``uid`` is often formatted as +``{stream_resource}/{stream_name}/{block_idx}`` but this should not be considered part of the schema. + +Formal schema: + +.. literalinclude:: ../../event_model/schemas/stream_datum.json + +.. _bulk_events: "Bulk Events" Document (DEPRECATED) From 541e9143260bc7f1c08ef522e7b1e8a59a3220d5 Mon Sep 17 00:00:00 2001 From: "Dr. Phil Maffettone" <43007690+maffettone@users.noreply.github.com> Date: Mon, 6 Jun 2022 09:36:30 -0400 Subject: [PATCH 21/21] Update event_model/schemas/stream_datum.json Co-authored-by: Thomas A Caswell --- event_model/schemas/stream_datum.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 1c5b30cd..e697b3bf 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -36,7 +36,8 @@ "uid", "stream_name", "block_idx", - "event_count" + "event_count", + "event_offset" ], "additionalProperties": false, "type": "object",
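The new documents can be validated programmatically with the module-level validators extended in PATCH 04. A sketch (the field values are illustrative, matching the typical example above):

.. code-block:: python

    import event_model
    from event_model import DocumentNames

    doc = {'stream_resource': '3b300e6f-b431-4750-a635-5630d15c81a8',
           'datum_kwargs': {'index': 3},
           'uid': '3b300e6f-b431-4750-a635-5630d15c81a8/point_det/3',
           'stream_name': 'point_det',
           'block_idx': 3,
           'event_count': 5,
           'event_offset': 14}
    # Raises jsonschema.ValidationError if a required field is missing.
    event_model.schema_validators[DocumentNames.stream_datum].validate(doc)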
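End to end, a downstream consumer can subscribe to the new document types through the RunRouter. A sketch (the StreamPrinter callback and the "TIFF_STREAM" spec string are illustrative, not part of the library):

.. code-block:: python

    import event_model

    class StreamPrinter(event_model.DocumentRouter):
        # DocumentRouter dispatches on the document name, so overriding the
        # new stream_resource/stream_datum hooks is all a consumer needs.
        def stream_resource(self, doc):
            print("stream_resource", doc["uid"], doc["stream_names"])

        def stream_datum(self, doc):
            # event_offset and event_count bound the slice of events that
            # this block covers; the schema permits gaps between blocks.
            first = doc["event_offset"]
            print("stream_datum", doc["uid"], first, first + doc["event_count"])

    def factory(name, start_doc):
        return [StreamPrinter()], []

    rr = event_model.RunRouter([factory])
    run = event_model.compose_run()
    rr("start", run.start_doc)
    stream_resource_doc, (compose_datum,) = run.compose_stream_resource(
        spec="TIFF_STREAM", root="/tmp", resource_path="streams",
        resource_kwargs={}, stream_names=["point_det"])
    rr("stream_resource", stream_resource_doc)
    rr("stream_datum", compose_datum(datum_kwargs={}, event_count=5))
    rr("stop", run.compose_stop())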