Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rethinking external assets 2 #272

Merged
merged 24 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f188425
Implemented external asset structure
evalott100 Jun 15, 2023
463743d
Added Partial typeddicts for use with bluesky
evalott100 Jun 15, 2023
cd601d7
Added Partial typeddicts for use with bluesky
evalott100 Jun 15, 2023
2910dfe
Changed the event counters to increment by seq_num, not data. Changed…
evalott100 Jun 20, 2023
e5ec76c
Changed ComposeStreamResource to only have one ComposeDatum. Changed …
evalott100 Jul 3, 2023
78d95b4
Specifying pydantic<2 in pyproject.toml, also changed stream_resource…
evalott100 Jul 3, 2023
19e6744
Removed content from stream_datum, added a Range Typeddict
evalott100 Jul 3, 2023
6e8a865
Allowed external="STREAM:" keys to not be represented in data in Comp…
evalott100 Jul 4, 2023
959bc59
Fixed typo
evalott100 Sep 14, 2023
79daf00
Fixed imports to avoid pydantic if the version is >=2
evalott100 Jul 4, 2023
1ed861d
We changed != to == when checking datakeys
evalott100 Jul 12, 2023
e1e8065
Made corrections to changes, still need to add descriptor reference t…
evalott100 Jul 13, 2023
60848ee
Added a reference to event-descriptor uid in StreamDatum
evalott100 Jul 20, 2023
7f53948
Made run_start and descriptor required in Resource and StreamDatum re…
evalott100 Jul 20, 2023
f579225
removed block_idx from StreamDatum
evalott100 Aug 14, 2023
9ca7ca7
Made seq_nums optional
evalott100 Sep 11, 2023
a7e80da
Made data_keys a str data_key and moved it to stream resource
evalott100 Sep 14, 2023
0432aeb
Supported empty event pages
evalott100 Sep 28, 2023
6d3d8f2
Ran black (minor change)
evalott100 Sep 29, 2023
8a23344
Made stream_resource::run_start optional and fixed incorrect description
evalott100 Oct 3, 2023
ba9fbcf
Fix typo
coretl Oct 10, 2023
bd90c47
Merge remote-tracking branch 'eva/typo_correction' into rethinking_ex…
coretl Oct 10, 2023
a389141
Missed from the last merge
coretl Oct 10, 2023
e57e53b
Changed doc examples, made run_start NotRequired in resource/stream_r…
evalott100 Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/user/explanations/data-model.rst
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,6 @@ Minimal nontrivial valid example:
{'resource': '272132cf-564f-428f-bf6b-149ee4287024', # foreign key
'datum_kwargs': {}, # format-specific parameters
'datum_id': '272132cf-564f-428f-bf6b-149ee4287024/1',
'block_idx': 0,
'event_count': 1
}

Expand All @@ -538,7 +537,6 @@ Typical example:
{'resource': '3b300e6f-b431-4750-a635-5630d15c81a8',
'datum_kwargs': {'index': 3},
'datum_id': '3b300e6f-b431-4750-a635-5630d15c81a8/3',
'block_idx': 0,
'event_count': 5,
'event_offset': 14}

Expand Down
176 changes: 111 additions & 65 deletions event_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from .documents.resource import Resource
from .documents.run_start import RunStart
from .documents.run_stop import RunStop
from .documents.stream_datum import StreamDatum
from .documents.stream_datum import StreamDatum, StreamRange
from .documents.stream_resource import StreamResource

if sys.version_info < (3, 9):
Expand Down Expand Up @@ -1957,13 +1957,12 @@ def __call__(
root=root,
resource_kwargs=resource_kwargs,
resource_path=resource_path,
run_start=self.start["uid"] if self.start else "",
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
)

if validate:
schema_validators[DocumentNames.resource].validate(doc)

if self.start:
doc["run_start"] = self.start["uid"]

