Skip to content

Commit

Permalink
Merge pull request #369 from maykinmedia/feature/superuser
Browse files Browse the repository at this point in the history
Feature/superuser
  • Loading branch information
annashamray authored Apr 17, 2024
2 parents d6bf149 + df49fe8 commit b819418
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 2 deletions.
16 changes: 16 additions & 0 deletions docs/admin/authorization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,19 @@ fields you can submit the form.
Now the client who has this token can access the objects with the "Boom" object type.

If you want to know how to use Objects API you can follow :ref:`api_usage`


Superuser permissions
----------------------

It's possible to set up superuser permissions in Objects API. A client with such permissions
is able to request objects for all objecttypes.

In the admin page of the Objects API go to the "Token authorizations" resource and click on
a token, which should have superuser permissions. Check "is superuser" field. Now this token
has read and write permissions for all objects.

.. warning::

Tokens with superuser permissions are not recommended for production. They should be used
only for test and development purposes.
22 changes: 20 additions & 2 deletions src/objects/api/fields.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.utils.encoding import smart_str
from django.utils.translation import gettext_lazy as _

from rest_framework import serializers
from vng_api_common.utils import get_uuid_from_path
from zgw_consumers.models import Service

from objects.core.models import ObjectRecord

Expand Down Expand Up @@ -42,7 +44,23 @@ def to_internal_value(self, data):
try:
return self.get_queryset().get_by_url(data)
except ObjectDoesNotExist:
self.fail("does_not_exist", value=smart_str(data))
# if service is configured, but object_type is missing
# let's try to create an ObjectType
service = Service.get_service(data)
if not service:
self.fail("does_not_exist", value=smart_str(data))

uuid = get_uuid_from_path(data)
object_type = self.get_queryset().model(service=service, uuid=uuid)

try:
object_type.clean()
except ValidationError:
self.fail("does_not_exist", value=smart_str(data))

object_type.save()
return object_type

except (TypeError, ValueError):
self.fail("invalid")

Expand Down
4 changes: 4 additions & 0 deletions src/objects/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class ObjectRecordQuerySet(models.QuerySet):
def filter_for_token(self, token):
if not token:
return self.none()

if token.is_superuser:
return self.all()

allowed_object_types = token.permissions.values("object_type")
return self.filter(
object__object_type__in=models.Subquery(allowed_object_types)
Expand Down
190 changes: 190 additions & 0 deletions src/objects/tests/v2/test_auth.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
from django.contrib.gis.geos import Point

import requests_mock
from rest_framework import status
from rest_framework.test import APITestCase

from objects.core.models import ObjectType
from objects.core.tests.factories import (
ObjectFactory,
ObjectRecordFactory,
ObjectTypeFactory,
ServiceFactory,
)
from objects.token.constants import PermissionModes
from objects.token.tests.factories import PermissionFactory, TokenAuthFactory
from objects.utils.test import TokenAuthMixin

from ..constants import GEO_WRITE_KWARGS, POLYGON_AMSTERDAM_CENTRUM
from ..utils import mock_objecttype, mock_objecttype_version, mock_service_oas_get
from .utils import reverse, reverse_lazy

OBJECT_TYPES_API = "https://example.com/objecttypes/v1/"
Expand Down Expand Up @@ -292,3 +296,189 @@ def test_search_objects_limited_to_object_permission(self):
data[0]["url"],
f"http://testserver{reverse('object-detail', args=[record.object.uuid])}",
)


class SuperUserTests(TokenAuthMixin, APITestCase):
@classmethod
def setUpTestData(cls):
super().setUpTestData()

cls.token_auth.is_superuser = True
cls.token_auth.save()

def test_retrieve_superuser(self):
object = ObjectFactory.create()
ObjectRecordFactory.create(object=object)
url = reverse("object-detail", args=[object.uuid])

response = self.client.get(url)

self.assertEqual(response.status_code, status.HTTP_200_OK)

def test_list_superuser(self):
ObjectRecordFactory.create_batch(2)
url = reverse_lazy("object-list")

response = self.client.get(url)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.json()["results"]), 2)

def test_search_superuser(self):
ObjectRecordFactory.create_batch(2, geometry=Point(4.905289, 52.369918))
url = reverse("object-search")

response = self.client.post(
url,
{
"geometry": {
"within": {
"type": "Polygon",
"coordinates": [POLYGON_AMSTERDAM_CENTRUM],
}
},
},
**GEO_WRITE_KWARGS,
)
self.assertEqual(response.status_code, status.HTTP_200_OK)

data = response.json()["results"]
self.assertEqual(len(data), 2)

def test_history_superuser(self):
object = ObjectFactory.create()
ObjectRecordFactory.create(object=object)
url = reverse("object-history", args=[object.uuid])

response = self.client.get(url)

self.assertEqual(response.status_code, status.HTTP_200_OK)

@requests_mock.Mocker()
def test_create_superuser(self, m):
object_type = ObjectTypeFactory.create(service__api_root=OBJECT_TYPES_API)
url = reverse("object-list")
data = {
"type": f"{object_type.url}",
"record": {
"typeVersion": 1,
"data": {"plantDate": "2020-04-12", "diameter": 30},
"startAt": "2020-01-01",
},
}
# mocks
mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes")
m.get(
f"{object_type.url}/versions/1",
json=mock_objecttype_version(object_type.url),
)

