Skip to content

Commit

Permalink
Add replication for missing blobs in readFallbackBlobAccess (#174)
Browse files Browse the repository at this point in the history
Right now we only replicate as part of Get(). This is sufficient for the AC/ISCC/FSAC, but for the CAS we also need to do this as part of FindMissing().
  • Loading branch information
annapst authored Aug 4, 2023
1 parent 72f277c commit d2663dc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
11 changes: 11 additions & 0 deletions pkg/blobstore/readfallback/read_fallback_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,16 @@ func (ba *readFallbackBlobAccess) FindMissing(ctx context.Context, digests diges
if err != nil {
return digest.EmptySet, util.StatusWrap(err, "Secondary")
}

// Replicate the blobs that are present only in the secondary
// backend to the primary backend.
presentOnlyInSecondary, _, _ := digest.GetDifferenceAndIntersection(missingInPrimary, missingInBoth)
if err := ba.replicator.ReplicateMultiple(ctx, presentOnlyInSecondary); err != nil {
if status.Code(err) == codes.NotFound {
return digest.EmptySet, util.StatusWrapWithCode(err, codes.Internal, "Backend secondary returned inconsistent results while synchronizing")
}
return digest.EmptySet, util.StatusWrap(err, "Failed to synchronize from backend secondary to backend primary")
}

return missingInBoth, nil
}
29 changes: 28 additions & 1 deletion pkg/blobstore/readfallback/read_fallback_blob_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) {

primary := mock.NewMockBlobAccess(ctrl)
secondary := mock.NewMockBlobAccess(ctrl)
blobAccess := readfallback.NewReadFallbackBlobAccess(primary, secondary, nil)
replicator := mock.NewMockBlobReplicator(ctrl)
blobAccess := readfallback.NewReadFallbackBlobAccess(primary, secondary, replicator)

allDigests := digest.NewSetBuilder().
Add(digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000000", 100)).
Expand All @@ -171,6 +172,7 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) {
Add(digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000001", 101)).
Build()
missingFromBoth := digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000000", 100).ToSingletonSet()
presentOnlyInSecondary := digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000001", 101).ToSingletonSet()

t.Run("Success", func(t *testing.T) {
// Both backends should be queried. Only the missing
Expand All @@ -181,6 +183,7 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) {
Return(missingFromPrimary, nil)
secondary.EXPECT().FindMissing(ctx, missingFromPrimary).
Return(missingFromBoth, nil)
replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary)

missing, err := blobAccess.FindMissing(ctx, allDigests)
require.NoError(t, err)
Expand All @@ -204,4 +207,28 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) {
_, err := blobAccess.FindMissing(ctx, allDigests)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Secondary: I/O error"), err)
})

t.Run("ReplicateError", func(t *testing.T) {
primary.EXPECT().FindMissing(ctx, allDigests).
Return(missingFromPrimary, nil)
secondary.EXPECT().FindMissing(ctx, missingFromPrimary).
Return(missingFromBoth, nil)
replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary).
Return(status.Error(codes.Internal, "Server on fire"))

_, err := blobAccess.FindMissing(ctx, allDigests)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Failed to synchronize from backend secondary to backend primary: Server on fire"), err)
})

t.Run("InconsistentBackendSecondary", func(t *testing.T) {
primary.EXPECT().FindMissing(ctx, allDigests).
Return(missingFromPrimary, nil)
secondary.EXPECT().FindMissing(ctx, missingFromPrimary).
Return(missingFromBoth, nil)
replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary).
Return(status.Error(codes.NotFound, "Object 00000000000000000000000000000001 not found"))

_, err := blobAccess.FindMissing(ctx, allDigests)
testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Backend secondary returned inconsistent results while synchronizing: Object 00000000000000000000000000000001 not found"), err)
})
}

0 comments on commit d2663dc

Please sign in to comment.