Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small update to test_concurrent_caches_flush #1562

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 65 additions & 63 deletions test/functional/tests/cache_ops/test_concurrent_flushes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
# SPDX-License-Identifier: BSD-3-Clause
#

import pytest

from time import sleep

import pytest

from api.cas import casadm, casadm_parser, cli
from api.cas.cache_config import CacheMode, CleaningPolicy, CacheModeTrait, SeqCutOffPolicy
from api.cas.casadm_params import StatsFilter
from core.test_run import TestRun
from storage_devices.disk import DiskType, DiskTypeSet, DiskTypeLowerThan
from test_tools.fio.fio import Fio
Expand All @@ -23,10 +24,10 @@
@pytest.mark.require_disk("core", DiskTypeSet([DiskType.hdd, DiskType.hdd4k]))
def test_concurrent_cores_flush(cache_mode: CacheMode):
"""
title: Fail to flush two cores simultaneously.
title: Flush two cores simultaneously - negative.
description: |
CAS should return an error on attempt to flush second core if there is already
one flush in progress.
Validate that the attempt to flush another core when there is already one flush in
progress on the same cache will fail.
pass_criteria:
- No system crash.
- First core flushing should finish successfully.
Expand All @@ -39,7 +40,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode):
core_dev = TestRun.disks["core"]

cache_dev.create_partitions([Size(2, Unit.GibiByte)])
core_dev.create_partitions([Size(5, Unit.GibiByte)] * 2)
core_dev.create_partitions([Size(2, Unit.GibiByte)] * 2)

cache_part = cache_dev.partitions[0]
core_part1 = core_dev.partitions[0]
Expand All @@ -48,45 +49,42 @@ def test_concurrent_cores_flush(cache_mode: CacheMode):
with TestRun.step("Start cache"):
cache = casadm.start_cache(cache_part, cache_mode, force=True)

with TestRun.step(f"Add both core devices to cache"):
with TestRun.step("Add both core devices to cache"):
core1 = cache.add_core(core_part1)
core2 = cache.add_core(core_part2)

with TestRun.step("Disable cleaning and sequential cutoff"):
cache.set_cleaning_policy(CleaningPolicy.nop)
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)

with TestRun.step("Run concurrent fio on both cores"):
fio_pids = []
with TestRun.step("Run fio on both cores"):
data_per_core = cache.size / 2
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.size(data_per_core)
.block_size(Size(4, Unit.MebiByte))
.read_write(ReadWrite.write)
.direct(1)
)
for core in [core1, core2]:
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.target(core.path)
.size(core.size)
.block_size(Size(4, Unit.MebiByte))
.read_write(ReadWrite.write)
.direct(1)
)
fio_pid = fio.run_in_background()
fio_pids.append(fio_pid)

for fio_pid in fio_pids:
if not TestRun.executor.check_if_process_exists(fio_pid):
TestRun.fail("Fio failed to start")

with TestRun.step("Wait for fio to finish"):
for fio_pid in fio_pids:
while TestRun.executor.check_if_process_exists(fio_pid):
sleep(1)
fio.add_job().target(core.path)
fio.run()

with TestRun.step("Check if both cores contain dirty blocks"):
if core1.get_dirty_blocks() == Size.zero():
TestRun.fail("The first core does not contain dirty blocks")
if core2.get_dirty_blocks() == Size.zero():
TestRun.fail("The second core does not contain dirty blocks")
core2_dirty_blocks_before = core2.get_dirty_blocks()
required_dirty_data = (
(data_per_core * 0.9).align_down(Unit.Blocks4096.value).set_unit(Unit.Blocks4096)
)
core1_dirty_data = core1.get_dirty_blocks()
if core1_dirty_data < required_dirty_data:
TestRun.fail(f"Core {core1.core_id} does not contain enough dirty data.\n"
f"Expected at least {required_dirty_data}, actual {core1_dirty_data}.")
core2_dirty_data_before = core2.get_dirty_blocks()
if core2_dirty_data_before < required_dirty_data:
TestRun.fail(f"Core {core2.core_id} does not contain enough dirty data.\n"
f"Expected at least {required_dirty_data}, actual "
f" {core2_dirty_data_before}.")

with TestRun.step("Start flushing the first core in background"):
output_pid = TestRun.executor.run_in_background(
Expand All @@ -104,7 +102,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode):
pass

with TestRun.step(
"Wait until first core reach 40% flush and start flush operation on the second core"
"Wait until first core reaches 40% flush and start flush operation on the second core"
):
percentage = 0
while percentage < 40:
Expand All @@ -131,18 +129,20 @@ def test_concurrent_cores_flush(cache_mode: CacheMode):
except CmdException:
TestRun.LOGGER.info("The first core is not flushing dirty data anymore")

with TestRun.step("Check number of dirty data on both cores"):
if core1.get_dirty_blocks() > Size.zero():
with TestRun.step("Check the size of dirty data on both cores"):
core1_dirty_data = core1.get_dirty_blocks()
if core1_dirty_data > Size.zero():
TestRun.LOGGER.error(
"The quantity of dirty cache lines on the first core "
"after completed flush should be zero"
"There should not be any dirty data on the first core after completed flush.\n"
f"Dirty data: {core1_dirty_data}."
)

core2_dirty_blocks_after = core2.get_dirty_blocks()
if core2_dirty_blocks_before != core2_dirty_blocks_after:
core2_dirty_data_after = core2.get_dirty_blocks()
if core2_dirty_data_after != core2_dirty_data_before:
TestRun.LOGGER.error(
"The quantity of dirty cache lines on the second core "
"after failed flush should not change"
"Dirty data on the second core after failed flush should not change."
f"Dirty data before flush: {core2_dirty_data_before}, "
f"after: {core2_dirty_data_after}"
)


Expand All @@ -151,7 +151,7 @@ def test_concurrent_cores_flush(cache_mode: CacheMode):
@pytest.mark.require_disk("core", DiskTypeLowerThan("cache"))
def test_concurrent_caches_flush(cache_mode: CacheMode):
"""
title: Success to flush two caches simultaneously.
title: Flush multiple caches simultaneously.
description: |
CAS should successfully flush multiple caches if there is already other flush in progress.
pass_criteria:
Expand All @@ -178,28 +178,29 @@ def test_concurrent_caches_flush(cache_mode: CacheMode):
cache.set_cleaning_policy(CleaningPolicy.nop)
cache.set_seq_cutoff_policy(SeqCutOffPolicy.never)

