diff --git a/test/functional/tests/cache_ops/test_concurrent_flushes.py b/test/functional/tests/cache_ops/test_concurrent_flushes.py index add7978d5..df9458c86 100644 --- a/test/functional/tests/cache_ops/test_concurrent_flushes.py +++ b/test/functional/tests/cache_ops/test_concurrent_flushes.py @@ -4,12 +4,13 @@ # SPDX-License-Identifier: BSD-3-Clause # -import pytest - from time import sleep +import pytest + from api.cas import casadm, casadm_parser, cli from api.cas.cache_config import CacheMode, CleaningPolicy, CacheModeTrait, SeqCutOffPolicy +from api.cas.casadm_params import StatsFilter from core.test_run import TestRun from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan from test_tools.fio.fio import Fio @@ -23,10 +24,10 @@ @pytest.mark.require_disk("core", DiskTypeSet([DiskType.hdd, DiskType.hdd4k])) def test_concurrent_cores_flush(cache_mode: CacheMode): """ - title: Fail to flush two cores simultaneously. + title: Flush two cores simultaneously - negative. description: | - CAS should return an error on attempt to flush second core if there is already - one flush in progress. + Validate that the attempt to flush another core when there is already one flush in + progress on the same cache will fail. pass_criteria: - No system crash. - First core flushing should finish successfully. @@ -39,7 +40,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): core_dev = TestRun.disks["core"] cache_dev.create_partitions([Size(2, Unit.GibiByte)]) - core_dev.create_partitions([Size(5, Unit.GibiByte)] * 2) + core_dev.create_partitions([Size(2, Unit.GibiByte)] * 2) cache_part = cache_dev.partitions[0] core_part1 = core_dev.partitions[0] @@ -48,7 +49,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): with TestRun.step("Start cache"): cache = casadm.start_cache(cache_part, cache_mode, force=True) - with TestRun.step(f"Add both core devices to cache"): + with TestRun.step("Add both core devices to cache"): core1 = cache.add_core(core_part1) core2 = cache.add_core(core_part2) @@ -56,37 +57,34 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): cache.set_cleaning_policy(CleaningPolicy.nop) cache.set_seq_cutoff_policy(SeqCutOffPolicy.never) - with TestRun.step("Run concurrent fio on both cores"): - fio_pids = [] + with TestRun.step("Run fio on both cores"): + data_per_core = cache.size / 2 + fio = ( + Fio() + .create_command() + .io_engine(IoEngine.libaio) + .size(data_per_core) + .block_size(Size(4, Unit.MebiByte)) + .read_write(ReadWrite.write) + .direct(1) + ) for core in [core1, core2]: - fio = ( - Fio() - .create_command() - .io_engine(IoEngine.libaio) - .target(core.path) - .size(core.size) - .block_size(Size(4, Unit.MebiByte)) - .read_write(ReadWrite.write) - .direct(1) - ) - fio_pid = fio.run_in_background() - fio_pids.append(fio_pid) - - for fio_pid in fio_pids: - if not TestRun.executor.check_if_process_exists(fio_pid): - TestRun.fail("Fio failed to start") - - with TestRun.step("Wait for fio to finish"): - for fio_pid in fio_pids: - while TestRun.executor.check_if_process_exists(fio_pid): - sleep(1) + fio.add_job().target(core.path) + fio.run() with TestRun.step("Check if both cores contain dirty blocks"): - if core1.get_dirty_blocks() == Size.zero(): - TestRun.fail("The first core does not contain dirty blocks") - if core2.get_dirty_blocks() == Size.zero(): - TestRun.fail("The second core does not contain dirty blocks") - core2_dirty_blocks_before = core2.get_dirty_blocks() + required_dirty_data = ( + (data_per_core * 0.9).align_down(Unit.Blocks4096.value).set_unit(Unit.Blocks4096) + ) + core1_dirty_data = core1.get_dirty_blocks() + if core1_dirty_data < required_dirty_data: + TestRun.fail(f"Core {core1.core_id} does not contain enough dirty data.\n" + f"Expected at least {required_dirty_data}, actual {core1_dirty_data}.") + core2_dirty_data_before = core2.get_dirty_blocks() + if core2_dirty_data_before < required_dirty_data: + TestRun.fail(f"Core {core2.core_id} does not contain enough dirty data.\n" + f"Expected at least {required_dirty_data}, actual " + f" {core2_dirty_data_before}.") with TestRun.step("Start flushing the first core in background"): output_pid = TestRun.executor.run_in_background( @@ -104,7 +102,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): pass with TestRun.step( - "Wait until first core reach 40% flush and start flush operation on the second core" + "Wait until first core reaches 40% flush and start flush operation on the second core" ): percentage = 0 while percentage < 40: @@ -131,18 +129,20 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): except CmdException: TestRun.LOGGER.info("The first core is not flushing dirty data anymore") - with TestRun.step("Check number of dirty data on both cores"): - if core1.get_dirty_blocks() > Size.zero(): + with TestRun.step("Check the size of dirty data on both cores"): + core1_dirty_data = core1.get_dirty_blocks() + if core1_dirty_data > Size.zero(): TestRun.LOGGER.error( - "The quantity of dirty cache lines on the first core " - "after completed flush should be zero" + "There should not be any dirty data on the first core after completed flush.\n" + f"Dirty data: {core1_dirty_data}." ) - core2_dirty_blocks_after = core2.get_dirty_blocks() - if core2_dirty_blocks_before != core2_dirty_blocks_after: + core2_dirty_data_after = core2.get_dirty_blocks() + if core2_dirty_data_after != core2_dirty_data_before: TestRun.LOGGER.error( - "The quantity of dirty cache lines on the second core " - "after failed flush should not change" + "Dirty data on the second core after failed flush should not change." + f"Dirty data before flush: {core2_dirty_data_before}, " + f"after: {core2_dirty_data_after}" ) @@ -151,7 +151,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode): @pytest.mark.require_disk("core", DiskTypeLowerThan("cache")) def test_concurrent_caches_flush(cache_mode: CacheMode): """ - title: Success to flush two caches simultaneously. + title: Flush multiple caches simultaneously. description: | CAS should successfully flush multiple caches if there is already other flush in progress. pass_criteria: @@ -178,28 +178,29 @@ def test_concurrent_caches_flush(cache_mode: CacheMode): cache.set_cleaning_policy(CleaningPolicy.nop) cache.set_seq_cutoff_policy(SeqCutOffPolicy.never) - with TestRun.step(f"Add core devices to caches"): + with TestRun.step("Add cores to caches"): cores = [cache.add_core(core_dev=core_dev.partitions[i]) for i, cache in enumerate(caches)] with TestRun.step("Run fio on all cores"): - fio_pids = [] + fio = ( + Fio() + .create_command() + .io_engine(IoEngine.libaio) + .block_size(Size(4, Unit.MebiByte)) + .size(cache.size) + .read_write(ReadWrite.write) + .direct(1) + ) for core in cores: - fio = ( - Fio() - .create_command() - .target(core) - .io_engine(IoEngine.libaio) - .block_size(Size(4, Unit.MebiByte)) - .size(core.size) - .read_write(ReadWrite.write) - .direct(1) - ) - fio_pids.append(fio.run_in_background()) + fio.add_job().target(core) + fio.run() with TestRun.step("Check if each cache is full of dirty blocks"): for cache in caches: - if not cache.get_dirty_blocks() != core.size: - TestRun.fail(f"The cache {cache.cache_id} does not contain dirty blocks") + cache_stats = cache.get_statistics(stat_filter=[StatsFilter.usage], percentage_val=True) + if cache_stats.usage_stats.dirty < 90: + TestRun.fail(f"Cache {cache.cache_id} should contain at least 90% of dirty data, " + f"actual dirty data: {cache_stats.usage_stats.dirty}%") with TestRun.step("Start flush operation on all caches simultaneously"): flush_pids = [ @@ -214,8 +215,9 @@ def test_concurrent_caches_flush(cache_mode: CacheMode): with TestRun.step("Check number of dirty data on each cache"): for cache in caches: - if cache.get_dirty_blocks() > Size.zero(): + dirty_blocks = cache.get_dirty_blocks() + if dirty_blocks > Size.zero(): TestRun.LOGGER.error( - f"The quantity of dirty cache lines on the cache " - f"{str(cache.cache_id)} after complete flush should be zero" + f"The quantity of dirty data on cache {cache.cache_id} after complete " + f"flush should be zero, is: {dirty_blocks.set_unit(Unit.Blocks4096)}" )