Fix crash when releasing all blocks #181

Closed
8 changes: 7 additions & 1 deletion pkg/blobstore/local/old_current_new_location_blob_map.go
@@ -291,6 +291,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
 	// DataIntegrityCallback, as it's impossible to pick up a write
 	// lock from that context.
 	totalBlocksToBeReleased := lbm.totalBlocksToBeReleased.Load()
+	newBlockHasBeenReleased := false
 	for lbm.totalBlocksReleased < totalBlocksToBeReleased {
 		lbm.popFront()
 		if len(lbm.oldBlocks) > 0 {
@@ -299,7 +300,7 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
 			lbm.currentBlocks--
 		} else {
 			lbm.newBlocks--
-			lbm.startAllocatingFromBlock(0)
+			newBlockHasBeenReleased = true
 		}
 	}

@@ -313,6 +314,11 @@ func (lbm *OldCurrentNewLocationBlobMap) findBlockWithSpace(sizeBytes int64) (in
 		}
 		lbm.newBlocks++
 	}
+	// Make sure a released block is not written to.
+	if newBlockHasBeenReleased {
+		// Allocate from the first new block instead.
+		lbm.startAllocatingFromBlock(0)
+	}

 	// Move the first "new" block(s) to "current" whenever they no
 	// longer have enough space to fit a blob. This ensures that the
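
To make the ordering concrete, here is a minimal stand-alone sketch of the problem this hunk fixes, using simplified hypothetical types rather than the real OldCurrentNewLocationBlobMap fields: when every block, including all "new" blocks, is released, selecting block 0 as the allocation target inside the release loop refers to a block that no longer exists, so the selection has to wait until replacement blocks have been pushed back.

package main

import "fmt"

// pool is a toy stand-in for the block bookkeeping in the real map.
type pool struct {
	blocks          []string // stand-in for the underlying BlockList
	allocationIndex int      // block that new writes are directed to
}

// releaseAll drops every block and reports whether any were dropped.
func (p *pool) releaseAll() bool {
	released := len(p.blocks) > 0
	p.blocks = nil
	return released
}

// refill pushes back n fresh blocks.
func (p *pool) refill(n int) {
	for i := 0; i < n; i++ {
		p.blocks = append(p.blocks, fmt.Sprintf("block-%d", i))
	}
}

func main() {
	p := &pool{blocks: []string{"a", "b", "c"}}

	// Old ordering: picking block 0 while the pool is still empty,
	// before refill() has run, would panic with an index out of range.
	// New ordering, mirroring this change: remember that a "new" block
	// was released and only pick block 0 once replacements exist.
	newBlockHasBeenReleased := p.releaseAll()
	p.refill(4)
	if newBlockHasBeenReleased {
		p.allocationIndex = 0 // safe: block 0 exists again
	}
	fmt.Println("allocating from", p.blocks[p.allocationIndex])
}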
78 changes: 78 additions & 0 deletions pkg/blobstore/local/old_current_new_location_blob_map_test.go
@@ -169,3 +169,81 @@ func TestOldCurrentNewLocationBlobMapDataCorruption(t *testing.T) {
 	_, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))()
 	testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
 }
+
+func TestOldCurrentNewLocationBlobMapDataCorruptionInAllBlocks(t *testing.T) {
+	ctrl := gomock.NewController(t)
+
+	blockList := mock.NewMockBlockList(ctrl)
+	errorLogger := mock.NewMockErrorLogger(ctrl)
+	locationBlobMap := local.NewOldCurrentNewLocationBlobMap(
+		blockList,
+		local.NewImmutableBlockListGrowthPolicy(
+			/* currentBlocksCount = */ 4,
+			/* newBlocksCount = */ 4),
+		errorLogger,
+		"cas",
+		/* blockSizeBytes = */ 16,
+		/* oldBlocksCount = */ 2,
+		/* newBlocksCount = */ 4,
+		/* initialBlocksCount = */ 10)
+
+	// Perform a Get() call against the new block 9. Return a buffer that
+	// will trigger a data integrity error in the last block, as the digest
+	// corresponds with "Hello", not "xyzzy". This should cause
+	// all blocks to be marked for immediate release.
+	helloDigest := digest.MustNewDigest("example", remoteexecution.DigestFunction_MD5, "8b1a9953c4611296a827abf8c47804d7", 5)
+	blockList.EXPECT().Get(9, helloDigest, int64(10), int64(5), gomock.Any()).DoAndReturn(
+		func(blockIndex int, digest digest.Digest, offsetBytes, sizeBytes int64, dataIntegrityCallback buffer.DataIntegrityCallback) buffer.Buffer {
+			return buffer.NewCASBufferFromByteSlice(digest, []byte("xyzzy"), buffer.BackendProvided(dataIntegrityCallback))
+		})
+	errorLogger.EXPECT().Log(status.Error(codes.Internal, "Releasing 10 blocks due to a data integrity error"))
+
+	locationBlobGetter, needsRefresh := locationBlobMap.Get(local.Location{
+		BlockIndex:  9,
+		OffsetBytes: 10,
+		SizeBytes:   5,
+	})
+	require.False(t, needsRefresh)
+	_, err := locationBlobGetter(helloDigest).ToByteSlice(10)
+	testutil.RequireEqualStatus(t, status.Error(codes.Internal, "Buffer has checksum 1271ed5ef305aadabc605b1609e24c52, while 8b1a9953c4611296a827abf8c47804d7 was expected"), err)
+
+	// Get() is not capable of releasing blocks immediately due to
+	// locking constraints. Still, we should make sure that further
+	// reads don't end up getting sent to these blocks.
+	// BlockReferenceToBlockIndex() should hide the results returned
+	// by the underlying BlockList.
+	blockList.EXPECT().BlockReferenceToBlockIndex(local.BlockReference{
+		EpochID:        72,
+		BlocksFromLast: 7,
+	}).Return(0, uint64(0xb8e12b9fbe428eba), true)
+
+	_, _, found := locationBlobMap.BlockReferenceToBlockIndex(local.BlockReference{
+		EpochID:        72,
+		BlocksFromLast: 7,
+	})
+	require.False(t, found)
+
+	// The next time Put() is called, we should first see that all
+	// the (corrupted) blocks are released. Eight blocks should be
+	// created, so that we retain the desired minimum number of
+	// blocks.
+	blockList.EXPECT().PopFront().Times(10)
+	blockList.EXPECT().PushBack().Times(8)
+
+	blockList.EXPECT().HasSpace(0, int64(5)).Return(true).Times(2)
+	blockListPutWriter := mock.NewMockBlockListPutWriter(ctrl)
+	blockList.EXPECT().Put(0, int64(5)).Return(blockListPutWriter.Call)
+	blockListPutFinalizer := mock.NewMockBlockListPutFinalizer(ctrl)
+	blockListPutWriter.EXPECT().Call(gomock.Any()).DoAndReturn(
+		func(b buffer.Buffer) local.BlockListPutFinalizer {
+			_, err := b.ToByteSlice(10)
+			testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
+			return blockListPutFinalizer.Call
+		})
+	blockListPutFinalizer.EXPECT().Call().Return(int64(0), status.Error(codes.Unknown, "Client hung up"))
+
+	locationBlobPutWriter, err := locationBlobMap.Put(5)
+	require.NoError(t, err)
+	_, err = locationBlobPutWriter(buffer.NewBufferFromError(status.Error(codes.Unknown, "Client hung up")))()
+	testutil.RequireEqualStatus(t, status.Error(codes.Unknown, "Client hung up"), err)
+}
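
The two hashes in the checksum-mismatch assertion above are just the MD5 digests of the stored and returned payloads; a quick stand-alone check, assuming nothing beyond Go's standard library:

package main

import (
	"crypto/md5"
	"fmt"
)

func main() {
	// Digest the blob was stored under: MD5 of "Hello".
	fmt.Printf("%x\n", md5.Sum([]byte("Hello"))) // 8b1a9953c4611296a827abf8c47804d7
	// Digest of the payload the mocked backend actually returns.
	fmt.Printf("%x\n", md5.Sum([]byte("xyzzy"))) // 1271ed5ef305aadabc605b1609e24c52
}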
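
Similarly, the PopFront()/PushBack() expectations follow from the parameters passed to the map above: all 10 initial (corrupted) blocks are released, and the 8 replacement blocks match the growth policy's 4 "current" plus 4 "new" blocks. A trivial sketch of that arithmetic:

package main

import "fmt"

func main() {
	// Parameters taken from the test above.
	const (
		initialBlocksCount = 10 // blocks present, and corrupted, at the start
		currentBlocksCount = 4  // from the ImmutableBlockListGrowthPolicy
		newBlocksCount     = 4  // from the ImmutableBlockListGrowthPolicy
	)
	fmt.Println("expected PopFront calls:", initialBlocksCount)                // 10
	fmt.Println("expected PushBack calls:", currentBlocksCount+newBlocksCount) // 8
}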