Skip to content

Commit

Permalink
⚗️ Most of refactor prototyping done
Browse files Browse the repository at this point in the history
Probably best now to move the backends/library code into mozilla-django-oidc-db
and continue the refactor after that.
  • Loading branch information
sergei-maertens committed May 13, 2024
1 parent 27e4c69 commit 4d839da
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 21 deletions.
164 changes: 164 additions & 0 deletions src/digid_eherkenning_oidc_generics/new_backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
Custom OIDC Authentication backend(s) which looks up configuration from the DB.
TODO: contribute this back to the mozilla_django_oidc_db package.
"""

from __future__ import annotations

from typing import TypeAlias, cast

from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser
from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation
from django.http import HttpRequest

from mozilla_django_oidc.auth import OIDCAuthenticationBackend as BaseBackend
from mozilla_django_oidc_db.models import OpenIDConnectConfigBase
from typing_extensions import override

from .models import OpenIDConnectBaseConfig
from .utils import dynamic_setting, get_setting_from_config, lookup_config

AnyUser: TypeAlias = AnonymousUser | AbstractBaseUser


class OIDCAuthBackend(BaseBackend):
"""
Custom backend modifying the upstream package behaviour.
This backend is meant to look up the configuration (class) to use via the config
query parameter (pointing to a model class now, probably a unique key in the
future), which loads the specific configuration parameters for the identity
provider.
No configuration is loaded in :meth:`__init__` at all, instead we define properties
to dynamically look this up. Django instantiates backends *a lot*, e.g. during
permission checks. We only support the :meth:`authenticate` entrypoint like the
upstream library.
.. todo:: go over the mozilla-django-oidc-db OIDCAuthenticationBackend methods and
see what else needs to be included.
"""

config_class: type[OpenIDConnectConfigBase]

# These should be functionaly equivalent to
# :class:`mozilla_django_oidc.auth.OIDCAuthenticationBackend`.
OIDC_OP_TOKEN_ENDPOINT = dynamic_setting[str]()
OIDC_OP_USER_ENDPOINT = dynamic_setting[str]()
OIDC_OP_JWKS_ENDPOINT = dynamic_setting[str | None](default=None)
OIDC_RP_CLIENT_ID = dynamic_setting[str]()
OIDC_RP_CLIENT_SECRET = dynamic_setting[str]()
OIDC_RP_SIGN_ALGO = dynamic_setting[str](default="HS256")
OIDC_RP_IDP_SIGN_KEY = dynamic_setting[str | None](default=None)

@override
def __init__(self, *args, **kwargs) -> None:
# Deliberately empty, we discard all initialization from the parent class which
# requires a config_class to be set. Even if we set it (declaratively), this is
# not viable because it performs DB/cache IO to look up the config instance,
# which would happen as well when Django goes through the auth backends for
# permission checks.
#
# See https://github.com/maykinmedia/mozilla-django-oidc-db/issues/30
self.UserModel = get_user_model()

@override
def get_settings(self, attr: str, *args): # type: ignore
"""
Override the upstream library get_settings.
Upstream is django-settings based, and we store configuration in database
records instead. We look up the configuration from the DB and check if the
requested setting is defined there or not. If not, it is taken from the Django
settings.
"""
assert hasattr(self, "config_class"), (
"The config must be loaded from the authenticate entrypoint. It looks like "
"you're trying to access configuration before this was called."
)
if (config := getattr(self, "_config", None)) is None:
# django-solo and type checking is challenging, but a new release is on the
# way and should fix that :fingers_crossed:
config = cast(OpenIDConnectConfigBase, self.config_class.get_solo())
self._config = config
self._validate_settings()
return get_setting_from_config(config, attr, *args)

def _validate_settings(self):
if (
self.OIDC_RP_SIGN_ALGO.startswith("RS")
or self.OIDC_RP_SIGN_ALGO.startswith("ES")
) and (
self.OIDC_RP_IDP_SIGN_KEY is None and self.OIDC_OP_JWKS_ENDPOINT is None
):
msg = "{} alg requires OIDC_RP_IDP_SIGN_KEY or OIDC_OP_JWKS_ENDPOINT to be configured."
raise ImproperlyConfigured(msg.format(self.OIDC_RP_SIGN_ALGO))

def _check_candidate_backend(self) -> bool:
return self.get_settings("enabled")

# The method signature is checked by django when selecting a suitable backend. Our
# signature is more strict than the upstream library. Check the upstream
# `OIDCAuthenticationCallbackView` for the `auth.authenticate(**kwargs)` call if this
# needs updating.
@override
def authenticate( # type: ignore
self,
request: HttpRequest | None,
nonce: str | None = None,
code_verifier: str | None = None,
) -> AnyUser | None:
"""
Authenticate the user with OIDC *iff* the conditions are met.
Return ``None`` to skip to the next backend, raise
:class:`django.core.exceptions.PermissionDenied` to stop in our tracks. Return
a user object (real or anonymous) to signify success.
"""
# if we don't get a request, we can't check anything, so skip to the next
# backend. We need to grab the state and code from request.GET for OIDC.
if request is None:
return None

# Load the config to apply and check if this backend should be considered to
# authenticate the user.
self.config_class = lookup_config(request)
is_candidate = self._check_candidate_backend()
if not is_candidate:
return None

# Allright, now try to actually authenticate the user.
return super().authenticate(request, nonce=nonce, code_verifier=code_verifier)


class DigiDEHerkenningOIDCBackend(OIDCAuthBackend):
"""
A backend specialised to the digid-eherkenning-generics subclassed model.
"""

@override
def _check_candidate_backend(self) -> bool:
# if we're dealing with a mozilla-django-oidc-db config that is *not* a
# digid-eherkenning-generics subclass, then don't bother.
if not issubclass(self.config_class, OpenIDConnectBaseConfig):
return False
return super()._check_candidate_backend()

def get_or_create_user(self, access_token: str, id_token: str, payload: dict):
# TODO: overrides here - make it work for both real and "fake" users.
assert isinstance(self.request, HttpRequest)

user_info = self.get_userinfo(access_token, id_token, payload)

claims_verified = self.verify_claims(user_info)
if not claims_verified:
msg = "Claims verification failed"
raise SuspiciousOperation(msg)

# breakpoint()

user = AnonymousUser()
user.is_active = True # type: ignore
return user
85 changes: 72 additions & 13 deletions src/digid_eherkenning_oidc_generics/utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,65 @@
from typing import Any
"""
.. todo:: port most of this code to mozilla_django_oidc_db
"""

from typing import Any, Generic, Protocol, TypeVar, overload

from django.apps import apps
from django.core.exceptions import BadRequest
from django.http import HttpRequest

from mozilla_django_oidc.utils import import_from_settings
from mozilla_django_oidc_db.models import OpenIDConnectConfigBase
from typing_extensions import Self

T = TypeVar("T")


from .models import OpenIDConnectBaseConfig
class SettingsHolder(Protocol):
def get_settings(self, attr: str, *args: Any) -> Any: ...


def get_setting_from_config(config: OpenIDConnectBaseConfig, attr: str, *args) -> Any:
class dynamic_setting(Generic[T]):
"""
Look up a setting from the config record or fall back to Django settings.
Descriptor to lazily access settings while explicitly defining them.
"""

_default_set: bool = False
default: T

def __init__(self, **kwargs):
if default := kwargs.get("default"):
self.default = default
self._default_set = True

def __set_name__(self, owner: type[object], name: str) -> None:
self.name = name

def __repr__(self) -> str:
default = f" (default: {self.default!r})" if self._default_set else ""
return f"<dynamic_setting {self.name}{default}>"

TODO: port this into mozilla_django_oidc_db.
@overload
def __get__(self, obj: None, objtype: None) -> Self: ...

@overload
def __get__(self, obj: SettingsHolder, objtype: type[object]) -> T: ...

def __get__(
self, obj: SettingsHolder | None, objtype: type[object] | None = None
) -> Self | T:
if obj is None:
return self
args = () if not self._default_set else (self.default,)
return obj.get_settings(self.name, *args)

def __set__(self, obj: object | None, value: Any) -> None:
raise AttributeError(f"setting {self.name} is read-only")


def get_setting_from_config(config: OpenIDConnectConfigBase, attr: str, *args) -> Any:
"""
Look up a setting from the config record or fall back to Django settings.
"""
attr_lowercase = attr.lower()
if hasattr(config, attr_lowercase):
Expand All @@ -26,18 +72,31 @@ def get_setting_from_config(config: OpenIDConnectBaseConfig, attr: str, *args) -
return import_from_settings(attr, *args)


def lookup_config(request: HttpRequest) -> type[OpenIDConnectBaseConfig]:
# config query param is set in `OIDCInit.get_extra_params`
def lookup_config(request: HttpRequest) -> type[OpenIDConnectConfigBase]:
# cache on request for optimized access
if config := getattr(request, "_oidc_generics_config_class", None):
return config

# The config_class key is added to the state in the OIDCInit.get method.
# TODO: verify that the state query param is present for error flows! Need to check
# the OAUTH2 spec for this, but according to ChatGeePeeTee if the request contains
# it, the callback must have it too.
state_key = request.GET.get("state")
if not state_key or state_key not in (
states := request.session.get("oidc_states", [])
):
raise BadRequest("Could not look up the referenced config.")

state = states[state_key]
try:
config = apps.get_model(request.GET.get("config", ""))
config = apps.get_model(state.get("config_class", ""))
except (LookupError, ValueError) as exc:
raise BadRequest("Could not look up the referenced config.") from exc

# TODO: check if this can be spoofed: starting the flow with a particular config
# and calling the callback with another one. If this is possible, we should
# store the intended config in the state, and then the query param is likely not
# even needed anymore.
if not issubclass(config, OpenIDConnectBaseConfig):
# Spoofing is not possible since we store it in the server-side session, but there
# can still be all sorts of programmer mistakes.
if not issubclass(config, OpenIDConnectConfigBase):
raise BadRequest("Invalid config referenced.")

request._oidc_generics_config_class = config # type: ignore
return config
16 changes: 10 additions & 6 deletions src/digid_eherkenning_oidc_generics/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from django.utils.http import url_has_allowed_host_and_scheme

import requests
from furl import furl
from mozilla_django_oidc.views import (
OIDCAuthenticationCallbackView as _OIDCAuthenticationCallbackView,
OIDCAuthenticationRequestView as _OIDCAuthenticationRequestView,
Expand Down Expand Up @@ -110,6 +111,14 @@ def get(
# it's easiest to just override the session key with the correct value.
request.session["oidc_login_next"] = return_url

# Store which config class to use in the state. We can not simply pass this as
# a querystring parameter appended to redirect_uri, as these are likely to be
# strictly validated. We must grab the state from the redirect Location.
state_key = furl(response.url).args["state"]
options = self.config_class._meta
state = request.session["oidc_states"][state_key]
state["config_class"] = f"{options.app_label}.{options.object_name}"

return response

def _check_idp_availability(self) -> None:
Expand All @@ -133,11 +142,6 @@ def get_extra_params(self, request: HttpRequest) -> dict[str, str]:
Add a keycloak identity provider hint if configured.
"""
extra = super().get_extra_params(request)
options = self.config_class._meta
# Store which config class to use in the params so that the callback view can
# extract this again.
# TODO: verify this cannot be tampered with!
extra["config"] = f"{options.app_label}.{options.object_name}"
if kc_idp_hint := self.get_settings("OIDC_KEYCLOAK_IDP_HINT", ""):
extra["kc_idp_hint"] = kc_idp_hint
return extra
Expand Down Expand Up @@ -203,7 +207,7 @@ def get_settings(self, attr, *args): # type: ignore

