Skip to content

Commit

Permalink
Share common code between event store and todo store (#284)
Browse files Browse the repository at this point in the history
* Share common code between event store and todo store

* Improve test coverage for timezones
  • Loading branch information
allenporter authored Feb 3, 2024
1 parent d7f1ca1 commit 83943db
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 134 deletions.
243 changes: 109 additions & 134 deletions ical/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@

import datetime
import logging
from collections.abc import Callable
from typing import Any
from collections.abc import Callable, Iterable
from typing import Any, TypeVar

from .calendar import Calendar
from .event import Event
Expand All @@ -36,6 +36,100 @@
"StoreError",
]

_T = TypeVar("_T", bound="Event | Todo")
_ItemType = Event | Todo


def _lookup_by_uid(uid: str, items: Iterable[_T]) -> tuple[int | None, _T | None]:
"""Find the specified item by id."""
for i, item in enumerate(items):
if item.uid == uid:
return i, item
return None, None


def _ensure_timezone(dtstart: datetime.datetime | datetime.date | None, timezones: list[Timezone]) -> Timezone | None:
"""Create a timezone object for the specified date if it does not already exist."""
if (
not isinstance(dtstart, datetime.datetime)
or not dtstart.utcoffset()
or not dtstart.tzinfo
):
return None

# Verify this timezone does not already exist. The number of timezones
# in a calendar is typically very small so iterate over the whole thing
# to avoid any synchronization/cache issues.
key = str(dtstart.tzinfo)
for timezone in timezones:
if timezone.tz_id == key:
return None

try:
return Timezone.from_tzif(key)
except TimezoneInfoError as err:
raise EventStoreError(
"No timezone information available for event: {key}"
) from err


def _ensure_calendar_timezone(dtstart: datetime.datetime | datetime.date | None, calendar: Calendar) -> None:
"""Ensure the calendar has the necessary timezone for the specified item."""
if (
new_timezone := _ensure_timezone(dtstart, calendar.timezones)
) is not None:
calendar.timezones.append(new_timezone)


def _prepare_update(
store_item: Event | Todo,
item: Event | Todo,
recurrence_id: str | None = None,
recurrence_range: Range = Range.NONE,
) -> dict[str, Any]:
"""Prepare an update to an existing event."""
partial_update = item.dict(exclude_unset=True)
_LOGGER.debug("Preparing update update=%s", item)
update = {
"created": store_item.dtstamp,
"sequence": (store_item.sequence + 1) if store_item.sequence else 1,
"last_modified": item.dtstamp,
**partial_update,
"dtstamp": item.dtstamp,
}
if "rrule" in update:
update["rrule"] = Recur.parse_obj(update["rrule"])
if recurrence_id:
if not store_item.rrule:
raise EventStoreError("Specified recurrence_id but event is not recurring")
# Forking a new event off the old event
update["uid"] = item.uid
if recurrence_range == Range.NONE:
# The new event copied from the original is a single instance,
# not recurrin
update["rrule"] = None
else:
# Overwriting with a new recurring event
update.update(
{
"sequence": 0,
"created": item.dtstamp,
}
)

# Adjust start and end time of the event
dtstart: datetime.datetime | datetime.date = RecurrenceId.to_value(
recurrence_id
)
if item.dtstart:
dtstart = item.dtstart
update["dtstart"] = dtstart
# Event either has a duration (which should already be set) or has
# an explicit end which needs to be realigned to new start time.
if isinstance(store_item, Event) and store_item.dtend:
update["dtend"] = dtstart + store_item.computed_duration
return update


class EventStore:
"""An event store manages the lifecycle of events on a Calendar.
Expand Down Expand Up @@ -102,13 +196,6 @@ def __init__(
self._calendar = calendar
self._dtstamp_fn = dtstamp_fn

def _lookup_event(self, uid: str) -> Event | None:
"""Find the specified event by id."""
for event in self._calendar.events:
if event.uid == uid:
return event
return None

def add(self, event: Event) -> Event:
"""Add the specified event to the calendar.
Expand All @@ -134,7 +221,7 @@ def add(self, event: Event) -> Event:
)

_LOGGER.debug("Adding event: %s", new_event)
self._ensure_timezone(event)
_ensure_calendar_timezone(event.dtstart, self._calendar)
self._calendar.events.append(new_event)
return new_event

