diff --git a/asab/web/auth/authorization.py b/asab/web/auth/authorization.py index d433fe737..6cc78c6bd 100644 --- a/asab/web/auth/authorization.py +++ b/asab/web/auth/authorization.py @@ -18,6 +18,7 @@ class Authorization: def __init__(self, claims: dict): # Userinfo should not be accessed directly self._Claims = claims or {} + self._Resources = self._Claims.get("resources", {}) self.CredentialsId = self._Claims.get("sub") self.Username = self._Claims.get("preferred_username") or self._Claims.get("username") @@ -62,7 +63,7 @@ def has_superuser_access(self) -> bool: >>> print("I am but a mere mortal.") """ self.require_valid() - return is_superuser(self._Claims) + return is_superuser(self._Resources) def has_resource_access(self, *resources: str) -> bool: @@ -84,7 +85,7 @@ def has_resource_access(self, *resources: str) -> bool: >>> print("Not much to do here.") """ self.require_valid() - return has_resource_access(self._Claims, resources, tenant=Tenant.get(None)) + return has_resource_access(self._Resources, resources, tenant=Tenant.get(None)) def has_tenant_access(self) -> bool: @@ -102,7 +103,7 @@ def has_tenant_access(self) -> bool: >>> if authz.has_tenant_access(): >>> print("I have access to Big Corporation!") >>> else: - >>> print("Not much to do here.") + >>> print("Not much to do here.") >>> finally: >>> asab.contextvars.Tenant.reset(tenant_ctx) """ @@ -113,7 +114,7 @@ def has_tenant_access(self) -> bool: except LookupError as e: raise ValueError("No tenant in context.") from e - return has_tenant_access(self._Claims, tenant) + return has_tenant_access(self._Resources, tenant) def require_valid(self): @@ -187,20 +188,6 @@ def require_tenant_access(self): raise AccessDeniedError() - def authorized_resources(self) -> typing.Optional[typing.Set[str]]: - """ - Return the set of authorized resources. - - NOTE: If possible, use methods has_resource_access(resource_id) and has_superuser_access() instead of inspecting - the set of resources directly. - - Returns: - Set of authorized resources. - """ - self.require_valid() - return get_authorized_resources(self._Claims, Tenant.get(None)) - - def user_info(self) -> typing.Dict[str, typing.Any]: """ Return OpenID Connect UserInfo claims (or JWToken claims). @@ -229,25 +216,61 @@ def get_claim(self, key: str) -> typing.Any: return self._Claims.get(key) -def is_superuser(userinfo: typing.Mapping) -> bool: + def _resources(self) -> typing.Optional[typing.Set[str]]: + """ + Return the set of authorized resources. + + Returns: + set: Authorized resources. + """ + self.require_valid() + + resources = _authorized_resources(self._Resources, Tenant.get(None)) + + if self.has_superuser_access(): + # Ensure superuser resource is present no matter the tenant + resources.add(SUPERUSER_RESOURCE_ID) + + return resources + + +def is_superuser(resources_claim: typing.Mapping) -> bool: """ Check if the superuser resource is present in the authorized resource list. + + Args: + resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo). + + Returns: + bool: Do I have superuser access? """ - return SUPERUSER_RESOURCE_ID in get_authorized_resources(userinfo, tenant=None) + return SUPERUSER_RESOURCE_ID in _authorized_resources(resources_claim, tenant=None) def has_resource_access( - userinfo: typing.Mapping, - resources: typing.Iterable, + resources_claim: typing.Mapping, + resources: typing.Collection[str], tenant: typing.Union[str, None], ) -> bool: """ Check if the requested resources or the superuser resource are present in the authorized resource list. + + Args: + resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo). + resources (typing.Collection[str]): A list of resource IDs whose authorization is requested. + tenant (str): Tenant context of the authorization (or `None` for global context). + + Returns: + bool: Am I authorized to access requested resources? """ - if is_superuser(userinfo): + if len(resources) == 0: + raise ValueError("Resources must not be empty") + + authorized_resources = _authorized_resources(resources_claim, tenant) + + if is_superuser(resources_claim): return True - authorized_resources = get_authorized_resources(userinfo, tenant) for resource in resources: if resource not in authorized_resources: return False @@ -255,26 +278,43 @@ def has_resource_access( return True -def has_tenant_access(userinfo: typing.Mapping, tenant: str) -> bool: +def has_tenant_access(resources_claim: typing.Mapping, tenant: str) -> bool: """ Check the agent's userinfo to see if they are authorized to access a tenant. If the agent has superuser access, tenant access is always implicitly granted. + + Args: + resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo). + tenant (str): Tenant context of the authorization. + + Returns: + bool: Am I authorized to access requested tenant? """ - if tenant == "*": - raise ValueError("Invalid tenant name: '*'") - if is_superuser(userinfo): + if tenant in {"*", None}: + raise ValueError("Invalid tenant: {!r}".format(tenant)) + + if is_superuser(resources_claim): return True - if tenant in userinfo.get("resources", {}): + + if tenant in resources_claim: return True + return False -def get_authorized_resources(userinfo: typing.Mapping, tenant: typing.Union[str, None]) -> typing.Set[str]: +def _authorized_resources(resources_claim: typing.Mapping, tenant: typing.Union[str, None]) -> typing.Set[str]: """ Extract resources authorized within given tenant (or globally, if tenant is None). - :param userinfo: - :param tenant: - :return: Set of authorized resources. + Args: + resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo). + tenant (str): Tenant context of the authorization (or `None` for global context). + + Returns: + set: Resources authorized within the tenant (or globally). """ - return set(userinfo.get("resources", {}).get(tenant if tenant is not None else "*", [])) + if tenant == "*": + # Use `None` for global context instead! + raise ValueError("Invalid tenant name: {}".format(tenant)) + + return set(resources_claim.get(tenant if tenant is not None else "*", [])) diff --git a/asab/web/auth/service.py b/asab/web/auth/service.py index 33f79944f..c529800bd 100644 --- a/asab/web/auth/service.py +++ b/asab/web/auth/service.py @@ -655,5 +655,5 @@ def _pass_resources(handler): @functools.wraps(handler) async def _pass_resources_wrapper(*args, **kwargs): authz = Authz.get(None) - return await handler(*args, resources=authz.authorized_resources() if authz is not None else None, **kwargs) + return await handler(*args, resources=authz._resources() if authz is not None else None, **kwargs) return _pass_resources_wrapper diff --git a/test/__init__.py b/test/__init__.py index 37ae64d01..c9e637817 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -5,3 +5,5 @@ from .test_metrics.test_histogram import * from .test_metrics.test_duplicates import * from .test_crash.test_crash import * +from .test_auth.test_access_control import * +from .test_auth.test_authorization import * diff --git a/test/test_auth/__init__.py b/test/test_auth/__init__.py new file mode 100644 index 000000000..bdd56297c --- /dev/null +++ b/test/test_auth/__init__.py @@ -0,0 +1,2 @@ +from .test_access_control import * +from .test_authorization import * diff --git a/test/test_auth/test_access_control.py b/test/test_auth/test_access_control.py new file mode 100644 index 000000000..173cffbce --- /dev/null +++ b/test/test_auth/test_access_control.py @@ -0,0 +1,513 @@ +import unittest + +from asab.web.auth.authorization import ( + has_tenant_access, + has_resource_access, + is_superuser, +) + +TENANT_1 = "TENANT_1" +TENANT_2 = "TENANT_2" +RESOURCE_1 = "RESOURCE_1" +RESOURCE_2 = "RESOURCE_2" +RESOURCE_3 = "RESOURCE_3" +RESOURCE_SUPERUSER = "authz:superuser" + + +class TestGlobalAndTenantResources(unittest.TestCase): + """ + This entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + + Access check for RESOURCE_1 or RESOURCE_2 in TENANT_1 must return True. + Global access check for RESOURCE_1 must return True. + Any other access check for tenant or resource must return False. + """ + ResourcesClaim = { + "*": [RESOURCE_1], + TENANT_1: [RESOURCE_1, RESOURCE_2], + } + + + + def test_tenant_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2, RESOURCE_3], + tenant=TENANT_1, + ), + "Access to RESOURCE_3 is not authorized in TENANT_1.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=TENANT_1, + ), + "Access to both resources is authorized in TENANT_1.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "Access to RESOURCE_1 is authorized in TENANT_1.", + ) + + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + + def test_global_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=None, + ), + "Access to RESOURCE_2 is not authorized globally.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "Access to RESOURCE_1 is authorized globally.", + ) + + + def test_tenant_access(self): + self.assertTrue( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_1, + ), + "Access to TENANT_1 is authorized.", + ) + + self.assertFalse( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + + def test_superuser(self): + self.assertFalse( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "Access to superuser resource is not authorized.", + ) + + +class TestTenantWithoutResources(unittest.TestCase): + """ + This entity has authorized access to TENANT_1 but no resources. + + Only access check for TENANT_1 must return True. + Access check for any resource or any other tenant must return False. + """ + + ResourcesClaim = { + TENANT_1: [], + } + + + def test_tenant_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "Access to RESOURCE_1 in TENANT_1 is not authorized.", + ) + + + def test_global_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "No resource is authorized globally.", + ) + + + def test_tenant_access(self): + self.assertTrue( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_1, + ), + "Access to TENANT_1 is authorized.", + ) + + self.assertFalse( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + + def test_superuser(self): + self.assertFalse( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "Access to superuser resource is not authorized.", + ) + + +class TestTenantWithResources(unittest.TestCase): + """ + This entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1. + + Access check for RESOURCE_1 or RESOURCE_2 in TENANT_1 must return True. + Any other access check for tenant or resource must return False. + """ + ResourcesClaim = { + TENANT_1: [RESOURCE_1, RESOURCE_2], + } + + def test_tenant_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2, RESOURCE_3], + tenant=TENANT_1, + ), + "Access to RESOURCE_3 is not authorized in TENANT_1.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=TENANT_1, + ), + "Access to both resources is authorized in TENANT_1.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "Access to RESOURCE_1 is authorized in TENANT_1.", + ) + + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + + def test_global_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "No resource is authorized globally.", + ) + + + def test_tenant_access(self): + self.assertTrue( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_1, + ), + "Access to TENANT_1 is authorized.", + ) + + self.assertFalse( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_2, + ), + "Access to TENANT_2 is not authorized.", + ) + + + def test_superuser(self): + self.assertFalse( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "Access to superuser resource is not authorized.", + ) + + +class TestGlobalResourcesNoTenant(unittest.TestCase): + """ + This entity is globally authorized to access RESOURCE_1, but doesn't have access to any tenant. + + Only global access check for RESOURCE_1 must return True. + Any other access check for tenant or resource must return False. + """ + ResourcesClaim = { + "*": [RESOURCE_1], + } + + + def test_tenant_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "Access to TENANT_1 is not authorized.", + ) + + + def test_global_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=None, + ), + "Access to RESOURCE_2 is not authorized globally.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "Access to RESOURCE_1 is authorized globally.", + ) + + + def test_tenant_access(self): + self.assertFalse( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_1, + ), + "Access to TENANT_1 is not authorized.", + ) + + + def test_superuser(self): + self.assertFalse( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "Access to superuser resource is not authorized.", + ) + + +class TestSuperuser(unittest.TestCase): + """ + This entity has authorized access to "authz:superuser" resource, which grants them + superuser privileges. + + Access check for any tenant or resource must return True. + Incorrect tenant values must still throw ValueError. + """ + + ResourcesClaim = { + "*": [RESOURCE_SUPERUSER], + } + + + def test_tenant_resource_access(self): + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=TENANT_1, + ), + "Superuser must have unrestricted resource access.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "Superuser must have unrestricted resource access.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=TENANT_2, + ), + "Superuser must have unrestricted resource access.", + ) + + with self.assertRaises(ValueError): + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[], + tenant=TENANT_1, + ) + + + def test_global_resource_access(self): + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1, RESOURCE_2], + tenant=None, + ), + "Superuser must have unrestricted resource access.", + ) + + self.assertTrue( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "Superuser must have unrestricted resource access.", + ) + + with self.assertRaises(ValueError): + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant="*", + ) + + + def test_tenant_access(self): + self.assertTrue( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_1, + ), + "Superuser must have unrestricted tenant access.", + ) + + with self.assertRaises(ValueError): + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant="*", + ) + + with self.assertRaises(ValueError): + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=None, + ) + + + def test_superuser(self): + self.assertTrue( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "Superuser resource must grant superuser privileges.", + ) + + +class TestEmptyResources(unittest.TestCase): + ResourcesClaim = {} + + + def test_tenant_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=TENANT_1, + ), + "No authorized tenants or resources.", + ) + + with self.assertRaises(ValueError): + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[], + tenant=TENANT_1, + ) + + + def test_global_resource_access(self): + self.assertFalse( + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant=None, + ), + "No globally authorized resources.", + ) + + with self.assertRaises(ValueError): + has_resource_access( + resources_claim=self.ResourcesClaim, + resources=[RESOURCE_1], + tenant="*", + ) + + + def test_tenant_access(self): + self.assertFalse( + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=TENANT_2, + ), + "No authorized tenants.", + ) + + with self.assertRaises(ValueError): + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant="*", + ) + + with self.assertRaises(ValueError): + has_tenant_access( + resources_claim=self.ResourcesClaim, + tenant=None, + ) + + + def test_superuser(self): + self.assertFalse( + is_superuser( + resources_claim=self.ResourcesClaim, + ), + "No authorized tenants or resources.", + ) diff --git a/test/test_auth/test_authorization.py b/test/test_auth/test_authorization.py new file mode 100644 index 000000000..bc4612087 --- /dev/null +++ b/test/test_auth/test_authorization.py @@ -0,0 +1,515 @@ +import unittest +import time + +import asab.contextvars +import asab.exceptions +from asab.web.auth.authorization import Authorization + +TENANT_1 = "TENANT_1" +TENANT_2 = "TENANT_2" +RESOURCE_1 = "RESOURCE_1" +RESOURCE_2 = "RESOURCE_2" +RESOURCE_3 = "RESOURCE_3" +RESOURCE_SUPERUSER = "authz:superuser" + + +class TestInvalidClaims(unittest.TestCase): + """ + Entity is missing mandatory authorization claims. + + Authorization object should not initialize. + """ + + def setUp(self): + super().setUp() + self.TenantCtx = asab.contextvars.Tenant.set(TENANT_1) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + + def test_missing_expiration(self): + claims = { + "iat": time.time() - 60, + } + with self.assertRaises(KeyError): + Authorization(claims) + + def test_missing_issued_at(self): + claims = { + "exp": time.time() + 3600, + } + with self.assertRaises(KeyError): + Authorization(claims) + + +class TestExpiredSuperuser(unittest.TestCase): + """ + Entity has expired superuser authorization. + + All access control methods must raise asab.exceptions.NotAuthenticatedError. + """ + def setUp(self): + super().setUp() + claims = { + "iat": time.time() - 60, + "exp": time.time() - 10, # Expired 10 seconds ago + "resources": { + "*": [RESOURCE_SUPERUSER], + TENANT_1: [RESOURCE_SUPERUSER, RESOURCE_1], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(TENANT_1) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertFalse( + self.Authz.is_valid(), + "Expired authorization must return False.", + ) + + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.require_valid() + + def test_resource_access(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.has_resource_access(RESOURCE_1) + + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.require_resource_access(RESOURCE_1) + + def test_tenant_access(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.has_tenant_access() + + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.require_tenant_access() + + def test_superuser_access(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.has_superuser_access() + + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.require_superuser_access() + + def test_authorized_resources(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz._resources() + + def test_get_claim(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.get_claim("anything") + + def test_user_info(self): + with self.assertRaises(asab.exceptions.NotAuthenticatedError): + self.Authz.user_info() + + +class TestTenantAuthorized(unittest.TestCase): + """ + Entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + It is accessing TENANT_1 (in context). + """ + def setUp(self): + super().setUp() + + claims = { + "iat": time.time() - 60, + "exp": time.time() + 3600, + "resources": { + "*": [RESOURCE_1], + TENANT_1: [RESOURCE_1, RESOURCE_2], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(TENANT_1) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertTrue( + self.Authz.is_valid(), + "Unexpired authorization must return True.", + ) + + self.assertIsNone( + self.Authz.require_valid(), + "Unexpired authorization must succeed without return." + ) + + def test_resource_access(self): + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1), + "Access to RESOURCE_1 is authorized.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1), + "Authorized access to RESOURCE_1 must succeed without return.", + ) + + self.assertFalse( + self.Authz.has_resource_access(RESOURCE_1, RESOURCE_3), + "Access to RESOURCE_3 is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_resource_access(RESOURCE_1, RESOURCE_3) + + def test_tenant_access(self): + self.assertTrue( + self.Authz.has_tenant_access(), + "Access to TENANT_1 is authorized.", + ) + + self.assertIsNone( + self.Authz.require_tenant_access(), + "Authorized access to TENANT_1 must succeed without return.", + ) + + def test_superuser_access(self): + self.assertFalse( + self.Authz.has_superuser_access(), + "Access to superuser resource is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_superuser_access() + + def test_authorized_resources(self): + self.assertEqual( + self.Authz._resources(), + {RESOURCE_1, RESOURCE_2}, + "Entity is authorized to access RESOURCE_1, RESOURCE_2 in TENANT_1.", + ) + + +class TestTenantForbidden(unittest.TestCase): + """ + Entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + It is accessing TENANT_1 (in context). + + All has-access methods must return False. + All require-access methods must raise asab.exceptions.AccessDeniedError. + """ + def setUp(self): + super().setUp() + + claims = { + "iat": time.time() - 60, + "exp": time.time() + 3600, + "resources": { + "*": [RESOURCE_1], + TENANT_1: [RESOURCE_1, RESOURCE_2], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(TENANT_2) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertTrue( + self.Authz.is_valid(), + "Unexpired authorization must return True.", + ) + + self.assertIsNone( + self.Authz.require_valid(), + "Unexpired authorization must succeed without return." + ) + + def test_resource_access(self): + self.assertFalse( + self.Authz.has_resource_access(RESOURCE_1), + "Access to RESOURCE_1 is not authorized in TENANT_2.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_resource_access(RESOURCE_1) + + def test_tenant_access(self): + self.assertFalse( + self.Authz.has_tenant_access(), + "Access to TENANT_2 is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_tenant_access() + + def test_superuser_access(self): + self.assertFalse( + self.Authz.has_superuser_access(), + "Access to superuser resource is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_superuser_access() + + def test_authorized_resources(self): + self.assertEqual( + self.Authz._resources(), + set(), + "Entity is authorized to access RESOURCE_1, RESOURCE_2 in TENANT_1.", + ) + + +class TestGlobal(unittest.TestCase): + """ + Entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + It is accessing global space - Tenant context is empty. + + All has-access methods must return True. + All require-access methods must return None. + """ + def setUp(self): + super().setUp() + + claims = { + "iat": time.time() - 60, + "exp": time.time() + 3600, + "resources": { + "*": [RESOURCE_1], + TENANT_1: [RESOURCE_1, RESOURCE_2], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(None) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertTrue( + self.Authz.is_valid(), + "Unexpired authorization must return True.", + ) + + self.assertIsNone( + self.Authz.require_valid(), + "Unexpired authorization must succeed without return." + ) + + def test_resource_access(self): + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1), + "Global access to RESOURCE_1 is authorized.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1), + "Authorized global access to RESOURCE_1 must succeed without return.", + ) + + self.assertFalse( + self.Authz.has_resource_access(RESOURCE_1, RESOURCE_2), + "Global access to RESOURCE_2 is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_resource_access(RESOURCE_1, RESOURCE_2) + + def test_tenant_access(self): + # There is no tenant in the context, hence ValueError + with self.assertRaises(ValueError): + self.Authz.has_tenant_access() + + with self.assertRaises(ValueError): + self.Authz.require_tenant_access() + + def test_superuser_access(self): + self.assertFalse( + self.Authz.has_superuser_access(), + "Access to superuser resource is not authorized.", + ) + + with self.assertRaises(asab.exceptions.AccessDeniedError): + self.Authz.require_superuser_access() + + def test_authorized_resources(self): + self.assertEqual( + self.Authz._resources(), + {RESOURCE_1}, + "Entity is globally authorized to access RESOURCE_1.", + ) + + +class TestSuperuserTenant(unittest.TestCase): + """ + Entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + It is accessing TENANT_1 (in context). + """ + def setUp(self): + super().setUp() + + claims = { + "iat": time.time() - 60, + "exp": time.time() + 3600, + "resources": { + "*": [RESOURCE_SUPERUSER], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(TENANT_1) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertTrue( + self.Authz.is_valid(), + "Unexpired authorization must return True.", + ) + + self.assertIsNone( + self.Authz.require_valid(), + "Unexpired authorization must succeed without return." + ) + + def test_resource_access(self): + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1), + "Superuser must have unrestricted resource access.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1), + "Authorized superuser access to any resource must succeed without return.", + ) + + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1, RESOURCE_3), + "Superuser must have unrestricted resource access.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1, RESOURCE_3), + "Authorized superuser access to any resource must succeed without return.", + ) + + def test_tenant_access(self): + self.assertTrue( + self.Authz.has_tenant_access(), + "Superuser must have unrestricted tenant access.", + ) + + self.assertIsNone( + self.Authz.require_tenant_access(), + "Authorized superuser access to any tenant must succeed without return.", + ) + + def test_superuser_access(self): + self.assertTrue( + self.Authz.has_superuser_access(), + "Superuser must have unrestricted tenant access.", + ) + + self.assertIsNone( + self.Authz.require_superuser_access(), + "Authorized superuser access must succeed without return.", + ) + + def test_authorized_resources(self): + self.assertEqual( + self.Authz._resources(), + {RESOURCE_SUPERUSER}, + ) + + +class TestSuperuserGlobal(unittest.TestCase): + """ + Entity is authorized to access RESOURCE_1 and RESOURCE_2 in TENANT_1, and RESOURCE_1 globally. + It is accessing global space - Tenant context is empty. + + All has-access methods must return True. + All require-access methods must return None. + """ + def setUp(self): + super().setUp() + + claims = { + "iat": time.time() - 60, + "exp": time.time() + 3600, + "resources": { + "*": [RESOURCE_SUPERUSER], + }, + } + self.Authz = Authorization(claims) + self.TenantCtx = asab.contextvars.Tenant.set(None) + + def tearDown(self): + super().tearDown() + asab.contextvars.Tenant.reset(self.TenantCtx) + self.Authz = None + + def test_valid(self): + self.assertTrue( + self.Authz.is_valid(), + "Unexpired authorization must return True.", + ) + + self.assertIsNone( + self.Authz.require_valid(), + "Unexpired authorization must succeed without return." + ) + + def test_resource_access(self): + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1), + "Superuser must have unrestricted resource access.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1), + "Authorized superuser access to any resource must succeed without return.", + ) + + self.assertTrue( + self.Authz.has_resource_access(RESOURCE_1, RESOURCE_3), + "Superuser must have unrestricted resource access.", + ) + + self.assertIsNone( + self.Authz.require_resource_access(RESOURCE_1, RESOURCE_3), + "Authorized superuser access to any resource must succeed without return.", + ) + + def test_tenant_access(self): + # There is no tenant in the context, hence ValueError + with self.assertRaises(ValueError): + self.Authz.has_tenant_access() + + with self.assertRaises(ValueError): + self.Authz.require_tenant_access() + + def test_superuser_access(self): + self.assertTrue( + self.Authz.has_superuser_access(), + "Superuser must have unrestricted tenant access.", + ) + + self.assertIsNone( + self.Authz.require_superuser_access(), + "Authorized superuser access must succeed without return.", + ) + + def test_authorized_resources(self): + self.assertEqual( + self.Authz._resources(), + {RESOURCE_SUPERUSER}, + ) \ No newline at end of file