Skip to content

Commit

Permalink
Add trench
Browse files Browse the repository at this point in the history
  • Loading branch information
gagantrivedi committed May 22, 2024
1 parent 5e47fb2 commit 9f31bc2
Show file tree
Hide file tree
Showing 42 changed files with 1,467 additions and 460 deletions.
2 changes: 1 addition & 1 deletion api/app/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
"api_keys",
"features.feature_external_resources",
# 2FA
"trench",
"custom_auth.mfa.trench",
# health check plugins
"health_check",
"health_check.db",
Expand Down
3 changes: 2 additions & 1 deletion api/custom_auth/mfa/backends/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from django.conf import settings
from pyotp import TOTP
from rest_framework.response import Response
from trench.models import MFAMethod

from custom_auth.mfa.trench.models import MFAMethod


class CustomApplicationBackend:
Expand Down
1 change: 1 addition & 0 deletions api/custom_auth/mfa/trench/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "0.3.1"
8 changes: 8 additions & 0 deletions api/custom_auth/mfa/trench/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.contrib import admin

from custom_auth.mfa.trench.models import MFAMethod


@admin.register(MFAMethod)
class MFAMethodAdmin(admin.ModelAdmin):
pass
6 changes: 6 additions & 0 deletions api/custom_auth/mfa/trench/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from django.apps import AppConfig


class TrenchConfig(AppConfig):
name = "custom_auth.mfa.trench"
verbose_name = "django-trench"
Empty file.
Empty file.
47 changes: 47 additions & 0 deletions api/custom_auth/mfa/trench/command/activate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Callable, Set, Type

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.command.replace_mfa_method_backup_codes import (
regenerate_backup_codes_for_mfa_method_command,
)
from custom_auth.mfa.trench.exceptions import MFAMethodDoesNotExistError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_handler, get_mfa_model


class ActivateMFAMethodCommand:
def __init__(
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable
) -> None:
self._mfa_model = mfa_model
self._backup_codes_generator = backup_codes_generator

def execute(self, user_id: int, name: str, code: str) -> Set[str]:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=name)

get_mfa_handler(mfa).confirm_activation(code)

rows_affected = self._mfa_model.objects.filter(
user_id=user_id, name=name
).update(
is_active=True,
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id),
)

if rows_affected < 1:
raise MFAMethodDoesNotExistError()

backup_codes = regenerate_backup_codes_for_mfa_method_command(
user_id=user_id,
name=name,
)

return backup_codes


activate_mfa_method_command = ActivateMFAMethodCommand(
mfa_model=get_mfa_model(),
backup_codes_generator=generate_backup_codes_command,
).execute
36 changes: 36 additions & 0 deletions api/custom_auth/mfa/trench/command/authenticate_second_factor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from custom_auth.mfa.trench.command.remove_backup_code import (
remove_backup_code_command,
)
from custom_auth.mfa.trench.command.validate_backup_code import (
validate_backup_code_command,
)
from custom_auth.mfa.trench.exceptions import (
InvalidCodeError,
InvalidTokenError,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_handler, user_token_generator
from users.models import FFAdminUser


def is_authenticated(user_id: int, code: str) -> None:
for auth_method in MFAMethod.objects.list_active(user_id=user_id):
validated_backup_code = validate_backup_code_command(
value=code, backup_codes=auth_method.backup_codes
)
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code):
return
if validated_backup_code:
remove_backup_code_command(
user_id=auth_method.user_id, method_name=auth_method.name, code=code
)
return
raise InvalidCodeError()


def authenticate_second_step_command(code: str, ephemeral_token: str) -> FFAdminUser:
user = user_token_generator.check_token(user=None, token=ephemeral_token)
if user is None:
raise InvalidTokenError()
is_authenticated(user_id=user.id, code=code)
return user
22 changes: 22 additions & 0 deletions api/custom_auth/mfa/trench/command/authenticate_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from django.contrib.auth import authenticate, get_user_model
from django.contrib.auth.models import AbstractUser
from rest_framework.request import Request
from trench.exceptions import UnauthenticatedError

User: AbstractUser = get_user_model()


class AuthenticateUserCommand:
@staticmethod
def execute(request: Request, username: str, password: str) -> User:
user = authenticate(
request=request,
username=username,
password=password,
)
if user is None:
raise UnauthenticatedError()
return user


authenticate_user_command = AuthenticateUserCommand.execute
30 changes: 30 additions & 0 deletions api/custom_auth/mfa/trench/command/create_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable, Type

from custom_auth.mfa.trench.command.create_secret import create_secret_command
from custom_auth.mfa.trench.exceptions import MFAMethodAlreadyActiveError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class CreateMFAMethodCommand:
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model
self._create_secret = secret_generator

def execute(self, user_id: int, name: str) -> MFAMethod:
mfa, created = self._mfa_model.objects.get_or_create(
user_id=user_id,
name=name,
defaults={
"secret": self._create_secret,
"is_active": False,
},
)
if not created and mfa.is_active:
raise MFAMethodAlreadyActiveError()
return mfa


create_mfa_method_command = CreateMFAMethodCommand(
secret_generator=create_secret_command, mfa_model=get_mfa_model()
).execute
10 changes: 10 additions & 0 deletions api/custom_auth/mfa/trench/command/create_otp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pyotp import TOTP


class CreateOTPCommand:
@staticmethod
def execute(secret: str, interval: int) -> TOTP:
return TOTP(secret, interval=interval)


create_otp_command = CreateOTPCommand.execute
19 changes: 19 additions & 0 deletions api/custom_auth/mfa/trench/command/create_secret.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Callable

