Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handle_errors context manager #657

Merged
merged 10 commits into from
Aug 10, 2023
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repos:
hooks:
- id: ruff
- repo: https://github.com/returntocorp/semgrep
rev: 'v0.91.0'
rev: 'v1.31.2'
hooks:
- id: semgrep
# See semgrep.dev/rulesets to select a ruleset and copy its URL
Expand Down
35 changes: 35 additions & 0 deletions tools/semgrep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,38 @@ rules:
severity: ERROR
paths:
include: ["web"]

- id: explicit-session-begin_nested
patterns:
- pattern: |
with $CM: ...
- metavariable-pattern:
metavariable: "$CM"
pattern: handle_errors($SESS, $...ARGS)
- focus-metavariable: "$CM"
message: "Use `session.begin` explicitly instead of passing the session to `handle_errors`"
fix: |
handle_errors($...ARGS), $SESS.begin_nested()
languages: [python]
severity: ERROR
paths:
include: ["web"]

- id: commit-in-context-manager
patterns:
- pattern: |
with $CM:
$BODY
- metavariable-pattern:
metavariable: "$CM"
pattern: $SESS.begin_nested()
- metavariable-pattern:
metavariable: "$BODY"
pattern: |
<... $SESS.commit() ...>
- focus-metavariable: ["$CM", "$SESS"]
message: "Don't commit the session in a `.begin_nested()` context manager"
languages: [python]
severity: ERROR
paths:
include: ["web", "pycroft"]
9 changes: 3 additions & 6 deletions web/blueprints/finance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,12 +1007,9 @@ def default_response():
if not form.is_submitted():
return default_response()

try:
with handle_errors(session):
lib.finance.transaction_confirm_all(current_user)
session.commit()
except PycroftException: # pragma: no cover
return default_response()
with handle_errors(error_response=default_response), session.begin_nested():
lib.finance.transaction_confirm_all(current_user)
session.commit()

flash("Alle Transaktionen wurden bestätigt.", "success")
return redirect(url_for(".transactions_unconfirmed"))
Expand Down
66 changes: 40 additions & 26 deletions web/blueprints/helpers/exception.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import traceback
import typing as t
from contextlib import contextmanager

from flask import flash
from flask import flash, abort, make_response
from flask.typing import ResponseReturnValue
from sqlalchemy.orm import Session, SessionTransaction

from pycroft.exc import PycroftException
Expand All @@ -15,41 +17,18 @@
logger = logging.getLogger('web.exc')


def web_execute(function, success_message, *args, **kwargs):
from warnings import warn
warn("Use `handle_errors` context manager instead.", DeprecationWarning)
try:
result = function(*args, **kwargs)

if success_message:
flash(success_message, 'success')

return result, True
except PycroftException as e:
flash(exception_flash_message(e), 'error')
session.session.rollback()
except Exception as e:
traceback.print_exc()
flash(f"Es ist ein unerwarteter Fehler aufgetreten: {e}", "error")

session.session.rollback()

return None, False


class UnexpectedException(PycroftException):
pass


@contextmanager
def handle_errors(session: Session) -> SessionTransaction:
def flash_and_wrap_errors() -> t.Iterator[None]:
"""Flash a message, roll back the session, and wrap unknown errors in a ``PycroftException``

:raises PycroftException:
"""
try:
with session.begin_nested() as n:
yield n
yield
except PycroftException as e:
flash(exception_flash_message(e), 'error')
raise
Expand All @@ -60,6 +39,41 @@ def handle_errors(session: Session) -> SessionTransaction:
raise UnexpectedException from e


@contextmanager
# TODO rename to „wrap_errors“; `handle` suggests „I'll deal with everything“, which is incorrect
def handle_errors(
error_response: t.Callable[[], ResponseReturnValue]
| ResponseReturnValue
| None = None,
) -> t.Iterator[SessionTransaction]:
"""Wraps errors as `PycroftErrors` and turns them into a flash message.

Example:

def default_response(): return render_template("template.html")

with handle_errors(error_response=default_response), session.begin_nested():
... # call some `lib` functions
session.commit()

:param error_response: if given, this will be called when a `PycroftException` is caught
and the return value is used as the response via :py:function:`flask.abort`.
"""
cm = flash_and_wrap_errors()

