diff --git a/cpp/arcticdb/toolbox/library_tool.cpp b/cpp/arcticdb/toolbox/library_tool.cpp index 45cd516d2a..77844d8376 100644 --- a/cpp/arcticdb/toolbox/library_tool.cpp +++ b/cpp/arcticdb/toolbox/library_tool.cpp @@ -32,7 +32,13 @@ ReadResult LibraryTool::read(const VariantKey& key) { auto segment = read_to_segment(key); auto segment_in_memory = decode_segment(std::move(segment)); auto frame_and_descriptor = frame_and_descriptor_from_segment(std::move(segment_in_memory)); - return pipelines::read_result_from_single_frame(frame_and_descriptor, to_atom(key)); + auto atom_key = util::variant_match( + key, + [](const AtomKey& key){return key;}, + // We construct a dummy atom key in case of a RefKey to be able to build the read_result + [](const RefKey& key){return AtomKeyBuilder().build(key.id());}, + [](const auto&){}); + return pipelines::read_result_from_single_frame(frame_and_descriptor, atom_key); } Segment LibraryTool::read_to_segment(const VariantKey& key) { diff --git a/python/arcticdb/toolbox/library_tool.py b/python/arcticdb/toolbox/library_tool.py index e159961996..93e0def396 100644 --- a/python/arcticdb/toolbox/library_tool.py +++ b/python/arcticdb/toolbox/library_tool.py @@ -55,7 +55,7 @@ def dataframe_to_keys( int(row.version_id), int(row.creation_ts), int(row.content_hash), - int(index.timestamp()), + index.value, row.end_index.value, key_type, ) diff --git a/python/tests/integration/toolbox/test_library_tool.py b/python/tests/integration/toolbox/test_library_tool.py index 42a45146ec..470c3a1999 100644 --- a/python/tests/integration/toolbox/test_library_tool.py +++ b/python/tests/integration/toolbox/test_library_tool.py @@ -5,8 +5,9 @@ """ import pandas as pd import numpy as np +import pytest -from arcticdb.util.test import sample_dataframe, populate_db +from arcticdb.util.test import sample_dataframe, populate_db, assert_frame_equal from arcticdb_ext.storage import KeyType @@ -137,3 +138,95 @@ def test_count_keys(object_and_mem_and_lmdb_version_store): assert lib_tool.count_keys(KeyType.SNAPSHOT_REF) == 1 assert lib_tool.count_keys(KeyType.MULTI_KEY) == 1 assert lib_tool.count_keys(KeyType.SNAPSHOT) == 0 + + +@pytest.mark.parametrize("use_time_index", [True, False]) +def test_read_data_key_from_version_ref(in_memory_version_store, use_time_index): + lib = in_memory_version_store + lib_tool = lib.library_tool() + sym = "sym" + + df = pd.DataFrame({"a": [1, 2, 3]}) + if use_time_index: + df = pd.DataFrame(index=pd.date_range(start=pd.Timestamp(0), periods=3), data={"a": [1, 2, 3]}) + lib.write(sym, df) + + ver_ref_key = lib_tool.find_keys_for_symbol(KeyType.VERSION_REF, sym)[0] + ver_ref_entries = lib_tool.read_to_keys(ver_ref_key) + assert len(ver_ref_entries) == 2 # We expect a 2 entry version ref (i.e. without a cached undeleted) because we have just one version + assert ver_ref_entries[0].type == KeyType.TABLE_INDEX + assert ver_ref_entries[1].type == KeyType.VERSION + + ver_key = ver_ref_entries[-1] + ver_entries = lib_tool.read_to_keys(ver_key) + assert len(ver_entries) == 1 + assert ver_entries[0].type == KeyType.TABLE_INDEX + + index_key = ver_entries[0] + index_entries = lib_tool.read_to_keys(index_key) + assert len(index_entries) == 1 + assert index_entries[0].type == KeyType.TABLE_DATA + + data_key = index_entries[0] + stored_df = lib_tool.read_to_dataframe(data_key) + + # Fix index because reading directly from data key loses that information + if use_time_index: + stored_df.index.name = None + else: + stored_df = stored_df.reset_index() + assert_frame_equal(stored_df, df) + + +def test_iterate_version_chain_with_lib_tool(in_memory_version_store): + lib = in_memory_version_store + lib_tool = lib.library_tool() + sym = "sym" + num_versions = 20 + + # Populate some versions + df = pd.DataFrame(index=pd.date_range(start=pd.Timestamp(0), periods=3), data={"a": [1, 2, 3]}) + for i in range(num_versions): + prune_previous = i % 3 == 0 + lib.write(sym, df, prune_previous_version=prune_previous) + + keys_by_key_type = {} + # No need for memoization because we will visit each entry exactly once because we only do writes. + # (If we e.g. did appends we would have added table data entries multiple times) + def iterate_through_version_chain(key): + nonlocal keys_by_key_type + nonlocal lib_tool + # Add current key + if key.type not in keys_by_key_type: + keys_by_key_type[key.type] = [] + keys_by_key_type[key.type].append(key) + + # Iterate next keys + next_keys = [] + if key.type == KeyType.VERSION_REF: + # For version refs we only want to visit the last entry which is the last VERSION key + next_keys = lib_tool.read_to_keys(key)[-1:] + if key.type in [KeyType.VERSION, KeyType.TABLE_INDEX]: + try: + next_keys = lib_tool.read_to_keys(key) + except: + # Deleted index key + next_keys = [] + for next_key in next_keys: + iterate_through_version_chain(next_key) + + version_ref = lib_tool.find_keys_for_symbol(KeyType.VERSION_REF, sym)[0] + iterate_through_version_chain(version_ref) + + # We exclude index keys because we'll see table indices while iterating which are deleted. + for key_type in [KeyType.VERSION_REF, KeyType.VERSION, KeyType.TABLE_DATA]: + expected_keys = [str(key) for key in lib_tool.find_keys_for_symbol(key_type, sym)] + iterated_keys = [str(key) for key in keys_by_key_type[key_type]] + assert sorted(iterated_keys) == sorted(expected_keys) + + assert len(keys_by_key_type[KeyType.VERSION_REF]) == 1 + assert len(keys_by_key_type[KeyType.VERSION]) == num_versions + num_versions // 3 + assert len(keys_by_key_type[KeyType.TABLE_INDEX]) == num_versions + assert len(keys_by_key_type[KeyType.TABLE_DATA]) == (num_versions-1) % 3 + 1 + assert len(keys_by_key_type[KeyType.TOMBSTONE_ALL]) == num_versions // 3 +