This repository has been archived by the owner on Mar 22, 2023. It is now read-only.

[tests] add async cases for timestamp test #274

Draft
wants to merge 1 commit into base: master
1 change: 0 additions & 1 deletion tests/common/stream_helpers.hpp
@@ -371,7 +371,6 @@ struct pmemstream_helpers_type {
 			rrt = it->second;
 		}
 		for (const auto &e : data) {
-
 			auto [ret, entry] = stream.append(region, e, rrt);
 			UT_ASSERTeq(ret, 0);
 		}
89 changes: 84 additions & 5 deletions tests/unittest/timestamp.cpp
@@ -1,6 +1,8 @@
 // SPDX-License-Identifier: BSD-3-Clause
 /* Copyright 2022, Intel Corporation */
 
+#include <random>
+
 #include "libpmemstream.h"
 #include "rapidcheck_helpers.hpp"
 #include "stream_helpers.hpp"
@@ -11,6 +13,31 @@
  * timestamp - unit test for testing method pmemstream_entry_timestamp()
  */
 
+void multithreaded_asynchronous_append(pmemstream_test_base &stream, const std::vector<pmemstream_region> &regions,
+				       const std::vector<std::vector<std::string>> &data)
+{
+	using future_type = decltype(stream.helpers.async_append(regions[0], data[0]));
+	std::vector<std::vector<future_type>> futures(data.size());
+
+	parallel_exec(data.size(), [&](size_t thread_id) {
+		for (auto &chunk : data) {
+			futures[thread_id].emplace_back(stream.helpers.async_append(regions[thread_id], chunk));
+		}
+	});
+
+	for (auto &future_sequence : futures) {
+		std::mt19937_64 g(*rc::gen::arbitrary<size_t>());
+		std::shuffle(future_sequence.begin(), future_sequence.end(), g);
+	}
+
+	parallel_exec(data.size(), [&](size_t thread_id) {
+		for (auto &fut : futures[thread_id]) {
+			while (fut.poll() != FUTURE_STATE_COMPLETE)
+				;
+		}
+	});
+}
+
 void multithreaded_synchronous_append(pmemstream_test_base &stream, const std::vector<pmemstream_region> &regions,
 				      const std::vector<std::vector<std::string>> &data)
 {
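Note on the new helper above: each thread issues its appends through stream.helpers.async_append() and only later drives the returned futures to completion, and each per-thread future sequence is shuffled first so that the test does not depend on futures completing in append order. Below is a minimal, self-contained sketch of that shuffle-then-poll pattern with a stand-in future type; fake_future and its polls_left counter are illustrative only, while the real test polls miniasync-style futures whose poll() eventually returns FUTURE_STATE_COMPLETE.

#include <algorithm>
#include <random>
#include <vector>

enum class state { idle, complete };

struct fake_future {
	int polls_left; /* stand-in for asynchronous work still pending */
	state poll()
	{
		return --polls_left <= 0 ? state::complete : state::idle;
	}
};

int main()
{
	std::vector<fake_future> futures = {{3}, {1}, {2}};

	/* Shuffle so completion order is independent of issue order. */
	std::mt19937_64 gen(std::random_device{}());
	std::shuffle(futures.begin(), futures.end(), gen);

	/* Busy-poll every future until it reports completion. */
	for (auto &fut : futures)
		while (fut.poll() != state::complete)
			;
}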
@@ -37,8 +64,7 @@ std::tuple<std::vector<pmemstream_region>, size_t> generate_and_append_data(pmem
 		concurrency_level, rc::gen::arbitrary<std::vector<std::string>>());
 
 	if (async) {
-		// XXX: multithreaded_asynchronous_append(stream, regions, data);
-		UT_ASSERT(false);
+		multithreaded_asynchronous_append(stream, regions, data);
 	} else {
 		multithreaded_synchronous_append(stream, regions, data);
 	}
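Both append helpers fan the work out with the test suite's parallel_exec utility, which runs a callable once per thread id. A rough, self-contained sketch of such a helper using only standard threads; the repo's real implementation lives in the common test utilities and may differ in detail:

#include <cstddef>
#include <thread>
#include <vector>

/* Run `threads` copies of `f` concurrently, passing each its thread id. */
template <typename F>
void parallel_exec(std::size_t threads, F &&f)
{
	std::vector<std::thread> workers;
	workers.reserve(threads);
	for (std::size_t i = 0; i < threads; ++i)
		workers.emplace_back([i, &f] { f(i); });
	for (auto &w : workers)
		w.join();
}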
@@ -52,9 +78,9 @@ std::tuple<std::vector<pmemstream_region>, size_t> generate_and_append_data(pmem
 }
 
 size_t remove_random_region(pmemstream_with_multi_empty_regions &stream, std::vector<pmemstream_region> &regions,
-			    size_t concurrency_level)
+			    test_config_type &config)
 {
-	size_t pos = *rc::gen::inRange<size_t>(0, concurrency_level);
+	size_t pos = *rc::gen::inRange<size_t>(0, get_concurrency_level(config, regions));
 	auto region_to_remove = regions[pos];
 	auto region_size = stream.sut.region_size(region_to_remove);
 	UT_ASSERTeq(stream.helpers.remove_region(region_to_remove.offset), 0);
@@ -104,7 +130,7 @@ int main(int argc, char *argv[])
 				generate_and_append_data(stream, test_config, false /* sync */);
 			auto concurrency_level = get_concurrency_level(test_config, regions);
 
-			auto region_size = remove_random_region(stream, regions, concurrency_level);
+			auto region_size = remove_random_region(stream, regions, test_config);
 
 			/* Global ordering validation. */
 			if (regions.size() >= 1)
@@ -115,6 +141,59 @@
 
 			regions.push_back(stream.helpers.initialize_single_region(region_size, extra_data));
 
+			UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
+				    elements * (concurrency_level - 1) + extra_data.size());
+			UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
+		});
+
+	ret += rc::check("timestamp values should increase in each region after asynchronous append",
+			 [&](pmemstream_with_multi_empty_regions &&stream) {
+				 auto [regions, elements] =
+					 generate_and_append_data(stream, test_config, true /* async */);
+
+				 /* Single region ordering validation. */
+				 for (auto &region : regions) {
+					 UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps({region}));
+				 }
+			 });
+
+	ret += rc::check(
+		"timestamp values should globally increase in multi-region environment after asynchronous append",
+		[&](pmemstream_with_multi_empty_regions &&stream) {
+			auto [regions, elements] =
+				generate_and_append_data(stream, test_config, true /* async */);
+
+			/* Global ordering validation */
+			UT_ASSERT(stream.helpers.validate_timestamps_no_gaps(regions));
+		});
+
+	ret += rc::check(
+		"timestamp values should globally increase in multi-region environment after asynchronous append to respawned region",
+		[&](pmemstream_with_multi_empty_regions &&stream, const std::vector<std::string> &extra_data) {
+			RC_PRE(extra_data.size() > 0);
+			auto [regions, elements] =
+				generate_and_append_data(stream, test_config, true /* async */);
+			auto concurrency_level = get_concurrency_level(test_config, regions);
+
+			auto region_size = remove_random_region(stream, regions, test_config);
+
+			/* Global ordering validation. */
+			if (regions.size() >= 1)
+				UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
+
+			UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
+				    elements * (concurrency_level - 1));
+
+			{
+				auto [ret, region] = stream.sut.region_allocate(region_size);
+				UT_ASSERTeq(ret, 0);
+				regions.push_back(region);
+
+				auto future = stream.helpers.async_append(region, extra_data);
+				while (future.poll() != FUTURE_STATE_COMPLETE)
+					;
+			}
+
 			UT_ASSERTeq(stream.helpers.get_entries_from_regions(regions).size(),
 				    elements * (concurrency_level - 1) + extra_data.size());
 			UT_ASSERT(stream.helpers.validate_timestamps_possible_gaps(regions));
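The checks above lean on two helpers whose names encode the expected invariant: validate_timestamps_no_gaps() when every append completes (timestamps form one contiguous increasing sequence across regions), and validate_timestamps_possible_gaps() once a region has been removed (still strictly increasing, but some timestamps may be missing). A hedged sketch of what such validation can look like over a flat, already-merged list of entry timestamps; the helper names mirror the test's, but this implementation is an assumption, not code from the repo (the real helpers take regions and gather the timestamps themselves):

#include <cstddef>
#include <cstdint>
#include <vector>

/* Strictly increasing; gaps allowed (e.g. after a region removal). */
bool validate_timestamps_possible_gaps(const std::vector<uint64_t> &ts)
{
	for (std::size_t i = 1; i < ts.size(); ++i)
		if (ts[i] <= ts[i - 1])
			return false;
	return true;
}

/* Strictly increasing by exactly one: no timestamp missing anywhere. */
bool validate_timestamps_no_gaps(const std::vector<uint64_t> &ts)
{
	for (std::size_t i = 1; i < ts.size(); ++i)
		if (ts[i] != ts[i - 1] + 1)
			return false;
	return true;
}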