Skip to content

Commit

Permalink
Add Storage API to check for existence of a key matching a predicate (#…
Browse files Browse the repository at this point in the history
…1762)

Useful for enterprise (where we want to check if op logs exist on
secondaries as a safety guard).

Remove `test_mongo_storage.cpp` - it just returned immediately, and
keeping it compiling was a maintenance burden.

The important API changes are in `storage.hpp`.

We add a new kind of `IterateTypeVisitor` - an `IterateTypePredicate`.
We change the internal `do_iterate_type` functions to take the predicate
instead of the visitor. We change the public `iterate_type` to coerce
its `IterateTypeVisitor` into a degenerate `IterateTypePredicate` that
never matches anything, so this call still iterates all keys.

Finally we add `test_storage_operations.cpp` to test the new API across
all the supported storage backends.
  • Loading branch information
poodlewars authored Aug 16, 2024
1 parent 7f152ce commit 83deb98
Show file tree
Hide file tree
Showing 23 changed files with 400 additions and 353 deletions.
7 changes: 4 additions & 3 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -764,13 +764,13 @@ if(${TEST})
processing/test/test_set_membership.cpp
processing/test/test_signed_unsigned_comparison.cpp
processing/test/test_type_comparison.cpp
storage/test/test_embedded.cpp
storage/test/test_local_storages.cpp
storage/test/test_memory_storage.cpp
storage/test/test_mongo_storage.cpp
storage/test/test_s3_storage.cpp
storage/test/test_storage_factory.cpp
storage/test/test_storage_exceptions.cpp
storage/test/test_azure_storage.cpp
storage/test/test_storage_operations.cpp
stream/test/stream_test_common.cpp
stream/test/test_aggregator.cpp
stream/test/test_append_map.cpp
Expand Down Expand Up @@ -806,7 +806,8 @@ if(${TEST})
version/test/test_sorting_info_state_machine.cpp
version/test/version_map_model.hpp
storage/test/common.hpp
version/test/test_sort_index.cpp)
version/test/test_sort_index.cpp
)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/entity/variant_key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ using VariantKey = std::variant<entity::AtomKey, entity::RefKey>;

using IterateTypeVisitor = std::function<void(VariantKey &&key)>;

// Callback applied to each key while iterating a key type. Returning true signals a match: the storage
// implementations stop iterating and return true. Returning false lets iteration continue.
using IterateTypePredicate = std::function<bool(VariantKey &&key)>;

// Aliases to clarify usage and allow more detailed typing in the future, similar to aliases for AtomKey:
/** Should be a SNAPSHOT_REF key or the legacy SNAPSHOT AtomKey. */
using SnapshotVariantKey = VariantKey;
Expand Down
25 changes: 12 additions & 13 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,16 +236,14 @@ std::string prefix_handler(const std::string& prefix, const std::string& key_typ
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
}

template<class KeyBucketizer>
void do_iterate_type_impl(KeyType key_type,
const IterateTypeVisitor& visitor,
bool do_iterate_type_impl(KeyType key_type,
const IterateTypePredicate& visitor,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
const std::string& prefix = std::string{}) {
ARCTICDB_SAMPLE(AzureStorageIterateType, 0)
auto key_type_dir = key_type_folder(root_folder, key_type);
const auto path_to_key_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
const auto path_to_key_size = key_type_dir.size() + 1;
// if prefix is empty, add / to avoid matching both log and logc when key_type_dir is {root_folder}/log
if (prefix.empty()) {
key_type_dir += "/";
Expand All @@ -266,7 +264,9 @@ void do_iterate_type_impl(KeyType key_type,
key_type);
ARCTICDB_DEBUG(log::storage(), "Iterating key {}: {}", variant_key_type(k), variant_key_view(k));
ARCTICDB_SUBSAMPLE(AzureStorageVisitKey, 0)
visitor(std::move(k));
if (visitor(std::move(k))) {
return true;
}
ARCTICDB_SUBSAMPLE(AzureStorageCursorNext, 0)
}
}
Expand All @@ -278,16 +278,15 @@ void do_iterate_type_impl(KeyType key_type,
static_cast<int>(e.StatusCode),
e.ReasonPhrase);
}
return false;
}

template<class KeyBucketizer>
bool do_key_exists_impl(
const VariantKey& key,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer) {
AzureClientWrapper& azure_client) {
auto key_type_dir = key_type_folder(root_folder, variant_key_type(key));
auto blob_name = object_path(bucketizer.bucketize(key_type_dir, key), key);
auto blob_name = object_path(key_type_dir, key);
try {
return azure_client.blob_exists(blob_name);
}
Expand Down Expand Up @@ -323,12 +322,12 @@ void AzureStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
detail::do_remove_impl(std::move(ks), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_);
}

void AzureStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) {
detail::do_iterate_type_impl(key_type, visitor, root_folder_, *azure_client_, FlatBucketizer{}, prefix);
bool AzureStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) {
return detail::do_iterate_type_impl(key_type, visitor, root_folder_, *azure_client_, prefix);
}

