From f1884254b6078912d6c8eb6a63f70a888b6083b7 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 15 Jun 2023 13:46:02 +0100 Subject: [PATCH 01/23] Implemented external asset structure --- event_model/__init__.py | 46 ++++++++---------------- event_model/documents/stream_datum.py | 37 +++++++++++-------- event_model/documents/stream_resource.py | 10 +----- event_model/schemas/stream_datum.json | 35 ++++++++++++------ event_model/schemas/stream_resource.json | 11 ------ event_model/tests/test_em.py | 26 ++++---------- event_model/tests/test_run_router.py | 7 ++-- 7 files changed, 72 insertions(+), 100 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 38a159a3..b796aeec 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2000,31 +2000,28 @@ def compose_resource( @dataclass class ComposeStreamDatum: stream_resource: StreamResource - stream_name: str counter: Iterator def __call__( self, - datum_kwargs: Dict[str, Any], + data_keys: List[str], + seq_nums: Dict[str, int], + indices: Dict[str, int], event_count: int = 1, event_offset: int = 0, validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] - if self.stream_name not in self.stream_resource["stream_names"]: - raise EventModelKeyError( - "Attempt to create stream_datum with name not included" - "in stream_resource" - ) block_idx = next(self.counter) doc = StreamDatum( stream_resource=resource_uid, - datum_kwargs=datum_kwargs, - uid=f"{resource_uid}/{self.stream_name}/{block_idx}", - stream_name=self.stream_name, + uid=f"{resource_uid}/{block_idx}", block_idx=block_idx, event_count=event_count, event_offset=event_offset, + data_keys=data_keys, + seq_nums=seq_nums, + indices=indices, ) if validate: schema_validators[DocumentNames.stream_datum].validate(doc) @@ -2035,9 +2032,10 @@ def __call__( def compose_stream_datum( *, stream_resource: StreamResource, - stream_name: str, counter: Iterator, - datum_kwargs: Dict[str, Any], + data_keys: List[str], + seq_nums: Dict[str, int], + indices: Dict[str, int], event_count: int = 1, event_offset: int = 0, validate: bool = True, @@ -2045,8 +2043,10 @@ def compose_stream_datum( """ Here for backwards compatibility, the Compose class is prefered. """ - return ComposeStreamDatum(stream_resource, stream_name, counter)( - datum_kwargs, + return ComposeStreamDatum(stream_resource, counter)( + data_keys, + seq_nums, + indices, event_count=event_count, event_offset=event_offset, validate=validate, @@ -2078,7 +2078,6 @@ def __call__( root: str, resource_path: str, resource_kwargs: Dict[str, Any], - stream_names: Union[List, str], counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, uid: Optional[str] = None, @@ -2086,17 +2085,6 @@ def __call__( ) -> ComposeStreamResourceBundle: if uid is None: uid = str(uuid.uuid4()) - if isinstance(stream_names, str): - stream_names = [ - stream_names, - ] - if len(counters) == 0: - counters = [itertools.count() for _ in stream_names] - elif len(counters) > len(stream_names): - raise ValueError( - "Insufficient number of counters " - f"{len(counters)} for stream names: {stream_names}" - ) doc = StreamResource( uid=uid, @@ -2104,7 +2092,6 @@ def __call__( root=root, resource_path=resource_path, resource_kwargs=resource_kwargs, - stream_names=stream_names, path_semantics=path_semantics, ) if self.start: @@ -2118,10 +2105,9 @@ def __call__( [ ComposeStreamDatum( stream_resource=doc, - stream_name=stream_name, counter=counter, ) - for stream_name, counter in zip(stream_names, counters) + for counter in counters ], ) @@ -2132,7 +2118,6 @@ def compose_stream_resource( root: str, resource_path: str, resource_kwargs: Dict[str, Any], - stream_names: Union[List, str], counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, start: Optional[RunStart] = None, @@ -2147,7 +2132,6 @@ def compose_stream_resource( root, resource_path, resource_kwargs, - stream_names, counters=counters, path_semantics=path_semantics, uid=uid, diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 5d9bf832..82b626be 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,4 +1,4 @@ -from typing import Any, Dict +from typing import Dict, List from typing_extensions import Annotated, TypedDict @@ -18,13 +18,6 @@ class StreamDatum(TypedDict): "be contiguous for a given stream.", ), ] - datum_kwargs: Annotated[ - Dict[str, Any], - Field( - description="Arguments to pass to the Handler to retrieve one " - "quanta of data", - ), - ] event_count: Annotated[ int, Field(description="The number of events in this datum.") ] @@ -35,13 +28,6 @@ class StreamDatum(TypedDict): "increasing value allows the presence of gaps.", ), ] - stream_name: Annotated[ - str, - Field( - description="The name of the stream that this Datum is providing a " - "block of.", - ), - ] stream_resource: Annotated[ str, Field( @@ -55,3 +41,24 @@ class StreamDatum(TypedDict): "formatting being '//", ), ] + data_keys: Annotated[ + List[str], + Field( + description="A list to show which data_keys of the " + "Descriptor are being streamed" + ), + ] + seq_nums: Annotated[ + Dict[str, int], + Field( + description="A slice object showing the Event numbers the " + "resource corresponds to" + ), + ] + indices: Annotated[ + Dict[str, int], + Field( + description="A slice object passed to the StreamResource " + "handler so it can hand back data and timestamps" + ), + ] diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py index fe9050fc..df4e9b07 100644 --- a/event_model/documents/stream_resource.py +++ b/event_model/documents/stream_resource.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict from typing_extensions import Annotated, Literal, NotRequired, TypedDict @@ -55,11 +55,3 @@ class StreamResource(TypedDict): uid: Annotated[ str, Field(description="Globally unique identifier for this Stream Resource") ] - stream_names: Annotated[ - List[str], - Field( - description="List of the stream names this resource provides", - min_items=1, - unique_items=True, - ), - ] diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index e10017b2..e6848417 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -8,10 +8,13 @@ "description": "The order in the stream of this block of data. This must be contiguous for a given stream.", "type": "integer" }, - "datum_kwargs": { - "title": "Datum Kwargs", - "description": "Arguments to pass to the Handler to retrieve one quanta of data", - "type": "object" + "data_keys": { + "title": "Data Keys", + "description": "A list to show which data_keys of the Descriptor are being streamed", + "type": "array", + "items": { + "type": "string" + } }, "event_count": { "title": "Event Count", @@ -23,10 +26,21 @@ "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps.", "type": "integer" }, - "stream_name": { - "title": "Stream Name", - "description": "The name of the stream that this Datum is providing a block of.", - "type": "string" + "indices": { + "title": "Indices", + "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps", + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "seq_nums": { + "title": "Seq Nums", + "description": "A slice object showing the Event numbers the resource corresponds to", + "type": "object", + "additionalProperties": { + "type": "integer" + } }, "stream_resource": { "title": "Stream Resource", @@ -41,10 +55,11 @@ }, "required": [ "block_idx", - "datum_kwargs", + "data_keys", "event_count", "event_offset", - "stream_name", + "indices", + "seq_nums", "stream_resource", "uid" ], diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index e6d02554..7149fffe 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -37,16 +37,6 @@ "description": "String identifying the format/type of this Stream Resource, used to identify a compatible Handler", "type": "string" }, - "stream_names": { - "title": "Stream Names", - "description": "List of the stream names this resource provides", - "type": "array", - "minItems": 1, - "uniqueItems": true, - "items": { - "type": "string" - } - }, "uid": { "title": "Uid", "description": "Globally unique identifier for this Stream Resource", @@ -58,7 +48,6 @@ "resource_path", "root", "spec", - "stream_names", "uid" ], "additionalProperties": false diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 6fce1246..d4ffe29d 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -1,6 +1,7 @@ import json import pickle from distutils.version import LooseVersion +from itertools import count import jsonschema import numpy @@ -109,38 +110,25 @@ def test_compose_stream_resource(tmp_path): bundle = event_model.compose_run() compose_stream_resource = bundle.compose_stream_resource assert bundle.compose_stream_resource is compose_stream_resource - stream_names = ["stream_1", "stream_2"] bundle = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, + counters=[count(1), count(1)], ) resource_doc, compose_stream_data = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_data assert compose_stream_data[0] is not compose_stream_data[1] datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) # Ensure independent counters assert datum_doc_0["block_idx"] == datum_doc_1["block_idx"] - datum_doc_1a = compose_stream_data[1](datum_kwargs={}) + datum_doc_1a = compose_stream_data[1]([], {}, {}) assert datum_doc_1a["block_idx"] != datum_doc_1["block_idx"] - # Ensure safety check - from itertools import count - - with pytest.raises(KeyError): - event_model.compose_stream_datum( - stream_resource=resource_doc, - stream_name="stream_NA", - counter=count(), - datum_kwargs={}, - ) - def test_round_trip_pagination(): run_bundle = event_model.compose_run() @@ -410,18 +398,16 @@ def test_document_router_streams_smoke_test(tmp_path): compose_stream_resource = run_bundle.compose_stream_resource start = run_bundle.start_doc dr("start", start) - stream_names = ["stream_1", "stream_2"] stream_resource_doc, compose_stream_data = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), + counters=[count(1), count(6)], resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, ) dr("stream_resource", stream_resource_doc) datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) dr("stream_datum", datum_doc_0) dr("stream_datum", datum_doc_1) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 0143b74e..9b930f40 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -1,4 +1,5 @@ from collections import defaultdict +from itertools import count import numpy import pytest @@ -215,18 +216,16 @@ def test_run_router_streams(tmp_path): bundle.compose_stop(), ) docs.append(("start", start_doc)) - stream_names = ["stream_1", "stream_2"] stream_resource_doc, compose_stream_data = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - stream_names=stream_names, + counters=[count(3), count(2)], ) docs.append(("stream_resource", stream_resource_doc)) datum_doc_0, datum_doc_1 = ( - compose_stream_datum(datum_kwargs={}) - for compose_stream_datum in compose_stream_data + compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data ) docs.append(("stream_datum", datum_doc_0)) docs.append(("stream_datum", datum_doc_1)) From 463743dce74981a3dbb87b6310dd2a22eff9ec0d Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 15 Jun 2023 14:48:10 +0100 Subject: [PATCH 02/23] Added Partial typeddicts for use with bluesky --- event_model/documents/event.py | 2 +- event_model/documents/event_page.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/event_model/documents/event.py b/event_model/documents/event.py index 58fdad8c..9fbb6e19 100644 --- a/event_model/documents/event.py +++ b/event_model/documents/event.py @@ -39,7 +39,7 @@ class Event(PartialEvent): descriptor: Annotated[ str, Field(description="UID of the EventDescriptor to which this Event belongs") ] - + seq_num: Annotated[ int, Field( diff --git a/event_model/documents/event_page.py b/event_model/documents/event_page.py index 0fc21934..ebf38580 100644 --- a/event_model/documents/event_page.py +++ b/event_model/documents/event_page.py @@ -42,7 +42,6 @@ class PartialEventPage(TypedDict): @add_extra_schema(EVENT_PAGE_EXTRA_SCHEMA) class EventPage(PartialEventPage): """Page of documents to record a quanta of collected data""" - descriptor: Annotated[ str, Field( From cd601d7efbb2c58167e7ebc6a6753c50731f337a Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 15 Jun 2023 14:48:10 +0100 Subject: [PATCH 03/23] Added Partial typeddicts for use with bluesky --- event_model/documents/event.py | 1 - event_model/documents/event_page.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/event_model/documents/event.py b/event_model/documents/event.py index 9fbb6e19..2b6dc477 100644 --- a/event_model/documents/event.py +++ b/event_model/documents/event.py @@ -39,7 +39,6 @@ class Event(PartialEvent): descriptor: Annotated[ str, Field(description="UID of the EventDescriptor to which this Event belongs") ] - seq_num: Annotated[ int, Field( diff --git a/event_model/documents/event_page.py b/event_model/documents/event_page.py index ebf38580..0fc21934 100644 --- a/event_model/documents/event_page.py +++ b/event_model/documents/event_page.py @@ -42,6 +42,7 @@ class PartialEventPage(TypedDict): @add_extra_schema(EVENT_PAGE_EXTRA_SCHEMA) class EventPage(PartialEventPage): """Page of documents to record a quanta of collected data""" + descriptor: Annotated[ str, Field( From 2910dfe83c5174fceee1e75f416d0e8c1007a6ac Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Tue, 20 Jun 2023 14:20:37 +0100 Subject: [PATCH 04/23] Changed the event counters to increment by seq_num, not data. Changed seq_num to be a kwarg in ComposeEventPage --- event_model/__init__.py | 46 ++++++++++++++++++++++++++++++++---- event_model/tests/test_em.py | 4 ++-- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index b796aeec..8ddcd706 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2197,6 +2197,20 @@ def compose_stop( )(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate) +def dict_of_lists_has_equal_size_lists(dictionary: Dict[str, List]) -> bool: + """Return True if all lists are the same size in a Dict[str, List].""" + + dictionary_values = iter(dictionary.values()) + first_element_len = len(next(dictionary_values)) + next_element = next(dictionary_values, None) + + while next_element: + if len(next_element) != first_element_len: + return False + next_element = next(dictionary_values, None) + return True + + @dataclass class ComposeEventPage: descriptor: EventDescriptor @@ -2206,12 +2220,30 @@ def __call__( self, data: Dict[str, List], timestamps: Dict[str, Any], - seq_num: List[int], + seq_num: Optional[List[int]] = None, filled: Optional[Dict[str, List[Union[bool, str]]]] = None, uid: Optional[List] = None, time: Optional[List] = None, validate: bool = True, ) -> EventPage: + assert dict_of_lists_has_equal_size_lists(timestamps), ( + "Cannot compose event_page: event_page contains `timestamps` " + "list values of different lengths" + ) + assert dict_of_lists_has_equal_size_lists(data), ( + "Cannot compose event_page: event_page contains `data` " + "lists of different lengths" + ) + assert len(next(iter(timestamps.values()))) == len(next(iter(data.values()))), ( + "Cannot compose event_page: the lists in `timestamps` are of a different " + "length to those in `data`" + ) + + if seq_num is None: + last_seq_num = self.event_counters[self.descriptor["name"]] + seq_num = list( + range(last_seq_num, len(next(iter(data.values()))) + last_seq_num) + ) N = len(seq_num) if uid is None: uid = [str(uuid.uuid4()) for _ in range(N)] @@ -2248,7 +2280,7 @@ def __call__( "Keys in event['filled'] {} must be a subset of those in " "event['data'] {}".format(filled.keys(), data.keys()) ) - self.event_counters[self.descriptor["name"]] += len(data) + self.event_counters[self.descriptor["name"]] += len(seq_num) return doc @@ -2268,7 +2300,13 @@ def compose_event_page( Here for backwards compatibility, the Compose class is prefered. """ return ComposeEventPage(descriptor, event_counters)( - data, timestamps, seq_num, filled, uid=uid, time=time, validate=validate + data, + timestamps, + seq_num=seq_num, + filled=filled, + uid=uid, + time=time, + validate=validate, ) @@ -2324,7 +2362,7 @@ def __call__( "Keys in event['filled'] {} must be a subset of those in " "event['data'] {}".format(filled.keys(), data.keys()) ) - self.event_counters[self.descriptor["name"]] += 1 + self.event_counters[self.descriptor["name"]] = seq_num + 1 return doc diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index d4ffe29d..cdac1330 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -1026,8 +1026,8 @@ def test_array_like(): ) desc_bundle.compose_event_page( data={"a": dask_array.ones((5, 3))}, - timestamps={"a": [1, 2, 3]}, - seq_num=[1, 2, 3], + timestamps={"a": [1, 2, 3, 4, 5]}, + seq_num=[1, 2, 3, 4, 5], ) From e5ec76c5173873767628c976b8ac755668e25fa5 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Mon, 3 Jul 2023 13:40:07 +0100 Subject: [PATCH 05/23] Changed ComposeStreamResource to only have one ComposeDatum. Changed event_descriptor.py to use a Dtype as a type variable so we can import it from bluesky protocols --- event_model/__init__.py | 14 +++++--------- event_model/documents/event_descriptor.py | 4 +++- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 8ddcd706..238b3b90 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2056,7 +2056,7 @@ def compose_stream_datum( @dataclass class ComposeStreamResourceBundle: stream_resource_doc: StreamResource - compose_stream_data: List[ComposeStreamDatum] + compose_stream_data: ComposeStreamDatum # iter for backwards compatibility def __iter__(self) -> Iterator: @@ -2078,7 +2078,6 @@ def __call__( root: str, resource_path: str, resource_kwargs: Dict[str, Any], - counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, uid: Optional[str] = None, validate: bool = True, @@ -2102,13 +2101,10 @@ def __call__( return ComposeStreamResourceBundle( doc, - [ - ComposeStreamDatum( - stream_resource=doc, - counter=counter, - ) - for counter in counters - ], + ComposeStreamDatum( + stream_resource=doc, + counter=itertools.count(), + ) ) diff --git a/event_model/documents/event_descriptor.py b/event_model/documents/event_descriptor.py index 6fef8484..2ec03897 100644 --- a/event_model/documents/event_descriptor.py +++ b/event_model/documents/event_descriptor.py @@ -4,6 +4,8 @@ from .generate.type_wrapper import Field, add_extra_schema +Dtype = Literal["string", "number", "array", "boolean", "integer"] + class DataKey(TypedDict): """Describes the objects in the data property of Event documents""" @@ -18,7 +20,7 @@ class DataKey(TypedDict): ] ] dtype: Annotated[ - Literal["string", "number", "array", "boolean", "integer"], + Dtype, Field(description="The type of the data in the event."), ] external: NotRequired[ From 78d95b43374c3b232263f9c7f8349d8887b5c64d Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Mon, 3 Jul 2023 14:22:57 +0100 Subject: [PATCH 06/23] Specifying pydantic<2 in pyproject.toml, also changed stream_resource to only have one compose_datum in the ComposeStreamResourceBundle --- event_model/documents/event_descriptor.py | 2 +- event_model/tests/test_em.py | 25 ++++++----------------- event_model/tests/test_run_router.py | 13 ++++-------- pyproject.toml | 2 +- 4 files changed, 12 insertions(+), 30 deletions(-) diff --git a/event_model/documents/event_descriptor.py b/event_model/documents/event_descriptor.py index 2ec03897..f7f0178a 100644 --- a/event_model/documents/event_descriptor.py +++ b/event_model/documents/event_descriptor.py @@ -20,7 +20,7 @@ class DataKey(TypedDict): ] ] dtype: Annotated[ - Dtype, + Dtype, Field(description="The type of the data in the event."), ] external: NotRequired[ diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index cdac1330..0552b29a 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -1,7 +1,6 @@ import json import pickle from distutils.version import LooseVersion -from itertools import count import jsonschema import numpy @@ -115,19 +114,11 @@ def test_compose_stream_resource(tmp_path): root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - counters=[count(1), count(1)], ) - resource_doc, compose_stream_data = bundle + resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc - assert bundle.compose_stream_data is compose_stream_data - assert compose_stream_data[0] is not compose_stream_data[1] - datum_doc_0, datum_doc_1 = ( - compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data - ) - # Ensure independent counters - assert datum_doc_0["block_idx"] == datum_doc_1["block_idx"] - datum_doc_1a = compose_stream_data[1]([], {}, {}) - assert datum_doc_1a["block_idx"] != datum_doc_1["block_idx"] + assert bundle.compose_stream_data is compose_stream_datum + compose_stream_datum([], {}, {}) def test_round_trip_pagination(): @@ -398,19 +389,15 @@ def test_document_router_streams_smoke_test(tmp_path): compose_stream_resource = run_bundle.compose_stream_resource start = run_bundle.start_doc dr("start", start) - stream_resource_doc, compose_stream_data = compose_stream_resource( + stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), - counters=[count(1), count(6)], resource_path="test_streams", resource_kwargs={}, ) dr("stream_resource", stream_resource_doc) - datum_doc_0, datum_doc_1 = ( - compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data - ) - dr("stream_datum", datum_doc_0) - dr("stream_datum", datum_doc_1) + datum_doc = compose_stream_datum([], {}, {}) + dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 9b930f40..f1c99429 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -1,5 +1,4 @@ from collections import defaultdict -from itertools import count import numpy import pytest @@ -216,19 +215,15 @@ def test_run_router_streams(tmp_path): bundle.compose_stop(), ) docs.append(("start", start_doc)) - stream_resource_doc, compose_stream_data = compose_stream_resource( + stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, - counters=[count(3), count(2)], ) docs.append(("stream_resource", stream_resource_doc)) - datum_doc_0, datum_doc_1 = ( - compose_stream_datum([], {}, {}) for compose_stream_datum in compose_stream_data - ) - docs.append(("stream_datum", datum_doc_0)) - docs.append(("stream_datum", datum_doc_1)) + datum_doc = compose_stream_datum([], {}, {}) + docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc)) # Empty list of factories. Just make sure nothing blows up. @@ -253,7 +248,7 @@ def all_factory(name, doc): for name, doc in docs: rr(name, doc) assert len(resource_list) == 1 - assert len(data_list) == 2 + assert len(data_list) == 1 def test_subfactory(): diff --git a/pyproject.toml b/pyproject.toml index ac2d053a..7a7d8ca8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ dev = [ "numpydoc", # For schema generation. - "pydantic", + "pydantic<2.0", ] [project.scripts] From 19e67446fccb0df279858f9c84bd88ccebf2200e Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Mon, 3 Jul 2023 14:59:36 +0100 Subject: [PATCH 07/23] Removed content from stream_datum, added a Range Typeddict --- event_model/__init__.py | 21 +++------ event_model/documents/generate/__main__.py | 3 ++ event_model/documents/stream_datum.py | 30 ++++++------ event_model/schemas/stream_datum.json | 53 ++++++++++++++-------- event_model/tests/test_em.py | 5 +- event_model/tests/test_run_router.py | 3 +- 6 files changed, 64 insertions(+), 51 deletions(-) create mode 100644 event_model/documents/generate/__main__.py diff --git a/event_model/__init__.py b/event_model/__init__.py index 238b3b90..0623ade4 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -51,7 +51,7 @@ from .documents.resource import Resource from .documents.run_start import RunStart from .documents.run_stop import RunStop -from .documents.stream_datum import StreamDatum +from .documents.stream_datum import Range, StreamDatum from .documents.stream_resource import StreamResource if sys.version_info < (3, 9): @@ -2005,10 +2005,8 @@ class ComposeStreamDatum: def __call__( self, data_keys: List[str], - seq_nums: Dict[str, int], - indices: Dict[str, int], - event_count: int = 1, - event_offset: int = 0, + seq_nums: Range, + indices: Range, validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] @@ -2017,8 +2015,6 @@ def __call__( stream_resource=resource_uid, uid=f"{resource_uid}/{block_idx}", block_idx=block_idx, - event_count=event_count, - event_offset=event_offset, data_keys=data_keys, seq_nums=seq_nums, indices=indices, @@ -2034,10 +2030,8 @@ def compose_stream_datum( stream_resource: StreamResource, counter: Iterator, data_keys: List[str], - seq_nums: Dict[str, int], - indices: Dict[str, int], - event_count: int = 1, - event_offset: int = 0, + seq_nums: Range, + indices: Range, validate: bool = True, ) -> StreamDatum: """ @@ -2047,8 +2041,6 @@ def compose_stream_datum( data_keys, seq_nums, indices, - event_count=event_count, - event_offset=event_offset, validate=validate, ) @@ -2104,7 +2096,7 @@ def __call__( ComposeStreamDatum( stream_resource=doc, counter=itertools.count(), - ) + ), ) @@ -2128,7 +2120,6 @@ def compose_stream_resource( root, resource_path, resource_kwargs, - counters=counters, path_semantics=path_semantics, uid=uid, validate=validate, diff --git a/event_model/documents/generate/__main__.py b/event_model/documents/generate/__main__.py new file mode 100644 index 00000000..6089f854 --- /dev/null +++ b/event_model/documents/generate/__main__.py @@ -0,0 +1,3 @@ +from typeddict_to_schema import generate_all_schema + +generate_all_schema() diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 82b626be..1c640b8f 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,9 +1,23 @@ -from typing import Dict, List +from typing import List from typing_extensions import Annotated, TypedDict from .generate.type_wrapper import Field, add_extra_schema + +class Range(TypedDict): + """The parameters required to describe a sequence of incrementing integers""" + + start: Annotated[ + int, + Field(description="First number in the range"), + ] + stop: Annotated[ + int, + Field(description="Last number in the range is less than this number"), + ] + + STREAM_DATUM_EXTRA_SCHEMA = {"additionalProperties": False} @@ -18,16 +32,6 @@ class StreamDatum(TypedDict): "be contiguous for a given stream.", ), ] - event_count: Annotated[ - int, Field(description="The number of events in this datum.") - ] - event_offset: Annotated[ - int, - Field( - description="The sequence number of the first event in this block. This " - "increasing value allows the presence of gaps.", - ), - ] stream_resource: Annotated[ str, Field( @@ -49,14 +53,14 @@ class StreamDatum(TypedDict): ), ] seq_nums: Annotated[ - Dict[str, int], + Range, Field( description="A slice object showing the Event numbers the " "resource corresponds to" ), ] indices: Annotated[ - Dict[str, int], + Range, Field( description="A slice object passed to the StreamResource " "handler so it can hand back data and timestamps" diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index e6848417..6a7cdb36 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -2,6 +2,29 @@ "title": "stream_datum", "description": "Document to reference a quanta of an externally-stored stream of data.", "type": "object", + "definitions": { + "Range": { + "title": "Range", + "description": "The parameters required to describe a sequence of incrementing integers", + "type": "object", + "properties": { + "start": { + "title": "Start", + "description": "First number in the range", + "type": "integer" + }, + "stop": { + "title": "Stop", + "description": "Last number in the range is less than this number", + "type": "integer" + } + }, + "required": [ + "start", + "stop" + ] + } + }, "properties": { "block_idx": { "title": "Block Idx", @@ -16,31 +39,23 @@ "type": "string" } }, - "event_count": { - "title": "Event Count", - "description": "The number of events in this datum.", - "type": "integer" - }, - "event_offset": { - "title": "Event Offset", - "description": "The sequence number of the first event in this block. This increasing value allows the presence of gaps.", - "type": "integer" - }, "indices": { "title": "Indices", "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps", - "type": "object", - "additionalProperties": { - "type": "integer" - } + "allOf": [ + { + "$ref": "#/definitions/Range" + } + ] }, "seq_nums": { "title": "Seq Nums", "description": "A slice object showing the Event numbers the resource corresponds to", - "type": "object", - "additionalProperties": { - "type": "integer" - } + "allOf": [ + { + "$ref": "#/definitions/Range" + } + ] }, "stream_resource": { "title": "Stream Resource", @@ -56,8 +71,6 @@ "required": [ "block_idx", "data_keys", - "event_count", - "event_offset", "indices", "seq_nums", "stream_resource", diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 0552b29a..affc7dc6 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -7,6 +7,7 @@ import pytest import event_model +from event_model.documents.stream_datum import Range JSONSCHEMA_2 = LooseVersion(jsonschema.__version__) < LooseVersion("3.0.0") @@ -118,7 +119,7 @@ def test_compose_stream_resource(tmp_path): resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_datum - compose_stream_datum([], {}, {}) + compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) def test_round_trip_pagination(): @@ -396,7 +397,7 @@ def test_document_router_streams_smoke_test(tmp_path): resource_kwargs={}, ) dr("stream_resource", stream_resource_doc) - datum_doc = compose_stream_datum([], {}, {}) + datum_doc = compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index f1c99429..e354b4de 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -4,6 +4,7 @@ import pytest import event_model +from event_model.documents.stream_datum import Range def test_run_router(tmp_path): @@ -222,7 +223,7 @@ def test_run_router_streams(tmp_path): resource_kwargs={}, ) docs.append(("stream_resource", stream_resource_doc)) - datum_doc = compose_stream_datum([], {}, {}) + datum_doc = compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc)) From 6e8a865ca9fa74f83bbca62ab01c9d8bf0ad1df5 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Tue, 4 Jul 2023 11:42:40 +0100 Subject: [PATCH 08/23] Allowed external="STREAM:" keys to not be represented in data in ComposeEvent and ComposeEventPage --- event_model/__init__.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 0623ade4..c8e85dae 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2249,11 +2249,16 @@ def __call__( ) if validate: schema_validators[DocumentNames.event_page].validate(doc) + if not ( - self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys() + keys_without_stream_keys( + self.descriptor["data_keys"], self.descriptor["data_keys"] + ) == + keys_without_stream_keys(data, self.descriptor["data_keys"]) == + keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) ): raise EventModelValidationError( - "These sets of keys must match:\n" + "These sets of keys must match (other than \"STREAM:\" keys):\n" "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( @@ -2297,6 +2302,15 @@ def compose_event_page( ) +def keys_without_stream_keys(dictionary, descriptor_data_keys): + return [ + key for key in dictionary.keys() if ( + "external" not in descriptor_data_keys[key] or + descriptor_data_keys[key]["external"] != "STREAM:" + ) + ] + + @dataclass class ComposeEvent: descriptor: EventDescriptor @@ -2331,11 +2345,16 @@ def __call__( ) if validate: schema_validators[DocumentNames.event].validate(doc) + if not ( - self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys() + keys_without_stream_keys( + self.descriptor["data_keys"], self.descriptor["data_keys"] + ) == + keys_without_stream_keys(data, self.descriptor["data_keys"]) == + keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) ): raise EventModelValidationError( - "These sets of keys must match:\n" + "These sets of keys must match (other than \"STREAM:\" keys):\n" "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( From 959bc59a49e7598a4915ca6eb933bf9052f94ffd Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 14 Sep 2023 16:15:40 +0100 Subject: [PATCH 09/23] Fixed typo --- event_model/__init__.py | 4 ++-- event_model/tests/test_em.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 38a159a3..276ae0be 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2056,14 +2056,14 @@ def compose_stream_datum( @dataclass class ComposeStreamResourceBundle: stream_resource_doc: StreamResource - compose_stream_data: List[ComposeStreamDatum] + compose_stream_datum: ComposeStreamDatum # iter for backwards compatibility def __iter__(self) -> Iterator: return iter( ( self.stream_resource_doc, - self.compose_stream_data, + self.compose_stream_datum, ) ) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 6fce1246..cc6962b4 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -119,7 +119,7 @@ def test_compose_stream_resource(tmp_path): ) resource_doc, compose_stream_data = bundle assert bundle.stream_resource_doc is resource_doc - assert bundle.compose_stream_data is compose_stream_data + assert bundle.compose_stream_datum is compose_stream_data assert compose_stream_data[0] is not compose_stream_data[1] datum_doc_0, datum_doc_1 = ( compose_stream_datum(datum_kwargs={}) From 79daf00f79a329afd8fbac133e58c12e6079317b Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Tue, 4 Jul 2023 14:27:23 +0100 Subject: [PATCH 10/23] Fixed imports to avoid pydantic if the version is >=2 --- event_model/__init__.py | 16 +++++++-------- .../documents/generate/type_wrapper.py | 20 ++++++++++++++++++- .../documents/generate/typeddict_to_schema.py | 12 +++++++++-- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index c8e85dae..b485ab36 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2251,11 +2251,11 @@ def __call__( schema_validators[DocumentNames.event_page].validate(doc) if not ( - keys_without_stream_keys( + set(keys_without_stream_keys( self.descriptor["data_keys"], self.descriptor["data_keys"] - ) == - keys_without_stream_keys(data, self.descriptor["data_keys"]) == - keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) + )) == + set(keys_without_stream_keys(data, self.descriptor["data_keys"])) == + set(keys_without_stream_keys(timestamps, self.descriptor["data_keys"])) ): raise EventModelValidationError( "These sets of keys must match (other than \"STREAM:\" keys):\n" @@ -2347,11 +2347,11 @@ def __call__( schema_validators[DocumentNames.event].validate(doc) if not ( - keys_without_stream_keys( + set(keys_without_stream_keys( self.descriptor["data_keys"], self.descriptor["data_keys"] - ) == - keys_without_stream_keys(data, self.descriptor["data_keys"]) == - keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) + )) == + set(keys_without_stream_keys(data, self.descriptor["data_keys"])) == + set(keys_without_stream_keys(timestamps, self.descriptor["data_keys"])) ): raise EventModelValidationError( "These sets of keys must match (other than \"STREAM:\" keys):\n" diff --git a/event_model/documents/generate/type_wrapper.py b/event_model/documents/generate/type_wrapper.py index 156853a7..ed4ffcf9 100644 --- a/event_model/documents/generate/type_wrapper.py +++ b/event_model/documents/generate/type_wrapper.py @@ -5,11 +5,20 @@ from dataclasses import dataclass +pydantic_version = None try: - import pydantic + try: + from pydantic import v1 as pydantic # type: ignore + except ImportError: + import pydantic + + pydantic_version = pydantic.__version__ Field = pydantic.Field FieldInfo = pydantic.fields.FieldInfo + BaseConfig = pydantic.BaseConfig + BaseModel = pydantic.BaseModel + create_model = pydantic.create_model except ModuleNotFoundError: def Field(*args, **kwargs): # type: ignore @@ -18,6 +27,15 @@ def Field(*args, **kwargs): # type: ignore class FieldInfo: # type: ignore ... + class BaseConfig: # type: ignore + ... + + class BaseModel: # type: ignore + ... + + def create_model(*args, **kwargs): # type: ignore + ... + extra_schema = {} diff --git a/event_model/documents/generate/typeddict_to_schema.py b/event_model/documents/generate/typeddict_to_schema.py index 216b337b..0aca6465 100644 --- a/event_model/documents/generate/typeddict_to_schema.py +++ b/event_model/documents/generate/typeddict_to_schema.py @@ -7,8 +7,6 @@ from pathlib import Path from typing import Dict, Optional, Tuple, Type, Union -from pydantic import BaseConfig, BaseModel, Field, create_model -from pydantic.fields import FieldInfo from typing_extensions import ( Annotated, NotRequired, @@ -33,13 +31,23 @@ from event_model.documents.generate.type_wrapper import ( ALLOWED_ANNOTATION_ELEMENTS, AsRef, + BaseConfig, + BaseModel, + Field, + FieldInfo, + create_model, extra_schema, + pydantic_version, ) # The hacky indexing on types isn't possible with python < 3.9 if sys.version_info[:2] < (3, 9): raise EnvironmentError("schema generation requires python 3.8 or higher") +if not pydantic_version: + raise EnvironmentError( + "schema generation requires pydantic < 2.0 to run, pydantic isn't installed" + ) SCHEMA_OUT_DIR = Path("event_model") / SCHEMA_PATH From 1ed861d848d21478824d88a9bab5f799f9a1ab44 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Wed, 12 Jul 2023 11:36:47 +0100 Subject: [PATCH 11/23] We changed != to == when checking datakeys From e1e806512f5f8afb4fcf3aa6835e21b442d95222 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 13 Jul 2023 15:40:17 +0100 Subject: [PATCH 12/23] Made corrections to changes, still need to add descriptor reference to stream_datum --- event_model/__init__.py | 54 +++++++++++++++++---------- event_model/documents/stream_datum.py | 6 +-- event_model/tests/test_em.py | 8 ++-- event_model/tests/test_run_router.py | 6 ++- 4 files changed, 46 insertions(+), 28 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index b485ab36..5987029e 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -51,7 +51,7 @@ from .documents.resource import Resource from .documents.run_start import RunStart from .documents.run_stop import RunStop -from .documents.stream_datum import Range, StreamDatum +from .documents.stream_datum import StreamDatum, StreamRange from .documents.stream_resource import StreamResource if sys.version_info < (3, 9): @@ -2005,8 +2005,8 @@ class ComposeStreamDatum: def __call__( self, data_keys: List[str], - seq_nums: Range, - indices: Range, + seq_nums: StreamRange, + indices: StreamRange, validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] @@ -2030,13 +2030,17 @@ def compose_stream_datum( stream_resource: StreamResource, counter: Iterator, data_keys: List[str], - seq_nums: Range, - indices: Range, + seq_nums: StreamRange, + indices: StreamRange, validate: bool = True, ) -> StreamDatum: """ Here for backwards compatibility, the Compose class is prefered. """ + warnings.warn( + "compose_stream_datum() will be removed in the minor version.", + DeprecationWarning + ) return ComposeStreamDatum(stream_resource, counter)( data_keys, seq_nums, @@ -2251,14 +2255,18 @@ def __call__( schema_validators[DocumentNames.event_page].validate(doc) if not ( - set(keys_without_stream_keys( - self.descriptor["data_keys"], self.descriptor["data_keys"] - )) == - set(keys_without_stream_keys(data, self.descriptor["data_keys"])) == - set(keys_without_stream_keys(timestamps, self.descriptor["data_keys"])) + set( + keys_without_stream_keys( + self.descriptor["data_keys"], self.descriptor["data_keys"] + ) + ) + == set(keys_without_stream_keys(data, self.descriptor["data_keys"])) + == set( + keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) + ) ): raise EventModelValidationError( - "These sets of keys must match (other than \"STREAM:\" keys):\n" + 'These sets of keys must match (other than "STREAM:" keys):\n' "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( @@ -2304,9 +2312,11 @@ def compose_event_page( def keys_without_stream_keys(dictionary, descriptor_data_keys): return [ - key for key in dictionary.keys() if ( - "external" not in descriptor_data_keys[key] or - descriptor_data_keys[key]["external"] != "STREAM:" + key + for key in dictionary.keys() + if ( + "external" not in descriptor_data_keys[key] + or descriptor_data_keys[key]["external"] != "STREAM:" ) ] @@ -2347,14 +2357,18 @@ def __call__( schema_validators[DocumentNames.event].validate(doc) if not ( - set(keys_without_stream_keys( - self.descriptor["data_keys"], self.descriptor["data_keys"] - )) == - set(keys_without_stream_keys(data, self.descriptor["data_keys"])) == - set(keys_without_stream_keys(timestamps, self.descriptor["data_keys"])) + set( + keys_without_stream_keys( + self.descriptor["data_keys"], self.descriptor["data_keys"] + ) + ) + == set(keys_without_stream_keys(data, self.descriptor["data_keys"])) + == set( + keys_without_stream_keys(timestamps, self.descriptor["data_keys"]) + ) ): raise EventModelValidationError( - "These sets of keys must match (other than \"STREAM:\" keys):\n" + 'These sets of keys must match (other than "STREAM:" keys):\n' "event['data'].keys(): {}\n" "event['timestamps'].keys(): {}\n" "descriptor['data_keys'].keys(): {}\n".format( diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 1c640b8f..ae8a8a08 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -5,7 +5,7 @@ from .generate.type_wrapper import Field, add_extra_schema -class Range(TypedDict): +class StreamRange(TypedDict): """The parameters required to describe a sequence of incrementing integers""" start: Annotated[ @@ -53,14 +53,14 @@ class StreamDatum(TypedDict): ), ] seq_nums: Annotated[ - Range, + StreamRange, Field( description="A slice object showing the Event numbers the " "resource corresponds to" ), ] indices: Annotated[ - Range, + StreamRange, Field( description="A slice object passed to the StreamResource " "handler so it can hand back data and timestamps" diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index affc7dc6..059339ca 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -7,7 +7,7 @@ import pytest import event_model -from event_model.documents.stream_datum import Range +from event_model.documents.stream_datum import StreamRange JSONSCHEMA_2 = LooseVersion(jsonschema.__version__) < LooseVersion("3.0.0") @@ -119,7 +119,7 @@ def test_compose_stream_resource(tmp_path): resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_datum - compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) + compose_stream_datum([], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) def test_round_trip_pagination(): @@ -397,7 +397,9 @@ def test_document_router_streams_smoke_test(tmp_path): resource_kwargs={}, ) dr("stream_resource", stream_resource_doc) - datum_doc = compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) + datum_doc = compose_stream_datum( + [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + ) dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index e354b4de..54df7335 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -4,7 +4,7 @@ import pytest import event_model -from event_model.documents.stream_datum import Range +from event_model.documents.stream_datum import StreamRange def test_run_router(tmp_path): @@ -223,7 +223,9 @@ def test_run_router_streams(tmp_path): resource_kwargs={}, ) docs.append(("stream_resource", stream_resource_doc)) - datum_doc = compose_stream_datum([], Range(start=0, stop=0), Range(start=0, stop=0)) + datum_doc = compose_stream_datum( + [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + ) docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc)) From 60848ee59392bd33ef01a0facb6b707f4aaae0bc Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 20 Jul 2023 11:09:48 +0100 Subject: [PATCH 13/23] Added a reference to event-descriptor uid in StreamDatum --- event_model/__init__.py | 4 ++++ event_model/documents/stream_datum.py | 8 +++++++- event_model/schemas/stream_datum.json | 13 +++++++++---- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 5987029e..43e9a3a9 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2007,6 +2007,7 @@ def __call__( data_keys: List[str], seq_nums: StreamRange, indices: StreamRange, + descriptor: Optional[EventDescriptor] = None, validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] @@ -2019,6 +2020,9 @@ def __call__( seq_nums=seq_nums, indices=indices, ) + if descriptor: + doc["descriptor"] = descriptor["uid"] + if validate: schema_validators[DocumentNames.stream_datum].validate(doc) diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index ae8a8a08..c3778bbc 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,6 +1,6 @@ from typing import List -from typing_extensions import Annotated, TypedDict +from typing_extensions import Annotated, TypedDict, NotRequired from .generate.type_wrapper import Field, add_extra_schema @@ -25,6 +25,12 @@ class StreamRange(TypedDict): class StreamDatum(TypedDict): """Document to reference a quanta of an externally-stored stream of data.""" + descriptor: NotRequired[ + Annotated[ + str, Field(description="UID of the EventDescriptor to which this Datum belongs") + ] + ] + block_idx: Annotated[ int, Field( diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 6a7cdb36..93791946 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -3,8 +3,8 @@ "description": "Document to reference a quanta of an externally-stored stream of data.", "type": "object", "definitions": { - "Range": { - "title": "Range", + "StreamRange": { + "title": "StreamRange", "description": "The parameters required to describe a sequence of incrementing integers", "type": "object", "properties": { @@ -39,12 +39,17 @@ "type": "string" } }, + "descriptor": { + "title": "Descriptor", + "description": "UID of the EventDescriptor to which this Datum belongs", + "type": "string" + }, "indices": { "title": "Indices", "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps", "allOf": [ { - "$ref": "#/definitions/Range" + "$ref": "#/definitions/StreamRange" } ] }, @@ -53,7 +58,7 @@ "description": "A slice object showing the Event numbers the resource corresponds to", "allOf": [ { - "$ref": "#/definitions/Range" + "$ref": "#/definitions/StreamRange" } ] }, From 7f5394887291d885cf8f779666a4b3b467a2b4f4 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 20 Jul 2023 11:43:06 +0100 Subject: [PATCH 14/23] Made run_start and descriptor required in Resource and StreamDatum respectively --- event_model/__init__.py | 13 +++++-------- .../documents/generate/typeddict_to_schema.py | 1 + event_model/documents/resource.py | 14 ++++++-------- event_model/documents/stream_datum.py | 13 ++++++------- event_model/documents/stream_resource.py | 14 ++++++-------- event_model/schemas/resource.json | 1 + event_model/schemas/stream_datum.json | 3 ++- event_model/schemas/stream_resource.json | 1 + 8 files changed, 28 insertions(+), 32 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 43e9a3a9..b6626c53 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1957,13 +1957,12 @@ def __call__( root=root, resource_kwargs=resource_kwargs, resource_path=resource_path, + run_start=self.start["uid"] if self.start else "", ) + if validate: schema_validators[DocumentNames.resource].validate(doc) - if self.start: - doc["run_start"] = self.start["uid"] - counter = itertools.count() return ComposeResourceBundle( doc, @@ -2019,9 +2018,8 @@ def __call__( data_keys=data_keys, seq_nums=seq_nums, indices=indices, + descriptor=descriptor["uid"] if descriptor else "", ) - if descriptor: - doc["descriptor"] = descriptor["uid"] if validate: schema_validators[DocumentNames.stream_datum].validate(doc) @@ -2043,7 +2041,7 @@ def compose_stream_datum( """ warnings.warn( "compose_stream_datum() will be removed in the minor version.", - DeprecationWarning + DeprecationWarning, ) return ComposeStreamDatum(stream_resource, counter)( data_keys, @@ -2092,9 +2090,8 @@ def __call__( resource_path=resource_path, resource_kwargs=resource_kwargs, path_semantics=path_semantics, + run_start=self.start["uid"] if self.start else "", ) - if self.start: - doc["run_start"] = self.start["uid"] if validate: schema_validators[DocumentNames.stream_resource].validate(doc) diff --git a/event_model/documents/generate/typeddict_to_schema.py b/event_model/documents/generate/typeddict_to_schema.py index 0aca6465..7108d410 100644 --- a/event_model/documents/generate/typeddict_to_schema.py +++ b/event_model/documents/generate/typeddict_to_schema.py @@ -49,6 +49,7 @@ "schema generation requires pydantic < 2.0 to run, pydantic isn't installed" ) + SCHEMA_OUT_DIR = Path("event_model") / SCHEMA_PATH diff --git a/event_model/documents/resource.py b/event_model/documents/resource.py index 882ceb69..ed272362 100644 --- a/event_model/documents/resource.py +++ b/event_model/documents/resource.py @@ -49,12 +49,10 @@ class Resource(PartialResource): Field(description="Rules for joining paths"), ] ] - run_start: NotRequired[ - Annotated[ - str, - Field( - description="Globally unique ID to the run_start document this " - "resource is associated with.", - ), - ] + run_start: Annotated[ + str, + Field( + description="Globally unique ID to the run_start document this " + "resource is associated with.", + ), ] diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index c3778bbc..f247b858 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,6 +1,6 @@ from typing import List -from typing_extensions import Annotated, TypedDict, NotRequired +from typing_extensions import Annotated, TypedDict from .generate.type_wrapper import Field, add_extra_schema @@ -25,10 +25,9 @@ class StreamRange(TypedDict): class StreamDatum(TypedDict): """Document to reference a quanta of an externally-stored stream of data.""" - descriptor: NotRequired[ - Annotated[ - str, Field(description="UID of the EventDescriptor to which this Datum belongs") - ] + descriptor: Annotated[ + str, + Field(description="UID of the EventDescriptor to " "which this Datum belongs"), ] block_idx: Annotated[ @@ -47,8 +46,8 @@ class StreamDatum(TypedDict): uid: Annotated[ str, Field( - description="Globally unique identifier for this Datum. A suggested " - "formatting being '//", + description="A slice object passed to the StreamResource " + "handler so it can hand back data and timestamps." ), ] data_keys: Annotated[ diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py index df4e9b07..cafcf710 100644 --- a/event_model/documents/stream_resource.py +++ b/event_model/documents/stream_resource.py @@ -36,14 +36,12 @@ class StreamResource(TypedDict): description="Subset of resource_path that is a local detail, not semantic." ), ] - run_start: NotRequired[ - Annotated[ - str, - Field( - description="Globally unique ID to the run_start document " - "this Stream Resource is associated with.", - ), - ] + run_start: Annotated[ + str, + Field( + description="Globally unique ID to the run_start document " + "this Stream Resource is associated with.", + ), ] spec: Annotated[ str, diff --git a/event_model/schemas/resource.json b/event_model/schemas/resource.json index 5421b607..68ef0bc8 100644 --- a/event_model/schemas/resource.json +++ b/event_model/schemas/resource.json @@ -47,6 +47,7 @@ "resource_kwargs", "resource_path", "root", + "run_start", "spec", "uid" ], diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 93791946..1c68df37 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -69,13 +69,14 @@ }, "uid": { "title": "Uid", - "description": "Globally unique identifier for this Datum. A suggested formatting being '//", + "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps.", "type": "string" } }, "required": [ "block_idx", "data_keys", + "descriptor", "indices", "seq_nums", "stream_resource", diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index 7149fffe..2ab8719a 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -47,6 +47,7 @@ "resource_kwargs", "resource_path", "root", + "run_start", "spec", "uid" ], From f579225e5bb293269f40d84a6a14c877ed518dd9 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Mon, 14 Aug 2023 08:54:36 +0100 Subject: [PATCH 15/23] removed block_idx from StreamDatum --- docs/user/explanations/data-model.rst | 2 -- event_model/__init__.py | 4 +--- event_model/documents/stream_datum.py | 8 -------- event_model/schemas/stream_datum.json | 6 ------ 4 files changed, 1 insertion(+), 19 deletions(-) diff --git a/docs/user/explanations/data-model.rst b/docs/user/explanations/data-model.rst index dc151b04..d5d8a957 100644 --- a/docs/user/explanations/data-model.rst +++ b/docs/user/explanations/data-model.rst @@ -526,7 +526,6 @@ Minimal nontrivial valid example: {'resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key 'datum_kwargs': {}, # format-specific parameters 'datum_id': '272132cf-564f-428f-bf6b-149ee4287024/1', - 'block_idx': 0, 'event_count': 1 } @@ -538,7 +537,6 @@ Typical example: {'resource': '3b300e6f-b431-4750-a635-5630d15c81a8', 'datum_kwargs': {'index': 3}, 'datum_id': '3b300e6f-b431-4750-a635-5630d15c81a8/3', - 'block_idx': 0, 'event_count': 5, 'event_offset': 14} diff --git a/event_model/__init__.py b/event_model/__init__.py index b6626c53..896500ad 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2010,11 +2010,9 @@ def __call__( validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] - block_idx = next(self.counter) doc = StreamDatum( stream_resource=resource_uid, - uid=f"{resource_uid}/{block_idx}", - block_idx=block_idx, + uid=f"{resource_uid}/{next(self.counter)}", data_keys=data_keys, seq_nums=seq_nums, indices=indices, diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index f247b858..58091c47 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -29,14 +29,6 @@ class StreamDatum(TypedDict): str, Field(description="UID of the EventDescriptor to " "which this Datum belongs"), ] - - block_idx: Annotated[ - int, - Field( - description="The order in the stream of this block of data. This must " - "be contiguous for a given stream.", - ), - ] stream_resource: Annotated[ str, Field( diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 1c68df37..d2cf4909 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -26,11 +26,6 @@ } }, "properties": { - "block_idx": { - "title": "Block Idx", - "description": "The order in the stream of this block of data. This must be contiguous for a given stream.", - "type": "integer" - }, "data_keys": { "title": "Data Keys", "description": "A list to show which data_keys of the Descriptor are being streamed", @@ -74,7 +69,6 @@ } }, "required": [ - "block_idx", "data_keys", "descriptor", "indices", From 9ca7ca743dc80b8820dc6c72efbb1770b7add693 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Mon, 11 Sep 2023 09:52:37 +0100 Subject: [PATCH 16/23] Made seq_nums optional --- event_model/__init__.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 896500ad..766ce657 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1996,6 +1996,11 @@ def compose_resource( ) +# A dict of Tuple[str, StreamRange] where the string is the StreamDatum uuid + +_stream_datum_seq_nums: Dict[str, StreamRange] = {} + + @dataclass class ComposeStreamDatum: stream_resource: StreamResource @@ -2004,12 +2009,18 @@ class ComposeStreamDatum: def __call__( self, data_keys: List[str], - seq_nums: StreamRange, indices: StreamRange, + seq_nums: Optional[StreamRange] = None, descriptor: Optional[EventDescriptor] = None, validate: bool = True, ) -> StreamDatum: resource_uid = self.stream_resource["uid"] + + # If the seq_nums aren't passed in then the bluesky + # bundler will keep track of them + if not seq_nums: + seq_nums = StreamRange(start=0, stop=0) + doc = StreamDatum( stream_resource=resource_uid, uid=f"{resource_uid}/{next(self.counter)}", From a7e80da4bd73ce49c20dc43012427b1f27d3dad7 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 14 Sep 2023 11:16:45 +0100 Subject: [PATCH 17/23] Made data_keys a str data_key and moved it to stream resource --- event_model/__init__.py | 13 ++++--------- event_model/documents/stream_datum.py | 9 --------- event_model/documents/stream_resource.py | 7 +++++++ event_model/schemas/stream_datum.json | 9 --------- event_model/schemas/stream_resource.json | 6 ++++++ event_model/tests/test_em.py | 6 ++++-- event_model/tests/test_run_router.py | 3 ++- 7 files changed, 23 insertions(+), 30 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 766ce657..5ce07d17 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1996,11 +1996,6 @@ def compose_resource( ) -# A dict of Tuple[str, StreamRange] where the string is the StreamDatum uuid - -_stream_datum_seq_nums: Dict[str, StreamRange] = {} - - @dataclass class ComposeStreamDatum: stream_resource: StreamResource @@ -2008,7 +2003,6 @@ class ComposeStreamDatum: def __call__( self, - data_keys: List[str], indices: StreamRange, seq_nums: Optional[StreamRange] = None, descriptor: Optional[EventDescriptor] = None, @@ -2024,7 +2018,6 @@ def __call__( doc = StreamDatum( stream_resource=resource_uid, uid=f"{resource_uid}/{next(self.counter)}", - data_keys=data_keys, seq_nums=seq_nums, indices=indices, descriptor=descriptor["uid"] if descriptor else "", @@ -2040,7 +2033,6 @@ def compose_stream_datum( *, stream_resource: StreamResource, counter: Iterator, - data_keys: List[str], seq_nums: StreamRange, indices: StreamRange, validate: bool = True, @@ -2053,7 +2045,6 @@ def compose_stream_datum( DeprecationWarning, ) return ComposeStreamDatum(stream_resource, counter)( - data_keys, seq_nums, indices, validate=validate, @@ -2084,6 +2075,7 @@ def __call__( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], path_semantics: Literal["posix", "windows"] = default_path_semantics, uid: Optional[str] = None, @@ -2094,6 +2086,7 @@ def __call__( doc = StreamResource( uid=uid, + data_key=data_key, spec=spec, root=root, resource_path=resource_path, @@ -2119,6 +2112,7 @@ def compose_stream_resource( spec: str, root: str, resource_path: str, + data_key: str, resource_kwargs: Dict[str, Any], counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, @@ -2133,6 +2127,7 @@ def compose_stream_resource( spec, root, resource_path, + data_key, resource_kwargs, path_semantics=path_semantics, uid=uid, diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 58091c47..06f7f83a 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -1,5 +1,3 @@ -from typing import List - from typing_extensions import Annotated, TypedDict from .generate.type_wrapper import Field, add_extra_schema @@ -42,13 +40,6 @@ class StreamDatum(TypedDict): "handler so it can hand back data and timestamps." ), ] - data_keys: Annotated[ - List[str], - Field( - description="A list to show which data_keys of the " - "Descriptor are being streamed" - ), - ] seq_nums: Annotated[ StreamRange, Field( diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py index cafcf710..2d456c35 100644 --- a/event_model/documents/stream_resource.py +++ b/event_model/documents/stream_resource.py @@ -20,6 +20,13 @@ class StreamResource(TypedDict): Field(description="Rules for joining paths"), ] ] + data_key: Annotated[ + str, + Field( + description="A string to show which data_key of the " + "Descriptor are being streamed" + ), + ] resource_kwargs: Annotated[ Dict[str, Any], Field( diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index d2cf4909..124bbd0b 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -26,14 +26,6 @@ } }, "properties": { - "data_keys": { - "title": "Data Keys", - "description": "A list to show which data_keys of the Descriptor are being streamed", - "type": "array", - "items": { - "type": "string" - } - }, "descriptor": { "title": "Descriptor", "description": "UID of the EventDescriptor to which this Datum belongs", @@ -69,7 +61,6 @@ } }, "required": [ - "data_keys", "descriptor", "indices", "seq_nums", diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index 2ab8719a..cb42b78f 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -3,6 +3,11 @@ "description": "Document to reference a collection (e.g. file or group of files) of externally-stored data streams", "type": "object", "properties": { + "data_key": { + "title": "Data Key", + "description": "A string to show which data_key of the Descriptor are being streamed", + "type": "string" + }, "path_semantics": { "title": "Path Semantics", "description": "Rules for joining paths", @@ -44,6 +49,7 @@ } }, "required": [ + "data_key", "resource_kwargs", "resource_path", "root", diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 059339ca..4f9714bb 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -113,13 +113,14 @@ def test_compose_stream_resource(tmp_path): bundle = compose_stream_resource( spec="TIFF_STREAM", root=str(tmp_path), + data_key="det1", resource_path="test_streams", resource_kwargs={}, ) resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc assert bundle.compose_stream_data is compose_stream_datum - compose_stream_datum([], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) + compose_stream_datum(StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) def test_round_trip_pagination(): @@ -392,13 +393,14 @@ def test_document_router_streams_smoke_test(tmp_path): dr("start", start) stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, ) dr("stream_resource", stream_resource_doc) datum_doc = compose_stream_datum( - [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) dr("stream_datum", datum_doc) dr("stop", run_bundle.compose_stop()) diff --git a/event_model/tests/test_run_router.py b/event_model/tests/test_run_router.py index 54df7335..93149d99 100644 --- a/event_model/tests/test_run_router.py +++ b/event_model/tests/test_run_router.py @@ -218,13 +218,14 @@ def test_run_router_streams(tmp_path): docs.append(("start", start_doc)) stream_resource_doc, compose_stream_datum = compose_stream_resource( spec="TIFF_STREAM", + data_key="det1", root=str(tmp_path), resource_path="test_streams", resource_kwargs={}, ) docs.append(("stream_resource", stream_resource_doc)) datum_doc = compose_stream_datum( - [], StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) + StreamRange(start=0, stop=0), StreamRange(start=0, stop=0) ) docs.append(("stream_datum", datum_doc)) docs.append(("stop", stop_doc)) From 0432aeb4de13b5b6a2e577c5ac735e6c2bf3d4aa Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Thu, 28 Sep 2023 14:22:54 +0100 Subject: [PATCH 18/23] Supported empty event pages --- event_model/__init__.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 5ce07d17..5ca30e23 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2193,18 +2193,15 @@ def compose_stop( )(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate) -def dict_of_lists_has_equal_size_lists(dictionary: Dict[str, List]) -> bool: - """Return True if all lists are the same size in a Dict[str, List].""" - - dictionary_values = iter(dictionary.values()) - first_element_len = len(next(dictionary_values)) - next_element = next(dictionary_values, None) - - while next_element: - if len(next_element) != first_element_len: - return False - next_element = next(dictionary_values, None) - return True +def length_of_value(dictionary: Dict[str, List], error_msg: str) -> Optional[int]: + length = None + for k, v in dictionary.items(): + v_len = len(v) + if length is not None: + if v_len != length: + raise EventModelError(error_msg) + length = v_len + return length @dataclass @@ -2222,15 +2219,17 @@ def __call__( time: Optional[List] = None, validate: bool = True, ) -> EventPage: - assert dict_of_lists_has_equal_size_lists(timestamps), ( + timestamps_length = length_of_value( + timestamps, "Cannot compose event_page: event_page contains `timestamps` " "list values of different lengths" ) - assert dict_of_lists_has_equal_size_lists(data), ( + data_length = length_of_value( + data, "Cannot compose event_page: event_page contains `data` " "lists of different lengths" ) - assert len(next(iter(timestamps.values()))) == len(next(iter(data.values()))), ( + assert timestamps_length == data_length, ( "Cannot compose event_page: the lists in `timestamps` are of a different " "length to those in `data`" ) From 6d3d8f28c73e73b696878ddd1674af1229dcca1e Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Fri, 29 Sep 2023 08:56:33 +0100 Subject: [PATCH 19/23] Ran black (minor change) --- event_model/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 5ca30e23..23b30e1f 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2222,12 +2222,12 @@ def __call__( timestamps_length = length_of_value( timestamps, "Cannot compose event_page: event_page contains `timestamps` " - "list values of different lengths" + "list values of different lengths", ) data_length = length_of_value( data, "Cannot compose event_page: event_page contains `data` " - "lists of different lengths" + "lists of different lengths", ) assert timestamps_length == data_length, ( "Cannot compose event_page: the lists in `timestamps` are of a different " From 8a2334416b3763ab2101c8801ce374887e1e347f Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Tue, 3 Oct 2023 16:06:34 +0100 Subject: [PATCH 20/23] Made stream_resource::run_start optional and fixed incorrect description --- event_model/documents/stream_datum.py | 4 ++-- event_model/documents/stream_resource.py | 14 ++++++++------ event_model/schemas/stream_datum.json | 2 +- event_model/schemas/stream_resource.json | 1 - 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/event_model/documents/stream_datum.py b/event_model/documents/stream_datum.py index 06f7f83a..a48de7ab 100644 --- a/event_model/documents/stream_datum.py +++ b/event_model/documents/stream_datum.py @@ -36,8 +36,8 @@ class StreamDatum(TypedDict): uid: Annotated[ str, Field( - description="A slice object passed to the StreamResource " - "handler so it can hand back data and timestamps." + description="Globally unique identifier for this Datum. A suggested " + "formatting being '//" ), ] seq_nums: Annotated[ diff --git a/event_model/documents/stream_resource.py b/event_model/documents/stream_resource.py index 2d456c35..3ab1877e 100644 --- a/event_model/documents/stream_resource.py +++ b/event_model/documents/stream_resource.py @@ -43,12 +43,14 @@ class StreamResource(TypedDict): description="Subset of resource_path that is a local detail, not semantic." ), ] - run_start: Annotated[ - str, - Field( - description="Globally unique ID to the run_start document " - "this Stream Resource is associated with.", - ), + run_start: NotRequired[ + Annotated[ + str, + Field( + description="Globally unique ID to the run_start document " + "this Stream Resource is associated with.", + ), + ] ] spec: Annotated[ str, diff --git a/event_model/schemas/stream_datum.json b/event_model/schemas/stream_datum.json index 124bbd0b..bd024c32 100644 --- a/event_model/schemas/stream_datum.json +++ b/event_model/schemas/stream_datum.json @@ -56,7 +56,7 @@ }, "uid": { "title": "Uid", - "description": "A slice object passed to the StreamResource handler so it can hand back data and timestamps.", + "description": "Globally unique identifier for this Datum. A suggested formatting being '//", "type": "string" } }, diff --git a/event_model/schemas/stream_resource.json b/event_model/schemas/stream_resource.json index cb42b78f..3e9a3fd7 100644 --- a/event_model/schemas/stream_resource.json +++ b/event_model/schemas/stream_resource.json @@ -53,7 +53,6 @@ "resource_kwargs", "resource_path", "root", - "run_start", "spec", "uid" ], From ba9fbcffcac205011359d000eff2649f987d5087 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Tue, 10 Oct 2023 13:22:15 +0000 Subject: [PATCH 21/23] Fix typo --- event_model/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/event_model/__init__.py b/event_model/__init__.py index 23b30e1f..08a43fec 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -2054,14 +2054,14 @@ def compose_stream_datum( @dataclass class ComposeStreamResourceBundle: stream_resource_doc: StreamResource - compose_stream_data: ComposeStreamDatum + compose_stream_datum: ComposeStreamDatum # iter for backwards compatibility def __iter__(self) -> Iterator: return iter( ( self.stream_resource_doc, - self.compose_stream_data, + self.compose_stream_datum, ) ) From a389141c03694074e7aa8ce99348a9a43703c973 Mon Sep 17 00:00:00 2001 From: Tom Cobb Date: Tue, 10 Oct 2023 15:49:02 +0000 Subject: [PATCH 22/23] Missed from the last merge --- event_model/tests/test_em.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_model/tests/test_em.py b/event_model/tests/test_em.py index 4f9714bb..866810b7 100644 --- a/event_model/tests/test_em.py +++ b/event_model/tests/test_em.py @@ -119,7 +119,7 @@ def test_compose_stream_resource(tmp_path): ) resource_doc, compose_stream_datum = bundle assert bundle.stream_resource_doc is resource_doc - assert bundle.compose_stream_data is compose_stream_datum + assert bundle.compose_stream_datum is compose_stream_datum compose_stream_datum(StreamRange(start=0, stop=0), StreamRange(start=0, stop=0)) From e57e53bceb0c7e339fb1fad772626f3e5b76e8a8 Mon Sep 17 00:00:00 2001 From: Eva Lott Date: Wed, 11 Oct 2023 09:53:02 +0100 Subject: [PATCH 23/23] Changed doc examples, made run_start NotRequired in resource/stream_resource --- docs/user/explanations/data-model.rst | 47 ++++++--------------------- event_model/__init__.py | 9 +++-- event_model/documents/resource.py | 14 ++++---- event_model/schemas/resource.json | 1 - 4 files changed, 24 insertions(+), 47 deletions(-) diff --git a/docs/user/explanations/data-model.rst b/docs/user/explanations/data-model.rst index d5d8a957..3687d688 100644 --- a/docs/user/explanations/data-model.rst +++ b/docs/user/explanations/data-model.rst @@ -477,31 +477,17 @@ See :doc:`external` for details on the role Stream Resource documents play in referencing external assets that are natively ragged, such as single-photon detectors, or assets where there are many relatively small data sets (e.g. scanned fluorescence data). -Minimal nontrivial valid example: - -.. code-block:: python - - # 'Stream Resource' document - {'path_semantics': 'posix', - 'resource_kwargs': {}, - 'resource_path': '/local/path/subdirectory/data_file', - 'root': '/local/path/', - 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd', - 'spec': 'SOME_SPEC', - 'stream_names': ['point_det'], - 'uid': '272132cf-564f-428f-bf6b-149ee4287024'} - Typical example: .. code-block:: python - # resource - {'spec': 'AD_HDF5', + # 'Stream Resource' document + {'data_key': 'detector_1', + 'spec': 'AD_HDF5', 'root': '/GPFS/DATA/Andor/', 'resource_path': '2020/01/03/8ff08ff9-a2bf-48c3-8ff3-dcac0f309d7d.h5', 'resource_kwargs': {'frame_per_point': 1}, 'path_semantics': 'posix', - 'stream_names': ['point_det'], 'uid': '3b300e6f-b431-4750-a635-5630d15c81a8', 'run_start': '10bf6945-4afd-43ca-af36-6ad8f3540bcd'} @@ -518,30 +504,17 @@ See :doc:`external` for details on the role Stream Datum documents play in refer external assets that are natively ragged, such as single-photon detectors, or assets where there are many relatively small data sets (e.g. scanned fluorescence data). -Minimal nontrivial valid example: - -.. code-block:: python - - # 'datum' document - {'resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key - 'datum_kwargs': {}, # format-specific parameters - 'datum_id': '272132cf-564f-428f-bf6b-149ee4287024/1', - 'event_count': 1 - } - Typical example: .. code-block:: python - # datum - {'resource': '3b300e6f-b431-4750-a635-5630d15c81a8', - 'datum_kwargs': {'index': 3}, - 'datum_id': '3b300e6f-b431-4750-a635-5630d15c81a8/3', - 'event_count': 5, - 'event_offset': 14} - -It is an implementation detail that ``datum_id`` is often formatted as -``{resource}/{counter}`` but this should not be considered part of the schema. + # 'Stream Datum' document + {'uid': '86340942-9865-47f9-9a8d-bdaaab1bfce2', + 'descriptor': '8c70b8c2-df32-40e3-9f50-29cda8142fa0', + 'stream_resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key + 'indices': {'start': 0, 'stop': 1}, + 'seq_nums': {'start': 1, 'stop': 2}, + } Formal schema: diff --git a/event_model/__init__.py b/event_model/__init__.py index 08a43fec..3fec64a5 100644 --- a/event_model/__init__.py +++ b/event_model/__init__.py @@ -1957,9 +1957,11 @@ def __call__( root=root, resource_kwargs=resource_kwargs, resource_path=resource_path, - run_start=self.start["uid"] if self.start else "", ) + if self.start: + doc["run_start"] = self.start["uid"] + if validate: schema_validators[DocumentNames.resource].validate(doc) @@ -2092,9 +2094,11 @@ def __call__( resource_path=resource_path, resource_kwargs=resource_kwargs, path_semantics=path_semantics, - run_start=self.start["uid"] if self.start else "", ) + if self.start: + doc["run_start"] = self.start["uid"] + if validate: schema_validators[DocumentNames.stream_resource].validate(doc) @@ -2114,7 +2118,6 @@ def compose_stream_resource( resource_path: str, data_key: str, resource_kwargs: Dict[str, Any], - counters: List = [], path_semantics: Literal["posix", "windows"] = default_path_semantics, start: Optional[RunStart] = None, uid: Optional[str] = None, diff --git a/event_model/documents/resource.py b/event_model/documents/resource.py index ed272362..882ceb69 100644 --- a/event_model/documents/resource.py +++ b/event_model/documents/resource.py @@ -49,10 +49,12 @@ class Resource(PartialResource): Field(description="Rules for joining paths"), ] ] - run_start: Annotated[ - str, - Field( - description="Globally unique ID to the run_start document this " - "resource is associated with.", - ), + run_start: NotRequired[ + Annotated[ + str, + Field( + description="Globally unique ID to the run_start document this " + "resource is associated with.", + ), + ] ] diff --git a/event_model/schemas/resource.json b/event_model/schemas/resource.json index 68ef0bc8..5421b607 100644 --- a/event_model/schemas/resource.json +++ b/event_model/schemas/resource.json @@ -47,7 +47,6 @@ "resource_kwargs", "resource_path", "root", - "run_start", "spec", "uid" ],