with TestRun.step(f"Add core devices to caches"):
with TestRun.step("Add cores to caches"):
cores = [cache.add_core(core_dev=core_dev.partitions[i]) for i, cache in enumerate(caches)]

with TestRun.step("Run fio on all cores"):
fio_pids = []
fio = (
Fio()
.create_command()
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.MebiByte))
.size(cache.size)
.read_write(ReadWrite.write)
.direct(1)
)
for core in cores:
fio = (
Fio()
.create_command()
.target(core)
.io_engine(IoEngine.libaio)
.block_size(Size(4, Unit.MebiByte))
.size(core.size)
.read_write(ReadWrite.write)
.direct(1)
)
fio_pids.append(fio.run_in_background())
fio.add_job().target(core)
fio.run()

with TestRun.step("Check if each cache is full of dirty blocks"):
for cache in caches:
if not cache.get_dirty_blocks() != core.size:
TestRun.fail(f"The cache {cache.cache_id} does not contain dirty blocks")
cache_stats = cache.get_statistics(stat_filter=[StatsFilter.usage], percentage_val=True)
if cache_stats.usage_stats.dirty < 90:
TestRun.fail(f"Cache {cache.cache_id} should contain at least 90% of dirty data, "
f"actual dirty data: {cache_stats.usage_stats.dirty}%")

with TestRun.step("Start flush operation on all caches simultaneously"):
flush_pids = [
Expand All @@ -214,8 +215,9 @@ def test_concurrent_caches_flush(cache_mode: CacheMode):

with TestRun.step("Check number of dirty data on each cache"):
for cache in caches:
if cache.get_dirty_blocks() > Size.zero():
dirty_blocks = cache.get_dirty_blocks()
if dirty_blocks > Size.zero():
TestRun.LOGGER.error(
f"The quantity of dirty cache lines on the cache "
f"{str(cache.cache_id)} after complete flush should be zero"
f"The quantity of dirty data on cache {cache.cache_id} after complete "
f"flush should be zero, is: {dirty_blocks.set_unit(Unit.Blocks4096)}"
)