if error_response is None:
with cm as n:
yield n
return

try:
with cm as n:
yield n
except PycroftException:
resp = error_response() if callable(error_response) else error_response
abort(make_response(resp))


def exception_flash_message(e: PycroftException) -> str:
match e:
case MacExistsException():
Expand Down
93 changes: 36 additions & 57 deletions web/blueprints/host/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,9 @@ def default_response():
if not form.is_submitted():
return default_response()

try:
with handle_errors(session.session):
lib_host.host_delete(host, current_user)
session.session.commit()
except PycroftException: # pragma: no cover
return default_response()
with handle_errors(error_response=default_response), session.session.begin_nested():
lib_host.host_delete(host, current_user)
session.session.commit()

flash("Host erfolgreich gelöscht.", 'success')
return redirect(url_for('user.user_show', user_id=owner.id, _anchor='hosts'))
Expand Down Expand Up @@ -88,24 +85,18 @@ def default_response():

# existence guaranteed by validator
owner = User.get(form.owner_id.data)
try:
with handle_errors(session.session):
if not (
room := get_room(
building_id=form.building.data.id,
level=form.level.data,
room_number=form.room_number.data,
)
):
form.room_number.errors.append("room does not exist")
return default_response()
lib_host.host_edit(
host, owner, room, form.name.data,
processor=current_user
with handle_errors(error_response=default_response), session.session.begin_nested():
if not (
room := get_room(
building_id=form.building.data.id,
level=form.level.data,
room_number=form.room_number.data,
)
session.session.commit()
except PycroftException: # pragma: no cover
return default_response()
):
form.room_number.errors.append("room does not exist")
return default_response()
lib_host.host_edit(host, owner, room, form.name.data, processor=current_user)
session.session.commit()

flash("Host erfolgreich bearbeitet.", 'success')
return redirect(url_for(
Expand Down Expand Up @@ -138,26 +129,21 @@ def default_response():

# existence verified by validator
owner = User.get(form.owner_id.data)
try:
with handle_errors(session.session):
if not (
room := get_room(
# TODO I know this is a double query,
# but we should fix this on the `get_room` side.
building_id=form.building.data.id,
level=form.level.data,
room_number=form.room_number.data,
)
):
form.room_number.errors.append("room does not exist")
return default_response()

host = lib_host.host_create(
owner, room, form.name.data, processor=current_user
with handle_errors(error_response=default_response), session.session.begin_nested():
if not (
room := get_room(
# TODO I know this is a double query,
# but we should fix this on the `get_room` side.
building_id=form.building.data.id,
level=form.level.data,
room_number=form.room_number.data,
)
session.session.commit()
except PycroftException: # pragma: no cover
return default_response()
):
form.room_number.errors.append("room does not exist")
return default_response()

host = lib_host.host_create(owner, room, form.name.data, processor=current_user)
session.session.commit()

flash("Host erfolgreich erstellt.", "success")
return redirect(
Expand Down Expand Up @@ -236,12 +222,9 @@ def default_response():
if not form.is_submitted():
return default_response()

try:
with handle_errors(session.session):
lib_host.interface_delete(interface, current_user)
session.session.commit()
except PycroftException: # pragma: no cover
return default_response()
with handle_errors(error_response=default_response), session.session.begin_nested():
lib_host.interface_delete(interface, current_user)
session.session.commit()

flash("Interface erfolgreich gelöscht.", 'success')
return redirect(url_for(
Expand Down Expand Up @@ -283,15 +266,11 @@ def default_response():

ips = {IPv4Address(ip) for ip in form.ips.data}

try:
with handle_errors(session.session):
lib_host.interface_edit(
interface, form.name.data, form.mac.data, ips,
processor=current_user
)
session.session.commit()
except PycroftException: # pragma: no cover
return default_response()
with handle_errors(error_response=default_response), session.session.begin_nested():
lib_host.interface_edit(
interface, form.name.data, form.mac.data, ips, processor=current_user
)
session.session.commit()

flash("Interface erfolgreich bearbeitet.", 'success')
return redirect(url_for('user.user_show', user_id=interface.host.owner_id, _anchor='hosts'))
Expand Down
Loading
Loading