From d031737964ed08bf52add14a83b800fc84281cad Mon Sep 17 00:00:00 2001
From: Anton Eremin
Date: Tue, 12 Sep 2023 13:32:42 +0500
Subject: [PATCH] fix(tests): fmt
---
 src/access_control.py | 51 +++++++++++++++++++++++--------
 src/config.py | 6 +++-
 src/tests/test_access_control.py | 4 ++-
 3 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/src/access_control.py b/src/access_control.py
index 65112b9..29e0115 100644
--- a/src/access_control.py
+++ b/src/access_control.py
@@ -37,33 +37,42 @@ class AccessRequestDecision(BaseModel):
 approvers: FrozenSet[str] = frozenset()
-def make_decision_on_access_request( # noqa: PLR0911
+def make_decision_on_access_request( # noqa: PLR0911
 statements: FrozenSet[Statement],
 permission_set_name: str,
 account_id: str,
 requester_email: str,
 ) -> AccessRequestDecision:
- affected_statements = get_affected_statements(statements, account_id, permission_set_name)
+ affected_statements = get_affected_statements(
+ statements, account_id, permission_set_name
+ )
 decision_based_on_statements: set[Statement] = set()
 potential_approvers = set()
 explicit_deny_self_approval = any(
- statement.allow_self_approval is False and requester_email in statement.approvers
+ statement.allow_self_approval is False
+ and requester_email in statement.approvers
 for statement in affected_statements
 )
 explicit_deny_approval_not_required = any(
- statement.approval_is_not_required is False
- for statement in affected_statements
+ statement.approval_is_not_required is False for statement in affected_statements
 )
 for statement in affected_statements:
- if statement.approval_is_not_required and not explicit_deny_approval_not_required:
+ if (
+ statement.approval_is_not_required
+ and not explicit_deny_approval_not_required
+ ):
 return AccessRequestDecision(
 grant=True,
 reason=DecisionReason.ApprovalNotRequired,
 based_on_statements=frozenset([statement]),
 )
- if requester_email in statement.approvers and statement.allow_self_approval and not explicit_deny_self_approval:
+ if (
+ requester_email in statement.approvers
+ and statement.allow_self_approval
+ and not explicit_deny_self_approval
+ ):
 return AccessRequestDecision(
 grant=True,
 reason=DecisionReason.SelfApproval,
@@ -71,9 +80,11 @@ def make_decision_on_access_request( # noqa: PLR0911
 )
 decision_based_on_statements.add(statement)
- potential_approvers.update(approver for approver in statement.approvers if approver != requester_email)
+ potential_approvers.update(
+ approver for approver in statement.approvers if approver != requester_email
+ )
- if len(decision_based_on_statements) == 0: # sourcery skip
+ if len(decision_based_on_statements) == 0: # sourcery skip
 return AccessRequestDecision(
 grant=False,
 reason=DecisionReason.NoStatements,
@@ -116,12 +127,18 @@ def make_decision_on_approve_request( # noqa: PLR0913
 approver_email: str,
 requester_email: str,
 ) -> ApproveRequestDecision:
- affected_statements = get_affected_statements(statements, account_id, permission_set_name)
+ affected_statements = get_affected_statements(
+ statements, account_id, permission_set_name
+ )
 for statement in affected_statements:
 if approver_email in statement.approvers:
 is_self_approval = approver_email == requester_email
- if is_self_approval and statement.allow_self_approval or not is_self_approval:
+ if (
+ is_self_approval
+ and statement.allow_self_approval
+ or not is_self_approval
+ ):
 return ApproveRequestDecision(
 grant=action == entities.ApproverAction.Approve,
 permit=True,
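Review bot: The re-wrapped condition in make_decision_on_approve_request mixes `and` and `or` without parentheses, so a reader has to recall that `and` binds tighter to see that it means `(is_self_approval and statement.allow_self_approval) or not is_self_approval`. Stacked one operand per line it is easy to misread, and this branch decides whether an approval is permitted. I would state the intent directly as `if not is_self_approval or statement.allow_self_approval:`, which takes the same branch for every input and fits on one line. The function starts and ends outside this hunk, so the rest of the loop body is not visible here.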
@@ -150,8 +167,12 @@ def execute_decision( # noqa: PLR0913
 return False # Temporary solution for testing
 sso_instance = sso.describe_sso_instance(sso_client, cfg.sso_instance_arn)
- permission_set = sso.get_permission_set_by_name(sso_client, sso_instance.arn, permission_set_name)
- user_principal_id = sso.get_user_principal_id_by_email(identitystore_client, sso_instance.identity_store_id, requester.email)
+ permission_set = sso.get_permission_set_by_name(
+ sso_client, sso_instance.arn, permission_set_name
+ )
+ user_principal_id = sso.get_user_principal_id_by_email(
+ identitystore_client, sso_instance.identity_store_id, requester.email
+ )
 account_assignment = sso.UserAccountAssignment(
 instance_arn=sso_instance.arn,
 account_id=account_id,
@@ -159,7 +180,9 @@ def execute_decision( # noqa: PLR0913
 user_principal_id=user_principal_id,
 )
- logger.info("Creating account assignment", extra={"account_assignment": account_assignment})
+ logger.info(
+ "Creating account assignment", extra={"account_assignment": account_assignment}
+ )
 account_assignment_status = sso.create_account_assignment_and_wait_for_result(
 sso_client,
diff --git a/src/config.py b/src/config.py
index 81d9720..0aa812c 100644
--- a/src/config.py
+++ b/src/config.py
@@ -68,7 +68,11 @@ def get_accounts_and_permission_sets(cls, values: dict) -> dict: # noqa: ANN101
 permission_sets.update(statement.permission_set)
 if statement.resource_type == "Account":
 accounts.update(statement.resource)
- return values | {"accounts": accounts, "permission_sets": permission_sets, "statements": frozenset(statements)}
+ return values | {
+ "accounts": accounts,
+ "permission_sets": permission_sets,
+ "statements": frozenset(statements),
+ }
 def get_logger(service: Optional[str] = None, level: Optional[str] = None) -> Logger:
diff --git a/src/tests/test_access_control.py b/src/tests/test_access_control.py
index c21f08b..6e2f09e 100644
--- a/src/tests/test_access_control.py
+++ b/src/tests/test_access_control.py
@@ -964,7 +964,9 @@ def test_cases_for_approve_request_decision(request):
 def test_make_decision_on_access_request(test_cases_for_access_request_decision):
- actual = make_decision_on_access_request(**test_cases_for_access_request_decision["in"])
+ actual = make_decision_on_access_request(
+ **test_cases_for_access_request_decision["in"]
+ )
 expected = test_cases_for_access_request_decision["out"]
 # Compare grant and reason attributes directly