diff --git a/pkg/blobstore/local/old_current_new_location_blob_map.go b/pkg/blobstore/local/old_current_new_location_blob_map.go index 1579a1bb..5b42290f 100644 --- a/pkg/blobstore/local/old_current_new_location_blob_map.go +++ b/pkg/blobstore/local/old_current_new_location_blob_map.go @@ -291,6 +291,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in // DataIntegrityCallback, as it's impossible to pick up a write // lock from that context. totalBlocksToBeReleased := lbm.totalBlocksToBeReleased.Load() + newBlockHasBeenReleased := false for lbm.totalBlocksReleased < totalBlocksToBeReleased { lbm.popFront() if len(lbm.oldBlocks) > 0 { @@ -299,7 +300,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in lbm.currentBlocks-- } else { lbm.newBlocks-- - lbm.startAllocatingFromBlock(0) + newBlockHasBeenReleased = true } } @@ -313,6 +314,11 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in } lbm.newBlocks++ } + // Make sure a released block is not written to. + if newBlockHasBeenReleased { + // Allocate from the first new block instead. + lbm.startAllocatingFromBlock(0) + } // Move the first "new" block(s) to "current" whenever they no // longer have enough space to fit a blob. 
This ensures that the diff --git a/pkg/blobstore/local/old_current_new_location_blob_map_test.go b/pkg/blobstore/local/old_current_new_location_blob_map_test.go index b0311fd0..481683f0 100644 --- a/pkg/blobstore/local/old_current_new_location_blob_map_test.go +++ b/pkg/blobstore/local/old_current_new_location_blob_map_test.go @@ -169,3 +169,81 @@ func TestOldCurrentNewLocationBlobMapDataCorruption(t *testing.T) { _, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))() testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err) } + +func TestOldCurrentNewLocationBlobMapDataCorruptionInAllBlocks(t *testing.T) { + ctrl := gomock.NewController(t) + + blockList := mock.NewMockBlockList(ctrl) + errorLogger := mock.NewMockErrorLogger(ctrl) + locationBlobMap := local.NewOldCurrentNewLocationBlobMap( + blockList, + local.NewImmutableBlockListGrowthPolicy( + /* currentBlocksCount = */ 4, + /* newBlocksCount = */ 4), + errorLogger, + "cas", + /* blockSizeBytes = */ 16, + /* oldBlocksCount = */ 2, + /* newBlocksCount = */ 4, + /* initialBlocksCount = */ 10) + + // Perform a Get() call against the new block 9. Return a buffer that + // will trigger a data integrity error in the last block, as the digest + // corresponds with "Hello", not "xyzzy". This should cause + // all blocks to be marked for immediate release. 
+ helloDigest := digest.MustNewDigest("example", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5) + blockList.EXPECT().Get(9, helloDigest, int64(10), int64(5), gomock.Any()).DoAndReturn( + func(blockIndex int, digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer { + return buffer.NewCASBufferFromByteSlice(digest, []byte("xyzzy"), buffer.BackendProvided(dataIntegrityCallback)) + }) + errorLogger.EXPECT().Log(status.Error(codes.Internal, "Releasing 10 blocks due to a data integrity error")) + + locationBlobGetter, needsRefresh := locationBlobMap.Get(local.Location{ + BlockIndex: 9, + OffsetBytes: 10, + SizeBytes: 5, + }) + require.False(t, needsRefresh) + _, err := locationBlobGetter(helloDigest).ToByteSlice(10) + testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Buffer has checksum 1271ed5ef305aadabc605b1609e24c52, while 8b1a9953c4611296a827abf8c47804d7 was expected"), err) + + // Get() is not capable of releasing blocks immediately due to + // locking constraints. Still, we should make sure that further + // reads don't end up getting sent to these blocks. + // BlockReferenceToBlockIndex() should hide the results returned + // by the underlying BlockList. + blockList.EXPECT().BlockReferenceToBlockIndex(local.BlockReference{ + EpochID: 72, + BlocksFromLast: 7, + }).Return(0, uint64(0xb8e12b9fbe428eba), true) + + _, _, found := locationBlobMap.BlockReferenceToBlockIndex(local.BlockReference{ + EpochID: 72, + BlocksFromLast: 7, + }) + require.False(t, found) + + // The next time Put() is called, we should first see that all + // the (corrupted) blocks are released. Eight blocks should be + // created, so that we maintain the desired minimum number of + // blocks. 
+ blockList.EXPECT().PopFront().Times(10) + blockList.EXPECT().PushBack().Times(8) + + blockList.EXPECT().HasSpace(0, int64(5)).Return(true).Times(2) + blockListPutWriter := mock.NewMockBlockListPutWriter(ctrl) + blockList.EXPECT().Put(0, int64(5)).Return(blockListPutWriter.Call) + blockListPutFinalizer := mock.NewMockBlockListPutFinalizer(ctrl) + blockListPutWriter.EXPECT().Call(gomock.Any()).DoAndReturn( + func(b buffer.Buffer) local.BlockListPutFinalizer { + _, err := b.ToByteSlice(10) + testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err) + return blockListPutFinalizer.Call + }) + blockListPutFinalizer.EXPECT().Call().Return(int64(0), status.Error(codes.Unknown, "Client hung up")) + + locationBlobPutWriter, err := locationBlobMap.Put(5) + require.NoError(t, err) + _, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))() + testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err) +}