diff --git a/pkg/blobstore/readfallback/read_fallback_blob_access.go b/pkg/blobstore/readfallback/read_fallback_blob_access.go index c8a001d2..f78fdc60 100644 --- a/pkg/blobstore/readfallback/read_fallback_blob_access.go +++ b/pkg/blobstore/readfallback/read_fallback_blob_access.go @@ -90,5 +90,16 @@ func (ba *readFallbackBlobAccess) FindMissing(ctx context.Context, digests diges if err != nil { return digest.EmptySet, util.StatusWrap(err, "Secondary") } + + // Replicate the blobs that are present only in the secondary + // backend to the primary backend. + presentOnlyInSecondary, _, _ := digest.GetDifferenceAndIntersection(missingInPrimary, missingInBoth) + if err := ba.replicator.ReplicateMultiple(ctx, presentOnlyInSecondary); err != nil { + if status.Code(err) == codes.NotFound { + return digest.EmptySet, util.StatusWrapWithCode(err, codes.Internal, "Backend secondary returned inconsistent results while synchronizing") + } + return digest.EmptySet, util.StatusWrap(err, "Failed to synchronize from backend secondary to backend primary") + } + return missingInBoth, nil } diff --git a/pkg/blobstore/readfallback/read_fallback_blob_access_test.go b/pkg/blobstore/readfallback/read_fallback_blob_access_test.go index 6302de76..0dad44ad 100644 --- a/pkg/blobstore/readfallback/read_fallback_blob_access_test.go +++ b/pkg/blobstore/readfallback/read_fallback_blob_access_test.go @@ -159,7 +159,8 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) { primary := mock.NewMockBlobAccess(ctrl) secondary := mock.NewMockBlobAccess(ctrl) - blobAccess := readfallback.NewReadFallbackBlobAccess(primary, secondary, nil) + replicator := mock.NewMockBlobReplicator(ctrl) + blobAccess := readfallback.NewReadFallbackBlobAccess(primary, secondary, replicator) allDigests := digest.NewSetBuilder(). Add(digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000000", 100)). @@ -171,6 +172,7 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) { Add(digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000001", 101)). Build() missingFromBoth := digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000000", 100).ToSingletonSet() + presentOnlyInSecondary := digest.MustNewDigest("instance", remoteexecution.DigestFunction_MD5, "00000000000000000000000000000001", 101).ToSingletonSet() t.Run("Success", func(t *testing.T) { // Both backends should be queried. Only the missing @@ -181,6 +183,7 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) { Return(missingFromPrimary, nil) secondary.EXPECT().FindMissing(ctx, missingFromPrimary). Return(missingFromBoth, nil) + replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary) missing, err := blobAccess.FindMissing(ctx, allDigests) require.NoError(t, err) @@ -204,4 +207,28 @@ func TestReadFallbackBlobAccessFindMissing(t *testing.T) { _, err := blobAccess.FindMissing(ctx, allDigests) testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Secondary: I/O error"), err) }) + + t.Run("ReplicateError", func(t *testing.T) { + primary.EXPECT().FindMissing(ctx, allDigests). + Return(missingFromPrimary, nil) + secondary.EXPECT().FindMissing(ctx, missingFromPrimary). + Return(missingFromBoth, nil) + replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary). + Return(status.Error(codes.Internal, "Server on fire")) + + _, err := blobAccess.FindMissing(ctx, allDigests) + testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Failed to synchronize from backend secondary to backend primary: Server on fire"), err) + }) + + t.Run("InconsistentBackendSecondary", func(t *testing.T) { + primary.EXPECT().FindMissing(ctx, allDigests). + Return(missingFromPrimary, nil) + secondary.EXPECT().FindMissing(ctx, missingFromPrimary). + Return(missingFromBoth, nil) + replicator.EXPECT().ReplicateMultiple(ctx, presentOnlyInSecondary). + Return(status.Error(codes.NotFound, "Object 00000000000000000000000000000001 not found")) + + _, err := blobAccess.FindMissing(ctx, allDigests) + testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Backend secondary returned inconsistent results while synchronizing: Object 00000000000000000000000000000001 not found"), err) + }) }