def dispatch(self, request: HttpRequest, *args, **kwargs):
self.config_class = lookup_config(request)
return super().dispatch(*args, **kwargs)
return super().dispatch(request, *args, **kwargs)

def get(self, request: HttpRequest):
# grab where the redirect next from the session and store it as a temporary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,30 @@ def setUp(self):
self.addCleanup(self.requests_mocker.stop)
self.requests_mocker.start()

def test_redirect_to_digid_oidc(self):
@patch(
"digid_eherkenning_oidc_generics.new_backends.DigiDEHerkenningOIDCBackend.verify_token",
return_value={"bsn": "123456789"},
)
def test_redirect_to_digid_oidc(self, mock_verify_token):
# TODO: switch to VCR instead of request mocks...
self.requests_mocker.get(
"http://provider.com/auth/realms/master/protocol/openid-connect/auth",
status_code=200,
)
self.requests_mocker.post(
"http://provider.com/auth/realms/master/protocol/openid-connect/token",
json={
"id_token": "-dummy-id-token-",
"access_token": "-dummy-access-token-",
},
)
self.requests_mocker.get(
"http://provider.com/auth/realms/master/protocol/openid-connect/userinfo",
json={"bsn": "123456789"},
)
form_url = get_start_form_url(self.form)
start_url = get_start_url(self.form, plugin_id="digid_oidc")
callback_url, oidc_login_next = "", ""

