diff --git a/.gitignore b/.gitignore
index 52943ad4..7418a9a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,4 +79,7 @@ target/
 .vscode/*
 
 #Ipython Notebook
-.ipynb_checkpoints
\ No newline at end of file
+.ipynb_checkpoints
+
+# pre-commit
+.pre-commit-config.yaml
\ No newline at end of file
diff --git a/docs/source/data-model.rst b/docs/source/data-model.rst
index 873c2c2e..3b8dc1bf 100644
--- a/docs/source/data-model.rst
+++ b/docs/source/data-model.rst
@@ -24,7 +24,7 @@ with useful components.
 
 Overview
 ========
-The data model is composed of six types of Documents, which in Python are
+The data model is composed of eight types of Documents, which in Python are
 represented as dictionaries but could be represented as nested mappings (e.g.
 JSON) in any language. Each document class has a defined, but flexible, schema.
 
@@ -35,8 +35,13 @@ JSON) in any language. Each document class has a defined, but flexible, schema.
 * Event Descriptor --- Metadata about a series of Events. Envision richly-detail
   column headings in a table, encompassing physical units, hardware
   configuration information, etc.
-* Resource --- A pointer to an external file (or resource in general).
+* Resource --- A pointer to an external file (or resource in general) that has
+  predictable and fixed dimensionality.
 * Datum --- A pointer to a specific slice of data within a Resource.
+* Stream Resource (Experimental) --- A pointer to an external resource that contains a stream
+  of data without restriction on the length of the stream. These resources have a known
+  'column' dimension but an unknown number of rows (e.g., time series, point detectors).
+* Stream Datum (Experimental) --- A pointer to a specific slice of data within a Stream Resource.
 * Run Stop Document --- Everything that we can only know at the very end, such as
   the time it ended and the exit status (succeeded, aborted, failed due to
   error).
@@ -463,6 +468,87 @@ Formal Datum Page schema:
 
 .. literalinclude:: ../../event_model/schemas/datum_page.json
 
+.. _stream_resource:
+
+Stream Resource Document (Experimental)
+---------------------------------------
+
+See :doc:`external` for details on the role Stream Resource documents play in
+referencing external assets that are natively ragged, such as single-photon detectors,
+or assets where there are many relatively small data sets (e.g. scanned fluorescence data).
+
+Minimal nontrivial valid example:
+
+.. code-block:: python
+
+    # 'Stream Resource' document
+    {'path_semantics': 'posix',
+     'resource_kwargs': {},
+     'resource_path': '/local/path/subdirectory/data_file',
+     'root': '/local/path/',
+     'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd',
+     'spec': 'SOME_SPEC',
+     'stream_names': ['point_det'],
+     'uid': '272132cf-564f-428f-bf6b-149ee4287024'}
+
+Typical example:
+
+.. code-block:: python
+
+    # resource
+    {'spec': 'AD_HDF5',
+     'root': '/GPFS/DATA/Andor/',
+     'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5',
+     'resource_kwargs': {'frame_per_point': 1},
+     'path_semantics': 'posix',
+     'stream_names': ['point_det'],
+     'uid': '3b300e6f-b431-4750-a635-5630d15c81a8',
+     'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'}
+
+Formal schema:
+
+.. literalinclude:: ../../event_model/schemas/stream_resource.json
+
+.. 
_stream_datum: + +Stream Datum Document +--------------------- + +See :doc:`external` for details on the role Stream Datum documents play in referencing +external assets that are natively ragged, such as single-photon detectors, +or assets where there are many relatively small data sets (e.g. scanned fluorescence data). + +Minimal nontrivial valid example: + +.. code-block:: python + + # 'datum' document + {'resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key + 'datum_kwargs': {}, # format-specific parameters + 'datum_id': '272132cf-564f-428f-bf6b-149ee4287024/1', + 'block_idx': 0, + 'event_count': 1 + } + +Typical example: + +.. code-block:: python + + # datum + {'resource': '3b300e6f-b431-4750-a635-5630d15c81a8', + 'datum_kwargs': {'index': 3}, + 'datum_id': '3b300e6f-b431-4750-a635-5630d15c81a8/3', + 'block_idx': 0, + 'event_count': 5, + 'event_offset': 14} + +It is an implementation detail that ``datum_id`` is often formatted as +``{resource}/{counter}`` but this should not be considered part of the schema. + +Formal schema: + +.. literalinclude:: ../../event_model/schemas/stream_datum.json + .. _bulk_events: "Bulk Events" Document (DEPRECATED) diff --git a/docs/source/use-cases.rst b/docs/source/use-cases.rst index a75cdc72..3eede931 100644 --- a/docs/source/use-cases.rst +++ b/docs/source/use-cases.rst @@ -3,7 +3,7 @@ Event Model Patterns ============================ When implementing a system with the Event Model for a particular use case (technique, scan type, etc.), many design choices can be made. For example: how many streams you define (through Event Descriptor documents), what events you put into those streams, and what how data points are stored in an event. Here, we present a use case and a potential design, discussing the pros and cons of different options. -To further complicate things, we can consider that a document stream might be be optimized for very different scenarios within the same technique. For example, for the sample scan of the same sample, a document stream might read as the scan as it is being run for the purpose of providing in-scan feedback to a user or beamline analysis tool. For the same scan, a docstream might be serialized into a Mongo database and read back out by analysis and visualiztion tools. In these different uses of data from the same scan, the level of granularity that one puts intto an Event Document might be very different. The streaming consumer might require a large number of very small granular events in order to quickly make decisions that affect the course of the scan. On the other hand, MongoDB document retrieval is much more efficient with a small number of larger documents, and a small number of events that each contain data from multiple time steps might be preferrable. +To further complicate things, we can consider that a document stream might be be optimized for very different scenarios within the same technique. For example, for the sample scan of the same sample, a document stream might read as the scan as it is being run for the purpose of providing in-scan feedback to a user or beamline analysis tool. For the same scan, a docstream might be serialized into a Mongo database and read back out by analysis and visualiztion tools. In these different uses of data from the same scan, the level of granularity that one puts into an Event Document might be very different. The streaming consumer might require a large number of very small granular events in order to quickly make decisions that affect the course of the scan. 
On the other hand, MongoDB document retrieval is much more efficient with a small number of larger documents, and a small number of events that each contain data from multiple time steps might be preferrable. Use Case - Tomography Tiling and MongoDB Serialization diff --git a/event_model/__init__.py b/event_model/__init__.py index dcc4389c..4ea45ce7 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1,5 +1,7 @@ from collections import defaultdict, deque, namedtuple import collections.abc +from dataclasses import dataclass +from typing import Optional from distutils.version import LooseVersion import copy import json @@ -34,6 +36,8 @@ class DocumentNames(Enum): resource = 'resource' event_page = 'event_page' datum_page = 'datum_page' + stream_resource = 'stream_resource' + stream_datum = 'stream_datum' bulk_datum = 'bulk_datum' # deprecated bulk_events = 'bulk_events' # deprecated @@ -204,6 +208,12 @@ def event_page(self, doc): def datum_page(self, doc): return NotImplemented + def stream_datum(self, doc): + return NotImplemented + + def stream_resource(self, doc): + return NotImplemented + def bulk_events(self, doc): # Do not modify this in a subclass. Use event_page. warnings.warn( @@ -487,6 +497,10 @@ class Filler(DocumentRouter): A cache of Datum documents. If None, a dict is used. descriptor_cache : dict, optional A cache of EventDescriptor documents. If None, a dict is used. + stream_resource_cache : dict, optional + A cache of StreamResource documents. If None, a dict is used. + stream_datum_cache : dict, optional + A cache of StreamDatum documents. If None, a dict is used. retry_intervals : Iterable, optional If data is not found on the first try, there may a race between the I/O systems creating the external data and this stream of Documents @@ -527,7 +541,8 @@ class Filler(DocumentRouter): def __init__(self, handler_registry, *, include=None, exclude=None, root_map=None, coerce='as_is', handler_cache=None, resource_cache=None, datum_cache=None, - descriptor_cache=None, inplace=None, + descriptor_cache=None, stream_resource_cache=None, + stream_datum_cache=None, inplace=None, retry_intervals=(0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512, 1.024)): if inplace is None: @@ -580,10 +595,16 @@ def __init__(self, handler_registry, *, datum_cache = self.get_default_datum_cache() if descriptor_cache is None: descriptor_cache = self.get_default_descriptor_cache() + if stream_resource_cache is None: + stream_resource_cache = self.get_default_stream_resource_cache() + if stream_datum_cache is None: + stream_datum_cache = self.get_default_stream_datum_cache() self._handler_cache = handler_cache self._resource_cache = resource_cache self._datum_cache = datum_cache self._descriptor_cache = descriptor_cache + self._stream_resource_cache = stream_resource_cache + self._stream_datum_cache = stream_datum_cache if retry_intervals is None: retry_intervals = [] self.retry_intervals = retry_intervals @@ -601,6 +622,8 @@ def __eq__(self, other): type(self._resource_cache) is type(other._resource_cache) and type(self._datum_cache) is type(other._datum_cache) and type(self._descriptor_cache) is type(other._descriptor_cache) and + type(self._stream_resource_cache) is type(other._stream_resource_cache) and + type(self._stream_datum_cache) is type(other._stream_datum_cache) and self.retry_intervals == other.retry_intervals ) @@ -616,6 +639,8 @@ def __getstate__(self): resource_cache=self._resource_cache, datum_cache=self._datum_cache, 
descriptor_cache=self._descriptor_cache, + stream_resource_cache=self._stream_resource_cache, + stream_datum_cache=self._stream_datum_cache, retry_intervals=self.retry_intervals) def __setstate__(self, d): @@ -638,6 +663,8 @@ def __setstate__(self, d): self._resource_cache = d['resource_cache'] self._datum_cache = d['datum_cache'] self._descriptor_cache = d['descriptor_cache'] + self._stream_resource_cache = d['stream_resource_cache'] + self._stream_datum_cache = d['stream_datum_cache'] retry_intervals = d['retry_intervals'] if retry_intervals is None: retry_intervals = [] @@ -671,6 +698,14 @@ def get_default_datum_cache(): def get_default_handler_cache(): return {} + @staticmethod + def get_default_stream_datum_cache(): + return {} + + @staticmethod + def get_default_stream_resource_cache(): + return {} + @property def inplace(self): return self._inplace @@ -678,7 +713,8 @@ def inplace(self): def clone(self, handler_registry=None, *, root_map=None, coerce=None, handler_cache=None, resource_cache=None, datum_cache=None, - descriptor_cache=None, inplace=None, + descriptor_cache=None, stream_resource_cache=None, + stream_datum_cache=None, inplace=None, retry_intervals=None): """ Create a new Filler instance from this one. @@ -705,6 +741,8 @@ def clone(self, handler_registry=None, *, resource_cache=resource_cache, datum_cache=datum_cache, descriptor_cache=descriptor_cache, + stream_resource_cache=stream_resource_cache, + stream_datum_cache=stream_datum_cache, inplace=inplace, retry_intervals=retry_intervals) @@ -789,6 +827,13 @@ def datum(self, doc): self._datum_cache[doc['datum_id']] = doc return doc + def stream_resource(self, doc): + self._stream_resource_cache[doc["uid"]] = doc + return doc + + def stream_datum(self, doc): + self._stream_datum_cache[doc["uid"]] = doc + def event_page(self, doc): # TODO We may be able to fill a page in place, and that may be more # efficient than unpacking the page in to Events, filling them, and the @@ -1269,6 +1314,7 @@ def __init__(self, factories, handler_registry=None, *, # Map Resource UID to RunStart UID. 
self._resources = {} + self._stream_resources = {} # Old-style Resources that do not have a RunStart UID self._unlabeled_resources = deque(maxlen=10000) @@ -1399,6 +1445,15 @@ def datum_page(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback('datum_page', doc) + def stream_datum(self, doc): + resource_uid = doc["stream_resource"] + start_uid = self._stream_resources[resource_uid] + self._fillers[start_uid].stream_datum(doc) + for callback in self._factory_cbs_by_start[start_uid]: + callback("stream_datum", doc) + for callback in self._subfactory_cbs_by_start[start_uid]: + callback("stream_datum", doc) + def resource(self, doc): try: start_uid = doc['run_start'] @@ -1424,6 +1479,15 @@ def resource(self, doc): for callback in self._subfactory_cbs_by_start[start_uid]: callback('resource', doc) + def stream_resource(self, doc): + start_uid = doc["run_start"] # No need for Try + self._fillers[start_uid].stream_resource(doc) + self._stream_resources[doc["uid"]] = doc["run_start"] + for callback in self._factory_cbs_by_start[start_uid]: + callback("stream_resource", doc) + for callback in self._subfactory_cbs_by_start[start_uid]: + callback("stream_resource", doc) + def stop(self, doc): start_uid = doc['run_start'] for callback in self._factory_cbs_by_start[start_uid]: @@ -1524,6 +1588,8 @@ class MismatchedDataKeys(InvalidData): DocumentNames.datum: 'schemas/datum.json', DocumentNames.datum_page: 'schemas/datum_page.json', DocumentNames.resource: 'schemas/resource.json', + DocumentNames.stream_datum: 'schemas/stream_datum.json', + DocumentNames.stream_resource: 'schemas/stream_resource.json', # DEPRECATED: DocumentNames.bulk_events: 'schemas/bulk_events.json', DocumentNames.bulk_datum: 'schemas/bulk_datum.json'} @@ -1563,13 +1629,29 @@ def _is_array(checker, instance): __version__ = get_versions()['version'] del get_versions -ComposeRunBundle = namedtuple('ComposeRunBundle', - 'start_doc compose_descriptor compose_resource ' - 'compose_stop') + +@dataclass +class ComposeRunBundle: + """Extensible compose run bundle. This maintains backward compatibility by unpacking into a basic + run bundle (start, compose_descriptor, compose_resource, stop). Further extensions are optional and + require keyword referencing (i.e. compose_stream_resource). 
+ """ + start_doc: dict + compose_descriptor: callable + compose_resource: callable + compose_stop: callable + compose_stream_resource: Optional[callable] = None + + def __iter__(self): + return iter((self.start_doc, self.compose_descriptor, self.compose_resource, self.compose_stop)) + + ComposeDescriptorBundle = namedtuple('ComposeDescriptorBundle', 'descriptor_doc compose_event compose_event_page') ComposeResourceBundle = namedtuple('ComposeResourceBundle', 'resource_doc compose_datum compose_datum_page') +ComposeStreamResourceBundle = namedtuple('ComposeStreamResourceBundle', + 'stream_resource_doc compose_stream_data') def compose_datum(*, resource, counter, datum_kwargs, validate=True): @@ -1620,6 +1702,56 @@ def compose_resource(*, spec, root, resource_path, resource_kwargs, partial(compose_datum_page, resource=doc, counter=counter)) +def compose_stream_datum(*, stream_resource, stream_name, counter, datum_kwargs, + event_count=1, event_offset=0, validate=True): + resource_uid = stream_resource['uid'] + if stream_name not in stream_resource["stream_names"]: + raise EventModelKeyError("Attempt to create stream_datum with name not included in stream_resource") + block_idx = next(counter) + doc = dict(stream_resource=resource_uid, + datum_kwargs=datum_kwargs, + uid=f"{resource_uid}/{stream_name}/{block_idx}", + stream_name=stream_name, + block_idx=block_idx, + event_count=event_count, + event_offset=event_offset, + ) + if validate: + schema_validators[DocumentNames.stream_datum].validate(doc) + return doc + + +def compose_stream_resource(*, spec, root, resource_path, resource_kwargs, stream_names, counters=(), + path_semantics=default_path_semantics, start=None, uid=None, validate=True): + if uid is None: + uid = str(uuid.uuid4()) + if isinstance(stream_names, str): + stream_names = [stream_names, ] + if len(counters) == 0: + counters = [itertools.count() for _ in stream_names] + elif len(counters) > len(stream_names): + raise ValueError(f"Insufficient number of counters {len(counters)} for stream names: {stream_names}") + + doc = dict(uid=uid, + spec=spec, + root=root, + resource_path=resource_path, + resource_kwargs=resource_kwargs, + stream_names=stream_names, + path_semantics=path_semantics) + if start: + doc["run_start"] = start["uid"] + + if validate: + schema_validators[DocumentNames.stream_resource].validate(doc) + + return ComposeStreamResourceBundle( + doc, + [partial(compose_stream_datum, stream_resource=doc, stream_name=stream_name, counter=counter) for + stream_name, counter in zip(stream_names, counters)] + ) + + def compose_stop(*, start, event_counter, poison_pill, exit_status='success', reason='', uid=None, time=None, @@ -1791,7 +1923,9 @@ def compose_run(*, uid=None, time=None, metadata=None, validate=True): event_counter=event_counter), partial(compose_resource, start=doc), partial(compose_stop, start=doc, event_counter=event_counter, - poison_pill=poison_pill)) + poison_pill=poison_pill), + compose_stream_resource=partial(compose_stream_resource, start=doc) + ) def pack_event_page(*events): diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json new file mode 100644 index 00000000..e697b3bf --- /dev/null +++ b/event_model/schemas/stream_datum.json @@ -0,0 +1,46 @@ +{ + "properties": { + "datum_kwargs": { + "type": "object", + "description": "Arguments to pass to the Handler to retrieve one quanta of data" + }, + "stream_resource": { + "type": "string", + "description": "The UID of the Stream Resource to which this Datum belongs" + 
}, + "uid": { + "type": "string", + "description": "Globally unique identifier for this Datum. A suggested formatting being '//" + }, + "stream_name": { + "type": "string", + "description": "The name of the stream that this Datum is providing a block of." + }, + "block_idx": { + "type": "integer", + "description": "The order in the stream of this block of data. This must be contiguous for a given stream." + }, + "event_count": { + "type": "integer", + "description": "The number of events in this datum." + }, + "event_offset": { + "type": "integer", + "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps." + } + + }, + "required": [ + "datum_kwargs", + "stream_resource", + "uid", + "stream_name", + "block_idx", + "event_count", + "event_offset" + ], + "additionalProperties": false, + "type": "object", + "title": "stream_datum", + "description": "Document to reference a quanta of an externally-stored stream of data." +} diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json new file mode 100644 index 00000000..f7accbed --- /dev/null +++ b/event_model/schemas/stream_resource.json @@ -0,0 +1,55 @@ +{ + "properties": { + "spec": { + "type": "string", + "description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler" + }, + "resource_path": { + "type": "string", + "description": "Filepath or URI for locating this resource" + }, + "resource_kwargs": { + "type": "object", + "description": "Additional argument to pass to the Handler to read a Stream Resource" + }, + "root": { + "type": "string", + "description": "Subset of resource_path that is a local detail, not semantic." + }, + "path_semantics": { + "type": "string", + "description": "Rules for joining paths", + "enum": ["posix", "windows"] + }, + "uid": { + "type": "string", + "description": "Globally unique identifier for this Stream Resource" + }, + "run_start": { + "type": "string", + "description": "Globally unique ID to the run_start document this Stream Resource is associated with." + }, + "stream_names": { + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "uniqueItems": true, + "description": "List of the stream names this resource provides" + + } + }, + "required": [ + "spec", + "resource_path", + "resource_kwargs", + "root", + "uid", + "stream_names" + ], + "additionalProperties": false, + "type": "object", + "title": "stream_resource", + "description": "Document to reference a collection (e.g. 
file or group of files) of externally-stored data streams" +} \ No newline at end of file diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 383e60d2..753aa7ee 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -15,12 +15,13 @@ def test_documents(): dn = event_model.DocumentNames for k in ('stop', 'start', 'descriptor', 'event', 'bulk_events', 'datum', - 'resource', 'bulk_datum', 'event_page', 'datum_page'): + 'resource', 'bulk_datum', 'event_page', 'datum_page', + 'stream_resource', 'stream_datum'): assert dn(k) == getattr(dn, k) def test_len(): - assert 10 == len(event_model.DocumentNames) + assert 12 == len(event_model.DocumentNames) def test_schemas(): @@ -81,6 +82,35 @@ def test_compose_run(): assert stop_doc['num_events']['primary'] == 3 +def test_compose_stream_resource(tmp_path): + """ + Following the example of test_compose_run, focus only on the stream resource and + datum functionality + """ + bundle = event_model.compose_run() + compose_stream_resource = bundle.compose_stream_resource + assert bundle.compose_stream_resource is compose_stream_resource + stream_names = ["stream_1", "stream_2"] + bundle = compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + resource_doc, compose_stream_data = bundle + assert bundle.stream_resource_doc is resource_doc + assert bundle.compose_stream_data is compose_stream_data + assert compose_stream_data[0] is not compose_stream_data[1] + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + # Ensure independent counters + assert datum_doc_0['block_idx'] == datum_doc_1["block_idx"] + datum_doc_1a = compose_stream_data[1](datum_kwargs={}) + assert datum_doc_1a["block_idx"] != datum_doc_1["block_idx"] + + # Ensure safety check + from itertools import count + with pytest.raises(KeyError): + event_model.compose_stream_datum(stream_resource=resource_doc, stream_name="stream_NA", + counter=count(), datum_kwargs={}) + + def test_round_trip_pagination(): run_bundle = event_model.compose_run() desc_bundle = run_bundle.compose_descriptor( @@ -306,6 +336,24 @@ def test_document_router_smoke_test(): dr('stop', run_bundle.compose_stop()) +def test_document_router_streams_smoke_test(tmp_path): + dr = event_model.DocumentRouter() + run_bundle = event_model.compose_run() + compose_stream_resource = run_bundle.compose_stream_resource + start = run_bundle.start_doc + dr("start", start) + stream_names = ["stream_1", "stream_2"] + stream_resource_doc, compose_stream_data = \ + compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + dr("stream_resource", stream_resource_doc) + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + dr("stream_datum", datum_doc_0) + dr("stream_datum", datum_doc_1) + dr('stop', run_bundle.compose_stop()) + + def test_document_router_with_validation(): dr = event_model.DocumentRouter() run_bundle = event_model.compose_run() diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index e6c16e4f..3a3171ea 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -196,6 +196,48 @@ def check_not_filled_factory(name, doc): rr(name, doc) +def test_run_router_streams(tmp_path): + bundle = event_model.compose_run() + 
docs = [] + start_doc, compose_stream_resource, stop_doc = \ + bundle.start_doc, bundle.compose_stream_resource, bundle.compose_stop() + docs.append(("start", start_doc)) + stream_names = ["stream_1", "stream_2"] + stream_resource_doc, compose_stream_data = \ + compose_stream_resource(spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", + resource_kwargs={}, stream_names=stream_names) + docs.append(("stream_resource", stream_resource_doc)) + datum_doc_0, datum_doc_1 = (compose_stream_datum(datum_kwargs={}) + for compose_stream_datum in compose_stream_data) + docs.append(("stream_datum", datum_doc_0)) + docs.append(("stream_datum", datum_doc_1)) + docs.append(("stop", stop_doc)) + + # Empty list of factories. Just make sure nothing blows up. + rr = event_model.RunRouter([]) + for name, doc in docs: + rr(name, doc) + + # Collector factory for stream resource and data + resource_list = [] + data_list = [] + + def collector(name, doc): + if name == "stream_resource": + resource_list.append((name, doc)) + elif name == "stream_datum": + data_list.append((name, doc)) + + def all_factory(name, doc): + return [collector], [] + + rr = event_model.RunRouter([all_factory]) + for name, doc in docs: + rr(name, doc) + assert len(resource_list) == 1 + assert len(data_list) == 2 + + def test_subfactory(): # this test targeted the bug described in issue #170 factory_documents = defaultdict(list)
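Taken together, the new hooks compose roughly as follows. This is an illustrative sketch based on ``compose_stream_resource`` / ``compose_stream_datum`` and the tests added in this diff; the spec, root, and stream names are placeholder values, not part of the API.

.. code-block:: python

    import event_model

    # compose_run() now returns the extended ComposeRunBundle; it still unpacks
    # into the original four fields, and compose_stream_resource is available
    # as an extra attribute.
    run_bundle = event_model.compose_run()

    # Declare an external resource that will provide two named streams.
    stream_resource_doc, compose_stream_data = run_bundle.compose_stream_resource(
        spec="TIFF_STREAM",              # placeholder spec
        root="/tmp/streams",             # placeholder root directory
        resource_path="test_streams",
        resource_kwargs={},
        stream_names=["stream_1", "stream_2"],
    )

    # One compose_stream_datum partial per stream name, each with its own
    # block counter, so block_idx advances independently for each stream.
    datum_doc_0 = compose_stream_data[0](datum_kwargs={})
    datum_doc_1 = compose_stream_data[1](datum_kwargs={})
    assert datum_doc_0["block_idx"] == datum_doc_1["block_idx"] == 0

    stop_doc = run_bundle.compose_stop()

Because ``ComposeRunBundle.__iter__`` only yields the original four members, existing code that unpacks ``start_doc, compose_descriptor, compose_resource, compose_stop = compose_run()`` keeps working unchanged.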
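``DocumentRouter`` now exposes ``stream_resource`` and ``stream_datum`` hooks, which are no-ops by default. A minimal sketch of a consumer that overrides them; the class name and its bookkeeping are hypothetical, not part of this change.

.. code-block:: python

    import event_model

    class StreamAwareConsumer(event_model.DocumentRouter):
        """Collect Stream Resource documents and count Stream Datum blocks."""

        def __init__(self):
            super().__init__()
            self.stream_resources = {}   # uid -> stream_resource document
            self.blocks_seen = 0

        def stream_resource(self, doc):
            self.stream_resources[doc["uid"]] = doc
            return doc

        def stream_datum(self, doc):
            # Each stream_datum points back at its stream_resource by uid.
            assert doc["stream_resource"] in self.stream_resources
            self.blocks_seen += 1
            return doc

    # Documents are fed in as (name, doc) pairs, exactly as in the new smoke
    # tests, e.g.:
    #     consumer = StreamAwareConsumer()
    #     consumer("stream_resource", stream_resource_doc)
    #     consumer("stream_datum", datum_doc_0)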
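``Filler`` gains ``stream_resource_cache`` and ``stream_datum_cache`` arguments (plain dicts by default) and, for now, simply caches the new documents by ``uid`` as they pass through. A sketch under the assumption of a hypothetical ``TiffStreamHandler`` registered for the placeholder ``TIFF_STREAM`` spec:

.. code-block:: python

    import event_model

    class TiffStreamHandler:
        # Hypothetical handler class; any handler you already register works
        # unchanged, since stream documents are only cached at this stage.
        def __init__(self, resource_path, **resource_kwargs):
            self._resource_path = resource_path

    stream_resource_cache = {}
    stream_datum_cache = {}
    filler = event_model.Filler(
        handler_registry={"TIFF_STREAM": TiffStreamHandler},
        stream_resource_cache=stream_resource_cache,
        stream_datum_cache=stream_datum_cache,
        inplace=False,
    )
    # filler("stream_resource", stream_resource_doc) stores the document in
    # stream_resource_cache; filler("stream_datum", stream_datum_doc) does the
    # same for each block in stream_datum_cache.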
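The new schemas can also be exercised directly. This sketch validates a hand-built ``stream_datum`` document against ``schemas/stream_datum.json`` via the module-level ``schema_validators`` mapping; the uid and field values are taken from the documentation examples above.

.. code-block:: python

    from event_model import DocumentNames, schema_validators

    stream_datum = {
        "stream_resource": "3b300e6f-b431-4750-a635-5630d15c81a8",
        "stream_name": "stream_1",
        "datum_kwargs": {},
        "uid": "3b300e6f-b431-4750-a635-5630d15c81a8/stream_1/0",
        "block_idx": 0,
        "event_count": 5,
        "event_offset": 14,
    }
    # All seven required keys are present, so this passes silently.
    schema_validators[DocumentNames.stream_datum].validate(stream_datum)

    # Dropping a required key (or adding an unknown one, since
    # additionalProperties is false) raises jsonschema.ValidationError.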