Skip to content

Commit

Permalink
WIP: event notification
Browse files Browse the repository at this point in the history
  • Loading branch information
ilius committed Nov 6, 2024
1 parent 6f4601c commit dd8c657
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 65 deletions.
2 changes: 1 addition & 1 deletion scal3/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ def stopRunningThreads() -> None:
try:
cancel = thread.cancel
except AttributeError: # noqa: PERF203
pass
log.debug(f"Thread {thread} has no cancel function")
else:
log.info(f"stopping thread {thread.getName()}")
cancel()
Expand Down
34 changes: 27 additions & 7 deletions scal3/event_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import os.path
from collections import OrderedDict
from contextlib import suppress
from datetime import timedelta
from os.path import isabs, join, split, splitext
from time import localtime, perf_counter
from time import time as now
Expand Down Expand Up @@ -1790,6 +1789,10 @@ def __init__(self, event: "Event") -> None:
self.alarmSound = "" # FIXME
self.playerCmd = "mplayer"

def notify(self, finishFunc: "Callable") -> None:
from scal3.ui_gtk.event.notifier.alarm import notify
notify(self, finishFunc)


@classes.notifier.register
class FloatingMsgNotifier(EventNotifier):
Expand All @@ -1810,6 +1813,10 @@ def __init__(self, event: "Event") -> None:
self.bgColor = (255, 255, 0)
self.textColor = (0, 0, 0)

def notify(self, finishFunc: "Callable") -> None:
from scal3.ui_gtk.event.floatingMsg.alarm import notify
notify(self, finishFunc)


@classes.notifier.register
class WindowMsgNotifier(EventNotifier):
Expand All @@ -1822,6 +1829,10 @@ def __init__(self, event: "Event") -> None:
self.extraMessage = ""
# window icon, FIXME

def notify(self, finishFunc: "Callable") -> None:
from scal3.ui_gtk.event.notifier.windowMsg import notify
notify(self, finishFunc)


# @classes.notifier.register # FIXME
class CommandNotifier(EventNotifier):
Expand All @@ -1837,6 +1848,9 @@ def __init__(self, event: "Event") -> None:
self.command = ""
self.pyEval = False

def notify(self, finishFunc: "Callable") -> None:
from scal3.ui_gtk.event.command.alarm import notify
notify(self, finishFunc)

# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -2466,7 +2480,7 @@ def checkNotify(self, finishFunc: "Callable") -> None:
if end < tm: # TODO: add a self.parent.notificationMaxDelay
log.debug(f"checkNotify: event has past, event={self}")
return
if start > tm + timedelta(seconds=self.getNotifyBeforeSec()):
if start > tm + self.getNotifyBeforeSec():
log.debug(f"checkNotify: event notif time has not reached, event={self}")
return
self.notify(finishFunc)
Expand All @@ -2484,6 +2498,7 @@ def notifierFinishFunc():
log.exception("")

for notifier in self.notifiers:
print(f"notifier.notify: {notifier=}")
notifier.notify(notifierFinishFunc)

def getIcsData(self, prettyDateTime=False): # noqa: ARG002, PLR6301
Expand Down Expand Up @@ -4521,6 +4536,7 @@ def initOccurrence(self) -> None:
self.occur = EventSearchTree()
# self.occurLoaded = False
self.occurCount = 0
self.notifyOccur = EventSearchTree()

def clear(self) -> None:
self.occur.clear()
Expand All @@ -4547,6 +4563,8 @@ def updateOccurrence(self) -> None:
self.addOccur(t0, t1, event.id)
if event.notifiers:
notificationEnabled = True
for t0, t1 in occur.getTimeRangeList():
self.notifyOccur.add(t0 - event.getNotifyBeforeSec(), t1, event.id)
self.notificationEnabled = notificationEnabled

