Adds a type change utility for append data keys in lib_tool
Does this by adding C++-layer functionality to overwrite append data
keys, and by using a read + type change + overwrite flow in the Python layer.

Also adds an elaborate test which verifies that the type change works as
expected and doesn't break the linked list structure.
IvoDD committed Oct 18, 2024
1 parent d9ec089 commit 47cd9ef
Showing 9 changed files with 197 additions and 20 deletions.
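
For reviewers, a minimal usage sketch of the new Python-layer utility. This is a hypothetical session, assuming lib is an existing NativeVersionStore with incomplete (APPEND_DATA) segments already written for "sym"; the method names follow the diffs below.

import numpy as np
from arcticdb_ext.storage import KeyType

lib_tool = lib.library_tool()

# Locate the APPEND_DATA keys for the symbol and retype the mismatched column in place.
append_keys = lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, "sym")
lib_tool.update_append_data_column_type(append_keys[0], "col", np.int64)

# The key now holds the same rows with "col" cast to int64.
print(lib_tool.read_to_dataframe(append_keys[0]).dtypes)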
5 changes: 5 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
@@ -18,6 +18,10 @@
#include <arcticdb/processing/clause.hpp>
#include <arcticdb/storage/key_segment_pair.hpp>

namespace arcticdb::toolbox::apy{
class LibraryTool;
}

namespace arcticdb::async {

std::pair<VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
@@ -395,6 +399,7 @@ folly::Future<SliceAndKey> async_write(
}

private:
friend class arcticdb::toolbox::apy::LibraryTool;
std::shared_ptr<storage::Library> library_;
std::shared_ptr<arcticdb::proto::encoding::VariantCodec> codec_;
const EncodingVersion encoding_version_;
6 changes: 6 additions & 0 deletions cpp/arcticdb/stream/append_map.hpp
@@ -72,6 +72,12 @@ void append_incomplete(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
bool validate_index);

SegmentInMemory incomplete_segment_from_frame(
const std::shared_ptr<pipelines::InputTensorFrame>& frame,
size_t existing_rows,
std::optional<entity::AtomKey>&& prev_key,
bool allow_sparse);

std::optional<int64_t> latest_incomplete_timestamp(
const std::shared_ptr<Store>& store,
const StreamId& stream_id);
8 changes: 8 additions & 0 deletions cpp/arcticdb/stream/python_bindings.cpp
@@ -19,6 +19,7 @@
#include <arcticdb/stream/stream_reader.hpp>
#include <arcticdb/stream/stream_writer.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/entity/protobuf_mappings.hpp>

namespace py = pybind11;

@@ -66,6 +67,8 @@ void register_types(py::module &m) {
DATA_TYPE(NANOSECONDS_UTC64)
DATA_TYPE(ASCII_FIXED64)
DATA_TYPE(ASCII_DYNAMIC64)
DATA_TYPE(UTF_FIXED64)
DATA_TYPE(UTF_DYNAMIC64)
#undef DATA_TYPE
;

@@ -128,6 +131,11 @@ void register_types(py::module &m) {
return self.index();
}).def_property_readonly("total_rows", [](const TimeseriesDescriptor& self) {
return self.total_rows();
}).def_property_readonly("next_key", [](const TimeseriesDescriptor& self) -> std::optional<AtomKey> {
if (self.proto().has_next_key()){
return key_from_proto(self.proto().next_key());
}
return std::nullopt;
});

py::class_<PyTimestampRange>(m, "TimestampRange")
66 changes: 51 additions & 15 deletions cpp/arcticdb/toolbox/library_tool.cpp
@@ -18,14 +18,21 @@
#include <arcticdb/util/key_utils.hpp>
#include <arcticdb/util/variant.hpp>
#include <arcticdb/version/version_utils.hpp>
#include <arcticdb/stream/append_map.hpp>
#include <cstdlib>

namespace arcticdb::toolbox::apy {

using namespace arcticdb::entity;

LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib) {
store_ = std::make_shared<async::AsyncStore<util::SysClock>>(lib, codec::default_lz4_codec(), encoding_version(lib->config()));
LibraryTool::LibraryTool(std::shared_ptr<storage::Library> lib): engine_(lib, util::SysClock()) {}

std::shared_ptr<Store> LibraryTool::store() {
return engine_._test_get_store();
}

async::AsyncStore<>& LibraryTool::async_store() {
return dynamic_cast<async::AsyncStore<>&>(*store());
}

ReadResult LibraryTool::read(const VariantKey& key) {
@@ -42,46 +49,75 @@ ReadResult LibraryTool::read(const VariantKey& key) {
}

Segment LibraryTool::read_to_segment(const VariantKey& key) {
auto kv = store_->read_compressed_sync(key, storage::ReadKeyOpts{});
auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{});
util::check(kv.has_segment(), "Failed to read key: {}", key);
kv.segment().force_own_buffer();
return std::move(kv.segment());
}

std::optional<google::protobuf::Any> LibraryTool::read_metadata(const VariantKey& key){
return store_->read_metadata(key, storage::ReadKeyOpts{}).get().second;
return store()->read_metadata(key, storage::ReadKeyOpts{}).get().second;
}

StreamDescriptor LibraryTool::read_descriptor(const VariantKey& key){
auto metadata_and_descriptor = store_->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
auto metadata_and_descriptor = store()->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
return std::get<StreamDescriptor>(metadata_and_descriptor);
}

TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& key){
return store_->read_timeseries_descriptor(key).get().second;
return store()->read_timeseries_descriptor(key).get().second;
}

void LibraryTool::write(VariantKey key, Segment& segment) {
storage::KeySegmentPair kv{std::move(key), std::move(segment)};
store_->write_compressed_sync(kv);
store()->write_compressed_sync(kv);
}

void LibraryTool::write_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory) {
auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);
write(key, segment);
}

SegmentInMemory LibraryTool::overwrite_append_data(
VariantKey key,
const py::tuple &item,
const py::object &norm,
const py::object & user_meta) {
if (!std::holds_alternative<AtomKey>(key) || std::get<AtomKey>(key).type() != KeyType::APPEND_DATA) {
throw_error<ErrorCode::E_INVALID_USER_ARGUMENT>(fmt::format("Can only override APPEND_DATA keys. Received: {}", key));
}
auto old_segment = read_to_segment(key);
auto old_segment_in_memory = decode_segment(std::move(old_segment));
const auto& tsd = old_segment_in_memory.index_descriptor();
std::optional<AtomKey> next_key = std::nullopt;
if (tsd.proto().has_next_key()){
next_key = key_from_proto(tsd.proto().next_key());
}

auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
remove(key);
write_segment_in_memory(key, segment_in_memory);
return old_segment_in_memory;
}

bool LibraryTool::key_exists(const VariantKey& key) {
return store_->key_exists_sync(key);
return store()->key_exists_sync(key);
}

void LibraryTool::remove(VariantKey key) {
store_->remove_key_sync(std::move(key), storage::RemoveOpts{});
store()->remove_key_sync(std::move(key), storage::RemoveOpts{});
}

void LibraryTool::clear_ref_keys() {
delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store_, false);
delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store(), false);
}

std::vector<VariantKey> LibraryTool::find_keys(entity::KeyType kt) {
std::vector<VariantKey> res;

store_->iterate_type(kt, [&](VariantKey &&found_key) {
store()->iterate_type(kt, [&](VariantKey &&found_key) {
res.emplace_back(found_key);
}, "");
return res;
@@ -94,12 +130,12 @@ int LibraryTool::count_keys(entity::KeyType kt) {
count++;
};

store_->iterate_type(kt, visitor, "");
store()->iterate_type(kt, visitor, "");
return count;
}

std::vector<bool> LibraryTool::batch_key_exists(const std::vector<VariantKey>& keys) {
auto key_exists_fut = store_->batch_key_exists(keys);
auto key_exists_fut = store()->batch_key_exists(keys);
return folly::collect(key_exists_fut).get();
}

@@ -116,12 +152,12 @@ std::vector<VariantKey> LibraryTool::find_keys_for_id(entity::KeyType kt, const
}
};

store_->iterate_type(kt, visitor, string_id);
store()->iterate_type(kt, visitor, string_id);
return res;
}

std::string LibraryTool::get_key_path(const VariantKey& key) {
return store_->key_path(key);
return async_store().key_path(key);
}

std::optional<std::string> LibraryTool::inspect_env_variable(std::string name){
12 changes: 8 additions & 4 deletions cpp/arcticdb/toolbox/library_tool.hpp
@@ -16,6 +16,7 @@
#include <arcticdb/storage/library_manager.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/version/local_versioned_engine.hpp>

#include <memory>

@@ -46,6 +47,10 @@ class LibraryTool {

void write(VariantKey key, Segment& segment);

void write_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory);

SegmentInMemory overwrite_append_data(VariantKey key, const py::tuple &item, const py::object &norm, const py::object & user_meta);

void remove(VariantKey key);

std::vector<VariantKey> find_keys(arcticdb::entity::KeyType);
@@ -67,10 +72,9 @@ class LibraryTool {
static py::object read_unaltered_lib_cfg(const storage::LibraryManager& lib_manager, std::string lib_name);

private:
// TODO: Remove the shared_ptr and just keep the store.
// The only reason we use a shared_ptr for the store is to be able to pass it to delete_all_keys_of_type.
// We can remove the shared_ptr when delete_all_keys_of_type takes a const ref instead of a shared pointer.
std::shared_ptr<arcticdb::async::AsyncStore<util::SysClock>> store_;
std::shared_ptr<Store> store();
async::AsyncStore<>& async_store();
version_store::LocalVersionedEngine engine_;
};

} //namespace arcticdb::toolbox::apy
2 changes: 2 additions & 0 deletions cpp/arcticdb/toolbox/python_bindings.cpp
@@ -50,6 +50,8 @@ void register_bindings(py::module &m) {
E.g. an Index key for a symbol which has columns "index" and "col" will have <FieldRef>s for those columns.
)pbdoc")
.def("write", &LibraryTool::write)
.def("write_segment_in_memory", &LibraryTool::write_segment_in_memory)
.def("overwrite_append_data", &LibraryTool::overwrite_append_data)
.def("remove", &LibraryTool::remove)
.def("find_keys", &LibraryTool::find_keys)
.def("count_keys", &LibraryTool::count_keys)
31 changes: 30 additions & 1 deletion python/arcticdb/toolbox/library_tool.py
@@ -13,7 +13,7 @@
from arcticdb_ext.stream import SegmentInMemory
from arcticdb_ext.tools import LibraryTool as LibraryToolImpl
from arcticdb_ext.version_store import AtomKey, PythonOutputFrame, RefKey
from arcticdb.version_store._normalization import denormalize_dataframe
from arcticdb.version_store._normalization import denormalize_dataframe, normalize_dataframe

VariantKey = Union[AtomKey, RefKey]
VersionQueryInput = Union[int, str, ExplicitlySupportedDates, None]
@@ -159,3 +159,32 @@ def read_index(self, symbol: str, as_of: Optional[VersionQueryInput] = None, **k
Pandas DataFrame representing the index key in a human-readable format.
"""
return self._nvs.read_index(symbol, as_of, **kwargs)

def normalize_dataframe_with_nvs_defaults(self, df : pd.DataFrame):
# TODO: Have a unified place where we resolve all the normalization parameters and use that here.
# Currently all these parameters are resolved in various places throughout the _store.py. This can result in
# different defaults for different operations which is not desirable.
write_options = self._nvs._lib_cfg.lib_desc.version.write_options
dynamic_schema = self._nvs.resolve_defaults("dynamic_schema", write_options, False)
empty_types = self._nvs.resolve_defaults("empty_types", write_options, False)
dynamic_strings = self._nvs._resolve_dynamic_strings({})
return normalize_dataframe(df, dynamic_schema=dynamic_schema, empty_types=empty_types, dynamic_strings=dynamic_strings)

def overwrite_append_data_with_dataframe(self, key : VariantKey, df : pd.DataFrame) -> SegmentInMemory:
"""
Overwrites the given APPEND_DATA key with the provided dataframe. Use with extreme caution, as it can completely
corrupt the underlying data structures.

Returns
-------
SegmentInMemory copy of what was stored in the key before it was overwritten. Can be used with lib_tool.write_segment
to back out the change in case the data ends up corrupted.
"""
item, norm_meta = self.normalize_dataframe_with_nvs_defaults(df)
return self.overwrite_append_data(key, item, norm_meta, None)

def update_append_data_column_type(self, key : VariantKey, column : str, to_type : type) -> SegmentInMemory:
old_df = self.read_to_dataframe(key)
assert column in old_df.columns
new_df = old_df.astype({column: to_type})
return self.overwrite_append_data_with_dataframe(key, new_df)
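
As the docstring notes, the previous contents are returned so a bad overwrite can be reverted. A hedged back-out sketch follows, assuming key, new_df and lib come from a session like the one above, and that the write_segment_in_memory binding added in this commit is reachable from the Python wrapper.

import pandas as pd

lib_tool = lib.library_tool()

# Keep the returned SegmentInMemory so the original APPEND_DATA contents can be restored.
backup = lib_tool.overwrite_append_data_with_dataframe(key, new_df)
try:
    lib.read("sym", incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1)))
except Exception:
    # Restore the original segment if the overwrite corrupted the symbol.
    lib_tool.write_segment_in_memory(key, backup)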
3 changes: 3 additions & 0 deletions python/arcticdb/version_store/_normalization.py
@@ -1247,6 +1247,9 @@ def denormalize_dataframe(ret):

return DataFrameNormalizer().denormalize(frame_data, read_result.norm.df)

def normalize_dataframe(df, **kwargs):
return DataFrameNormalizer().normalize(df, **kwargs)


T = TypeVar("T", bound=Union[pd.DataFrame, pd.Series])

84 changes: 84 additions & 0 deletions python/tests/integration/toolbox/test_library_tool.py
@@ -9,6 +9,8 @@

from arcticdb.util.test import sample_dataframe, populate_db, assert_frame_equal
from arcticdb_ext.storage import KeyType
from arcticdb_ext.types import DataType
from arcticdb_ext.exceptions import SchemaException


def get_ref_key_types():
@@ -230,3 +232,85 @@ def iterate_through_version_chain(key):
assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions-1) % 3 + 1
assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3


def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
lib = object_and_mem_and_lmdb_version_store
if lib._lib_cfg.lib_desc.version.encoding_version == 1:
# TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp is
# not encoding aware so all incomplete writes are broken with v2 encoding.
pytest.xfail("Writing the timeseries descriptor for incompletes is currently broken with encoding v2")
lib_tool = lib.library_tool()
sym = "sym"

def get_df(num_rows, start_index, col_type):
start_date = pd.Timestamp(2024, 1, 1) + pd.Timedelta(start_index, unit="d")
index = pd.date_range(start_date, periods=num_rows)
df = pd.DataFrame({"col": range(start_index, num_rows+start_index) , "other": range(start_index, num_rows+start_index)}, index=index)
# Streaming data has a named index
df.index.name = "time"
return df.astype({"col": col_type})

# Deliberately write mismatching incomplete types
lib.write(sym, get_df(3, 0, np.int64))
lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
lib.write(sym, get_df(1, 4, str), incomplete=True)
lib.write(sym, get_df(1, 5, np.int64), incomplete=True)

def read_append_data_keys_from_ref(symbol):
nonlocal lib_tool
append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
append_data_keys = []
next_key = lib_tool.read_timeseries_descriptor(append_ref).next_key
while next_key != None and lib_tool.key_exists(next_key):
append_data_keys.append(next_key)
next_key = lib_tool.read_timeseries_descriptor(next_key).next_key
return append_data_keys

def read_type(key, column):
nonlocal lib_tool
fields = lib_tool.read_descriptor(key).fields()
for field in fields:
if field.name == column:
return field.type.data_type()
return None

# We assert that types are as we wrote them and we can't read or compact because of type mismatch
append_keys = read_append_data_keys_from_ref(sym)
assert len(append_keys) == 3
# Different storages use either fixed or dynamic strings
str_dtype = DataType.UTF_DYNAMIC64 if lib_tool._nvs._resolve_dynamic_strings({}) else DataType.UTF_FIXED64
assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
with pytest.raises(SchemaException):
lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1)))
with pytest.raises(SchemaException):
lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)

# We change the last append data key to string and verify it's now a string
lib_tool.update_append_data_column_type(append_keys[0], "col", str)
assert read_append_data_keys_from_ref(sym) == append_keys
assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, DataType.INT64]
assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, str))

# And now make all append data keys ints which makes the symbol readable.
lib_tool.update_append_data_column_type(append_keys[0], "col", np.int64)
lib_tool.update_append_data_column_type(append_keys[1], "col", np.int64)
assert read_append_data_keys_from_ref(sym) == append_keys
assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
assert_frame_equal(entire_symbol, get_df(6, 0, np.int64))

# And test that we can overwrite with arbitrary data
lib_tool.overwrite_append_data_with_dataframe(append_keys[0], get_df(10, 5, np.int64))
assert read_append_data_keys_from_ref(sym) == append_keys
assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
assert_frame_equal(entire_symbol, get_df(15, 0, np.int64))

# And test that compaction now works with the new types
lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
assert read_append_data_keys_from_ref(sym) == []
assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))
