Skip to content

Commit

Permalink
Reintegrate v0.8 (#119)
Browse files Browse the repository at this point in the history
* Add EasyAlertConsumerTemplate (#89)

a general-purpose processor template

* Bump version to 0.8.5

* AlertConsumer: explicitly acknowledge alerts

see AmpelAstro/Ampel-LSST#13

* Bump version to 0.8.6a0

* fix(deps): update ampel-core to >=0.8.7a0,<0.9

* Bump version to 0.8.6

* EasyAlertConsumerTemplate: adapt to v0.9

* chore(deps): update dev dependencies

* fix(deps): update minor updates

* chore(deps): update dependency mypy to ^1.6.0

* chore(deps): update dev dependencies

* Bump version to 0.8.6

* TarAlertLoader: avoid consuming entire tarfile when start!=0

* fix: bump core to ^0.9.1a0

---------

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
  • Loading branch information
jvansanten and renovate[bot] authored Dec 19, 2023
1 parent 673f406 commit f26cec8
Show file tree
Hide file tree
Showing 13 changed files with 1,051 additions and 689 deletions.
4 changes: 4 additions & 0 deletions ampel/abstract/AbsAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ def __iter__(self) -> Iterator[T]: # type: ignore
@abstractmethod
def __next__(self) -> T:
...

def acknowledge(self, alerts: Iterator[T]) -> None:
"""Inform the source that a batch of alerts has been handled"""
...
4 changes: 4 additions & 0 deletions ampel/abstract/AbsAlertSupplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ def add_resource(self, name: str, value: Resource) -> None:
@abstractmethod
def __iter__(self) -> Iterator[AmpelAlertProtocol]:
...

def acknowledge(self, alerts: Iterator[AmpelAlertProtocol]) -> None:
"""Inform the source that a batch of alerts has been handled"""
...
93 changes: 49 additions & 44 deletions ampel/alert/AlertConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def proceed(self, event_hdlr: EventHandler) -> int:
updates_buffer = DBUpdatesBuffer(
self._ampel_db, run_id, logger,
error_callback = self.set_cancel_run,
acknowledge_callback = self.alert_supplier.acknowledge,
catch_signals = False, # we do it ourself
max_size = self.updates_buffer_size
)
Expand Down Expand Up @@ -325,54 +326,58 @@ def proceed(self, event_hdlr: EventHandler) -> int:
for counter in stats.filter_accepted:
counter.inc()

if filter_results:

stats.accepted.inc()

try:
alert_extra: dict[str, Any] = {'alert': alert.id}
if self.include_alert_extra_with_keys and alert.extra:
for key, path in self.include_alert_extra_with_keys.items():
alert_extra[key] = get_by_path(alert.extra, path)
with stat_time.labels("ingest").time():
ing_hdlr.ingest(
alert.datapoints, filter_results, stock_id, alert.tag,
alert_extra, alert.extra.get('stock') if alert.extra else None
)
except (PyMongoError, AmpelLoggingError) as e:
print("%s: abording run() procedure" % e.__class__.__name__)
report_ingest_error(e, alert, filter_results)
raise e

except Exception as e:
report_ingest_error(e, alert, filter_results)

if self.raise_exc:
raise e
with updates_buffer.group_updates():

if self.error_max:
err += 1
if filter_results:

if err == self.error_max:
logger.error("Max number of error reached, breaking alert processing")
self.set_cancel_run(AlertConsumerError.TOO_MANY_ERRORS)
stats.accepted.inc()

else:
try:
alert_extra: dict[str, Any] = {'alert': alert.id}
if self.include_alert_extra_with_keys and alert.extra:
for key, path in self.include_alert_extra_with_keys.items():
alert_extra[key] = get_by_path(alert.extra, path)
with stat_time.labels("ingest").time():
ing_hdlr.ingest(
alert.datapoints, filter_results, stock_id, alert.tag,
alert_extra, alert.extra.get('stock') if alert.extra else None
)
except (PyMongoError, AmpelLoggingError) as e:
print("%s: abording run() procedure" % e.__class__.__name__)
report_ingest_error(e, alert, filter_results)
raise e

except Exception as e:
report_ingest_error(e, alert, filter_results)

if self.raise_exc:
raise e

# All channels reject this alert
# no log entries goes into the main logs collection sinces those are redirected to Ampel_rej.

# So we add a notification manually. For that, we don't use logger
# cause rejection messages were alreary logged into the console
# by the StreamHandler in channel specific RecordBufferingHandler instances.
# So we address directly db_logging_handler, and for that, we create
# a LogDocument manually.
lr = LightLogRecord(logger.name, LogFlag.INFO | logger.base_flag)
lr.stock = stock_id
lr.channel = reduced_chan_names # type: ignore[assignment]
lr.extra = {'a': alert.id, 'allout': True}
if db_logging_handler:
db_logging_handler.handle(lr)
if self.error_max:
err += 1

if err == self.error_max:
logger.error("Max number of error reached, breaking alert processing")
self.set_cancel_run(AlertConsumerError.TOO_MANY_ERRORS)

else:

# All channels reject this alert
# no log entries goes into the main logs collection sinces those are redirected to Ampel_rej.

# So we add a notification manually. For that, we don't use logger
# cause rejection messages were alreary logged into the console
# by the StreamHandler in channel specific RecordBufferingHandler instances.
# So we address directly db_logging_handler, and for that, we create
# a LogDocument manually.
lr = LightLogRecord(logger.name, LogFlag.INFO | logger.base_flag)
lr.stock = stock_id
lr.channel = reduced_chan_names # type: ignore[assignment]
lr.extra = {'a': alert.id, 'allout': True}
if db_logging_handler:
db_logging_handler.handle(lr)

updates_buffer.acknowledge_on_push(alert)

iter_count += 1
stats.alerts.inc()
Expand Down
8 changes: 3 additions & 5 deletions ampel/alert/load/TarAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ def __init__(self, **kwargs) -> None:
raise ValueError("Please provide value either for 'file_path' or 'file_obj'")

if self.start != 0:
count = 0
for tarinfo in self.tar_file:
count += 1
if count < self.start:
continue
for count, _ in enumerate(self.tar_file, 1):
if count >= self.start:
break


def __iter__(self):
Expand Down
84 changes: 84 additions & 0 deletions ampel/template/EasyAlertConsumerTemplate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Any, overload

from ampel.abstract.AbsConfigMorpher import AbsConfigMorpher
from ampel.log.AmpelLogger import AmpelLogger
from ampel.model.ingest.CompilerOptions import CompilerOptions
from ampel.model.ingest.FilterModel import FilterModel
from ampel.model.ingest.T2Compute import T2Compute
from ampel.model.UnitModel import UnitModel
from ampel.template.AbsEasyChannelTemplate import AbsEasyChannelTemplate
from ampel.types import ChannelId


class EasyAlertConsumerTemplate(AbsConfigMorpher):
"""Configure an AlertConsumer (or subclass) for a single channel"""

#: Channel tag for any documents created
channel: ChannelId
#: Alert supplier unit
supplier: str | UnitModel
#: Optional override for alert loader
loader: None | str | UnitModel
#: Alert shaper
shaper: str | UnitModel
#: Document creation options
compiler_opts: CompilerOptions
#: Alert filter. None disables filtering
filter: None | str | FilterModel
#: Augment alerts with external content before ingestion
muxer: None | str | UnitModel
# Combine datapoints into states
combiner: str | UnitModel

#: T2 units to trigger when stock is updated. Dependencies of tied
#: units will be added automatically.
t2_compute: list[T2Compute] = []

#: Unit to synthesize config for
unit: str = "AlertConsumer"

extra: dict = {}

def morph(self, ampel_config: dict[str, Any], logger: AmpelLogger) -> dict[str, Any]:

return UnitModel(
unit=self.unit,
config=self.extra
| AbsEasyChannelTemplate.craft_t0_processor_config(
channel=self.channel,
alconf=ampel_config,
t2_compute=self.t2_compute,
supplier=self._get_supplier(),
shaper=self._config_as_dict(self.shaper),
combiner=self._config_as_dict(self.combiner),
filter_dict=self._config_as_dict(self.filter),
muxer=self._config_as_dict(self.muxer),
compiler_opts=self.compiler_opts.dict(),
),
).dict(exclude_unset=True)

@overload
@staticmethod
def _config_as_dict(arg: None) -> None:
...

@overload
@staticmethod
def _config_as_dict(arg: str | UnitModel) -> dict[str, Any]:
...

@staticmethod
def _config_as_dict(arg: None | str | UnitModel) -> None | dict[str, Any]:
if arg is None:
return None
else:
return (arg if isinstance(arg, UnitModel) else UnitModel(unit=arg)).dict(exclude_unset=True)

def _get_supplier(self) -> dict[str, Any]:

unit_dict = self._config_as_dict(self.supplier)
if self.loader:
unit_dict["config"] = unit_dict.get("config", {}) | {
"loader": self._config_as_dict(self.loader)
}
return unit_dict
3 changes: 3 additions & 0 deletions conf/ampel-alerts/ampel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ unit:

# Logical unit
- ampel.alert.filter.BasicMultiFilter

template:
ingest_alerts: ampel.template.EasyAlertConsumerTemplate
Loading

0 comments on commit f26cec8

Please sign in to comment.