-
Notifications
You must be signed in to change notification settings - Fork 401
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5e47fb2
commit fb728b5
Showing
40 changed files
with
1,560 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__version__ = "0.3.1" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from django.contrib import admin | ||
from trench.models import MFAMethod | ||
|
||
|
||
@admin.register(MFAMethod) | ||
class MFAMethodAdmin(admin.ModelAdmin): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from django.apps import AppConfig | ||
|
||
|
||
class TrenchConfig(AppConfig): | ||
name = "trench" | ||
verbose_name = "django-trench" |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from trench.backends.base import AbstractMessageDispatcher | ||
from trench.models import MFAMethod | ||
from trench.query.get_mfa_config_by_name import get_mfa_config_by_name_query | ||
from trench.settings import HANDLER | ||
|
||
|
||
def get_mfa_handler(mfa_method: MFAMethod) -> AbstractMessageDispatcher: | ||
conf = get_mfa_config_by_name_query(name=mfa_method.name) | ||
dispatcher = conf[HANDLER] | ||
return dispatcher(mfa_method=mfa_method, config=conf) |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from typing import Callable, Set, Type | ||
|
||
from trench.backends.provider import get_mfa_handler | ||
from trench.command.generate_backup_codes import generate_backup_codes_command | ||
from trench.command.replace_mfa_method_backup_codes import ( | ||
regenerate_backup_codes_for_mfa_method_command, | ||
) | ||
from trench.exceptions import MFAMethodDoesNotExistError | ||
from trench.models import MFAMethod | ||
from trench.utils import get_mfa_model | ||
|
||
|
||
class ActivateMFAMethodCommand: | ||
def __init__( | ||
self, mfa_model: Type[MFAMethod], backup_codes_generator: Callable | ||
) -> None: | ||
self._mfa_model = mfa_model | ||
self._backup_codes_generator = backup_codes_generator | ||
|
||
def execute(self, user_id: int, name: str, code: str) -> Set[str]: | ||
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=name) | ||
|
||
get_mfa_handler(mfa).confirm_activation(code) | ||
|
||
rows_affected = self._mfa_model.objects.filter( | ||
user_id=user_id, name=name | ||
).update( | ||
is_active=True, | ||
is_primary=not self._mfa_model.objects.primary_exists(user_id=user_id), | ||
) | ||
|
||
if rows_affected < 1: | ||
raise MFAMethodDoesNotExistError() | ||
|
||
backup_codes = regenerate_backup_codes_for_mfa_method_command( | ||
user_id=user_id, | ||
name=name, | ||
) | ||
|
||
return backup_codes | ||
|
||
|
||
activate_mfa_method_command = ActivateMFAMethodCommand( | ||
mfa_model=get_mfa_model(), | ||
backup_codes_generator=generate_backup_codes_command, | ||
).execute |
43 changes: 43 additions & 0 deletions
43
api/custom_auth/mfa/trench/command/authenticate_second_factor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from typing import Type | ||
|
||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth.models import AbstractUser | ||
from trench.backends.provider import get_mfa_handler | ||
from trench.command.remove_backup_code import remove_backup_code_command | ||
from trench.command.validate_backup_code import validate_backup_code_command | ||
from trench.exceptions import InvalidCodeError, InvalidTokenError | ||
from trench.models import MFAMethod | ||
from trench.utils import get_mfa_model, user_token_generator | ||
|
||
User: AbstractUser = get_user_model() | ||
|
||
|
||
class AuthenticateSecondFactorCommand: | ||
def __init__(self, mfa_model: Type[MFAMethod]) -> None: | ||
self._mfa_model = mfa_model | ||
|
||
def execute(self, code: str, ephemeral_token: str) -> User: | ||
user = user_token_generator.check_token(user=None, token=ephemeral_token) | ||
if user is None: | ||
raise InvalidTokenError() | ||
self.is_authenticated(user_id=user.id, code=code) | ||
return user | ||
|
||
def is_authenticated(self, user_id: int, code: str) -> None: | ||
for auth_method in self._mfa_model.objects.list_active(user_id=user_id): | ||
validated_backup_code = validate_backup_code_command( | ||
value=code, backup_codes=auth_method.backup_codes | ||
) | ||
if get_mfa_handler(mfa_method=auth_method).validate_code(code=code): | ||
return | ||
if validated_backup_code: | ||
remove_backup_code_command( | ||
user_id=auth_method.user_id, method_name=auth_method.name, code=code | ||
) | ||
return | ||
raise InvalidCodeError() | ||
|
||
|
||
authenticate_second_step_command = AuthenticateSecondFactorCommand( | ||
mfa_model=get_mfa_model() | ||
).execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
from django.contrib.auth import authenticate, get_user_model | ||
from django.contrib.auth.models import AbstractUser | ||
from rest_framework.request import Request | ||
from trench.exceptions import UnauthenticatedError | ||
|
||
User: AbstractUser = get_user_model() | ||
|
||
|
||
class AuthenticateUserCommand: | ||
@staticmethod | ||
def execute(request: Request, username: str, password: str) -> User: | ||
user = authenticate( | ||
request=request, | ||
username=username, | ||
password=password, | ||
) | ||
if user is None: | ||
raise UnauthenticatedError() | ||
return user | ||
|
||
|
||
authenticate_user_command = AuthenticateUserCommand.execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from typing import Callable, Type | ||
|
||
from trench.command.create_secret import create_secret_command | ||
from trench.exceptions import MFAMethodAlreadyActiveError | ||
from trench.models import MFAMethod | ||
from trench.utils import get_mfa_model | ||
|
||
|
||
class CreateMFAMethodCommand: | ||
def __init__(self, secret_generator: Callable, mfa_model: Type[MFAMethod]) -> None: | ||
self._mfa_model = mfa_model | ||
self._create_secret = secret_generator | ||
|
||
def execute(self, user_id: int, name: str) -> MFAMethod: | ||
mfa, created = self._mfa_model.objects.get_or_create( | ||
user_id=user_id, | ||
name=name, | ||
defaults={ | ||
"secret": self._create_secret, | ||
"is_active": False, | ||
}, | ||
) | ||
if not created and mfa.is_active: | ||
raise MFAMethodAlreadyActiveError() | ||
return mfa | ||
|
||
|
||
create_mfa_method_command = CreateMFAMethodCommand( | ||
secret_generator=create_secret_command, mfa_model=get_mfa_model() | ||
).execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from pyotp import TOTP | ||
|
||
|
||
class CreateOTPCommand: | ||
@staticmethod | ||
def execute(secret: str, interval: int) -> TOTP: | ||
return TOTP(secret, interval=interval) | ||
|
||
|
||
create_otp_command = CreateOTPCommand.execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from typing import Callable | ||
|
||
from pyotp import random_base32 | ||
from trench.settings import TrenchAPISettings, trench_settings | ||
|
||
|
||
class CreateSecretCommand: | ||
def __init__(self, generator: Callable, settings: TrenchAPISettings) -> None: | ||
self._generator = generator | ||
self._settings = settings | ||
|
||
def execute(self) -> str: | ||
return self._generator(length=self._settings.SECRET_KEY_LENGTH) | ||
|
||
|
||
create_secret_command = CreateSecretCommand( | ||
generator=random_base32, settings=trench_settings | ||
).execute |
34 changes: 34 additions & 0 deletions
34
api/custom_auth/mfa/trench/command/deactivate_mfa_method.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from typing import Type | ||
|
||
from django.db.transaction import atomic | ||
from trench.exceptions import ( | ||
DeactivationOfPrimaryMFAMethodError, | ||
MFANotEnabledError, | ||
) | ||
from trench.models import MFAMethod | ||
from trench.utils import get_mfa_model | ||
|
||
|
||
class DeactivateMFAMethodCommand: | ||
def __init__(self, mfa_model: Type[MFAMethod]) -> None: | ||
self._mfa_model = mfa_model | ||
|
||
@atomic | ||
def execute(self, mfa_method_name: str, user_id: int) -> None: | ||
mfa = self._mfa_model.objects.get_by_name(user_id=user_id, name=mfa_method_name) | ||
number_of_active_mfa_methods = self._mfa_model.objects.filter( | ||
user_id=user_id, is_active=True | ||
).count() | ||
if mfa.is_primary and number_of_active_mfa_methods > 1: | ||
raise DeactivationOfPrimaryMFAMethodError() | ||
if not mfa.is_active: | ||
raise MFANotEnabledError() | ||
|
||
self._mfa_model.objects.filter(user_id=user_id, name=mfa_method_name).update( | ||
is_active=False, is_primary=False | ||
) | ||
|
||
|
||
deactivate_mfa_method_command = DeactivateMFAMethodCommand( | ||
mfa_model=get_mfa_model() | ||
).execute |
38 changes: 38 additions & 0 deletions
38
api/custom_auth/mfa/trench/command/generate_backup_codes.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from typing import Callable, Set | ||
|
||
from django.utils.crypto import get_random_string | ||
from trench.settings import trench_settings | ||
|
||
|
||
class GenerateBackupCodesCommand: | ||
def __init__(self, random_string_generator: Callable) -> None: | ||
self._random_string_generator = random_string_generator | ||
|
||
def execute( | ||
self, | ||
quantity: int = trench_settings.BACKUP_CODES_QUANTITY, | ||
length: int = trench_settings.BACKUP_CODES_LENGTH, | ||
allowed_chars: str = trench_settings.BACKUP_CODES_CHARACTERS, | ||
) -> Set[str]: | ||
""" | ||
Generates random encrypted backup codes. | ||
:param quantity: How many codes should be generated | ||
:type quantity: int | ||
:param length: How long codes should be | ||
:type length: int | ||
:param allowed_chars: Characters to create backup codes from | ||
:type allowed_chars: str | ||
:returns: Encrypted backup codes | ||
:rtype: set[str] | ||
""" | ||
return { | ||
self._random_string_generator(length, allowed_chars) | ||
for _ in range(quantity) | ||
} | ||
|
||
|
||
generate_backup_codes_command = GenerateBackupCodesCommand( | ||
random_string_generator=get_random_string, | ||
).execute |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import Any, Set, Type | ||
|
||
from django.contrib.auth.hashers import check_password | ||
from trench.exceptions import InvalidCodeError, MFAMethodDoesNotExistError | ||
from trench.models import MFAMethod | ||
from trench.settings import TrenchAPISettings, trench_settings | ||
from trench.utils import get_mfa_model | ||
|
||
|
||
class RemoveBackupCodeCommand: | ||
def __init__(self, mfa_model: Type[MFAMethod], settings: TrenchAPISettings) -> None: | ||
self._mfa_model = mfa_model | ||
self._settings = settings | ||
|
||
def execute(self, user_id: Any, method_name: str, code: str) -> None: | ||
serialized_codes = ( | ||
self._mfa_model.objects.filter(user_id=user_id, name=method_name) | ||
.values_list("_backup_codes", flat=True) | ||
.first() | ||
) | ||
if serialized_codes is None: | ||
raise MFAMethodDoesNotExistError() | ||
codes = MFAMethod._BACKUP_CODES_DELIMITER.join( | ||
self._remove_code_from_set( | ||
backup_codes=set( | ||
serialized_codes.split(MFAMethod._BACKUP_CODES_DELIMITER) | ||
), | ||
code=code, | ||
) | ||
) | ||
self._mfa_model.objects.filter(user_id=user_id, name=method_name).update( | ||
_backup_codes=codes | ||
) | ||
|
||
def _remove_code_from_set(self, backup_codes: Set[str], code: str) -> Set[str]: | ||
if not self._settings.ENCRYPT_BACKUP_CODES: | ||
backup_codes.remove(code) | ||
return backup_codes | ||
for backup_code in backup_codes: | ||
if check_password(code, backup_code): | ||
backup_codes.remove(backup_code) | ||
return backup_codes | ||
raise InvalidCodeError() | ||
|
||
|
||
remove_backup_code_command = RemoveBackupCodeCommand( | ||
mfa_model=get_mfa_model(), | ||
settings=trench_settings, | ||
).execute |
49 changes: 49 additions & 0 deletions
49
api/custom_auth/mfa/trench/command/replace_mfa_method_backup_codes.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import Callable, Set, Type | ||
|
||
from django.contrib.auth.hashers import make_password | ||
from trench.command.generate_backup_codes import generate_backup_codes_command | ||
from trench.exceptions import MFAMethodDoesNotExistError | ||
from trench.models import MFAMethod | ||
from trench.settings import trench_settings | ||
from trench.utils import get_mfa_model | ||
|
||
|
||
class RegenerateBackupCodesForMFAMethodCommand: | ||
def __init__( | ||
self, | ||
requires_encryption: bool, | ||
mfa_model: Type[MFAMethod], | ||
code_hasher: Callable, | ||
codes_generator: Callable, | ||
) -> None: | ||
self._requires_encryption = requires_encryption | ||
self._mfa_model = mfa_model | ||
self._code_hasher = code_hasher | ||
self._codes_generator = codes_generator | ||
|
||
def execute(self, user_id: int, name: str) -> Set[str]: | ||
backup_codes = self._codes_generator() | ||
rows_affected = self._mfa_model.objects.filter( | ||
user_id=user_id, name=name | ||
).update( | ||
_backup_codes=MFAMethod._BACKUP_CODES_DELIMITER.join( | ||
[self._code_hasher(backup_code) for backup_code in backup_codes] | ||
if self._requires_encryption | ||
else backup_codes | ||
), | ||
) | ||
|
||
if rows_affected < 1: | ||
raise MFAMethodDoesNotExistError() | ||
|
||
return backup_codes | ||
|
||
|
||
regenerate_backup_codes_for_mfa_method_command = ( | ||
RegenerateBackupCodesForMFAMethodCommand( | ||
requires_encryption=trench_settings.ENCRYPT_BACKUP_CODES, | ||
mfa_model=get_mfa_model(), | ||
code_hasher=make_password, | ||
codes_generator=generate_backup_codes_command, | ||
).execute | ||
) |
Oops, something went wrong.