Adds a type change utility for append data keys in lib_tool #1932
base: master
@@ -18,14 +18,21 @@
 #include <arcticdb/util/key_utils.hpp>
 #include <arcticdb/util/variant.hpp>
 #include <arcticdb/version/version_utils.hpp>
+#include <arcticdb/stream/append_map.hpp>
 #include <cstdlib>
 
 namespace arcticdb::toolbox::apy {
 
 using namespace arcticdb::entity;
 
-LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib) {
-    store_ = std::make_shared<async::AsyncStore<util::SysClock>>(lib, codec::default_lz4_codec(), encoding_version(lib->config()));
-}
+LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib): engine_(lib, util::SysClock()) {}
+
+std::shared_ptr<Store> LibraryTool::store() {
+    return engine_._test_get_store();
+}
+
+async::AsyncStore<>& LibraryTool::async_store() {
+    return dynamic_cast<async::AsyncStore<>&>(*store());
+}
 
 ReadResult LibraryTool::read(const VariantKey& key) {
@@ -42,46 +49,75 @@ ReadResult LibraryTool::read(const VariantKey& key) {
 }
 
 Segment LibraryTool::read_to_segment(const VariantKey& key) {
-    auto kv = store_->read_compressed_sync(key, storage::ReadKeyOpts{});
+    auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{});
     util::check(kv.has_segment(), "Failed to read key: {}", key);
     kv.segment().force_own_buffer();
     return std::move(kv.segment());
 }
 
 std::optional<google::protobuf::Any> LibraryTool::read_metadata(const VariantKey& key){
-    return store_->read_metadata(key, storage::ReadKeyOpts{}).get().second;
+    return store()->read_metadata(key, storage::ReadKeyOpts{}).get().second;
 }
 
 StreamDescriptor LibraryTool::read_descriptor(const VariantKey& key){
-    auto metadata_and_descriptor = store_->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
+    auto metadata_and_descriptor = store()->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
     return std::get<StreamDescriptor>(metadata_and_descriptor);
 }
 
 TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& key){
-    return store_->read_timeseries_descriptor(key).get().second;
+    return store()->read_timeseries_descriptor(key).get().second;
 }
 
 void LibraryTool::write(VariantKey key, Segment& segment) {
     storage::KeySegmentPair kv{std::move(key), std::move(segment)};
-    store_->write_compressed_sync(kv);
+    store()->write_compressed_sync(kv);
 }
 
+void LibraryTool::overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory) {
+    auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);
+    remove(key);
+    write(key, segment);
+}
+
+SegmentInMemory LibraryTool::overwrite_append_data(
+        VariantKey key,
+        const py::tuple &item,
+        const py::object &norm,
+        const py::object &user_meta) {
+    if (!std::holds_alternative<AtomKey>(key) || std::get<AtomKey>(key).type() != KeyType::APPEND_DATA) {
+        throw_error<ErrorCode::E_INVALID_USER_ARGUMENT>(fmt::format("Can only override APPEND_DATA keys. Received: {}", key));
+    }
+    auto old_segment = read_to_segment(key);
+    auto old_segment_in_memory = decode_segment(std::move(old_segment));
+    const auto& tsd = old_segment_in_memory.index_descriptor();
+    std::optional<AtomKey> next_key = std::nullopt;
+    if (tsd.proto().has_next_key()){
+        next_key = key_from_proto(tsd.proto().next_key());
+    }
+
+    auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
+    auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
+    auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
+    overwrite_segment_in_memory(key, segment_in_memory);
+    return old_segment_in_memory;
+}

Review comment on `tsd.proto().has_next_key()`: How does the testing for the next-key logic work, given that ArcticDB doesn't write it? Or am I wrong, and normal (non-streaming) incompletes also have the linked-list structure?
+
 bool LibraryTool::key_exists(const VariantKey& key) {
-    return store_->key_exists_sync(key);
+    return store()->key_exists_sync(key);
 }
 
 void LibraryTool::remove(VariantKey key) {
-    store_->remove_key_sync(std::move(key), storage::RemoveOpts{});
+    store()->remove_key_sync(std::move(key), storage::RemoveOpts{});
 }
 
 void LibraryTool::clear_ref_keys() {
-    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store_, false);
+    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store(), false);
 }
 
 std::vector<VariantKey> LibraryTool::find_keys(entity::KeyType kt) {
     std::vector<VariantKey> res;
 
-    store_->iterate_type(kt, [&](VariantKey &&found_key) {
+    store()->iterate_type(kt, [&](VariantKey &&found_key) {
         res.emplace_back(found_key);
     }, "");
     return res;
@@ -94,12 +130,12 @@ int LibraryTool::count_keys(entity::KeyType kt) {
         count++;
     };
 
-    store_->iterate_type(kt, visitor, "");
+    store()->iterate_type(kt, visitor, "");
     return count;
 }
 
 std::vector<bool> LibraryTool::batch_key_exists(const std::vector<VariantKey>& keys) {
-    auto key_exists_fut = store_->batch_key_exists(keys);
+    auto key_exists_fut = store()->batch_key_exists(keys);
     return folly::collect(key_exists_fut).get();
 }
@@ -116,12 +152,12 @@ std::vector<VariantKey> LibraryTool::find_keys_for_id(entity::KeyType kt, const
         }
     };
 
-    store_->iterate_type(kt, visitor, string_id);
+    store()->iterate_type(kt, visitor, string_id);
     return res;
 }
 
 std::string LibraryTool::get_key_path(const VariantKey& key) {
-    return store_->key_path(key);
+    return async_store().key_path(key);
 }
 
 std::optional<std::string> LibraryTool::inspect_env_variable(std::string name){
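Before the test changes below, here is a minimal usage sketch of how these new entry points are driven from Python. It assumes a version store `lib` as provided by the test fixtures; the method names are the ones the test exercises, while the symbol, column, and dtype are purely illustrative.

```python
# Minimal sketch, assuming `lib` is a version store fixture as in the tests
# below. Method names come from the test; "sym" and "col" are illustrative.
import numpy as np
from arcticdb_ext.storage import KeyType

lib_tool = lib.library_tool()

# Locate the staged APPEND_DATA keys for the symbol.
append_keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, "sym")

# Re-type one column of one staged segment. The previous contents come back
# as a SegmentInMemory, so the change can be rolled back.
backout_segment = lib_tool.update_append_data_column_type(append_keys[0], "col", np.int64)

# Roll the change back if needed.
lib_tool.overwrite_segment_in_memory(append_keys[0], backout_segment)
```

Returning the old segment rather than mutating blindly keeps the utility reversible, which the test below relies on for its backout check.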
@@ -9,6 +9,8 @@
 from arcticdb.util.test import sample_dataframe, populate_db, assert_frame_equal
 from arcticdb_ext.storage import KeyType
+from arcticdb_ext.types import DataType
+from arcticdb_ext.exceptions import SchemaException
 
 
 def get_ref_key_types():
@@ -230,3 +232,92 @@ def iterate_through_version_chain(key):
     assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions-1) % 3 + 1
     assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3
+
+
+def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
+    lib = object_and_mem_and_lmdb_version_store
+    if lib._lib_cfg.lib_desc.version.encoding_version == 1:
+        # TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp
+        # is not encoding aware, so all incomplete writes are broken with v2 encoding.
+        pytest.xfail("Writing the timeseries descriptor for incompletes is currently broken with encoding v2")
+    lib_tool = lib.library_tool()
+    sym = "sym"

Review comment on the fixture: Why all 3 stores? Why not use one of the `_v1` fixtures to avoid the skip below?

Review comment on the `encoding_version == 1` check: Just checking this is definitely zero-indexed and we aren't skipping on v1 encoding by mistake?
+    def get_df(num_rows, start_index, col_type):
+        start_date = pd.Timestamp(2024, 1, 1) + pd.Timedelta(start_index, unit="d")
+        index = pd.date_range(start_date, periods=num_rows)
+        df = pd.DataFrame({"col": range(start_index, num_rows+start_index), "other": range(start_index, num_rows+start_index)}, index=index)
+        # Streaming data has a named index
+        df.index.name = "time"
+        return df.astype({"col": col_type})
+
+    # Deliberately write mismatching incomplete types
+    lib.write(sym, get_df(3, 0, np.int64))
+    lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
+    lib.write(sym, get_df(1, 4, str), incomplete=True)
+    lib.write(sym, get_df(1, 5, np.int64), incomplete=True)

Review comment on the incomplete writes: Test where all the incompletes are the wrong type?
+    def read_append_data_keys_from_ref(symbol):
+        nonlocal lib_tool
+        append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
+        append_data_keys = []
+        next_key = lib_tool.read_timeseries_descriptor(append_ref).next_key
+        while next_key != None and lib_tool.key_exists(next_key):
+            append_data_keys.append(next_key)
+            next_key = lib_tool.read_timeseries_descriptor(next_key).next_key
+        return append_data_keys

Author comment on the helper: These helper functions could be added to the library_tool API, but I decided against it because they would require frequent tweaks (e.g. if you want to load only the last 20). Instead, after this is merged, I'll include some examples in the lib_tool docs. (A sketch of such a variant follows below.)

Review comment on `nonlocal lib_tool`: You don't need the `nonlocal`, do you? Given that you aren't assigning to `lib_tool`.
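As one illustration of the kind of tweak the author mentions, here is a minimal sketch of a "last N only" variant of the helper above. The function name is hypothetical; it reuses only lib_tool calls that already appear in this test, and relies on the fact that APPEND_DATA keys are chained newest-first via `next_key`.

```python
# Hypothetical variant of read_append_data_keys_from_ref that stops after
# the newest `limit` keys instead of walking the whole linked list.
from arcticdb_ext.storage import KeyType

def read_last_n_append_data_keys(lib_tool, symbol, limit=20):
    append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
    keys = []
    next_key = lib_tool.read_timeseries_descriptor(append_ref).next_key
    # The APPEND_REF -> APPEND_DATA chain is ordered newest-first.
    while next_key is not None and lib_tool.key_exists(next_key) and len(keys) < limit:
        keys.append(next_key)
        next_key = lib_tool.read_timeseries_descriptor(next_key).next_key
    return keys
```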
+    def read_type(key, column):
+        nonlocal lib_tool
+        fields = lib_tool.read_descriptor(key).fields()
+        for field in fields:
+            if field.name == column:
+                return field.type.data_type()
+        return None
+
+    # We assert that the types are as we wrote them and that we can't read or compact because of the type mismatch
+    append_keys = read_append_data_keys_from_ref(sym)
+    assert len(append_keys) == 3
+    # Different storages use either fixed or dynamic strings
+    str_dtype = DataType.UTF_DYNAMIC64 if lib_tool._nvs._resolve_dynamic_strings({}) else DataType.UTF_FIXED64
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    with pytest.raises(SchemaException):
+        lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1)))
+    with pytest.raises(SchemaException):
+        lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)

Review comment on the fixed/dynamic strings comment: Ah, this is a good reason for the different backends 👍
+    # We change the last append data key to string and verify it's now a string
+    backout_segment = lib_tool.update_append_data_column_type(append_keys[0], "col", str)
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, str))
+
+    # We test that we can back out the change using the returned SegmentInMemory
+    lib_tool.overwrite_segment_in_memory(append_keys[0], backout_segment)
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, np.int64))
+
+    # And now make all append data keys ints, which makes the symbol readable.
+    lib_tool.update_append_data_column_type(append_keys[1], "col", np.int64)
+    lib_tool.update_append_data_column_type(append_keys[2], "col", np.int64)  # This should be idempotent
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
+    assert_frame_equal(entire_symbol, get_df(6, 0, np.int64))
+
+    # And test that we can overwrite with arbitrary data
+    lib_tool.overwrite_append_data_with_dataframe(append_keys[0], get_df(10, 5, np.int64))
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
+    assert_frame_equal(entire_symbol, get_df(15, 0, np.int64))
+
+    # And test that compaction now works with the new types
+    lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
+    assert read_append_data_keys_from_ref(sym) == []
+    assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))
Review comment on the end of the test: Test for appending more incompletes (with the right types) and compacting?

Review comment: `util::check`?
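For reference, the repair workflow the test exercises condenses to the following sketch. Same assumptions as the earlier example: `lib` is a version store with mistyped incompletes staged for `"sym"`, and the method names are the ones exercised in the test above.

```python
# Condensed repair workflow, assuming staged incompletes for "sym" whose
# "col" column was written with the wrong type.
import numpy as np
from arcticdb_ext.storage import KeyType

lib_tool = lib.library_tool()

# Walk the APPEND_REF -> APPEND_DATA chain and re-type every staged segment;
# per the test, the update is idempotent for keys that already have the
# target type, so it is safe to apply to the whole chain.
key = lib_tool.read_timeseries_descriptor(
    lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, "sym")[0]).next_key
while key is not None and lib_tool.key_exists(key):
    lib_tool.update_append_data_column_type(key, "col", np.int64)
    key = lib_tool.read_timeseries_descriptor(key).next_key

# With consistent types, the incompletes can now be compacted onto the symbol.
lib.compact_incomplete("sym", append=True, convert_int_to_float=False, via_iteration=False)
```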