Skip to content

Commit

Permalink
ReferenceExpandingBlobAccess: add support for CAS references
Browse files Browse the repository at this point in the history
ReferenceExpandingBlobAccess can already forward requests to HTTP and S3
backends. This change adds support for forwarding requests to a regular
REv2 CAS. This is useful when designing asset storage services that
expose their data directly over bytestream://.
  • Loading branch information
EdSchouten committed Dec 5, 2023
1 parent c57493a commit 96218ae
Show file tree
Hide file tree
Showing 7 changed files with 613 additions and 346 deletions.
18 changes: 15 additions & 3 deletions pkg/blobstore/configuration/cas_blob_access_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAcces
// location of a blob, not the blobs themselves. Create
// a new BlobAccessCreator to ensure data is loaded
// properly.
base, err := nestedCreator.NewNestedBlobAccess(
indirectContentAddressableStorage, err := nestedCreator.NewNestedBlobAccess(
backend.ReferenceExpanding.IndirectContentAddressableStorage,
NewICASBlobAccessCreator(
bac.grpcClientFactory,
Expand All @@ -118,6 +118,17 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAcces
return BlobAccessInfo{}, "", err
}

var contentAddressableStorage blobstore.BlobAccess
if backend.ReferenceExpanding.ContentAddressableStorage != nil {
backend, err := nestedCreator.NewNestedBlobAccess(backend.ReferenceExpanding.ContentAddressableStorage, bac)
if err != nil {
return BlobAccessInfo{}, "", err
}
contentAddressableStorage = backend.BlobAccess
} else {
contentAddressableStorage = blobstore.NewErrorBlobAccess(status.Error(codes.Unimplemented, "No Content Addressable Storage configured"))
}

awsConfig, err := aws.NewConfigFromConfiguration(backend.ReferenceExpanding.AwsSession, "S3ReferenceExpandingBlobAccess")
if err != nil {
return BlobAccessInfo{}, "", util.StatusWrap(err, "Failed to create AWS config")
Expand All @@ -143,14 +154,15 @@ func (bac *casBlobAccessCreator) NewCustomBlobAccess(configuration *pb.BlobAcces

return BlobAccessInfo{
BlobAccess: blobstore.NewReferenceExpandingBlobAccess(
base.BlobAccess,
indirectContentAddressableStorage.BlobAccess,
contentAddressableStorage,
&http.Client{
Transport: bb_http.NewMetricsRoundTripper(roundTripper, "HTTPReferenceExpandingBlobAccess"),
},
s3.NewFromConfig(awsConfig),
gcsClient,
bac.maximumMessageSizeBytes),
DigestKeyFormat: base.DigestKeyFormat,
DigestKeyFormat: indirectContentAddressableStorage.DigestKeyFormat,
}, "reference_expanding", nil
default:
return BlobAccessInfo{}, "", status.Error(codes.InvalidArgument, "Configuration did not contain a supported storage backend")
Expand Down
63 changes: 48 additions & 15 deletions pkg/blobstore/reference_expanding_blob_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
)

// referenceExpandingBlobAccess is the state behind the BlobAccess
// returned by NewReferenceExpandingBlobAccess. It holds the backend
// from which references are loaded, together with the backends and
// clients through which the referenced data is fetched.
type referenceExpandingBlobAccess struct {
blobAccess BlobAccess
httpClient *http.Client
s3Client cloud_aws.S3Client
gcsClient cloud_gcp.StorageClient
maximumMessageSizeBytes int
// indirectContentAddressableStorage is the ICAS backend. Get() loads
// icas.Reference messages from it, and FindMissing() is forwarded to
// it.
indirectContentAddressableStorage BlobAccess
// contentAddressableStorage is the backend that Get() reads from
// when a reference uses the ContentAddressableStorage medium.
contentAddressableStorage BlobAccess
// httpClient, s3Client and gcsClient are presumably used for
// references that point at HTTP, S3 and Google Cloud Storage
// locations respectively; the code that uses them is not fully
// visible in this view.
httpClient *http.Client
s3Client cloud_aws.S3Client
gcsClient cloud_gcp.StorageClient
// maximumMessageSizeBytes is passed to ToProto() when the reference
// message obtained from the ICAS is decoded.
maximumMessageSizeBytes int
}

// getHTTPRangeHeader creates a HTTP Range header based on the offset
Expand All @@ -46,19 +47,20 @@ func getHTTPRangeHeader(reference *icas.Reference) string {
// Storage (CAS) backend. Any object requested through this BlobAccess
// will cause its reference to be loaded from the ICAS, followed by
// fetching its data from the referenced location.
//
// indirectContentAddressableStorage is the ICAS backend from which
// references are loaded. contentAddressableStorage is the backend that
// is read when a reference uses the Content Addressable Storage
// medium. maximumMessageSizeBytes is passed to ToProto() when the
// reference message is decoded. All arguments are stored as provided;
// no validation is performed here.
func NewReferenceExpandingBlobAccess(blobAccess BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int) BlobAccess {
func NewReferenceExpandingBlobAccess(indirectContentAddressableStorage, contentAddressableStorage BlobAccess, httpClient *http.Client, s3Client cloud_aws.S3Client, gcsClient cloud_gcp.StorageClient, maximumMessageSizeBytes int) BlobAccess {
return &referenceExpandingBlobAccess{
blobAccess: blobAccess,
httpClient: httpClient,
s3Client: s3Client,
gcsClient: gcsClient,
maximumMessageSizeBytes: maximumMessageSizeBytes,
indirectContentAddressableStorage: indirectContentAddressableStorage,
contentAddressableStorage: contentAddressableStorage,
httpClient: httpClient,
s3Client: s3Client,
gcsClient: gcsClient,
maximumMessageSizeBytes: maximumMessageSizeBytes,
}
}

func (ba *referenceExpandingBlobAccess) Get(ctx context.Context, digest digest.Digest) buffer.Buffer {
func (ba *referenceExpandingBlobAccess) Get(ctx context.Context, blobDigest digest.Digest) buffer.Buffer {
// Load reference from the ICAS.
referenceMessage, err := ba.blobAccess.Get(ctx, digest).ToProto(&icas.Reference{}, ba.maximumMessageSizeBytes)
referenceMessage, err := ba.indirectContentAddressableStorage.Get(ctx, blobDigest).ToProto(&icas.Reference{}, ba.maximumMessageSizeBytes)
if err != nil {
return buffer.NewBufferFromError(util.StatusWrap(err, "Failed to load reference"))
}
Expand Down Expand Up @@ -111,6 +113,37 @@ func (ba *referenceExpandingBlobAccess) Get(ctx context.Context, digest digest.D
if err != nil {
return buffer.NewBufferFromError(util.StatusWrap(errToStatus(err), "Google Cloud Storage request failed"))
}
case *icas.Reference_ContentAddressableStorage_:
if reference.OffsetBytes != 0 || reference.SizeBytes != 0 {
return buffer.NewBufferFromError(status.Error(codes.Unimplemented, "Partial reads are not supported by the Content Addressable Storage backend"))
}

instanceNameStr := medium.ContentAddressableStorage.InstanceName
instanceName, err := digest.NewInstanceName(instanceNameStr)
if err != nil {
return buffer.NewBufferFromError(util.StatusWrapfWithCode(err, codes.Internal, "Invalid instance name %#v", instanceNameStr))
}
digestFunctionValue := medium.ContentAddressableStorage.DigestFunction
digestFunction, err := instanceName.GetDigestFunction(digestFunctionValue, 0)
if err != nil {
return buffer.NewBufferFromError(util.StatusWrapfWithCode(err, codes.Internal, "Invalid digest function %d", digestFunctionValue))
}
referenceDigest, err := digestFunction.NewDigestFromProto(medium.ContentAddressableStorage.BlobDigest)
if err != nil {
return buffer.NewBufferFromError(util.StatusWrapWithCode(err, codes.Internal, "Invalid digest"))
}

b := ba.contentAddressableStorage.Get(ctx, referenceDigest)
if reference.Decompressor == remoteexecution.Compressor_IDENTITY {
// Optimize the fast path: if no transformations are
// being performed and the digests are identical, we
// can pass through the underlying buffer directly.
instanceNamePatcher := digest.NewInstanceNamePatcher(referenceDigest.GetInstanceName(), blobDigest.GetInstanceName())
if blobDigest == instanceNamePatcher.PatchDigest(referenceDigest) {
return b
}
}
r = b.ToReader()
default:
return buffer.NewBufferFromError(status.Error(codes.Unimplemented, "Reference uses an unsupported medium"))
}
Expand Down Expand Up @@ -154,7 +187,7 @@ func (ba *referenceExpandingBlobAccess) Get(ctx context.Context, digest digest.D
// If we wanted to support this, should we add a separate
// BlobAccess.Delete(), or maybe a mechanism to forward the
// RepairFunc from the ICAS buffer?
return buffer.NewCASBufferFromReader(digest, r, buffer.BackendProvided(buffer.Irreparable(digest)))
return buffer.NewCASBufferFromReader(blobDigest, r, buffer.BackendProvided(buffer.Irreparable(blobDigest)))
}

func (ba *referenceExpandingBlobAccess) GetFromComposite(ctx context.Context, parentDigest, childDigest digest.Digest, slicer slicing.BlobSlicer) buffer.Buffer {
Expand All @@ -171,7 +204,7 @@ func (ba *referenceExpandingBlobAccess) GetCapabilities(ctx context.Context, ins
}

// FindMissing delegates directly to the ICAS backend and returns its
// result unmodified. None of the referenced storage locations (CAS,
// HTTP, S3, GCS) are consulted by this method.
func (ba *referenceExpandingBlobAccess) FindMissing(ctx context.Context, digests digest.Set) (digest.Set, error) {
return ba.blobAccess.FindMissing(ctx, digests)
return ba.indirectContentAddressableStorage.FindMissing(ctx, digests)
}

func errToStatus(err error) error {
Expand Down
Loading

0 comments on commit 96218ae

Please sign in to comment.