Skip to content

Commit

Permalink
wip: get rid of trench
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed May 21, 2024
1 parent ecb9104 commit 5e47fb2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 12 deletions.
2 changes: 1 addition & 1 deletion api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@
"MFA_METHODS": {
"app": {
"VERBOSE_NAME": "TOTP App",
"VALIDITY_PERIOD": 60 * 10,
"VALIDITY_PERIOD": 30,
"USES_THIRD_PARTY_CLIENT": True,
"HANDLER": "custom_auth.mfa.backends.application.CustomApplicationBackend",
},
Expand Down
27 changes: 22 additions & 5 deletions api/custom_auth/mfa/backends/application.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,32 @@
from typing import Any, Dict

from django.conf import settings
from pyotp import TOTP
from rest_framework.response import Response
from trench.backends.application import ApplicationMessageDispatcher
from trench.models import MFAMethod


class CustomApplicationBackend:
def __init__(self, mfa_method: MFAMethod, config: Dict[str, Any]) -> None:
self._mfa_method = mfa_method
self._config = config
self._totp = TOTP(self._mfa_method.secret)

class CustomApplicationBackend(ApplicationMessageDispatcher):
def dispatch_message(self):
original_message = super().dispatch_message()
qr_link = self._totp.provisioning_uri(
self._mfa_method.user.email, settings.TRENCH_AUTH["APPLICATION_ISSUER_NAME"]
)
data = {
"qr_link": original_message.data["details"],
"qr_link": qr_link,
"secret": self._mfa_method.secret,
}
return Response(data)

def confirm_activation(self, code: str) -> None:
pass

def validate_confirmation_code(self, code: str) -> bool:
return self.validate_code(code)

def validate_code(self, code: str) -> bool:
return self._get_otp().verify(otp=code, valid_window=20)
return self._totp.verify(otp=code, valid_window=20)
57 changes: 57 additions & 0 deletions api/custom_auth/mfa/user_token_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from datetime import datetime
from typing import Optional

from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import User
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.utils.crypto import constant_time_compare, salted_hmac
from django.utils.http import base36_to_int, int_to_base36


class UserTokenGenerator(PasswordResetTokenGenerator):
"""
Custom token generator:
- user pk in token
- expires after 15 minutes
- longer hash (40 instead of 20)
"""

KEY_SALT = "django.contrib.auth.tokens.PasswordResetTokenGenerator"
SECRET = settings.SECRET_KEY
EXPIRY_TIME = 60 * 15

def make_token(self, user: User) -> str:
return self._make_token_with_timestamp(user, int(datetime.now().timestamp()))

def check_token(self, user: User, token: str) -> Optional[User]:
user_model = get_user_model()
if not token:
return None
try:
token = str(token)
user_pk, ts_b36, token_hash = token.rsplit("-", 2)
ts = base36_to_int(ts_b36)
user = user_model._default_manager.get(pk=user_pk)
except (ValueError, TypeError, user_model.DoesNotExist):
return None

if (datetime.now().timestamp() - ts) > self.EXPIRY_TIME:
return None # pragma: no cover

if not constant_time_compare(self._make_token_with_timestamp(user, ts), token):
return None # pragma: no cover

return user

def _make_token_with_timestamp(self, user: User, timestamp: int, **kwargs) -> str:
ts_b36 = int_to_base36(timestamp)
token_hash = salted_hmac(
self.KEY_SALT,
self._make_hash_value(user, timestamp),
secret=self.SECRET,
).hexdigest()
return f"{user.pk}-{ts_b36}-{token_hash}"


user_token_generator = UserTokenGenerator()
15 changes: 9 additions & 6 deletions api/custom_auth/views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.conf import settings
from django.contrib.auth import user_logged_out
from django.utils.decorators import method_decorator
from djoser.views import TokenCreateView, UserViewSet
Expand All @@ -9,23 +10,23 @@
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.throttling import ScopedRateThrottle
from trench.backends.provider import get_mfa_handler
from trench.command.authenticate_second_factor import (
authenticate_second_step_command,
)
from trench.exceptions import MFAMethodDoesNotExistError, MFAValidationError
from trench.models import MFAMethod
from trench.responses import ErrorResponse
from trench.serializers import CodeLoginSerializer
from trench.utils import get_mfa_model, user_token_generator
from trench.views.authtoken import MFAFirstStepAuthTokenView

from custom_auth.mfa.backends.application import CustomApplicationBackend
from custom_auth.mfa.user_token_generator import user_token_generator
from custom_auth.serializers import CustomUserDelete
from users.constants import DEFAULT_DELETE_ORPHAN_ORGANISATIONS_VALUE

from .models import UserPasswordResetRequest


class CustomAuthTokenLoginOrRequestMFACode(TokenCreateView, MFAFirstStepAuthTokenView):
class CustomAuthTokenLoginOrRequestMFACode(TokenCreateView):
"""
Class to handle throttling for login requests
"""
Expand All @@ -38,9 +39,11 @@ def post(self, request: Request) -> Response:
serializer.is_valid(raise_exception=True)
user = serializer.user
try:
mfa_model = get_mfa_model()
mfa_model = MFAMethod
mfa_method = mfa_model.objects.get_primary_active(user_id=user.id)
get_mfa_handler(mfa_method=mfa_method).dispatch_message()
conf = settings.TRENCH_AUTH["MFA_METHODS"]["app"]
mfa_handler = CustomApplicationBackend(mfa_method=mfa_method, config=conf)
mfa_handler.dispatch_message()
return Response(
data={
"ephemeral_token": user_token_generator.make_token(user),
Expand Down

0 comments on commit 5e47fb2

Please sign in to comment.