# self.occurLoaded = True
Expand Down Expand Up @@ -4733,8 +4751,7 @@ def importData(

def _searchTimeFilter(self, conds):
if not ("time_from" in conds or "time_to" in conds):
for eid in self.idList:
yield eid
yield from self.idList
return

try:
Expand All @@ -4750,8 +4767,8 @@ def _searchTimeFilter(self, conds):
else:
del conds["time_to"]

for _epoch0, _epoch1, eid, _odt in self.occur.search(time_from, time_to):
yield eid
for item in self.occur.search(time_from, time_to):
yield item.eid

def search(self, conds):
conds = dict(conds) # take a copy, we may modify it
Expand Down Expand Up @@ -6332,10 +6349,13 @@ def getDayOccurrenceData(curJd, groups, tfmt="HM$"):
# log.debug("\nupdateData: checking event", event.summary)
gid = group.id
color = group.color
for epoch0, epoch1, eid, _odt in group.occur.search(
for item in group.occur.search(
getEpochFromJd(curJd),
getEpochFromJd(curJd + 1),
):
eid = item.eid
epoch0 = item.start
epoch1 = item.end
event = group[eid]
# ---
text = event.getTextParts()
Expand Down
106 changes: 76 additions & 30 deletions scal3/event_notification_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,37 @@
# Also avalable in /usr/share/common-licenses/GPL on Debian systems
# or /usr/share/licenses/common/GPL3/license.txt on ArchLinux

import sched
from heapq import heappop
from scal3 import logger

log = logger.get()

import os
import threading

#from sched import scheduler
from threading import Thread
from time import perf_counter, sleep
from time import time as now

from .simple_sched import scheduler

DISABLE = False


class EventNotificationManager:
def __init__(self, eventGroups):
self.byGroup = {}
self.byGroup: dict[int, EventGroupNotificationThread] = {}
if DISABLE:
return
for group in eventGroups:
self.runGroup(group)
self.checkGroup(group)

def stop(self):
for thread in self.byGroup.values():
thread.cancel()

def checkGroup(self, group):
# log.debug(f"EventNotificationManager.checkGroup: {group=}")
if not group.enable:
return
if not group.notificationEnabled:
Expand All @@ -38,26 +55,24 @@ def checkGroup(self, group):
if thread is not None and thread.is_alive():
return

log.info(f"EventNotificationManager.checkGroup: {group=}: creating thread")
thread = EventGroupNotificationThread(group)
self.byGroup[group.id] = thread
thread.start()


class EventGroupNotificationThread(Thread):
interval = 30 * 60 # seconds
maxTimerCount = 100
sleepSeconds = 1 # seconds
interval = int(os.getenv("STARCAL_NOTIFICATION_CHECK_INTERVAL") or "1800")
# ^ seconds
# TODO: get from group.notificationCheckInterval

def __init__(self, group):
self.group = group
# self.sch: sched.scheduler | None = None
self.queues = {}
# type: dict[int, list[int]]
# the values should be a (min) heap
# use heappush and heappop
# epoch = self.queues[eid][0] # to get the smallest without pop
# epoch = heappop(self.queues[eid]) # to get and remove the smallest
# heappush(self.queues[eid], epoch)

self.sent = set()

# self.sch: sched.scheduler | None = None
# threading.Timer is a subclass of threading.Thread
# so probably too expensive to create a timer for each occurance or even event!
# try using sched.scheduler
Expand All @@ -70,44 +85,75 @@ def __init__(self, group):
target=self.mainLoop,
)

self._stop_event = threading.Event()

def cancel(self):
log.debug("EventGroupNotificationThread.cancel")
self._stop_event.set()

def stopped(self):
return self._stop_event.is_set()

def sleep(self, seconds: float):
step = self.sleepSeconds
sleepUntil = perf_counter() + seconds
while not self.stopped() and perf_counter() < sleepUntil:
sleep(step)

def mainLoop(self):
# time.perf_counter() is resistant to change of system time
interval = self.interval
while True:
t0 = perf_counter()
self.run()
dt = perf_counter() - t0
sleep(interval - dt)
sleepSeconds = self.sleepSeconds
while not self.stopped():
sleepUntil = perf_counter() + interval
log.debug(f"EventGroupNotificationThread: run: {self.group=}")
self._runStep()
log.debug(f"EventGroupNotificationThread: finished run: {self.group=}")
while not self.stopped() and perf_counter() < sleepUntil:
sleep(sleepSeconds)

def finishFunc(self):
pass # FIXME: what to do here?

def notify(self, eid: int):
log.info(f"EventGroupNotificationThread: notify: {eid=}")
self.group[eid].checkNotify(self.finishFunc)

def run(self):
def _runStep(self):
if not self.group.enable:
return
if not self.group.notificationEnabled:
return

interval = self.interval
queues = self.queues

# if self.sch is not None and not self.sch.empty():
# print(f"EventGroupNotificationThread: run: last scheduler is not done yet")
group = self.group

sch = sched.scheduler(now, sleep)
tm = now()
items = list(group.notifyOccur.search(tm, tm + interval))

if not items:
return

sch = scheduler(
timefunc=now,
delayfunc=self.sleep,
stopped=self.stopped,
)

for eid in queues:
if queues[eid][0] > tm + interval:
for item in items:
if item.oid in self.sent:
log.info(f"EventGroupNotificationThread: skipping {item}")
continue
log.info(f"EventGroupNotificationThread: adding {item}")
self.sent.add(item.oid)
sch.enterabs(
heappop(queues[eid]),
item.start, # max(now(), item.start),
1, # priority
self.notify,
argument=(eid,),
argument=(item.eid,),
)

# self.sch = sch
sch.run(blocking=True)
log.info(f"EventGroupNotificationThread: run: starting sch.run, {len(items)=}")
sch.run()
log.info("EventGroupNotificationThread: run: finished sch.run")
22 changes: 17 additions & 5 deletions scal3/event_search_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# Also avalable in /usr/share/common-licenses/LGPL on Debian systems
# or /usr/share/licenses/common/LGPL/license.txt on ArchLinux

from collections import namedtuple

from scal3 import logger

log = logger.get()
Expand All @@ -27,6 +29,15 @@
epsTm = 0.01


OccurItem = namedtuple("OccurItem", [
"start",
"end",
"eid",
"dt",
"oid",
])


def getCount(x):
return x.count if x else 0

Expand Down Expand Up @@ -231,11 +242,12 @@ def _searchStep(self, node, t0, t1):

def search(self, t0, t1):
for mt, dt, eid in self._searchStep(self.root, t0, t1):
yield (
max(t0, mt - dt),
min(t1, mt + dt),
eid,
2 * dt,
yield OccurItem(
start=max(t0, mt - dt),
end=min(t1, mt + dt),
eid=eid,
dt=2 * dt,
oid=hash((eid, mt - dt, mt + dt)),
)

def getLastBefore(self, t1):
Expand Down
Loading

0 comments on commit dd8c657

Please sign in to comment.