Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auth unit tests #656

Merged
merged 23 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 73 additions & 25 deletions asab/web/auth/authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class Authorization:
def __init__(self, claims: dict):
# Userinfo should not be accessed directly
self._Claims = claims or {}
self._Resources = self._Claims.get("resources", {})

self.CredentialsId = self._Claims.get("sub")
self.Username = self._Claims.get("preferred_username") or self._Claims.get("username")
Expand Down Expand Up @@ -62,7 +63,7 @@ def has_superuser_access(self) -> bool:
>>> print("I am but a mere mortal.")
"""
self.require_valid()
return is_superuser(self._Claims)
return is_superuser(self._Resources)


def has_resource_access(self, *resources: str) -> bool:
Expand All @@ -84,7 +85,7 @@ def has_resource_access(self, *resources: str) -> bool:
>>> print("Not much to do here.")
"""
self.require_valid()
return has_resource_access(self._Claims, resources, tenant=Tenant.get(None))
return has_resource_access(self._Resources, resources, tenant=Tenant.get(None))


def has_tenant_access(self) -> bool:
Expand All @@ -102,7 +103,7 @@ def has_tenant_access(self) -> bool:
>>> if authz.has_tenant_access():
>>> print("I have access to Big Corporation!")
>>> else:
>>> print("Not much to do here.")
>>> print("Not much to do here.")
>>> finally:
>>> asab.contextvars.Tenant.reset(tenant_ctx)
"""
Expand All @@ -113,7 +114,7 @@ def has_tenant_access(self) -> bool:
except LookupError as e:
raise ValueError("No tenant in context.") from e

return has_tenant_access(self._Claims, tenant)
return has_tenant_access(self._Resources, tenant)


def require_valid(self):
Expand Down Expand Up @@ -189,16 +190,28 @@ def require_tenant_access(self):

def authorized_resources(self) -> typing.Optional[typing.Set[str]]:
"""
Return the set of authorized resources.
DEPRECATED. Return the set of authorized resources.
byewokko marked this conversation as resolved.
Show resolved Hide resolved

NOTE: If possible, use methods has_resource_access(resource_id) and has_superuser_access() instead of inspecting
the set of resources directly.
Use these methods instead:
- has_resource_access(resource_id)
- has_superuser_access()
- has_tenant_access()
- require_resource_access(resource_id)
- require_superuser_access()
- require_tenant_access()

Returns:
Set of authorized resources.
set: Authorized resources.
"""
self.require_valid()
return get_authorized_resources(self._Claims, Tenant.get(None))

resources = _authorized_resources(self._Resources, Tenant.get(None))

if self.has_superuser_access():
# Ensure superuser resource is present no matter the tenant
resources.add(SUPERUSER_RESOURCE_ID)

return resources


def user_info(self) -> typing.Dict[str, typing.Any]:
Expand Down Expand Up @@ -229,52 +242,87 @@ def get_claim(self, key: str) -> typing.Any:
return self._Claims.get(key)


def is_superuser(userinfo: typing.Mapping) -> bool:
def is_superuser(resources_claim: typing.Mapping) -> bool:
"""
Check if the superuser resource is present in the authorized resource list.

Args:
resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo).

Returns:
bool: Do I have superuser access?
"""
return SUPERUSER_RESOURCE_ID in get_authorized_resources(userinfo, tenant=None)
return SUPERUSER_RESOURCE_ID in _authorized_resources(resources_claim, tenant=None)


def has_resource_access(
userinfo: typing.Mapping,
resources: typing.Iterable,
resources_claim: typing.Mapping,
resources: typing.Collection[str],
tenant: typing.Union[str, None],
) -> bool:
"""
Check if the requested resources or the superuser resource are present in the authorized resource list.

Args:
resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo).
resources (typing.Collection[str]): A list of resource IDs whose authorization is requested.
tenant (str): Tenant context of the authorization (or `None` for global context).

Returns:
bool: Am I authorized to access requested resources?
"""
if is_superuser(userinfo):
if len(resources) == 0:
raise ValueError("Resources must not be empty")

authorized_resources = _authorized_resources(resources_claim, tenant)

if is_superuser(resources_claim):
return True

authorized_resources = get_authorized_resources(userinfo, tenant)
for resource in resources:
if resource not in authorized_resources:
return False

return True


def has_tenant_access(userinfo: typing.Mapping, tenant: str) -> bool:
def has_tenant_access(resources_claim: typing.Mapping, tenant: str) -> bool:
"""
Check the agent's userinfo to see if they are authorized to access a tenant.
If the agent has superuser access, tenant access is always implicitly granted.

Args:
resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo).
tenant (str): Tenant context of the authorization.

Returns:
bool: Am I authorized to access requested tenant?
"""
if tenant == "*":
raise ValueError("Invalid tenant name: '*'")
if is_superuser(userinfo):
if tenant in {"*", None}:
raise ValueError("Invalid tenant: {!r}".format(tenant))

if is_superuser(resources_claim):
return True
if tenant in userinfo.get("resources", {}):

if tenant in resources_claim:
return True

return False


def get_authorized_resources(userinfo: typing.Mapping, tenant: typing.Union[str, None]) -> typing.Set[str]:
def _authorized_resources(resources_claim: typing.Mapping, tenant: typing.Union[str, None]) -> typing.Set[str]:
"""
Extract resources authorized within given tenant (or globally, if tenant is None).

:param userinfo:
:param tenant:
:return: Set of authorized resources.
Args:
resources_claim (typing.Mapping): The "resources" field from authorization server claims (aka UserInfo).
tenant (str): Tenant context of the authorization (or `None` for global context).

Returns:
set: Resources authorized within the tenant (or globally).
"""
return set(userinfo.get("resources", {}).get(tenant if tenant is not None else "*", []))
if tenant == "*":
# Use `None` for global context instead!
raise ValueError("Invalid tenant name: {}".format(tenant))

return set(resources_claim.get(tenant if tenant is not None else "*", []))
2 changes: 2 additions & 0 deletions test/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
from .test_metrics.test_histogram import *
from .test_metrics.test_duplicates import *
from .test_crash.test_crash import *
from .test_auth.test_access_control import *
from .test_auth.test_authorization import *
2 changes: 2 additions & 0 deletions test/test_auth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .test_access_control import *
from .test_authorization import *
Loading
Loading