diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp
index 40dc642ba1..aea666e72c 100644
--- a/cpp/arcticdb/async/async_store.hpp
+++ b/cpp/arcticdb/async/async_store.hpp
@@ -18,6 +18,10 @@
 #include
 #include
 
+namespace arcticdb::toolbox::apy{
+    class LibraryTool;
+}
+
 namespace arcticdb::async {
 
 std::pair> lookup_match_in_dedup_map(
@@ -395,6 +399,7 @@ folly::Future async_write(
 }
 
 private:
+    friend class arcticdb::toolbox::apy::LibraryTool;
     std::shared_ptr library_;
     std::shared_ptr codec_;
     const EncodingVersion encoding_version_;
diff --git a/cpp/arcticdb/stream/append_map.hpp b/cpp/arcticdb/stream/append_map.hpp
index b32631b039..2c9d9791e0 100644
--- a/cpp/arcticdb/stream/append_map.hpp
+++ b/cpp/arcticdb/stream/append_map.hpp
@@ -72,6 +72,12 @@ void append_incomplete(
     const std::shared_ptr& frame,
     bool validate_index);
 
+SegmentInMemory incomplete_segment_from_frame(
+    const std::shared_ptr& frame,
+    size_t existing_rows,
+    std::optional&& prev_key,
+    bool allow_sparse);
+
 std::optional latest_incomplete_timestamp(
     const std::shared_ptr& store,
     const StreamId& stream_id);
diff --git a/cpp/arcticdb/stream/python_bindings.cpp b/cpp/arcticdb/stream/python_bindings.cpp
index e69be7ac60..4b068a48a3 100644
--- a/cpp/arcticdb/stream/python_bindings.cpp
+++ b/cpp/arcticdb/stream/python_bindings.cpp
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 
 namespace py = pybind11;
 
@@ -66,6 +67,8 @@ void register_types(py::module &m) {
         DATA_TYPE(NANOSECONDS_UTC64)
         DATA_TYPE(ASCII_FIXED64)
         DATA_TYPE(ASCII_DYNAMIC64)
+        DATA_TYPE(UTF_FIXED64)
+        DATA_TYPE(UTF_DYNAMIC64)
 #undef DATA_TYPE
     ;
 
@@ -128,6 +131,11 @@ void register_types(py::module &m) {
         return self.index();
     }).def_property_readonly("total_rows", [](const TimeseriesDescriptor& self) {
         return self.total_rows();
+    }).def_property_readonly("next_key", [](const TimeseriesDescriptor& self) -> std::optional {
+        if (self.proto().has_next_key()){
+            return key_from_proto(self.proto().next_key());
+        }
+        return std::nullopt;
     });
 
     py::class_(m, "TimestampRange")
diff --git a/cpp/arcticdb/toolbox/library_tool.cpp b/cpp/arcticdb/toolbox/library_tool.cpp
index 77844d8376..a3a6f62c25 100644
--- a/cpp/arcticdb/toolbox/library_tool.cpp
+++ b/cpp/arcticdb/toolbox/library_tool.cpp
@@ -18,14 +18,21 @@
 #include
 #include
 #include
+#include
 
 #include
 
 namespace arcticdb::toolbox::apy {
 
 using namespace arcticdb::entity;
 
-LibraryTool::LibraryTool(std::shared_ptr lib) {
-    store_ = std::make_shared>(lib, codec::default_lz4_codec(), encoding_version(lib->config()));
+LibraryTool::LibraryTool(std::shared_ptr lib): engine_(lib, util::SysClock()) {}
+
+std::shared_ptr LibraryTool::store() {
+    return engine_._test_get_store();
+}
+
+async::AsyncStore<>& LibraryTool::async_store() {
+    return dynamic_cast&>(*store());
 }
 
 ReadResult LibraryTool::read(const VariantKey& key) {
@@ -42,46 +49,75 @@ ReadResult LibraryTool::read(const VariantKey& key) {
 }
 
 Segment LibraryTool::read_to_segment(const VariantKey& key) {
-    auto kv = store_->read_compressed_sync(key, storage::ReadKeyOpts{});
+    auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{});
     util::check(kv.has_segment(), "Failed to read key: {}", key);
     kv.segment().force_own_buffer();
     return std::move(kv.segment());
 }
 
 std::optional LibraryTool::read_metadata(const VariantKey& key){
-    return store_->read_metadata(key, storage::ReadKeyOpts{}).get().second;
+    return store()->read_metadata(key, storage::ReadKeyOpts{}).get().second;
 }
 
 StreamDescriptor LibraryTool::read_descriptor(const VariantKey& key){
-    auto metadata_and_descriptor = store_->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
+    auto metadata_and_descriptor = store()->read_metadata_and_descriptor(key, storage::ReadKeyOpts{}).get();
     return std::get(metadata_and_descriptor);
 }
 
 TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& key){
-    return store_->read_timeseries_descriptor(key).get().second;
+    return store()->read_timeseries_descriptor(key).get().second;
 }
 
 void LibraryTool::write(VariantKey key, Segment& segment) {
     storage::KeySegmentPair kv{std::move(key), std::move(segment)};
-    store_->write_compressed_sync(kv);
+    store()->write_compressed_sync(kv);
+}
+
+void LibraryTool::overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory) {
+    auto segment = encode_dispatch(std::move(segment_in_memory), *(async_store().codec_), async_store().encoding_version_);
+    remove(key);
+    write(key, segment);
+}
+
+SegmentInMemory LibraryTool::overwrite_append_data(
+        VariantKey key,
+        const py::tuple &item,
+        const py::object &norm,
+        const py::object & user_meta) {
+    if (!std::holds_alternative(key) || std::get(key).type() != KeyType::APPEND_DATA) {
+        throw_error(fmt::format("Can only override APPEND_DATA keys. Received: {}", key));
+    }
+    auto old_segment = read_to_segment(key);
+    auto old_segment_in_memory = decode_segment(std::move(old_segment));
+    const auto& tsd = old_segment_in_memory.index_descriptor();
+    std::optional next_key = std::nullopt;
+    if (tsd.proto().has_next_key()){
+        next_key = key_from_proto(tsd.proto().next_key());
+    }
+
+    auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
+    auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
+    auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
+    overwrite_segment_in_memory(key, segment_in_memory);
+    return old_segment_in_memory;
 }
 
 bool LibraryTool::key_exists(const VariantKey& key) {
-    return store_->key_exists_sync(key);
+    return store()->key_exists_sync(key);
 }
 
 void LibraryTool::remove(VariantKey key) {
-    store_->remove_key_sync(std::move(key), storage::RemoveOpts{});
+    store()->remove_key_sync(std::move(key), storage::RemoveOpts{});
 }
 
 void LibraryTool::clear_ref_keys() {
-    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store_, false);
+    delete_all_keys_of_type(KeyType::SNAPSHOT_REF, store(), false);
 }
 
 std::vector LibraryTool::find_keys(entity::KeyType kt) {
     std::vector res;
 
-    store_->iterate_type(kt, [&](VariantKey &&found_key) {
+    store()->iterate_type(kt, [&](VariantKey &&found_key) {
         res.emplace_back(found_key);
     }, "");
     return res;
@@ -94,12 +130,12 @@ int LibraryTool::count_keys(entity::KeyType kt) {
         count++;
     };
 
-    store_->iterate_type(kt, visitor, "");
+    store()->iterate_type(kt, visitor, "");
     return count;
 }
 
 std::vector LibraryTool::batch_key_exists(const std::vector& keys) {
-    auto key_exists_fut = store_->batch_key_exists(keys);
+    auto key_exists_fut = store()->batch_key_exists(keys);
     return folly::collect(key_exists_fut).get();
 }
 
@@ -116,12 +152,12 @@ std::vector LibraryTool::find_keys_for_id(entity::KeyType kt, const
         }
     };
 
-    store_->iterate_type(kt, visitor, string_id);
+    store()->iterate_type(kt, visitor, string_id);
     return res;
 }
 
 std::string LibraryTool::get_key_path(const VariantKey& key) {
-    return store_->key_path(key);
+    return async_store().key_path(key);
 }
 
 std::optional LibraryTool::inspect_env_variable(std::string name){
diff --git a/cpp/arcticdb/toolbox/library_tool.hpp b/cpp/arcticdb/toolbox/library_tool.hpp
index 7c4a5ff493..da5b2bec12 100644
--- a/cpp/arcticdb/toolbox/library_tool.hpp
+++ b/cpp/arcticdb/toolbox/library_tool.hpp
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 
 #include
 
@@ -46,6 +47,10 @@ class LibraryTool {
 
     void write(VariantKey key, Segment& segment);
 
+    void overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory);
+
+    SegmentInMemory overwrite_append_data(VariantKey key, const py::tuple &item, const py::object &norm, const py::object & user_meta);
+
     void remove(VariantKey key);
 
     std::vector find_keys(arcticdb::entity::KeyType);
@@ -67,10 +72,9 @@ class LibraryTool {
     static py::object read_unaltered_lib_cfg(const storage::LibraryManager& lib_manager, std::string lib_name);
 
 private:
-    // TODO: Remove the shared_ptr and just keep the store.
-    // The only reason we use a shared_ptr for the store is to be able to pass it to delete_all_keys_of_type.
-    // We can remove the shared_ptr when delete_all_keys_of_type takes a const ref instead of a shared pointer.
-    std::shared_ptr> store_;
+    std::shared_ptr store();
+    async::AsyncStore<>& async_store();
+    version_store::LocalVersionedEngine engine_;
 };
 
 } //namespace arcticdb::toolbox::apy
diff --git a/cpp/arcticdb/toolbox/python_bindings.cpp b/cpp/arcticdb/toolbox/python_bindings.cpp
index 4b2cc95b6c..1ec9f124aa 100644
--- a/cpp/arcticdb/toolbox/python_bindings.cpp
+++ b/cpp/arcticdb/toolbox/python_bindings.cpp
@@ -50,6 +50,8 @@ void register_bindings(py::module &m) {
                 E.g. an Index key for a symbol which has columns "index" and "col" will have s for those columns.
             )pbdoc")
             .def("write", &LibraryTool::write)
+            .def("overwrite_segment_in_memory", &LibraryTool::overwrite_segment_in_memory)
+            .def("overwrite_append_data", &LibraryTool::overwrite_append_data)
             .def("remove", &LibraryTool::remove)
             .def("find_keys", &LibraryTool::find_keys)
             .def("count_keys", &LibraryTool::count_keys)
diff --git a/python/arcticdb/toolbox/library_tool.py b/python/arcticdb/toolbox/library_tool.py
index 93e0def396..ec7b970695 100644
--- a/python/arcticdb/toolbox/library_tool.py
+++ b/python/arcticdb/toolbox/library_tool.py
@@ -13,7 +13,7 @@ from arcticdb_ext.stream import SegmentInMemory
 from arcticdb_ext.tools import LibraryTool as LibraryToolImpl
 from arcticdb_ext.version_store import AtomKey, PythonOutputFrame, RefKey
-from arcticdb.version_store._normalization import denormalize_dataframe
+from arcticdb.version_store._normalization import denormalize_dataframe, normalize_dataframe
 
 VariantKey = Union[AtomKey, RefKey]
 VersionQueryInput = Union[int, str, ExplicitlySupportedDates, None]
@@ -159,3 +159,32 @@ def read_index(self, symbol: str, as_of: Optional[VersionQueryInput] = None, **k
         Pandas DataFrame representing the index key in a human-readable format.
         """
         return self._nvs.read_index(symbol, as_of, **kwargs)
+
+    def normalize_dataframe_with_nvs_defaults(self, df : pd.DataFrame):
+        # TODO: Have a unified place where we resolve all the normalization parameters and use that here.
+        # Currently all these parameters are resolved in various places throughout the _store.py. This can result in
+        # different defaults for different operations which is not desirable.
+        write_options = self._nvs._lib_cfg.lib_desc.version.write_options
+        dynamic_schema = self._nvs.resolve_defaults("dynamic_schema", write_options, False)
+        empty_types = self._nvs.resolve_defaults("empty_types", write_options, False)
+        dynamic_strings = self._nvs._resolve_dynamic_strings({})
+        return normalize_dataframe(df, dynamic_schema=dynamic_schema, empty_types=empty_types, dynamic_strings=dynamic_strings)
+
+    def overwrite_append_data_with_dataframe(self, key : VariantKey, df : pd.DataFrame) -> SegmentInMemory:
+        """
+        Overwrites the append data key with the provided dataframe. Use with extreme caution as overwriting with
+        inappropriate data can render the symbol unreadable.
+
+        Returns
+        -------
+        SegmentInMemory backup of what was stored in the key before it was overwritten. Can be used with
+        lib_tool.overwrite_segment_in_memory to back out the change caused by this in case data ends up corrupted.
+        """
+        item, norm_meta = self.normalize_dataframe_with_nvs_defaults(df)
+        return self.overwrite_append_data(key, item, norm_meta, None)
+
+    def update_append_data_column_type(self, key : VariantKey, column : str, to_type : type) -> SegmentInMemory:
+        old_df = self.read_to_dataframe(key)
+        assert column in old_df.columns
+        new_df = old_df.astype({column: to_type})
+        return self.overwrite_append_data_with_dataframe(key, new_df)
\ No newline at end of file
diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py
index 8b870b7718..d0c1d4d8db 100644
--- a/python/arcticdb/version_store/_normalization.py
+++ b/python/arcticdb/version_store/_normalization.py
@@ -1247,6 +1247,9 @@ def denormalize_dataframe(ret):
     return DataFrameNormalizer().denormalize(frame_data, read_result.norm.df)
 
 
+def normalize_dataframe(df, **kwargs):
+    return DataFrameNormalizer().normalize(df, **kwargs)
+
 T = TypeVar("T", bound=Union[pd.DataFrame, pd.Series])
 
 
diff --git a/python/tests/integration/toolbox/test_library_tool.py b/python/tests/integration/toolbox/test_library_tool.py
index 470c3a1999..ec84276e1f 100644
--- a/python/tests/integration/toolbox/test_library_tool.py
+++ b/python/tests/integration/toolbox/test_library_tool.py
@@ -9,6 +9,8 @@
 from arcticdb.util.test import sample_dataframe, populate_db, assert_frame_equal
 from arcticdb_ext.storage import KeyType
+from arcticdb_ext.types import DataType
+from arcticdb_ext.exceptions import SchemaException
 
 
 def get_ref_key_types():
@@ -230,3 +232,92 @@ def iterate_through_version_chain(key):
     assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions-1) % 3 + 1
     assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3
+
+def test_overwrite_append_data(object_and_mem_and_lmdb_version_store):
+    lib = object_and_mem_and_lmdb_version_store
+    if lib._lib_cfg.lib_desc.version.encoding_version == 1:
+        # TODO: Fix the timeseries descriptor packing. Currently the [incomplete_segment_from_frame] function in cpp is
+        # not encoding aware so all incomplete writes are broken with v2 encoding.
+        pytest.xfail("Writing the timeseries descriptor for incompletes is currently broken with encoding v2")
+    lib_tool = lib.library_tool()
+    sym = "sym"
+
+    def get_df(num_rows, start_index, col_type):
+        start_date = pd.Timestamp(2024, 1, 1) + pd.Timedelta(start_index, unit="d")
+        index = pd.date_range(start_date, periods=num_rows)
+        df = pd.DataFrame({"col": range(start_index, num_rows+start_index) , "other": range(start_index, num_rows+start_index)}, index=index)
+        # Streaming data has a named index
+        df.index.name = "time"
+        return df.astype({"col": col_type})
+
+    # Deliberately write mismatching incomplete types
+    lib.write(sym, get_df(3, 0, np.int64))
+    lib.write(sym, get_df(1, 3, np.int64), incomplete=True)
+    lib.write(sym, get_df(1, 4, str), incomplete=True)
+    lib.write(sym, get_df(1, 5, np.int64), incomplete=True)
+
+    def read_append_data_keys_from_ref(symbol):
+        nonlocal lib_tool
+        append_ref = lib_tool.find_keys_for_symbol(KeyType.APPEND_REF, symbol)[0]
+        append_data_keys = []
+        next_key = lib_tool.read_timeseries_descriptor(append_ref).next_key
+        while next_key != None and lib_tool.key_exists(next_key):
+            append_data_keys.append(next_key)
+            next_key = lib_tool.read_timeseries_descriptor(next_key).next_key
+        return append_data_keys
+
+    def read_type(key, column):
+        nonlocal lib_tool
+        fields = lib_tool.read_descriptor(key).fields()
+        for field in fields:
+            if field.name == column:
+                return field.type.data_type()
+        return None
+
+    # We assert that types are as we wrote them and we can't read or compact because of type mismatch
+    append_keys = read_append_data_keys_from_ref(sym)
+    assert len(append_keys) == 3
+    # Different storages use either fixed or dynamic strings
+    str_dtype = DataType.UTF_DYNAMIC64 if lib_tool._nvs._resolve_dynamic_strings({}) else DataType.UTF_FIXED64
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    with pytest.raises(SchemaException):
+        lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1)))
+    with pytest.raises(SchemaException):
+        lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
+
+    # We change the last append data key to string and verify it's now a string
+    backout_segment = lib_tool.update_append_data_column_type(append_keys[0], "col", str)
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [str_dtype, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, str))
+
+    # We test that we can backout the change using the returned SegmentInMemory
+    lib_tool.overwrite_segment_in_memory(append_keys[0], backout_segment)
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, str_dtype, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert_frame_equal(lib_tool.read_to_dataframe(append_keys[0]), get_df(1, 5, np.int64))
+
+    # And now make all append data keys ints which makes the symbol readable.
+    lib_tool.update_append_data_column_type(append_keys[1], "col", np.int64)
+    lib_tool.update_append_data_column_type(append_keys[2], "col", np.int64) # This should be idempotent
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
+    assert_frame_equal(entire_symbol, get_df(6, 0, np.int64))
+
+    # And test that we can overwrite with arbitrary data
+    lib_tool.overwrite_append_data_with_dataframe(append_keys[0], get_df(10, 5, np.int64))
+    assert read_append_data_keys_from_ref(sym) == append_keys
+    assert [read_type(key, "col") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    assert [read_type(key, "other") for key in append_keys] == [DataType.INT64, DataType.INT64, DataType.INT64]
+    entire_symbol = lib.read(sym, incomplete=True, date_range=(pd.Timestamp(0), pd.Timestamp(2030, 1, 1))).data
+    assert_frame_equal(entire_symbol, get_df(15, 0, np.int64))
+
+    # And test that compaction now works with the new types
+    lib.compact_incomplete(sym, append=True, convert_int_to_float=False, via_iteration=False)
+    assert read_append_data_keys_from_ref(sym) == []
+    assert_frame_equal(lib.read(sym).data, get_df(15, 0, np.int64))
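
For reference, a minimal usage sketch of the toolbox surface added above (not part of the diff). It assumes a NativeVersionStore `lib` whose symbol "sym" already has mismatched APPEND_DATA segments with a column named "col"; those names are placeholders, and passing KeyType.APPEND_DATA to find_keys_for_symbol is an assumption rather than something exercised in the tests above (which walk the next_key chain from the APPEND_REF instead).

# Illustrative sketch only: repair a mistyped incomplete (APPEND_DATA) segment
# using the LibraryTool helpers introduced in this change.
# `lib`, "sym" and "col" are placeholders; find_keys_for_symbol with APPEND_DATA
# is assumed to return the symbol's append-data keys.
import numpy as np
from arcticdb_ext.storage import KeyType

lib_tool = lib.library_tool()
for key in lib_tool.find_keys_for_symbol(KeyType.APPEND_DATA, "sym"):
    df = lib_tool.read_to_dataframe(key)
    if df["col"].dtype != np.int64:
        # Rewrite the segment with "col" coerced to int64; keep the returned backup segment.
        backup = lib_tool.update_append_data_column_type(key, "col", np.int64)
        # If the rewrite turns out to be wrong, the original segment can be restored:
        # lib_tool.overwrite_segment_in_memory(key, backup)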