enable partial/incremental ChunkIndex cache updates #8541

Merged
9 changes: 2 additions & 7 deletions src/borg/archiver/compact_cmd.py
@@ -58,13 +58,8 @@ def get_repository_chunks(self) -> ChunkIndex:
         return chunks

     def save_chunk_index(self):
-        # first clean up:
-        for id, entry in self.chunks.iteritems():
-            # we already deleted the unused chunks, so everything left must be used:
-            assert entry.flags & ChunkIndex.F_USED
-            # as we put the wrong size in there, we need to clean up the size:
-            self.chunks[id] = entry._replace(size=0)
-        # now self.chunks is an up-to-date ChunkIndex, usable for general borg usage!
+        # write_chunkindex_to_repo_cache now removes all flags and size infos.
+        # we need this, as we put the wrong size in there.
         write_chunkindex_to_repo_cache(self.repository, self.chunks, clear=True, force_write=True, delete_other=True)
         self.chunks = None  # nothing there (cleared!)
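
Context for reviewers: the deleted loop scrubbed sizes entry by entry; that responsibility moved into write_chunkindex_to_repo_cache (see the cache.py hunks below). A minimal sketch of the scrubbing contract, using a plain dict rather than borg's real hash table:

```python
# A minimal sketch (not borg's actual code) of the per-entry cleanup that
# write_chunkindex_to_repo_cache now performs on serialization: only the
# chunk IDs (the keys) survive; flags and size are local state.
from collections import namedtuple

ChunkIndexEntry = namedtuple("ChunkIndexEntry", "flags size")
F_NONE = 0

def scrub_for_cache(entries):
    # compact put wrong sizes into the index, so neither size nor flags
    # may be persisted; the cached index only vouches for chunk existence.
    return {key: ChunkIndexEntry(flags=F_NONE, size=0) for key in entries}

scrubbed = scrub_for_cache({b"\x12" * 32: ChunkIndexEntry(flags=1, size=999)})
assert all(v == (F_NONE, 0) for v in scrubbed.values())
```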

54 changes: 37 additions & 17 deletions src/borg/cache.py
@@ -396,9 +396,7 @@
         assert isinstance(entry, FileCacheEntry)
         compressed_chunks = []
         for id, size in entry.chunks:
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]  # may raise KeyError if chunk id is not in repo
             if cie.size == 0:  # size is not known in the chunks index yet
                 self.chunks[id] = cie._replace(size=size)
             else:
@@ -418,9 +416,7 @@
         for idx in entry.chunks:
             assert isinstance(idx, int), f"{idx} is not an int"
             id = self.chunks.idx_to_k(idx)
-            cie = self.chunks.get(id)
-            assert cie is not None
-            assert cie.flags & ChunkIndex.F_USED
+            cie = self.chunks[id]
             assert cie.size > 0
             chunks.append((id, cie.size))
         entry = entry._replace(chunks=chunks)
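
Both hunks belong to the files-cache entry compression: on compress, each (id, size) pair becomes an integer index into the chunks table, and on uncompress the index is resolved back; replacing .get() plus asserts with a plain subscript turns "chunk missing from repo" into a KeyError that callers can catch. A toy model of the round trip (stand-ins only, not borg's code):

```python
# Toy model of the files-cache chunk-list compression these hunks touch; the
# real ChunkIndex keeps the mapping in its hash table (k_to_idx / idx_to_k).
table = []  # index -> (chunk id, size); toy stand-in for the chunks index

def compress(chunks):
    out = []
    for id, size in chunks:
        table.append((id, size))    # real code reuses the chunk's existing slot
        out.append(len(table) - 1)  # store a small int, not a 32-byte id + size
    return out

def uncompress(idxs):
    # real code: self.chunks[id] raises KeyError if the chunk vanished from the repo
    return [table[i] for i in idxs]

entry_chunks = [(b"\xaa" * 32, 1234), (b"\xbb" * 32, 5678)]
assert uncompress(compress(entry_chunks)) == entry_chunks
```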
@@ -485,6 +481,7 @@
                     mtime=int_to_timestamp(mtime_ns),
                     chunks=item.chunks,
                 )
+                # note: if the repo is in a valid state, the next line should not fail with KeyError:
                 files[path_hash] = self.compress_entry(entry)
         # deal with special snapshot / timestamp granularity case, see FAQ:
         for path_hash in self._newest_path_hashes:
@@ -529,7 +526,11 @@
                 for path_hash, entry in u:
                     entry = FileCacheEntry(*entry)
                     entry = entry._replace(age=entry.age + 1)
-                    files[path_hash] = self.compress_entry(entry)
+                    try:
+                        files[path_hash] = self.compress_entry(entry)
+                    except KeyError:
+                        # repo is missing a chunk referenced from entry
+                        logger.debug(f"compress_entry failed for {entry}, skipping.")
             except (TypeError, ValueError) as exc:
                 msg = "The files cache seems invalid. [%s]" % str(exc)
                 break
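
The aging loop now tolerates a repository that lost a chunk: rather than failing the whole files-cache load, the offending entry is dropped with a debug message. The pattern in isolation (simplified stand-ins, not the surrounding class):

```python
# Hedged sketch of the new tolerance: a files-cache entry referencing a chunk
# the repo no longer has is skipped instead of aborting the cache load.
import logging
from collections import namedtuple

logger = logging.getLogger(__name__)
FileCacheEntry = namedtuple("FileCacheEntry", "age chunks")  # simplified stand-in

def age_and_keep(old_entries, compress_entry, files):
    for path_hash, entry in old_entries:
        entry = entry._replace(age=entry.age + 1)
        try:
            files[path_hash] = compress_entry(entry)
        except KeyError:
            # dropping the entry is safe: the file is just re-read and
            # re-chunked on the next backup run.
            logger.debug(f"compress_entry failed for {entry}, skipping.")
```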
@@ -706,14 +707,23 @@
 def write_chunkindex_to_repo_cache(
     repository, chunks, *, clear=False, force_write=False, delete_other=False, delete_these=None
 ):
-    cached_hashes = list_chunkindex_hashes(repository)
+    # the borghash code has no means to only serialize the F_NEW table entries,
+    # thus we copy only the new entries to a temporary table:
+    new_chunks = ChunkIndex()
+    # for now, we don't want to serialize the flags or the size, just the keys (chunk IDs):
+    cleaned_value = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=0)
+    for key, _ in chunks.iteritems(only_new=True):
+        new_chunks[key] = cleaned_value
     with io.BytesIO() as f:
-        chunks.write(f)
+        new_chunks.write(f)
         data = f.getvalue()
+    logger.debug(f"caching {len(new_chunks)} new chunks.")
+    new_chunks.clear()  # free memory of the temporary table
     if clear:
         # if we don't need the in-memory chunks index anymore:
         chunks.clear()  # free memory, immediately
     new_hash = bin_to_hex(xxh64(data, seed=CHUNKINDEX_HASH_SEED))
+    cached_hashes = list_chunkindex_hashes(repository)
     if force_write or new_hash not in cached_hashes:
@@ -725,12 +735,15 @@
cache_name = f"cache/chunks.{new_hash}"
logger.debug(f"caching chunks index as {cache_name} in repository...")
repository.store_store(cache_name, data)
# we have successfully stored to the repository, so we can clear all F_NEW flags now:
chunks.clear_new()
# delete some not needed cached chunk indexes, but never the one we just wrote:
if delete_other:
delete_these = cached_hashes
delete_these = set(cached_hashes) - {new_hash}
elif delete_these:
pass
delete_these = set(delete_these) - {new_hash}
else:
delete_these = []
delete_these = set()
for hash in delete_these:
cache_name = f"cache/chunks.{hash}"
try:
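
Net effect of this function after the two hunks: serialize only the F_NEW entries into a throwaway table, name the blob after its xxh64 digest so an unchanged state is never re-uploaded, clear F_NEW only after the store succeeded, and prune stale cache/chunks.* blobs while protecting the one just written. A condensed sketch under those assumptions (toy store, different hash function):

```python
# Condensed, hedged sketch of the incremental write path above; a dict stands
# in for the repository store and blake2b for borg's xxh64 helper.
import hashlib

store = {}  # toy stand-in for repository.store_store / store_delete

def write_new_chunks(new_keys, cached_hashes, *, force_write=False, delete_other=False):
    data = b"".join(sorted(new_keys))  # real code: ChunkIndex.write()
    new_hash = hashlib.blake2b(data, digest_size=8).hexdigest()
    if force_write or new_hash not in cached_hashes:
        store[f"cache/chunks.{new_hash}"] = data
        new_keys.clear()               # real code: chunks.clear_new()
    for old in (set(cached_hashes) - {new_hash} if delete_other else set()):
        store.pop(f"cache/chunks.{old}", None)  # never deletes the blob just written
    return new_hash

h = write_new_chunks({b"\x01" * 32}, set(), delete_other=True)
assert f"cache/chunks.{h}" in store
```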
@@ -783,6 +796,8 @@
                     write_chunkindex_to_repo_cache(
                         repository, chunks, clear=False, force_write=True, delete_these=hashes
                     )
+                else:
+                    chunks.clear_new()
                 return chunks
     # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
@@ -818,6 +833,8 @@
         self._chunks = None
         self.last_refresh_dt = datetime.now(timezone.utc)
         self.refresh_td = timedelta(seconds=60)
+        self.chunks_cache_last_write = datetime.now(timezone.utc)
+        self.chunks_cache_write_td = timedelta(seconds=600)

     @property
     def chunks(self):
@@ -864,6 +881,7 @@
         else:
             raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         now = datetime.now(timezone.utc)
+        self._maybe_write_chunks_cache(now)
         exists = self.seen_chunk(id, size)
         if exists:
             # if borg create is processing lots of unchanged files (no content and not metadata changes),
@@ -879,10 +897,10 @@
         stats.update(size, not exists)
         return ChunkListEntry(id, size)

-    def _write_chunks_cache(self, chunks):
-        # this is called from .close, so we can clear here:
-        write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=True)
-        self._chunks = None  # nothing there (cleared!)
+    def _maybe_write_chunks_cache(self, now, force=False, clear=False):
+        if force or now > self.chunks_cache_last_write + self.chunks_cache_write_td:
+            write_chunkindex_to_repo_cache(self.repository, self._chunks, clear=clear)
+            self.chunks_cache_last_write = now

     def refresh_lock(self, now):
         if now > self.last_refresh_dt + self.refresh_td:
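
With writes now incremental (only F_NEW entries get serialized), flushing the chunks index periodically during a long borg create becomes affordable; the two new attributes implement the same timestamp-plus-interval throttle as refresh_lock right below. The pattern in isolation:

```python
# Self-contained sketch of the throttle behind _maybe_write_chunks_cache,
# mirroring the refresh_lock pattern next to it.
from datetime import datetime, timedelta, timezone

class ThrottledWriter:
    def __init__(self, write, interval_seconds=600):
        self._write = write
        self._td = timedelta(seconds=interval_seconds)
        self._last = datetime.now(timezone.utc)

    def maybe_write(self, now=None, *, force=False):
        now = now or datetime.now(timezone.utc)
        if force or now > self._last + self._td:
            self._write()       # cheap: only new entries get serialized
            self._last = now

writer = ThrottledWriter(lambda: None, interval_seconds=600)
writer.maybe_write()            # no-op: interval has not elapsed yet
writer.maybe_write(force=True)  # close() path: always writes
```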
@@ -980,7 +998,9 @@
         for key, value in sorted(self._chunks.stats.items()):
             logger.debug(f"Chunks index stats: {key}: {value}")
         pi.output("Saving chunks cache")
-        self._write_chunks_cache(self._chunks)  # cache/chunks in repo has a different integrity mechanism
+        # note: cache/chunks.* in repo has a different integrity mechanism
+        self._maybe_write_chunks_cache(datetime.now(timezone.utc), force=True, clear=True)
+        self._chunks = None  # nothing there (cleared!)
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
         self.cache_config.close()
6 changes: 5 additions & 1 deletion src/borg/hashindex.pyi
@@ -13,8 +13,12 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]]
 class ChunkIndex:
     F_NONE: int
     F_USED: int
+    F_NEW: int
+    M_USER: int
+    M_SYSTEM: int
     def add(self, key: bytes, size: int) -> None: ...
-    def iteritems(self, marker: bytes = ...) -> Iterator: ...
+    def iteritems(self, *, only_new: bool = ...) -> Iterator: ...
+    def clear_new(self) -> None: ...
     def __contains__(self, key: bytes) -> bool: ...
     def __getitem__(self, key: bytes) -> Type[ChunkIndexEntry]: ...
     def __setitem__(self, key: bytes, value: CIE) -> None: ...
52 changes: 47 additions & 5 deletions src/borg/hashindex.pyx
@@ -39,11 +39,16 @@ ChunkIndexEntry = namedtuple('ChunkIndexEntry', 'flags size')

 class ChunkIndex(HTProxyMixin, MutableMapping):
     """
-    Mapping from key256 to (refcount32, size32) to track chunks in the repository.
+    Mapping from key256 to (flags32, size32) to track chunks in the repository.
     """
-    # .flags values: 2^0 .. 2^31
+    # .flags related values:
     F_NONE = 0  # all flags cleared
-    F_USED = 1  # chunk is used/referenced
+    M_USER = 0x00ffffff  # mask for user flags
+    M_SYSTEM = 0xff000000  # mask for system flags
+    # user flags:
+    F_USED = 2 ** 0  # chunk is used/referenced
+    # system flags (internal use, always 0 to user, not changeable by user):
+    F_NEW = 2 ** 24  # a new chunk that is not present in repo/cache/chunks.* yet.

     def __init__(self, capacity=1000, path=None, usable=None):
         if path:
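
The layout splits the 32 flag bits at bit 24: bits 0..23 are user flags, bits 24..31 are system flags the user can neither see nor change. The arithmetic, checked directly (constants copied from the diff):

```python
# Quick sanity check of the flag layout defined above.
F_NONE = 0
F_USED = 2 ** 0        # user flag, bit 0
F_NEW = 2 ** 24        # system flag, bit 24
M_USER = 0x00FFFFFF    # bits 0..23
M_SYSTEM = 0xFF000000  # bits 24..31

assert F_USED & M_USER and not (F_USED & M_SYSTEM)
assert F_NEW & M_SYSTEM and not (F_NEW & M_USER)
assert (M_USER | M_SYSTEM) == 0xFFFFFFFF and (M_USER & M_SYSTEM) == 0
# a stored value splits cleanly into the two halves:
flags = F_USED | F_NEW
assert flags & M_USER == F_USED and flags & M_SYSTEM == F_NEW
```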
@@ -53,8 +58,15 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
             capacity = usable * 2  # load factor 0.5
         self.ht = HashTableNT(key_size=32, value_format="<II", value_type=ChunkIndexEntry, capacity=capacity)

-    def iteritems(self):
-        yield from self.ht.items()
+    def hide_system_flags(self, value):
+        user_flags = value.flags & self.M_USER
+        return value._replace(flags=user_flags)
+
+    def iteritems(self, *, only_new=False):
+        """iterate items (optionally only new items), hide system flags."""
+        for key, value in self.ht.items():
+            if not only_new or (value.flags & self.F_NEW):
+                yield key, self.hide_system_flags(value)

     def add(self, key, size):
         v = self.get(key)
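
Two invariants fall out of this hunk: callers never observe system bits (hide_system_flags masks them on every yield), and only_new=True is a pure filter over the same iteration. The same contract over a plain dict:

```python
# Stand-in for the filtering/masking contract of iteritems(), over a plain
# dict of (flags, size) tuples instead of the real hash table.
M_USER, F_NEW = 0x00FFFFFF, 2 ** 24

def iteritems(ht, *, only_new=False):
    for key, (flags, size) in ht.items():
        if not only_new or (flags & F_NEW):
            yield key, (flags & M_USER, size)  # system bits never escape

ht = {b"a": (F_NEW | 1, 10), b"b": (1, 20)}
assert dict(iteritems(ht, only_new=True)) == {b"a": (1, 10)}
assert dict(iteritems(ht)) == {b"a": (1, 10), b"b": (1, 20)}
```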
@@ -65,6 +77,36 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
         assert v.size == 0 or v.size == size
         self[key] = ChunkIndexEntry(flags=flags, size=size)

+    def __getitem__(self, key):
+        """specialized __getitem__ that hides system flags."""
+        value = self.ht[key]
+        return self.hide_system_flags(value)
+
+    def __setitem__(self, key, value):
+        """specialized __setitem__ that protects system flags, manages the F_NEW flag."""
+        try:
+            prev = self.ht[key]
+        except KeyError:
+            prev_flags = self.F_NONE
+            is_new = True
+        else:
+            prev_flags = prev.flags
+            is_new = bool(prev_flags & self.F_NEW)  # was new? stays new!
+        system_flags = prev_flags & self.M_SYSTEM
+        if is_new:
+            system_flags |= self.F_NEW
+        else:
+            system_flags &= ~self.F_NEW
+        user_flags = value.flags & self.M_USER
+        self.ht[key] = value._replace(flags=system_flags | user_flags)
+
+    def clear_new(self):
+        """clear the F_NEW flag of all items"""
+        for key, value in self.ht.items():
+            if value.flags & self.F_NEW:
+                flags = value.flags & ~self.F_NEW
+                self.ht[key] = value._replace(flags=flags)
+
     @classmethod
     def read(cls, path):
         return cls(path=path)
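
A consequence of __setitem__ worth noting: user code cannot set or clear F_NEW through the mapping interface, because incoming flags are masked with M_USER and the F_NEW decision depends only on the previous table state. A hedged demonstration (assumes the patched borg.hashindex module; not part of this PR's test suite):

```python
from borg.hashindex import ChunkIndex, ChunkIndexEntry

chunks = ChunkIndex()
key = bytes(32)  # any 256-bit key
# trying to smuggle F_NEW in via the value has no effect (it is masked out),
# but the entry is marked new anyway, because the key was absent before:
chunks[key] = ChunkIndexEntry(flags=ChunkIndex.F_NEW | ChunkIndex.F_USED, size=1)
assert chunks[key].flags == ChunkIndex.F_USED  # reads never expose F_NEW
assert [k for k, _ in chunks.iteritems(only_new=True)] == [key]
chunks.clear_new()
# overwriting an already-persisted entry does not mark it "new" again:
chunks[key] = ChunkIndexEntry(flags=ChunkIndex.F_USED, size=2)
assert list(chunks.iteritems(only_new=True)) == []
```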
21 changes: 20 additions & 1 deletion src/borg/testsuite/hashindex_test.py
@@ -35,4 +35,23 @@ def test_keyerror():
     with pytest.raises(KeyError):
         chunks[x]
     with pytest.raises(struct.error):
-        chunks[x] = ChunkIndexEntry(flags=2**33, size=0)
+        chunks[x] = ChunkIndexEntry(flags=ChunkIndex.F_NONE, size=2**33)
+
+
+def test_new():
+    def new_chunks():
+        return list(chunks.iteritems(only_new=True))
+
+    chunks = ChunkIndex()
+    key1, value1a = H2(1), ChunkIndexEntry(flags=ChunkIndex.F_USED, size=23)
+    key2, value2a = H2(2), ChunkIndexEntry(flags=ChunkIndex.F_USED, size=42)
+    # tracking of new entries
+    assert new_chunks() == []
+    chunks[key1] = value1a
+    assert new_chunks() == [(key1, value1a)]
+    chunks.clear_new()
+    assert new_chunks() == []
+    chunks[key2] = value2a
+    assert new_chunks() == [(key2, value2a)]
+    chunks.clear_new()
+    assert new_chunks() == []