Skip to content

Commit

Permalink
fix(tests): fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
EreminAnton committed Sep 12, 2023
1 parent dee438f commit d031737
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
51 changes: 37 additions & 14 deletions src/access_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,43 +37,54 @@ class AccessRequestDecision(BaseModel):
approvers: FrozenSet[str] = frozenset()


def make_decision_on_access_request( # noqa: PLR0911
def make_decision_on_access_request( # noqa: PLR0911
statements: FrozenSet[Statement],
permission_set_name: str,
account_id: str,
requester_email: str,
) -> AccessRequestDecision:
affected_statements = get_affected_statements(statements, account_id, permission_set_name)
affected_statements = get_affected_statements(
statements, account_id, permission_set_name
)
decision_based_on_statements: set[Statement] = set()
potential_approvers = set()

explicit_deny_self_approval = any(
statement.allow_self_approval is False and requester_email in statement.approvers
statement.allow_self_approval is False
and requester_email in statement.approvers
for statement in affected_statements
)
explicit_deny_approval_not_required = any(
statement.approval_is_not_required is False
for statement in affected_statements
statement.approval_is_not_required is False for statement in affected_statements
)

for statement in affected_statements:
if statement.approval_is_not_required and not explicit_deny_approval_not_required:
if (
statement.approval_is_not_required
and not explicit_deny_approval_not_required
):
return AccessRequestDecision(
grant=True,
reason=DecisionReason.ApprovalNotRequired,
based_on_statements=frozenset([statement]),
)
if requester_email in statement.approvers and statement.allow_self_approval and not explicit_deny_self_approval:
if (
requester_email in statement.approvers
and statement.allow_self_approval
and not explicit_deny_self_approval
):
return AccessRequestDecision(
grant=True,
reason=DecisionReason.SelfApproval,
based_on_statements=frozenset([statement]),
)

decision_based_on_statements.add(statement)
potential_approvers.update(approver for approver in statement.approvers if approver != requester_email)
potential_approvers.update(
approver for approver in statement.approvers if approver != requester_email
)

if len(decision_based_on_statements) == 0: # sourcery skip
if len(decision_based_on_statements) == 0: # sourcery skip
return AccessRequestDecision(
grant=False,
reason=DecisionReason.NoStatements,
Expand Down Expand Up @@ -116,12 +127,18 @@ def make_decision_on_approve_request( # noqa: PLR0913
approver_email: str,
requester_email: str,
) -> ApproveRequestDecision:
affected_statements = get_affected_statements(statements, account_id, permission_set_name)
affected_statements = get_affected_statements(
statements, account_id, permission_set_name
)

for statement in affected_statements:
if approver_email in statement.approvers:
is_self_approval = approver_email == requester_email
if is_self_approval and statement.allow_self_approval or not is_self_approval:
if (
is_self_approval
and statement.allow_self_approval
or not is_self_approval
):
return ApproveRequestDecision(
grant=action == entities.ApproverAction.Approve,
permit=True,
Expand Down Expand Up @@ -150,16 +167,22 @@ def execute_decision( # noqa: PLR0913
return False # Temporary solution for testing

sso_instance = sso.describe_sso_instance(sso_client, cfg.sso_instance_arn)
permission_set = sso.get_permission_set_by_name(sso_client, sso_instance.arn, permission_set_name)
user_principal_id = sso.get_user_principal_id_by_email(identitystore_client, sso_instance.identity_store_id, requester.email)
permission_set = sso.get_permission_set_by_name(
sso_client, sso_instance.arn, permission_set_name
)
user_principal_id = sso.get_user_principal_id_by_email(
identitystore_client, sso_instance.identity_store_id, requester.email
)
account_assignment = sso.UserAccountAssignment(
instance_arn=sso_instance.arn,
account_id=account_id,
permission_set_arn=permission_set.arn,
user_principal_id=user_principal_id,
)

logger.info("Creating account assignment", extra={"account_assignment": account_assignment})
logger.info(
"Creating account assignment", extra={"account_assignment": account_assignment}
)

account_assignment_status = sso.create_account_assignment_and_wait_for_result(
sso_client,
Expand Down
6 changes: 5 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ def get_accounts_and_permission_sets(cls, values: dict) -> dict: # noqa: ANN101
permission_sets.update(statement.permission_set)
if statement.resource_type == "Account":
accounts.update(statement.resource)
return values | {"accounts": accounts, "permission_sets": permission_sets, "statements": frozenset(statements)}
return values | {
"accounts": accounts,
"permission_sets": permission_sets,
"statements": frozenset(statements),
}


def get_logger(service: Optional[str] = None, level: Optional[str] = None) -> Logger:
Expand Down
4 changes: 3 additions & 1 deletion src/tests/test_access_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,9 @@ def test_cases_for_approve_request_decision(request):


def test_make_decision_on_access_request(test_cases_for_access_request_decision):
actual = make_decision_on_access_request(**test_cases_for_access_request_decision["in"])
actual = make_decision_on_access_request(
**test_cases_for_access_request_decision["in"]
)
expected = test_cases_for_access_request_decision["out"]

# Compare grant and reason attributes directly
Expand Down

0 comments on commit d031737

Please sign in to comment.