response = self.client.get(start_url)

Expand All @@ -74,10 +91,22 @@ def test_redirect_to_digid_oidc(self):
self.assertEqual(query_params["scope"], "openid bsn")
self.assertEqual(query_params["client_id"], "testclient")
self.assertEqual(
query_params["redirect_uri"],
(redirect_uri := query_params["redirect_uri"]),
f"http://testserver{reverse('digid_oidc:callback')}",
)

assert isinstance(redirect_uri, str)
callback_url = (
furl(redirect_uri)
.set(
{
"state": query_params["state"], # CSRF protection
"code": "-dummy-",
}
)
.url
)

with self.subTest("Return state setup"):
oidc_login_next = furl(self.client.session["oidc_login_next"])
query_params = oidc_login_next.query.params
Expand All @@ -91,6 +120,17 @@ def test_redirect_to_digid_oidc(self):
)
self.assertEqual(query_params["next"], form_url)

with self.subTest("Callback view/backend"):
# possibly something else failed earlier, no point in validating the return
# flow.
if callback_url and oidc_login_next:
response = self.client.get(callback_url)

mock_verify_token.assert_called_once()
self.assertRedirects(
response, str(oidc_login_next), fetch_redirect_response=False
)

def test_redirect_to_digid_oidc_internal_server_error(self):
self.requests_mocker.get(
"http://provider.com/auth/realms/master/protocol/openid-connect/auth",
Expand Down
Loading

0 comments on commit 4d839da

Please sign in to comment.