Skip to content

Commit

Permalink
Preserve uid when editing recurring events (#286)
Browse files Browse the repository at this point in the history
* Preserve uid when editing recurring events

- Move todo list interface to TodoStore
- Combine recurrence adapters between todo list and timeline
- Simplify typing for sortable timespan items
- Move merge_and_expand_items to recur adapter
- Split edit and delete into separate functions
- Apply edits on all matching items

* Update store tests to assert on ICS contents

* Add test coverage for THIS_AND_FUTURE todo edits

* Remove _lookup_by_uid

* Share code for merging recurring events
  • Loading branch information
allenporter authored Feb 5, 2024
1 parent e1e51af commit dee0625
Show file tree
Hide file tree
Showing 10 changed files with 827 additions and 222 deletions.
9 changes: 0 additions & 9 deletions ical/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

from collections.abc import Iterable
import datetime
import itertools
import logging
Expand All @@ -22,7 +21,6 @@
from .parsing.property import ParsedProperty
from .timeline import Timeline, calendar_timeline
from .timezone import Timezone, TimezoneModel, IcsTimezoneInfo
from .list import todo_list_view
from .todo import Todo
from .util import local_timezone, prodid_factory

Expand Down Expand Up @@ -82,13 +80,6 @@ def timeline_tz(self, tzinfo: datetime.tzinfo | None = None) -> Timeline:
"""
return calendar_timeline(self.events, tzinfo=tzinfo or local_timezone())

def todo_list(self, tzinfo: datetime.tzinfo | None = None) -> Iterable[Todo]:
"""Return a list of all todos on the calendar.
This view accounts for recurring todos.
"""
return todo_list_view(self.todos, tzinfo=tzinfo or local_timezone())

@root_validator(pre=True)
def _propagate_timezones(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Propagate timezone information down to date-time objects.
Expand Down
15 changes: 11 additions & 4 deletions ical/iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def __lt__(self, other: Any) -> bool:
return cast(bool, self._key < other.key)


SpanOrderedItem = SortableItem[Timespan, T]
"""A sortable item with a timespan as the key."""


class SortableItemValue(SortableItem[K, T]):
"""Concrete value implementation of SortableItem."""

Expand Down Expand Up @@ -151,7 +155,10 @@ class RulesetIterable(Iterable[Union[datetime.datetime, datetime.date]]):
debug. This wrapper is meant to assist with that.
"""

_converter: Callable[[Iterable[Union[datetime.date, datetime.datetime]]], Iterable[Union[datetime.date, datetime.datetime]]]
_converter: Callable[
[Iterable[Union[datetime.date, datetime.datetime]]],
Iterable[Union[datetime.date, datetime.datetime]],
]

def __init__(
self,
Expand Down Expand Up @@ -307,7 +314,7 @@ def __iter__(self) -> Iterator[SortableItem[K, T]]:
class SortableItemTimeline(Iterable[T]):
"""A set of components on a calendar."""

def __init__(self, iterable: Iterable[SortableItem[Timespan, T]]) -> None:
def __init__(self, iterable: Iterable[SpanOrderedItem[T]]) -> None:
self._iterable = iterable

def __iter__(self) -> Iterator[T]:
Expand Down Expand Up @@ -391,6 +398,6 @@ def today(self) -> Iterator[T]:
"""Return an iterator containing all events active on the specified day."""
return self.on_date(datetime.date.today())

def now(self) -> Iterator[T]:
def now(self, tz: datetime.tzinfo | None = None) -> Iterator[T]:
"""Return an iterator containing all events active on the specified day."""
return self.at_instant(datetime.datetime.now())
return self.at_instant(datetime.datetime.now(tz=tz))
93 changes: 19 additions & 74 deletions ical/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,20 @@
"""

import datetime
from collections.abc import Generator, Iterable
from collections.abc import Generator
import logging

from .todo import Todo
from .iter import (
LazySortableItem,
MergedIterable,
RecurIterable,
SortableItem,
SortableItemValue,
)
from .types.recur import RecurrenceId
from .recur_adapter import items_by_uid, merge_and_expand_items
from .util import local_timezone

# Not part of the public API
__all__: list[str] = []

_LOGGER = logging.getLogger(__name__)
_SortableTodoItem = SortableItem[datetime.datetime | datetime.date | None, Todo]


class RecurAdapter:
"""An adapter that expands an Todo instance for a recurrence rule.
This adapter is given an todo, then invoked with a specific date/time instance
that the todo is due from a recurrence rule. The todo is copied with
necessary updated fields to act as a flattened instance of the todo item.
"""

def __init__(self, todo: Todo, tzinfo: datetime.tzinfo | None = None):
"""Initialize the RecurAdapter."""
self._todo = todo
self._duration = todo.computed_duration
self._tzinfo = tzinfo

def get(self, dtstart: datetime.datetime | datetime.date) -> _SortableTodoItem:
"""Return a lazy sortable item."""

recur_id_dt = dtstart
# Make recurrence_id floating time to avoid dealing with serializing
# TZID. This value will still be unique within the series and is in
# the context of dtstart which may have a timezone.
if isinstance(recur_id_dt, datetime.datetime) and recur_id_dt.tzinfo:
recur_id_dt = recur_id_dt.replace(tzinfo=None)
recurrence_id = RecurrenceId.__parse_property_value__(recur_id_dt)

def build() -> Todo:
updates = {
"dtstart": dtstart,
"recurrence_id": recurrence_id,
}
if self._todo.due and self._duration:
updates["due"] = dtstart + self._duration
return self._todo.copy(update=updates)

return LazySortableItem(dtstart, build)


def _todos_by_uid(todos: list[Todo]) -> dict[str, list[Todo]]:
todos_by_uid: dict[str, list[Todo]] = {}
for todo in todos:
if todo.uid is None:
raise ValueError("Todo must have a UID")
if todo.uid not in todos_by_uid:
todos_by_uid[todo.uid] = []
todos_by_uid[todo.uid].append(todo)
return todos_by_uid


def _pick_todo(todos: list[Todo], tzinfo: datetime.tzinfo) -> Todo:
def _pick_todo(todos: list[Todo], dtstart: datetime.datetime) -> Todo:
"""Pick a todo to return in a list from a list of recurring todos.
The items passed in must all be for the same original todo (either a
Expand All @@ -84,30 +31,28 @@ def _pick_todo(todos: list[Todo], tzinfo: datetime.tzinfo) -> Todo:
"""
# For a recurring todo, the dtstart is after the last due date. Therefore
# we can stort items by dtstart and pick the last one that hasn't happened
iters: list[Iterable[_SortableTodoItem]] = []
for todo in todos:
if not (recur := todo.as_rrule()):
iters.append([SortableItemValue(todo.dtstart, todo)])
continue
iters.append(RecurIterable(RecurAdapter(todo, tzinfo=tzinfo).get, recur))
root_iter = merge_and_expand_items(todos, dtstart.tzinfo or local_timezone())

root_iter = MergedIterable(iters)

# Pick the first todo that hasn't started yet based on its dtstart
now = datetime.datetime.now(tzinfo)
it = iter(root_iter)
last = next(it)
while cur := next(it, None):
if cur.item.start_datetime is None or cur.item.start_datetime > now:
if cur.item.start_datetime is None or cur.item.start_datetime > dtstart:
break
last = cur
return last.item


def todo_list_view(
todos: list[Todo], tzinfo: datetime.tzinfo
todos: list[Todo],
dtstart: datetime.datetime | None = None,
) -> Generator[Todo, None, None]:
"""Create a list view for todos on a calendar, including recurrence."""
todos_by_uid = _todos_by_uid(todos)
"""Create a list view for todos on a calendar, including recurrence.
The dtstart value is used to determine the current time for the list and
for deciding which instance of a recurring todo to return.
"""
if dtstart is None:
dtstart = datetime.datetime.now(tz=local_timezone())
todos_by_uid = items_by_uid(todos)
for todos in todos_by_uid.values():
yield _pick_todo(todos, tzinfo=tzinfo)
yield _pick_todo(todos, dtstart)
106 changes: 106 additions & 0 deletions ical/recur_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Component specific iterable functions.
This module contains functions that are helpful for iterating over components
in a calendar. This includes expanding recurring components or other functions
for managing components from a list (e.g. grouping by uid).
"""

import datetime
from collections.abc import Iterable
from typing import Generic, TypeVar, cast

from .iter import (
MergedIterable,
RecurIterable,
SortableItemValue,
SpanOrderedItem,
LazySortableItem,
SortableItem,
)
from .types.recur import RecurrenceId
from .event import Event
from .todo import Todo
from .timespan import Timespan


ItemType = TypeVar("ItemType", bound="Event | Todo")
_DateOrDatetime = datetime.datetime | datetime.date


class RecurAdapter(Generic[ItemType]):
"""An adapter that expands an Event instance for a recurrence rule.
This adapter is given an event, then invoked with a specific date/time instance
that the event occurs on due to a recurrence rule. The event is copied with
necessary updated fields to act as a flattened instance of the event.
"""

def __init__(
self,
item: ItemType,
tzinfo: datetime.tzinfo | None = None,
) -> None:
"""Initialize the RecurAdapter."""
self._item = item
self._duration = item.computed_duration
self._tzinfo = tzinfo

def get(
self, dtstart: datetime.datetime | datetime.date
) -> SortableItem[Timespan, ItemType]:
"""Return a lazy sortable item."""

recur_id_dt = dtstart
dtend = dtstart + self._duration if self._duration else dtstart
# Make recurrence_id floating time to avoid dealing with serializing
# TZID. This value will still be unique within the series and is in
# the context of dtstart which may have a timezone.
if isinstance(recur_id_dt, datetime.datetime) and recur_id_dt.tzinfo:
recur_id_dt = recur_id_dt.replace(tzinfo=None)
recurrence_id = RecurrenceId.__parse_property_value__(recur_id_dt)

def build() -> ItemType:
updates = {
"dtstart": dtstart,
"recurrence_id": recurrence_id,
}
if isinstance(self._item, Event) and self._item.dtend and dtend:
updates["dtend"] = dtend
if isinstance(self._item, Todo) and self._item.due and dtend:
updates["due"] = dtend
return cast(ItemType, self._item.copy(update=updates))

ts = Timespan.of(dtstart, dtend, self._tzinfo)
return LazySortableItem(ts, build)


def items_by_uid(items: list[ItemType]) -> dict[str, list[ItemType]]:
items_by_uid: dict[str, list[ItemType]] = {}
for item in items:
if item.uid is None:
raise ValueError("Todo must have a UID")
if (values := items_by_uid.get(item.uid)) is None:
values = []
items_by_uid[item.uid] = values
values.append(item)
return items_by_uid


def merge_and_expand_items(
items: list[ItemType], tzinfo: datetime.tzinfo
) -> Iterable[SpanOrderedItem[ItemType]]:
"""Merge and expand items that are recurring."""
iters: list[Iterable[SpanOrderedItem[ItemType]]] = []
for item in items:
if not (recur := item.as_rrule()):
iters.append(
[
SortableItemValue(
item.timespan_of(tzinfo),
item,
)
]
)
continue
iters.append(RecurIterable(RecurAdapter(item, tzinfo=tzinfo).get, recur))
return MergedIterable(iters)
Loading

0 comments on commit dee0625

Please sign in to comment.