from pyotp import random_base32

from custom_auth.mfa.trench.settings import TrenchAPISettings, trench_settings


class CreateSecretCommand:
def __init__(self, generator: Callable, settings: TrenchAPISettings) -> None:
self._generator = generator
self._settings = settings

def execute(self) -> str:
return self._generator(length=self._settings.SECRET_KEY_LENGTH)


create_secret_command = CreateSecretCommand(
generator=random_base32, settings=trench_settings
).execute
35 changes: 35 additions & 0 deletions api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Type

from django.db.transaction import atomic

from custom_auth.mfa.trench.exceptions import (
DeactivationOfPrimaryMFAMethodError,
MFANotEnabledError,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.utils import get_mfa_model


class DeactivateMFAMethodCommand:
def __init__(self, mfa_model: Type[MFAMethod]) -> None:
self._mfa_model = mfa_model

@atomic
def execute(self, mfa_method_name: str, user_id: int) -> None:
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name)
number_of_active_mfa_methods = self._mfa_model.objects.filter(
user_id=user_id, is_active=True
).count()
if mfa.is_primary and number_of_active_mfa_methods > 1:
raise DeactivationOfPrimaryMFAMethodError()
if not mfa.is_active:
raise MFANotEnabledError()

self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update(
is_active=False, is_primary=False
)


deactivate_mfa_method_command = DeactivateMFAMethodCommand(
mfa_model=get_mfa_model()
).execute
39 changes: 39 additions & 0 deletions api/custom_auth/mfa/trench/command/generate_backup_codes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Callable, Set

from django.utils.crypto import get_random_string

from custom_auth.mfa.trench.settings import trench_settings


class GenerateBackupCodesCommand:
def __init__(self, random_string_generator: Callable) -> None:
self._random_string_generator = random_string_generator

def execute(
self,
quantity: int = trench_settings.BACKUP_CODES_QUANTITY,
length: int = trench_settings.BACKUP_CODES_LENGTH,
allowed_chars: str = trench_settings.BACKUP_CODES_CHARACTERS,
) -> Set[str]:
"""
Generates random encrypted backup codes.
:param quantity: How many codes should be generated
:type quantity: int
:param length: How long codes should be
:type length: int
:param allowed_chars: Characters to create backup codes from
:type allowed_chars: str
:returns: Encrypted backup codes
:rtype: set[str]
"""
return {
self._random_string_generator(length, allowed_chars)
for _ in range(quantity)
}


generate_backup_codes_command = GenerateBackupCodesCommand(
random_string_generator=get_random_string,
).execute
41 changes: 41 additions & 0 deletions api/custom_auth/mfa/trench/command/remove_backup_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from typing import Any, Set

from django.contrib.auth.hashers import check_password

from custom_auth.mfa.trench.exceptions import (
InvalidCodeError,
MFAMethodDoesNotExistError,
)
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.settings import trench_settings


def remove_backup_code_command(user_id: Any, method_name: str, code: str) -> None:
serialized_codes = (
MFAMethod.objects.filter(user_id=user_id, name=method_name)
.values_list("_backup_codes", flat=True)
.first()
)
if serialized_codes is None:
raise MFAMethodDoesNotExistError()
codes = MFAMethod._BACKUP_CODES_DELIMITER.join(
_remove_code_from_set(
backup_codes=set(serialized_codes.split(MFAMethod._BACKUP_CODES_DELIMITER)),
code=code,
)
)
MFAMethod.objects.filter(user_id=user_id, name=method_name).update(
_backup_codes=codes
)


def _remove_code_from_set(backup_codes: Set[str], code: str) -> Set[str]:
settings = trench_settings
if not settings.ENCRYPT_BACKUP_CODES:
backup_codes.remove(code)
return backup_codes
for backup_code in backup_codes:
if check_password(code, backup_code):
backup_codes.remove(backup_code)
return backup_codes
raise InvalidCodeError()
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Callable, Set, Type

from django.contrib.auth.hashers import make_password

from custom_auth.mfa.trench.command.generate_backup_codes import (
generate_backup_codes_command,
)
from custom_auth.mfa.trench.exceptions import MFAMethodDoesNotExistError
from custom_auth.mfa.trench.models import MFAMethod
from custom_auth.mfa.trench.settings import trench_settings
from custom_auth.mfa.trench.utils import get_mfa_model


class RegenerateBackupCodesForMFAMethodCommand:
def __init__(
self,
requires_encryption: bool,
mfa_model: Type[MFAMethod],
code_hasher: Callable,
codes_generator: Callable,
) -> None:
self._requires_encryption = requires_encryption
self._mfa_model = mfa_model
self._code_hasher = code_hasher
self._codes_generator = codes_generator

def execute(self, user_id: int, name: str) -> Set[str]:
backup_codes = self._codes_generator()
rows_affected = self._mfa_model.objects.filter(
user_id=user_id, name=name
).update(
_backup_codes=MFAMethod._BACKUP_CODES_DELIMITER.join(
[self._code_hasher(backup_code) for backup_code in backup_codes]
if self._requires_encryption
else backup_codes
),
)

if rows_affected < 1:
raise MFAMethodDoesNotExistError()

return backup_codes


regenerate_backup_codes_for_mfa_method_command = (
RegenerateBackupCodesForMFAMethodCommand(
requires_encryption=trench_settings.ENCRYPT_BACKUP_CODES,
mfa_model=get_mfa_model(),
code_hasher=make_password,
codes_generator=generate_backup_codes_command,
).execute
)
Loading

0 comments on commit 9f31bc2

Please sign in to comment.