Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ruff rules #128

Merged
merged 8 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ jobs:
uses: AmpelProject/Ampel-interface/.github/workflows/ci.yml@ci-py12-v3
secrets: inherit
with:
lint: true
# renovate: datasource=conda depName=conda-forge/python
python-version: "3.12.1"
# renovate: datasource=pypi depName=ruff
ruff-version: "0.1.15"
11 changes: 6 additions & 5 deletions ampel/abstract/AbsAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
# Last Modified Date: 19.12.2022
# Last Modified By: valery brinnel <firstname.lastname@gmail.com>

from ampel.types import T
from typing import Generic
from collections.abc import Iterator
from ampel.struct.Resource import Resource
from ampel.log.AmpelLogger import AmpelLogger
from typing import Generic

from ampel.base.AmpelABC import AmpelABC
from ampel.base.decorator import abstractmethod
from ampel.base.AmpelBaseModel import AmpelBaseModel
from ampel.base.decorator import abstractmethod
from ampel.log.AmpelLogger import AmpelLogger
from ampel.struct.Resource import Resource
from ampel.types import T


class AbsAlertLoader(AmpelABC, AmpelBaseModel, Generic[T], abstract=True):
Expand Down
2 changes: 1 addition & 1 deletion ampel/abstract/AbsAlertRegister.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

from ampel.base.AmpelABC import AmpelABC
from ampel.base.decorator import abstractmethod
from ampel.core.ContextUnit import ContextUnit
from ampel.core.AmpelRegister import AmpelRegister
from ampel.core.ContextUnit import ContextUnit
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol


Expand Down
9 changes: 5 additions & 4 deletions ampel/abstract/AbsAlertSupplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
# Last Modified Date: 19.12.2022
# Last Modified By: valery brinnel <firstname.lastname@gmail.com>

from typing import Iterator
from ampel.log.AmpelLogger import AmpelLogger
from collections.abc import Iterator

from ampel.base.AmpelABC import AmpelABC
from ampel.base.decorator import abstractmethod
from ampel.base.AmpelUnit import AmpelUnit
from ampel.struct.Resource import Resource
from ampel.base.decorator import abstractmethod
from ampel.log.AmpelLogger import AmpelLogger
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol
from ampel.struct.Resource import Resource


class AbsAlertSupplier(AmpelUnit, AmpelABC, abstract=True):
Expand Down
78 changes: 41 additions & 37 deletions ampel/alert/AlertConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,38 @@
# Last Modified By: valery brinnel <firstname.lastname@gmail.com>

import sys
from typing import Any
from typing_extensions import Self
from collections.abc import Sequence
from contextlib import suppress
from signal import SIGINT, SIGTERM, default_int_handler, signal
from typing import TYPE_CHECKING, Any

from pymongo.errors import PyMongoError
from signal import signal, SIGINT, SIGTERM, default_int_handler
from typing_extensions import Self

from ampel.core.AmpelContext import AmpelContext
from ampel.enum.EventCode import EventCode
from ampel.model.UnitModel import UnitModel
from ampel.core.EventHandler import EventHandler
from ampel.abstract.AbsAlertSupplier import AbsAlertSupplier
from ampel.abstract.AbsEventUnit import AbsEventUnit
from ampel.base.AuxUnitRegister import AuxUnitRegister
from ampel.alert.AlertConsumerError import AlertConsumerError
from ampel.alert.AlertConsumerMetrics import AlertConsumerMetrics, stat_time
from ampel.alert.FilterBlocksHandler import FilterBlocksHandler
from ampel.base.AuxUnitRegister import AuxUnitRegister
from ampel.core.AmpelContext import AmpelContext
from ampel.core.EventHandler import EventHandler
from ampel.enum.EventCode import EventCode
from ampel.ingest.ChainedIngestionHandler import ChainedIngestionHandler
from ampel.mongo.update.DBUpdatesBuffer import DBUpdatesBuffer
from ampel.log import AmpelLogger, LogFlag, VERBOSE
from ampel.log.utils import report_exception
from ampel.log import VERBOSE, AmpelLogger, LogFlag
from ampel.log.AmpelLoggingError import AmpelLoggingError
from ampel.log.LightLogRecord import LightLogRecord
from ampel.alert.AlertConsumerError import AlertConsumerError
from ampel.alert.AlertConsumerMetrics import AlertConsumerMetrics, stat_time
from ampel.model.ingest.CompilerOptions import CompilerOptions
from ampel.log.utils import report_exception
from ampel.model.AlertConsumerModel import AlertConsumerModel
from ampel.util.mappings import get_by_path, merge_dict
from ampel.model.ingest.CompilerOptions import CompilerOptions
from ampel.model.UnitModel import UnitModel
from ampel.mongo.update.DBUpdatesBuffer import DBUpdatesBuffer
from ampel.util.freeze import recursive_unfreeze
from ampel.util.mappings import get_by_path, merge_dict

if TYPE_CHECKING:
from ampel.alert.FilterBlock import FilterBlock
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol

class AlertConsumer(AbsEventUnit, AlertConsumerModel):
"""
Expand Down Expand Up @@ -257,17 +262,19 @@ def proceed(self, event_hdlr: EventHandler) -> int:
self._fbh.ready(logger, run_id)

# Shortcuts
report_filter_error = lambda e, alert, fblock: self._report_ap_error(
e, event_hdlr, logger,
extra = {'a': alert.id, 'section': 'filter', 'c': fblock.channel}
)
def report_filter_error(e: Exception, alert: "AmpelAlertProtocol", fblock: "FilterBlock"):
self._report_ap_error(
e, event_hdlr, logger,
extra = {'a': alert.id, 'section': 'filter', 'c': fblock.channel}
)

report_ingest_error = lambda e, alert, filter_results: self._report_ap_error(
e, event_hdlr, logger, extra={
'a': alert.id, 'section': 'ingest',
'c': [self.directives[el[0]].channel for el in filter_results]
}
)
def report_ingest_error(e: Exception, alert: "AmpelAlertProtocol", filter_results: Sequence[tuple[int, bool|int]]):
self._report_ap_error(
e, event_hdlr, logger, extra={
'a': alert.id, 'section': 'ingest',
'c': [self.directives[el[0]].channel for el in filter_results]
}
)

# Process alerts
################
Expand Down Expand Up @@ -305,7 +312,7 @@ def proceed(self, event_hdlr: EventHandler) -> int:
filter_results.append(res) # type: ignore[arg-type]

# Unrecoverable (logging related) errors
except (PyMongoError, AmpelLoggingError) as e:
except (PyMongoError, AmpelLoggingError) as e: # noqa: PERF203
print("%s: abording run() procedure" % e.__class__.__name__)
report_filter_error(e, alert, fblock)
raise e
Expand All @@ -320,12 +327,11 @@ def proceed(self, event_hdlr: EventHandler) -> int:

if self.raise_exc:
raise e
else:
if self.error_max:
err += 1
if err == self.error_max:
logger.error("Max number of error reached, breaking alert processing")
self.set_cancel_run(AlertConsumerError.TOO_MANY_ERRORS)
if self.error_max:
err += 1
if err == self.error_max:
logger.error("Max number of error reached, breaking alert processing")
self.set_cancel_run(AlertConsumerError.TOO_MANY_ERRORS)
else:
# if bypassing filters, track passing rates at top level
for counter in stats.filter_accepted:
Expand Down Expand Up @@ -463,8 +469,8 @@ def _report_ap_error(
}

if extra:
for k in extra.keys():
info[k] = extra[k]
for k, v in extra.items():
info[k] = v

# Try to insert doc into trouble collection (raises no exception)
# Possible exception will be logged out to console in any case
Expand All @@ -474,10 +480,8 @@ def _report_ap_error(
@staticmethod
def print_feedback(arg: Any, suffix: str = "") -> None:
print("") # ^C in console
try:
with suppress(Exception):
arg = AlertConsumerError(arg)
except Exception:
pass
s = f"[{arg.name if isinstance(arg, AlertConsumerError) else arg}] Interrupting run {suffix}"
print("+" * len(s))
print(s)
Expand Down
1 change: 1 addition & 0 deletions ampel/alert/AlertConsumerError.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import signal
from enum import IntEnum


class AlertConsumerError(IntEnum):
CONNECTIVITY = 1
SIGINT = signal.SIGINT # 2
Expand Down
24 changes: 12 additions & 12 deletions ampel/alert/AlertFileList.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import logging


# pylint: disable=logging-format-interpolation
class AlertFileList:

Expand Down Expand Up @@ -40,7 +41,7 @@ def set_index_range(self, min_index=None, max_index=None):
self.min_index = min_index
self.max_index = max_index
self.logger.debug(f"Min index set to: {self.min_index}")
self.logger.debug(f"Max index set to: {str(self.max_index)}")
self.logger.debug(f"Max index set to: {self.max_index:s}")


def set_max_entries(self, max_entries):
Expand All @@ -59,24 +60,23 @@ def build_file_list(self):
""" """
self.logger.debug("Building internal file list")

import glob, os
import glob
import os
all_files = sorted(glob.glob(self.folder + "/" + self.extension), key=os.path.getmtime)

if self.min_index is not None:
self.logger.debug("Filtering files using min_index criterium")
out_files = []
for f in all_files:
if int(os.path.basename(f).split(".")[0]) >= self.min_index:
out_files.append(f)
all_files = out_files
all_files = [
f for f in all_files
if int(os.path.basename(f).split(".")[0]) >= self.min_index
]

if self.max_index is not None:
self.logger.debug("Filtering files using max_index criterium")
out_files = []
for f in all_files:
if int(os.path.basename(f).split(".")[0]) <= self.max_index:
out_files.append(f)
all_files = out_files
all_files = [
f for f in all_files
if int(os.path.basename(f).split(".")[0]) <= self.max_index
]

if self.max_entries is not None:
self.logger.debug("Filtering files using max_entries criterium")
Expand Down
13 changes: 7 additions & 6 deletions ampel/alert/BaseAlertSupplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
# Last Modified By: valery brinnel <firstname.lastname@gmail.com>

import json
from collections.abc import Callable, Iterator
from io import IOBase
from typing import Any, Literal
from collections.abc import Callable, Iterator
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol

from ampel.abstract.AbsAlertLoader import AbsAlertLoader
from ampel.abstract.AbsAlertSupplier import AbsAlertSupplier
from ampel.log.AmpelLogger import AmpelLogger
from ampel.base.decorator import abstractmethod
from ampel.base.AuxUnitRegister import AuxUnitRegister
from ampel.abstract.AbsAlertLoader import AbsAlertLoader
from ampel.base.decorator import abstractmethod
from ampel.log.AmpelLogger import AmpelLogger
from ampel.model.UnitModel import UnitModel
from ampel.protocol.AmpelAlertProtocol import AmpelAlertProtocol
from ampel.struct.Resource import Resource


Expand Down Expand Up @@ -73,7 +74,7 @@ def __init__(self, **kwargs) -> None:
elif self.deserialize == "avro":

from fastavro import reader
def avro_next(arg: IOBase): # noqa: E306
def avro_next(arg: IOBase):
return next(reader(arg))

self._deserialize = avro_next
Expand Down
Loading
Loading