diff --git a/src/digid_eherkenning_oidc_generics/new_backends.py b/src/digid_eherkenning_oidc_generics/new_backends.py new file mode 100644 index 0000000000..ac45b223fc --- /dev/null +++ b/src/digid_eherkenning_oidc_generics/new_backends.py @@ -0,0 +1,164 @@ +""" +Custom OIDC Authentication backend(s) which looks up configuration from the DB. + +TODO: contribute this back to the mozilla_django_oidc_db package. +""" + +from __future__ import annotations + +from typing import TypeAlias, cast + +from django.contrib.auth import get_user_model +from django.contrib.auth.models import AbstractBaseUser, AnonymousUser +from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation +from django.http import HttpRequest + +from mozilla_django_oidc.auth import OIDCAuthenticationBackend as BaseBackend +from mozilla_django_oidc_db.models import OpenIDConnectConfigBase +from typing_extensions import override + +from .models import OpenIDConnectBaseConfig +from .utils import dynamic_setting, get_setting_from_config, lookup_config + +AnyUser: TypeAlias = AnonymousUser | AbstractBaseUser + + +class OIDCAuthBackend(BaseBackend): + """ + Custom backend modifying the upstream package behaviour. + + This backend is meant to look up the configuration (class) to use via the config + query parameter (pointing to a model class now, probably a unique key in the + future), which loads the specific configuration parameters for the identity + provider. + + No configuration is loaded in :meth:`__init__` at all, instead we define properties + to dynamically look this up. Django instantiates backends *a lot*, e.g. during + permission checks. We only support the :meth:`authenticate` entrypoint like the + upstream library. + + .. todo:: go over the mozilla-django-oidc-db OIDCAuthenticationBackend methods and + see what else needs to be included. + """ + + config_class: type[OpenIDConnectConfigBase] + + # These should be functionaly equivalent to + # :class:`mozilla_django_oidc.auth.OIDCAuthenticationBackend`. + OIDC_OP_TOKEN_ENDPOINT = dynamic_setting[str]() + OIDC_OP_USER_ENDPOINT = dynamic_setting[str]() + OIDC_OP_JWKS_ENDPOINT = dynamic_setting[str | None](default=None) + OIDC_RP_CLIENT_ID = dynamic_setting[str]() + OIDC_RP_CLIENT_SECRET = dynamic_setting[str]() + OIDC_RP_SIGN_ALGO = dynamic_setting[str](default="HS256") + OIDC_RP_IDP_SIGN_KEY = dynamic_setting[str | None](default=None) + + @override + def __init__(self, *args, **kwargs) -> None: + # Deliberately empty, we discard all initialization from the parent class which + # requires a config_class to be set. Even if we set it (declaratively), this is + # not viable because it performs DB/cache IO to look up the config instance, + # which would happen as well when Django goes through the auth backends for + # permission checks. + # + # See https://github.com/maykinmedia/mozilla-django-oidc-db/issues/30 + self.UserModel = get_user_model() + + @override + def get_settings(self, attr: str, *args): # type: ignore + """ + Override the upstream library get_settings. + + Upstream is django-settings based, and we store configuration in database + records instead. We look up the configuration from the DB and check if the + requested setting is defined there or not. If not, it is taken from the Django + settings. + """ + assert hasattr(self, "config_class"), ( + "The config must be loaded from the authenticate entrypoint. It looks like " + "you're trying to access configuration before this was called." + ) + if (config := getattr(self, "_config", None)) is None: + # django-solo and type checking is challenging, but a new release is on the + # way and should fix that :fingers_crossed: + config = cast(OpenIDConnectConfigBase, self.config_class.get_solo()) + self._config = config + self._validate_settings() + return get_setting_from_config(config, attr, *args) + + def _validate_settings(self): + if ( + self.OIDC_RP_SIGN_ALGO.startswith("RS") + or self.OIDC_RP_SIGN_ALGO.startswith("ES") + ) and ( + self.OIDC_RP_IDP_SIGN_KEY is None and self.OIDC_OP_JWKS_ENDPOINT is None + ): + msg = "{} alg requires OIDC_RP_IDP_SIGN_KEY or OIDC_OP_JWKS_ENDPOINT to be configured." + raise ImproperlyConfigured(msg.format(self.OIDC_RP_SIGN_ALGO)) + + def _check_candidate_backend(self) -> bool: + return self.get_settings("enabled") + + # The method signature is checked by django when selecting a suitable backend. Our + # signature is more strict than the upstream library. Check the upstream + # `OIDCAuthenticationCallbackView` for the `auth.authenticate(**kwargs)` call if this + # needs updating. + @override + def authenticate( # type: ignore + self, + request: HttpRequest | None, + nonce: str | None = None, + code_verifier: str | None = None, + ) -> AnyUser | None: + """ + Authenticate the user with OIDC *iff* the conditions are met. + + Return ``None`` to skip to the next backend, raise + :class:`django.core.exceptions.PermissionDenied` to stop in our tracks. Return + a user object (real or anonymous) to signify success. + """ + # if we don't get a request, we can't check anything, so skip to the next + # backend. We need to grab the state and code from request.GET for OIDC. + if request is None: + return None + + # Load the config to apply and check if this backend should be considered to + # authenticate the user. + self.config_class = lookup_config(request) + is_candidate = self._check_candidate_backend() + if not is_candidate: + return None + + # Allright, now try to actually authenticate the user. + return super().authenticate(request, nonce=nonce, code_verifier=code_verifier) + + +class DigiDEHerkenningOIDCBackend(OIDCAuthBackend): + """ + A backend specialised to the digid-eherkenning-generics subclassed model. + """ + + @override + def _check_candidate_backend(self) -> bool: + # if we're dealing with a mozilla-django-oidc-db config that is *not* a + # digid-eherkenning-generics subclass, then don't bother. + if not issubclass(self.config_class, OpenIDConnectBaseConfig): + return False + return super()._check_candidate_backend() + + def get_or_create_user(self, access_token: str, id_token: str, payload: dict): + # TODO: overrides here - make it work for both real and "fake" users. + assert isinstance(self.request, HttpRequest) + + user_info = self.get_userinfo(access_token, id_token, payload) + + claims_verified = self.verify_claims(user_info) + if not claims_verified: + msg = "Claims verification failed" + raise SuspiciousOperation(msg) + + # breakpoint() + + user = AnonymousUser() + user.is_active = True # type: ignore + return user diff --git a/src/digid_eherkenning_oidc_generics/utils.py b/src/digid_eherkenning_oidc_generics/utils.py index 6df6607daf..dbf42137e1 100644 --- a/src/digid_eherkenning_oidc_generics/utils.py +++ b/src/digid_eherkenning_oidc_generics/utils.py @@ -1,19 +1,65 @@ -from typing import Any +""" +.. todo:: port most of this code to mozilla_django_oidc_db +""" + +from typing import Any, Generic, Protocol, TypeVar, overload from django.apps import apps from django.core.exceptions import BadRequest from django.http import HttpRequest from mozilla_django_oidc.utils import import_from_settings +from mozilla_django_oidc_db.models import OpenIDConnectConfigBase +from typing_extensions import Self + +T = TypeVar("T") + -from .models import OpenIDConnectBaseConfig +class SettingsHolder(Protocol): + def get_settings(self, attr: str, *args: Any) -> Any: ... -def get_setting_from_config(config: OpenIDConnectBaseConfig, attr: str, *args) -> Any: +class dynamic_setting(Generic[T]): """ - Look up a setting from the config record or fall back to Django settings. + Descriptor to lazily access settings while explicitly defining them. + """ + + _default_set: bool = False + default: T + + def __init__(self, **kwargs): + if default := kwargs.get("default"): + self.default = default + self._default_set = True + + def __set_name__(self, owner: type[object], name: str) -> None: + self.name = name + + def __repr__(self) -> str: + default = f" (default: {self.default!r})" if self._default_set else "" + return f"" - TODO: port this into mozilla_django_oidc_db. + @overload + def __get__(self, obj: None, objtype: None) -> Self: ... + + @overload + def __get__(self, obj: SettingsHolder, objtype: type[object]) -> T: ... + + def __get__( + self, obj: SettingsHolder | None, objtype: type[object] | None = None + ) -> Self | T: + if obj is None: + return self + args = () if not self._default_set else (self.default,) + return obj.get_settings(self.name, *args) + + def __set__(self, obj: object | None, value: Any) -> None: + raise AttributeError(f"setting {self.name} is read-only") + + +def get_setting_from_config(config: OpenIDConnectConfigBase, attr: str, *args) -> Any: + """ + Look up a setting from the config record or fall back to Django settings. """ attr_lowercase = attr.lower() if hasattr(config, attr_lowercase): @@ -26,18 +72,31 @@ def get_setting_from_config(config: OpenIDConnectBaseConfig, attr: str, *args) - return import_from_settings(attr, *args) -def lookup_config(request: HttpRequest) -> type[OpenIDConnectBaseConfig]: - # config query param is set in `OIDCInit.get_extra_params` +def lookup_config(request: HttpRequest) -> type[OpenIDConnectConfigBase]: + # cache on request for optimized access + if config := getattr(request, "_oidc_generics_config_class", None): + return config + + # The config_class key is added to the state in the OIDCInit.get method. + # TODO: verify that the state query param is present for error flows! Need to check + # the OAUTH2 spec for this, but according to ChatGeePeeTee if the request contains + # it, the callback must have it too. + state_key = request.GET.get("state") + if not state_key or state_key not in ( + states := request.session.get("oidc_states", []) + ): + raise BadRequest("Could not look up the referenced config.") + + state = states[state_key] try: - config = apps.get_model(request.GET.get("config", "")) + config = apps.get_model(state.get("config_class", "")) except (LookupError, ValueError) as exc: raise BadRequest("Could not look up the referenced config.") from exc - # TODO: check if this can be spoofed: starting the flow with a particular config - # and calling the callback with another one. If this is possible, we should - # store the intended config in the state, and then the query param is likely not - # even needed anymore. - if not issubclass(config, OpenIDConnectBaseConfig): + # Spoofing is not possible since we store it in the server-side session, but there + # can still be all sorts of programmer mistakes. + if not issubclass(config, OpenIDConnectConfigBase): raise BadRequest("Invalid config referenced.") + request._oidc_generics_config_class = config # type: ignore return config diff --git a/src/digid_eherkenning_oidc_generics/views.py b/src/digid_eherkenning_oidc_generics/views.py index 42b1a43ccb..b52e7d7b3c 100644 --- a/src/digid_eherkenning_oidc_generics/views.py +++ b/src/digid_eherkenning_oidc_generics/views.py @@ -7,6 +7,7 @@ from django.utils.http import url_has_allowed_host_and_scheme import requests +from furl import furl from mozilla_django_oidc.views import ( OIDCAuthenticationCallbackView as _OIDCAuthenticationCallbackView, OIDCAuthenticationRequestView as _OIDCAuthenticationRequestView, @@ -110,6 +111,14 @@ def get( # it's easiest to just override the session key with the correct value. request.session["oidc_login_next"] = return_url + # Store which config class to use in the state. We can not simply pass this as + # a querystring parameter appended to redirect_uri, as these are likely to be + # strictly validated. We must grab the state from the redirect Location. + state_key = furl(response.url).args["state"] + options = self.config_class._meta + state = request.session["oidc_states"][state_key] + state["config_class"] = f"{options.app_label}.{options.object_name}" + return response def _check_idp_availability(self) -> None: @@ -133,11 +142,6 @@ def get_extra_params(self, request: HttpRequest) -> dict[str, str]: Add a keycloak identity provider hint if configured. """ extra = super().get_extra_params(request) - options = self.config_class._meta - # Store which config class to use in the params so that the callback view can - # extract this again. - # TODO: verify this cannot be tampered with! - extra["config"] = f"{options.app_label}.{options.object_name}" if kc_idp_hint := self.get_settings("OIDC_KEYCLOAK_IDP_HINT", ""): extra["kc_idp_hint"] = kc_idp_hint return extra @@ -203,7 +207,7 @@ def get_settings(self, attr, *args): # type: ignore def dispatch(self, request: HttpRequest, *args, **kwargs): self.config_class = lookup_config(request) - return super().dispatch(*args, **kwargs) + return super().dispatch(request, *args, **kwargs) def get(self, request: HttpRequest): # grab where the redirect next from the session and store it as a temporary diff --git a/src/openforms/authentication/contrib/digid_eherkenning_oidc/tests/digid/test_auth_procedure.py b/src/openforms/authentication/contrib/digid_eherkenning_oidc/tests/digid/test_auth_procedure.py index 8a9801ba23..6406a15ce7 100644 --- a/src/openforms/authentication/contrib/digid_eherkenning_oidc/tests/digid/test_auth_procedure.py +++ b/src/openforms/authentication/contrib/digid_eherkenning_oidc/tests/digid/test_auth_procedure.py @@ -50,13 +50,30 @@ def setUp(self): self.addCleanup(self.requests_mocker.stop) self.requests_mocker.start() - def test_redirect_to_digid_oidc(self): + @patch( + "digid_eherkenning_oidc_generics.new_backends.DigiDEHerkenningOIDCBackend.verify_token", + return_value={"bsn": "123456789"}, + ) + def test_redirect_to_digid_oidc(self, mock_verify_token): + # TODO: switch to VCR instead of request mocks... self.requests_mocker.get( "http://provider.com/auth/realms/master/protocol/openid-connect/auth", status_code=200, ) + self.requests_mocker.post( + "http://provider.com/auth/realms/master/protocol/openid-connect/token", + json={ + "id_token": "-dummy-id-token-", + "access_token": "-dummy-access-token-", + }, + ) + self.requests_mocker.get( + "http://provider.com/auth/realms/master/protocol/openid-connect/userinfo", + json={"bsn": "123456789"}, + ) form_url = get_start_form_url(self.form) start_url = get_start_url(self.form, plugin_id="digid_oidc") + callback_url, oidc_login_next = "", "" response = self.client.get(start_url) @@ -74,10 +91,22 @@ def test_redirect_to_digid_oidc(self): self.assertEqual(query_params["scope"], "openid bsn") self.assertEqual(query_params["client_id"], "testclient") self.assertEqual( - query_params["redirect_uri"], + (redirect_uri := query_params["redirect_uri"]), f"http://testserver{reverse('digid_oidc:callback')}", ) + assert isinstance(redirect_uri, str) + callback_url = ( + furl(redirect_uri) + .set( + { + "state": query_params["state"], # CSRF protection + "code": "-dummy-", + } + ) + .url + ) + with self.subTest("Return state setup"): oidc_login_next = furl(self.client.session["oidc_login_next"]) query_params = oidc_login_next.query.params @@ -91,6 +120,17 @@ def test_redirect_to_digid_oidc(self): ) self.assertEqual(query_params["next"], form_url) + with self.subTest("Callback view/backend"): + # possibly something else failed earlier, no point in validating the return + # flow. + if callback_url and oidc_login_next: + response = self.client.get(callback_url) + + mock_verify_token.assert_called_once() + self.assertRedirects( + response, str(oidc_login_next), fetch_redirect_response=False + ) + def test_redirect_to_digid_oidc_internal_server_error(self): self.requests_mocker.get( "http://provider.com/auth/realms/master/protocol/openid-connect/auth", diff --git a/src/openforms/conf/base.py b/src/openforms/conf/base.py index b657950bc1..702835ccf0 100644 --- a/src/openforms/conf/base.py +++ b/src/openforms/conf/base.py @@ -502,6 +502,10 @@ # Allow logging in with both username+password and email+password AUTHENTICATION_BACKENDS = [ + # Put the fake backend first, as it (on success) only puts information in the session + # and it's quite easy to shortcut. + "digid_eherkenning_oidc_generics.new_backends.DigiDEHerkenningOIDCBackend", + # Real backends "axes.backends.AxesBackend", "openforms.accounts.backends.UserModelEmailBackend", "django.contrib.auth.backends.ModelBackend",