diff --git a/docs/admin/authorization.rst b/docs/admin/authorization.rst index eb5fe55f..d5bb5064 100644 --- a/docs/admin/authorization.rst +++ b/docs/admin/authorization.rst @@ -88,3 +88,19 @@ fields you can submit the form. Now the client who has this token can access the objects with the "Boom" object type. If you want to know how to use Objects API you can follow :ref:`api_usage` + + +Superuser permissions +---------------------- + +It's possible to set up superuser permissions in Objects API. A client with such permissions +is able to request objects for all objecttypes. + +In the admin page of the Objects API go to the "Token authorizations" resource and click on +a token, which should have superuser permissions. Check "is superuser" field. Now this token +has read and write permissions for all objects. + +.. warning:: + + Tokens with superuser permissions are not recommended for production. They should be used + only for test and development purposes. diff --git a/src/objects/api/fields.py b/src/objects/api/fields.py index 1e4a2da1..b6d8919e 100644 --- a/src/objects/api/fields.py +++ b/src/objects/api/fields.py @@ -1,8 +1,10 @@ -from django.core.exceptions import ObjectDoesNotExist +from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.utils.encoding import smart_str from django.utils.translation import gettext_lazy as _ from rest_framework import serializers +from vng_api_common.utils import get_uuid_from_path +from zgw_consumers.models import Service from objects.core.models import ObjectRecord @@ -42,7 +44,23 @@ def to_internal_value(self, data): try: return self.get_queryset().get_by_url(data) except ObjectDoesNotExist: - self.fail("does_not_exist", value=smart_str(data)) + # if service is configured, but object_type is missing + # let's try to create an ObjectType + service = Service.get_service(data) + if not service: + self.fail("does_not_exist", value=smart_str(data)) + + uuid = get_uuid_from_path(data) + object_type = self.get_queryset().model(service=service, uuid=uuid) + + try: + object_type.clean() + except ValidationError: + self.fail("does_not_exist", value=smart_str(data)) + + object_type.save() + return object_type + except (TypeError, ValueError): self.fail("invalid") diff --git a/src/objects/core/query.py b/src/objects/core/query.py index 84514c6e..bd1116fe 100644 --- a/src/objects/core/query.py +++ b/src/objects/core/query.py @@ -30,6 +30,10 @@ class ObjectRecordQuerySet(models.QuerySet): def filter_for_token(self, token): if not token: return self.none() + + if token.is_superuser: + return self.all() + allowed_object_types = token.permissions.values("object_type") return self.filter( object__object_type__in=models.Subquery(allowed_object_types) diff --git a/src/objects/tests/v2/test_auth.py b/src/objects/tests/v2/test_auth.py index 4cca5325..819bb877 100644 --- a/src/objects/tests/v2/test_auth.py +++ b/src/objects/tests/v2/test_auth.py @@ -1,18 +1,22 @@ from django.contrib.gis.geos import Point +import requests_mock from rest_framework import status from rest_framework.test import APITestCase +from objects.core.models import ObjectType from objects.core.tests.factories import ( ObjectFactory, ObjectRecordFactory, ObjectTypeFactory, + ServiceFactory, ) from objects.token.constants import PermissionModes from objects.token.tests.factories import PermissionFactory, TokenAuthFactory from objects.utils.test import TokenAuthMixin from ..constants import GEO_WRITE_KWARGS, POLYGON_AMSTERDAM_CENTRUM +from ..utils import mock_objecttype, mock_objecttype_version, mock_service_oas_get from .utils import reverse, reverse_lazy OBJECT_TYPES_API = "https://example.com/objecttypes/v1/" @@ -292,3 +296,189 @@ def test_search_objects_limited_to_object_permission(self): data[0]["url"], f"http://testserver{reverse('object-detail', args=[record.object.uuid])}", ) + + +class SuperUserTests(TokenAuthMixin, APITestCase): + @classmethod + def setUpTestData(cls): + super().setUpTestData() + + cls.token_auth.is_superuser = True + cls.token_auth.save() + + def test_retrieve_superuser(self): + object = ObjectFactory.create() + ObjectRecordFactory.create(object=object) + url = reverse("object-detail", args=[object.uuid]) + + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_list_superuser(self): + ObjectRecordFactory.create_batch(2) + url = reverse_lazy("object-list") + + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.json()["results"]), 2) + + def test_search_superuser(self): + ObjectRecordFactory.create_batch(2, geometry=Point(4.905289, 52.369918)) + url = reverse("object-search") + + response = self.client.post( + url, + { + "geometry": { + "within": { + "type": "Polygon", + "coordinates": [POLYGON_AMSTERDAM_CENTRUM], + } + }, + }, + **GEO_WRITE_KWARGS, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + data = response.json()["results"] + self.assertEqual(len(data), 2) + + def test_history_superuser(self): + object = ObjectFactory.create() + ObjectRecordFactory.create(object=object) + url = reverse("object-history", args=[object.uuid]) + + response = self.client.get(url) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @requests_mock.Mocker() + def test_create_superuser(self, m): + object_type = ObjectTypeFactory.create(service__api_root=OBJECT_TYPES_API) + url = reverse("object-list") + data = { + "type": f"{object_type.url}", + "record": { + "typeVersion": 1, + "data": {"plantDate": "2020-04-12", "diameter": 30}, + "startAt": "2020-01-01", + }, + } + # mocks + mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes") + m.get( + f"{object_type.url}/versions/1", + json=mock_objecttype_version(object_type.url), + ) + + response = self.client.post(url, data, **GEO_WRITE_KWARGS) + + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + def test_create_superuser_no_service(self): + url = reverse("object-list") + data = { + "type": f"{OBJECT_TYPES_API}objecttypes/8be76be2-6567-4f5c-a17b-05217ab6d7b2", + "record": { + "typeVersion": 1, + "data": {"plantDate": "2020-04-12", "diameter": 30}, + "startAt": "2020-01-01", + }, + } + + response = self.client.post(url, data, **GEO_WRITE_KWARGS) + + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + @requests_mock.Mocker() + def test_create_superuser_no_object_type(self, m): + objecttype_url = ( + f"{OBJECT_TYPES_API}objecttypes/8be76be2-6567-4f5c-a17b-05217ab6d7b2" + ) + service = ServiceFactory.create(api_root=OBJECT_TYPES_API) + url = reverse("object-list") + data = { + "type": objecttype_url, + "record": { + "typeVersion": 1, + "data": {"plantDate": "2020-04-12", "diameter": 30}, + "startAt": "2020-01-01", + }, + } + # mocks + mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes") + m.get(objecttype_url, json=mock_objecttype(objecttype_url)) + m.get( + f"{objecttype_url}/versions/1", + json=mock_objecttype_version(objecttype_url), + ) + + response = self.client.post(url, data, **GEO_WRITE_KWARGS) + + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + # check created object type + object_type = ObjectType.objects.get() + self.assertEqual(object_type.service, service) + self.assertEqual(object_type.url, objecttype_url) + + @requests_mock.Mocker() + def test_update_superuser(self, m): + object_type = ObjectTypeFactory(service__api_root=OBJECT_TYPES_API) + record = ObjectRecordFactory.create(object__object_type=object_type, version=1) + url = reverse("object-detail", args=[record.object.uuid]) + data = { + "type": f"{object_type.url}", + "record": { + "typeVersion": record.version, + "data": record.data, + "startAt": record.start_at, + }, + } + # mocks + mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes") + m.get( + f"{object_type.url}/versions/1", + json=mock_objecttype_version(object_type.url), + ) + + response = self.client.put(url, data=data, **GEO_WRITE_KWARGS) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @requests_mock.Mocker() + def test_patch_superuser(self, m): + object_type = ObjectTypeFactory(service__api_root=OBJECT_TYPES_API) + record = ObjectRecordFactory.create( + object__object_type=object_type, version=1, data__name="old" + ) + url = reverse("object-detail", args=[record.object.uuid]) + # mocks + mock_service_oas_get(m, OBJECT_TYPES_API, "objecttypes") + m.get( + f"{object_type.url}/versions/1", + json=mock_objecttype_version(object_type.url), + ) + + response = self.client.patch( + url, + data={ + "record": { + **record.data, + **{"name": "new"}, + "startAt": "2020-01-01", + }, + }, + **GEO_WRITE_KWARGS, + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + def test_destroy_superuser(self): + record = ObjectRecordFactory.create(data__name="old") + url = reverse("object-detail", args=[record.object.uuid]) + + response = self.client.delete(url) + + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) diff --git a/src/objects/token/admin.py b/src/objects/token/admin.py index 47faf319..9b1a0d32 100644 --- a/src/objects/token/admin.py +++ b/src/objects/token/admin.py @@ -115,6 +115,7 @@ class TokenAuthAdmin(admin.ModelAdmin): "organization", "administration", "application", + "is_superuser", ) readonly_fields = ("token",) inlines = [PermissionInline] diff --git a/src/objects/token/migrations/0010_tokenauth_is_superuser.py b/src/objects/token/migrations/0010_tokenauth_is_superuser.py new file mode 100644 index 00000000..b535e5ad --- /dev/null +++ b/src/objects/token/migrations/0010_tokenauth_is_superuser.py @@ -0,0 +1,21 @@ +# Generated by Django 3.2.23 on 2024-03-08 13:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("token", "0009_alter_permission_fields"), + ] + + operations = [ + migrations.AddField( + model_name="tokenauth", + name="is_superuser", + field=models.BooleanField( + default=False, + help_text="Designates whether the user has access to all objects.", + verbose_name="is superuser", + ), + ), + ] diff --git a/src/objects/token/models.py b/src/objects/token/models.py index e086fb45..2856c5f3 100644 --- a/src/objects/token/models.py +++ b/src/objects/token/models.py @@ -46,6 +46,11 @@ class TokenAuth(models.Model): blank=True, help_text=_("Administration which has access to the API"), ) + is_superuser = models.BooleanField( + _("is superuser"), + default=False, + help_text=_("Designates whether the user has access to all objects."), + ) object_types = models.ManyToManyField( "core.ObjectType", diff --git a/src/objects/token/permissions.py b/src/objects/token/permissions.py index 2d05c505..ab2d553d 100644 --- a/src/objects/token/permissions.py +++ b/src/objects/token/permissions.py @@ -14,6 +14,9 @@ def has_permission(self, request, view): if not request.auth: return False + if request.auth.is_superuser: + return True + # detail actions are processed in has_object_permission method if view.action != "create": return True @@ -39,6 +42,9 @@ def has_object_permission(self, request, view, obj): if bypass_permissions(request): return True + if request.auth.is_superuser: + return True + object_permission = request.auth.get_permission_for_object_type( obj.object.object_type ) diff --git a/src/objects/utils/serializers.py b/src/objects/utils/serializers.py index c7c2aaa1..8a69ab3f 100644 --- a/src/objects/utils/serializers.py +++ b/src/objects/utils/serializers.py @@ -148,6 +148,9 @@ def get_allowed_fields(self, instance) -> list: if not request: return ALL_FIELDS + if request.auth.is_superuser: + return ALL_FIELDS + # use prefetch_related for DB optimization if getattr(instance.object.object_type, "token_permissions", None): permission = instance.object.object_type.token_permissions[0]