From af537b52611681ccd2a5b3cbd7c6f8dbc645a36f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Wed, 21 Aug 2024 13:14:40 -0400 Subject: [PATCH] Rewrite more access logic in terms of permissions instead of roles (#15453) * Rewrite more access logic in terms of permissions instead of roles * Cut down supported logic because that would not work anyway * Remove methods not needed anymore * Create managed roles in test before delegating permissions --- awx/main/access.py | 76 +++++++------------ .../test_rbac_execution_environment.py | 4 +- 2 files changed, 28 insertions(+), 52 deletions(-) diff --git a/awx/main/access.py b/awx/main/access.py index 24b8b746fd30..b8a80c12d92e 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -242,9 +242,10 @@ def get_queryset(self): return qs def filtered_queryset(self): - # Override in subclasses - # filter objects according to user's read access - return self.model.objects.none() + if permission_registry.is_registered(self.model): + return self.model.access_qs(self.user, 'view') + else: + raise NotImplementedError('Filtered queryset for model is not written') def can_read(self, obj): return bool(obj and self.get_queryset().filter(pk=obj.pk).exists()) @@ -606,9 +607,6 @@ class InstanceGroupAccess(BaseAccess): model = InstanceGroup prefetch_related = ('instances',) - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_use(self, obj): return self.user in obj.use_role @@ -654,7 +652,7 @@ def filtered_queryset(self): qs = User.objects.all() else: qs = ( - User.objects.filter(pk__in=Organization.accessible_objects(self.user, 'read_role').values('member_role__members')) + User.objects.filter(pk__in=Organization.access_qs(self.user, 'view').values('member_role__members')) | User.objects.filter(pk=self.user.id) | User.objects.filter(is_superuser=True) ).distinct() @@ -671,7 +669,7 @@ def can_add(self, data): return True if not settings.MANAGE_ORGANIZATION_AUTH: return False - return Organization.accessible_objects(self.user, 'admin_role').exists() + return Organization.access_qs(self.user, 'change').exists() def can_change(self, obj, data): if data is not None and ('is_superuser' in data or 'is_system_auditor' in data): @@ -691,7 +689,7 @@ def user_organizations(u): """ Returns all organizations that count `u` as a member """ - return Organization.accessible_objects(u, 'member_role') + return Organization.access_qs(u, 'member') def is_all_org_admin(self, u): """ @@ -774,7 +772,7 @@ class OAuth2ApplicationAccess(BaseAccess): prefetch_related = ('organization', 'oauth2accesstoken_set') def filtered_queryset(self): - org_access_qs = Organization.accessible_objects(self.user, 'member_role') + org_access_qs = Organization.access_qs(self.user, 'member') return self.model.objects.filter(organization__in=org_access_qs) def can_change(self, obj, data): @@ -787,7 +785,7 @@ def can_add(self, data): if self.user.is_superuser: return True if not data: - return Organization.accessible_objects(self.user, 'admin_role').exists() + return Organization.access_qs(self.user, 'change').exists() return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True) @@ -855,9 +853,6 @@ class OrganizationAccess(NotificationAttachMixin, BaseAccess): # organization admin_role is not a parent of organization auditor_role notification_attach_roles = ['admin_role', 'auditor_role'] - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_change(self, obj, data): if data and data.get('default_environment'): @@ -925,9 +920,6 @@ class InventoryAccess(BaseAccess): Prefetch('labels', queryset=Label.objects.all().order_by('name')), ) - def filtered_queryset(self, allowed=None, ad_hoc=None): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_use(self, obj): return self.user in obj.use_role @@ -936,7 +928,7 @@ def can_use(self, obj): def can_add(self, data): # If no data is specified, just checking for generic add permission? if not data: - return Organization.accessible_objects(self.user, 'inventory_admin_role').exists() + return Organization.access_qs(self.user, 'add_inventory').exists() return self.check_related('organization', Organization, data, role_field='inventory_admin_role') @check_superuser @@ -998,7 +990,7 @@ def filtered_queryset(self): def can_add(self, data): if not data: # So the browseable API will work - return Inventory.accessible_objects(self.user, 'admin_role').exists() + return Inventory.access_qs(self.user, 'change').exists() # Checks for admin or change permission on inventory. if not self.check_related('inventory', Inventory, data): @@ -1060,7 +1052,7 @@ def filtered_queryset(self): def can_add(self, data): if not data: # So the browseable API will work - return Inventory.accessible_objects(self.user, 'admin_role').exists() + return Inventory.access_qs(self.user, 'change').exists() if 'inventory' not in data: return False # Checks for admin or change permission on inventory. @@ -1102,7 +1094,7 @@ def filtered_queryset(self): def can_add(self, data): if not data or 'inventory' not in data: - return Inventory.accessible_objects(self.user, 'admin_role').exists() + return Inventory.access_qs(self.user, 'change').exists() if not self.check_related('source_project', Project, data, role_field='use_role'): return False @@ -1216,9 +1208,6 @@ class CredentialAccess(BaseAccess): ) prefetch_related = ('admin_role', 'use_role', 'read_role', 'admin_role__parents', 'admin_role__members', 'credential_type', 'organization') - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_add(self, data): if not data: # So the browseable API will work @@ -1329,7 +1318,7 @@ def filtered_queryset(self): @check_superuser def can_add(self, data): if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'admin_role').exists() + return Organization.access_qs(self.user, 'view').exists() if not settings.MANAGE_ORGANIZATION_AUTH: return False return self.check_related('organization', Organization, data) @@ -1400,7 +1389,7 @@ class ExecutionEnvironmentAccess(BaseAccess): def filtered_queryset(self): return ExecutionEnvironment.objects.filter( - Q(organization__in=Organization.accessible_pk_qs(self.user, 'read_role')) + Q(organization__in=Organization.access_ids_qs(self.user, 'view')) | Q(organization__isnull=True) | Q(id__in=ExecutionEnvironment.access_ids_qs(self.user, 'change')) ).distinct() @@ -1408,7 +1397,7 @@ def filtered_queryset(self): @check_superuser def can_add(self, data): if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'execution_environment_admin_role').exists() + return Organization.access_qs(self.user, 'add_executionenvironment').exists() return self.check_related('organization', Organization, data, mandatory=True, role_field='execution_environment_admin_role') @check_superuser @@ -1457,13 +1446,10 @@ class ProjectAccess(NotificationAttachMixin, BaseAccess): prefetch_related = ('modified_by', 'created_by', 'organization', 'last_job', 'current_job') notification_attach_roles = ['admin_role'] - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_add(self, data): if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'project_admin_role').exists() + return Organization.access_qs(self.user, 'add_project').exists() if data.get('default_environment'): ee = get_object_from_data('default_environment', ExecutionEnvironment, data) @@ -1559,9 +1545,6 @@ class JobTemplateAccess(NotificationAttachMixin, UnifiedCredentialsMixin, BaseAc Prefetch('last_job', queryset=UnifiedJob.objects.non_polymorphic()), ) - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - def can_add(self, data): """ a user can create a job template if @@ -1574,7 +1557,7 @@ def can_add(self, data): Users who are able to create deploy jobs can also run normal and check (dry run) jobs. """ if not data: # So the browseable API will work - return Project.accessible_objects(self.user, 'use_role').exists() + return Project.access_qs(self.user, 'use_project').exists() # if reference_obj is provided, determine if it can be copied reference_obj = data.get('reference_obj', None) @@ -1765,13 +1748,13 @@ class JobAccess(BaseAccess): def filtered_queryset(self): qs = self.model.objects - qs_jt = qs.filter(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role')) + qs_jt = qs.filter(job_template__in=JobTemplate.access_qs(self.user, 'view')) org_access_qs = Organization.objects.filter(Q(admin_role__members=self.user) | Q(auditor_role__members=self.user)) if not org_access_qs.exists(): return qs_jt - return qs.filter(Q(job_template__in=JobTemplate.accessible_objects(self.user, 'read_role')) | Q(organization__in=org_access_qs)).distinct() + return qs.filter(Q(job_template__in=JobTemplate.access_qs(self.user, 'view')) | Q(organization__in=org_access_qs)).distinct() def can_add(self, data, validate_license=True): raise NotImplementedError('Direct job creation not possible in v2 API') @@ -1972,7 +1955,7 @@ class WorkflowJobTemplateNodeAccess(UnifiedCredentialsMixin, BaseAccess): prefetch_related = ('success_nodes', 'failure_nodes', 'always_nodes', 'unified_job_template', 'workflow_job_template') def filtered_queryset(self): - return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.accessible_objects(self.user, 'read_role')) + return self.model.objects.filter(workflow_job_template__in=WorkflowJobTemplate.access_qs(self.user, 'view')) @check_superuser def can_add(self, data): @@ -2087,9 +2070,6 @@ class WorkflowJobTemplateAccess(NotificationAttachMixin, BaseAccess): 'read_role', ) - def filtered_queryset(self): - return self.model.accessible_objects(self.user, 'read_role') - @check_superuser def can_add(self, data): """ @@ -2100,7 +2080,7 @@ def can_add(self, data): Users who are able to create deploy jobs can also run normal and check (dry run) jobs. """ if not data: # So the browseable API will work - return Organization.accessible_objects(self.user, 'workflow_admin_role').exists() + return Organization.access_qs(self.user, 'add_workflowjobtemplate').exists() if not self.check_related('organization', Organization, data, role_field='workflow_admin_role', mandatory=True): if data.get('organization', None) is None: @@ -2660,13 +2640,13 @@ def filtered_queryset(self): if settings.ANSIBLE_BASE_ROLE_SYSTEM_ACTIVATED: return self.model.access_qs(self.user, 'view') return self.model.objects.filter( - Q(organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) | Q(organization__in=self.user.auditor_of_organizations) + Q(organization__in=Organization.access_qs(self.user, 'add_notificationtemplate')) | Q(organization__in=self.user.auditor_of_organizations) ).distinct() @check_superuser def can_add(self, data): if not data: - return Organization.accessible_objects(self.user, 'notification_admin_role').exists() + return Organization.access_qs(self.user, 'add_notificationtemplate').exists() return self.check_related('organization', Organization, data, role_field='notification_admin_role', mandatory=True) @check_superuser @@ -2694,7 +2674,7 @@ class NotificationAccess(BaseAccess): def filtered_queryset(self): return self.model.objects.filter( - Q(notification_template__organization__in=Organization.accessible_objects(self.user, 'notification_admin_role')) + Q(notification_template__organization__in=Organization.access_qs(self.user, 'add_notificationtemplate')) | Q(notification_template__organization__in=self.user.auditor_of_organizations) ).distinct() @@ -2810,11 +2790,7 @@ def filtered_queryset(self): if credential_set: q |= Q(credential__in=credential_set) - auditing_orgs = ( - (Organization.accessible_objects(self.user, 'admin_role') | Organization.accessible_objects(self.user, 'auditor_role')) - .distinct() - .values_list('id', flat=True) - ) + auditing_orgs = (Organization.access_qs(self.user, 'change') | Organization.access_qs(self.user, 'audit')).distinct().values_list('id', flat=True) if auditing_orgs: q |= ( Q(user__in=auditing_orgs.values('member_role__members')) diff --git a/awx/main/tests/functional/test_rbac_execution_environment.py b/awx/main/tests/functional/test_rbac_execution_environment.py index e5bc3553710f..8749574389e3 100644 --- a/awx/main/tests/functional/test_rbac_execution_environment.py +++ b/awx/main/tests/functional/test_rbac_execution_environment.py @@ -98,7 +98,7 @@ def test_team_can_have_permission(org_ee, ee_rd, rando, admin_user, post): @pytest.mark.django_db -def test_give_object_permission_to_ee(org_ee, ee_rd, org_member, check_user_capabilities): +def test_give_object_permission_to_ee(setup_managed_roles, org_ee, ee_rd, org_member, check_user_capabilities): access = ExecutionEnvironmentAccess(org_member) assert access.can_read(org_ee) # by virtue of being an org member assert not access.can_change(org_ee, {'name': 'new'}) @@ -130,7 +130,7 @@ def test_need_related_organization_access(org_ee, ee_rd, org_member): @pytest.mark.django_db @pytest.mark.parametrize('style', ['new', 'old']) -def test_give_org_permission_to_ee(org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd): +def test_give_org_permission_to_ee(setup_managed_roles, org_ee, organization, org_member, check_user_capabilities, style, org_ee_rd): access = ExecutionEnvironmentAccess(org_member) assert not access.can_change(org_ee, {'name': 'new'}) check_user_capabilities(org_member, org_ee, {'edit': False, 'delete': False, 'copy': False})