Adds a type change utility for append data keys in lib_tool #1932

Status: Open. Wants to merge 1 commit into base: master.
5 changes: 5 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
@@ -18,6 +18,10 @@
#include <arcticdb/processing/clause.hpp>
#include <arcticdb/storage/key_segment_pair.hpp>

namespace arcticdb::toolbox::apy{
class LibraryTool;
}

namespace arcticdb::async {

std::pair<VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
@@ -395,6 +399,7 @@ folly::Future<SliceAndKey> async_write(
}

private:
friend class arcticdb::toolbox::apy::LibraryTool;
std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
const EncodingVersion encoding_version_;
6 changes: 6 additions & 0 deletions cpp/arcticdb/stream/append_map.hpp
@@ -72,6 +72,12 @@ void append_incomplete(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);

SegmentInMemory incomplete_segment_from_frame(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
size_t existing_rows,
std::optional<entity::AtomKey>&& prev_key,
bool allow_sparse);

std::optional<int64_t> latest_incomplete_timestamp(
const std::shared_ptr<Store>& store,
const StreamId& stream_id);
8 changes: 8 additions & 0 deletions cpp/arcticdb/stream/python_bindings.cpp
@@ -19,6 +19,7 @@
#include <arcticdb/stream/stream_reader.hpp>
#include <arcticdb/stream/stream_writer.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/entity/protobuf_mappings.hpp>

namespace py = pybind11;

@@ -66,6 +67,8 @@ void register_types(py::module &m) {
DATA_TYPE(NANOSECONDS_UTC64)
DATA_TYPE(ASCII_FIXED64)
DATA_TYPE(ASCII_DYNAMIC64)
DATA_TYPE(UTF_FIXED64)
DATA_TYPE(UTF_DYNAMIC64)
#undef DATA_TYPE
;

@@ -128,6 +131,11 @@ void register_types(py::module &m) {
return self.index();
}).def_property_readonly("total_rows", [](const TimeseriesDescriptor& self) {
return self.total_rows();
}).def_property_readonly("next_key", [](const TimeseriesDescriptor& self) -> std::optional<AtomKey> {
if (self.proto().has_next_key()){
return key_from_proto(self.proto().next_key());
}
return std::nullopt;
});

py::class_<PyTimestampRange>(m, "TimestampRange")
66 changes: 51 additions & 15 deletions cpp/arcticdb/toolbox/library_tool.cpp
@@ -18,14 +18,21 @@
#include <arcticdb/util/key_utils.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/version/version_utils.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <cstdlib>

namespace arcticdb::toolbox::apy {

using namespace arcticdb::entity;

LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib) {
store_ = std::make_shared<async::AsyncStore<util::SysClock>>(lib, codec::default_lz4_codec(), encoding_version(lib->config()));
LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib): engine_(lib, util::SysClock()) {}

std::shared_ptr<Store> LibraryTool::store() {
return engine_._test_get_store();
}

async::AsyncStore<>& LibraryTool::async_store() {
return dynamic_cast<async::AsyncStore<>&>(*store());
}

ReadResult LibraryTool::read(const VariantKey& key) {
@@ -42,46 +49,75 @@ ReadResult LibraryTool::read(const VariantKey& key) {
}

Segment LibraryTool::read_to_segment(const VariantKey& key) {
auto kv = store_->read_compressed_sync(key, storage::ReadKeyOpts{});
auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{});
util::check(kv.has_segment(), "Failed to read key: {}", key);
kv.segment().force_own_buffer();
return std::move(kv.segment());
}

std::optional<google::protobuf::Any> LibraryTool::read_metadata(const VariantKey& key){
return store_->read_metadata(key, storage::ReadKeyOpts{}).get().second;
return store()->read_metadata(key, storage::ReadKeyOpts{}).get().second;
}

StreamDescriptor LibraryTool::read_descriptor(const VariantKey& key){
auto metadata_and_descriptor = store_->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
auto metadata_and_descriptor = store()->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
return std::get<StreamDescriptor>(metadata_and_descriptor);
}

TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& key){
return store_->read_timeseries_descriptor(key).get().second;
return store()->read_timeseries_descriptor(key).get().second;
}

void LibraryTool::write(VariantKey key, Segment& segment) {
storage::KeySegmentPair kv{std::move(key), std::move(segment)};
store_->write_compressed_sync(kv);
store()->write_compressed_sync(kv);
}

void LibraryTool::overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory) {
auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);
remove(key);
write(key, segment);
}

SegmentInMemory LibraryTool::overwrite_append_data(
VariantKey key,
const py::tuple &item,
const py::object &norm,
const py::object & user_meta) {
if (!std::holds_alternative<AtomKey>(key) || std::get<AtomKey>(key).type() != KeyType::APPEND_DATA) {
throw_error<ErrorCode::E_INVALID_USER_ARGUMENT>(fmt::format("Can only override APPEND_DATA keys. Received: {}", key));
Review comment (Collaborator): util::check?
}
auto old_segment = read_to_segment(key);
auto old_segment_in_memory = decode_segment(std::move(old_segment));
const auto& tsd = old_segment_in_memory.index_descriptor();
std::optional<AtomKey> next_key = std::nullopt;
if (tsd.proto().has_next_key()){
Review comment (@poodlewars, Oct 18, 2024): How does the testing for the next key logic work, given that ArcticDB doesn't write it? Or am I wrong, and normal (non-streaming) incompletes also have the linked list structure?
next_key = key_from_proto(tsd.proto().next_key());
}

auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
overwrite_segment_in_memory(key, segment_in_memory);
return old_segment_in_memory;
}

bool LibraryTool::key_exists(const VariantKey& key) {
return store_->key_exists_sync(key);
return store()->key_exists_sync(key);
}

void LibraryTool::remove(VariantKey key) {
store_->remove_key_sync(std::move(key), storage::RemoveOpts{});
store()->remove_key_sync(std::move(key), storage::RemoveOpts{});
}

void LibraryTool::clear_ref_keys() {
delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store_, false);
delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store(), false);
}

std::vector<VariantKey> LibraryTool::find_keys(entity::KeyType kt) {
std::vector<VariantKey> res;

store_->iterate_type(kt, [&](VariantKey &&found_key) {
store()->iterate_type(kt, [&](VariantKey &&found_key) {
res.emplace_back(found_key);
}, "");
return res;
@@ -94,12 +130,12 @@ int LibraryTool::count_keys(entity::KeyType kt) {
count++;
};

store_->iterate_type(kt, visitor, "");
store()->iterate_type(kt, visitor, "");
return count;
}

std::vector<bool> LibraryTool::batch_key_exists(const std::vector<VariantKey>& keys) {
auto key_exists_fut = store_->batch_key_exists(keys);
auto key_exists_fut = store()->batch_key_exists(keys);
return folly::collect(key_exists_fut).get();
}

@@ -116,12 +152,12 @@ std::vector<VariantKey> LibraryTool::find_keys_for_id(entity::KeyType kt, const
}
};

store_->iterate_type(kt, visitor, string_id);
store()->iterate_type(kt, visitor, string_id);
return res;
}

std::string LibraryTool::get_key_path(const VariantKey& key) {
return store_->key_path(key);
return async_store().key_path(key);
}

std::optional<std::string> LibraryTool::inspect_env_variable(std::string name){
12 changes: 8 additions & 4 deletions cpp/arcticdb/toolbox/library_tool.hpp
@@ -16,6 +16,7 @@
#include <arcticdb/storage/library_manager.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/version/local_versioned_engine.hpp>

#include <memory>

@@ -46,6 +47,10 @@ class LibraryTool {

void write(VariantKey key, Segment& segment);

void overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory);

SegmentInMemory overwrite_append_data(VariantKey key, const py::tuple &item, const py::object &norm, const py::object & user_meta);

void remove(VariantKey key);

std::vector<VariantKey> find_keys(arcticdb::entity::KeyType);
@@ -67,10 +72,9 @@ class LibraryTool {
static py::object read_unaltered_lib_cfg(const storage::LibraryManager& lib_manager, std::string lib_name);

private:
// TODO: Remove the shared_ptr and just keep the store.
// The only reason we use a shared_ptr for the store is to be able to pass it to delete_all_keys_of_type.
// We can remove the shared_ptr when delete_all_keys_of_type takes a const ref instead of a shared pointer.
std::shared_ptr<arcticdb::async::AsyncStore<util::SysClock>> store_;
std::shared_ptr<Store> store();
async::AsyncStore<>& async_store();
version_store::LocalVersionedEngine engine_;
};

} //namespace arcticdb::toolbox::apy
2 changes: 2 additions & 0 deletions cpp/arcticdb/toolbox/python_bindings.cpp
@@ -50,6 +50,8 @@ void register_bindings(py::module &m) {
E.g. an Index key for a symbol which has columns "index" and "col" will have <FieldRef>s for those columns.
)pbdoc")
.def("write", &LibraryTool::write)
.def("overwrite_segment_in_memory", &LibraryTool::overwrite_segment_in_memory)
.def("overwrite_append_data", &LibraryTool::overwrite_append_data)
.def("remove", &LibraryTool::remove)
.def("find_keys", &LibraryTool::find_keys)
.def("count_keys", &LibraryTool::count_keys)
31 changes: 30 additions & 1 deletion python/arcticdb/toolbox/library_tool.py
@@ -13,7 +13,7 @@
from arcticdb_ext.stream import SegmentInMemory
from arcticdb_ext.tools import LibraryTool as LibraryToolImpl
from arcticdb_ext.version_store import AtomKey, PythonOutputFrame, RefKey
from arcticdb.version_store._normalization import denormalize_dataframe
from arcticdb.version_store._normalization import denormalize_dataframe, normalize_dataframe

VariantKey = Union[AtomKey, RefKey]
VersionQueryInput = Union[int, str, ExplicitlySupportedDates, None]
@@ -159,3 +159,32 @@ def read_index(self, symbol: str, as_of: Optional[VersionQueryInput] = None, **k
        Pandas DataFrame representing the index key in a human-readable format.
        """
        return self._nvs.read_index(symbol, as_of, **kwargs)

    def normalize_dataframe_with_nvs_defaults(self, df: pd.DataFrame):
        # TODO: Have a unified place where we resolve all the normalization parameters and use that here.
        # Currently all these parameters are resolved in various places throughout _store.py. This can result in
        # different defaults for different operations, which is not desirable.
        write_options = self._nvs._lib_cfg.lib_desc.version.write_options
        dynamic_schema = self._nvs.resolve_defaults("dynamic_schema", write_options, False)
        empty_types = self._nvs.resolve_defaults("empty_types", write_options, False)
        dynamic_strings = self._nvs._resolve_dynamic_strings({})
        return normalize_dataframe(df, dynamic_schema=dynamic_schema, empty_types=empty_types, dynamic_strings=dynamic_strings)

    def overwrite_append_data_with_dataframe(self, key: VariantKey, df: pd.DataFrame) -> SegmentInMemory:
        """
        Overwrites the append data key with the provided dataframe. Use with extreme caution, as overwriting with
        inappropriate data can render the symbol unreadable.

        Returns
        -------
        SegmentInMemory backup of what was stored in the key before it was overwritten. Can be used with
        lib_tool.overwrite_segment_in_memory to back out the change in case the new data ends up corrupted.
        """
        item, norm_meta = self.normalize_dataframe_with_nvs_defaults(df)
        return self.overwrite_append_data(key, item, norm_meta, None)

    def update_append_data_column_type(self, key: VariantKey, column: str, to_type: type) -> SegmentInMemory:
        old_df = self.read_to_dataframe(key)
        assert column in old_df.columns
        new_df = old_df.astype({column: to_type})
        return self.overwrite_append_data_with_dataframe(key, new_df)
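
To illustrate how these new helpers fit together, here is a rough usage sketch (not part of the diff; it assumes `lib` is a NativeVersionStore holding a symbol "sym" whose APPEND_DATA key has a mistyped "col" column):

import numpy as np
from arcticdb_ext.storage import KeyType

lib_tool = lib.library_tool()

# Locate the incomplete (APPEND_DATA) keys written for the symbol.
append_keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, "sym")

# Retype the "col" column; the returned SegmentInMemory is a backup of the old contents.
backup = lib_tool.update_append_data_column_type(append_keys[0], "col", np.int64)

# If the retyped data turns out to be wrong, the backup can be written back to undo the change.
lib_tool.overwrite_segment_in_memory(append_keys[0], backup)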
3 changes: 3 additions & 0 deletions python/arcticdb/version_store/_normalization.py
@@ -1247,6 +1247,9 @@ def denormalize_dataframe(ret):

    return DataFrameNormalizer().denormalize(frame_data, read_result.norm.df)

def normalize_dataframe(df, **kwargs):
    return DataFrameNormalizer().normalize(df, **kwargs)


T = TypeVar("T", bound=Union[pd.DataFrame, pd.Series])

91 changes: 91 additions & 0 deletions python/tests/integration/toolbox/test_library_tool.py
@@ -9,6 +9,8 @@

from arcticdb.util.test import sample_dataframe, populate_db, assert_frame_equal
from arcticdb_ext.storage import KeyType
from arcticdb_ext.types import DataType
from arcticdb_ext.exceptions import SchemaException


def get_ref_key_types():
@@ -230,3 +232,92 @@ def iterate_through_version_chain(key):
    assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions-1) % 3 + 1
    assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3


def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
Review comment (Collaborator): Why all 3 stores? Why not use one of the _v1 fixtures to avoid the skip below?

    lib = object_and_mem_and_lmdb_version_store
    if lib._lib_cfg.lib_desc.version.encoding_version == 1:
Review comment (Collaborator): Just checking this is definitely zero indexed and we aren't skipping on v1 encoding by mistake?

        # TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp is
        # not encoding aware so all incomplete writes are broken with v2 encoding.
        pytest.xfail("Writing the timeseries descriptor for incompletes is currently broken with encoding v2")
    lib_tool = lib.library_tool()
    sym = "sym"

    def get_df(num_rows, start_index, col_type):
        start_date = pd.Timestamp(2024, 1, 1) + pd.Timedelta(start_index, unit="d")
        index = pd.date_range(start_date, periods=num_rows)
        df = pd.DataFrame({"col": range(start_index, num_rows+start_index), "other": range(start_index, num_rows+start_index)}, index=index)
        # Streaming data has a named index
        df.index.name = "time"
        return df.astype({"col": col_type})

    # Deliberately write mismatching incomplete types
    lib.write(sym, get_df(3, 0, np.int64))
    lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
Review comment (Collaborator): Test where all the incompletes are the wrong type?

    lib.write(sym, get_df(1, 4, str), incomplete=True)
    lib.write(sym, get_df(1, 5, np.int64), incomplete=True)

    def read_append_data_keys_from_ref(symbol):
Review comment (Collaborator, PR author): These helper functions could be added to the library_tool api but I decided against it because it would require frequent tweaks (e.g. if you want to load only the last 20). So instead after this is merged I'll include some examples in the lib tool docs.

        nonlocal lib_tool
Review comment (Collaborator): You don't need the nonlocal, do you? Given that you aren't assigning to lib_tool.

        append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
        append_data_keys = []
        next_key = lib_tool.read_timeseries_descriptor(append_ref).next_key
        while next_key != None and lib_tool.key_exists(next_key):
            append_data_keys.append(next_key)
            next_key = lib_tool.read_timeseries_descriptor(next_key).next_key
        return append_data_keys

    def read_type(key, column):
        nonlocal lib_tool
        fields = lib_tool.read_descriptor(key).fields()
        for field in fields:
            if field.name == column:
                return field.type.data_type()
        return None

    # We assert that types are as we wrote them and we can't read or compact because of type mismatch
    append_keys = read_append_data_keys_from_ref(sym)
    assert len(append_keys) == 3
    # Different storages use either fixed or dynamic strings
Review comment (Collaborator): Ah, this is a good reason for the different backends 👍

    str_dtype = DataType.UTF_DYNAMIC64 if lib_tool._nvs._resolve_dynamic_strings({}) else DataType.UTF_FIXED64
    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    with pytest.raises(SchemaException):
        lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1)))
    with pytest.raises(SchemaException):
        lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)

    # We change the last append data key to string and verify it's now a string
    backout_segment = lib_tool.update_append_data_column_type(append_keys[0], "col", str)
    assert read_append_data_keys_from_ref(sym) == append_keys
    assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, DataType.INT64]
    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, str))

    # We test that we can backout the change using the returned SegmentInMemory
    lib_tool.overwrite_segment_in_memory(append_keys[0], backout_segment)
    assert read_append_data_keys_from_ref(sym) == append_keys
    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, np.int64))

    # And now make all append data keys ints which makes the symbol readable.
    lib_tool.update_append_data_column_type(append_keys[1], "col", np.int64)
    lib_tool.update_append_data_column_type(append_keys[2], "col", np.int64)  # This should be idempotent
    assert read_append_data_keys_from_ref(sym) == append_keys
    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
    assert_frame_equal(entire_symbol, get_df(6, 0, np.int64))

    # And test that we can overwrite with arbitrary data
    lib_tool.overwrite_append_data_with_dataframe(append_keys[0], get_df(10, 5, np.int64))
    assert read_append_data_keys_from_ref(sym) == append_keys
    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
    assert_frame_equal(entire_symbol, get_df(15, 0, np.int64))

    # And test that compaction now works with the new types
    lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
    assert read_append_data_keys_from_ref(sym) == []
    assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))
Review comment (Collaborator): Test for appending more incompletes (with the right types) and compacting?

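A follow-up along the lines of this suggestion could look roughly like the sketch below (illustrative only; it reuses get_df and read_append_data_keys_from_ref from the test above, and the expected final frame is an assumption based on the compaction behaviour already asserted):

    # Append another incomplete with matching types and compact again.
    lib.write(sym, get_df(2, 15, np.int64), incomplete=True)
    lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
    assert read_append_data_keys_from_ref(sym) == []
    assert_frame_equal(lib.read(sym).data, get_df(17, 0, np.int64))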