diff --git a/api/app/settings/common.py b/api/app/settings/common.py index 996e16c97ca61..f6eb554cffd6d 100644 --- a/api/app/settings/common.py +++ b/api/app/settings/common.py @@ -762,7 +762,7 @@ "MFA_METHODS": { "app": { "VERBOSE_NAME": "TOTP App", - "VALIDITY_PERIOD": 60 * 10, + "VALIDITY_PERIOD": 30, "USES_THIRD_PARTY_CLIENT": True, "HANDLER": "custom_auth.mfa.backends.application.CustomApplicationBackend", }, diff --git a/api/custom_auth/mfa/backends/application.py b/api/custom_auth/mfa/backends/application.py index 0ee1b87be29a4..1903851a720a2 100644 --- a/api/custom_auth/mfa/backends/application.py +++ b/api/custom_auth/mfa/backends/application.py @@ -1,15 +1,32 @@ +from typing import Any, Dict + +from django.conf import settings +from pyotp import TOTP from rest_framework.response import Response -from trench.backends.application import ApplicationMessageDispatcher +from trench.models import MFAMethod + +class CustomApplicationBackend: + def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None: + self._mfa_method = mfa_method + self._config = config + self._totp = TOTP(self._mfa_method.secret) -class CustomApplicationBackend(ApplicationMessageDispatcher): def dispatch_message(self): - original_message = super().dispatch_message() + qr_link = self._totp.provisioning_uri( + self._mfa_method.user.email, settings.TRENCH_AUTH["APPLICATION_ISSUER_NAME"] + ) data = { - "qr_link": original_message.data["details"], + "qr_link": qr_link, "secret": self._mfa_method.secret, } return Response(data) + def confirm_activation(self, code: str) -> None: + pass + + def validate_confirmation_code(self, code: str) -> bool: + return self.validate_code(code) + def validate_code(self, code: str) -> bool: - return self._get_otp().verify(otp=code, valid_window=20) + return self._totp.verify(otp=code, valid_window=20) diff --git a/api/custom_auth/mfa/user_token_generator.py b/api/custom_auth/mfa/user_token_generator.py new file mode 100644 index 0000000000000..3f85ea015e769 --- /dev/null +++ b/api/custom_auth/mfa/user_token_generator.py @@ -0,0 +1,57 @@ +from datetime import datetime +from typing import Optional + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.auth.models import User +from django.contrib.auth.tokens import PasswordResetTokenGenerator +from django.utils.crypto import constant_time_compare, salted_hmac +from django.utils.http import base36_to_int, int_to_base36 + + +class UserTokenGenerator(PasswordResetTokenGenerator): + """ + Custom token generator: + - user pk in token + - expires after 15 minutes + - longer hash (40 instead of 20) + """ + + KEY_SALT = "django.contrib.auth.tokens.PasswordResetTokenGenerator" + SECRET = settings.SECRET_KEY + EXPIRY_TIME = 60 * 15 + + def make_token(self, user: User) -> str: + return self._make_token_with_timestamp(user, int(datetime.now().timestamp())) + + def check_token(self, user: User, token: str) -> Optional[User]: + user_model = get_user_model() + if not token: + return None + try: + token = str(token) + user_pk, ts_b36, token_hash = token.rsplit("-", 2) + ts = base36_to_int(ts_b36) + user = user_model._default_manager.get(pk=user_pk) + except (ValueError, TypeError, user_model.DoesNotExist): + return None + + if (datetime.now().timestamp() - ts) > self.EXPIRY_TIME: + return None # pragma: no cover + + if not constant_time_compare(self._make_token_with_timestamp(user, ts), token): + return None # pragma: no cover + + return user + + def _make_token_with_timestamp(self, user: User, timestamp: int, **kwargs) -> str: + ts_b36 = int_to_base36(timestamp) + token_hash = salted_hmac( + self.KEY_SALT, + self._make_hash_value(user, timestamp), + secret=self.SECRET, + ).hexdigest() + return f"{user.pk}-{ts_b36}-{token_hash}" + + +user_token_generator = UserTokenGenerator() diff --git a/api/custom_auth/views.py b/api/custom_auth/views.py index d1962298963d3..f7c434493199a 100644 --- a/api/custom_auth/views.py +++ b/api/custom_auth/views.py @@ -1,3 +1,4 @@ +from django.conf import settings from django.contrib.auth import user_logged_out from django.utils.decorators import method_decorator from djoser.views import TokenCreateView, UserViewSet @@ -9,23 +10,23 @@ from rest_framework.request import Request from rest_framework.response import Response from rest_framework.throttling import ScopedRateThrottle -from trench.backends.provider import get_mfa_handler from trench.command.authenticate_second_factor import ( authenticate_second_step_command, ) from trench.exceptions import MFAMethodDoesNotExistError, MFAValidationError +from trench.models import MFAMethod from trench.responses import ErrorResponse from trench.serializers import CodeLoginSerializer -from trench.utils import get_mfa_model, user_token_generator -from trench.views.authtoken import MFAFirstStepAuthTokenView +from custom_auth.mfa.backends.application import CustomApplicationBackend +from custom_auth.mfa.user_token_generator import user_token_generator from custom_auth.serializers import CustomUserDelete from users.constants import DEFAULT_DELETE_ORPHAN_ORGANISATIONS_VALUE from .models import UserPasswordResetRequest -class CustomAuthTokenLoginOrRequestMFACode(TokenCreateView, MFAFirstStepAuthTokenView): +class CustomAuthTokenLoginOrRequestMFACode(TokenCreateView): """ Class to handle throttling for login requests """ @@ -38,9 +39,11 @@ def post(self, request: Request) -> Response: serializer.is_valid(raise_exception=True) user = serializer.user try: - mfa_model = get_mfa_model() + mfa_model = MFAMethod mfa_method = mfa_model.objects.get_primary_active(user_id=user.id) - get_mfa_handler(mfa_method=mfa_method).dispatch_message() + conf = settings.TRENCH_AUTH["MFA_METHODS"]["app"] + mfa_handler = CustomApplicationBackend(mfa_method=mfa_method, config=conf) + mfa_handler.dispatch_message() return Response( data={ "ephemeral_token": user_token_generator.make_token(user),