S3 binary cache store
Ericson2314 committed Jul 21, 2024
1 parent b0c6f18 commit 40ebe0f
Showing 4 changed files with 134 additions and 127 deletions.
7 changes: 0 additions & 7 deletions configure.ac
@@ -340,13 +340,6 @@ AC_CHECK_HEADERS([aws/s3/S3Client.h],
AC_SUBST(ENABLE_S3, [$enable_s3])
AC_LANG_POP(C++)
-if test -n "$enable_s3"; then
-declare -a aws_version_tokens=($(printf '#include <aws/core/VersionConfig.h>\nAWS_SDK_VERSION_STRING' | $CPP $CPPFLAGS - | grep -v '^#.*' | sed 's/"//g' | tr '.' ' '))
-AC_DEFINE_UNQUOTED([AWS_VERSION_MAJOR], ${aws_version_tokens@<:@0@:>@}, [Major version of aws-sdk-cpp.])
-AC_DEFINE_UNQUOTED([AWS_VERSION_MINOR], ${aws_version_tokens@<:@1@:>@}, [Minor version of aws-sdk-cpp.])
-AC_DEFINE_UNQUOTED([AWS_VERSION_PATCH], ${aws_version_tokens@<:@2@:>@}, [Patch version of aws-sdk-cpp.])
-fi
# Whether to use the Boehm garbage collector.
AC_ARG_ENABLE(gc, AS_HELP_STRING([--enable-gc],[enable garbage collection in the Nix expression evaluator (requires Boehm GC) [default=yes]]),
3 changes: 2 additions & 1 deletion src/libstore/http-binary-cache-store.cc
@@ -88,7 +88,8 @@ class HttpBinaryCacheStore :
} catch (UploadToHTTP &) {
throw Error("'%s' does not appear to be a binary cache", config->cacheUri);
}
-diskCache->createCache(config->cacheUri, storeDir, resolvedSubstConfig.wantMassQuery, resolvedSubstConfig.priority);
+diskCache->createCache(
+    config->cacheUri, storeDir, resolvedSubstConfig.wantMassQuery, resolvedSubstConfig.priority);
}
}

247 changes: 132 additions & 115 deletions src/libstore/s3-binary-cache-store.cc
@@ -60,7 +60,7 @@ class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem
debug("AWS: %s", chomp(statement));
}

-#if !(AWS_VERSION_MAJOR <= 1 && AWS_VERSION_MINOR <= 7 && AWS_VERSION_PATCH <= 115)
+#if !(AWS_SDK_VERSION_MAJOR <= 1 && AWS_SDK_VERSION_MINOR <= 7 && AWS_SDK_VERSION_PATCH <= 115)
void Flush() override {}
#endif
};
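The `AWS_VERSION_*` to `AWS_SDK_VERSION_*` switch here, together with the configure.ac deletion above, relies on aws-sdk-cpp defining its own numeric version macros in `<aws/core/VersionConfig.h>`. The macros themselves are confirmed by the `#if` lines in this diff; the program below is only an illustrative sketch of that assumption:

    // Illustrative check of the SDK-provided version macros that replace
    // the configure-time AWS_VERSION_* detection deleted from configure.ac.
    #include <aws/core/VersionConfig.h>
    #include <cstdio>

    int main()
    {
        std::printf("aws-sdk-cpp %d.%d.%d\n",
                    AWS_SDK_VERSION_MAJOR, AWS_SDK_VERSION_MINOR, AWS_SDK_VERSION_PATCH);
    }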
@@ -103,7 +103,7 @@ S3Helper::S3Helper(
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(profile.c_str())),
*config,
// FIXME: https://github.com/aws/aws-sdk-cpp/issues/759
-#if AWS_VERSION_MAJOR == 1 && AWS_VERSION_MINOR < 3
+#if AWS_SDK_VERSION_MAJOR == 1 && AWS_SDK_VERSION_MINOR < 3
false,
#else
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
@@ -188,105 +188,120 @@ S3Helper::FileTransferResult S3Helper::getObject(
}


-S3BinaryCacheStore::Config::Descriptions::Descriptions()
-: Store::Config::Descriptions{Store::Config::descriptions}
-, BinaryCacheStore::Config::Descriptions{BinaryCacheStore::Config::descriptions}
-, S3BinaryCacheStoreConfigT<config::SettingInfo>{
-.profile{
-.name = "profile",
-.description = R"(
-The name of the AWS configuration profile to use. By default
-Nix will use the `default` profile.
-)",
-},
-.region{
-.name = "region",
-.description = R"(
-The region of the S3 bucket. If your bucket is not in
-`us-east-1`, you should always explicitly specify the region
-parameter.
-)",
-},
-.scheme{
-.name = "scheme",
-.description = R"(
-The scheme used for S3 requests, `https` (default) or `http`. This
-option allows you to disable HTTPS for binary caches which don't
-support it.
-> **Note**
->
-> HTTPS should be used if the cache might contain sensitive
-> information.
-)",
-},
-.endpoint{
-.name = "endpoint",
-.description = R"(
-The URL of the endpoint of an S3-compatible service such as MinIO.
-Do not specify this setting if you're using Amazon S3.
-> **Note**
->
-> This endpoint must support HTTPS and will use path-based
-> addressing instead of virtual host based addressing.
-)",
-},
-.narinfoCompression{
-.name = "narinfo-compression",
-.description = "Compression method for `.narinfo` files.",
-},
-.lsCompression{
-.name = "ls-compression",
-.description = "Compression method for `.ls` files.",
-},
-.logCompression{
-.name = "log-compression",
-.description = R"(
-Compression method for `log/*` files. It is recommended to
-use a compression method supported by most web browsers
-(e.g. `brotli`).
-)",
-},
-.multipartUpload{
-.name = "multipart-upload",
-.description = "Whether to use multi-part uploads.",
-},
-.bufferSize{
-.name = "buffer-size",
-.description = "Size (in bytes) of each part in multi-part uploads.",
-},
-}
-{}
+static const S3BinaryCacheStoreConfigT<config::SettingInfo> s3BinaryCacheStoreConfigDescriptions = {
+.profile{
+.name = "profile",
+.description = R"(
+The name of the AWS configuration profile to use. By default
+Nix will use the `default` profile.
+)",
+},
+.region{
+.name = "region",
+.description = R"(
+The region of the S3 bucket. If your bucket is not in
+`us-east-1`, you should always explicitly specify the region
+parameter.
+)",
+},
+.scheme{
+.name = "scheme",
+.description = R"(
+The scheme used for S3 requests, `https` (default) or `http`. This
+option allows you to disable HTTPS for binary caches which don't
+support it.
+> **Note**
+>
+> HTTPS should be used if the cache might contain sensitive
+> information.
+)",
+},
+.endpoint{
+.name = "endpoint",
+.description = R"(
+The URL of the endpoint of an S3-compatible service such as MinIO.
+Do not specify this setting if you're using Amazon S3.
+> **Note**
+>
+> This endpoint must support HTTPS and will use path-based
+> addressing instead of virtual host based addressing.
+)",
+},
+.narinfoCompression{
+.name = "narinfo-compression",
+.description = "Compression method for `.narinfo` files.",
+},
+.lsCompression{
+.name = "ls-compression",
+.description = "Compression method for `.ls` files.",
+},
+.logCompression{
+.name = "log-compression",
+.description = R"(
+Compression method for `log/*` files. It is recommended to
+use a compression method supported by most web browsers
+(e.g. `brotli`).
+)",
+},
+.multipartUpload{
+.name = "multipart-upload",
+.description = "Whether to use multi-part uploads.",
+},
+.bufferSize{
+.name = "buffer-size",
+.description = "Size (in bytes) of each part in multi-part uploads.",
+},
+};

+#define S3_BINARY_CACHE_STORE_CONFIG_FIELDS(X) \
+X(profile), \
+X(region), \
+X(scheme), \
+X(endpoint), \
+X(narinfoCompression), \
+X(lsCompression), \
+X(logCompression), \
+X(multipartUpload), \
+X(bufferSize),
+
+MAKE_PARSE(S3BinaryCacheStoreConfig, s3BinaryCacheStoreConfig, S3_BINARY_CACHE_STORE_CONFIG_FIELDS)

+static S3BinaryCacheStoreConfigT<config::JustValue> s3BinaryCacheStoreConfigDefaults()
+{
+return {
+.profile = {""},
+.region = {Aws::Region::US_EAST_1},
+.scheme = {""},
+.endpoint = {""},
+.narinfoCompression = {""},
+.lsCompression = {""},
+.logCompression = {""},
+.multipartUpload = {false},
+.bufferSize = {5 * 1024 * 1024},
+};
+}

-const S3BinaryCacheStore::Config::Descriptions S3BinaryCacheStore::Config::descriptions{};
+MAKE_APPLY_PARSE(S3BinaryCacheStoreConfig, s3BinaryCacheStoreConfig, S3_BINARY_CACHE_STORE_CONFIG_FIELDS)

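`MAKE_PARSE` and `MAKE_APPLY_PARSE` are helpers introduced elsewhere in this patch series; the field list above feeds them through the X-macro idiom, so the set of settings is written down exactly once. A self-contained sketch of that idiom, with hypothetical names rather than the actual Nix helpers:

    // Hypothetical illustration of the X-macro idiom behind
    // S3_BINARY_CACHE_STORE_CONFIG_FIELDS; not the real MAKE_PARSE.
    #include <iostream>
    #include <string>

    struct DemoConfig {
        std::string profile;
        std::string region;
    };

    #define DEMO_CONFIG_FIELDS(X) \
        X(profile) \
        X(region)

    // Expand the field list once per use site instead of hand-listing fields.
    void dumpConfig(const DemoConfig & cfg)
    {
    #define DEMO_PRINT(field) std::cout << #field << " = " << cfg.field << "\n";
        DEMO_CONFIG_FIELDS(DEMO_PRINT)
    #undef DEMO_PRINT
    }

    int main()
    {
        dumpConfig(DemoConfig{.profile = "default", .region = "us-east-1"});
    }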
S3BinaryCacheStore::Config::S3BinaryCacheStoreConfig(
std::string_view scheme,
std::string_view authority,
const StoreReference::Params & params)
-: Store::Config(params)
-, BinaryCacheStore::Config(params)
-, S3BinaryCacheStoreConfigT<config::JustValue>{
-CONFIG_ROW(profile, ""),
-CONFIG_ROW(region, Aws::Region::US_EAST_1),
-CONFIG_ROW(scheme, ""),
-CONFIG_ROW(endpoint, ""),
-CONFIG_ROW(narinfoCompression, ""),
-CONFIG_ROW(lsCompression, ""),
-CONFIG_ROW(logCompression, ""),
-CONFIG_ROW(multipartUpload, false),
-CONFIG_ROW(bufferSize, 5 * 1024 * 1024),
-}
+: Store::Config{params}
+, BinaryCacheStore::Config{*this, params}
+, S3BinaryCacheStoreConfigT<config::JustValue>{s3BinaryCacheStoreConfigApplyParse(params)}
, bucketName{authority}
{
if (bucketName.empty())
throw UsageError("`%s` store requires a bucket name in its Store URI", scheme);
}
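With this constructor the bucket name is taken from the URI authority, and the settings documented above arrive as query parameters, so a store URI such as `s3://example-nix-cache?region=eu-west-2&profile=backup` (bucket and profile names hypothetical) selects both the bucket and its per-store configuration.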


-S3BinaryCacheStore::S3BinaryCacheStore(const Config & config)
-: BinaryCacheStore(config)
+S3BinaryCacheStore::S3BinaryCacheStore(ref<const Config> config)
+: BinaryCacheStore(*config)
+, config{config}
{ }

std::string S3BinaryCacheStoreConfig::doc()
@@ -297,39 +312,37 @@ std::string S3BinaryCacheStoreConfig::doc()
}


-struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual S3BinaryCacheStore
+struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStore
{
Stats stats;

S3Helper s3Helper;

-S3BinaryCacheStoreImpl(const Config & config)
-: Store::Config{config}
-, BinaryCacheStore::Config{config}
-, S3BinaryCacheStore::Config{config}
-, Store{static_cast<const Store::Config &>(*this)}
-, BinaryCacheStore{static_cast<const BinaryCacheStore::Config &>(*this)}
-, S3BinaryCacheStore{static_cast<const S3BinaryCacheStore::Config &>(*this)}
-, s3Helper(profile, region, scheme, endpoint)
+S3BinaryCacheStoreImpl(ref<const Config> config)
+: Store{*config}
+, BinaryCacheStore{*config}
+, S3BinaryCacheStore{config}
+, s3Helper(config->profile, config->region, config->scheme, config->endpoint)
{
diskCache = getNarInfoDiskCache();
}

std::string getUri() override
{
return "s3://" + bucketName;
return "s3://" + config->bucketName;
}

void init() override
{
if (auto cacheInfo = diskCache->upToDateCacheExists(getUri())) {
-if (defaultWantMassQuery)
-wantMassQuery.value = cacheInfo->wantMassQuery;
-if (defaultPriority)
-priority.value = cacheInfo->priority;
+resolvedSubstConfig.wantMassQuery.value =
+    config->storeConfig.wantMassQuery.optValue.value_or(cacheInfo->wantMassQuery);
+resolvedSubstConfig.priority.value =
+    config->storeConfig.priority.optValue.value_or(cacheInfo->priority);
} else {
BinaryCacheStore::init();
-diskCache->createCache(getUri(), storeDir, wantMassQuery, priority);
+diskCache->createCache(
+    getUri(), config->storeDir, resolvedSubstConfig.wantMassQuery, resolvedSubstConfig.priority);
}
}
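`init()` now resolves each substituter setting by letting an explicitly configured value win and falling back to whatever the on-disk NAR-info cache recorded; the http-binary-cache-store change above applies the same rule. A hedged sketch of that resolution pattern, with illustrative types rather than Nix's real setting classes:

    // Illustrative only: Nix's real setting types differ.
    #include <optional>

    struct OptSetting
    {
        std::optional<int> optValue; // unset unless the user configured it
    };

    // An explicitly configured value wins; otherwise use the cached one.
    int resolve(const OptSetting & configured, int cached)
    {
        return configured.optValue.value_or(cached);
    }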

@@ -358,7 +371,7 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

auto res = s3Helper.client->HeadObject(
Aws::S3::Model::HeadObjectRequest()
-.WithBucket(bucketName)
+.WithBucket(config->bucketName)
.WithKey(path));

if (!res.IsSuccess()) {
@@ -393,11 +406,11 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

std::call_once(transferManagerCreated, [&]()
{
-if (multipartUpload) {
+if (config->multipartUpload) {
TransferManagerConfiguration transferConfig(executor.get());

transferConfig.s3Client = s3Helper.client;
-transferConfig.bufferSize = bufferSize;
+transferConfig.bufferSize = config->bufferSize;

transferConfig.uploadProgressCallback =
[](const TransferManager *transferManager,
@@ -418,6 +431,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

auto now1 = std::chrono::steady_clock::now();

auto & bucketName = config->bucketName;

if (transferManager) {

if (contentEncoding != "")
@@ -481,12 +496,12 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
return std::make_shared<std::stringstream>(std::move(compressed));
};

-if (narinfoCompression != "" && hasSuffix(path, ".narinfo"))
-uploadFile(path, compress(narinfoCompression), mimeType, narinfoCompression);
-else if (lsCompression != "" && hasSuffix(path, ".ls"))
-uploadFile(path, compress(lsCompression), mimeType, lsCompression);
-else if (logCompression != "" && hasPrefix(path, "log/"))
-uploadFile(path, compress(logCompression), mimeType, logCompression);
+if (config->narinfoCompression != "" && hasSuffix(path, ".narinfo"))
+uploadFile(path, compress(config->narinfoCompression), mimeType, config->narinfoCompression);
+else if (config->lsCompression != "" && hasSuffix(path, ".ls"))
+uploadFile(path, compress(config->lsCompression), mimeType, config->lsCompression);
+else if (config->logCompression != "" && hasPrefix(path, "log/"))
+uploadFile(path, compress(config->logCompression), mimeType, config->logCompression);
else
uploadFile(path, istream, mimeType, "");
}
@@ -496,14 +511,14 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
stats.get++;

// FIXME: stream output to sink.
-auto res = s3Helper.getObject(bucketName, path);
+auto res = s3Helper.getObject(config->bucketName, path);

stats.getBytes += res.data ? res.data->size() : 0;
stats.getTimeMs += res.durationMs;

if (res.data) {
printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms",
-bucketName, path, res.data->size(), res.durationMs);
+config->bucketName, path, res.data->size(), res.durationMs);

sink(*res.data);
} else
@@ -515,6 +530,8 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual
StorePathSet paths;
std::string marker;

auto & bucketName = config->bucketName;

do {
debug("listing bucket 's3://%s' from key '%s'...", bucketName, marker);

@@ -555,7 +572,7 @@ struct S3BinaryCacheStoreImpl : virtual S3BinaryCacheStoreConfig, public virtual

ref<Store> S3BinaryCacheStoreImpl::Config::openStore() const
{
-return make_ref<S3BinaryCacheStoreImpl>(*this);
+return make_ref<S3BinaryCacheStoreImpl>(ref{shared_from_this()});
}

static RegisterStoreImplementation<S3BinaryCacheStoreImpl::Config> regS3BinaryCacheStore;
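`openStore()` now hands the store a counted reference to its own config via `shared_from_this()` instead of copying the config into the store object. A minimal sketch of that ownership pattern using plain `std::shared_ptr` (Nix's `ref<T>` is a non-null `shared_ptr` wrapper; the names below are illustrative):

    // Illustrative stand-in for the ref<const Config> ownership pattern.
    #include <memory>

    struct Config : std::enable_shared_from_this<Config> { /* settings */ };

    struct Store
    {
        std::shared_ptr<const Config> config; // store keeps the config alive
        explicit Store(std::shared_ptr<const Config> c) : config(std::move(c)) {}
    };

    std::shared_ptr<Store> openStore(const Config & cfg)
    {
        // Requires that cfg is already owned by a shared_ptr, otherwise
        // shared_from_this() throws std::bad_weak_ptr.
        return std::make_shared<Store>(cfg.shared_from_this());
    }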