Expand All @@ -156,7 +243,8 @@ def delete(
When deleting individual instances, the range property may specify
if deletion of just a specific instance, or a range of instances.
"""
if not (store_event := self._lookup_event(uid)):
_, store_event = _lookup_by_uid(uid, self._calendar.events)
if store_event is None:
raise EventStoreError(f"No existing event with uid: {uid}")

if (
Expand Down Expand Up @@ -233,7 +321,8 @@ def edit(
`ical.timezone.Timezone` needed to fully specify the event time information
when encoded.
"""
if not (store_event := self._lookup_event(uid)):
_, store_event = _lookup_by_uid(uid, self._calendar.events)
if store_event is None:
raise EventStoreError(f"No existing event with uid: {uid}")

if (
Expand All @@ -245,9 +334,7 @@ def edit(
# entire series so don't bother forking a new event
recurrence_id = None

update = self._prepare_update(
store_event, event, recurrence_id, recurrence_range
)
update = _prepare_update(store_event, event, recurrence_id, recurrence_range)
if recurrence_range == Range.NONE:
# Changing the recurrence rule of a single event in the middle of the series
# is not allowed. It is allowed to convert a single instance event to recurring.
Expand Down Expand Up @@ -283,7 +370,7 @@ def edit(
f"Unsupported relationship type {relation.reltype}"
)

self._ensure_timezone(event)
_ensure_calendar_timezone(event.dtstart, self._calendar)

# Editing a single instance of a recurring event is like deleting that instance
# then adding a new instance on the specified date. If recurrence id is not
Expand All @@ -294,84 +381,6 @@ def edit(
else:
self._calendar.events.append(new_event)

def _prepare_update(
self,
store_event: Event,
event: Event,
recurrence_id: str | None = None,
recurrence_range: Range = Range.NONE,
) -> dict[str, Any]:
"""Prepare an update to an existing event."""
partial_update = event.dict(exclude_unset=True)
_LOGGER.debug("EV update=%s", event)
update = {
"created": store_event.dtstamp,
"sequence": (store_event.sequence + 1) if store_event.sequence else 1,
"last_modified": event.dtstamp,
**partial_update,
"dtstamp": event.dtstamp,
}
if "rrule" in update:
update["rrule"] = Recur.parse_obj(update["rrule"])
if recurrence_id:
if not store_event.rrule:
raise EventStoreError(
"Specified recurrence_id but event is not recurring"
)
# Forking a new event off the old event
update["uid"] = event.uid
if recurrence_range == Range.NONE:
# The new event copied from the original is a single instance,
# not recurrin
update["rrule"] = None
else:
# Overwriting with a new recurring event
update.update(
{
"sequence": 0,
"created": event.dtstamp,
}
)

# Adjust start and end time of the event
dtstart: datetime.datetime | datetime.date = RecurrenceId.to_value(
recurrence_id
)
if event.dtstart:
dtstart = event.dtstart
update["dtstart"] = dtstart
# Event either has a duration (which should already be set) or has
# an explicit end which needs to be realigned to new start time.
if store_event.dtend:
update["dtend"] = dtstart + store_event.computed_duration
return update

def _ensure_timezone(self, event: Event) -> None:
"""Create a timezone object for the specified date if it does not already exist."""
if (
not isinstance(event.dtstart, datetime.datetime)
or not event.dtstart.utcoffset()
or not event.dtstart.tzinfo
):
return

# Verify this timezone does not already exist. The number of timezones
# in a calendar is typically very small so iterate over the whole thing
# to avoid any synchronization/cache issues.
key = str(event.dtstart.tzinfo)
for timezone in self._calendar.timezones:
if timezone.tz_id == key:
return

new_timezone: Timezone
try:
new_timezone = Timezone.from_tzif(key)
except TimezoneInfoError as err:
raise EventStoreError(
"No timezone information available for event: {key}"
) from err
self._calendar.timezones.append(new_timezone)


class TodoStore:
"""A To-do store manages the lifecycle of to-dos on a Calendar."""
Expand All @@ -385,13 +394,6 @@ def __init__(
self._calendar = calendar
self._dtstamp_fn = dtstamp_fn

def _lookup_todo(self, uid: str) -> tuple[int | None, Todo | None]:
"""Find the specified todo by id returning the index."""
for i, todo in enumerate(self._calendar.todos):
if todo.uid == uid:
return i, todo
return None, None

def add(self, todo: Todo) -> Todo:
"""Add the specified todo to the calendar."""
update: dict[str, Any] = {}
Expand All @@ -409,7 +411,7 @@ def add(self, todo: Todo) -> Todo:
)

_LOGGER.debug("Adding todo: %s", new_todo)
self._ensure_timezone(todo)
_ensure_calendar_timezone(todo.dtstart, self._calendar)
self._calendar.todos.append(new_todo)
return new_todo

Expand All @@ -418,8 +420,8 @@ def delete(
uid: str,
) -> None:
"""Delete the todo from the calendar."""
store_index, store_todo = self._lookup_todo(uid)
if not store_todo:
store_index, store_todo = _lookup_by_uid(uid, self._calendar.todos)
if store_todo is None:
raise TodoStoreError(f"No existing todo with uid: {uid}")
removals = [store_todo]

Expand All @@ -437,8 +439,8 @@ def edit(
todo: Todo,
) -> None:
"""Update the todo with the specified uid."""
store_index, store_todo = self._lookup_todo(uid)
if not store_todo or store_index is None:
store_index, store_todo = _lookup_by_uid(uid, self._calendar.todos)
if store_todo is None or store_index is None:
raise TodoStoreError(f"No existing todo with uid: {uid}")

partial_update = todo.dict(exclude_unset=True)
Expand All @@ -459,33 +461,6 @@ def edit(
f"Unsupported relationship type {relation.reltype}"
)

self._ensure_timezone(todo)

_ensure_calendar_timezone(todo.dtstart, self._calendar)
self._calendar.todos.pop(store_index)
self._calendar.todos.insert(store_index, new_todo)

def _ensure_timezone(self, todo: Todo) -> None:
"""Create a timezone object for the specified date if it does not already exist."""
if (
not isinstance(todo.due, datetime.datetime)
or not todo.due.utcoffset()
or not todo.due.tzinfo
):
return

# Verify this timezone does not already exist. The number of timezones
# in a calendar is typically very small so iterate over the whole thing
# to avoid any synchronization/cache issues.
key = str(todo.due.tzinfo)
for timezone in self._calendar.timezones:
if timezone.tz_id == key:
return

new_timezone: Timezone
try:
new_timezone = Timezone.from_tzif(key)
except TimezoneInfoError as err:
raise TodoStoreError(
"No timezone information available for todo: {key}"
) from err
self._calendar.timezones.append(new_timezone)
Loading

0 comments on commit 83943db

Please sign in to comment.