Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vtimezone tzinfo implementation #94

Merged
merged 4 commits into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ical/calendar.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ class Calendar(ComponentModel):
prodid: str = Field(default=_PRODID)
version: str = Field(default=_VERSION)

#
# Calendar components
#

events: list[Event] = Field(alias="vevent", default_factory=list)
todos: list[Todo] = Field(alias="vtodo", default_factory=list)
journal: list[Journal] = Field(alias="vjournal", default_factory=list)
Expand Down
11 changes: 7 additions & 4 deletions ical/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Recur,
RequestStatus,
Uri,
validate_until_dtstart,
)
from .util import dtstamp_factory, normalize_datetime, uid_factory

Expand Down Expand Up @@ -181,7 +182,7 @@ def __ge__(self, other: Any) -> bool:
return NotImplemented
return self._tuple() >= other._tuple()

@root_validator
@root_validator(allow_reuse=True)
def validate_date_types(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that start and end values are the same date or datetime type."""
if (
Expand All @@ -192,7 +193,7 @@ def validate_date_types(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError("Expected end value type to match start")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_datetime_timezone(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that start and end values have the same timezone information."""
if (
Expand All @@ -210,14 +211,14 @@ def validate_datetime_timezone(cls, values: dict[str, Any]) -> dict[str, Any]:
raise ValueError(f"Expected end datetime with timezone but was {dtend}")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_one_end_or_duration(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate that only one of duration or end date may be set."""
if values.get("dtend") and values.get("duration"):
raise ValueError("Only one of dtend or duration may be set." "")
return values

@root_validator
@root_validator(allow_reuse=True)
def validate_duration_unit(cls, values: dict[str, Any]) -> dict[str, Any]:
"""Validate the duration is the appropriate units."""
if not (duration := values.get("duration")):
Expand All @@ -229,3 +230,5 @@ def validate_duration_unit(cls, values: dict[str, Any]) -> dict[str, Any]:
if duration < datetime.timedelta(seconds=0):
raise ValueError(f"Expected duration to be positive but was {duration}")
return values

_validate_until_dtstart = root_validator(allow_reuse=True)(validate_until_dtstart)
130 changes: 130 additions & 0 deletions ical/iter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""Library for iterators used in ical.

These iterators are primarily used for implementing recurrence rules where an
object should be returned for a series of date/time, with some modification
based on that date/time. Additionally, it is often necessary to handle multiple
recurrence rules together as a single view of recurring date/times.
"""

from __future__ import annotations

import datetime
import heapq
from collections.abc import Callable, Iterable, Iterator
from typing import TypeVar, Union

T = TypeVar("T")

ItemAdapter = Callable[[Union[datetime.datetime, datetime.date]], T]
"""An adapter for an object in a sorted container (iterator).

The adapter is invoked with the date/time of the current instance and
the callback returns an object at that time (e.g. event with updated time)
"""


class RecurIterator(Iterator[T]):
"""An iterator for a recurrence rule."""

def __init__(
self,
item_cb: ItemAdapter[T],
recur: Iterator[datetime.datetime | datetime.date],
):
"""Initialize the RecurIterator."""
self._item_cb = item_cb
self._recur = recur

def __iter__(self) -> Iterator[T]:
return self

def __next__(self) -> T:
"""Return the next event in the recurrence."""
dtstart: datetime.datetime | datetime.date = next(self._recur)
return self._item_cb(dtstart)


class RecurIterable(Iterable[T]):
"""A series of events from a recurring event.

The inputs are a callback that creates objects at a specific date/time, and an iterable
of all the relevant date/times (typically a dateutil.rrule or dateutil.rruleset).
"""

def __init__(
self,
item_cb: ItemAdapter[T],
recur: Iterable[datetime.datetime | datetime.date],
) -> None:
"""Initialize timeline."""
self._item_cb = item_cb
self._recur = recur

def __iter__(self) -> Iterator[T]:
"""Return an iterator as a traversal over events in chronological order."""
return RecurIterator(self._item_cb, iter(self._recur))


class PeekingIterator(Iterator[T]):
"""An iterator with a preview of the next item.

The primary purpose is to implement a merged iterator where it is needed to
see the next item in the iterator in order to decide which child iterator
to pull from.
"""

def __init__(self, iterator: Iterator[T]):
"""Initialize PeekingIterator."""
self._iterator = iterator
self._next = next(self._iterator, None)

def __iter__(self) -> Iterator[T]:
"""Return this iterator."""
return self

def peek(self) -> T | None:
"""Peek at the next item without consuming."""
return self._next

def __next__(self) -> T:
"""Produce the next item from the merged set."""
result = self._next
self._next = next(self._iterator, None)
if result is None:
raise StopIteration()
return result


class MergedIterator(Iterator[T]):
"""An iterator with a merged sorted view of the underlying sorted iterators."""

def __init__(self, iters: list[Iterator[T]]):
"""Initialize MergedIterator."""
self._iters = [PeekingIterator(iterator) for iterator in iters]

def __iter__(self) -> Iterator[T]:
"""Return this iterator."""
return self

def __next__(self) -> T:
"""Produce the next item from the merged set."""
heap: list[tuple[T, PeekingIterator[T]]] = []
for iterator in self._iters:
peekd: T | None = iterator.peek()
if peekd:
heapq.heappush(heap, (peekd, iterator))
if not heap:
raise StopIteration()
(_, iterator) = heapq.heappop(heap)
return next(iterator)


class MergedIterable(Iterable[T]):
"""An iterator that merges results from underlying sorted iterables."""

def __init__(self, iters: list[Iterable[T]]) -> None:
"""Initialize MergedIterable."""
self._iters = iters

def __iter__(self) -> Iterator[T]:
return MergedIterator([iter(it) for it in self._iters])
5 changes: 4 additions & 1 deletion ical/journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from typing import Any, Optional, Union

from pydantic import Field
from pydantic import Field, root_validator

from .parsing.property import ParsedProperty
from .types import (
Expand All @@ -19,6 +19,7 @@
Recur,
RequestStatus,
Uri,
validate_until_dtstart,
)
from .util import dtstamp_factory, normalize_datetime, uid_factory

Expand Down Expand Up @@ -89,3 +90,5 @@ def start(self) -> datetime.datetime | datetime.date:
def start_datetime(self) -> datetime.datetime:
"""Return the events start as a datetime."""
return normalize_datetime(self.start).astimezone(tz=datetime.timezone.utc)

_validate_until_dtstart = root_validator(allow_reuse=True)(validate_until_dtstart)
139 changes: 7 additions & 132 deletions ical/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dateutil import rrule

from .event import Event
from .types import Frequency, Recur, Weekday
from .iter import MergedIterable, RecurIterable
from .util import normalize_datetime

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -126,24 +126,17 @@ def __iter__(self) -> Iterator[Event]:
yield event


class RecurIterator(Iterator[Event]):
class RecurAdapter:
"""An iterator for a recurrence rule."""

def __init__(
self, event: Event, recur: Iterator[datetime.datetime | datetime.date]
):
"""Initialize the RecurIterator."""
def __init__(self, event: Event):
"""Initialize the RecurAdapter."""
self._event = event
self._event_duration = event.computed_duration
self._recur = recur
self._is_all_day = not isinstance(self._event.dtstart, datetime.datetime)

def __iter__(self) -> Iterator[Event]:
return self

def __next__(self) -> Event:
def get(self, dtstart: datetime.datetime | datetime.date) -> Event:
"""Return the next event in the recurrence."""
dtstart: datetime.datetime | datetime.date = next(self._recur)
if self._is_all_day and isinstance(dtstart, datetime.datetime):
dtstart = dtstart.date()
return self._event.copy(
Expand All @@ -155,124 +148,6 @@ def __next__(self) -> Event:
)


class RecurIterable(Iterable[Event]):
"""A series of events from a recurring event."""

def __init__(self, event: Event, recur: rrule.rrule | rrule.rruleset) -> None:
"""Initialize timeline."""
self._event = event
self._recur = recur

def __iter__(self) -> Iterator[Event]:
"""Return an iterator as a traversal over events in chronological order."""
return RecurIterator(self._event, iter(self._recur))


RRULE_FREQ = {
Frequency.DAILY: rrule.DAILY,
Frequency.WEEKLY: rrule.WEEKLY,
Frequency.MONTHLY: rrule.MONTHLY,
Frequency.YEARLY: rrule.YEARLY,
}
RRULE_WEEKDAY = {
Weekday.MONDAY: rrule.MO,
Weekday.TUESDAY: rrule.TU,
Weekday.WEDNESDAY: rrule.WE,
Weekday.THURSDAY: rrule.TH,
Weekday.FRIDAY: rrule.FR,
Weekday.SATURDAY: rrule.SA,
Weekday.SUNDAY: rrule.SU,
}


def _create_rrule(
dtstart: datetime.datetime | datetime.date, rule: Recur
) -> rrule.rrule:
"""Create a dateutil rrule for the specified event."""
if (freq := RRULE_FREQ.get(rule.freq)) is None:
raise ValueError(f"Unsupported frequency in rrule: {rule.freq}")

byweekday: list[rrule.weekday] | None = None
if rule.by_weekday:
byweekday = [
RRULE_WEEKDAY[weekday.weekday](
1 if weekday.occurrence is None else weekday.occurrence
)
for weekday in rule.by_weekday
]
return rrule.rrule(
freq=freq,
dtstart=dtstart,
interval=rule.interval,
count=rule.count,
until=rule.until,
byweekday=byweekday,
bymonthday=rule.by_month_day if rule.by_month_day else None,
bymonth=rule.by_month if rule.by_month else None,
cache=True,
)


class PeekingIterator(Iterator[Event]):
"""An iterator with a preview of the next item."""

def __init__(self, iterator: Iterator[Event]):
"""Initialize PeekingIterator."""
self._iterator = iterator
self._next = next(self._iterator, None)

def __iter__(self) -> Iterator[Event]:
"""Return this iterator."""
return self

def peek(self) -> Event | None:
"""Peek at the next item without consuming."""
return self._next

def __next__(self) -> Event:
"""Produce the next item from the merged set."""
result = self._next
self._next = next(self._iterator, None)
if result is None:
raise StopIteration()
return result


class MergedIterator(Iterator[Event]):
"""An iterator with a merged sorted view of the underlying sorted iterators."""

def __init__(self, iters: list[Iterator[Event]]):
"""Initialize MergedIterator."""
self._iters = [PeekingIterator(iterator) for iterator in iters]

def __iter__(self) -> Iterator[Event]:
"""Return this iterator."""
return self

def __next__(self) -> Event:
"""Produce the next item from the merged set."""
heap: list[tuple[datetime.datetime, PeekingIterator]] = []
for iterator in self._iters:
peekd = iterator.peek()
if peekd:
heapq.heappush(heap, (peekd.start_datetime, iterator))
if not heap:
raise StopIteration()
(_, iterator) = heapq.heappop(heap)
return next(iterator)


class MergedIterable(Iterable[Event]):
"""An iterator that merges results from underlying sorted iterables."""

def __init__(self, iters: list[Iterable[Event]]) -> None:
"""Initialize MergedIterable."""
self._iters = iters

def __iter__(self) -> Iterator[Event]:
return MergedIterator([iter(it) for it in self._iters])


def calendar_timeline(events: list[Event]) -> Timeline:
"""Create a timeline for events on a calendar, including recurrence."""
iters: list[Iterable[Event]] = [EventIterable(events)]
Expand All @@ -281,10 +156,10 @@ def calendar_timeline(events: list[Event]) -> Timeline:
continue
ruleset = rrule.rruleset()
if event.rrule:
ruleset.rrule(_create_rrule(event.start, event.rrule))
ruleset.rrule(event.rrule.as_rrule(event.start))
for rdate in event.rdate:
ruleset.rdate(rdate) # type: ignore[no-untyped-call]
for exdate in event.exdate:
ruleset.exdate(exdate) # type: ignore[no-untyped-call]
iters.append(RecurIterable(event, ruleset))
iters.append(RecurIterable(RecurAdapter(event).get, ruleset))
return Timeline(MergedIterable(iters))
Loading