response = self.client.post(url, data, **GEO_WRITE_KWARGS)

self.assertEqual(response.status_code, status.HTTP_201_CREATED)

def test_create_superuser_no_service(self):
url = reverse("object-list")
data = {
"type": f"{OBJECT_TYPES_API}objecttypes/8be76be2-6567-4f5c-a17b-05217ab6d7b2",
"record": {
"typeVersion": 1,
"data": {"plantDate": "2020-04-12", "diameter": 30},
"startAt": "2020-01-01",
},
}

response = self.client.post(url, data, **GEO_WRITE_KWARGS)

self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

@requests_mock.Mocker()
def test_create_superuser_no_object_type(self, m):
objecttype_url = (
f"{OBJECT_TYPES_API}objecttypes/8be76be2-6567-4f5c-a17b-05217ab6d7b2"
)
service = ServiceFactory.create(api_root=OBJECT_TYPES_API)
url = reverse("object-list")
data = {
"type": objecttype_url,
"record": {
"typeVersion": 1,
"data": {"plantDate": "2020-04-12", "diameter": 30},
"startAt": "2020-01-01",
},
}
# mocks
mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes")
m.get(objecttype_url, json=mock_objecttype(objecttype_url))
m.get(
f"{objecttype_url}/versions/1",
json=mock_objecttype_version(objecttype_url),
)

response = self.client.post(url, data, **GEO_WRITE_KWARGS)

self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# check created object type
object_type = ObjectType.objects.get()
self.assertEqual(object_type.service, service)
self.assertEqual(object_type.url, objecttype_url)

@requests_mock.Mocker()
def test_update_superuser(self, m):
object_type = ObjectTypeFactory(service__api_root=OBJECT_TYPES_API)
record = ObjectRecordFactory.create(object__object_type=object_type, version=1)
url = reverse("object-detail", args=[record.object.uuid])
data = {
"type": f"{object_type.url}",
"record": {
"typeVersion": record.version,
"data": record.data,
"startAt": record.start_at,
},
}
# mocks
mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes")
m.get(
f"{object_type.url}/versions/1",
json=mock_objecttype_version(object_type.url),
)

response = self.client.put(url, data=data, **GEO_WRITE_KWARGS)

self.assertEqual(response.status_code, status.HTTP_200_OK)

@requests_mock.Mocker()
def test_patch_superuser(self, m):
object_type = ObjectTypeFactory(service__api_root=OBJECT_TYPES_API)
record = ObjectRecordFactory.create(
object__object_type=object_type, version=1, data__name="old"
)
url = reverse("object-detail", args=[record.object.uuid])
# mocks
mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes")
m.get(
f"{object_type.url}/versions/1",
json=mock_objecttype_version(object_type.url),
)

response = self.client.patch(
url,
data={
"record": {
**record.data,
**{"name": "new"},
"startAt": "2020-01-01",
},
},
**GEO_WRITE_KWARGS,
)

self.assertEqual(response.status_code, status.HTTP_200_OK)

def test_destroy_superuser(self):
record = ObjectRecordFactory.create(data__name="old")
url = reverse("object-detail", args=[record.object.uuid])

response = self.client.delete(url)

self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
1 change: 1 addition & 0 deletions src/objects/token/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class TokenAuthAdmin(admin.ModelAdmin):
"organization",
"administration",
"application",
"is_superuser",
)
readonly_fields = ("token",)
inlines = [PermissionInline]
21 changes: 21 additions & 0 deletions src/objects/token/migrations/0010_tokenauth_is_superuser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django 3.2.23 on 2024-03-08 13:03

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("token", "0009_alter_permission_fields"),
]

operations = [
migrations.AddField(
model_name="tokenauth",
name="is_superuser",
field=models.BooleanField(
default=False,
help_text="Designates whether the user has access to all objects.",
verbose_name="is superuser",
),
),
]
5 changes: 5 additions & 0 deletions src/objects/token/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class TokenAuth(models.Model):
blank=True,
help_text=_("Administration which has access to the API"),
)
is_superuser = models.BooleanField(
_("is superuser"),
default=False,
help_text=_("Designates whether the user has access to all objects."),
)

object_types = models.ManyToManyField(
"core.ObjectType",
Expand Down
6 changes: 6 additions & 0 deletions src/objects/token/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def has_permission(self, request, view):
if not request.auth:
return False

if request.auth.is_superuser:
return True

# detail actions are processed in has_object_permission method
if view.action != "create":
return True
Expand All @@ -39,6 +42,9 @@ def has_object_permission(self, request, view, obj):
if bypass_permissions(request):
return True

if request.auth.is_superuser:
return True

object_permission = request.auth.get_permission_for_object_type(
obj.object.object_type
)
Expand Down
3 changes: 3 additions & 0 deletions src/objects/utils/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ def get_allowed_fields(self, instance) -> list:
if not request:
return ALL_FIELDS

if request.auth.is_superuser:
return ALL_FIELDS

# use prefetch_related for DB optimization
if getattr(instance.object.object_type, "token_permissions", None):
permission = instance.object.object_type.token_permissions[0]
Expand Down

0 comments on commit b819418

Please sign in to comment.