Skip to content

Commit

Permalink
Merge pull request #125 from AmpelProject/pydantic-2
Browse files Browse the repository at this point in the history
Pydantic 2
  • Loading branch information
jvansanten authored Jan 17, 2024
2 parents 94a2f72 + c597c85 commit de03868
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 332 deletions.
132 changes: 4 additions & 128 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,134 +11,10 @@ on:
pull_request:
branches:
- master
- dev/*

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies (with all extras)
run: |
python -m pip install --upgrade poetry
poetry install $(cat pyproject.toml | awk '/^\s*$/{f=0};f{if ($1!="docs") printf(" -E %s",$1)};/\[tool\.poetry\.extras\]/{f=1}')
- run: poetry run pytest --cov=ampel

mypy:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies (with all extras)
run: |
python -m pip install --upgrade poetry
poetry install $(cat pyproject.toml | awk '/^\s*$/{f=0};f{if ($1!="docs") printf(" -E %s",$1)};/\[tool\.poetry\.extras\]/{f=1}')
- run: poetry run mypy --namespace-packages -p ampel

check_version:
name: Tag version bump
runs-on: ubuntu-latest
# run only on pushes, not PRs
if: ${{ github.event_name == 'push' && github.base_ref == null }}
needs: [test, mypy]
outputs:
should_publish: ${{ steps.check.outputs.result }}
steps:
- uses: actions/setup-node@v3
with:
node-version: 14
- run: npm install toml@3.0.0
- name: Ensure tag for version bump
id: check
uses: actions/github-script@v6
with:
script: |
const toml = require('toml')
async function getVersion(ref) {
try {
response = await github.rest.repos.getContent({
repo: context.repo.repo,
owner: context.repo.owner,
path: 'pyproject.toml',
ref: ref
});
return toml.parse(Buffer.from(response.data.content, 'base64').toString())
.tool
.poetry
.version;
} catch (exc) {
if (exc.name == 'HttpError' && exc.status == 404) {
return null;
} else {
throw exc;
}
}
}
after = await getVersion(context.payload.after);
ref = `refs/tags/v${after}`
is_main = context.payload.ref === `refs/heads/${context.payload.repository.default_branch}`
// a tag matching the version was just pushed
let release = context.payload.ref == ref;
if (release) {
console.log(`Tag v${after} pushed (${context.sha})`);
}
// the version on the default branch changed; create a tag
if (!release && is_main) {
before = await getVersion(context.payload.before);
if (before !== after) {
console.log(`Version bumped: ${before} -> ${after}`);
try {
await github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref,
sha: context.sha
});
console.log(`Tag v${after} created (${context.sha})`);
release = true;
} catch (exc) {
// tag already existed
if (exc.name == 'HttpError' && exc.status == 422) {
console.log(`Skipping publish (tag v${after} already exists)`);
release = false;
} else {
throw exc;
}
}
} else {
console.log(`Skipping publish (version is still ${before})`);
}
} else if (!is_main) {
console.log(`Skipping publish (push to ${context.payload.ref} != refs/heads/${context.payload.repository.default_branch})`);
}
return release;
ci:
uses: AmpelProject/Ampel-interface/.github/workflows/ci.yml@ci-py12-v1
secrets: inherit

pypi:
name: Publish to PyPI
runs-on: ubuntu-latest
needs: [check_version]
# NB: outputs are always strings; explicitly parse to get a boolean
if: ${{ fromJSON(needs.check_version.outputs.should_publish) }}

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install poetry
run: |
python -m pip install --upgrade poetry
- name: Publish
run: |
poetry config repositories.testpypi https://test.pypi.org/legacy/
poetry publish -n --build
env:
POETRY_PYPI_TOKEN_PYPI: ${{ secrets.PYPI_API_TOKEN }}
POETRY_PYPI_TOKEN_TESTPYPI: ${{ secrets.PYPI_TEST_API_TOKEN }}
18 changes: 13 additions & 5 deletions ampel/abstract/AbsAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,26 @@
from ampel.base.AmpelBaseModel import AmpelBaseModel


class AbsAlertLoader(Generic[T], AmpelABC, AmpelBaseModel, abstract=True):
class AbsAlertLoader(AmpelABC, AmpelBaseModel, Generic[T], abstract=True):

@property
def logger(self) -> AmpelLogger:
return self._logger

@property
def resources(self) -> dict[str, Resource]:
return self._resources

def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.logger: AmpelLogger = AmpelLogger.get_logger()
self.resources: dict[str, Resource] = {}
self._logger: AmpelLogger = AmpelLogger.get_logger()
self._resources: dict[str, Resource] = {}

def set_logger(self, logger: AmpelLogger) -> None:
self.logger = logger
self._logger = logger

def add_resource(self, name: str, value: Resource) -> None:
self.resources[name] = value
self._resources[name] = value

def __iter__(self) -> Iterator[T]: # type: ignore
return self
Expand Down
15 changes: 10 additions & 5 deletions ampel/alert/AlertConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)

self._ampel_db = self.context.get_database()
self.alert_supplier = AuxUnitRegister.new_unit(
self._alert_supplier = AuxUnitRegister.new_unit(
model = self.supplier,
sub_type = AbsAlertSupplier
)
Expand All @@ -131,6 +131,11 @@ def __init__(self, **kwargs) -> None:
logger.info("AlertConsumer setup completed")


@property
def alert_supplier(self) -> AbsAlertSupplier:
return self._alert_supplier


def register_signal(self, signum: int, frame) -> None:
""" Executed when SIGINT/SIGTERM is emitted during alert processing """
if self._cancel_run == 0:
Expand Down Expand Up @@ -202,11 +207,11 @@ def proceed(self, event_hdlr: EventHandler) -> int:
base_flag = LogFlag.T0 | LogFlag.CORE | self.base_log_flag
)

self.alert_supplier.set_logger(logger)
self._alert_supplier.set_logger(logger)

if event_hdlr.resources:
for k, v in event_hdlr.resources.items():
self.alert_supplier.add_resource(k, v)
self._alert_supplier.add_resource(k, v)

if logger.verbose:
logger.log(VERBOSE, "Pre-run setup")
Expand All @@ -219,7 +224,7 @@ def proceed(self, event_hdlr: EventHandler) -> int:
updates_buffer = DBUpdatesBuffer(
self._ampel_db, run_id, logger,
error_callback = self.set_cancel_run,
acknowledge_callback = self.alert_supplier.acknowledge,
acknowledge_callback = self._alert_supplier.acknowledge,
catch_signals = False, # we do it ourself
max_size = self.updates_buffer_size
)
Expand Down Expand Up @@ -277,7 +282,7 @@ def proceed(self, event_hdlr: EventHandler) -> int:
register_signal = self.register_signal

# Iterate over alerts
for alert in self.alert_supplier:
for alert in self._alert_supplier:

# Allow execution to complete for this alert (loop exited after ingestion of current alert)
signal(SIGINT, register_signal)
Expand Down
4 changes: 2 additions & 2 deletions ampel/alert/load/FileAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ def __init__(self, **kwargs) -> None:
if self.logger:
self.logger.info(f"Registering {len(self.files)} file(s) to load")

self.iter_files = iter(self.files)
self._iter_files = iter(self.files)

def __iter__(self):
return self

def __next__(self) -> BytesIO:
with open(next(self.iter_files), "rb") as alert_file:
with open(next(self._iter_files), "rb") as alert_file:
return BytesIO(alert_file.read())
41 changes: 24 additions & 17 deletions ampel/alert/load/TarAlertLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@

import tarfile
from gzip import GzipFile
from typing import IO
from typing import IO, TypeAlias, TYPE_CHECKING
from ampel.log.AmpelLogger import AmpelLogger
from ampel.types import Traceless
from ampel.abstract.AbsAlertLoader import AbsAlertLoader

# use IOBase at runtime, because isinstance(anything, IO[bytes]) is always
# False.
if TYPE_CHECKING:
IOBase: TypeAlias = IO[bytes]
else:
from io import IOBase

class TarAlertLoader(AbsAlertLoader[IO[bytes]]):

class TarAlertLoader(AbsAlertLoader[IOBase]):
"""
Load alerts from a ``tar`` file. The archive must be laid out like the
`ZTF public alert archive <https://ztf.uw.edu/alerts/public/>`_, i.e. one
Expand All @@ -24,7 +31,7 @@ class TarAlertLoader(AbsAlertLoader[IO[bytes]]):

tar_mode: str = 'r:gz'
start: int = 0
file_obj: None | IO[bytes] | tarfile.ExFileObject
file_obj: None | IOBase
file_path: None | str
logger: Traceless[AmpelLogger] # actually optional

Expand All @@ -36,17 +43,17 @@ def __init__(self, **kwargs) -> None:

super().__init__(**kwargs)

self.chained_tal: 'None | TarAlertLoader' = None
self._chained_tal: 'None | TarAlertLoader' = None

if self.file_obj:
self.tar_file = tarfile.open(fileobj=self.file_obj, mode=self.tar_mode)
self._tar_file = tarfile.open(fileobj=self.file_obj, mode=self.tar_mode)
elif self.file_path:
self.tar_file = tarfile.open(self.file_path, mode=self.tar_mode)
self._tar_file = tarfile.open(self.file_path, mode=self.tar_mode)
else:
raise ValueError("Please provide value either for 'file_path' or 'file_obj'")

if self.start != 0:
for count, _ in enumerate(self.tar_file, 1):
for count, _ in enumerate(self._tar_file, 1):
if count >= self.start:
break

Expand All @@ -55,7 +62,7 @@ def __iter__(self):
return self


def __next__(self) -> IO[bytes]:
def __next__(self) -> IOBase:
"""
FYI:
from io import IOBase
Expand All @@ -69,15 +76,15 @@ def __next__(self) -> IO[bytes]:
# public interface. Beware the temptation to call getmembers() instead;
# while this does return .members, it also reads the entire archive as
# a side-effect.
self.tar_file.members.clear() # type: ignore
self._tar_file.members.clear() # type: ignore

if self.chained_tal is not None:
if self._chained_tal is not None:
file_obj = self.get_chained_next()
if file_obj is not None:
return file_obj

# Get next element in tar archive
tar_info = self.tar_file.next()
tar_info = self._tar_file.next()

# Reach end of archive
if tar_info is None:
Expand All @@ -92,12 +99,12 @@ def __next__(self) -> IO[bytes]:
if tar_info.isfile():

# extractfile returns a file like obj
file_obj = self.tar_file.extractfile(tar_info)
file_obj = self._tar_file.extractfile(tar_info)
assert file_obj is not None

# Handle tars with nested tars
if tar_info.name.endswith('.tar.gz'):
self.chained_tal = TarAlertLoader(file_obj=file_obj)
self._chained_tal = TarAlertLoader(file_obj=file_obj)
if (subfile_obj := self.get_chained_next()) is not None:
return subfile_obj
else:
Expand All @@ -109,11 +116,11 @@ def __next__(self) -> IO[bytes]:
return next(self)


def get_chained_next(self) -> None | IO[bytes]:
assert self.chained_tal is not None
file_obj = next(self.chained_tal, None)
def get_chained_next(self) -> None | IOBase:
assert self._chained_tal is not None
file_obj = next(self._chained_tal, None)
if file_obj is None:
self.chained_tal = None
self._chained_tal = None
return None

return file_obj
32 changes: 0 additions & 32 deletions mypy.ini

This file was deleted.

Loading

0 comments on commit de03868

Please sign in to comment.