diff --git a/feature/s3/transfermanager/api.go b/feature/s3/transfermanager/api.go
new file mode 100644
index 00000000000..dbe430dadb4
--- /dev/null
+++ b/feature/s3/transfermanager/api.go
@@ -0,0 +1,16 @@
+package transfermanager
+
+import (
+ "context"
+
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+// S3APIClient defines the interface for the S3 client operations used by the transfer manager.
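+//
+// The concrete *s3.Client from github.com/aws/aws-sdk-go-v2/service/s3
+// satisfies this interface, so a standard S3 client can be passed to New
+// directly; a custom implementation can be supplied instead, for example to
+// stub responses in tests.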
+type S3APIClient interface {
+ PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
+ UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
+ CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
+ CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
+ AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
+}
diff --git a/feature/s3/transfermanager/api_client.go b/feature/s3/transfermanager/api_client.go
new file mode 100644
index 00000000000..84ce16db425
--- /dev/null
+++ b/feature/s3/transfermanager/api_client.go
@@ -0,0 +1,51 @@
+package transfermanager
+
+import (
+ "github.com/aws/aws-sdk-go-v2/aws"
+)
+
+const userAgentKey = "s3-transfer"
+
+// defaultMaxUploadParts is the maximum allowed number of parts in a multi-part upload
+// on Amazon S3.
+const defaultMaxUploadParts = 10000
+
+// minPartSizeBytes is the minimum (and default) part size when transferring objects to/from S3
+const minPartSizeBytes = 1024 * 1024 * 8
+
+// defaultMultipartUploadThreshold is the default size threshold in bytes indicating when to use multipart upload.
+const defaultMultipartUploadThreshold = 1024 * 1024 * 16
+
+// defaultTransferConcurrency is the default number of goroutines to spin up when
+// using PutObject().
+const defaultTransferConcurrency = 5
+
+// Client provides the API client to make operation calls for Amazon Simple
+// Storage Service's Transfer Manager.
+// It is safe to call Client methods concurrently across goroutines.
+type Client struct {
+ options Options
+}
+
+// New returns an initialized Client from the client Options. Provide
+// additional functional options to further configure the Client.
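+//
+// A minimal construction sketch (assuming an aws.Config named cfg has already
+// been loaded, for example with config.LoadDefaultConfig):
+//
+//	s3Client := s3.NewFromConfig(cfg)
+//	c := transfermanager.New(s3Client, transfermanager.Options{},
+//		func(o *transfermanager.Options) {
+//			o.Concurrency = 10 // illustrative override
+//		})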
+func New(s3Client S3APIClient, opts Options, optFns ...func(*Options)) *Client {
+ opts.S3 = s3Client
+ for _, fn := range optFns {
+ fn(&opts)
+ }
+
+ resolveConcurrency(&opts)
+ resolvePartSizeBytes(&opts)
+ resolveChecksumAlgorithm(&opts)
+ resolveMultipartUploadThreshold(&opts)
+
+ return &Client{
+ options: opts,
+ }
+}
+
+// NewFromConfig returns a new Client from the provided s3 config
+func NewFromConfig(s3Client S3APIClient, cfg aws.Config, optFns ...func(*Options)) *Client {
+ return New(s3Client, Options{}, optFns...)
+}
diff --git a/feature/s3/transfermanager/api_op_PutObject.go b/feature/s3/transfermanager/api_op_PutObject.go
new file mode 100644
index 00000000000..bba6b8036d3
--- /dev/null
+++ b/feature/s3/transfermanager/api_op_PutObject.go
@@ -0,0 +1,993 @@
+package transfermanager
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/middleware"
+ "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
+ smithymiddleware "github.com/aws/smithy-go/middleware"
+)
+
+// A MultipartUploadError wraps a failed S3 multipart upload. An error returned
+// will satisfy this interface when a multipart upload failed to upload all
+// chunks to S3. In the case of a failure the UploadID is needed to operate on
+// the chunks, if any, which were uploaded.
+//
+// Example:
+//
+// c := transfermanager.New(client, opts)
+// output, err := c.PutObject(context.Background(), input)
+// if err != nil {
+// var multierr transfermanager.MultipartUploadError
+// if errors.As(err, &multierr) {
+// fmt.Printf("upload failure UploadID=%s, %s\n", multierr.UploadID(), multierr.Error())
+// } else {
+// fmt.Printf("upload failure, %s\n", err.Error())
+// }
+// }
+type MultipartUploadError interface {
+ error
+
+ // UploadID returns the upload id for the S3 multipart upload that failed.
+ UploadID() string
+}
+
+// A multipartUploadError wraps the upload ID of a failed s3 multipart upload,
+// along with the underlying error that caused the failure.
+//
+// Should be used for an error that occurred while performing an S3 multipart
+// upload, when an upload ID is available.
+type multipartUploadError struct {
+ err error
+
+ // ID for multipart upload which failed.
+ uploadID string
+}
+
+// Error returns the string representation of the error, including the upload
+// ID and the underlying cause when present.
+//
+// Satisfies the error interface.
+func (m *multipartUploadError) Error() string {
+ var extra string
+ if m.err != nil {
+ extra = fmt.Sprintf(", cause: %s", m.err.Error())
+ }
+ return fmt.Sprintf("upload multipart failed, upload id: %s%s", m.uploadID, extra)
+}
+
+// Unwrap returns the underlying error that caused the upload failure
+func (m *multipartUploadError) Unwrap() error {
+ return m.err
+}
+
+// UploadID returns the id of the S3 upload which failed.
+func (m *multipartUploadError) UploadID() string {
+ return m.uploadID
+}
+
+// PutObjectInput represents a request to the PutObject() call.
+type PutObjectInput struct {
+ // Bucket the object is uploaded into
+ Bucket string
+
+ // Object key for which the PUT action was initiated
+ Key string
+
+ // Object data
+ Body io.Reader
+
+ // The canned ACL to apply to the object. For more information, see [Canned ACL] in the Amazon
+ // S3 User Guide.
+ //
+ // When adding a new object, you can use headers to grant ACL-based permissions to
+ // individual Amazon Web Services accounts or to predefined groups defined by
+ // Amazon S3. These permissions are then added to the ACL on the object. By
+ // default, all objects are private. Only the owner has full access control. For
+ // more information, see [Access Control List (ACL) Overview]and [Managing ACLs Using the REST API] in the Amazon S3 User Guide.
+ //
+ // If the bucket that you're uploading objects to uses the bucket owner enforced
+ // setting for S3 Object Ownership, ACLs are disabled and no longer affect
+ // permissions. Buckets that use this setting only accept PUT requests that don't
+ // specify an ACL or PUT requests that specify bucket owner full control ACLs, such
+ // as the bucket-owner-full-control canned ACL or an equivalent form of this ACL
+ // expressed in the XML format. PUT requests that contain other ACLs (for example,
+ // custom grants to certain Amazon Web Services accounts) fail and return a 400
+ // error with the error code AccessControlListNotSupported . For more information,
+ // see [Controlling ownership of objects and disabling ACLs]in the Amazon S3 User Guide.
+ //
+ // - This functionality is not supported for directory buckets.
+ //
+ // - This functionality is not supported for Amazon S3 on Outposts.
+ //
+ // [Managing ACLs Using the REST API]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-using-rest-api.html
+ // [Access Control List (ACL) Overview]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html
+ // [Canned ACL]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL
+ // [Controlling ownership of objects and disabling ACLs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html
+ ACL types.ObjectCannedACL
+
+ // Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption
+ // with server-side encryption using Key Management Service (KMS) keys (SSE-KMS).
+ // Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object
+ // encryption with SSE-KMS.
+ //
+ // Specifying this header with a PUT action doesn’t affect bucket-level settings
+ // for S3 Bucket Key.
+ //
+ // This functionality is not supported for directory buckets.
+ BucketKeyEnabled bool
+
+ // Can be used to specify caching behavior along the request/reply chain. For more
+ // information, see [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9].
+ //
+ // [http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9]: http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9
+ CacheControl string
+
+ // Indicates the algorithm used to create the checksum for the object when you use
+ // the SDK. This header will not provide any additional functionality if you don't
+ // use the SDK. When you send this header, there must be a corresponding
+ // x-amz-checksum-algorithm or x-amz-trailer header sent. Otherwise, Amazon S3
+ // fails the request with the HTTP status code 400 Bad Request .
+ //
+ // For the x-amz-checksum-algorithm header, replace algorithm with the
+ // supported algorithm from the following list:
+ //
+ // - CRC32
+ //
+ // - CRC32C
+ //
+ // - SHA1
+ //
+ // - SHA256
+ //
+ // For more information, see [Checking object integrity] in the Amazon S3 User Guide.
+ //
+ // If the individual checksum value you provide through x-amz-checksum-algorithm
+ // doesn't match the checksum algorithm you set through
+ // x-amz-sdk-checksum-algorithm , Amazon S3 ignores any provided ChecksumAlgorithm
+ // parameter and uses the checksum algorithm that matches the provided value in
+ // x-amz-checksum-algorithm .
+ //
+ // For directory buckets, when you use Amazon Web Services SDKs, CRC32 is the
+ // default checksum algorithm that's used for performance.
+ //
+ // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
+ ChecksumAlgorithm types.ChecksumAlgorithm
+
+ // Size of the body in bytes. This parameter is useful when the size of the body
+ // cannot be determined automatically. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length].
+ //
+ // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-length
+ ContentLength int64
+
+ // Specifies presentational information for the object. For more information, see [https://www.rfc-editor.org/rfc/rfc6266#section-4].
+ //
+ // [https://www.rfc-editor.org/rfc/rfc6266#section-4]: https://www.rfc-editor.org/rfc/rfc6266#section-4
+ ContentDisposition string
+
+ // Specifies what content encodings have been applied to the object and thus what
+ // decoding mechanisms must be applied to obtain the media-type referenced by the
+ // Content-Type header field. For more information, see [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding].
+ //
+ // [https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding]: https://www.rfc-editor.org/rfc/rfc9110.html#field.content-encoding
+ ContentEncoding string
+
+ // The language the content is in.
+ ContentLanguage string
+
+ // A standard MIME type describing the format of the contents. For more
+ // information, see [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type].
+ //
+ // [https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type]: https://www.rfc-editor.org/rfc/rfc9110.html#name-content-type
+ ContentType string
+
+ // The account ID of the expected bucket owner. If the account ID that you provide
+ // does not match the actual owner of the bucket, the request fails with the HTTP
+ // status code 403 Forbidden (access denied).
+ ExpectedBucketOwner string
+
+ // The date and time at which the object is no longer cacheable. For more
+ // information, see [https://www.rfc-editor.org/rfc/rfc7234#section-5.3].
+ //
+ // [https://www.rfc-editor.org/rfc/rfc7234#section-5.3]: https://www.rfc-editor.org/rfc/rfc7234#section-5.3
+ Expires time.Time
+
+ // Gives the grantee READ, READ_ACP, and WRITE_ACP permissions on the object.
+ //
+ // - This functionality is not supported for directory buckets.
+ //
+ // - This functionality is not supported for Amazon S3 on Outposts.
+ GrantFullControl string
+
+ // Allows grantee to read the object data and its metadata.
+ //
+ // - This functionality is not supported for directory buckets.
+ //
+ // - This functionality is not supported for Amazon S3 on Outposts.
+ GrantRead string
+
+ // Allows grantee to read the object ACL.
+ //
+ // - This functionality is not supported for directory buckets.
+ //
+ // - This functionality is not supported for Amazon S3 on Outposts.
+ GrantReadACP string
+
+ // Allows grantee to write the ACL for the applicable object.
+ //
+ // - This functionality is not supported for directory buckets.
+ //
+ // - This functionality is not supported for Amazon S3 on Outposts.
+ GrantWriteACP string
+
+ // A map of metadata to store with the object in S3.
+ Metadata map[string]string
+
+ // Specifies whether a legal hold will be applied to this object. For more
+ // information about S3 Object Lock, see [Object Lock]in the Amazon S3 User Guide.
+ //
+ // This functionality is not supported for directory buckets.
+ //
+ // [Object Lock]: https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock.html
+ ObjectLockLegalHoldStatus types.ObjectLockLegalHoldStatus
+
+ // The Object Lock mode that you want to apply to this object.
+ //
+ // This functionality is not supported for directory buckets.
+ ObjectLockMode types.ObjectLockMode
+
+ // The date and time when you want this object's Object Lock to expire. Must be
+ // formatted as a timestamp parameter.
+ //
+ // This functionality is not supported for directory buckets.
+ ObjectLockRetainUntilDate time.Time
+
+ // Confirms that the requester knows that they will be charged for the request.
+ // Bucket owners need not specify this parameter in their requests. If either the
+ // source or destination S3 bucket has Requester Pays enabled, the requester will
+ // pay for corresponding charges to copy the object. For information about
+ // downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets]in the Amazon S3 User
+ // Guide.
+ //
+ // This functionality is not supported for directory buckets.
+ //
+ // [Downloading Objects in Requester Pays Buckets]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectsinRequesterPaysBuckets.html
+ RequestPayer types.RequestPayer
+
+ // Specifies the algorithm to use when encrypting the object (for example, AES256 ).
+ //
+ // This functionality is not supported for directory buckets.
+ SSECustomerAlgorithm string
+
+ // Specifies the customer-provided encryption key for Amazon S3 to use in
+ // encrypting data. This value is used to store the object and then it is
+ // discarded; Amazon S3 does not store the encryption key. The key must be
+ // appropriate for use with the algorithm specified in the
+ // x-amz-server-side-encryption-customer-algorithm header.
+ //
+ // This functionality is not supported for directory buckets.
+ SSECustomerKey string
+
+ // Specifies the Amazon Web Services KMS Encryption Context to use for object
+ // encryption. The value of this header is a base64-encoded UTF-8 string holding
+ // JSON with the encryption context key-value pairs. This value is stored as object
+ // metadata and automatically gets passed on to Amazon Web Services KMS for future
+ // GetObject or CopyObject operations on this object. This value must be
+ // explicitly added during CopyObject operations.
+ //
+ // This functionality is not supported for directory buckets.
+ SSEKMSEncryptionContext string
+
+ // If x-amz-server-side-encryption has a valid value of aws:kms or aws:kms:dsse ,
+ // this header specifies the ID (Key ID, Key ARN, or Key Alias) of the Key
+ // Management Service (KMS) symmetric encryption customer managed key that was used
+ // for the object. If you specify x-amz-server-side-encryption:aws:kms or
+ // x-amz-server-side-encryption:aws:kms:dsse , but do not provide
+ // x-amz-server-side-encryption-aws-kms-key-id , Amazon S3 uses the Amazon Web
+ // Services managed key ( aws/s3 ) to protect the data. If the KMS key does not
+ // exist in the same account that's issuing the command, you must use the full ARN
+ // and not just the ID.
+ //
+ // This functionality is not supported for directory buckets.
+ SSEKMSKeyID string
+
+ // The server-side encryption algorithm that was used when you store this object
+ // in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse ).
+ //
+ // General purpose buckets - You have four mutually exclusive options to protect
+ // data using server-side encryption in Amazon S3, depending on how you choose to
+ // manage the encryption keys. Specifically, the encryption key options are Amazon
+ // S3 managed keys (SSE-S3), Amazon Web Services KMS keys (SSE-KMS or DSSE-KMS),
+ // and customer-provided keys (SSE-C). Amazon S3 encrypts data with server-side
+ // encryption by using Amazon S3 managed keys (SSE-S3) by default. You can
+ // optionally tell Amazon S3 to encrypt data at rest by using server-side
+ // encryption with other key options. For more information, see [Using Server-Side Encryption]in the Amazon S3
+ // User Guide.
+ //
+ // Directory buckets - For directory buckets, only the server-side encryption with
+ // Amazon S3 managed keys (SSE-S3) ( AES256 ) value is supported.
+ //
+ // [Using Server-Side Encryption]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
+ ServerSideEncryption types.ServerSideEncryption
+
+ // By default, Amazon S3 uses the STANDARD Storage Class to store newly created
+ // objects. The STANDARD storage class provides high durability and high
+ // availability. Depending on performance needs, you can specify a different
+ // Storage Class. For more information, see [Storage Classes]in the Amazon S3 User Guide.
+ //
+ // - For directory buckets, only the S3 Express One Zone storage class is
+ // supported to store newly created objects.
+ //
+ // - Amazon S3 on Outposts only uses the OUTPOSTS Storage Class.
+ //
+ // [Storage Classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html
+ StorageClass types.StorageClass
+
+ // The tag-set for the object. The tag-set must be encoded as URL Query
+ // parameters. (For example, "Key1=Value1")
+ //
+ // This functionality is not supported for directory buckets.
+ Tagging string
+
+ // If the bucket is configured as a website, redirects requests for this object to
+ // another object in the same bucket or to an external URL. Amazon S3 stores the
+ // value of this header in the object metadata. For information about object
+ // metadata, see [Object Key and Metadata]in the Amazon S3 User Guide.
+ //
+ // In the following example, the request header sets the redirect to an object
+ // (anotherPage.html) in the same bucket:
+ //
+ // x-amz-website-redirect-location: /anotherPage.html
+ //
+ // In the following example, the request header sets the object redirect to
+ // another website:
+ //
+ // x-amz-website-redirect-location: http://www.example.com/
+ //
+ // For more information about website hosting in Amazon S3, see [Hosting Websites on Amazon S3] and [How to Configure Website Page Redirects] in the
+ // Amazon S3 User Guide.
+ //
+ // This functionality is not supported for directory buckets.
+ //
+ // [How to Configure Website Page Redirects]: https://docs.aws.amazon.com/AmazonS3/latest/dev/how-to-page-redirect.html
+ // [Hosting Websites on Amazon S3]: https://docs.aws.amazon.com/AmazonS3/latest/dev/WebsiteHosting.html
+ // [Object Key and Metadata]: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
+ WebsiteRedirectLocation string
+}
+
+// map non-zero string to *string
+func nzstring(v string) *string {
+ if v == "" {
+ return nil
+ }
+ return aws.String(v)
+}
+
+// map non-zero Time to *Time
+func nztime(t time.Time) *time.Time {
+ if t.IsZero() {
+ return nil
+ }
+ return aws.Time(t)
+}
+
+func (i PutObjectInput) mapSingleUploadInput(body io.Reader, checksumAlgorithm types.ChecksumAlgorithm) *s3.PutObjectInput {
+ input := &s3.PutObjectInput{
+ Bucket: aws.String(i.Bucket),
+ Key: aws.String(i.Key),
+ Body: body,
+ }
+ if i.ACL != "" {
+ input.ACL = s3types.ObjectCannedACL(i.ACL)
+ }
+ if i.ChecksumAlgorithm != "" {
+ input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm)
+ } else {
+ input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(checksumAlgorithm)
+ }
+ if i.ObjectLockLegalHoldStatus != "" {
+ input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus)
+ }
+ if i.ObjectLockMode != "" {
+ input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode)
+ }
+ if i.RequestPayer != "" {
+ input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+ }
+ if i.ServerSideEncryption != "" {
+ input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption)
+ }
+ if i.StorageClass != "" {
+ input.StorageClass = s3types.StorageClass(i.StorageClass)
+ }
+ input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled)
+ input.CacheControl = nzstring(i.CacheControl)
+ input.ContentDisposition = nzstring(i.ContentDisposition)
+ input.ContentEncoding = nzstring(i.ContentEncoding)
+ input.ContentLanguage = nzstring(i.ContentLanguage)
+ input.ContentType = nzstring(i.ContentType)
+ input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+ input.GrantFullControl = nzstring(i.GrantFullControl)
+ input.GrantRead = nzstring(i.GrantRead)
+ input.GrantReadACP = nzstring(i.GrantReadACP)
+ input.GrantWriteACP = nzstring(i.GrantWriteACP)
+ input.Metadata = i.Metadata
+ input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+ input.SSECustomerKey = nzstring(i.SSECustomerKey)
+ input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext)
+ input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID)
+ input.Tagging = nzstring(i.Tagging)
+ input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation)
+ input.Expires = nztime(i.Expires)
+ input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate)
+ return input
+}
+
+func (i PutObjectInput) mapCreateMultipartUploadInput() *s3.CreateMultipartUploadInput {
+ input := &s3.CreateMultipartUploadInput{
+ Bucket: aws.String(i.Bucket),
+ Key: aws.String(i.Key),
+ }
+ if i.ACL != "" {
+ input.ACL = s3types.ObjectCannedACL(i.ACL)
+ }
+	if i.ChecksumAlgorithm != "" {
+		input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm)
+	}
+ if i.ObjectLockLegalHoldStatus != "" {
+ input.ObjectLockLegalHoldStatus = s3types.ObjectLockLegalHoldStatus(i.ObjectLockLegalHoldStatus)
+ }
+ if i.ObjectLockMode != "" {
+ input.ObjectLockMode = s3types.ObjectLockMode(i.ObjectLockMode)
+ }
+ if i.RequestPayer != "" {
+ input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+ }
+ if i.ServerSideEncryption != "" {
+ input.ServerSideEncryption = s3types.ServerSideEncryption(i.ServerSideEncryption)
+ }
+ if i.StorageClass != "" {
+ input.StorageClass = s3types.StorageClass(i.StorageClass)
+ }
+ input.BucketKeyEnabled = aws.Bool(i.BucketKeyEnabled)
+ input.CacheControl = nzstring(i.CacheControl)
+ input.ContentDisposition = nzstring(i.ContentDisposition)
+ input.ContentEncoding = nzstring(i.ContentEncoding)
+ input.ContentLanguage = nzstring(i.ContentLanguage)
+ input.ContentType = nzstring(i.ContentType)
+ input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+ input.GrantFullControl = nzstring(i.GrantFullControl)
+ input.GrantRead = nzstring(i.GrantRead)
+ input.GrantReadACP = nzstring(i.GrantReadACP)
+ input.GrantWriteACP = nzstring(i.GrantWriteACP)
+ input.Metadata = i.Metadata
+ input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+ input.SSECustomerKey = nzstring(i.SSECustomerKey)
+ input.SSEKMSEncryptionContext = nzstring(i.SSEKMSEncryptionContext)
+ input.SSEKMSKeyId = nzstring(i.SSEKMSKeyID)
+ input.Tagging = nzstring(i.Tagging)
+ input.WebsiteRedirectLocation = nzstring(i.WebsiteRedirectLocation)
+ input.Expires = nztime(i.Expires)
+ input.ObjectLockRetainUntilDate = nztime(i.ObjectLockRetainUntilDate)
+ return input
+}
+
+func (i PutObjectInput) mapCompleteMultipartUploadInput(uploadID *string, completedParts completedParts) *s3.CompleteMultipartUploadInput {
+ input := &s3.CompleteMultipartUploadInput{
+ Bucket: aws.String(i.Bucket),
+ Key: aws.String(i.Key),
+ UploadId: uploadID,
+ }
+ if i.RequestPayer != "" {
+ input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+ }
+ input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+ input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+ input.SSECustomerKey = nzstring(i.SSECustomerKey)
+ var parts []s3types.CompletedPart
+ for _, part := range completedParts {
+ parts = append(parts, part.MapCompletedPart())
+ }
+ if parts != nil {
+ input.MultipartUpload = &s3types.CompletedMultipartUpload{Parts: parts}
+ }
+ return input
+}
+
+func (i PutObjectInput) mapUploadPartInput(body io.Reader, partNum *int32, uploadID *string) *s3.UploadPartInput {
+ input := &s3.UploadPartInput{
+ Bucket: aws.String(i.Bucket),
+ Key: aws.String(i.Key),
+ Body: body,
+ PartNumber: partNum,
+ UploadId: uploadID,
+ }
+ if i.ChecksumAlgorithm != "" {
+ input.ChecksumAlgorithm = s3types.ChecksumAlgorithm(i.ChecksumAlgorithm)
+ }
+ if i.RequestPayer != "" {
+ input.RequestPayer = s3types.RequestPayer(i.RequestPayer)
+ }
+ input.ExpectedBucketOwner = nzstring(i.ExpectedBucketOwner)
+ input.SSECustomerAlgorithm = nzstring(i.SSECustomerAlgorithm)
+ input.SSECustomerKey = nzstring(i.SSECustomerKey)
+ return input
+}
+
+func (i *PutObjectInput) mapAbortMultipartUploadInput(uploadID *string) *s3.AbortMultipartUploadInput {
+ input := &s3.AbortMultipartUploadInput{
+ Bucket: aws.String(i.Bucket),
+ Key: aws.String(i.Key),
+ UploadId: uploadID,
+ }
+ return input
+}
+
+// PutObjectOutput represents a response from the PutObject() call.
+type PutObjectOutput struct {
+	// The ID for a multipart upload to S3. In the case of an error, the error
+	// can be matched against the MultipartUploadError interface with errors.As
+	// to extract the upload ID. Will be an empty string if multipart upload was
+	// not used, and the object was uploaded as a single PutObject call.
+ UploadID string
+
+ // The list of parts that were uploaded and their checksums. Will be empty
+ // if multipart upload was not used, and the object was uploaded as a
+ // single PutObject call.
+ CompletedParts []types.CompletedPart
+
+ // Indicates whether the uploaded object uses an S3 Bucket Key for server-side
+ // encryption with Amazon Web Services KMS (SSE-KMS).
+ BucketKeyEnabled bool
+
+ // The base64-encoded, 32-bit CRC32 checksum of the object.
+ ChecksumCRC32 string
+
+ // The base64-encoded, 32-bit CRC32C checksum of the object.
+ ChecksumCRC32C string
+
+ // The base64-encoded, 160-bit SHA-1 digest of the object.
+ ChecksumSHA1 string
+
+ // The base64-encoded, 256-bit SHA-256 digest of the object.
+ ChecksumSHA256 string
+
+ // Entity tag for the uploaded object.
+ ETag string
+
+ // If the object expiration is configured, this will contain the expiration date
+ // (expiry-date) and rule ID (rule-id). The value of rule-id is URL encoded.
+ Expiration string
+
+ // The bucket where the newly created object is put
+ Bucket string
+
+ // The object key of the newly created object.
+ Key string
+
+ // If present, indicates that the requester was successfully charged for the
+ // request.
+ RequestCharged types.RequestCharged
+
+ // If present, specifies the ID of the Amazon Web Services Key Management Service
+ // (Amazon Web Services KMS) symmetric customer managed customer master key (CMK)
+ // that was used for the object.
+ SSEKMSKeyID string
+
+ // If you specified server-side encryption either with an Amazon S3-managed
+ // encryption key or an Amazon Web Services KMS customer master key (CMK) in your
+ // initiate multipart upload request, the response includes this header. It
+ // confirms the encryption algorithm that Amazon S3 used to encrypt the object.
+ ServerSideEncryption types.ServerSideEncryption
+
+ // The version of the object that was uploaded. Will only be populated if
+ // the S3 Bucket is versioned. If the bucket is not versioned this field
+ // will not be set.
+ VersionID string
+
+ // Metadata pertaining to the operation's result.
+ ResultMetadata smithymiddleware.Metadata
+}
+
+func (o *PutObjectOutput) mapFromPutObjectOutput(out *s3.PutObjectOutput, bucket, key string) {
+ o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled)
+ o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32)
+ o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C)
+ o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1)
+ o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256)
+ o.ETag = aws.ToString(out.ETag)
+ o.Expiration = aws.ToString(out.Expiration)
+ o.Bucket = bucket
+ o.Key = key
+ o.RequestCharged = types.RequestCharged(out.RequestCharged)
+ o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId)
+ o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption)
+ o.VersionID = aws.ToString(out.VersionId)
+ o.ResultMetadata = out.ResultMetadata.Clone()
+}
+
+func (o *PutObjectOutput) mapFromCompleteMultipartUploadOutput(out *s3.CompleteMultipartUploadOutput, bucket, uploadID string, completedParts completedParts) {
+ o.UploadID = uploadID
+ o.CompletedParts = completedParts
+ o.BucketKeyEnabled = aws.ToBool(out.BucketKeyEnabled)
+ o.ChecksumCRC32 = aws.ToString(out.ChecksumCRC32)
+ o.ChecksumCRC32C = aws.ToString(out.ChecksumCRC32C)
+ o.ChecksumSHA1 = aws.ToString(out.ChecksumSHA1)
+ o.ChecksumSHA256 = aws.ToString(out.ChecksumSHA256)
+ o.ETag = aws.ToString(out.ETag)
+ o.Expiration = aws.ToString(out.Expiration)
+ o.Bucket = bucket
+ o.Key = aws.ToString(out.Key)
+ o.RequestCharged = types.RequestCharged(out.RequestCharged)
+ o.SSEKMSKeyID = aws.ToString(out.SSEKMSKeyId)
+ o.ServerSideEncryption = types.ServerSideEncryption(out.ServerSideEncryption)
+ o.VersionID = aws.ToString(out.VersionId)
+ o.ResultMetadata = out.ResultMetadata
+}
+
+// PutObject uploads an object to S3, intelligently buffering large
+// files into smaller chunks and sending them in parallel across multiple
+// goroutines. You can configure the chunk size and concurrency through the
+// Options parameters.
+//
+// Additional functional options can be provided to configure the individual
+// upload. These options operate on a copy of the Options of the Client that
+// PutObject is called on; modifying them will not impact the original Client
+// or its Options.
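+//
+// A per-call override sketch (ctx, body, and the bucket and key values are
+// placeholders):
+//
+//	out, err := c.PutObject(ctx, &transfermanager.PutObjectInput{
+//		Bucket: "amzn-s3-demo-bucket",
+//		Key:    "my-key",
+//		Body:   body,
+//	}, func(o *transfermanager.Options) {
+//		o.PartSizeBytes = 16 * 1024 * 1024
+//	})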
+func (c *Client) PutObject(ctx context.Context, input *PutObjectInput, opts ...func(*Options)) (*PutObjectOutput, error) {
+ i := uploader{in: input, options: c.options.Copy()}
+ for _, opt := range opts {
+ opt(&i.options)
+ }
+
+ return i.upload(ctx)
+}
+
+type uploader struct {
+ options Options
+
+ in *PutObjectInput
+
+	// partPool allows reuse of streaming payload part buffers between upload calls
+ partPool bytesBufferPool
+}
+
+func (u *uploader) upload(ctx context.Context) (*PutObjectOutput, error) {
+ if err := u.init(); err != nil {
+ return nil, fmt.Errorf("unable to initialize upload: %w", err)
+ }
+ defer u.partPool.Close()
+
+ clientOptions := []func(o *s3.Options){
+ func(o *s3.Options) {
+ o.APIOptions = append(o.APIOptions,
+ middleware.AddSDKAgentKey(middleware.FeatureMetadata, userAgentKey),
+ addFeatureUserAgent,
+ )
+ }}
+
+ r, _, cleanUp, err := u.nextReader(ctx)
+
+ if err == io.EOF {
+ return u.singleUpload(ctx, r, cleanUp, clientOptions...)
+ } else if err != nil {
+ cleanUp()
+ return nil, err
+ }
+
+ mu := multiUploader{
+ uploader: u,
+ }
+ return mu.upload(ctx, r, cleanUp, clientOptions...)
+}
+
+func (u *uploader) init() error {
+ if err := u.initSize(); err != nil {
+ return err
+ }
+ u.partPool = newDefaultSlicePool(u.options.PartSizeBytes, u.options.Concurrency+1)
+
+ return nil
+}
+
+// initSize validates the user-configured part size and increases it if the calculated part count would exceed the maximum allowed value
+func (u *uploader) initSize() error {
+ if u.options.PartSizeBytes < minPartSizeBytes {
+ return fmt.Errorf("part size must be at least %d bytes", minPartSizeBytes)
+ }
+
+ var bodySize int64
+ switch r := u.in.Body.(type) {
+ case io.Seeker:
+ n, err := types.SeekerLen(r)
+ if err != nil {
+ return err
+ }
+ bodySize = n
+ default:
+ if l := u.in.ContentLength; l > 0 {
+ bodySize = l
+ }
+ }
+
+ // Try to adjust partSize if it is too small and account for
+ // integer division truncation.
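+	// For example, with the default 8 MiB part size a 100 GiB seekable body
+	// would need 12800 parts, so the part size is raised to bodySize/10000 + 1
+	// (about 10.2 MiB) to keep the upload within defaultMaxUploadParts.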
+ if bodySize/u.options.PartSizeBytes >= int64(defaultMaxUploadParts) {
+ // Add one to the part size to account for remainders
+ // during the size calculation. e.g odd number of bytes.
+ u.options.PartSizeBytes = (bodySize / int64(defaultMaxUploadParts)) + 1
+ }
+ return nil
+}
+
+func (u *uploader) singleUpload(ctx context.Context, r io.Reader, cleanUp func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) {
+ defer cleanUp()
+
+ params := u.in.mapSingleUploadInput(r, u.options.ChecksumAlgorithm)
+
+ out, err := u.options.S3.PutObject(ctx, params, clientOptions...)
+ if err != nil {
+ return nil, err
+ }
+
+ var output PutObjectOutput
+ output.mapFromPutObjectOutput(out, u.in.Bucket, u.in.Key)
+ return &output, nil
+}
+
+// nextReader reads the next chunk of data from input Body
+func (u *uploader) nextReader(ctx context.Context) (io.Reader, int, func(), error) {
+ part, err := u.partPool.Get(ctx)
+ if err != nil {
+ return nil, 0, func() {}, err
+ }
+
+ n, err := readFillBuf(u.in.Body, part)
+
+ cleanup := func() {
+ u.partPool.Put(part)
+ }
+ return bytes.NewReader(part[0:n]), n, cleanup, err
+}
+
+func readFillBuf(r io.Reader, b []byte) (offset int, err error) {
+ for offset < len(b) && err == nil {
+ var n int
+ n, err = r.Read(b[offset:])
+ offset += n
+ }
+ return offset, err
+}
+
+type multiUploader struct {
+ *uploader
+ wg sync.WaitGroup
+ m sync.Mutex
+ err error
+ uploadID *string
+ parts completedParts
+}
+
+type ulChunk struct {
+ buf io.Reader
+ partNum *int32
+ cleanup func()
+}
+
+type completedParts []types.CompletedPart
+
+func (cp completedParts) Len() int {
+ return len(cp)
+}
+
+func (cp completedParts) Less(i, j int) bool {
+ return aws.ToInt32(cp[i].PartNumber) < aws.ToInt32(cp[j].PartNumber)
+}
+
+func (cp completedParts) Swap(i, j int) {
+ cp[i], cp[j] = cp[j], cp[i]
+}
+
+// upload will perform a multipart upload using the firstBuf buffer containing
+// the first chunk of data.
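+//
+// The calling goroutine reads sequential chunks from the body and feeds them
+// to a channel, while Concurrency worker goroutines drain the channel and
+// issue UploadPart requests. Completed parts are collected, sorted by part
+// number, and passed to CompleteMultipartUpload; on error the upload is
+// aborted instead.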
+func (u *multiUploader) upload(ctx context.Context, firstBuf io.Reader, cleanup func(), clientOptions ...func(*s3.Options)) (*PutObjectOutput, error) {
+ params := u.uploader.in.mapCreateMultipartUploadInput()
+
+	// Create the multipart upload.
+ resp, err := u.uploader.options.S3.CreateMultipartUpload(ctx, params, clientOptions...)
+ if err != nil {
+ cleanup()
+ return nil, err
+ }
+ u.uploadID = resp.UploadId
+
+ ch := make(chan ulChunk, u.options.Concurrency)
+ for i := 0; i < u.options.Concurrency; i++ {
+ // launch workers
+ u.wg.Add(1)
+ go u.readChunk(ctx, ch, clientOptions...)
+ }
+
+ var partNum int32 = 1
+ ch <- ulChunk{buf: firstBuf, partNum: aws.Int32(partNum), cleanup: cleanup}
+ for u.geterr() == nil && err == nil {
+ partNum++
+ var (
+ data io.Reader
+ nextChunkLen int
+ ok bool
+ )
+ data, nextChunkLen, cleanup, err = u.nextReader(ctx)
+ ok, err = u.shouldContinue(partNum, nextChunkLen, err)
+ if !ok {
+ cleanup()
+ if err != nil {
+ u.seterr(err)
+ }
+ break
+ }
+
+ ch <- ulChunk{buf: data, partNum: aws.Int32(partNum), cleanup: cleanup}
+ }
+
+ // close the channel, wait for workers and complete upload
+ close(ch)
+ u.wg.Wait()
+ completeOut := u.complete(ctx, clientOptions...)
+
+ if err := u.geterr(); err != nil {
+ return nil, &multipartUploadError{
+ err: err,
+ uploadID: *u.uploadID,
+ }
+ }
+
+ var out PutObjectOutput
+ out.mapFromCompleteMultipartUploadOutput(completeOut, aws.ToString(params.Bucket), aws.ToString(u.uploadID), u.parts)
+ return &out, nil
+}
+
+func (u *multiUploader) shouldContinue(part int32, nextChunkLen int, err error) (bool, error) {
+ if err != nil && err != io.EOF {
+ return false, fmt.Errorf("read multipart upload data failed, %w", err)
+ }
+
+ if nextChunkLen == 0 {
+		// No need to upload an empty part. If the body was empty to begin
+		// with, an empty single-part upload would have been performed and a
+		// multipart upload would never have been started.
+ return false, nil
+ }
+
+ // This upload exceeded maximum number of supported parts, error now.
+ if part > defaultMaxUploadParts {
+		return false, fmt.Errorf("exceeded total allowed S3 limit MaxUploadParts (%d). Adjust PartSize to fit in this limit", defaultMaxUploadParts)
+ }
+
+ return true, err
+}
+
+// readChunk runs in worker goroutines to pull chunks off of the ch channel
+// and send() them as UploadPart requests.
+func (u *multiUploader) readChunk(ctx context.Context, ch chan ulChunk, clientOptions ...func(*s3.Options)) {
+ defer u.wg.Done()
+ for {
+ data, ok := <-ch
+
+ if !ok {
+ break
+ }
+
+ if u.geterr() == nil {
+ if err := u.send(ctx, data, clientOptions...); err != nil {
+ u.seterr(err)
+ }
+ }
+
+ data.cleanup()
+ }
+}
+
+// send performs an UploadPart request and keeps track of the completed
+// part information.
+func (u *multiUploader) send(ctx context.Context, c ulChunk, clientOptions ...func(*s3.Options)) error {
+ params := u.in.mapUploadPartInput(c.buf, c.partNum, u.uploadID)
+ resp, err := u.options.S3.UploadPart(ctx, params, clientOptions...)
+ if err != nil {
+ return err
+ }
+
+ var completed types.CompletedPart
+ completed.MapFrom(resp, c.partNum)
+
+ u.m.Lock()
+ u.parts = append(u.parts, completed)
+ u.m.Unlock()
+
+ return nil
+}
+
+// geterr is a thread-safe getter for the error object
+func (u *multiUploader) geterr() error {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ return u.err
+}
+
+// seterr is a thread-safe setter for the error object
+func (u *multiUploader) seterr(e error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ u.err = e
+}
+
+// fail aborts the multipart upload, recording the abort error if one occurs.
+func (u *multiUploader) fail(ctx context.Context, clientOptions ...func(*s3.Options)) {
+ params := u.in.mapAbortMultipartUploadInput(u.uploadID)
+ _, err := u.options.S3.AbortMultipartUpload(ctx, params, clientOptions...)
+ if err != nil {
+ u.seterr(fmt.Errorf("failed to abort multipart upload (%v), triggered after multipart upload failed: %v", err, u.geterr()))
+ }
+}
+
+// complete completes the multipart upload and returns the response, or aborts
+// the upload and returns nil if an error has occurred.
+func (u *multiUploader) complete(ctx context.Context, clientOptions ...func(*s3.Options)) *s3.CompleteMultipartUploadOutput {
+ if u.geterr() != nil {
+		u.fail(ctx, clientOptions...)
+ return nil
+ }
+
+ // Parts must be sorted in PartNumber order.
+ sort.Sort(u.parts)
+
+ params := u.in.mapCompleteMultipartUploadInput(u.uploadID, u.parts)
+
+ resp, err := u.options.S3.CompleteMultipartUpload(ctx, params, clientOptions...)
+ if err != nil {
+ u.seterr(err)
+		u.fail(ctx, clientOptions...)
+ }
+
+ return resp
+}
+
+func addFeatureUserAgent(stack *smithymiddleware.Stack) error {
+ ua, err := getOrAddRequestUserAgent(stack)
+ if err != nil {
+ return err
+ }
+
+ ua.AddUserAgentFeature(middleware.UserAgentFeatureS3Transfer)
+ return nil
+}
+
+func getOrAddRequestUserAgent(stack *smithymiddleware.Stack) (*middleware.RequestUserAgent, error) {
+ id := (*middleware.RequestUserAgent)(nil).ID()
+ mw, ok := stack.Build.Get(id)
+ if !ok {
+ mw = middleware.NewRequestUserAgent()
+ if err := stack.Build.Add(mw, smithymiddleware.After); err != nil {
+ return nil, err
+ }
+ }
+
+ ua, ok := mw.(*middleware.RequestUserAgent)
+ if !ok {
+ return nil, fmt.Errorf("%T for %s middleware did not match expected type", mw, id)
+ }
+
+ return ua, nil
+}
diff --git a/feature/s3/transfermanager/go.mod b/feature/s3/transfermanager/go.mod
new file mode 100644
index 00000000000..066daeb30ed
--- /dev/null
+++ b/feature/s3/transfermanager/go.mod
@@ -0,0 +1,27 @@
+module github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager
+
+go 1.20
+
+require (
+ github.com/aws/aws-sdk-go-v2 v1.30.3
+ github.com/aws/aws-sdk-go-v2/config v1.27.27
+ github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1
+ github.com/aws/smithy-go v1.20.3
+)
+
+require (
+ github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
+ github.com/aws/aws-sdk-go-v2/credentials v1.17.27 // indirect
+ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
+ github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 // indirect
+ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
+ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
+)
diff --git a/feature/s3/transfermanager/go.sum b/feature/s3/transfermanager/go.sum
new file mode 100644
index 00000000000..2147a83948c
--- /dev/null
+++ b/feature/s3/transfermanager/go.sum
@@ -0,0 +1,36 @@
+github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
+github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 h1:x6xsQXGSmW6frevwDA+vi/wqhp1ct18mVXYN08/93to=
+github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2/go.mod h1:lPprDr1e6cJdyYeGXnRaJoP4Md+cDBvi2eOj00BlGmg=
+github.com/aws/aws-sdk-go-v2/config v1.27.27 h1:HdqgGt1OAP0HkEDDShEl0oSYa9ZZBSOmKpdpsDMdO90=
+github.com/aws/aws-sdk-go-v2/config v1.27.27/go.mod h1:MVYamCg76dFNINkZFu4n4RjDixhVr51HLj4ErWzrVwg=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI=
+github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 h1:KreluoV8FZDEtI6Co2xuNk/UqI9iwMrOx/87PBNIKqw=
+github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11/go.mod h1:SeSUYBLsMYFoRvHE0Tjvn7kbxaUhl75CJi1sbfhMxkU=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 h1:SoNJ4RlFEQEbtDcCEt+QG56MY4fm4W8rYirAmq+/DdU=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15/go.mod h1:U9ke74k1n2bf+RIgoX1SXFed1HLs51OgUSs+Ph0KJP8=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 h1:C6WHdGnTDIYETAm5iErQUiVNsclNx9qbJVPIt03B6bI=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15/go.mod h1:ZQLZqhcu+JhSrA9/NXRm8SkDvsycE+JkV3WGY41e+IM=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU=
+github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12 h1:DXFWyt7ymx/l1ygdyTTS0X923e+Q2wXIxConJzrgwc0=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.12/go.mod h1:mVOr/LbvaNySK1/BTy4cBOCjhCNY2raWBwK4v+WR5J4=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8=
+github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
+github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14 h1:oWccitSnByVU74rQRHac4gLfDqjB6Z1YQGOY/dXKedI=
+github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.14/go.mod h1:8SaZBlQdCLrc/2U3CEO48rYj9uR8qRsPRkmzwNM52pM=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrxZlQ044RiM+WdoZxp0p+EGM62y3L6pwA4olE=
+github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12 h1:tzha+v1SCEBpXWEuw6B/+jm4h5z8hZbTpXz0zRZqTnw=
+github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.12/go.mod h1:n+nt2qjHGoseWeLHt1vEr6ZRCCxIN2KcNpJxBcYQSwI=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 h1:wsg9Z/vNnCmxWikfGIoOlnExtEU459cR+2d+iDJ8elo=
+github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1/go.mod h1:8rDw3mVwmvIWWX/+LWY3PPIMZuwnQdJMCt0iVFVT3qw=
+github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 h1:BXx0ZIxvrJdSgSvKTZ+yRBeSqqgPM89VPlulEcl37tM=
+github.com/aws/aws-sdk-go-v2/service/sso v1.22.4/go.mod h1:ooyCOXjvJEsUw7x+ZDHeISPMhtwI3ZCB7ggFMcFfWLU=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrAaCYQR72t0wrSBfoesUE=
+github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4/go.mod h1:0oxfLkpz3rQ/CHlx5hB7H69YUpFiI1tql6Q6Ne+1bCw=
+github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 h1:ZsDKRLXGWHk8WdtyYMoGNO7bTudrvuKpDKgMVRlepGE=
+github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzxOl8SRqgf/IDw5aUt9UKFcQ=
+github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
+github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
diff --git a/feature/s3/transfermanager/internal/testing/endpoint.go b/feature/s3/transfermanager/internal/testing/endpoint.go
new file mode 100644
index 00000000000..082aedec4f2
--- /dev/null
+++ b/feature/s3/transfermanager/internal/testing/endpoint.go
@@ -0,0 +1,25 @@
+package testing
+
+import (
+ "context"
+ "net/url"
+
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ smithyendpoints "github.com/aws/smithy-go/endpoints"
+)
+
+// EndpointResolverV2 is a mock s3 endpoint resolver v2 for testing
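+//
+// It can be wired into a test S3 client (a sketch, assuming an httptest.Server
+// named server):
+//
+//	client := s3.New(s3.Options{
+//		Region:             "us-west-2",
+//		EndpointResolverV2: EndpointResolverV2{URL: server.URL},
+//	})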
+type EndpointResolverV2 struct {
+ URL string
+}
+
+// ResolveEndpoint returns the given endpoint url
+func (r EndpointResolverV2) ResolveEndpoint(ctx context.Context, params s3.EndpointParameters) (smithyendpoints.Endpoint, error) {
+ u, err := url.Parse(r.URL)
+ if err != nil {
+ return smithyendpoints.Endpoint{}, err
+ }
+ return smithyendpoints.Endpoint{
+ URI: *u,
+ }, nil
+}
diff --git a/feature/s3/transfermanager/internal/testing/upload.go b/feature/s3/transfermanager/internal/testing/upload.go
new file mode 100644
index 00000000000..10009ccdc0f
--- /dev/null
+++ b/feature/s3/transfermanager/internal/testing/upload.go
@@ -0,0 +1,201 @@
+package testing
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "sync"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+// UploadLoggingClient is a mock client that can be used to record and stub responses for testing the Uploader.
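+//
+// A typical stubbing pattern (illustrative):
+//
+//	c, invocations, params := NewUploadLoggingClient(nil)
+//	c.UploadPartFn = func(u *UploadLoggingClient, in *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
+//		return nil, fmt.Errorf("simulated UploadPart failure")
+//	}
+//	// pass c to the transfer manager under test, then assert on
+//	// *invocations and *params after the call returns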
+type UploadLoggingClient struct {
+ Invocations []string
+ Params []interface{}
+
+ ConsumeBody bool
+
+ ignoredOperations []string
+
+ PartNum int
+ m sync.Mutex
+
+ PutObjectFn func(*UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error)
+ UploadPartFn func(*UploadLoggingClient, *s3.UploadPartInput) (*s3.UploadPartOutput, error)
+ CreateMultipartUploadFn func(*UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error)
+ CompleteMultipartUploadFn func(*UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)
+ AbortMultipartUploadFn func(*UploadLoggingClient, *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error)
+}
+
+func (u *UploadLoggingClient) simulateHTTPClientOption(optFns ...func(*s3.Options)) error {
+
+ o := s3.Options{
+ HTTPClient: httpDoFunc(func(r *http.Request) (*http.Response, error) {
+ return &http.Response{
+ Request: r,
+ }, nil
+ }),
+ }
+
+ for _, fn := range optFns {
+ fn(&o)
+ }
+
+ _, err := o.HTTPClient.Do(&http.Request{
+ URL: &url.URL{
+ Scheme: "https",
+ Host: "mock.amazonaws.com",
+ Path: "/key",
+ RawQuery: "foo=bar",
+ },
+ })
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type httpDoFunc func(*http.Request) (*http.Response, error)
+
+func (f httpDoFunc) Do(r *http.Request) (*http.Response, error) {
+ return f(r)
+}
+
+func (u *UploadLoggingClient) traceOperation(name string, params interface{}) {
+ if contains(u.ignoredOperations, name) {
+ return
+ }
+ u.Invocations = append(u.Invocations, name)
+ u.Params = append(u.Params, params)
+
+}
+
+// PutObject is the S3 PutObject API.
+func (u *UploadLoggingClient) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ if u.ConsumeBody {
+ io.Copy(ioutil.Discard, params.Body)
+ }
+
+ u.traceOperation("PutObject", params)
+
+ if err := u.simulateHTTPClientOption(optFns...); err != nil {
+ return nil, err
+ }
+
+ if u.PutObjectFn != nil {
+ return u.PutObjectFn(u, params)
+ }
+
+ return &s3.PutObjectOutput{
+ VersionId: aws.String("VERSION-ID"),
+ }, nil
+}
+
+// UploadPart is the S3 UploadPart API.
+func (u *UploadLoggingClient) UploadPart(ctx context.Context, params *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ if u.ConsumeBody {
+ io.Copy(ioutil.Discard, params.Body)
+ }
+
+ u.traceOperation("UploadPart", params)
+
+ if err := u.simulateHTTPClientOption(optFns...); err != nil {
+ return nil, err
+ }
+
+ if u.UploadPartFn != nil {
+ return u.UploadPartFn(u, params)
+ }
+
+ return &s3.UploadPartOutput{
+ ETag: aws.String(fmt.Sprintf("ETAG%d", *params.PartNumber)),
+ }, nil
+}
+
+// CreateMultipartUpload is the S3 CreateMultipartUpload API.
+func (u *UploadLoggingClient) CreateMultipartUpload(ctx context.Context, params *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ u.traceOperation("CreateMultipartUpload", params)
+
+ if err := u.simulateHTTPClientOption(optFns...); err != nil {
+ return nil, err
+ }
+
+ if u.CreateMultipartUploadFn != nil {
+ return u.CreateMultipartUploadFn(u, params)
+ }
+
+ return &s3.CreateMultipartUploadOutput{
+ UploadId: aws.String("UPLOAD-ID"),
+ }, nil
+}
+
+// CompleteMultipartUpload is the S3 CompleteMultipartUpload API.
+func (u *UploadLoggingClient) CompleteMultipartUpload(ctx context.Context, params *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ u.traceOperation("CompleteMultipartUpload", params)
+
+ if err := u.simulateHTTPClientOption(optFns...); err != nil {
+ return nil, err
+ }
+
+ if u.CompleteMultipartUploadFn != nil {
+ return u.CompleteMultipartUploadFn(u, params)
+ }
+
+ return &s3.CompleteMultipartUploadOutput{
+ Location: aws.String("http://location"),
+ VersionId: aws.String("VERSION-ID"),
+ }, nil
+}
+
+// AbortMultipartUpload is the S3 AbortMultipartUpload API.
+func (u *UploadLoggingClient) AbortMultipartUpload(ctx context.Context, params *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) {
+ u.m.Lock()
+ defer u.m.Unlock()
+
+ u.traceOperation("AbortMultipartUpload", params)
+ if err := u.simulateHTTPClientOption(optFns...); err != nil {
+ return nil, err
+ }
+
+ if u.AbortMultipartUploadFn != nil {
+ return u.AbortMultipartUploadFn(u, params)
+ }
+
+ return &s3.AbortMultipartUploadOutput{}, nil
+}
+
+// NewUploadLoggingClient returns a new UploadLoggingClient.
+func NewUploadLoggingClient(ignoredOps []string) (*UploadLoggingClient, *[]string, *[]interface{}) {
+ c := &UploadLoggingClient{
+ ignoredOperations: ignoredOps,
+ }
+
+ return c, &c.Invocations, &c.Params
+}
+
+func contains(src []string, s string) bool {
+ for _, v := range src {
+ if v == s {
+ return true
+ }
+ }
+ return false
+}
diff --git a/feature/s3/transfermanager/options.go b/feature/s3/transfermanager/options.go
new file mode 100644
index 00000000000..a49e74afd64
--- /dev/null
+++ b/feature/s3/transfermanager/options.go
@@ -0,0 +1,63 @@
+package transfermanager
+
+import "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
+
+// Options provides the parameters needed for transfer manager API calls.
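+//
+// Any zero-valued field is resolved to its package default by New. A tuned
+// configuration sketch (values and the s3Client variable are illustrative):
+//
+//	opts := transfermanager.Options{
+//		PartSizeBytes: 16 * 1024 * 1024,
+//		Concurrency:   10,
+//	}
+//	c := transfermanager.New(s3Client, opts)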
+type Options struct {
+ // The client to use when uploading to S3.
+ S3 S3APIClient
+
+	// The buffer size (in bytes) to use when buffering data into chunks and
+	// sending them as parts to S3. The minimum allowed part size is 8 MiB
+	// (minPartSizeBytes), and if this value is set to zero, that minimum is
+	// used as the default part size.
+ PartSizeBytes int64
+
+	// The size threshold, in bytes, at which an object should be uploaded as a
+	// multipart upload instead of a single PutObject request.
+ MultipartUploadThreshold int64
+
+ // Option to disable checksum validation for download
+ DisableChecksum bool
+
+ // Checksum algorithm to use for upload
+ ChecksumAlgorithm types.ChecksumAlgorithm
+
+	// The number of goroutines to spin up in parallel per call to PutObject
+	// when sending parts. If this is set to zero, the defaultTransferConcurrency
+	// value (5) will be used.
+	//
+	// The concurrency pool is not shared between calls to PutObject.
+ Concurrency int
+}
+
+func (o *Options) init() {
+}
+
+func resolveConcurrency(o *Options) {
+ if o.Concurrency == 0 {
+ o.Concurrency = defaultTransferConcurrency
+ }
+}
+
+func resolvePartSizeBytes(o *Options) {
+ if o.PartSizeBytes == 0 {
+ o.PartSizeBytes = minPartSizeBytes
+ }
+}
+
+func resolveChecksumAlgorithm(o *Options) {
+ if o.ChecksumAlgorithm == "" {
+ o.ChecksumAlgorithm = types.ChecksumAlgorithmCrc32
+ }
+}
+
+func resolveMultipartUploadThreshold(o *Options) {
+ if o.MultipartUploadThreshold == 0 {
+ o.MultipartUploadThreshold = defaultMultipartUploadThreshold
+ }
+}
+
+// Copy returns a new copy of the Options
+func (o Options) Copy() Options {
+ to := o
+ return to
+}
diff --git a/feature/s3/transfermanager/pool.go b/feature/s3/transfermanager/pool.go
new file mode 100644
index 00000000000..d681861a121
--- /dev/null
+++ b/feature/s3/transfermanager/pool.go
@@ -0,0 +1,63 @@
+package transfermanager
+
+import (
+ "context"
+ "fmt"
+)
+
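+// bytesBufferPool is a fixed-capacity pool of part-sized byte slices used to
+// bound the memory held by in-flight parts. A lifecycle sketch (partSizeBytes
+// and concurrency are illustrative):
+//
+//	pool := newDefaultSlicePool(partSizeBytes, concurrency+1)
+//	defer pool.Close()
+//	buf, err := pool.Get(ctx) // returns a pooled slice, or an error if ctx is done
+//	if err == nil {
+//		// fill buf and upload it, then return it for reuse
+//		pool.Put(buf)
+//	}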
+type bytesBufferPool interface {
+ Get(context.Context) ([]byte, error)
+ Put([]byte)
+ Close()
+}
+
+type defaultSlicePool struct {
+ slices chan []byte
+}
+
+func newDefaultSlicePool(sliceSize int64, capacity int) *defaultSlicePool {
+ p := &defaultSlicePool{}
+
+ slices := make(chan []byte, capacity)
+ for i := 0; i < capacity; i++ {
+ slices <- make([]byte, sliceSize)
+ }
+
+ p.slices = slices
+ return p
+}
+
+var errZeroCapacity = fmt.Errorf("get called on zero capacity pool")
+
+func (p *defaultSlicePool) Get(ctx context.Context) ([]byte, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ for {
+ select {
+ case bs, ok := <-p.slices:
+ if !ok {
+ return nil, errZeroCapacity
+ }
+ return bs, nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+}
+
+func (p *defaultSlicePool) Put(bs []byte) {
+ p.slices <- bs
+}
+
+func (p *defaultSlicePool) Close() {
+ close(p.slices)
+ for range p.slices {
+ // drain channel
+ }
+ p.slices = nil
+}
diff --git a/feature/s3/transfermanager/pool_test.go b/feature/s3/transfermanager/pool_test.go
new file mode 100644
index 00000000000..d6e107bc98c
--- /dev/null
+++ b/feature/s3/transfermanager/pool_test.go
@@ -0,0 +1,47 @@
+package transfermanager
+
+import (
+ "context"
+ "sync"
+ "testing"
+)
+
+func TestDefaultSlicePool(t *testing.T) {
+ pool := newDefaultSlicePool(1, 2)
+
+ var bs []byte
+ var err error
+ var wg sync.WaitGroup
+
+ for i := 0; i < 200; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ pool.Put(bs)
+ }()
+ }
+ // wait for a slice to be put back
+ for i := 0; i < 200; i++ {
+ bs, err = pool.Get(context.Background())
+ if err != nil {
+ t.Errorf("failed to get slice from pool: %v", err)
+ }
+ }
+
+ wg.Wait()
+
+ // failed to get a slice due to ctx cancelled
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ bs, err = pool.Get(ctx)
+ if err == nil {
+ pool.Put(bs)
+		t.Errorf("expected error from canceled context, got none")
+ }
+
+ if e, a := 2, len(pool.slices); e != a {
+ t.Errorf("expect pool size to be %v, got %v", e, a)
+ }
+
+ pool.Close()
+}
diff --git a/feature/s3/transfermanager/putobject_test.go b/feature/s3/transfermanager/putobject_test.go
new file mode 100644
index 00000000000..06cd0ebe149
--- /dev/null
+++ b/feature/s3/transfermanager/putobject_test.go
@@ -0,0 +1,904 @@
+package transfermanager
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "reflect"
+ "regexp"
+ "sort"
+ "strconv"
+ "strings"
+ "testing"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/aws/retry"
+ s3testing "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/internal/testing"
+ "github.com/aws/aws-sdk-go-v2/internal/awstesting"
+ "github.com/aws/aws-sdk-go-v2/internal/sdk"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+// getReaderLength discards the bytes from reader and returns the length
+func getReaderLength(r io.Reader) int64 {
+ n, _ := io.Copy(ioutil.Discard, r)
+ return n
+}
+
+func TestUploadOrderMulti(t *testing.T) {
+ c, invocations, args := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key - value",
+ Body: bytes.NewReader(buf20MB),
+ ServerSideEncryption: "aws:kms",
+ SSEKMSKeyID: "KmsId",
+ ContentType: "content/type",
+ })
+
+ if err != nil {
+ t.Errorf("Expected no error but received %v", err)
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ if "UPLOAD-ID" != resp.UploadID {
+ t.Errorf("expect %q, got %q", "UPLOAD-ID", resp.UploadID)
+ }
+
+ if "VERSION-ID" != resp.VersionID {
+ t.Errorf("expect %q, got %q", "VERSION-ID", resp.VersionID)
+ }
+
+ // Validate input values
+
+ // UploadPart
+ for i := 1; i < 4; i++ {
+ v := aws.ToString((*args)[i].(*s3.UploadPartInput).UploadId)
+ if "UPLOAD-ID" != v {
+ t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
+ }
+ }
+
+ // CompleteMultipartUpload
+ v := aws.ToString((*args)[4].(*s3.CompleteMultipartUploadInput).UploadId)
+ if "UPLOAD-ID" != v {
+ t.Errorf("Expected %q, but received %q", "UPLOAD-ID", v)
+ }
+
+ parts := (*args)[4].(*s3.CompleteMultipartUploadInput).MultipartUpload.Parts
+
+ for i := 0; i < 3; i++ {
+ num := parts[i].PartNumber
+ etag := aws.ToString(parts[i].ETag)
+
+ if int32(i+1) != aws.ToInt32(num) {
+ t.Errorf("expect %d, got %d", i+1, num)
+ }
+
+ if matched, err := regexp.MatchString(`^ETAG\d+$`, etag); !matched || err != nil {
+ t.Errorf("Failed regexp expression `^ETAG\\d+$`")
+ }
+ }
+
+ // Custom headers
+ cmu := (*args)[0].(*s3.CreateMultipartUploadInput)
+
+ if e, a := types.ServerSideEncryption("aws:kms"), cmu.ServerSideEncryption; e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+
+ if e, a := "KmsId", aws.ToString(cmu.SSEKMSKeyId); e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+
+ if e, a := "content/type", aws.ToString(cmu.ContentType); e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+}
+
+func TestUploadOrderMultiDifferentPartSize(t *testing.T) {
+ c, ops, args := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{
+ PartSizeBytes: 1024 * 1024 * 11,
+ Concurrency: 1,
+ })
+
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(buf20MB),
+ })
+
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ vals := []string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}
+ if !reflect.DeepEqual(vals, *ops) {
+ t.Errorf("expect %v, got %v", vals, *ops)
+ }
+
+ // Part lengths
+ if length := getReaderLength((*args)[1].(*s3.UploadPartInput).Body); 1024*1024*11 != length {
+ t.Errorf("expect %d, got %d", 1024*1024*11, length)
+ }
+ if length := getReaderLength((*args)[2].(*s3.UploadPartInput).Body); 1024*1024*9 != length {
+ t.Errorf("expect %d, got %d", 1024*1024*9, length)
+ }
+}
+
+func TestUploadFailIfPartSizeTooSmall(t *testing.T) {
+ mgr := New(s3.New(s3.Options{}), Options{},
+ func(o *Options) {
+ o.PartSizeBytes = 5
+ })
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(buf20MB),
+ })
+
+ if resp != nil {
+ t.Errorf("Expected response to be nil, but received %v", resp)
+ }
+
+ if err == nil {
+ t.Errorf("Expected error, but received nil")
+ }
+ if e, a := "part size must be at least", err.Error(); !strings.Contains(a, e) {
+ t.Errorf("expect %v to be in %v", e, a)
+ }
+}
+
+func TestUploadOrderSingle(t *testing.T) {
+ c, invocations, params := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key - value",
+ Body: bytes.NewReader(buf2MB),
+ ServerSideEncryption: "aws:kms",
+ SSEKMSKeyID: "KmsId",
+ ContentType: "content/type",
+ })
+
+ if err != nil {
+ t.Errorf("expect no error but received %v", err)
+ }
+
+ if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ if e := "VERSION-ID"; e != resp.VersionID {
+ t.Errorf("expect %q, got %q", e, resp.VersionID)
+ }
+
+ if len(resp.UploadID) > 0 {
+ t.Errorf("expect empty string, got %q", resp.UploadID)
+ }
+
+ putObjectInput := (*params)[0].(*s3.PutObjectInput)
+
+ if e, a := types.ServerSideEncryption("aws:kms"), putObjectInput.ServerSideEncryption; e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+
+ if e, a := "KmsId", aws.ToString(putObjectInput.SSEKMSKeyId); e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+
+ if e, a := "content/type", aws.ToString(putObjectInput.ContentType); e != a {
+ t.Errorf("Expected %q, but received %q", e, a)
+ }
+}
+
+func TestUploadSingleFailure(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+
+ c.PutObjectFn = func(*s3testing.UploadLoggingClient, *s3.PutObjectInput) (*s3.PutObjectOutput, error) {
+ return nil, fmt.Errorf("put object failure")
+ }
+
+ mgr := New(c, Options{})
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(buf2MB),
+ })
+
+ if err == nil {
+ t.Error("expect error, got nil")
+ }
+
+ if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ if resp != nil {
+ t.Errorf("expect response to be nil, got %v", resp)
+ }
+}
+
+func TestUploadOrderZero(t *testing.T) {
+ c, invocations, params := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(make([]byte, 0)),
+ })
+
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ if len(resp.UploadID) > 0 {
+ t.Errorf("expect empty string, got %q", resp.UploadID)
+ }
+
+ if e, a := int64(0), getReaderLength((*params)[0].(*s3.PutObjectInput).Body); e != a {
+ t.Errorf("Expected %d, but received %d", e, a)
+ }
+}
+
+func TestUploadOrderMultiFailure(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+
+ c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
+ if *params.PartNumber == 2 {
+ return nil, fmt.Errorf("an unexpected error")
+ }
+ return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil
+ }
+
+ mgr := New(c, Options{}, func(o *Options) {
+ o.Concurrency = 1
+ })
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(buf20MB),
+ })
+
+ if err == nil {
+ t.Error("expect error, got nil")
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "AbortMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+func TestUploadOrderMultiFailureOnComplete(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+
+ c.CompleteMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error) {
+ return nil, fmt.Errorf("complete multipart error")
+ }
+
+ mgr := New(c, Options{}, func(o *Options) {
+ o.Concurrency = 1
+ })
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(buf20MB),
+ })
+
+ if err == nil {
+ t.Error("expect error, got nil")
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "UploadPart",
+ "CompleteMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+func TestUploadOrderMultiFailureOnCreate(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+
+ c.CreateMultipartUploadFn = func(*s3testing.UploadLoggingClient, *s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error) {
+ return nil, fmt.Errorf("create multipart upload failure")
+ }
+
+ mgr := New(c, Options{})
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(make([]byte, 1024*1024*12)),
+ })
+
+ if err == nil {
+ t.Error("expect error, got nil")
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
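+// failreader is an io.Reader test double whose Read fails on the times-th
+// call, reporting successful (unfilled) reads before that.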
+type failreader struct {
+ times int
+ failCount int
+}
+
+func (f *failreader) Read(b []byte) (int, error) {
+ f.failCount++
+ if f.failCount >= f.times {
+ return 0, fmt.Errorf("random failure")
+ }
+ return len(b), nil
+}
+
+func TestUploadOrderReadFail1(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &failreader{times: 1},
+ })
+ if err == nil {
+ t.Fatalf("expect error to not be nil")
+ }
+
+ if e, a := "random failure", err.Error(); !strings.Contains(a, e) {
+ t.Errorf("expect %v, got %v", e, a)
+ }
+
+ if diff := cmpDiff([]string(nil), *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+func TestUploadOrderReadFail2(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient([]string{"UploadPart"})
+ mgr := New(c, Options{}, func(o *Options) {
+ o.Concurrency = 1
+ })
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &failreader{times: 2},
+ })
+ if err == nil {
+ t.Fatalf("expect error to not be nil")
+ }
+
+ if e, a := "random failure", err.Error(); !strings.Contains(a, e) {
+ t.Errorf("expect %v, got %q", e, a)
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "AbortMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
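+// sizedReader is an io.Reader test double that reports reading size bytes of
+// zero data, then returns err (or io.EOF if err is nil).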
+type sizedReader struct {
+ size int
+ cur int
+ err error
+}
+
+func (s *sizedReader) Read(p []byte) (n int, err error) {
+ if s.cur >= s.size {
+ if s.err == nil {
+ s.err = io.EOF
+ }
+ return 0, s.err
+ }
+
+ n = len(p)
+ s.cur += len(p)
+ if s.cur > s.size {
+ n -= s.cur - s.size
+ }
+
+ return n, err
+}
+
+func TestUploadOrderMultiBufferedReader(t *testing.T) {
+ c, invocations, params := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &sizedReader{size: 1024 * 1024 * 21},
+ })
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart",
+ "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ // Part lengths
+ var parts []int64
+ for i := 1; i <= 3; i++ {
+ parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body))
+ }
+ sort.Slice(parts, func(i, j int) bool {
+ return parts[i] < parts[j]
+ })
+
+ if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+func TestUploadOrderMultiBufferedReaderPartial(t *testing.T) {
+ c, invocations, params := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &sizedReader{size: 1024 * 1024 * 21, err: io.EOF},
+ })
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart",
+ "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ // Part lengths
+ var parts []int64
+ for i := 1; i <= 3; i++ {
+ parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body))
+ }
+ sort.Slice(parts, func(i, j int) bool {
+ return parts[i] < parts[j]
+ })
+
+ if diff := cmpDiff([]int64{1024 * 1024 * 5, 1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+// TestUploadOrderMultiBufferedReaderEOF tests the edge case where the object
+// size is an exact multiple of the part size, so EOF lands on a part boundary.
+func TestUploadOrderMultiBufferedReaderEOF(t *testing.T) {
+ c, invocations, params := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &sizedReader{size: 1024 * 1024 * 16, err: io.EOF},
+ })
+
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ if diff := cmpDiff([]string{"CreateMultipartUpload", "UploadPart", "UploadPart", "CompleteMultipartUpload"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ // Part lengths
+ var parts []int64
+ for i := 1; i <= 2; i++ {
+ parts = append(parts, getReaderLength((*params)[i].(*s3.UploadPartInput).Body))
+ }
+ sort.Slice(parts, func(i, j int) bool {
+ return parts[i] < parts[j]
+ })
+
+ if diff := cmpDiff([]int64{1024 * 1024 * 8, 1024 * 1024 * 8}, parts); len(diff) > 0 {
+ t.Error(diff)
+ }
+}
+
+func TestUploadOrderSingleBufferedReader(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{})
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &sizedReader{size: 1024 * 1024 * 2},
+ })
+
+ if err != nil {
+ t.Errorf("expect no error, got %v", err)
+ }
+
+ if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 {
+ t.Error(diff)
+ }
+
+ if len(resp.UploadID) > 0 {
+ t.Errorf("expect no value, got %q", resp.UploadID)
+ }
+}
+
+func TestUploadZeroLenObject(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+
+ mgr := New(c, Options{})
+ resp, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: strings.NewReader(""),
+ })
+
+ if err != nil {
+ t.Errorf("expect no error but received %v", err)
+ }
+ if diff := cmpDiff([]string{"PutObject"}, *invocations); len(diff) > 0 {
+ t.Errorf("expect request to have been made, but was not, %v", diff)
+ }
+
+ if len(resp.UploadID) > 0 {
+ t.Errorf("expect empty string, but received %q", resp.UploadID)
+ }
+}
+
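+// testIncompleteReader is an io.Reader test double that returns
+// io.ErrUnexpectedEOF once Size bytes have been read.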
+type testIncompleteReader struct {
+ Size int64
+ read int64
+}
+
+func (r *testIncompleteReader) Read(p []byte) (n int, err error) {
+ r.read += int64(len(p))
+ if r.read >= r.Size {
+ return int(r.read - r.Size), io.ErrUnexpectedEOF
+ }
+ return len(p), nil
+}
+
+func TestUploadUnexpectedEOF(t *testing.T) {
+ c, invocations, _ := s3testing.NewUploadLoggingClient(nil)
+ mgr := New(c, Options{}, func(o *Options) {
+ o.Concurrency = 1
+ o.PartSizeBytes = minPartSizeBytes
+ })
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: &testIncompleteReader{
+ Size: minPartSizeBytes + 1,
+ },
+ })
+ if err == nil {
+ t.Error("expect error, got nil")
+ }
+
+ // Ensure upload started.
+ if e, a := "CreateMultipartUpload", (*invocations)[0]; e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+
+ // A part may or may not be sent before the read failure is detected,
+ // depending on the timing between sending parts and reading the next
+ // part in the upload manager. Just check that the upload was aborted.
+ if e, a := "AbortMultipartUpload", (*invocations)[len(*invocations)-1]; e != a {
+ t.Errorf("expect %q, got %q", e, a)
+ }
+}
+
+func TestSSE(t *testing.T) {
+ c, _, _ := s3testing.NewUploadLoggingClient(nil)
+ c.UploadPartFn = func(u *s3testing.UploadLoggingClient, params *s3.UploadPartInput) (*s3.UploadPartOutput, error) {
+ if params.SSECustomerAlgorithm == nil {
+ t.Fatal("SSECustomerAlgoritm should not be nil")
+ }
+ if params.SSECustomerKey == nil {
+ t.Fatal("SSECustomerKey should not be nil")
+ }
+ return &s3.UploadPartOutput{ETag: aws.String(fmt.Sprintf("ETAG%d", u.PartNum))}, nil
+ }
+
+ mgr := New(c, Options{}, func(o *Options) {
+ o.Concurrency = 5
+ })
+
+ _, err := mgr.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ SSECustomerAlgorithm: "AES256",
+ SSECustomerKey: "foo",
+ Body: bytes.NewBuffer(make([]byte, 1024*1024*10)),
+ })
+
+ if err != nil {
+ t.Fatal("Expected no error, but received" + err.Error())
+ }
+}
+
+func TestUploadWithContextCanceled(t *testing.T) {
+ c := s3.New(s3.Options{
+ UsePathStyle: true,
+ Region: "mock-region",
+ })
+ u := New(c, Options{})
+
+ ctx := &awstesting.FakeContext{DoneCh: make(chan struct{})}
+ ctx.Error = fmt.Errorf("context canceled")
+ close(ctx.DoneCh)
+
+ _, err := u.PutObject(ctx, &PutObjectInput{
+ Bucket: "Bucket",
+ Key: "Key",
+ Body: bytes.NewReader(make([]byte, 0)),
+ })
+ if err == nil {
+ t.Fatalf("expect error, got nil")
+ }
+
+ if e, a := "canceled", err.Error(); !strings.Contains(a, e) {
+ t.Errorf("expected error message to contain %q, but did not %q", e, a)
+ }
+}
+
+func TestUploadRetry(t *testing.T) {
+ const part, retries = 3, 10
+ testFile, testFileCleanup, err := createTempFile(t, minPartSizeBytes*part)
+ if err != nil {
+ t.Fatalf("failed to create test file, %v", err)
+ }
+ defer testFileCleanup(t)
+
+ cases := map[string]struct {
+ Body io.Reader
+ PartHandlers func(testing.TB) []http.Handler
+ }{
+ "bytes.Buffer": {
+ Body: bytes.NewBuffer(make([]byte, minPartSizeBytes*part)),
+ PartHandlers: func(tb testing.TB) []http.Handler {
+ return buildFailHandlers(tb, part, retries)
+ },
+ },
+ "bytes.Reader": {
+ Body: bytes.NewReader(make([]byte, minPartSizeBytes*part)),
+ PartHandlers: func(tb testing.TB) []http.Handler {
+ return buildFailHandlers(tb, part, retries)
+ },
+ },
+ "os.File": {
+ Body: testFile,
+ PartHandlers: func(tb testing.TB) []http.Handler {
+ return buildFailHandlers(tb, part, retries)
+ },
+ },
+ }
+
+ for name, c := range cases {
+ t.Run(name, func(t *testing.T) {
+ restoreSleep := sdk.TestingUseNopSleep()
+ defer restoreSleep()
+
+ mux := newMockS3UploadServer(t, c.PartHandlers(t))
+ server := httptest.NewServer(mux)
+ defer server.Close()
+
+ client := s3.New(s3.Options{
+ EndpointResolverV2: s3testing.EndpointResolverV2{URL: server.URL},
+ UsePathStyle: true,
+ Retryer: retry.NewStandard(func(o *retry.StandardOptions) {
+ o.MaxAttempts = retries + 1
+ }),
+ })
+
+ uploader := New(client, Options{})
+ _, err := uploader.PutObject(context.Background(), &PutObjectInput{
+ Bucket: "bucket",
+ Key: "key",
+ Body: c.Body,
+ })
+
+ if err != nil {
+ t.Fatalf("expect no error, got %v", err)
+ }
+ })
+ }
+}
+
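+// newMockS3UploadServer returns a mock S3 multipart upload server that routes
+// each UploadPart request to the part handler matching its part number.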
+func newMockS3UploadServer(tb testing.TB, partHandler []http.Handler) *mockS3UploadServer {
+ s := &mockS3UploadServer{
+ ServeMux: http.NewServeMux(),
+ partHandlers: partHandler,
+ tb: tb,
+ }
+
+ s.HandleFunc("/", s.handleRequest)
+
+ return s
+}
+
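+// buildFailHandlers returns one part handler per part, each failing `retry`
+// times before delegating to a successPartHandler.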
+func buildFailHandlers(tb testing.TB, part, retry int) []http.Handler {
+ handlers := make([]http.Handler, part)
+
+ for i := 0; i < part; i++ {
+ handlers[i] = &failPartHandler{
+ tb: tb,
+ failLeft: retry,
+ successPartHandler: &successPartHandler{tb: tb},
+ }
+ }
+
+ return handlers
+}
+
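+// mockS3UploadServer emulates the S3 multipart upload REST API for PutObject
+// tests.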
+type mockS3UploadServer struct {
+ *http.ServeMux
+
+ tb testing.TB
+ partHandlers []http.Handler
+}
+
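+// handleRequest routes mock S3 multipart upload requests based on the HTTP
+// method and query string.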
+func (s mockS3UploadServer) handleRequest(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ if err := r.Body.Close(); err != nil {
+ failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err))
+ }
+ }()
+
+ _, hasUploads := r.URL.Query()["uploads"]
+
+ switch {
+ case r.Method == "POST" && hasUploads:
+ // CreateMultipartUpload request
+ w.Header().Set("Content-Length", strconv.Itoa(len(createUploadResp)))
+ w.Write([]byte(createUploadResp))
+ case r.Method == "PUT":
+ partStr := r.URL.Query().Get("partNumber")
+ part, err := strconv.ParseInt(partStr, 10, 64)
+ if err != nil {
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to parse partNumber, %q, %v", partStr, err))
+ return
+ }
+ if part <= 0 || part > int64(len(s.partHandlers)) {
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid partNumber %v", part))
+ return
+ }
+ s.partHandlers[part-1].ServeHTTP(w, r)
+ case r.Method == "POST":
+ // CompleteMultipartUpload request
+ w.Header().Set("Content-Length", strconv.Itoa(len(completeUploadResp)))
+ w.Write([]byte(completeUploadResp))
+ case r.Method == "DELETE":
+ w.Header().Set("Content-Length", strconv.Itoa(len(abortUploadResp)))
+ w.Write([]byte(abortUploadResp))
+ w.WriteHeader(200)
+ default:
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("invalid request %v %v", r.Method, r.URL))
+ }
+}
+
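+// createTempFile creates a temp file of the given size and returns it along
+// with a cleanup function that closes and removes it.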
+func createTempFile(t *testing.T, size int64) (*os.File, func(*testing.T), error) {
+ file, err := ioutil.TempFile(os.TempDir(), aws.SDKName+t.Name())
+ if err != nil {
+ return nil, nil, err
+ }
+ filename := file.Name()
+ if err := file.Truncate(size); err != nil {
+ return nil, nil, err
+ }
+
+ return file,
+ func(t *testing.T) {
+ if err := file.Close(); err != nil {
+ t.Errorf("failed to close temp file, %s, %v", filename, err)
+ }
+ if err := os.Remove(filename); err != nil {
+ t.Errorf("failed to remove temp file, %s, %v", filename, err)
+ }
+ },
+ nil
+}
+
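+// failPartHandler responds to UploadPart requests with a 500 error until
+// failLeft reaches zero, then delegates to successPartHandler.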
+type failPartHandler struct {
+ tb testing.TB
+ failLeft int
+ successPartHandler http.Handler
+}
+
+func (h *failPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ if err := r.Body.Close(); err != nil {
+ failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err))
+ }
+ }()
+
+ if h.failLeft == 0 && h.successPartHandler != nil {
+ h.successPartHandler.ServeHTTP(w, r)
+ return
+ }
+
+ io.Copy(ioutil.Discard, r.Body)
+ failRequest(w, 500, "InternalException", fmt.Sprintf("mock error, partNumber %v", r.URL.Query().Get("partNumber")))
+ h.failLeft--
+}
+
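+// failRequest writes an S3-style XML error response with the given status
+// code, error code, and message.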
+func failRequest(w http.ResponseWriter, status int, code, msg string) {
+ msg = fmt.Sprintf(baseRequestErrorResp, code, msg)
+ w.Header().Set("Content-Length", strconv.Itoa(len(msg)))
+ w.WriteHeader(status)
+ w.Write([]byte(msg))
+}
+
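+// successPartHandler validates the UploadPart request body against its
+// Content-Length header and responds with a successful part response.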
+type successPartHandler struct {
+ tb testing.TB
+}
+
+func (h *successPartHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ defer func() {
+ if err := r.Body.Close(); err != nil {
+ failRequest(w, 0, "BodyCloseError", fmt.Sprintf("request body close error: %v", err))
+ }
+ }()
+
+ n, err := io.Copy(ioutil.Discard, r.Body)
+ if err != nil {
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("failed to read body, %v", err))
+ return
+ }
+ contentLength := r.Header.Get("Content-Length")
+ expectLength, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ h.tb.Logf("expect content-length, got %q, %v", contentLength, err)
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("unable to get content-length %v", err))
+ return
+ }
+
+ if e, a := expectLength, n; e != a {
+ h.tb.Logf("expect content-length to be %v, got %v", e, a)
+ failRequest(w, 400, "BadRequest", fmt.Sprintf("content-length and body do not match, %v, %v", e, a))
+ return
+ }
+
+ w.Header().Set("Content-Length", strconv.Itoa(len(uploadPartResp)))
+ w.Write([]byte(uploadPartResp))
+}
+
+const createUploadResp = `<CreateMultipartUploadResult>
+ <Bucket>bucket</Bucket>
+ <Key>key</Key>
+ <UploadId>abc123</UploadId>
+</CreateMultipartUploadResult>`
+
+const uploadPartResp = `<UploadPartResult>
+ <ETag>key</ETag>
+</UploadPartResult>`
+const baseRequestErrorResp = `<Error>
+ <Code>%s</Code>
+ <Message>%s</Message>
+ <RequestId>request-id</RequestId>
+ <HostId>host-id</HostId>
+</Error>`
+
+const completeUploadResp = `<CompleteMultipartUploadResult>
+ <Bucket>bucket</Bucket>
+ <Key>key</Key>
+ <ETag>key</ETag>
+ <Location>https://bucket.us-west-2.amazonaws.com/key</Location>
+ <UploadId>abc123</UploadId>
+</CompleteMultipartUploadResult>`
+
+const abortUploadResp = ``
+
+func cmpDiff(e, a interface{}) string {
+ if !reflect.DeepEqual(e, a) {
+ return fmt.Sprintf("%v != %v", e, a)
+ }
+ return ""
+}
diff --git a/feature/s3/transfermanager/shared_test.go b/feature/s3/transfermanager/shared_test.go
new file mode 100644
index 00000000000..364423e96c2
--- /dev/null
+++ b/feature/s3/transfermanager/shared_test.go
@@ -0,0 +1,4 @@
+package transfermanager
+
+var buf20MB = make([]byte, 1024*1024*20)
+var buf2MB = make([]byte, 1024*1024*2)
diff --git a/feature/s3/transfermanager/types/types.go b/feature/s3/transfermanager/types/types.go
new file mode 100644
index 00000000000..8a2d877e461
--- /dev/null
+++ b/feature/s3/transfermanager/types/types.go
@@ -0,0 +1,346 @@
+package types
+
+import (
+ "io"
+
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/types"
+)
+
+// ReadSeekCloser wraps an io.Reader, returning a ReaderSeekerCloser. Allows the
+// SDK to accept an io.Reader that is not also an io.Seeker for unsigned
+// streaming payload API operations.
+//
+// A ReaderSeekerCloser wrapping a non-seekable io.Reader used in an API
+// operation's input will prevent that operation from being retried in the
+// case of network errors, and cause operation requests to fail if the
+// operation requires payload signing.
+func ReadSeekCloser(r io.Reader) *ReaderSeekerCloser {
+ return &ReaderSeekerCloser{r}
+}
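+
+// Illustrative sketch (not part of this package's API): wrapping a streaming
+// response body, where resp is a hypothetical *http.Response whose Body is a
+// non-seekable io.ReadCloser:
+//
+//	body := types.ReadSeekCloser(resp.Body)
+//	fmt.Println(body.IsSeeker()) // false for a plain network stream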
+
+// ReaderSeekerCloser represents a reader that can also delegate io.Seeker and
+// io.Closer interfaces to the underlying object if they are available.
+type ReaderSeekerCloser struct {
+ r io.Reader
+}
+
+// SeekerLen attempts to get the number of bytes remaining at the seeker's
+// current position. Returns the number of bytes remaining or error.
+func SeekerLen(s io.Seeker) (int64, error) {
+ // Determine if the seeker is actually seekable. ReaderSeekerCloser
+ // hides the fact that an io.Reader might not actually be seekable.
+ switch v := s.(type) {
+ case *ReaderSeekerCloser:
+ return v.GetLen()
+ }
+
+ return computeSeekerLength(s)
+}
+
+// GetLen returns the length of the bytes remaining in the underlying reader.
+// Checks first for Len(), then io.Seeker to determine the size of the
+// underlying reader.
+//
+// Will return -1 if the length cannot be determined.
+func (r *ReaderSeekerCloser) GetLen() (int64, error) {
+ if l, ok := r.HasLen(); ok {
+ return int64(l), nil
+ }
+
+ if s, ok := r.r.(io.Seeker); ok {
+ return computeSeekerLength(s)
+ }
+
+ return -1, nil
+}
+
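+// computeSeekerLength determines the bytes remaining by seeking to the end of
+// the stream and then restoring the original offset.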
+func computeSeekerLength(s io.Seeker) (int64, error) {
+ curOffset, err := s.Seek(0, io.SeekCurrent)
+ if err != nil {
+ return 0, err
+ }
+
+ endOffset, err := s.Seek(0, io.SeekEnd)
+ if err != nil {
+ return 0, err
+ }
+
+ _, err = s.Seek(curOffset, io.SeekStart)
+ if err != nil {
+ return 0, err
+ }
+
+ return endOffset - curOffset, nil
+}
+
+// HasLen returns the length of the underlying reader if the value implements
+// the Len() int method.
+func (r *ReaderSeekerCloser) HasLen() (int, bool) {
+ type lenner interface {
+ Len() int
+ }
+
+ if lr, ok := r.r.(lenner); ok {
+ return lr.Len(), true
+ }
+
+ return 0, false
+}
+
+// Read reads from the underlying reader up to len(p) bytes, returning the
+// number of bytes read and any error that occurred.
+//
+// If the underlying value is not an io.Reader, zero bytes are read and a nil
+// error is returned.
+//
+// Performs the same functionality as io.Reader's Read.
+func (r *ReaderSeekerCloser) Read(p []byte) (int, error) {
+ switch t := r.r.(type) {
+ case io.Reader:
+ return t.Read(p)
+ }
+ return 0, nil
+}
+
+// Seek sets the offset for the next Read to offset, interpreted according to
+// whence: 0 means relative to the origin of the file, 1 means relative to the
+// current offset, and 2 means relative to the end. Seek returns the new offset
+// and an error, if any.
+//
+// If the ReaderSeekerCloser is not an io.Seeker nothing will be done.
+func (r *ReaderSeekerCloser) Seek(offset int64, whence int) (int64, error) {
+ switch t := r.r.(type) {
+ case io.Seeker:
+ return t.Seek(offset, whence)
+ }
+ return int64(0), nil
+}
+
+// IsSeeker returns whether the underlying reader is also an io.Seeker.
+func (r *ReaderSeekerCloser) IsSeeker() bool {
+ _, ok := r.r.(io.Seeker)
+ return ok
+}
+
+// Close closes the ReaderSeekerCloser.
+//
+// If the ReaderSeekerCloser is not an io.Closer nothing will be done.
+func (r *ReaderSeekerCloser) Close() error {
+ switch t := r.r.(type) {
+ case io.Closer:
+ return t.Close()
+ }
+ return nil
+}
+
+// ChecksumAlgorithm indicates the algorithm used to create the checksum for the object
+type ChecksumAlgorithm string
+
+// Enum values for ChecksumAlgorithm
+const (
+ ChecksumAlgorithmCrc32 ChecksumAlgorithm = "CRC32"
+ ChecksumAlgorithmCrc32c ChecksumAlgorithm = "CRC32C"
+ ChecksumAlgorithmSha1 ChecksumAlgorithm = "SHA1"
+ ChecksumAlgorithmSha256 ChecksumAlgorithm = "SHA256"
+)
+
+// ObjectCannedACL defines the canned ACL to apply to the object, see [Canned ACL] in the
+// Amazon S3 User Guide.
+type ObjectCannedACL string
+
+// Enum values for ObjectCannedACL
+const (
+ ObjectCannedACLPrivate ObjectCannedACL = "private"
+ ObjectCannedACLPublicRead ObjectCannedACL = "public-read"
+ ObjectCannedACLPublicReadWrite ObjectCannedACL = "public-read-write"
+ ObjectCannedACLAuthenticatedRead ObjectCannedACL = "authenticated-read"
+ ObjectCannedACLAwsExecRead ObjectCannedACL = "aws-exec-read"
+ ObjectCannedACLBucketOwnerRead ObjectCannedACL = "bucket-owner-read"
+ ObjectCannedACLBucketOwnerFullControl ObjectCannedACL = "bucket-owner-full-control"
+)
+
+// Values returns all known values for ObjectCannedACL. Note that this can be
+// expanded in the future, and so it is only as up to date as the client.
+//
+// The ordering of this slice is not guaranteed to be stable across updates.
+func (ObjectCannedACL) Values() []ObjectCannedACL {
+ return []ObjectCannedACL{
+ "private",
+ "public-read",
+ "public-read-write",
+ "authenticated-read",
+ "aws-exec-read",
+ "bucket-owner-read",
+ "bucket-owner-full-control",
+ }
+}
+
+// ObjectLockLegalHoldStatus specifies whether a legal hold will be applied to this object. For more
+// information about S3 Object Lock, see [Object Lock] in the Amazon S3 User Guide.
+type ObjectLockLegalHoldStatus string
+
+// Enum values for ObjectLockLegalHoldStatus
+const (
+ ObjectLockLegalHoldStatusOn ObjectLockLegalHoldStatus = "ON"
+ ObjectLockLegalHoldStatusOff ObjectLockLegalHoldStatus = "OFF"
+)
+
+// ObjectLockMode is the Object Lock mode that you want to apply to this object.
+type ObjectLockMode string
+
+// Enum values for ObjectLockMode
+const (
+ ObjectLockModeGovernance ObjectLockMode = "GOVERNANCE"
+ ObjectLockModeCompliance ObjectLockMode = "COMPLIANCE"
+)
+
+// RequestPayer confirms that the requester knows that they will be charged for the request.
+// Bucket owners need not specify this parameter in their requests. If either the
+// source or destination S3 bucket has Requester Pays enabled, the requester will
+// pay for corresponding charges to copy the object. For information about
+// downloading objects from Requester Pays buckets, see [Downloading Objects in Requester Pays Buckets] in the Amazon S3 User
+// Guide.
+type RequestPayer string
+
+// Enum values for RequestPayer
+const (
+ RequestPayerRequester RequestPayer = "requester"
+)
+
+// ServerSideEncryption indicates the server-side encryption algorithm that was used when you store this object
+// in Amazon S3 (for example, AES256 , aws:kms , aws:kms:dsse )
+type ServerSideEncryption string
+
+// Enum values for ServerSideEncryption
+const (
+ ServerSideEncryptionAes256 ServerSideEncryption = "AES256"
+ ServerSideEncryptionAwsKms ServerSideEncryption = "aws:kms"
+ ServerSideEncryptionAwsKmsDsse ServerSideEncryption = "aws:kms:dsse"
+)
+
+// StorageClass specifies the storage class for newly created objects, which
+// defaults to STANDARD. For more information, see [Storage Classes] in the
+// Amazon S3 User Guide.
+type StorageClass string
+
+// Enum values for StorageClass
+const (
+ StorageClassStandard StorageClass = "STANDARD"
+ StorageClassReducedRedundancy StorageClass = "REDUCED_REDUNDANCY"
+ StorageClassStandardIa StorageClass = "STANDARD_IA"
+ StorageClassOnezoneIa StorageClass = "ONEZONE_IA"
+ StorageClassIntelligentTiering StorageClass = "INTELLIGENT_TIERING"
+ StorageClassGlacier StorageClass = "GLACIER"
+ StorageClassDeepArchive StorageClass = "DEEP_ARCHIVE"
+ StorageClassOutposts StorageClass = "OUTPOSTS"
+ StorageClassGlacierIr StorageClass = "GLACIER_IR"
+ StorageClassSnow StorageClass = "SNOW"
+ StorageClassExpressOnezone StorageClass = "EXPRESS_ONEZONE"
+)
+
+// CompletedPart includes details of the parts that were uploaded.
+type CompletedPart struct {
+
+ // The base64-encoded, 32-bit CRC32 checksum of the object. This will only be
+ // present if it was uploaded with the object. When you use an API operation on an
+ // object that was uploaded using multipart uploads, this value may not be a direct
+ // checksum value of the full object. Instead, it's a calculation based on the
+ // checksum values of each individual part. For more information about how
+ // checksums are calculated with multipart uploads, see [Checking object integrity] in the Amazon S3 User
+ // Guide.
+ //
+ // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
+ ChecksumCRC32 *string
+
+ // The base64-encoded, 32-bit CRC32C checksum of the object. This will only be
+ // present if it was uploaded with the object. When you use an API operation on an
+ // object that was uploaded using multipart uploads, this value may not be a direct
+ // checksum value of the full object. Instead, it's a calculation based on the
+ // checksum values of each individual part. For more information about how
+ // checksums are calculated with multipart uploads, see [Checking object integrity] in the Amazon S3 User
+ // Guide.
+ //
+ // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
+ ChecksumCRC32C *string
+
+ // The base64-encoded, 160-bit SHA-1 digest of the object. This will only be
+ // present if it was uploaded with the object. When you use the API operation on an
+ // object that was uploaded using multipart uploads, this value may not be a direct
+ // checksum value of the full object. Instead, it's a calculation based on the
+ // checksum values of each individual part. For more information about how
+ // checksums are calculated with multipart uploads, see [Checking object integrity] in the Amazon S3 User
+ // Guide.
+ //
+ // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
+ ChecksumSHA1 *string
+
+ // The base64-encoded, 256-bit SHA-256 digest of the object. This will only be
+ // present if it was uploaded with the object. When you use an API operation on an
+ // object that was uploaded using multipart uploads, this value may not be a direct
+ // checksum value of the full object. Instead, it's a calculation based on the
+ // checksum values of each individual part. For more information about how
+ // checksums are calculated with multipart uploads, see [Checking object integrity] in the Amazon S3 User
+ // Guide.
+ //
+ // [Checking object integrity]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html#large-object-checksums
+ ChecksumSHA256 *string
+
+ // Entity tag returned when the part was uploaded.
+ ETag *string
+
+ // Part number that identifies the part. This is a positive integer between 1 and
+ // 10,000.
+ //
+ // - General purpose buckets - In CompleteMultipartUpload , when an additional
+ // checksum (including x-amz-checksum-crc32 , x-amz-checksum-crc32c ,
+ // x-amz-checksum-sha1 , or x-amz-checksum-sha256 ) is applied to each part, the
+ // PartNumber must start at 1 and the part numbers must be consecutive.
+ // Otherwise, Amazon S3 generates an HTTP 400 Bad Request status code and an
+ // InvalidPartOrder error code.
+ //
+ // - Directory buckets - In CompleteMultipartUpload , the PartNumber must start
+ // at 1 and the part numbers must be consecutive.
+ PartNumber *int32
+}
+
+// MapCompletedPart maps CompletedPart to s3 types
+func (cp CompletedPart) MapCompletedPart() types.CompletedPart {
+ return types.CompletedPart{
+ ChecksumCRC32: cp.ChecksumCRC32,
+ ChecksumCRC32C: cp.ChecksumCRC32C,
+ ChecksumSHA1: cp.ChecksumSHA1,
+ ChecksumSHA256: cp.ChecksumSHA256,
+ ETag: cp.ETag,
+ PartNumber: cp.PartNumber,
+ }
+}
+
+// MapFrom sets CompletedPart fields from the s3 UploadPartOutput
+func (cp *CompletedPart) MapFrom(resp *s3.UploadPartOutput, partNum *int32) {
+ cp.ChecksumCRC32 = resp.ChecksumCRC32
+ cp.ChecksumCRC32C = resp.ChecksumCRC32C
+ cp.ChecksumSHA1 = resp.ChecksumSHA1
+ cp.ChecksumSHA256 = resp.ChecksumSHA256
+ cp.ETag = resp.ETag
+ cp.PartNumber = partNum
+}
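+
+// Illustrative sketch: populating a CompletedPart from an UploadPart response,
+// where resp is a hypothetical *s3.UploadPartOutput for part number 1:
+//
+//	var part CompletedPart
+//	part.MapFrom(resp, aws.Int32(1))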
+
+// RequestCharged indicates that the requester was successfully charged for the request.
+type RequestCharged string
+
+// Enum values for RequestCharged
+const (
+ RequestChargedRequester RequestCharged = "requester"
+)
+
+// Metadata provides storing and reading metadata values. Keys may be any
+// comparable value type. Get and Set will panic if a key is not a comparable
+// value type.
+//
+// Metadata uses lazy initialization, so Set must be called on an addressable
+// value or a pointer. Not doing so may cause the key/value pair to not be
+// set.
+type Metadata struct {
+ values map[interface{}]interface{}
+}