counter = itertools.count()
return ComposeResourceBundle(
doc,
Expand Down Expand Up @@ -2000,32 +1999,30 @@ def compose_resource(
@dataclass
class ComposeStreamDatum:
stream_resource: StreamResource
stream_name: str
counter: Iterator

def __call__(
self,
datum_kwargs: Dict[str, Any],
event_count: int = 1,
event_offset: int = 0,
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
indices: StreamRange,
seq_nums: Optional[StreamRange] = None,
descriptor: Optional[EventDescriptor] = None,
validate: bool = True,
) -> StreamDatum:
resource_uid = self.stream_resource["uid"]
if self.stream_name not in self.stream_resource["stream_names"]:
raise EventModelKeyError(
"Attempt to create stream_datum with name not included"
"in stream_resource"
)
block_idx = next(self.counter)

# If the seq_nums aren't passed in then the bluesky
# bundler will keep track of them
if not seq_nums:
seq_nums = StreamRange(start=0, stop=0)

doc = StreamDatum(
stream_resource=resource_uid,
datum_kwargs=datum_kwargs,
uid=f"{resource_uid}/{self.stream_name}/{block_idx}",
stream_name=self.stream_name,
block_idx=block_idx,
event_count=event_count,
event_offset=event_offset,
uid=f"{resource_uid}/{next(self.counter)}",
seq_nums=seq_nums,
indices=indices,
descriptor=descriptor["uid"] if descriptor else "",
)

if validate:
schema_validators[DocumentNames.stream_datum].validate(doc)

Expand All @@ -2035,35 +2032,36 @@ def __call__(
def compose_stream_datum(
*,
stream_resource: StreamResource,
stream_name: str,
counter: Iterator,
datum_kwargs: Dict[str, Any],
event_count: int = 1,
event_offset: int = 0,
seq_nums: StreamRange,
indices: StreamRange,
validate: bool = True,
) -> StreamDatum:
"""
Here for backwards compatibility, the Compose class is prefered.
"""
return ComposeStreamDatum(stream_resource, stream_name, counter)(
evalott100 marked this conversation as resolved.
Show resolved Hide resolved
datum_kwargs,
event_count=event_count,
event_offset=event_offset,
warnings.warn(
"compose_stream_datum() will be removed in the minor version.",
DeprecationWarning,
)
return ComposeStreamDatum(stream_resource, counter)(
seq_nums,
indices,
validate=validate,
)


@dataclass
class ComposeStreamResourceBundle:
stream_resource_doc: StreamResource
compose_stream_data: List[ComposeStreamDatum]
compose_stream_datum: ComposeStreamDatum

# iter for backwards compatibility
def __iter__(self) -> Iterator:
return iter(
(
self.stream_resource_doc,
self.compose_stream_data,
self.compose_stream_datum,
)
)

Expand All @@ -2077,52 +2075,35 @@ def __call__(
spec: str,
root: str,
resource_path: str,
data_key: str,
resource_kwargs: Dict[str, Any],
stream_names: Union[List, str],
counters: List = [],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
uid: Optional[str] = None,
validate: bool = True,
) -> ComposeStreamResourceBundle:
if uid is None:
uid = str(uuid.uuid4())
if isinstance(stream_names, str):
stream_names = [
stream_names,
]
if len(counters) == 0:
counters = [itertools.count() for _ in stream_names]
elif len(counters) > len(stream_names):
raise ValueError(
"Insufficient number of counters "
f"{len(counters)} for stream names: {stream_names}"
)

doc = StreamResource(
uid=uid,
data_key=data_key,
spec=spec,
root=root,
resource_path=resource_path,
resource_kwargs=resource_kwargs,
stream_names=stream_names,
path_semantics=path_semantics,
run_start=self.start["uid"] if self.start else "",
)
if self.start:
doc["run_start"] = self.start["uid"]
evalott100 marked this conversation as resolved.
Show resolved Hide resolved

if validate:
schema_validators[DocumentNames.stream_resource].validate(doc)

return ComposeStreamResourceBundle(
doc,
[
ComposeStreamDatum(
stream_resource=doc,
stream_name=stream_name,
counter=counter,
)
for stream_name, counter in zip(stream_names, counters)
],
ComposeStreamDatum(
stream_resource=doc,
counter=itertools.count(),
),
)


Expand All @@ -2131,8 +2112,8 @@ def compose_stream_resource(
spec: str,
root: str,
resource_path: str,
data_key: str,
resource_kwargs: Dict[str, Any],
stream_names: Union[List, str],
counters: List = [],
path_semantics: Literal["posix", "windows"] = default_path_semantics,
start: Optional[RunStart] = None,
Expand All @@ -2146,9 +2127,8 @@ def compose_stream_resource(
spec,
root,
resource_path,
data_key,
resource_kwargs,
stream_names,
counters=counters,
path_semantics=path_semantics,
uid=uid,
validate=validate,
Expand Down Expand Up @@ -2213,6 +2193,17 @@ def compose_stop(
)(exit_status=exit_status, reason=reason, uid=uid, time=time, validate=validate)


def length_of_value(dictionary: Dict[str, List], error_msg: str) -> Optional[int]:
length = None
for k, v in dictionary.items():
v_len = len(v)
if length is not None:
if v_len != length:
raise EventModelError(error_msg)
length = v_len
return length


@dataclass
class ComposeEventPage:
descriptor: EventDescriptor
Expand All @@ -2222,12 +2213,32 @@ def __call__(
self,
data: Dict[str, List],
timestamps: Dict[str, Any],
seq_num: List[int],
seq_num: Optional[List[int]] = None,
filled: Optional[Dict[str, List[Union[bool, str]]]] = None,
uid: Optional[List] = None,
time: Optional[List] = None,
validate: bool = True,
) -> EventPage:
timestamps_length = length_of_value(
timestamps,
"Cannot compose event_page: event_page contains `timestamps` "
"list values of different lengths",
)
data_length = length_of_value(
data,
"Cannot compose event_page: event_page contains `data` "
"lists of different lengths",
)
assert timestamps_length == data_length, (
"Cannot compose event_page: the lists in `timestamps` are of a different "
"length to those in `data`"
)

if seq_num is None:
last_seq_num = self.event_counters[self.descriptor["name"]]
seq_num = list(
range(last_seq_num, len(next(iter(data.values()))) + last_seq_num)
)
N = len(seq_num)
if uid is None:
uid = [str(uuid.uuid4()) for _ in range(N)]
Expand All @@ -2246,11 +2257,20 @@ def __call__(
)
if validate:
schema_validators[DocumentNames.event_page].validate(doc)

if not (
self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys()
set(
keys_without_stream_keys(
self.descriptor["data_keys"], self.descriptor["data_keys"]
)
)
== set(keys_without_stream_keys(data, self.descriptor["data_keys"]))
== set(
keys_without_stream_keys(timestamps, self.descriptor["data_keys"])
)
):
raise EventModelValidationError(
"These sets of keys must match:\n"
'These sets of keys must match (other than "STREAM:" keys):\n'
"event['data'].keys(): {}\n"
"event['timestamps'].keys(): {}\n"
"descriptor['data_keys'].keys(): {}\n".format(
Expand All @@ -2264,7 +2284,7 @@ def __call__(
"Keys in event['filled'] {} must be a subset of those in "
"event['data'] {}".format(filled.keys(), data.keys())
)
self.event_counters[self.descriptor["name"]] += len(data)
self.event_counters[self.descriptor["name"]] += len(seq_num)
return doc


Expand All @@ -2284,10 +2304,27 @@ def compose_event_page(
Here for backwards compatibility, the Compose class is prefered.
"""
return ComposeEventPage(descriptor, event_counters)(
data, timestamps, seq_num, filled, uid=uid, time=time, validate=validate
data,
timestamps,
seq_num=seq_num,
filled=filled,
uid=uid,
time=time,
validate=validate,
)


def keys_without_stream_keys(dictionary, descriptor_data_keys):
return [
key
for key in dictionary.keys()
if (
"external" not in descriptor_data_keys[key]
or descriptor_data_keys[key]["external"] != "STREAM:"
)
]


@dataclass
class ComposeEvent:
descriptor: EventDescriptor
Expand Down Expand Up @@ -2322,11 +2359,20 @@ def __call__(
)
if validate:
schema_validators[DocumentNames.event].validate(doc)

if not (
self.descriptor["data_keys"].keys() == data.keys() == timestamps.keys()
set(
keys_without_stream_keys(
self.descriptor["data_keys"], self.descriptor["data_keys"]
)
)
== set(keys_without_stream_keys(data, self.descriptor["data_keys"]))
== set(
keys_without_stream_keys(timestamps, self.descriptor["data_keys"])
)
):
raise EventModelValidationError(
"These sets of keys must match:\n"
'These sets of keys must match (other than "STREAM:" keys):\n'
"event['data'].keys(): {}\n"
"event['timestamps'].keys(): {}\n"
"descriptor['data_keys'].keys(): {}\n".format(
Expand All @@ -2340,7 +2386,7 @@ def __call__(
"Keys in event['filled'] {} must be a subset of those in "
"event['data'] {}".format(filled.keys(), data.keys())
)
self.event_counters[self.descriptor["name"]] += 1
self.event_counters[self.descriptor["name"]] = seq_num + 1
return doc


Expand Down
1 change: 0 additions & 1 deletion event_model/documents/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class Event(PartialEvent):
descriptor: Annotated[
str, Field(description="UID of the EventDescriptor to which this Event belongs")
]

seq_num: Annotated[
int,
Field(
Expand Down
4 changes: 3 additions & 1 deletion event_model/documents/event_descriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from .generate.type_wrapper import Field, add_extra_schema

Dtype = Literal["string", "number", "array", "boolean", "integer"]


class DataKey(TypedDict):
"""Describes the objects in the data property of Event documents"""
Expand All @@ -18,7 +20,7 @@ class DataKey(TypedDict):
]
]
dtype: Annotated[
Literal["string", "number", "array", "boolean", "integer"],
Dtype,
Field(description="The type of the data in the event."),
]
external: NotRequired[
Expand Down
3 changes: 3 additions & 0 deletions event_model/documents/generate/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from typeddict_to_schema import generate_all_schema

generate_all_schema()
Loading
Loading