diff --git a/ocs_ci/ocs/resources/bucket_policy.py b/ocs_ci/ocs/resources/bucket_policy.py index ddb88f93e9b..15089ce365f 100644 --- a/ocs_ci/ocs/resources/bucket_policy.py +++ b/ocs_ci/ocs/resources/bucket_policy.py @@ -130,7 +130,14 @@ def __init__( def gen_bucket_policy( - user_list, actions_list, resources_list, effect="Allow", sid="statement" + user_list, + actions_list, + resources_list, + effect=None, + sid="statement", + principal_property=None, + action_property=None, + resource_property=None, ): """ Function prepares bucket policy parameters in syntax and format provided by AWS bucket policy @@ -141,6 +148,9 @@ def gen_bucket_policy( resources_list (list): List of resources. Eg: Bucket name, specific object in a bucket etc effect (str): Permission given to the bucket policy ie: Allow(default) or Deny sid (str): Statement name. Can be any string. Default: "Statement" + principal_property (str): Element to specify the principal to allow/deny access to a resource. + action_property (str): Element describes the specific action(s) that will be allowed or denied. + resource_property (str): Element specifies the object(s) that the statement covers Returns: dict: Bucket policy in json format @@ -152,19 +162,24 @@ def gen_bucket_policy( ) ver = datetime.date.today().strftime("%Y-%m-%d") + principal = principal_property if principal_property else "Principal" + effect = effect if effect else "Allow" + action = action_property if action_property else "Action" + resource = resource_property if resource_property else "Resource" + logger.info(f"version: {ver}") - logger.info(f"principal_list: {principals}") - logger.info(f"actions_list: {actions_list}") - logger.info(f"resource: {resources_list}") + logger.info(f"{principal}: {principals}") + logger.info(f"{action}: {actions_list}") + logger.info(f"{resource}: {resources_list}") logger.info(f"effect: {effect}") logger.info(f"sid: {sid}") bucket_policy = { "Version": ver, "Statement": [ { - "Action": actions, - "Principal": {"AWS": principals}, - "Resource": resources, + action: actions, + principal: {"AWS": principals}, + resource: resources, "Effect": effect, "Sid": sid, } diff --git a/tests/functional/object/mcg/test_bucket_policy.py b/tests/functional/object/mcg/test_bucket_policy.py index e35e0bde54a..8fdf7208fba 100644 --- a/tests/functional/object/mcg/test_bucket_policy.py +++ b/tests/functional/object/mcg/test_bucket_policy.py @@ -59,6 +59,34 @@ logger = logging.getLogger(__name__) +def delete_bucket_policy_verify(s3_obj, bucket_name): + """ + Delete bucket policy and confirm it got deleted successfully, if not throwing + UnexpectedBehaviour with an invalid error code. + Args: + s3_obj (obj): MCG or OBC object + bucket_name (str): Name of the bucket + """ + + # Delete bucket policy + logger.info(f"Delete bucket policy by admin on bucket: {bucket_name}") + delete_policy = delete_bucket_policy(s3_obj, bucket_name) + logger.info(f"Delete policy response: {delete_policy}") + + # Confirming again by calling get_bucket_policy + try: + get_bucket_policy(s3_obj, bucket_name) + except boto3exception.ClientError as e: + logger.info(e.response) + response = HttpResponseParser(e.response) + if response.error["Code"] == "NoSuchBucketPolicy": + logger.info("Bucket policy has been deleted successfully") + else: + raise UnexpectedBehaviour( + f"{e.response} received invalid error code {response.error['Code']}" + ) + + @mcg @red_squad @runs_on_provider @@ -850,6 +878,199 @@ def test_bucket_policy_verify_invalid_scenarios( f"{e.response} received invalid error code {response.error['Code']}" ) + @pytest.mark.polarion_id("OCS-5767") + @tier1 + def test_bucket_policy_elements_NotPrincipal(self, mcg_obj, bucket_factory): + """ + Test bucket policy element of NotPrincipal and Effect: Deny + """ + + # Creating obc and obc object + obc_bucket = bucket_factory(amount=1, interface="OC") + obc_obj = OBC(obc_bucket[0].name) + + # Create data and object key + data = "Sample string content to write to a new S3 object" + object_key = "ObjKey-" + str(uuid.uuid4().hex) + + # Set bucket policy for obc_bucket + bucket_policy_generated = gen_bucket_policy( + principal_property="NotPrincipal", + user_list=[obc_obj.obc_account], + actions_list=["PutObject"], + resources_list=[f'{obc_obj.bucket_name}/{"*"}'], + effect="Deny", + ) + bucket_policy = json.dumps(bucket_policy_generated) + + # Add Bucket Policy + logger.info(f"Creating bucket policy on bucket: {obc_obj.bucket_name}") + put_bucket_policy(mcg_obj, obc_obj.bucket_name, bucket_policy) + + # Get bucket policy + logger.info(f"Getting Bucket policy on bucket: {obc_obj.bucket_name}") + get_policy = get_bucket_policy(mcg_obj, obc_obj.bucket_name) + logger.info(f"Got bucket policy: {get_policy['Policy']}") + + # Verify put Object is allowed. + logger.info(f"Put Object to the bucket: {obc_obj.bucket_name} ") + assert s3_put_object( + mcg_obj, + obc_obj.bucket_name, + object_key, + data, + ), f"Failed to put object to bucket {obc_obj.bucket_name}" + + # Delete policy and confirm policy got deleted. + delete_bucket_policy_verify(obc_obj, obc_obj.bucket_name) + + @pytest.mark.parametrize( + argnames="effect", + argvalues=[ + pytest.param( + *["Allow"], marks=[tier1, pytest.mark.polarion_id("OCS-5768")] + ), + pytest.param(*["Deny"], marks=[tier1, pytest.mark.polarion_id("OCS-5769")]), + ], + ) + def test_bucket_policy_elements_NotAction(self, mcg_obj, bucket_factory, effect): + """ + Test bucket policy element of NotAction with Effect: Allow/Deny + """ + + # Creating obc and obc object to get account details, keys etc + obc_bucket = bucket_factory(amount=1, interface="OC") + obc_obj = OBC(obc_bucket[0].name) + + # Set bucket policy for user + bucket_policy_generated = gen_bucket_policy( + user_list=obc_obj.obc_account, + action_property="NotAction", + actions_list=["DeleteBucket"], + resources_list=[f'{obc_obj.bucket_name}/{"*"}'], + effect=effect, + ) + bucket_policy = json.dumps(bucket_policy_generated) + + # Add Bucket Policy + logger.info(f"Creating bucket policy on bucket: {obc_obj.bucket_name}") + put_policy = put_bucket_policy(mcg_obj, obc_obj.bucket_name, bucket_policy) + logger.info(f"Put bucket policy response from admin: {put_policy}") + + # Get bucket policy on the bucket + logger.info(f"Getting Bucket policy on bucket: {obc_obj.bucket_name}") + get_policy = get_bucket_policy(mcg_obj, obc_obj.bucket_name) + logger.info(f"Got bucket policy: {get_policy['Policy']}") + + # Verify DeleteBucket and putObject operation + # in both scenarios: Effect=Allow/Deny + if effect == "Allow": + # Put Object is allowed + logger.info("Writing index data to the bucket") + assert s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.bucket_name, + object_key="index.html", + data=index, + content_type="text/html", + ), "Failed to put object." + + # Delete bucket get access denied. + logger.info(f"Deleting bucket {obc_obj.bucket_name}") + try: + s3_delete_bucket_website(s3_obj=obc_obj, bucketname=obc_obj.bucket_name) + raise UnexpectedBehaviour( + "Failed: Bucket got deleted, expect to get AccessDenied." + ) + except boto3exception.ClientError as e: + logger.info(e.response) + response = HttpResponseParser(e.response) + if response.error["Code"] == "AccessDenied": + logger.info(f"Bucket deleting got {response.error['Code']}") + else: + raise UnexpectedBehaviour( + f"{e.response} received invalid error code " + f"{response.error['Code']}" + ) + if effect == "Deny": + # Put Object get access denied. + logger.info("Writing index data to the bucket") + try: + s3_put_object( + s3_obj=obc_obj, + bucketname=obc_obj.bucket_name, + object_key="index.html", + data=index, + content_type="text/html", + ) + raise UnexpectedBehaviour( + "Failed: Completed put object to bucket, expect to get AccessDenied." + ) + except boto3exception.ClientError as e: + logger.info(e.response) + response = HttpResponseParser(e.response) + if response.error["Code"] == "AccessDenied": + logger.info(f"PutObject got {response.error['Code']}") + else: + raise UnexpectedBehaviour( + f"{e.response} received invalid error code " + f"{response.error['Code']}" + ) + + # Delete bucket is allowed. + logger.info(f"Deleting bucket {obc_obj.bucket_name}") + assert s3_delete_bucket_website( + s3_obj=obc_obj, bucketname=obc_obj.bucket_name + ), "Failed to delete bucket." + + # Delete policy and confirm policy got deleted. + delete_bucket_policy_verify(obc_obj, obc_obj.bucket_name) + + @pytest.mark.polarion_id("OCS-5770") + @tier1 + def test_bucket_policy_elements_NotResource(self, mcg_obj, bucket_factory): + """ + Test bucket policy element of NotResource with Effect: Deny + """ + + # Creating obc and obc object to get account details, keys etc + obc_bucket = bucket_factory(amount=1, interface="OC") + obc_obj = OBC(obc_bucket[0].name) + + # Create data and object key + data = "Sample string content to write to a new S3 object" + object_key = "ObjKey-" + str(uuid.uuid4().hex) + + # Set bucket policy for user + bucket_policy_generated = gen_bucket_policy( + user_list=obc_obj.obc_account, + actions_list=["*"], + resources_list=[obc_obj.bucket_name, f'{obc_obj.bucket_name}/{"*"}'], + resource_property="NotResource", + effect="Deny", + ) + bucket_policy = json.dumps(bucket_policy_generated) + + # Add Bucket Policy + logger.info(f"Creating bucket policy on bucket: {obc_obj.bucket_name}") + put_bucket_policy(mcg_obj, obc_obj.bucket_name, bucket_policy) + + # Get bucket policy + logger.info(f"Getting Bucket policy on bucket: {obc_obj.bucket_name}") + get_policy = get_bucket_policy(mcg_obj, obc_obj.bucket_name) + logger.info(f"Got bucket policy: {get_policy['Policy']}") + + # Verify S3 action (putObject) is allowed. + logger.info( + f"Adding object on the bucket: {obc_obj.bucket_name} using user: {obc_obj.obc_account}" + ) + assert s3_put_object( + obc_obj, obc_obj.bucket_name, object_key, data + ), "Failed to put Object" + + # Delete policy and confirm policy got deleted. + delete_bucket_policy_verify(obc_obj, obc_obj.bucket_name) + @pytest.mark.polarion_id("OCS-2451") @pytest.mark.bugzilla("1893163") @skipif_ocs_version("<4.6")