bool AzureStorage::do_key_exists(const VariantKey& key) {
return detail::do_key_exists_impl(key, root_folder_, *azure_client_, FlatBucketizer{});
return detail::do_key_exists_impl(key, root_folder_, *azure_client_);
}

} // namespace azure
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AzureStorage final : public Storage {

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

bool do_key_exists(const VariantKey& key) final;

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/mapped_file_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ uint8_t* MappedFileStorage::do_read_raw(size_t offset, size_t bytes) {
return file_.data() + offset;
}

void MappedFileStorage::do_iterate_type(KeyType, const IterateTypeVisitor&, const std::string&) {
bool MappedFileStorage::do_iterate_type_until_match(KeyType, const IterateTypePredicate&, const std::string&) {
util::raise_rte("Iterate type not implemented for file storage");
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ class MappedFileStorage final : public SingleFileStorage {

void do_write(Composite<KeySegmentPair>&& kvs) override;

[[noreturn]] void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;
void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;

[[noreturn]] void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) override;
void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) override;

bool do_supports_prefix_matching() const override {
return false;
Expand All @@ -50,7 +50,7 @@ class MappedFileStorage final : public SingleFileStorage {

bool do_fast_delete() override;

[[noreturn]] void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) override;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) override;

bool do_key_exists(const VariantKey & key) override;

Expand Down
7 changes: 5 additions & 2 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ bool LmdbStorage::do_fast_delete() {
return true;
}

void LmdbStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) {
bool LmdbStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) {
ARCTICDB_SAMPLE(LmdbStorageItType, 0);
auto txn = ::lmdb::txn::begin(env(), nullptr, MDB_RDONLY); // scoped abort on
std::string type_db = fmt::format("{}", key_type);
Expand All @@ -270,11 +270,14 @@ void LmdbStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& vi
auto keys = lmdb_client_->list(type_db, prefix, txn, dbi, key_type);
for (auto &k: keys) {
ARCTICDB_SUBSAMPLE(LmdbStorageVisitKey, 0)
visitor(std::move(k));
if(visitor(std::move(k))) {
return true;
}
}
} catch (const ::lmdb::error& ex) {
raise_lmdb_exception(ex, type_db);
}
return false;
}

bool LmdbStorage::do_is_path_valid(const std::string_view pathString ARCTICDB_UNUSED) const {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class LmdbStorage final : public Storage {

void cleanup() override;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

bool do_key_exists(const VariantKey & key) final;

Expand Down
7 changes: 5 additions & 2 deletions cpp/arcticdb/storage/memory/memory_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void add_serialization_fields(KeySegmentPair& kv) {
return true;
}

void MemoryStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string& prefix) {
bool MemoryStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) {
ARCTICDB_SAMPLE(MemoryStorageItType, 0)
auto& key_vec = data_[key_type];
auto prefix_matcher = stream_id_prefix_matcher(prefix);
Expand All @@ -151,9 +151,12 @@ void add_serialization_fields(KeySegmentPair& kv) {
auto key = key_value.first;

if (prefix_matcher(variant_key_id(key))) {
visitor(std::move(key));
if (visitor(std::move(key))) {
return true;
}
}
}
return false;
}

MemoryStorage::MemoryStorage(const LibraryPath &library_path, OpenMode mode, const Config&) :
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/memory/memory_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace arcticdb::storage::memory {

inline bool do_fast_delete() final;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) final;

std::string do_key_path(const VariantKey&) const final { return {}; };

Expand Down
8 changes: 5 additions & 3 deletions cpp/arcticdb/storage/mongo/mongo_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,8 @@ void MongoStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) {
}
}

void MongoStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) {
bool MongoStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) {
auto collection = collection_name(key_type);
auto func = folly::Function<void(entity::VariantKey&&)>(visitor);
ARCTICDB_SAMPLE(MongoStorageItType, 0)
std::vector<VariantKey> keys;
try {
Expand All @@ -212,8 +211,11 @@ void MongoStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& v
ex.what());
}
for (auto &key : keys) {
func(std::move(key));
if (visitor(std::move(key))) {
return true;
}
}
return false;
}

bool MongoStorage::do_key_exists(const VariantKey& key) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/mongo/mongo_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MongoStorage final : public Storage {

inline bool do_fast_delete() final;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

std::string do_key_path(const VariantKey&) const final { return {}; };

Expand Down
27 changes: 15 additions & 12 deletions cpp/arcticdb/storage/s3/detail-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,16 @@ namespace s3 {
};
}

