-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: integrate django authentication system (#16)
* feat: add casbin_backend * feat: add unit test * feat: add backend to doc * fix: clear policy cache for tests * fix: Fix code style issues with Black
- Loading branch information
Showing
10 changed files
with
236 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
name: Lint | ||
|
||
on: | ||
# Trigger the workflow on push, | ||
# but only for the master branch | ||
push: | ||
branches: | ||
- master | ||
|
||
permissions: | ||
checks: write | ||
contents: write | ||
|
||
jobs: | ||
run-linters: | ||
name: Run linters | ||
runs-on: ubuntu-latest | ||
|
||
steps: | ||
- name: Check out Git repository | ||
uses: actions/checkout@v2 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v1 | ||
with: | ||
python-version: 3.8 | ||
|
||
- name: Install Python dependencies | ||
run: pip install black flake8 | ||
|
||
- name: Get email | ||
run: | | ||
actor="${{ github.actor }}" | ||
email=$(curl -H "Authorization: token $GITHUB_TOKEN" https://api.github.com/users/$actor | jq -r '.email') | ||
echo "Email is $email" | ||
- name: Run linters | ||
uses: wearerequired/lint-action@v2 | ||
with: | ||
github_token: ${{ secrets.GITHUB_TOKEN }} | ||
auto_fix: true | ||
black: true | ||
black_auto_fix: true | ||
commit_message: "fix: Fix code style issues with Black" | ||
git_name: semantic-release-bot | ||
git_email: semantic-release-bot@martynus.net |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
from .casbin_backend import CasbinBackend, set_enforcer_for_casbin_backend |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth.backends import BaseBackend | ||
|
||
from dauthz.core import enforcer, enforcers | ||
|
||
|
||
UserModel = get_user_model() | ||
|
||
|
||
class CasbinBackend(BaseBackend): | ||
""" | ||
Check permissions with Casbin. | ||
""" | ||
|
||
def __init__(self): | ||
self.enforcer = enforcer | ||
|
||
def authenticate(self, request, username=None, password=None, **kwargs): | ||
if username is None: | ||
username = kwargs.get(UserModel.USERNAME_FIELD) | ||
if username is None or password is None: | ||
return | ||
try: | ||
user = UserModel._default_manager.get_by_natural_key(username) | ||
except UserModel.DoesNotExist: | ||
# Run the default password hasher once to reduce the timing | ||
# difference between an existing and a nonexistent user (#20760). | ||
UserModel().set_password(password) | ||
else: | ||
if user.check_password(password) and self.user_can_authenticate(user): | ||
return user | ||
|
||
def user_can_authenticate(self, user): | ||
""" | ||
Reject users with is_active=False. Custom user models that don't have | ||
that attribute are allowed. | ||
""" | ||
return getattr(user, "is_active", True) | ||
|
||
def _get_permissions(self, user_obj, obj, from_name): | ||
""" | ||
Return the direct permissions of `user_obj` | ||
""" | ||
if not user_obj.is_active or user_obj.is_anonymous or obj is not None: | ||
return set() | ||
|
||
perm_cache_name = "_%s_perm_cache" % from_name | ||
if not hasattr(user_obj, perm_cache_name): | ||
policies = self.enforcer.get_implicit_permissions_for_user(user_obj.username) | ||
perms = tuple(map(tuple, policies)) | ||
setattr(user_obj, perm_cache_name, perms) | ||
return getattr(user_obj, perm_cache_name) | ||
|
||
def get_user_permissions(self, user_obj, obj=None): | ||
""" | ||
Return a set of permission the user `user_obj` has from their | ||
`user_permissions`. | ||
""" | ||
policies = self.enforcer.get_permissions_for_user(user_obj.username) | ||
res = tuple(map(tuple, policies)) | ||
return res | ||
|
||
def get_all_permissions(self, user_obj, obj=None): | ||
""" | ||
Return a set of permission the user `user_obj` and inherited roles have. | ||
The result is cached for each user. Refresh By requesting a new instance. | ||
""" | ||
if not user_obj.is_active or user_obj.is_anonymous or obj is not None: | ||
return set() | ||
return self._get_permissions(user_obj, obj, from_name="user") | ||
|
||
def has_perm(self, user_obj, perm, obj=None): | ||
return user_obj.is_active and super().has_perm(user_obj, perm, obj=obj) | ||
|
||
def get_user(self, user_id): | ||
try: | ||
user = UserModel._default_manager.get(pk=user_id) | ||
except UserModel.DoesNotExist: | ||
return None | ||
return user if self.user_can_authenticate(user) else None | ||
|
||
|
||
def set_enforcer_for_casbin_backend(enforcer_name): | ||
_enforcer = enforcers[enforcer_name] | ||
if _enforcer: | ||
CasbinBackend.enforcer = _enforcer | ||
CasbinBackend.enforcer.load_policy() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,3 @@ | ||
# from casbin_adapter.adapter import Adapter as CasbinAdapter | ||
import logging | ||
|
||
from casbin import Enforcer | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
-r requirements.txt | ||
|
||
mock==4.0.3 | ||
setuptools==60.2.0 | ||
casbin_django_orm_adapter==1.0.2 | ||
setuptools>=60.2.0 | ||
casbin_django_orm_adapter>=1.1.2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
from django.contrib.auth.models import User | ||
from django.test import TestCase | ||
|
||
from dauthz.core import enforcer | ||
|
||
|
||
class TestConfig(TestCase): | ||
def __init__(self, methodName: str = ...): | ||
super().__init__(methodName) | ||
self.UserModel = User | ||
|
||
def create_users(self): | ||
self.user = User.objects.create_user( | ||
username="alice", | ||
email="test@example.com", | ||
password="test", | ||
) | ||
|
||
def setUp(self): | ||
self.create_users() | ||
self.e = enforcer | ||
|
||
def test_get_user_permissions(self): | ||
self.e.clear_policy() | ||
user = self.UserModel._default_manager.get(pk=self.user.pk) | ||
self.assertEqual(user.get_user_permissions(), set()) | ||
self.e.add_policy("alice", "data1", "read") | ||
self.e.add_policy("alice", "data1", "write") | ||
self.assertEqual(user.get_user_permissions(), {("alice", "data1", "read"), ("alice", "data1", "write")}) | ||
|
||
def test_get_all_permissions(self): | ||
self.e.clear_policy() | ||
user = self.UserModel._default_manager.get(pk=self.user.pk) | ||
self.assertEqual(user.get_all_permissions(), set()) | ||
|
||
user = self.UserModel._default_manager.get(pk=self.user.pk) | ||
self.e.add_policy("alice", "data1", "read") | ||
self.e.add_policy("alice", "data1", "write") | ||
self.assertEqual(user.get_all_permissions(), {("alice", "data1", "read"), ("alice", "data1", "write")}) | ||
|
||
user = self.UserModel._default_manager.get(pk=self.user.pk) # to reset cache | ||
self.e.add_policy("data2_admin", "data2", "read") | ||
self.e.add_policy("data2_admin", "data2", "write") | ||
self.e.add_role_for_user("alice", "data2_admin") | ||
self.assertEqual( | ||
user.get_all_permissions(), | ||
{ | ||
("alice", "data1", "read"), | ||
("alice", "data1", "write"), | ||
("data2_admin", "data2", "write"), | ||
("data2_admin", "data2", "read"), | ||
}, | ||
) | ||
|
||
def test_has_perm(self): | ||
self.e.clear_policy() | ||
user = self.UserModel._default_manager.get(pk=self.user.pk) # to reset cache | ||
self.assertFalse(user.has_perm(("alice", "data1", "read"))) | ||
|
||
user = self.UserModel._default_manager.get(pk=self.user.pk) | ||
self.e.add_policy("alice", "data1", "read") | ||
self.assertTrue(user.has_perm(("alice", "data1", "read"))) |