template<class Visitor, class KeyBucketizer, class PrefixHandler>
void do_iterate_type_impl(KeyType key_type,
Visitor &&visitor,
const std::string &root_folder,
const std::string &bucket_name,
const S3ClientWrapper &s3_client,
KeyBucketizer &&bucketizer,
PrefixHandler &&prefix_handler = default_prefix_handler(),
const std::string &prefix = std::string{}
) {
template<class KeyBucketizer, class PrefixHandler>
bool do_iterate_type_impl(
KeyType key_type,
const IterateTypePredicate &visitor,
const std::string &root_folder,
const std::string &bucket_name,
const S3ClientWrapper &s3_client,
KeyBucketizer &&bucketizer,
PrefixHandler &&prefix_handler = default_prefix_handler(),
const std::string &prefix = std::string{}) {
ARCTICDB_SAMPLE(S3StorageIterateType, 0)
auto key_type_dir = key_type_folder(root_folder, key_type);
const auto path_to_key_size = key_type_dir.size() + 1 + bucketizer.bucketize_length(key_type);
Expand Down Expand Up @@ -320,7 +320,9 @@ namespace s3 {
ARCTICDB_DEBUG(log::storage(), "Iterating key {}: {}", variant_key_type(k),
variant_key_view(k));
ARCTICDB_SUBSAMPLE(S3StorageVisitKey, 0)
visitor(std::move(k));
if (visitor(std::move(k))) {
return true;
}
ARCTICDB_SUBSAMPLE(S3StorageCursorNext, 0)
}

Expand All @@ -334,9 +336,10 @@ namespace s3 {
// We don't raise on expected errors like NoSuchKey because we want to return an empty list
// instead of raising.
raise_if_unexpected_error(error, key_prefix);
return;
return false;
}
} while (continuation_token.has_value());
return false;
}

template<class KeyBucketizer>
Expand Down
13 changes: 8 additions & 5 deletions cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,14 @@ void NfsBackedStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
s3::detail::do_remove_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
}

void NfsBackedStorage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string& prefix) {
auto func = [&v = visitor, prefix=prefix] (VariantKey&& k) mutable {
bool NfsBackedStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) {
const IterateTypePredicate func = [&v = visitor, prefix=prefix] (VariantKey&& k) {
auto key = unencode_object_id(k);
if(prefix.empty() || variant_key_id(key) == StreamId{prefix})
v(std::move(key));
if(prefix.empty() || variant_key_id(key) == StreamId{prefix}) {
return v(std::move(key));
} else {
return false;
}
};

auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor&, KeyType key_type) {
Expand All @@ -173,7 +176,7 @@ void NfsBackedStorage::do_iterate_type(KeyType key_type, const IterateTypeVisito
return !prefix.empty() ? fmt::format("{}/{}", key_type_dir, new_prefix) : key_type_dir;
};

s3::detail::do_iterate_type_impl(key_type, std::move(func), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, prefix_handler, prefix);
return s3::detail::do_iterate_type_impl(key_type, func, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, prefix_handler, prefix);
}

bool NfsBackedStorage::do_key_exists(const VariantKey& key) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class NfsBackedStorage final : public Storage {

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

bool do_key_exists(const VariantKey& key) final;

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ void S3Storage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
detail::do_remove_impl(std::move(ks), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{});
}

void S3Storage::do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string& prefix) {
bool S3Storage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) {
auto prefix_handler = [] (const std::string& prefix, const std::string& key_type_dir, const KeyDescriptor& key_descriptor, KeyType) {
return !prefix.empty() ? fmt::format("{}/{}*{}", key_type_dir, key_descriptor, prefix) : key_type_dir;
};

detail::do_iterate_type_impl(key_type, visitor, root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, std::move(prefix_handler), prefix);
return detail::do_iterate_type_impl(key_type, visitor, root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, std::move(prefix_handler), prefix);
}

bool S3Storage::do_key_exists(const VariantKey& key) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/s3/s3_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class S3Storage final : public Storage {

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;

void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix) final;
bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

bool do_key_exists(const VariantKey& key) final;

Expand Down
14 changes: 12 additions & 2 deletions cpp/arcticdb/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,16 @@ class Storage {
return do_key_exists(key);
}

// Checks whether any key of type `key_type` satisfies `predicate`.
// Delegates to do_iterate_type_until_match with an empty prefix and returns its result: true as soon as
// `predicate` returns true for some key, false if it does not return true for any key.
bool scan_for_matching_key(KeyType key_type, const IterateTypePredicate& predicate) {
return do_iterate_type_until_match(key_type, predicate, std::string());
}

void iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string &prefix = std::string()) {
do_iterate_type(key_type, visitor, prefix);
const IterateTypePredicate predicate_visitor = [&visitor](VariantKey&& k) {
visitor(std::move(k));
return false; // keep applying the visitor no matter what
};
do_iterate_type_until_match(key_type, predicate_visitor, prefix);
}

std::string key_path(const VariantKey& key) const {
Expand Down Expand Up @@ -190,7 +198,9 @@ class Storage {

virtual bool do_fast_delete() = 0;

virtual void do_iterate_type(KeyType key_type, const IterateTypeVisitor& visitor, const std::string & prefix) = 0;
// Applies the predicate `visitor` to keys of type `key_type`. Stops iterating and returns true upon the first
// key k for which visitor(k) is true; returns false if no key matches the predicate.
virtual bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string & prefix) = 0;

virtual std::string do_key_path(const VariantKey& key) const = 0;

Expand Down
Loading

0 comments on commit 83deb98

Please sign in to comment.