diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 3061c4f44..a258b6867 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -16,14 +16,14 @@ use std::collections::HashMap; use serde::Deserialize; -use crate::schedulers::SchedulerConfig; +use crate::schedulers::SchedulerSpec; use crate::serde_utils::{ convert_data_size_with_shellexpand, convert_duration_with_shellexpand, convert_numeric_with_shellexpand, convert_optional_numeric_with_shellexpand, convert_optional_string_with_shellexpand, convert_string_with_shellexpand, convert_vec_string_with_shellexpand, }; -use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreConfig, StoreRefName}; +use crate::stores::{ClientTlsConfig, ConfigDigestHashFunction, StoreRefName, StoreSpec}; /// Name of the scheduler. This type will be used when referencing a /// scheduler in the `CasConfig::schedulers`'s map key. @@ -215,12 +215,12 @@ pub struct BepConfig { #[serde(deny_unknown_fields)] pub struct ServicesConfig { /// The Content Addressable Storage (CAS) backend config. - /// The key is the instance_name used in the protocol and the + /// The key is the `instance_name` used in the protocol and the /// value is the underlying CAS store config. pub cas: Option>, /// The Action Cache (AC) backend config. - /// The key is the instance_name used in the protocol and the + /// The key is the `instance_name` used in the protocol and the /// value is the underlying AC store config. pub ac: Option>, @@ -413,7 +413,7 @@ pub struct ServerConfig { pub enum WorkerProperty { /// List of static values. /// Note: Generally there should only ever be 1 value, but if the platform - /// property key is PropertyType::Priority it may have more than one value. + /// property key is `PropertyType::Priority` it may have more than one value. #[serde(deserialize_with = "convert_vec_string_with_shellexpand")] values(Vec), @@ -511,11 +511,11 @@ pub struct UploadActionResultConfig { /// Default: {No uploading is done} pub ac_store: Option, - /// In which situations should the results be published to the ac_store, if - /// set to SuccessOnly then only results with an exit code of 0 will be + /// In which situations should the results be published to the `ac_store`, + /// if set to `SuccessOnly` then only results with an exit code of 0 will be /// uploaded, if set to Everything all completed results will be uploaded. /// - /// Default: UploadCacheResultsStrategy::SuccessOnly + /// Default: `UploadCacheResultsStrategy::SuccessOnly` #[serde(default)] pub upload_ac_results_strategy: UploadCacheResultsStrategy, @@ -529,18 +529,18 @@ pub struct UploadActionResultConfig { /// to the CAS key-value lookup format and are always a `HistoricalExecuteResponse` /// serialized message. /// - /// Default: UploadCacheResultsStrategy::FailuresOnly + /// Default: `UploadCacheResultsStrategy::FailuresOnly` #[serde(default)] pub upload_historical_results_strategy: Option, /// Template to use for the `ExecuteResponse.message` property. This message /// is attached to the response before it is sent to the client. The following /// special variables are supported: - /// - {digest_function} - Digest function used to calculate the action digest. - /// - {action_digest_hash} - Action digest hash. - /// - {action_digest_size} - Action digest size. - /// - {historical_results_hash} - HistoricalExecuteResponse digest hash. - /// - {historical_results_size} - HistoricalExecuteResponse digest size. 
+ /// - `digest_function`: Digest function used to calculate the action digest. + /// - `action_digest_hash`: Action digest hash. + /// - `action_digest_size`: Action digest size. + /// - `historical_results_hash`: `HistoricalExecuteResponse` digest hash. + /// - `historical_results_size`: `HistoricalExecuteResponse` digest size. /// /// A common use case of this is to provide a link to the web page that /// contains more useful information for the user. @@ -571,7 +571,7 @@ pub struct LocalWorkerConfig { #[serde(default, deserialize_with = "convert_string_with_shellexpand")] pub name: String, - /// Endpoint which the worker will connect to the scheduler's WorkerApiService. + /// Endpoint which the worker will connect to the scheduler's `WorkerApiService`. pub worker_api_endpoint: EndpointConfig, /// The maximum time an action is allowed to run. If a task requests for a timeout @@ -582,10 +582,10 @@ pub struct LocalWorkerConfig { pub max_action_timeout: usize, /// If timeout is handled in `entrypoint` or another wrapper script. - /// If set to true NativeLink will not honor the timeout the action requested - /// and instead will always force kill the action after max_action_timeout + /// If set to true `NativeLink` will not honor the timeout the action requested + /// and instead will always force kill the action after `max_action_timeout` /// has been reached. If this is set to false, the smaller value of the action's - /// timeout and max_action_timeout will be used to which NativeLink will kill + /// timeout and `max_action_timeout` will be used to which `NativeLink` will kill /// the action. /// /// The real timeout can be received via an environment variable set in: @@ -597,7 +597,7 @@ pub struct LocalWorkerConfig { /// the action. In this case, action will likely be wrapped in another program, /// like `timeout` and propagate timeouts via `EnvironmentSource::SideChannelFile`. /// - /// Default: false (NativeLink fully handles timeouts) + /// Default: false (`NativeLink` fully handles timeouts) #[serde(default)] pub timeout_handled_externally: bool, @@ -633,10 +633,10 @@ pub struct LocalWorkerConfig { /// The directory work jobs will be executed from. This directory will be fully /// managed by the worker service and will be purged on startup. - /// This directory and the directory referenced in local_filesystem_store_ref's - /// stores::FilesystemStore::content_path must be on the same filesystem. + /// This directory and the directory referenced in `local_filesystem_store_ref`'s + /// `stores::FilesystemStore::content_path` must be on the same filesystem. /// Hardlinks will be used when placing files that are accessible to the jobs - /// that are sourced from local_filesystem_store_ref's content_path. + /// that are sourced from `local_filesystem_store_ref`'s `content_path`. #[serde(deserialize_with = "convert_string_with_shellexpand")] pub work_directory: String, @@ -679,7 +679,7 @@ pub struct GlobalConfig { /// If a file descriptor is idle for this many milliseconds, it will be closed. /// In the event a client or store takes a long time to send or receive data /// the file descriptor will be closed, and since `max_open_files` blocks new - /// open_file requests until a slot opens up, it will allow new requests to be + /// `open_file` requests until a slot opens up, it will allow new requests to be /// processed. If a read or write is attempted on a closed file descriptor, the /// file will be reopened and the operation will continue. 
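For orientation while reading the `CasConfig` and spec-enum hunks below: a top-level configuration ties the renamed types together through the `stores` and `schedulers` maps and the `servers` list. The following minimal sketch is not part of this diff; the `CAS_MAIN` and `MAIN_SCHEDULER` reference names and all concrete values are invented for illustration, while the JSON tag names (`memory`, `simple`, `eviction_policy`, `supported_platform_properties`) come from the variants and fields shown in the surrounding hunks.

```json
{
  "stores": {
    "CAS_MAIN": {
      "memory": {
        "eviction_policy": { "max_bytes": 10000000 }
      }
    }
  },
  "schedulers": {
    "MAIN_SCHEDULER": {
      "simple": {
        "supported_platform_properties": { "cpu_count": "minimum" }
      }
    }
  },
  "servers": []
}
```

Each `stores` entry deserializes into one `StoreSpec` variant and each `schedulers` entry into one `SchedulerSpec` variant; the map keys are the reference names (for example a `StoreRefName` pointed at by an `ac_store` field). For the variants shown here, the rename changes only the Rust type names, not the JSON shape.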
/// @@ -707,7 +707,7 @@ pub struct GlobalConfig { /// Default hash function to use while uploading blobs to the CAS when not set /// by client. /// - /// Default: ConfigDigestHashFunction::sha256 + /// Default: `ConfigDigestHashFunction::sha256` pub default_digest_hash_function: Option<ConfigDigestHashFunction>, /// Default digest size to use for health check when running /// @@ -725,7 +725,7 @@ pub struct CasConfig { /// List of stores available to use in this config. /// The keys can be used in other configs when needing to reference a store. - pub stores: HashMap<StoreRefName, StoreConfig>, + pub stores: HashMap<StoreRefName, StoreSpec>, /// Worker configurations used to execute jobs. pub workers: Option<Vec<WorkerConfig>>, @@ -733,7 +733,7 @@ pub struct CasConfig { /// List of schedulers available to use in this config. /// The keys can be used in other configs when needing to reference a /// scheduler. - pub schedulers: Option<HashMap<SchedulerRefName, SchedulerConfig>>, + pub schedulers: Option<HashMap<SchedulerRefName, SchedulerSpec>>, /// Servers to setup for this process. pub servers: Vec<ServerConfig>, diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs index 588b6021d..35d84a76e 100644 --- a/nativelink-config/src/schedulers.rs +++ b/nativelink-config/src/schedulers.rs @@ -21,11 +21,11 @@ use crate::stores::{GrpcEndpoint, Retry, StoreRefName}; #[allow(non_camel_case_types)] #[derive(Deserialize, Debug)] -pub enum SchedulerConfig { - simple(SimpleScheduler), - grpc(GrpcScheduler), - cache_lookup(CacheLookupScheduler), - property_modifier(PropertyModifierScheduler), +pub enum SchedulerSpec { + simple(SimpleSpec), + grpc(GrpcSpec), + cache_lookup(CacheLookupSpec), + property_modifier(PropertyModifierSpec), } /// When the scheduler matches tasks to workers that are capable of running /// the task. @@ -67,7 +67,7 @@ pub enum WorkerAllocationStrategy { #[derive(Deserialize, Debug, Default)] #[serde(deny_unknown_fields)] -pub struct SimpleScheduler { +pub struct SimpleSpec { /// A list of supported platform properties mapped to how these properties /// are used when the scheduler looks for worker nodes capable of running /// the task. /// @@ -81,21 +81,21 @@ pub struct SimpleScheduler { /// { "cpu_count": "8", "cpu_arch": "arm" } /// ``` /// Will result in the scheduler filtering out any workers that do not have - /// "cpu_arch" = "arm" and filter out any workers that have less than 8 cpu + /// `"cpu_arch" = "arm"` and filter out any workers that have less than 8 cpu /// cores available. /// /// The property names here must match the property keys provided by the /// worker nodes when they join the pool. In other words, the workers will /// publish their capabilities to the scheduler when they join the worker /// pool. If the worker fails to notify the scheduler of its (for example) - /// "cpu_arch", the scheduler will never send any jobs to it, if all jobs - /// have the "cpu_arch" label. There is no special treatment of any platform + /// `"cpu_arch"`, the scheduler will never send any jobs to it, if all jobs + /// have the `"cpu_arch"` label. There is no special treatment of any platform /// property labels other and entirely driven by worker configs and this /// config. pub supported_platform_properties: Option<HashMap<String, PropertyType>>, /// The amount of time to retain completed actions in memory for in case - /// a WaitExecution is called after the action has completed. + /// a `WaitExecution` is called after the action has completed.
/// Default: 60 (seconds) #[serde(default, deserialize_with = "convert_duration_with_shellexpand")] pub retain_completed_for_s: u32, @@ -144,7 +144,7 @@ pub enum ExperimentalSimpleSchedulerBackend { #[serde(deny_unknown_fields)] pub struct ExperimentalRedisSchedulerBackend { /// A reference to the redis store to use for the scheduler. - /// Note: This MUST resolve to a RedisStore. + /// Note: This MUST resolve to a `RedisSpec`. pub redis_store: StoreRefName, } @@ -154,7 +154,7 @@ pub struct ExperimentalRedisSchedulerBackend { /// build at the main scheduler directly though. #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] -pub struct GrpcScheduler { +pub struct GrpcSpec { /// The upstream scheduler to forward requests to. pub endpoint: GrpcEndpoint, @@ -176,14 +176,14 @@ pub struct GrpcScheduler { #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] -pub struct CacheLookupScheduler { +pub struct CacheLookupSpec { /// The reference to the action cache store used to return cached /// actions from rather than running them again. - /// To prevent unintended issues, this store should probably be a CompletenessCheckingStore. + /// To prevent unintended issues, this store should probably be a `CompletenessCheckingSpec`. pub ac_store: StoreRefName, /// The nested scheduler to use if cache lookup fails. - pub scheduler: Box, + pub scheduler: Box, } #[derive(Deserialize, Debug, Clone)] @@ -206,7 +206,7 @@ pub enum PropertyModification { #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] -pub struct PropertyModifierScheduler { +pub struct PropertyModifierSpec { /// A list of modifications to perform to incoming actions for the nested /// scheduler. These are performed in order and blindly, so removing a /// property that doesn't exist is fine and overwriting an existing property @@ -215,5 +215,5 @@ pub struct PropertyModifierScheduler { pub modifications: Vec, /// The nested scheduler to use after modifying the properties. - pub scheduler: Box, + pub scheduler: Box, } diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index 9c4e21a87..7e6e31821 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -38,7 +38,7 @@ pub enum ConfigDigestHashFunction { #[allow(non_camel_case_types)] #[derive(Serialize, Deserialize, Debug, Clone)] -pub enum StoreConfig { +pub enum StoreSpec { /// Memory store will store all data in a hashmap in memory. /// /// **Example JSON Config:** @@ -52,7 +52,7 @@ pub enum StoreConfig { /// } /// ``` /// - memory(MemoryStore), + memory(MemorySpec), /// S3 store will use Amazon's S3 service as a backend to store /// the files. This configuration can be used to share files @@ -76,7 +76,7 @@ pub enum StoreConfig { /// } /// ``` /// - experimental_s3_store(S3Store), + experimental_s3_store(S3Spec), /// Verify store is used to apply verifications to an underlying /// store implementation. It is strongly encouraged to validate @@ -100,7 +100,7 @@ pub enum StoreConfig { /// } /// ``` /// - verify(Box), + verify(Box), /// Completeness checking store verifies if the /// output files & folders exist in the CAS before forwarding @@ -128,7 +128,7 @@ pub enum StoreConfig { /// } /// ``` /// - completeness_checking(Box), + completeness_checking(Box), /// A compression store that will compress the data inbound and /// outbound. 
There will be a non-trivial cost to compress and @@ -156,7 +156,7 @@ pub enum StoreConfig { /// } /// ``` /// - compression(Box), + compression(Box), /// A dedup store will take the inputs and run a rolling hash /// algorithm on them to slice the input into smaller parts then @@ -175,9 +175,9 @@ pub enum StoreConfig { /// * Content is not compressed or encrypted /// * Uploading or downloading from `content_store` is the bottleneck. /// - /// Note: This store pairs well when used with CompressionStore as - /// the `content_store`, but never put DedupStore as the backend of - /// CompressionStore as it will negate all the gains. + /// Note: This store pairs well when used with `CompressionSpec` as + /// the `content_store`, but never put `DedupSpec` as the backend of + /// `CompressionSpec` as it will negate all the gains. /// /// Note: When running `.has()` on this store, it will only check /// to see if the entry exists in the `index_store` and not check @@ -221,10 +221,10 @@ pub enum StoreConfig { /// } /// ``` /// - dedup(Box), + dedup(Box), /// Existence store will wrap around another store and cache calls - /// to has so that subsequent has_with_results calls will be + /// to has so that subsequent `has_with_results` calls will be /// faster. This is useful for cases when you have a store that /// is slow to respond to has calls. /// Note: This store should only be used on CAS stores. @@ -248,9 +248,9 @@ pub enum StoreConfig { /// } /// ``` /// - existence_cache(Box), + existence_cache(Box), - /// FastSlow store will first try to fetch the data from the `fast` + /// `FastSlow` store will first try to fetch the data from the `fast` /// store and then if it does not exist try the `slow` store. /// When the object does exist in the `slow` store, it will copy /// the data to the `fast` store while returning the data. @@ -291,7 +291,7 @@ pub enum StoreConfig { /// } /// ``` /// - fast_slow(Box), + fast_slow(Box), /// Shards the data to multiple stores. This is useful for cases /// when you want to distribute the load across multiple stores. @@ -313,7 +313,7 @@ pub enum StoreConfig { /// } /// ``` /// - shard(ShardStore), + shard(ShardSpec), /// Stores the data on the filesystem. This store is designed for /// local persistent storage. Restarts of this program should restore @@ -334,12 +334,12 @@ pub enum StoreConfig { /// } /// ``` /// - filesystem(FilesystemStore), + filesystem(FilesystemSpec), /// Store used to reference a store in the root store manager. /// This is useful for cases when you want to share a store in different /// nested stores. Example, you may want to share the same memory store - /// used for the action cache, but use a FastSlowStore and have the fast + /// used for the action cache, but use a `FastSlowSpec` and have the fast /// store also share the memory store for efficiency. /// /// **Example JSON Config:** @@ -349,14 +349,14 @@ pub enum StoreConfig { /// } /// ``` /// - ref_store(RefStore), + ref_store(RefSpec), /// Uses the size field of the digest to separate which store to send the /// data. This is useful for cases when you'd like to put small objects /// in one store and large objects in another store. This should only be /// used if the size field is the real size of the content, in other /// words, don't use on AC (Action Cache) stores. Any store where you can - /// safely use VerifyStore.verify_size = true, this store should be safe + /// safely use `VerifySpec.verify_size = true`, this store should be safe /// to use (ie: CAS stores). 
/// /// **Example JSON Config:** @@ -377,7 +377,7 @@ pub enum StoreConfig { /// } /// ``` /// - size_partitioning(Box), + size_partitioning(Box), /// This store will pass-through calls to another GRPC store. This store /// is not designed to be used as a sub-store of another store, but it @@ -400,11 +400,11 @@ pub enum StoreConfig { /// } /// ``` /// - grpc(GrpcStore), + grpc(GrpcSpec), /// Stores data in any stores compatible with Redis APIs. /// - /// Pairs well with SizePartitioning and/or FastSlow stores. + /// Pairs well with `SizePartitioning` and/or `FastSlow` stores. /// Ideal for accepting small object sizes as most redis store /// services have a max file upload of between 256Mb-512Mb. /// @@ -417,10 +417,10 @@ pub enum StoreConfig { /// } /// ``` /// - redis_store(RedisStore), + redis_store(RedisSpec), /// Noop store is a store that sends streams into the void and all data - /// retrieval will return 404 (NotFound). This can be useful for cases + /// retrieval will return 404 (`NotFound`). This can be useful for cases /// where you may need to partition your data and part of your data needs /// to be discarded. /// @@ -429,7 +429,7 @@ pub enum StoreConfig { /// "noop": {} /// ``` /// - noop, + noop(NoopSpec), } /// Configuration for an individual shard of the store. @@ -437,7 +437,7 @@ pub enum StoreConfig { #[serde(deny_unknown_fields)] pub struct ShardConfig { /// Store to shard the data to. - pub store: StoreConfig, + pub store: StoreSpec, /// The weight of the store. This is used to determine how much data /// should be sent to the store. The actual percentage is the sum of @@ -449,28 +449,28 @@ pub struct ShardConfig { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct ShardStore { +pub struct ShardSpec { /// Stores to shard the data to. pub stores: Vec, } #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct SizePartitioningStore { +pub struct SizePartitioningSpec { /// Size to partition the data on. #[serde(deserialize_with = "convert_data_size_with_shellexpand")] pub size: u64, /// Store to send data when object is < (less than) size. - pub lower_store: StoreConfig, + pub lower_store: StoreSpec, /// Store to send data when object is >= (less than eq) size. - pub upper_store: StoreConfig, + pub upper_store: StoreSpec, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] -pub struct RefStore { +pub struct RefSpec { /// Name of the store under the root "stores" config object. #[serde(deserialize_with = "convert_string_with_shellexpand")] pub name: String, @@ -478,7 +478,7 @@ pub struct RefStore { #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] -pub struct FilesystemStore { +pub struct FilesystemSpec { /// Path on the system where to store the actual content. This is where /// the bulk of the data will be placed. /// On service bootup this folder will be scanned and all files will be @@ -516,19 +516,19 @@ pub struct FilesystemStore { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct FastSlowStore { +pub struct FastSlowSpec { /// Fast store that will be attempted to be contacted before reaching /// out to the `slow` store. - pub fast: StoreConfig, + pub fast: StoreSpec, /// If the object does not exist in the `fast` store it will try to /// get it from this store. 
- pub slow: StoreConfig, + pub slow: StoreSpec, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] -pub struct MemoryStore { +pub struct MemorySpec { /// Policy used to evict items out of the store. Failure to set this /// value will cause items to never be removed from the store causing /// infinite memory usage. @@ -537,14 +537,14 @@ pub struct MemoryStore { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct DedupStore { +pub struct DedupSpec { /// Store used to store the index of each dedup slice. This store /// should generally be fast and small. - pub index_store: StoreConfig, + pub index_store: StoreSpec, /// The store where the individual chunks will be uploaded. This /// store should generally be the slower & larger store. - pub content_store: StoreConfig, + pub content_store: StoreSpec, /// Minimum size that a chunk will be when slicing up the content. /// Note: This setting can be increased to improve performance @@ -561,7 +561,7 @@ pub struct DedupStore { /// /// This value will also be about the threshold used to determine /// if we should even attempt to dedup the entry or just forward - /// it directly to the content_store without an index. The actual + /// it directly to the `content_store` without an index. The actual /// value will be about `normal_size * 1.3` due to implementation /// details. /// @@ -592,13 +592,13 @@ pub struct DedupStore { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct ExistenceCacheStore { +pub struct ExistenceCacheSpec { /// The underlying store wrap around. All content will first flow /// through self before forwarding to backend. In the event there /// is an error detected in self, the connection to the backend /// will be terminated, and early termination should always cause /// updates to fail on the backend. - pub backend: StoreConfig, + pub backend: StoreSpec, /// Policy used to evict items out of the store. Failure to set this /// value will cause items to never be removed from the store causing @@ -608,13 +608,13 @@ pub struct ExistenceCacheStore { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct VerifyStore { +pub struct VerifySpec { /// The underlying store wrap around. All content will first flow /// through self before forwarding to backend. In the event there /// is an error detected in self, the connection to the backend /// will be terminated, and early termination should always cause /// updates to fail on the backend. - pub backend: StoreConfig, + pub backend: StoreSpec, /// If set the store will verify the size of the data before accepting /// an upload of data. @@ -634,13 +634,13 @@ pub struct VerifyStore { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct CompletenessCheckingStore { +pub struct CompletenessCheckingSpec { /// The underlying store that will have it's results validated before sending to client. - pub backend: StoreConfig, + pub backend: StoreSpec, /// When a request is made, the results are decoded and all output digests/files are verified /// to exist in this CAS store before returning success. - pub cas_store: StoreConfig, + pub cas_store: StoreSpec, } #[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)] @@ -655,9 +655,9 @@ pub struct Lz4Config { pub block_size: u32, /// Maximum size allowed to attempt to deserialize data into. 
- /// This is needed because the block_size is embedded into the data + /// This is needed because the `block_size` is embedded into the data /// so if there was a bad actor, they could upload an extremely large - /// block_size'ed entry and we'd allocate a large amount of memory + /// `block_size`'ed entry and we'd allocate a large amount of memory /// when retrieving the data. To prevent this from happening, we /// allow you to specify the maximum that we'll attempt deserialize. /// @@ -681,13 +681,13 @@ pub enum CompressionAlgorithm { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct CompressionStore { +pub struct CompressionSpec { /// The underlying store wrap around. All content will first flow /// through self before forwarding to backend. In the event there /// is an error detected in self, the connection to the backend /// will be terminated, and early termination should always cause /// updates to fail on the backend. - pub backend: StoreConfig, + pub backend: StoreSpec, /// The compression algorithm to use. pub compression_algorithm: CompressionAlgorithm, @@ -696,7 +696,7 @@ pub struct CompressionStore { /// Eviction policy always works on LRU (Least Recently Used). Any time an entry /// is touched it updates the timestamp. Inserts and updates will execute the /// eviction policy removing any expired entries and/or the oldest entries -/// until the store size becomes smaller than max_bytes. +/// until the store size becomes smaller than `max_bytes`. #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] pub struct EvictionPolicy { @@ -705,8 +705,8 @@ pub struct EvictionPolicy { #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")] pub max_bytes: usize, - /// When eviction starts based on hitting max_bytes, continue until - /// max_bytes - evict_bytes is met to create a low watermark. This stops + /// When eviction starts based on hitting `max_bytes`, continue until + /// `max_bytes - evict_bytes` is met to create a low watermark. This stops /// operations from thrashing when the store is close to the limit. /// Default: 0 #[serde(default, deserialize_with = "convert_data_size_with_shellexpand")] @@ -726,7 +726,7 @@ pub struct EvictionPolicy { #[derive(Serialize, Deserialize, Debug, Default, Clone)] #[serde(deny_unknown_fields)] -pub struct S3Store { +pub struct S3Spec { /// S3 region. Usually us-east-1, us-west-2, af-south-1, exc... #[serde(default, deserialize_with = "convert_string_with_shellexpand")] pub region: String, @@ -746,7 +746,7 @@ pub struct S3Store { /// If the number of seconds since the `last_modified` time of the object /// is greater than this value, the object will not be considered /// "existing". This allows for external tools to delete objects that - /// have not been uploaded in a long time. If a client receives a NotFound + /// have not been uploaded in a long time. If a client receives a `NotFound` /// the client should re-upload the object. /// /// There should be sufficient buffer time between how long the expiration @@ -765,7 +765,7 @@ pub struct S3Store { /// Default: 5MB. pub max_retry_buffer_per_request: Option, - /// Maximum number of concurrent UploadPart requests per MultipartUpload. + /// Maximum number of concurrent `UploadPart` requests per `MultipartUpload`. /// /// Default: 10. 
pub multipart_max_concurrent_uploads: Option, @@ -825,8 +825,8 @@ pub struct GrpcEndpoint { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] -pub struct GrpcStore { - /// Instance name for GRPC calls. Proxy calls will have the instance_name changed to this. +pub struct GrpcSpec { + /// Instance name for GRPC calls. Proxy calls will have the `instance_name` changed to this. #[serde(default, deserialize_with = "convert_string_with_shellexpand")] pub instance_name: String, @@ -875,9 +875,9 @@ pub enum ErrorCode { } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct RedisStore { +pub struct RedisSpec { /// The hostname or IP address of the Redis server. - /// Ex: ["redis://username:password@redis-server-url:6380/99"] + /// Ex: `["redis://username:password@redis-server-url:6380/99"]` /// 99 Represents database ID, 6380 represents the port. #[serde(deserialize_with = "convert_vec_string_with_shellexpand")] pub addresses: Vec, @@ -971,8 +971,8 @@ pub struct RedisStore { /// This is used to limit the amount of memory used when uploading /// large objects to the redis server. A good rule of thumb is to /// think of the data as: - /// AVAIL_MEMORY / (read_chunk_size * max_chunk_uploads_per_update) = THORETICAL_MAX_CONCURRENT_UPLOADS - /// (note: it is a good idea to divide AVAIL_MAX_MEMORY by ~10 to account for other memory usage) + /// `AVAIL_MEMORY / (read_chunk_size * max_chunk_uploads_per_update) = THORETICAL_MAX_CONCURRENT_UPLOADS` + /// (note: it is a good idea to divide `AVAIL_MAX_MEMORY` by ~10 to account for other memory usage) /// /// Default: 10 #[serde(default, deserialize_with = "convert_numeric_with_shellexpand")] @@ -981,12 +981,14 @@ pub struct RedisStore { /// Retry configuration to use when a network request fails. /// See the `Retry` struct for more information. /// + /// ```txt /// Default: Retry { /// max_retries: 0, /* unlimited */ /// delay: 0.1, /* 100ms */ /// jitter: 0.5, /* 50% */ /// retry_on_errors: None, /* not used in redis store */ /// } + /// ``` #[serde(default)] pub retry: Retry, } @@ -1000,6 +1002,9 @@ pub enum RedisMode { Standard, } +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct NoopSpec {} + /// Retry configuration. This configuration is exponential and each iteration /// a jitter as a percentage is applied of the calculated delay. For example: /// ```haskell @@ -1047,14 +1052,14 @@ pub struct Retry { /// A list of error codes to retry on, if this is not set then the default /// error codes to retry on are used. These default codes are the most /// likely to be non-permanent. 
- /// - Unknown - /// - Cancelled - /// - DeadlineExceeded - /// - ResourceExhausted - /// - Aborted - /// - Internal - /// - Unavailable - /// - DataLoss + /// - `Unknown` + /// - `Cancelled` + /// - `DeadlineExceeded` + /// - `ResourceExhausted` + /// - `Aborted` + /// - `Internal` + /// - `Unavailable` + /// - `DataLoss` #[serde(default)] pub retry_on_errors: Option<Vec<ErrorCode>>, } diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs index daa110469..b66ca84fc 100644 --- a/nativelink-scheduler/src/default_scheduler_factory.rs +++ b/nativelink-scheduler/src/default_scheduler_factory.rs @@ -15,7 +15,9 @@ use std::sync::Arc; use std::time::SystemTime; -use nativelink_config::schedulers::{ExperimentalSimpleSchedulerBackend, SchedulerConfig}; +use nativelink_config::schedulers::{ + ExperimentalSimpleSchedulerBackend, SchedulerSpec, SimpleSpec, +}; use nativelink_config::stores::EvictionPolicy; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_store::redis_store::RedisStore; @@ -42,27 +44,27 @@ pub type SchedulerFactoryResults = ( ); pub fn scheduler_factory( - scheduler_type_cfg: &SchedulerConfig, + spec: &SchedulerSpec, store_manager: &StoreManager, ) -> Result<SchedulerFactoryResults, Error> { - inner_scheduler_factory(scheduler_type_cfg, store_manager) + inner_scheduler_factory(spec, store_manager) } fn inner_scheduler_factory( - scheduler_type_cfg: &SchedulerConfig, + spec: &SchedulerSpec, store_manager: &StoreManager, ) -> Result<SchedulerFactoryResults, Error> { - let scheduler: SchedulerFactoryResults = match scheduler_type_cfg { - SchedulerConfig::simple(config) => { - simple_scheduler_factory(config, store_manager, SystemTime::now)?
} - SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None), - SchedulerConfig::cache_lookup(config) => { + SchedulerSpec::grpc(spec) => (Some(Arc::new(GrpcScheduler::new(spec)?)), None), + SchedulerSpec::cache_lookup(spec) => { let ac_store = store_manager - .get_store(&config.ac_store) - .err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?; + .get_store(&spec.ac_store) + .err_tip(|| format!("'ac_store': '{}' does not exist", spec.ac_store))?; let (action_scheduler, worker_scheduler) = - inner_scheduler_factory(&config.scheduler, store_manager) + inner_scheduler_factory(&spec.scheduler, store_manager) .err_tip(|| "In nested CacheLookupScheduler construction")?; let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new( ac_store, @@ -70,12 +72,12 @@ fn inner_scheduler_factory( )?); (Some(cache_lookup_scheduler), worker_scheduler) } - SchedulerConfig::property_modifier(config) => { + SchedulerSpec::property_modifier(spec) => { let (action_scheduler, worker_scheduler) = - inner_scheduler_factory(&config.scheduler, store_manager) + inner_scheduler_factory(&spec.scheduler, store_manager) .err_tip(|| "In nested PropertyModifierScheduler construction")?; let property_modifier_scheduler = Arc::new(PropertyModifierScheduler::new( - config, + spec, action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?, )); (Some(property_modifier_scheduler), worker_scheduler) @@ -86,11 +88,11 @@ fn inner_scheduler_factory( } fn simple_scheduler_factory( - config: &nativelink_config::schedulers::SimpleScheduler, + spec: &SimpleSpec, store_manager: &StoreManager, now_fn: fn() -> SystemTime, ) -> Result { - match config + match spec .experimental_backend .as_ref() .unwrap_or(&ExperimentalSimpleSchedulerBackend::memory) @@ -98,12 +100,12 @@ fn simple_scheduler_factory( ExperimentalSimpleSchedulerBackend::memory => { let task_change_notify = Arc::new(Notify::new()); let awaited_action_db = memory_awaited_action_db_factory( - config.retain_completed_for_s, + spec.retain_completed_for_s, &task_change_notify.clone(), SystemTime::now, ); let (action_scheduler, worker_scheduler) = - SimpleScheduler::new(config, awaited_action_db, task_change_notify); + SimpleScheduler::new(spec, awaited_action_db, task_change_notify); Ok((Some(action_scheduler), Some(worker_scheduler))) } ExperimentalSimpleSchedulerBackend::redis(redis_config) => { @@ -133,7 +135,7 @@ fn simple_scheduler_factory( ) .err_tip(|| "In state_manager_factory::redis_state_manager")?; let (action_scheduler, worker_scheduler) = - SimpleScheduler::new(config, awaited_action_db, task_change_notify); + SimpleScheduler::new(spec, awaited_action_db, task_change_notify); Ok((Some(action_scheduler), Some(worker_scheduler))) } } diff --git a/nativelink-scheduler/src/grpc_scheduler.rs b/nativelink-scheduler/src/grpc_scheduler.rs index 38b0e71b1..6db7b9201 100644 --- a/nativelink-scheduler/src/grpc_scheduler.rs +++ b/nativelink-scheduler/src/grpc_scheduler.rs @@ -20,6 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::unfold; use futures::{StreamExt, TryFutureExt}; +use nativelink_config::schedulers::GrpcSpec; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_proto::build::bazel::remote::execution::v2::capabilities_client::CapabilitiesClient; @@ -91,10 +92,10 @@ pub struct GrpcScheduler { } impl GrpcScheduler { - pub fn new(config: 
&nativelink_config::schedulers::GrpcScheduler) -> Result { - let jitter_amt = config.retry.jitter; + pub fn new(spec: &GrpcSpec) -> Result { + let jitter_amt = spec.retry.jitter; Self::new_with_jitter( - config, + spec, Box::new(move |delay: Duration| { if jitter_amt == 0. { return delay; @@ -107,23 +108,23 @@ impl GrpcScheduler { } pub fn new_with_jitter( - config: &nativelink_config::schedulers::GrpcScheduler, + spec: &GrpcSpec, jitter_fn: Box Duration + Send + Sync>, ) -> Result { - let endpoint = tls_utils::endpoint(&config.endpoint)?; + let endpoint = tls_utils::endpoint(&spec.endpoint)?; let jitter_fn = Arc::new(jitter_fn); Ok(Self { supported_props: Mutex::new(HashMap::new()), retrier: Retrier::new( Arc::new(|duration| Box::pin(sleep(duration))), jitter_fn.clone(), - config.retry.clone(), + spec.retry.clone(), ), connection_manager: ConnectionManager::new( std::iter::once(endpoint), - config.connections_per_endpoint, - config.max_concurrent_requests, - config.retry.clone(), + spec.connections_per_endpoint, + spec.max_concurrent_requests, + spec.retry.clone(), jitter_fn, ), }) diff --git a/nativelink-scheduler/src/property_modifier_scheduler.rs b/nativelink-scheduler/src/property_modifier_scheduler.rs index 49044cb58..231d22054 100644 --- a/nativelink-scheduler/src/property_modifier_scheduler.rs +++ b/nativelink-scheduler/src/property_modifier_scheduler.rs @@ -16,7 +16,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use async_trait::async_trait; -use nativelink_config::schedulers::PropertyModification; +use nativelink_config::schedulers::{PropertyModification, PropertyModifierSpec}; use nativelink_error::{Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::action_messages::{ActionInfo, OperationId}; @@ -28,7 +28,7 @@ use parking_lot::Mutex; #[derive(MetricsComponent)] pub struct PropertyModifierScheduler { - modifications: Vec, + modifications: Vec, #[metric(group = "scheduler")] scheduler: Arc, #[metric(group = "property_manager")] @@ -36,12 +36,9 @@ pub struct PropertyModifierScheduler { } impl PropertyModifierScheduler { - pub fn new( - config: &nativelink_config::schedulers::PropertyModifierScheduler, - scheduler: Arc, - ) -> Self { + pub fn new(spec: &PropertyModifierSpec, scheduler: Arc) -> Self { Self { - modifications: config.modifications.clone(), + modifications: spec.modifications.clone(), scheduler, known_properties: Mutex::new(HashMap::new()), } diff --git a/nativelink-scheduler/src/simple_scheduler.rs b/nativelink-scheduler/src/simple_scheduler.rs index ee4ce0491..744b27903 100644 --- a/nativelink-scheduler/src/simple_scheduler.rs +++ b/nativelink-scheduler/src/simple_scheduler.rs @@ -17,6 +17,7 @@ use std::time::SystemTime; use async_trait::async_trait; use futures::Future; +use nativelink_config::schedulers::SimpleSpec; use nativelink_error::{Code, Error, ResultExt}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; use nativelink_util::action_messages::{ActionInfo, ActionState, OperationId, WorkerId}; @@ -281,12 +282,12 @@ impl SimpleScheduler { impl SimpleScheduler { pub fn new( - scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, + spec: &SimpleSpec, awaited_action_db: A, task_change_notify: Arc, ) -> (Arc, Arc) { Self::new_with_callback( - scheduler_cfg, + spec, awaited_action_db, || { // The cost of running `do_try_match()` is very high, but constant @@ -311,30 +312,29 @@ impl SimpleScheduler { I: InstantWrapper, NowFn: Fn() -> I + Clone + Send + Unpin + Sync + 
'static, >( - scheduler_cfg: &nativelink_config::schedulers::SimpleScheduler, + spec: &SimpleSpec, awaited_action_db: A, on_matching_engine_run: F, task_change_notify: Arc, now_fn: NowFn, ) -> (Arc, Arc) { let platform_property_manager = Arc::new(PlatformPropertyManager::new( - scheduler_cfg - .supported_platform_properties + spec.supported_platform_properties .clone() .unwrap_or_default(), )); - let mut worker_timeout_s = scheduler_cfg.worker_timeout_s; + let mut worker_timeout_s = spec.worker_timeout_s; if worker_timeout_s == 0 { worker_timeout_s = DEFAULT_WORKER_TIMEOUT_S; } - let mut client_action_timeout_s = scheduler_cfg.client_action_timeout_s; + let mut client_action_timeout_s = spec.client_action_timeout_s; if client_action_timeout_s == 0 { client_action_timeout_s = DEFAULT_CLIENT_ACTION_TIMEOUT_S; } - let mut max_job_retries = scheduler_cfg.max_job_retries; + let mut max_job_retries = spec.max_job_retries; if max_job_retries == 0 { max_job_retries = DEFAULT_MAX_JOB_RETRIES; } @@ -351,7 +351,7 @@ impl SimpleScheduler { let worker_scheduler = ApiWorkerScheduler::new( state_manager.clone(), platform_property_manager.clone(), - scheduler_cfg.allocation_strategy, + spec.allocation_strategy, worker_change_notify.clone(), worker_timeout_s, ); diff --git a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs index bdc326a95..4fa79862f 100644 --- a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs +++ b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs @@ -21,6 +21,7 @@ mod utils { } use futures::join; +use nativelink_config::stores::MemorySpec; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::ActionResult as ProtoActionResult; @@ -48,9 +49,7 @@ struct TestContext { fn make_cache_scheduler() -> Result { let mock_scheduler = Arc::new(MockActionScheduler::new()); - let ac_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); + let ac_store = Store::new(MemoryStore::new(&MemorySpec::default())); let cache_scheduler = CacheLookupScheduler::new(ac_store.clone(), mock_scheduler.clone())?; Ok(TestContext { mock_scheduler, diff --git a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs index 07c315cee..0bf4405bc 100644 --- a/nativelink-scheduler/tests/property_modifier_scheduler_test.rs +++ b/nativelink-scheduler/tests/property_modifier_scheduler_test.rs @@ -22,7 +22,9 @@ mod utils { } use futures::{join, StreamExt}; -use nativelink_config::schedulers::{PlatformPropertyAddition, PropertyModification}; +use nativelink_config::schedulers::{ + PlatformPropertyAddition, PropertyModification, PropertyModifierSpec, SchedulerSpec, SimpleSpec, +}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_scheduler::property_modifier_scheduler::PropertyModifierScheduler; @@ -42,11 +44,9 @@ struct TestContext { fn make_modifier_scheduler(modifications: Vec) -> TestContext { let mock_scheduler = Arc::new(MockActionScheduler::new()); - let config = nativelink_config::schedulers::PropertyModifierScheduler { + let config = PropertyModifierSpec { modifications, - scheduler: Box::new(nativelink_config::schedulers::SchedulerConfig::simple( - nativelink_config::schedulers::SimpleScheduler::default(), - )), + scheduler: Box::new(SchedulerSpec::simple(SimpleSpec::default())), }; let modifier_scheduler = 
PropertyModifierScheduler::new(&config, mock_scheduler.clone()); TestContext { diff --git a/nativelink-scheduler/tests/simple_scheduler_test.rs b/nativelink-scheduler/tests/simple_scheduler_test.rs index 46620dbfa..18370ff19 100644 --- a/nativelink-scheduler/tests/simple_scheduler_test.rs +++ b/nativelink-scheduler/tests/simple_scheduler_test.rs @@ -24,7 +24,7 @@ use async_lock::Mutex; use futures::task::Poll; use futures::{poll, Stream, StreamExt}; use mock_instant::thread_local::{MockClock, SystemTime as MockSystemTime}; -use nativelink_config::schedulers::PropertyType; +use nativelink_config::schedulers::{PropertyType, SimpleSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_metric::MetricsComponent; @@ -160,7 +160,7 @@ async fn basic_add_action_with_one_worker_test() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -228,7 +228,7 @@ async fn client_does_not_receive_update_timeout() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, @@ -291,7 +291,7 @@ async fn find_executing_action() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -368,7 +368,7 @@ async fn remove_worker_reschedules_multiple_running_job_test() -> Result<(), Err let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, @@ -546,7 +546,7 @@ async fn set_drain_worker_pauses_and_resumes_worker_test() -> Result<(), Error> let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -630,7 +630,7 @@ async fn worker_should_not_queue_if_properties_dont_match_test() -> Result<(), E let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { supported_platform_properties: Some(prop_defs), ..Default::default() }, @@ -724,7 +724,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -825,7 +825,7 @@ async fn cacheable_items_join_same_action_queued_test() -> Result<(), Error> { async fn worker_disconnects_does_not_schedule_for_execution_test() -> Result<(), 
Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -985,7 +985,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { let (senders, awaited_action) = MockAwaitedAction::new(); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), awaited_action, || async move {}, task_change_notify, @@ -1030,7 +1030,7 @@ async fn matching_engine_fails_sends_abort() -> Result<(), Error> { let (senders, awaited_action) = MockAwaitedAction::new(); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), awaited_action, || async move {}, task_change_notify, @@ -1081,7 +1081,7 @@ async fn worker_timesout_reschedules_running_job_test() -> Result<(), Error> { let worker_id2: WorkerId = WorkerId(Uuid::new_v4()); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, @@ -1206,7 +1206,7 @@ async fn update_action_sends_completed_result_to_client_test() -> Result<(), Err let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -1307,7 +1307,7 @@ async fn update_action_sends_completed_result_after_disconnect() -> Result<(), E let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -1425,7 +1425,7 @@ async fn update_action_with_wrong_worker_id_errors_test() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -1523,7 +1523,7 @@ async fn does_not_crash_if_operation_joined_then_relaunched() -> Result<(), Erro let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -1663,7 +1663,7 @@ async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> supported_props.insert("prop1".to_string(), PropertyType::minimum); let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { supported_platform_properties: Some(supported_props), ..Default::default() }, @@ -1825,7 +1825,7 @@ async fn run_jobs_in_the_order_they_were_queued() -> Result<(), Error> { supported_props.insert("prop1".to_string(), PropertyType::minimum); let 
task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { supported_platform_properties: Some(supported_props), ..Default::default() }, @@ -1892,7 +1892,7 @@ async fn worker_retries_on_internal_error_and_fails_test() -> Result<(), Error> let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { max_job_retries: 1, ..Default::default() }, @@ -2044,7 +2044,7 @@ async fn ensure_scheduler_drops_inner_spawn() -> Result<(), Error> { // DropChecker. let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -2077,7 +2077,7 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler::default(), + &SimpleSpec::default(), memory_awaited_action_db_factory( 0, &task_change_notify.clone(), @@ -2149,7 +2149,7 @@ async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), async fn client_reconnect_keeps_action_alive() -> Result<(), Error> { let task_change_notify = Arc::new(Notify::new()); let (scheduler, _worker_scheduler) = SimpleScheduler::new_with_callback( - &nativelink_config::schedulers::SimpleScheduler { + &SimpleSpec { worker_timeout_s: WORKER_TIMEOUT_S, ..Default::default() }, diff --git a/nativelink-service/tests/ac_server_test.rs b/nativelink-service/tests/ac_server_test.rs index 50dcada89..065b5296a 100644 --- a/nativelink-service/tests/ac_server_test.rs +++ b/nativelink-service/tests/ac_server_test.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use bytes::BytesMut; use maplit::hashmap; +use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_server::ActionCache; @@ -55,9 +56,7 @@ async fn make_store_manager() -> Result, Error> { store_manager.add_store( "main_cas", store_factory( - &nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) @@ -66,9 +65,7 @@ async fn make_store_manager() -> Result, Error> { store_manager.add_store( "main_ac", store_factory( - &nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index 4beefaa71..2537f5ac6 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use futures::StreamExt; use hyper::body::Frame; use nativelink_config::cas_server::BepConfig; +use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_proto::google::devtools::build::v1::build_event::console_output::Output; @@ -53,9 +54,7 @@ async fn 
make_store_manager() -> Result<Arc<StoreManager>, Error> { store_manager.add_store( BEP_STORE_NAME, store_factory( - &nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) diff --git a/nativelink-service/tests/bytestream_server_test.rs b/nativelink-service/tests/bytestream_server_test.rs index aa8d37698..97154e804 100644 --- a/nativelink-service/tests/bytestream_server_test.rs +++ b/nativelink-service/tests/bytestream_server_test.rs @@ -25,6 +25,7 @@ use hyper_util::server::conn::auto; use hyper_util::service::TowerToHyperService; use maplit::hashmap; use nativelink_config::cas_server::ByteStreamConfig; +use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_proto::google::bytestream::byte_stream_client::ByteStreamClient; @@ -60,9 +61,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { store_manager.add_store( "main_cas", store_factory( - &nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs index d97728c3f..a9b6ffe34 100644 --- a/nativelink-service/tests/cas_server_test.rs +++ b/nativelink-service/tests/cas_server_test.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use futures::StreamExt; use maplit::hashmap; +use nativelink_config::stores::{MemorySpec, StoreSpec}; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::content_addressable_storage_server::ContentAddressableStorage; @@ -49,9 +50,7 @@ async fn make_store_manager() -> Result<Arc<StoreManager>, Error> { store_manager.add_store( "main_cas", store_factory( - &nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &StoreSpec::memory(MemorySpec::default()), &store_manager, None, ) diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index dd50ce7ea..0e55ff427 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -23,6 +23,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, BytesMut}; use futures::future::FutureExt; use lz4_flex::block::{compress_into, decompress_into, get_maximum_output_size}; +use nativelink_config::stores::CompressionSpec; use nativelink_error::{error_if, make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -218,11 +219,8 @@ pub struct CompressionStore { } impl CompressionStore { - pub fn new( - compression_config: &nativelink_config::stores::CompressionStore, - inner_store: Store, - ) -> Result<Arc<Self>, Error> { - let lz4_config = match compression_config.compression_algorithm { + pub fn new(spec: &CompressionSpec, inner_store: Store) -> Result<Arc<Self>, Error> { + let lz4_config = match spec.compression_algorithm { nativelink_config::stores::CompressionAlgorithm::lz4(mut lz4_config) => { if lz4_config.block_size == 0 { lz4_config.block_size = DEFAULT_BLOCK_SIZE; diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index fee1e021f..a3d28324e 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use
bincode::config::{FixintEncoding, WithOtherIntEncoding}; use bincode::{DefaultOptions, Options}; use futures::stream::{self, FuturesOrdered, StreamExt, TryStreamExt}; +use nativelink_config::stores::DedupSpec; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -60,29 +61,29 @@ pub struct DedupStore { impl DedupStore { pub fn new( - config: &nativelink_config::stores::DedupStore, + spec: &DedupSpec, index_store: Store, content_store: Store, ) -> Result, Error> { - let min_size = if config.min_size == 0 { + let min_size = if spec.min_size == 0 { DEFAULT_MIN_SIZE } else { - u64::from(config.min_size) + u64::from(spec.min_size) }; - let normal_size = if config.normal_size == 0 { + let normal_size = if spec.normal_size == 0 { DEFAULT_NORM_SIZE } else { - u64::from(config.normal_size) + u64::from(spec.normal_size) }; - let max_size = if config.max_size == 0 { + let max_size = if spec.max_size == 0 { DEFAULT_MAX_SIZE } else { - u64::from(config.max_size) + u64::from(spec.max_size) }; - let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 { + let max_concurrent_fetch_per_get = if spec.max_concurrent_fetch_per_get == 0 { DEFAULT_MAX_CONCURRENT_FETCH_PER_GET } else { - config.max_concurrent_fetch_per_get as usize + spec.max_concurrent_fetch_per_get as usize }; Ok(Arc::new(Self { index_store, @@ -302,7 +303,7 @@ impl StoreDriver for DedupStore { // 5 requests at a time, and request 3 is stalled, request 1 & 2 can be output and // request 4 & 5 can be executing (or finished) while waiting for 3 to finish. // Note: We will buffer our data here up to: - // `config.max_size * config.max_concurrent_fetch_per_get` per `get_part()` request. + // `spec.max_size * spec.max_concurrent_fetch_per_get` per `get_part()` request. let mut entries_stream = stream::iter(entries) .map(move |index_entry| async move { let data = self diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index b72b8e1a4..506ef6752 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -18,7 +18,7 @@ use std::time::SystemTime; use futures::stream::FuturesOrdered; use futures::{Future, TryStreamExt}; -use nativelink_config::stores::StoreConfig; +use nativelink_config::stores::StoreSpec; use nativelink_error::Error; use nativelink_util::health_utils::HealthRegistryBuilder; use nativelink_util::store_trait::{Store, StoreDriver}; @@ -43,61 +43,59 @@ use crate::verify_store::VerifyStore; type FutureMaybeStore<'a> = Box> + 'a>; pub fn store_factory<'a>( - backend: &'a StoreConfig, + backend: &'a StoreSpec, store_manager: &'a Arc, maybe_health_registry_builder: Option<&'a mut HealthRegistryBuilder>, ) -> Pin> { Box::pin(async move { let store: Arc = match backend { - StoreConfig::memory(config) => MemoryStore::new(config), - StoreConfig::experimental_s3_store(config) => { - S3Store::new(config, SystemTime::now).await? 
- } - StoreConfig::redis_store(config) => RedisStore::new(config.clone())?, - StoreConfig::verify(config) => VerifyStore::new( - config, - store_factory(&config.backend, store_manager, None).await?, + StoreSpec::memory(spec) => MemoryStore::new(spec), + StoreSpec::experimental_s3_store(spec) => S3Store::new(spec, SystemTime::now).await?, + StoreSpec::redis_store(spec) => RedisStore::new(spec.clone())?, + StoreSpec::verify(spec) => VerifyStore::new( + spec, + store_factory(&spec.backend, store_manager, None).await?, ), - StoreConfig::compression(config) => CompressionStore::new( - &config.clone(), - store_factory(&config.backend, store_manager, None).await?, + StoreSpec::compression(spec) => CompressionStore::new( + &spec.clone(), + store_factory(&spec.backend, store_manager, None).await?, )?, - StoreConfig::dedup(config) => DedupStore::new( - config, - store_factory(&config.index_store, store_manager, None).await?, - store_factory(&config.content_store, store_manager, None).await?, + StoreSpec::dedup(spec) => DedupStore::new( + spec, + store_factory(&spec.index_store, store_manager, None).await?, + store_factory(&spec.content_store, store_manager, None).await?, )?, - StoreConfig::existence_cache(config) => ExistenceCacheStore::new( - config, - store_factory(&config.backend, store_manager, None).await?, + StoreSpec::existence_cache(spec) => ExistenceCacheStore::new( + spec, + store_factory(&spec.backend, store_manager, None).await?, ), - StoreConfig::completeness_checking(config) => CompletenessCheckingStore::new( - store_factory(&config.backend, store_manager, None).await?, - store_factory(&config.cas_store, store_manager, None).await?, + StoreSpec::completeness_checking(spec) => CompletenessCheckingStore::new( + store_factory(&spec.backend, store_manager, None).await?, + store_factory(&spec.cas_store, store_manager, None).await?, ), - StoreConfig::fast_slow(config) => FastSlowStore::new( - config, - store_factory(&config.fast, store_manager, None).await?, - store_factory(&config.slow, store_manager, None).await?, + StoreSpec::fast_slow(spec) => FastSlowStore::new( + spec, + store_factory(&spec.fast, store_manager, None).await?, + store_factory(&spec.slow, store_manager, None).await?, ), - StoreConfig::filesystem(config) => ::new(config).await?, - StoreConfig::ref_store(config) => RefStore::new(config, Arc::downgrade(store_manager)), - StoreConfig::size_partitioning(config) => SizePartitioningStore::new( - config, - store_factory(&config.lower_store, store_manager, None).await?, - store_factory(&config.upper_store, store_manager, None).await?, + StoreSpec::filesystem(spec) => ::new(spec).await?, + StoreSpec::ref_store(spec) => RefStore::new(spec, Arc::downgrade(store_manager)), + StoreSpec::size_partitioning(spec) => SizePartitioningStore::new( + spec, + store_factory(&spec.lower_store, store_manager, None).await?, + store_factory(&spec.upper_store, store_manager, None).await?, ), - StoreConfig::grpc(config) => GrpcStore::new(config).await?, - StoreConfig::noop => NoopStore::new(), - StoreConfig::shard(config) => { - let stores = config + StoreSpec::grpc(spec) => GrpcStore::new(spec).await?, + StoreSpec::noop(_) => NoopStore::new(), + StoreSpec::shard(spec) => { + let stores = spec .stores .iter() - .map(|store_config| store_factory(&store_config.store, store_manager, None)) + .map(|store_spec| store_factory(&store_spec.store, store_manager, None)) .collect::>() .try_collect::>() .await?; - ShardStore::new(config, stores)? + ShardStore::new(spec, stores)? 
} }; diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 087e37b8c..c48fa6e50 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::SystemTime; use async_trait::async_trait; -use nativelink_config::stores::{EvictionPolicy, ExistenceCacheStore as ExistenceCacheStoreConfig}; +use nativelink_config::stores::{EvictionPolicy, ExistenceCacheSpec}; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -51,19 +51,19 @@ pub struct ExistenceCacheStore { } impl ExistenceCacheStore { - pub fn new(config: &ExistenceCacheStoreConfig, inner_store: Store) -> Arc { - Self::new_with_time(config, inner_store, SystemTime::now()) + pub fn new(spec: &ExistenceCacheSpec, inner_store: Store) -> Arc { + Self::new_with_time(spec, inner_store, SystemTime::now()) } } impl ExistenceCacheStore { pub fn new_with_time( - config: &ExistenceCacheStoreConfig, + spec: &ExistenceCacheSpec, inner_store: Store, anchor_time: I, ) -> Arc { let empty_policy = EvictionPolicy::default(); - let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy); + let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); Arc::new(Self { inner_store, existence_cache: EvictingMap::new(eviction_policy, anchor_time), diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index a6ae8ba71..a89eefb25 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -21,6 +21,7 @@ use std::sync::{Arc, Weak}; use async_trait::async_trait; use futures::{join, FutureExt}; +use nativelink_config::stores::FastSlowSpec; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -52,11 +53,7 @@ pub struct FastSlowStore { } impl FastSlowStore { - pub fn new( - _config: &nativelink_config::stores::FastSlowStore, - fast_store: Store, - slow_store: Store, - ) -> Arc { + pub fn new(_spec: &FastSlowSpec, fast_store: Store, slow_store: Store) -> Arc { Arc::new_cyclic(|weak_self| Self { fast_store, slow_store, diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 924edca79..22d79cec8 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -26,6 +26,7 @@ use bytes::BytesMut; use filetime::{set_file_atime, FileTime}; use futures::stream::{StreamExt, TryStreamExt}; use futures::{Future, TryFutureExt}; +use nativelink_config::stores::FilesystemSpec; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -535,49 +536,47 @@ pub struct FilesystemStore { } impl FilesystemStore { - pub async fn new( - config: &nativelink_config::stores::FilesystemStore, - ) -> Result, Error> { - Self::new_with_timeout_and_rename_fn(config, sleep, |from, to| std::fs::rename(from, to)) + pub async fn new(spec: &FilesystemSpec) -> Result, Error> { + Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to)) .await } pub async fn new_with_timeout_and_rename_fn( - config: &nativelink_config::stores::FilesystemStore, + spec: &FilesystemSpec, sleep_fn: fn(Duration) -> Sleep, rename_fn: fn(&OsStr, &OsStr) 
-> Result<(), std::io::Error>, ) -> Result, Error> { let now = SystemTime::now(); let empty_policy = nativelink_config::stores::EvictionPolicy::default(); - let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy); + let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); let evicting_map = Arc::new(EvictingMap::new(eviction_policy, now)); - fs::create_dir_all(&config.temp_path) + fs::create_dir_all(&spec.temp_path) .await - .err_tip(|| format!("Failed to temp directory {:?}", &config.temp_path))?; - fs::create_dir_all(&config.content_path) + .err_tip(|| format!("Failed to temp directory {:?}", &spec.temp_path))?; + fs::create_dir_all(&spec.content_path) .await - .err_tip(|| format!("Failed to content directory {:?}", &config.content_path))?; + .err_tip(|| format!("Failed to content directory {:?}", &spec.content_path))?; let shared_context = Arc::new(SharedContext { active_drop_spawns: AtomicU64::new(0), - temp_path: config.temp_path.clone(), - content_path: config.content_path.clone(), + temp_path: spec.temp_path.clone(), + content_path: spec.content_path.clone(), }); - let block_size = if config.block_size == 0 { + let block_size = if spec.block_size == 0 { DEFAULT_BLOCK_SIZE } else { - config.block_size + spec.block_size }; add_files_to_cache(evicting_map.as_ref(), &now, &shared_context, block_size).await?; prune_temp_path(&shared_context.temp_path).await?; - let read_buffer_size = if config.read_buffer_size == 0 { + let read_buffer_size = if spec.read_buffer_size == 0 { DEFAULT_BUFF_SIZE } else { - config.read_buffer_size as usize + spec.read_buffer_size as usize }; Ok(Arc::new_cyclic(|weak_self| Self { shared_context, diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index d02a0b943..bcdda6d9b 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use bytes::BytesMut; use futures::stream::{unfold, FuturesUnordered}; use futures::{future, Future, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use nativelink_config::stores::GrpcSpec; use nativelink_error::{error_if, make_input_err, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_proto::build::bazel::remote::execution::v2::action_cache_client::ActionCacheClient; @@ -70,10 +71,10 @@ pub struct GrpcStore { } impl GrpcStore { - pub async fn new(config: &nativelink_config::stores::GrpcStore) -> Result, Error> { - let jitter_amt = config.retry.jitter; + pub async fn new(spec: &GrpcSpec) -> Result, Error> { + let jitter_amt = spec.retry.jitter; Self::new_with_jitter( - config, + spec, Box::new(move |delay: Duration| { if jitter_amt == 0. 
{ return delay; @@ -87,15 +88,15 @@ impl GrpcStore { } pub async fn new_with_jitter( - config: &nativelink_config::stores::GrpcStore, + spec: &GrpcSpec, jitter_fn: Box Duration + Send + Sync>, ) -> Result, Error> { error_if!( - config.endpoints.is_empty(), + spec.endpoints.is_empty(), "Expected at least 1 endpoint in GrpcStore" ); - let mut endpoints = Vec::with_capacity(config.endpoints.len()); - for endpoint_config in &config.endpoints { + let mut endpoints = Vec::with_capacity(spec.endpoints.len()); + for endpoint_config in &spec.endpoints { let endpoint = tls_utils::endpoint(endpoint_config) .map_err(|e| make_input_err!("Invalid URI for GrpcStore endpoint : {e:?}"))?; endpoints.push(endpoint); @@ -103,18 +104,18 @@ impl GrpcStore { let jitter_fn = Arc::new(jitter_fn); Ok(Arc::new(GrpcStore { - instance_name: config.instance_name.clone(), - store_type: config.store_type, + instance_name: spec.instance_name.clone(), + store_type: spec.store_type, retrier: Retrier::new( Arc::new(|duration| Box::pin(sleep(duration))), jitter_fn.clone(), - config.retry.clone(), + spec.retry.clone(), ), connection_manager: ConnectionManager::new( endpoints.into_iter(), - config.connections_per_endpoint, - config.max_concurrent_requests, - config.retry.clone(), + spec.connections_per_endpoint, + spec.max_concurrent_requests, + spec.retry.clone(), jitter_fn, ), })) diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index 99042d37f..27cf2cda0 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -20,6 +20,7 @@ use std::time::SystemTime; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; +use nativelink_config::stores::MemorySpec; use nativelink_error::{Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -57,9 +58,9 @@ pub struct MemoryStore { } impl MemoryStore { - pub fn new(config: &nativelink_config::stores::MemoryStore) -> Arc { + pub fn new(spec: &MemorySpec) -> Arc { let empty_policy = nativelink_config::stores::EvictionPolicy::default(); - let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy); + let eviction_policy = spec.eviction_policy.as_ref().unwrap_or(&empty_policy); Arc::new(Self { evicting_map: EvictingMap::new(eviction_policy, SystemTime::now()), }) diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 05dbf82c4..c1134361d 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -30,7 +30,7 @@ use fred::types::{ }; use futures::stream::FuturesUnordered; use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; -use nativelink_config::stores::RedisMode; +use nativelink_config::stores::{RedisMode, RedisSpec}; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -140,17 +140,17 @@ pub struct RedisStore { impl RedisStore { /// Create a new `RedisStore` from the given configuration. - pub fn new(mut config: nativelink_config::stores::RedisStore) -> Result, Error> { - if config.addresses.is_empty() { + pub fn new(mut spec: RedisSpec) -> Result, Error> { + if spec.addresses.is_empty() { return Err(make_err!( Code::InvalidArgument, "No addresses were specified in redis store configuration." 
)); }; - let [addr] = config.addresses.as_slice() else { + let [addr] = spec.addresses.as_slice() else { return Err(make_err!(Code::Unimplemented, "Connecting directly to multiple redis nodes in a cluster is currently unsupported. Please specify a single URL to a single node, and nativelink will use cluster discover to find the other nodes.")); }; - let redis_config = match config.mode { + let redis_config = match spec.mode { RedisMode::Cluster => RedisConfig::from_url_clustered(addr), RedisMode::Sentinel => RedisConfig::from_url_sentinel(addr), RedisMode::Standard => RedisConfig::from_url_centralized(addr), @@ -163,18 +163,18 @@ impl RedisStore { })?; let reconnect_policy = { - if config.retry.delay == 0.0 { - config.retry.delay = DEFAULT_RETRY_DELAY; + if spec.retry.delay == 0.0 { + spec.retry.delay = DEFAULT_RETRY_DELAY; } - if config.retry.jitter == 0.0 { - config.retry.jitter = DEFAULT_RETRY_JITTER; + if spec.retry.jitter == 0.0 { + spec.retry.jitter = DEFAULT_RETRY_JITTER; } - let max_retries = u32::try_from(config.retry.max_retries) + let max_retries = u32::try_from(spec.retry.max_retries) .err_tip(|| "max_retries could not be converted to u32 in RedisStore::new")?; - let min_delay_ms = (config.retry.delay * 1000.0) as u32; + let min_delay_ms = (spec.retry.delay * 1000.0) as u32; let max_delay_ms = 8000; - let jitter = (config.retry.jitter * config.retry.delay * 1000.0) as u32; + let jitter = (spec.retry.jitter * spec.retry.delay * 1000.0) as u32; let mut reconnect_policy = ReconnectPolicy::new_exponential( max_retries, /* max_retries, 0 is unlimited */ @@ -187,33 +187,33 @@ impl RedisStore { }; { - if config.broadcast_channel_capacity == 0 { - config.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY; + if spec.broadcast_channel_capacity == 0 { + spec.broadcast_channel_capacity = DEFAULT_BROADCAST_CHANNEL_CAPACITY; } - if config.connection_timeout_ms == 0 { - config.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; + if spec.connection_timeout_ms == 0 { + spec.connection_timeout_ms = DEFAULT_CONNECTION_TIMEOUT_MS; } - if config.command_timeout_ms == 0 { - config.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; + if spec.command_timeout_ms == 0 { + spec.command_timeout_ms = DEFAULT_COMMAND_TIMEOUT_MS; } - if config.connection_pool_size == 0 { - config.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE; + if spec.connection_pool_size == 0 { + spec.connection_pool_size = DEFAULT_CONNECTION_POOL_SIZE; } - if config.read_chunk_size == 0 { - config.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; + if spec.read_chunk_size == 0 { + spec.read_chunk_size = DEFAULT_READ_CHUNK_SIZE; } - if config.max_chunk_uploads_per_update == 0 { - config.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE; + if spec.max_chunk_uploads_per_update == 0 { + spec.max_chunk_uploads_per_update = DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE; } } - let connection_timeout = Duration::from_millis(config.connection_timeout_ms); - let command_timeout = Duration::from_millis(config.command_timeout_ms); + let connection_timeout = Duration::from_millis(spec.connection_timeout_ms); + let command_timeout = Duration::from_millis(spec.command_timeout_ms); let mut builder = Builder::from_config(redis_config); builder .set_performance_config(PerformanceConfig { default_command_timeout: command_timeout, - broadcast_channel_capacity: config.broadcast_channel_capacity, + broadcast_channel_capacity: spec.broadcast_channel_capacity, ..Default::default() }) .set_connection_config(ConnectionConfig { @@ -231,7 
+231,7 @@ impl RedisStore { .set_policy(reconnect_policy); let client_pool = builder - .build_pool(config.connection_pool_size) + .build_pool(spec.connection_pool_size) .err_tip(|| "while creating redis connection pool")?; let subscriber_client = builder @@ -241,11 +241,11 @@ impl RedisStore { Self::new_from_builder_and_parts( client_pool, subscriber_client, - config.experimental_pub_sub_channel.clone(), + spec.experimental_pub_sub_channel.clone(), || Uuid::new_v4().to_string(), - config.key_prefix.clone(), - config.read_chunk_size, - config.max_chunk_uploads_per_update, + spec.key_prefix.clone(), + spec.read_chunk_size, + spec.max_chunk_uploads_per_update, ) .map(Arc::new) } diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index d2446d174..6014fbbb1 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::{Arc, Mutex, Weak}; use async_trait::async_trait; +use nativelink_config::stores::RefSpec; use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -45,12 +46,9 @@ pub struct RefStore { } impl RefStore { - pub fn new( - config: &nativelink_config::stores::RefStore, - store_manager: Weak, - ) -> Arc { + pub fn new(spec: &RefSpec, store_manager: Weak) -> Arc { Arc::new(RefStore { - ref_store_name: config.name.clone(), + ref_store_name: spec.name.clone(), store_manager, ref_store: StoreReference { mux: Mutex::new(()), diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 8622d1b0e..90b827293 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -40,6 +40,7 @@ use hyper::client::connect::{Connected, Connection, HttpConnector}; use hyper::service::Service; use hyper::Uri; use hyper_rustls::{HttpsConnector, MaybeHttpsStream}; +use nativelink_config::stores::S3Spec; // Note: S3 store should be very careful about the error codes it returns // when in a retryable wrapper. Always prefer Code::Aborted or another // retryable code over Code::InvalidArgument or make_input_err!(). 
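A quick usage sketch for the renamed `RefSpec`, mirroring `setup_stores()` in `ref_store_test.rs` further down in this diff. The store name "foo" comes from that test; the `use` paths and the helper's name are assumptions, not part of the patch:

use std::sync::Arc;

use nativelink_config::stores::{MemorySpec, RefSpec};
use nativelink_store::memory_store::MemoryStore;
use nativelink_store::ref_store::RefStore;
use nativelink_store::store_manager::StoreManager;
use nativelink_util::store_trait::Store;

fn setup_ref_store() -> (Arc<StoreManager>, Store) {
    let store_manager = Arc::new(StoreManager::new());
    // Register the concrete backing store under a name...
    let memory_store = Store::new(MemoryStore::new(&MemorySpec::default()));
    store_manager.add_store("foo", memory_store);
    // ...then point a RefStore at it via `RefSpec::name`.
    let ref_store = Store::new(RefStore::new(
        &RefSpec { name: "foo".to_string() },
        Arc::downgrade(&store_manager),
    ));
    (store_manager, ref_store)
}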
@@ -141,19 +142,16 @@ pub struct TlsConnector { impl TlsConnector { #[must_use] - pub fn new( - config: &nativelink_config::stores::S3Store, - jitter_fn: Arc Duration + Send + Sync>, - ) -> Self { + pub fn new(spec: &S3Spec, jitter_fn: Arc Duration + Send + Sync>) -> Self { let connector_with_roots = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots(); - let connector_with_schemes = if config.insecure_allow_http { + let connector_with_schemes = if spec.insecure_allow_http { connector_with_roots.https_or_http() } else { connector_with_roots.https_only() }; - let connector = if config.disable_http2 { + let connector = if spec.disable_http2 { connector_with_schemes.enable_http1().build() } else { connector_with_schemes.enable_http1().enable_http2().build() @@ -164,7 +162,7 @@ impl TlsConnector { retrier: Retrier::new( Arc::new(|duration| Box::pin(sleep(duration))), jitter_fn, - config.retry.clone(), + spec.retry.clone(), ), } } @@ -260,11 +258,8 @@ where I: InstantWrapper, NowFn: Fn() -> I + Send + Sync + Unpin + 'static, { - pub async fn new( - config: &nativelink_config::stores::S3Store, - now_fn: NowFn, - ) -> Result, Error> { - let jitter_amt = config.retry.jitter; + pub async fn new(spec: &S3Spec, now_fn: NowFn) -> Result, Error> { + let jitter_amt = spec.retry.jitter; let jitter_fn = Arc::new(move |delay: Duration| { if jitter_amt == 0. { return delay; @@ -275,7 +270,7 @@ where }); let s3_client = { let http_client = - HyperClientBuilder::new().build(TlsConnector::new(config, jitter_fn.clone())); + HyperClientBuilder::new().build(TlsConnector::new(spec, jitter_fn.clone())); let credential_provider = credentials::default_provider().await; let mut config_builder = aws_config::defaults(BehaviorVersion::v2024_03_28()) .credentials_provider(credential_provider) @@ -285,7 +280,7 @@ where .connect_timeout(Duration::from_secs(15)) .build(), ) - .region(Region::new(Cow::Owned(config.region.clone()))) + .region(Region::new(Cow::Owned(spec.region.clone()))) .http_client(http_client); // TODO(allada) When aws-sdk supports this env variable we should be able // to remove this. 
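As a usage sketch, the S3 arm of the factory now forwards the parsed `S3Spec` unchanged. The helper below is illustrative only; its name and the `Arc<dyn StoreDriver>` return type are assumptions, and `default_store_factory` above remains the authoritative call site:

use std::sync::Arc;
use std::time::SystemTime;

use nativelink_config::stores::S3Spec;
use nativelink_error::Error;
use nativelink_store::s3_store::S3Store;
use nativelink_util::store_trait::StoreDriver;

// Mirrors the `StoreSpec::experimental_s3_store(spec)` arm of the factory.
async fn build_s3_backend(spec: &S3Spec) -> Result<Arc<dyn StoreDriver>, Error> {
    let store: Arc<dyn StoreDriver> = S3Store::new(spec, SystemTime::now).await?;
    Ok(store)
}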
@@ -295,11 +290,11 @@ where } aws_sdk_s3::Client::new(&config_builder.load().await) }; - Self::new_with_client_and_jitter(config, s3_client, jitter_fn, now_fn) + Self::new_with_client_and_jitter(spec, s3_client, jitter_fn, now_fn) } pub fn new_with_client_and_jitter( - config: &nativelink_config::stores::S3Store, + spec: &S3Spec, s3_client: Client, jitter_fn: Arc Duration + Send + Sync>, now_fn: NowFn, @@ -307,18 +302,18 @@ where Ok(Arc::new(Self { s3_client: Arc::new(s3_client), now_fn, - bucket: config.bucket.to_string(), - key_prefix: config.key_prefix.as_ref().unwrap_or(&String::new()).clone(), + bucket: spec.bucket.to_string(), + key_prefix: spec.key_prefix.as_ref().unwrap_or(&String::new()).clone(), retrier: Retrier::new( Arc::new(|duration| Box::pin(sleep(duration))), jitter_fn, - config.retry.clone(), + spec.retry.clone(), ), - consider_expired_after_s: i64::from(config.consider_expired_after_s), - max_retry_buffer_per_request: config + consider_expired_after_s: i64::from(spec.consider_expired_after_s), + max_retry_buffer_per_request: spec .max_retry_buffer_per_request .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST), - multipart_max_concurrent_uploads: config + multipart_max_concurrent_uploads: spec .multipart_max_concurrent_uploads .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| v), })) diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index b57f60a12..f874b58be 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::{FuturesUnordered, TryStreamExt}; +use nativelink_config::stores::ShardSpec; use nativelink_error::{error_if, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -45,24 +46,21 @@ pub struct ShardStore { } impl ShardStore { - pub fn new( - config: &nativelink_config::stores::ShardStore, - stores: Vec, - ) -> Result, Error> { + pub fn new(spec: &ShardSpec, stores: Vec) -> Result, Error> { error_if!( - config.stores.len() != stores.len(), + spec.stores.len() != stores.len(), "Config shards do not match stores length" ); error_if!( - config.stores.is_empty(), + spec.stores.is_empty(), "ShardStore must have at least one store" ); - let total_weight: u64 = config + let total_weight: u64 = spec .stores .iter() .map(|shard_config| u64::from(shard_config.weight.unwrap_or(1))) .sum(); - let mut weights: Vec = config + let mut weights: Vec = spec .stores .iter() .map(|shard_config| { diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index b3569c90f..703646c45 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -16,6 +16,7 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use nativelink_config::stores::SizePartitioningSpec; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf}; @@ -34,13 +35,9 @@ pub struct SizePartitioningStore { } impl SizePartitioningStore { - pub fn new( - config: &nativelink_config::stores::SizePartitioningStore, - lower_store: Store, - upper_store: Store, - ) -> Arc { + pub fn new(spec: &SizePartitioningSpec, lower_store: Store, upper_store: Store) -> Arc { Arc::new(SizePartitioningStore { - partition_size: config.size, + partition_size: 
spec.size, lower_store, upper_store, }) diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 674ab2bd2..c541fa238 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -16,6 +16,7 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use nativelink_config::stores::VerifySpec; use nativelink_error::{make_input_err, Error, ResultExt}; use nativelink_metric::MetricsComponent; use nativelink_util::buf_channel::{ @@ -47,11 +48,11 @@ pub struct VerifyStore { } impl VerifyStore { - pub fn new(config: &nativelink_config::stores::VerifyStore, inner_store: Store) -> Arc { + pub fn new(spec: &VerifySpec, inner_store: Store) -> Arc { Arc::new(VerifyStore { inner_store, - verify_size: config.verify_size, - verify_hash: config.verify_hash, + verify_size: spec.verify_size, + verify_hash: spec.verify_hash, size_verification_failures: CounterWithTime::default(), hash_verification_failures: CounterWithTime::default(), }) diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs index f4d50b46d..131e2688d 100644 --- a/nativelink-store/tests/ac_utils_test.rs +++ b/nativelink-store/tests/ac_utils_test.rs @@ -15,6 +15,7 @@ use std::env; use std::ffi::OsString; +use nativelink_config::stores::MemorySpec; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::memory_store::MemoryStore; @@ -46,7 +47,7 @@ const HASH1_SIZE: i64 = 147; async fn upload_file_to_store_with_large_file() -> Result<(), Error> { let filepath = make_temp_path("test.txt").await; let expected_data = vec![0x88; 1024 * 1024]; // 1MB. - let store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let store = MemoryStore::new(&MemorySpec::default()); let digest = DigestInfo::try_new(HASH1, HASH1_SIZE)?; // Dummy hash data. { // Write 1MB of 0x88s to the file. 
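Putting the rename together end to end, registering a store through the factory now takes a `StoreSpec` variant wrapping its spec struct. This sketch mirrors the `make_store_manager()` helpers in the service tests near the top of this diff; the "main_cas" name is the one those tests use, and the import paths are assumed to match theirs:

use std::sync::Arc;

use nativelink_config::stores::{MemorySpec, StoreSpec};
use nativelink_error::Error;
use nativelink_store::default_store_factory::store_factory;
use nativelink_store::store_manager::StoreManager;

async fn make_store_manager() -> Result<Arc<StoreManager>, Error> {
    let store_manager = Arc::new(StoreManager::new());
    store_manager.add_store(
        "main_cas",
        // The factory now matches on `StoreSpec` variants instead of `StoreConfig`.
        store_factory(
            &StoreSpec::memory(MemorySpec::default()),
            &store_manager,
            None,
        )
        .await?,
    );
    Ok(store_manager)
}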
diff --git a/nativelink-store/tests/completeness_checking_store_test.rs b/nativelink-store/tests/completeness_checking_store_test.rs index 319299924..66b7205ed 100644 --- a/nativelink-store/tests/completeness_checking_store_test.rs +++ b/nativelink-store/tests/completeness_checking_store_test.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use nativelink_config::stores::MemoryStore as MemoryStoreConfig; +use nativelink_config::stores::MemorySpec; use nativelink_error::Error; use nativelink_macro::nativelink_test; use nativelink_proto::build::bazel::remote::execution::v2::{ @@ -36,8 +36,8 @@ const STDOUT: DigestInfo = DigestInfo::new([5u8; 32], 0); const STDERR: DigestInfo = DigestInfo::new([6u8; 32], 0); async fn setup() -> Result<(Arc, Arc, DigestInfo), Error> { - let backend_store = Store::new(MemoryStore::new(&MemoryStoreConfig::default())); - let cas_store = MemoryStore::new(&MemoryStoreConfig::default()); + let backend_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let cas_store = MemoryStore::new(&MemorySpec::default()); let ac_store = CompletenessCheckingStore::new(backend_store.clone(), Store::new(cas_store.clone())); diff --git a/nativelink-store/tests/compression_store_test.rs b/nativelink-store/tests/compression_store_test.rs index d6780dec0..c3468243b 100644 --- a/nativelink-store/tests/compression_store_test.rs +++ b/nativelink-store/tests/compression_store_test.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use bincode::{DefaultOptions, Options}; use bytes::Bytes; +use nativelink_config::stores::{CompressionSpec, MemorySpec, StoreSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::compression_store::{ @@ -73,19 +74,15 @@ async fn simple_smoke_test() -> Result<(), Error> { const RAW_INPUT: &str = "123"; let store = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { ..Default::default() }, ), }, - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), + Store::new(MemoryStore::new(&MemorySpec::default())), ) .err_tip(|| "Failed to create compression store")?; @@ -114,10 +111,8 @@ async fn partial_reads_test() -> Result<(), Error> { ]; let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { block_size: 10, @@ -125,9 +120,7 @@ async fn partial_reads_test() -> Result<(), Error> { }, ), }, - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), + Store::new(MemoryStore::new(&MemorySpec::default())), ) .err_tip(|| "Failed to create compression store")?; let store = Pin::new(&store_owned); @@ -167,19 +160,15 @@ async fn partial_reads_test() -> Result<(), Error> { #[nativelink_test] async fn rand_5mb_smoke_test() -> Result<(), Error> { let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - 
nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { ..Default::default() }, ), }, - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), + Store::new(MemoryStore::new(&MemorySpec::default())), ) .err_tip(|| "Failed to create compression store")?; let store = Pin::new(&store_owned); @@ -202,12 +191,10 @@ async fn rand_5mb_smoke_test() -> Result<(), Error> { #[nativelink_test] async fn sanity_check_zero_bytes_test() -> Result<(), Error> { - let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let inner_store = MemoryStore::new(&MemorySpec::default()); let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { ..Default::default() @@ -259,12 +246,10 @@ async fn check_header_test() -> Result<(), Error> { const MAX_SIZE_INPUT: u64 = 1024 * 1024; // 1MB. const RAW_INPUT: &str = "123"; - let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let inner_store = MemoryStore::new(&MemorySpec::default()); let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { block_size: BLOCK_SIZE, @@ -347,12 +332,10 @@ async fn check_footer_test() -> Result<(), Error> { const BLOCK_SIZE: u32 = 32 * 1024; const EXPECTED_INDEXES: [u32; 7] = [32898, 32898, 32898, 32898, 140, 140, 140]; - let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let inner_store = MemoryStore::new(&MemorySpec::default()); let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { block_size: BLOCK_SIZE, @@ -495,12 +478,10 @@ async fn get_part_is_zero_digest() -> Result<(), Error> { let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); - let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let inner_store = MemoryStore::new(&MemorySpec::default()); let store_owned = CompressionStore::new( - &nativelink_config::stores::CompressionStore { - backend: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &CompressionSpec { + backend: StoreSpec::memory(MemorySpec::default()), compression_algorithm: nativelink_config::stores::CompressionAlgorithm::lz4( nativelink_config::stores::Lz4Config { block_size: BLOCK_SIZE, diff --git a/nativelink-store/tests/dedup_store_test.rs b/nativelink-store/tests/dedup_store_test.rs 
index d8372fde5..e8d4d1762 100644 --- a/nativelink-store/tests/dedup_store_test.rs +++ b/nativelink-store/tests/dedup_store_test.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use nativelink_config::stores::{DedupSpec, MemorySpec, StoreSpec}; use nativelink_error::{Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; @@ -23,14 +24,10 @@ use pretty_assertions::assert_eq; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; -fn make_default_config() -> nativelink_config::stores::DedupStore { - nativelink_config::stores::DedupStore { - index_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - content_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), +fn make_default_config() -> DedupSpec { + DedupSpec { + index_store: StoreSpec::memory(MemorySpec::default()), + content_store: StoreSpec::memory(MemorySpec::default()), min_size: 8 * 1024, normal_size: 32 * 1024, max_size: 128 * 1024, @@ -53,12 +50,8 @@ const MEGABYTE_SZ: usize = 1024 * 1024; async fn simple_round_trip_test() -> Result<(), Error> { let store = DedupStore::new( &make_default_config(), - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Index store. - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Content store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Index store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Content store. )?; let original_data = make_random_data(MEGABYTE_SZ); @@ -85,12 +78,10 @@ async fn check_missing_last_chunk_test() -> Result<(), Error> { "7c8608f5b079bef66c45bd67f7d8ede15d2e1830ea38fd8ad4c6de08b6f21a0c"; const LAST_CHUNK_SIZE: usize = 25779; - let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let content_store = MemoryStore::new(&MemorySpec::default()); let store = DedupStore::new( &make_default_config(), - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Index store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Index store. Store::new(content_store.clone()), )?; @@ -132,12 +123,8 @@ async fn fetch_part_test() -> Result<(), Error> { let store = DedupStore::new( &make_default_config(), - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Index store. - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Content store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Index store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Content store. 
)?; let original_data = make_random_data(DATA_SIZE); @@ -173,24 +160,16 @@ async fn check_length_not_set_with_chunk_read_beyond_first_chunk_regression_test const START_READ_BYTE: usize = 7; let store = DedupStore::new( - &nativelink_config::stores::DedupStore { - index_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - content_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &DedupSpec { + index_store: StoreSpec::memory(MemorySpec::default()), + content_store: StoreSpec::memory(MemorySpec::default()), min_size: 5, normal_size: 6, max_size: 7, max_concurrent_fetch_per_get: 10, }, - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Index store. - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Content store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Index store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Content store. )?; let original_data = make_random_data(DATA_SIZE); @@ -226,24 +205,16 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { const START_READ_BYTE: usize = 10; let store = DedupStore::new( - &nativelink_config::stores::DedupStore { - index_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - content_store: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &DedupSpec { + index_store: StoreSpec::memory(MemorySpec::default()), + content_store: StoreSpec::memory(MemorySpec::default()), min_size: 5, normal_size: 6, max_size: 7, max_concurrent_fetch_per_get: 10, }, - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Index store. - Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )), // Content store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Index store. + Store::new(MemoryStore::new(&MemorySpec::default())), // Content store. )?; let original_data = make_random_data(DATA_SIZE); @@ -307,8 +278,8 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { async fn has_checks_content_store() -> Result<(), Error> { const DATA_SIZE: usize = MEGABYTE_SZ / 4; - let index_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); - let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore { + let index_store = MemoryStore::new(&MemorySpec::default()); + let content_store = MemoryStore::new(&MemorySpec { eviction_policy: Some(nativelink_config::stores::EvictionPolicy { max_count: 10, ..Default::default() @@ -373,8 +344,8 @@ async fn has_checks_content_store() -> Result<(), Error> { async fn has_with_no_existing_index_returns_none_test() -> Result<(), Error> { const DATA_SIZE: usize = 10; - let index_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); - let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore { + let index_store = MemoryStore::new(&MemorySpec::default()); + let content_store = MemoryStore::new(&MemorySpec { eviction_policy: Some(nativelink_config::stores::EvictionPolicy { max_count: 10, ..Default::default() @@ -404,8 +375,8 @@ async fn has_with_no_existing_index_returns_none_test() -> Result<(), Error> { /// properly return Some(0). 
#[nativelink_test] async fn has_with_zero_digest_returns_some_test() -> Result<(), Error> { - let index_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); - let content_store = MemoryStore::new(&nativelink_config::stores::MemoryStore { + let index_store = MemoryStore::new(&MemorySpec::default()); + let content_store = MemoryStore::new(&MemorySpec { eviction_policy: Some(nativelink_config::stores::EvictionPolicy { max_count: 10, ..Default::default() diff --git a/nativelink-store/tests/existence_store_test.rs b/nativelink-store/tests/existence_store_test.rs index 2f47eabe0..3d7f1c92d 100644 --- a/nativelink-store/tests/existence_store_test.rs +++ b/nativelink-store/tests/existence_store_test.rs @@ -16,7 +16,7 @@ use std::time::Duration; use mock_instant::thread_local::MockClock; use nativelink_config::stores::{ - EvictionPolicy, ExistenceCacheStore as ExistenceCacheStoreConfig, StoreConfig, + EvictionPolicy, ExistenceCacheSpec, MemorySpec, NoopSpec, StoreSpec, }; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; @@ -32,14 +32,12 @@ const VALID_HASH1: &str = "0123456789abcdef0000000000000000000100000000000001234 #[nativelink_test] async fn simple_exist_cache_test() -> Result<(), Error> { const VALUE: &str = "123"; - let config = ExistenceCacheStoreConfig { - backend: StoreConfig::noop, // Note: Not used. + let spec = ExistenceCacheSpec { + backend: StoreSpec::noop(NoopSpec::default()), // Note: Not used. eviction_policy: Default::default(), }; - let inner_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); - let store = ExistenceCacheStore::new(&config, inner_store.clone()); + let inner_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let store = ExistenceCacheStore::new(&spec, inner_store.clone()); let digest = DigestInfo::try_new(VALID_HASH1, 3).unwrap(); store @@ -72,14 +70,12 @@ async fn simple_exist_cache_test() -> Result<(), Error> { #[nativelink_test] async fn update_flags_existance_cache_test() -> Result<(), Error> { const VALUE: &str = "123"; - let config = ExistenceCacheStoreConfig { - backend: StoreConfig::noop, + let spec = ExistenceCacheSpec { + backend: StoreSpec::noop(NoopSpec::default()), eviction_policy: Default::default(), }; - let inner_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); - let store = ExistenceCacheStore::new(&config, inner_store.clone()); + let inner_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let store = ExistenceCacheStore::new(&spec, inner_store.clone()); let digest = DigestInfo::try_new(VALID_HASH1, 3).unwrap(); store @@ -97,19 +93,17 @@ async fn update_flags_existance_cache_test() -> Result<(), Error> { #[nativelink_test] async fn get_part_caches_if_exact_size_set() -> Result<(), Error> { const VALUE: &str = "123"; - let config = ExistenceCacheStoreConfig { - backend: StoreConfig::noop, + let spec = ExistenceCacheSpec { + backend: StoreSpec::noop(NoopSpec::default()), eviction_policy: Default::default(), }; - let inner_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); + let inner_store = Store::new(MemoryStore::new(&MemorySpec::default())); let digest = DigestInfo::try_new(VALID_HASH1, 3).unwrap(); inner_store .update_oneshot(digest, VALUE.into()) .await .err_tip(|| "Failed to update store")?; - let store = ExistenceCacheStore::new(&config, inner_store.clone()); + let store = ExistenceCacheStore::new(&spec, 
inner_store.clone()); let _ = store .get_part_unchunked(digest, 0, None) @@ -127,15 +121,15 @@ async fn get_part_caches_if_exact_size_set() -> Result<(), Error> { #[nativelink_test] async fn ensure_has_requests_eventually_do_let_evictions_happen() -> Result<(), Error> { const VALUE: &str = "123"; - let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); + let inner_store = MemoryStore::new(&MemorySpec::default()); let digest = DigestInfo::try_new(VALID_HASH1, 3).unwrap(); inner_store .update_oneshot(digest, VALUE.into()) .await .err_tip(|| "Failed to update store")?; let store = ExistenceCacheStore::new_with_time( - &ExistenceCacheStoreConfig { - backend: StoreConfig::noop, + &ExistenceCacheSpec { + backend: StoreSpec::noop(NoopSpec::default()), eviction_policy: Some(EvictionPolicy { max_seconds: 10, ..Default::default() diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index caaaa7344..74b65f749 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use bytes::Bytes; +use nativelink_config::stores::{FastSlowSpec, MemorySpec, NoopSpec, StoreSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_metric::MetricsComponent; @@ -35,20 +36,12 @@ use rand::{Rng, SeedableRng}; const MEGABYTE_SZ: usize = 1024 * 1024; fn make_stores() -> (Store, Store, Store) { - let fast_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); - let slow_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let slow_store = Store::new(MemoryStore::new(&MemorySpec::default())); let fast_slow_store = Store::new(FastSlowStore::new( - &nativelink_config::stores::FastSlowStore { - fast: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - slow: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &FastSlowSpec { + fast: StoreSpec::memory(MemorySpec::default()), + slow: StoreSpec::memory(MemorySpec::default()), }, fast_store.clone(), slow_store.clone(), @@ -336,13 +329,9 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { })); let fast_slow_store = FastSlowStore::new( - &nativelink_config::stores::FastSlowStore { - fast: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - slow: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &FastSlowSpec { + fast: StoreSpec::memory(MemorySpec::default()), + slow: StoreSpec::memory(MemorySpec::default()), }, fast_store, slow_store, @@ -378,20 +367,12 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { #[nativelink_test] async fn ignore_value_in_fast_store() -> Result<(), Error> { - let fast_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); - let slow_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); + let slow_store = Store::new(MemoryStore::new(&MemorySpec::default())); let fast_slow_store = Arc::new(FastSlowStore::new( 
- &nativelink_config::stores::FastSlowStore { - fast: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - slow: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), + &FastSlowSpec { + fast: StoreSpec::memory(MemorySpec::default()), + slow: StoreSpec::memory(MemorySpec::default()), }, fast_store.clone(), slow_store, @@ -410,15 +391,11 @@ async fn ignore_value_in_fast_store() -> Result<(), Error> { // Regression test for https://github.com/TraceMachina/nativelink/issues/665 #[nativelink_test] async fn has_checks_fast_store_when_noop() -> Result<(), Error> { - let fast_store = Store::new(MemoryStore::new( - &nativelink_config::stores::MemoryStore::default(), - )); + let fast_store = Store::new(MemoryStore::new(&MemorySpec::default())); let slow_store = Store::new(NoopStore::new()); - let fast_slow_store_config = nativelink_config::stores::FastSlowStore { - fast: nativelink_config::stores::StoreConfig::memory( - nativelink_config::stores::MemoryStore::default(), - ), - slow: nativelink_config::stores::StoreConfig::noop, + let fast_slow_store_config = FastSlowSpec { + fast: StoreSpec::memory(MemorySpec::default()), + slow: StoreSpec::noop(NoopSpec::default()), }; let fast_slow_store = Arc::new(FastSlowStore::new( &fast_slow_store_config, diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 7a3ca42b2..99b3333c8 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -27,6 +27,7 @@ use filetime::{set_file_atime, FileTime}; use futures::executor::block_on; use futures::task::Poll; use futures::{poll, Future, FutureExt}; +use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec}; use nativelink_error::{make_err, Code, Error, ResultExt}; use nativelink_macro::nativelink_test; use nativelink_store::fast_slow_store::FastSlowStore; @@ -255,7 +256,7 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> { let temp_path = make_temp_path("temp_path"); { let store = Store::new( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), eviction_policy: None, @@ -278,7 +279,7 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> { { // With a new store ensure content is still readable (ie: restores from shutdown). 
let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path, temp_path, eviction_policy: None, @@ -310,17 +311,15 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { let temp_path = make_temp_path("temp_path"); let store = Box::pin( - FilesystemStore::>::new( - &nativelink_config::stores::FilesystemStore { - content_path: content_path.clone(), - temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { - max_count: 3, - ..Default::default() - }), + FilesystemStore::>::new(&FilesystemSpec { + content_path: content_path.clone(), + temp_path: temp_path.clone(), + eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + max_count: 3, ..Default::default() - }, - ) + }), + ..Default::default() + }) .await?, ); @@ -390,18 +389,16 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> let temp_path = make_temp_path("temp_path"); let store = Arc::new( - FilesystemStore::>::new( - &nativelink_config::stores::FilesystemStore { - content_path: content_path.clone(), - temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { - max_count: 3, - ..Default::default() - }), - block_size: 1, - read_buffer_size: 1, - }, - ) + FilesystemStore::>::new(&FilesystemSpec { + content_path: content_path.clone(), + temp_path: temp_path.clone(), + eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + max_count: 3, + ..Default::default() + }), + block_size: 1, + read_buffer_size: 1, + }) .await?, ); @@ -513,18 +510,16 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> { let temp_path = make_temp_path("temp_path"); let store = Arc::new( - FilesystemStore::>::new( - &nativelink_config::stores::FilesystemStore { - content_path: content_path.clone(), - temp_path: temp_path.clone(), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { - max_count: 1, - ..Default::default() - }), - block_size: 1, - read_buffer_size: 1, - }, - ) + FilesystemStore::>::new(&FilesystemSpec { + content_path: content_path.clone(), + temp_path: temp_path.clone(), + eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + max_count: 1, + ..Default::default() + }), + block_size: 1, + read_buffer_size: 1, + }) .await?, ); @@ -610,7 +605,7 @@ async fn atime_updates_on_get_part_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path: make_temp_path("content_path"), temp_path: make_temp_path("temp_path"), eviction_policy: None, @@ -671,7 +666,7 @@ async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), // Load the existing store from disk. 
let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path, temp_path: make_temp_path("temp_path"), eviction_policy: Some(nativelink_config::stores::EvictionPolicy { @@ -702,7 +697,7 @@ async fn eviction_drops_file_test() -> Result<(), Error> { let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?; let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path: make_temp_path("content_path"), temp_path: make_temp_path("temp_path"), eviction_policy: None, @@ -752,7 +747,7 @@ async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error let digest = DigestInfo::try_new(HASH1, VALUE1.len())?; let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path: make_temp_path("content_path"), temp_path: make_temp_path("temp_path"), eviction_policy: None, @@ -819,18 +814,16 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> { let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?; let store = Box::pin( - FilesystemStore::>::new( - &nativelink_config::stores::FilesystemStore { - content_path: make_temp_path("content_path"), - temp_path: make_temp_path("temp_path"), - eviction_policy: Some(nativelink_config::stores::EvictionPolicy { - max_bytes: 5, - ..Default::default() - }), - block_size: 1, + FilesystemStore::>::new(&FilesystemSpec { + content_path: make_temp_path("content_path"), + temp_path: make_temp_path("temp_path"), + eviction_policy: Some(nativelink_config::stores::EvictionPolicy { + max_bytes: 5, ..Default::default() - }, - ) + }), + block_size: 1, + ..Default::default() + }) .await?, ); // Insert data into store. 
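For reference, building the renamed filesystem spec outside of a test looks like the sketch below. The paths are placeholders, the helper name is illustrative, and the remaining fields rely on `FilesystemSpec`'s `Default`, exactly as the `..Default::default()` uses above do:

use nativelink_config::stores::FilesystemSpec;
use nativelink_error::Error;
use nativelink_store::filesystem_store::FilesystemStore;

async fn make_filesystem_store() -> Result<(), Error> {
    let spec = FilesystemSpec {
        content_path: "/tmp/nativelink/content".to_string(),
        temp_path: "/tmp/nativelink/temp".to_string(),
        eviction_policy: None,
        ..Default::default()
    };
    // `<FilesystemStore>::new` uses the default file-entry type parameter,
    // the same way `default_store_factory` constructs it.
    let _store = <FilesystemStore>::new(&spec).await?;
    Ok(())
}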
@@ -916,14 +909,12 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() let temp_path = make_temp_path("temp_path"); let store = Box::pin( - FilesystemStore::>::new( - &nativelink_config::stores::FilesystemStore { - content_path: content_path.clone(), - temp_path: temp_path.clone(), - eviction_policy: None, - ..Default::default() - }, - ) + FilesystemStore::>::new(&FilesystemSpec { + content_path: content_path.clone(), + temp_path: temp_path.clone(), + eviction_policy: None, + ..Default::default() + }) .await?, ); @@ -1004,7 +995,7 @@ async fn get_part_timeout_test() -> Result<(), Error> { let store = Arc::new( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1051,7 +1042,7 @@ async fn get_part_is_zero_digest() -> Result<(), Error> { let store = Arc::new( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1118,7 +1109,7 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> { let store = Arc::new( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1161,7 +1152,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> { let content_path = make_temp_path("content_path"); let store = Arc::pin( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: make_temp_path("temp_path"), eviction_policy: None, @@ -1245,7 +1236,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> { let store = Box::pin( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1290,7 +1281,7 @@ async fn get_file_size_uses_block_size() -> Result<(), Error> { let store = Box::pin( FilesystemStore::::new_with_timeout_and_rename_fn( - &nativelink_config::stores::FilesystemStore { + &FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1336,7 +1327,7 @@ async fn update_with_whole_file_closes_file() -> Result<(), Error> { let digest = DigestInfo::try_new(HASH1, value.len())?; let store = Box::pin( - FilesystemStore::::new(&nativelink_config::stores::FilesystemStore { + FilesystemStore::::new(&FilesystemSpec { content_path: content_path.clone(), temp_path: temp_path.clone(), read_buffer_size: 1, @@ -1383,16 +1374,12 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result< let store = FastSlowStore::new( // Note: The config is not needed for this test, so use dummy data. 
-        &nativelink_config::stores::FastSlowStore {
-            fast: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
-            slow: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &FastSlowSpec {
+            fast: StoreSpec::memory(MemorySpec::default()),
+            slow: StoreSpec::memory(MemorySpec::default()),
         },
         Store::new(
-            FilesystemStore::::new(&nativelink_config::stores::FilesystemStore {
+            FilesystemStore::::new(&FilesystemSpec {
                 content_path: make_temp_path("content_path"),
                 temp_path: make_temp_path("temp_path"),
                 read_buffer_size: 1,
@@ -1401,7 +1388,7 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<
             .await?,
         ),
         Store::new(
-            FilesystemStore::::new(&nativelink_config::stores::FilesystemStore {
+            FilesystemStore::::new(&FilesystemSpec {
                 content_path: make_temp_path("content_path1"),
                 temp_path: make_temp_path("temp_path1"),
                 read_buffer_size: 1,
@@ -1443,7 +1430,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
 
     let store = Box::pin(
         FilesystemStore::::new_with_timeout_and_rename_fn(
-            &nativelink_config::stores::FilesystemStore {
+            &FilesystemSpec {
                 content_path: content_path.clone(),
                 temp_path: temp_path.clone(),
                 read_buffer_size: 1,
diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs
index a31f18469..65623f8a1 100644
--- a/nativelink-store/tests/memory_store_test.rs
+++ b/nativelink-store/tests/memory_store_test.rs
@@ -17,6 +17,7 @@ use std::pin::Pin;
 
 use bytes::{BufMut, Bytes, BytesMut};
 use memory_stats::memory_stats;
+use nativelink_config::stores::MemorySpec;
 use nativelink_error::{Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::memory_store::MemoryStore;
@@ -39,7 +40,7 @@ const INVALID_HASH: &str = "g111111111111111111111111111111111111111111111111111
 async fn insert_one_item_then_update() -> Result<(), Error> {
     const VALUE1: &str = "13";
     const VALUE2: &str = "23";
-    let store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store = MemoryStore::new(&MemorySpec::default());
 
     // Insert dummy value into store.
     store
@@ -91,7 +92,7 @@ async fn ensure_full_copy_of_bytes_is_made_test() -> Result<(), Error> {
     let mut sum_memory_usage_increase_perc: f64 = 0.0;
 
     for _ in 0..MAX_STATS_ITERATIONS {
-        let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+        let store_owned = MemoryStore::new(&MemorySpec::default());
         let store = Pin::new(&store_owned);
 
         let initial_virtual_mem = memory_stats()
@@ -130,7 +131,7 @@ async fn ensure_full_copy_of_bytes_is_made_test() -> Result<(), Error> {
 #[nativelink_test]
 async fn read_partial() -> Result<(), Error> {
     const VALUE1: &str = "1234";
-    let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store_owned = MemoryStore::new(&MemorySpec::default());
     let store = Pin::new(&store_owned);
 
     let digest = DigestInfo::try_new(VALID_HASH1, 4).unwrap();
@@ -153,7 +154,7 @@ async fn read_partial() -> Result<(), Error> {
 #[nativelink_test]
 async fn read_zero_size_item_test() -> Result<(), Error> {
     const VALUE: &str = "";
-    let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store_owned = MemoryStore::new(&MemorySpec::default());
     let store = Pin::new(&store_owned);
 
     // Insert dummy value into store.
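Editorial aside: the memory-store hunks are the same one-line rename, with `MemoryStore::new` now taking a `&MemorySpec`. The store API itself is unchanged, so a round trip still looks like the tests above. A minimal sketch, assuming the `StoreKey`/`StoreLike` helpers these tests already use come from `nativelink_util::store_trait` (the key name here is illustrative, not part of this patch):

```rust
use nativelink_config::stores::MemorySpec;
use nativelink_error::Error;
use nativelink_store::memory_store::MemoryStore;
use nativelink_util::store_trait::{StoreKey, StoreLike};

// Sketch only: the rename touches the spec type, not the store behavior.
async fn example_memory_roundtrip() -> Result<(), Error> {
    let store = MemoryStore::new(&MemorySpec::default());
    // Same helper the tests above call; key and value are illustrative.
    store
        .update_oneshot(StoreKey::new_str("example_key"), "value".into())
        .await?;
    Ok(())
}
```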
@@ -173,7 +174,7 @@ async fn read_zero_size_item_test() -> Result<(), Error> {
 #[nativelink_test]
 async fn errors_with_invalid_inputs() -> Result<(), Error> {
     const VALUE1: &str = "123";
-    let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store_owned = MemoryStore::new(&MemorySpec::default());
     let store = Pin::new(store_owned.as_ref());
     {
         // .has() tests.
@@ -241,7 +242,7 @@ async fn errors_with_invalid_inputs() -> Result<(), Error> {
 async fn get_part_is_zero_digest() -> Result<(), Error> {
     let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
 
-    let store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store = MemoryStore::new(&MemorySpec::default());
     let store_clone = store.clone();
 
     let (mut writer, mut reader) = make_buf_channel_pair();
@@ -269,7 +270,7 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
     let keys = vec![digest.into()];
     let mut results = vec![None];
 
-    let store_owned = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store_owned = MemoryStore::new(&MemorySpec::default());
     let store = Pin::new(&store_owned);
 
     let _ = store
@@ -304,7 +305,7 @@ async fn list_test() -> Result<(), Error> {
     const KEY3: StoreKey = StoreKey::new_str("key3");
     const VALUE: &str = "value1";
 
-    let store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let store = MemoryStore::new(&MemorySpec::default());
     store.update_oneshot(KEY1, VALUE.into()).await?;
     store.update_oneshot(KEY2, VALUE.into()).await?;
     store.update_oneshot(KEY3, VALUE.into()).await?;
diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs
index d47ab4a25..18b1463a0 100644
--- a/nativelink-store/tests/ref_store_test.rs
+++ b/nativelink-store/tests/ref_store_test.rs
@@ -15,6 +15,7 @@ use std::ptr::from_ref;
 use std::sync::Arc;
 
+use nativelink_config::stores::{MemorySpec, RefSpec};
 use nativelink_error::Error;
 use nativelink_macro::nativelink_test;
 use nativelink_store::memory_store::MemoryStore;
@@ -29,13 +30,11 @@ const VALID_HASH1: &str = "0123456789abcdef0000000000000000000100000000000001234
 fn setup_stores() -> (Arc, Store, Store) {
     let store_manager = Arc::new(StoreManager::new());
 
-    let memory_store = Store::new(MemoryStore::new(
-        &nativelink_config::stores::MemoryStore::default(),
-    ));
+    let memory_store = Store::new(MemoryStore::new(&MemorySpec::default()));
     store_manager.add_store("foo", memory_store.clone());
 
     let ref_store = Store::new(RefStore::new(
-        &nativelink_config::stores::RefStore {
+        &RefSpec {
             name: "foo".to_string(),
         },
         Arc::downgrade(&store_manager),
@@ -140,13 +139,11 @@ async fn update_test() -> Result<(), Error> {
 async fn inner_store_test() -> Result<(), Error> {
     let store_manager = Arc::new(StoreManager::new());
 
-    let memory_store = Store::new(MemoryStore::new(
-        &nativelink_config::stores::MemoryStore::default(),
-    ));
+    let memory_store = Store::new(MemoryStore::new(&MemorySpec::default()));
     store_manager.add_store("mem_store", memory_store.clone());
 
     let ref_store_inner = Store::new(RefStore::new(
-        &nativelink_config::stores::RefStore {
+        &RefSpec {
             name: "mem_store".to_string(),
         },
         Arc::downgrade(&store_manager),
@@ -154,7 +151,7 @@ async fn inner_store_test() -> Result<(), Error> {
     store_manager.add_store("ref_store_inner", ref_store_inner.clone());
 
     let ref_store_outer = Store::new(RefStore::new(
-        &nativelink_config::stores::RefStore {
+        &RefSpec {
             name: "ref_store_inner".to_string(),
         },
         Arc::downgrade(&store_manager),
diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs
index e25ce5154..d8ca9085c 100644
--- a/nativelink-store/tests/s3_store_test.rs
+++ b/nativelink-store/tests/s3_store_test.rs
@@ -26,6 +26,7 @@ use http::header;
 use http::status::StatusCode;
 use hyper::Body;
 use mock_instant::thread_local::MockClock;
+use nativelink_config::stores::S3Spec;
 use nativelink_error::{make_input_err, Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::s3_store::S3Store;
@@ -59,7 +60,7 @@ async fn simple_has_object_found() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -94,7 +95,7 @@ async fn simple_has_object_not_found() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -153,7 +154,7 @@ async fn simple_has_retries() -> Result<(), Error> {
 
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             retry: nativelink_config::stores::Retry {
                 max_retries: 1024,
@@ -205,7 +206,7 @@ async fn simple_update_ac() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -293,7 +294,7 @@ async fn simple_get_ac() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -338,7 +339,7 @@ async fn smoke_test_get_part() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -399,7 +400,7 @@ async fn get_part_simple_retries() -> Result<(), Error> {
 
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             retry: nativelink_config::stores::Retry {
                 max_retries: 1024,
@@ -530,7 +531,7 @@ async fn multipart_update_large_cas() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -570,7 +571,7 @@ async fn ensure_empty_string_in_stream_works_test() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -613,7 +614,7 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = Arc::new(S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -655,7 +656,7 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             ..Default::default()
         },
@@ -698,7 +699,7 @@ async fn has_with_expired_result() -> Result<(), Error> {
         .build();
     let s3_client = aws_sdk_s3::Client::from_conf(test_config);
     let store = S3Store::new_with_client_and_jitter(
-        &nativelink_config::stores::S3Store {
+        &S3Spec {
             bucket: BUCKET_NAME.to_string(),
             consider_expired_after_s: 2 * 24 * 60 * 60, // 2 days.
             ..Default::default()
diff --git a/nativelink-store/tests/shard_store_test.rs b/nativelink-store/tests/shard_store_test.rs
index ac67b855c..ea130bcd5 100644
--- a/nativelink-store/tests/shard_store_test.rs
+++ b/nativelink-store/tests/shard_store_test.rs
@@ -14,6 +14,7 @@ use std::sync::Arc;
 
+use nativelink_config::stores::{MemorySpec, ShardSpec, StoreSpec};
 use nativelink_error::Error;
 use nativelink_macro::nativelink_test;
 use nativelink_store::memory_store::MemoryStore;
@@ -28,15 +29,15 @@ use rand::{Rng, SeedableRng};
 const MEGABYTE_SZ: usize = 1024 * 1024;
 
 fn make_stores(weights: &[u32]) -> (Arc, Vec>) {
-    let memory_store_config = nativelink_config::stores::MemoryStore::default();
-    let store_config = nativelink_config::stores::StoreConfig::memory(memory_store_config.clone());
+    let memory_store_config = MemorySpec::default();
+    let store_config = StoreSpec::memory(memory_store_config.clone());
     let stores: Vec<_> = weights
         .iter()
         .map(|_| MemoryStore::new(&memory_store_config))
         .collect();
 
     let shard_store = ShardStore::new(
-        &nativelink_config::stores::ShardStore {
+        &ShardSpec {
             stores: weights
                 .iter()
                 .map(|weight| nativelink_config::stores::ShardConfig {
diff --git a/nativelink-store/tests/size_partitioning_store_test.rs b/nativelink-store/tests/size_partitioning_store_test.rs
index 0339121ec..0dec2ae9c 100644
--- a/nativelink-store/tests/size_partitioning_store_test.rs
+++ b/nativelink-store/tests/size_partitioning_store_test.rs
@@ -14,6 +14,7 @@ use std::sync::Arc;
 
+use nativelink_config::stores::{MemorySpec, SizePartitioningSpec, StoreSpec};
 use nativelink_error::Error;
 use nativelink_macro::nativelink_test;
 use nativelink_store::memory_store::MemoryStore;
@@ -37,18 +38,14 @@ fn setup_stores(
     Arc,
     Arc,
 ) {
-    let lower_memory_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
-    let upper_memory_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let lower_memory_store = MemoryStore::new(&MemorySpec::default());
+    let upper_memory_store = MemoryStore::new(&MemorySpec::default());
 
     let size_part_store = SizePartitioningStore::new(
-        &nativelink_config::stores::SizePartitioningStore {
+        &SizePartitioningSpec {
             size,
-            lower_store: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
-            upper_store: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+            lower_store: StoreSpec::memory(MemorySpec::default()),
+            upper_store: StoreSpec::memory(MemorySpec::default()),
         },
         Store::new(lower_memory_store.clone()),
         Store::new(upper_memory_store.clone()),
diff --git a/nativelink-store/tests/verify_store_test.rs b/nativelink-store/tests/verify_store_test.rs
index 93d92b907..c0225c90b 100644
--- a/nativelink-store/tests/verify_store_test.rs
+++ b/nativelink-store/tests/verify_store_test.rs
@@ -16,6 +16,7 @@ use std::pin::Pin;
 
 use futures::future::pending;
 use futures::try_join;
+use nativelink_config::stores::{MemorySpec, StoreSpec, VerifySpec};
 use nativelink_error::{Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_store::memory_store::MemoryStore;
@@ -34,12 +35,10 @@ const VALID_HASH1: &str = "0123456789abcdef0000000000000000000100000000000001234
 async fn verify_size_false_passes_on_update() -> Result<(), Error> {
     const VALUE1: &str = "123";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: false,
             verify_hash: false,
         },
@@ -67,12 +66,10 @@ async fn verify_size_true_fails_on_update() -> Result<(), Error> {
     const VALUE1: &str = "123";
     const EXPECTED_ERR: &str = "Expected size 100 but got size 3 on insert";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: true,
             verify_hash: false,
         },
@@ -107,12 +104,10 @@ async fn verify_size_true_fails_on_update() -> Result<(), Error> {
 async fn verify_size_true_suceeds_on_update() -> Result<(), Error> {
     const VALUE1: &str = "123";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: true,
             verify_hash: false,
         },
@@ -132,12 +127,10 @@ async fn verify_size_true_suceeds_on_update() -> Result<(), Error> {
 
 #[nativelink_test]
 async fn verify_size_true_suceeds_on_multi_chunk_stream_update() -> Result<(), Error> {
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: true,
             verify_hash: false,
         },
@@ -174,12 +167,10 @@ async fn verify_sha256_hash_true_suceeds_on_update() -> Result<(), Error> {
     const HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
     const VALUE: &str = "123";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: false,
             verify_hash: true,
         },
@@ -204,12 +195,10 @@ async fn verify_sha256_hash_true_fails_on_update() -> Result<(), Error> {
     const VALUE: &str = "123";
     const ACTUAL_HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: false,
             verify_hash: true,
         },
@@ -239,12 +228,10 @@ async fn verify_blake3_hash_true_suceeds_on_update() -> Result<(), Error> {
     const HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e";
     const VALUE: &str = "123";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: false,
             verify_hash: true,
         },
@@ -275,12 +262,10 @@ async fn verify_blake3_hash_true_fails_on_update() -> Result<(), Error> {
     const VALUE: &str = "123";
     const ACTUAL_HASH: &str = "b3d4f8803f7e24b8f389b072e75477cdbcfbe074080fb5e500e53e26e054158e";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: false,
             verify_hash: true,
         },
@@ -320,12 +305,10 @@ async fn verify_fails_immediately_on_too_much_data_sent_update() -> Result<(), E
     const VALUE: &str = "123";
     const EXPECTED_ERR: &str = "Expected size 4 but already received 6 on insert";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: true,
             verify_hash: false,
         },
@@ -366,12 +349,10 @@ async fn verify_size_and_hash_suceeds_on_small_data() -> Result<(), Error> {
     const HASH: &str = "a665a45920422f9d417e4867efdc4fb8a04a1f3fff1fa07e998e86f7f7a27ae3";
     const VALUE: &str = "123";
 
-    let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default());
+    let inner_store = MemoryStore::new(&MemorySpec::default());
     let store = VerifyStore::new(
-        &nativelink_config::stores::VerifyStore {
-            backend: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+        &VerifySpec {
+            backend: StoreSpec::memory(MemorySpec::default()),
             verify_size: true,
             verify_hash: true,
         },
diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs
index f7fe2d1ce..39534e075 100644
--- a/nativelink-worker/tests/local_worker_test.rs
+++ b/nativelink-worker/tests/local_worker_test.rs
@@ -31,6 +31,7 @@ mod utils {
 
 use hyper::body::Frame;
 use nativelink_config::cas_server::{LocalWorkerConfig, WorkerProperty};
+use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
 use nativelink_error::{make_err, make_input_err, Code, Error};
 use nativelink_macro::nativelink_test;
 use nativelink_proto::build::bazel::remote::execution::v2::platform::Property;
@@ -406,30 +407,22 @@ async fn simple_worker_start_action_test() -> Result<(), Box
 Result<(), Box> {
     let cas_store = Store::new(FastSlowStore::new(
-        &nativelink_config::stores::FastSlowStore {
+        &FastSlowSpec {
             // Note: These are not needed for this test, so we put dummy memory stores here.
-            fast: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
-            slow: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+            fast: StoreSpec::memory(MemorySpec::default()),
+            slow: StoreSpec::memory(MemorySpec::default()),
         },
         Store::new(
-            ::new(&nativelink_config::stores::FilesystemStore {
+            ::new(&FilesystemSpec {
                 content_path: make_temp_path("content_path"),
                 temp_path: make_temp_path("temp_path"),
                 ..Default::default()
             })
             .await?,
         ),
-        Store::new(MemoryStore::new(
-            &nativelink_config::stores::MemoryStore::default(),
-        )),
-    ));
-    let ac_store = Store::new(MemoryStore::new(
-        &nativelink_config::stores::MemoryStore::default(),
+        Store::new(MemoryStore::new(&MemorySpec::default())),
     ));
+    let ac_store = Store::new(MemoryStore::new(&MemorySpec::default()));
     let work_directory = make_temp_path("foo");
     new_local_worker(
         Arc::new(LocalWorkerConfig {
@@ -454,30 +447,22 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Box
 Result<(), Box> {
     let cas_store = Store::new(FastSlowStore::new(
-        &nativelink_config::stores::FastSlowStore {
+        &FastSlowSpec {
             // Note: These are not needed for this test, so we put dummy memory stores here.
-            fast: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
-            slow: nativelink_config::stores::StoreConfig::memory(
-                nativelink_config::stores::MemoryStore::default(),
-            ),
+            fast: StoreSpec::memory(MemorySpec::default()),
+            slow: StoreSpec::memory(MemorySpec::default()),
         },
         Store::new(
-            ::new(&nativelink_config::stores::FilesystemStore {
+            ::new(&FilesystemSpec {
                 content_path: make_temp_path("content_path"),
                 temp_path: make_temp_path("temp_path"),
                 ..Default::default()
             })
             .await?,
         ),
-        Store::new(MemoryStore::new(
-            &nativelink_config::stores::MemoryStore::default(),
-        )),
-    ));
-    let ac_store = Store::new(MemoryStore::new(
-        &nativelink_config::stores::MemoryStore::default(),
+        Store::new(MemoryStore::new(&MemorySpec::default())),
     ));
+    let ac_store = Store::new(MemoryStore::new(&MemorySpec::default()));
     let work_directory = make_temp_path("foo");
     fs::create_dir_all(format!("{}/{}", work_directory, "another_dir")).await?;
     let mut file =
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
index 4328190ba..60cfe9b69 100644
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ -27,6 +27,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use nativelink_config::cas_server::EnvironmentSource;
+use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
 use nativelink_error::{make_input_err, Code, Error, ResultExt};
 use nativelink_macro::nativelink_test;
 use nativelink_proto::build::bazel::remote::execution::v2::command::EnvironmentVariable;
@@ -88,20 +89,20 @@ async fn setup_stores() -> Result<
     ),
     Error,
 > {
-    let fast_config = nativelink_config::stores::FilesystemStore {
+    let fast_config = FilesystemSpec {
         content_path: make_temp_path("content_path"),
         temp_path: make_temp_path("temp_path"),
         eviction_policy: None,
         ..Default::default()
     };
-    let slow_config = nativelink_config::stores::MemoryStore::default();
+    let slow_config = MemorySpec::default();
     let fast_store = FilesystemStore::new(&fast_config).await?;
     let slow_store = MemoryStore::new(&slow_config);
     let ac_store = MemoryStore::new(&slow_config);
     let cas_store = FastSlowStore::new(
-        &nativelink_config::stores::FastSlowStore {
-            fast: nativelink_config::stores::StoreConfig::filesystem(fast_config),
-            slow: nativelink_config::stores::StoreConfig::memory(slow_config),
+        &FastSlowSpec {
+            fast: StoreSpec::filesystem(fast_config),
+            slow: StoreSpec::memory(slow_config),
         },
         Store::new(fast_store.clone()),
        Store::new(slow_store.clone()),
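Editorial aside: the `setup_stores` hunk above shows the renamed composition end to end: leaf specs (`FilesystemSpec`, `MemorySpec`) are wrapped in `StoreSpec` variants to form a `FastSlowSpec`, while the concrete stores are built from the same spec values and handed to `FastSlowStore::new`. A condensed sketch of that wiring, assuming the crate paths used in these tests (notably `Store` from `nativelink_util::store_trait`) and a hypothetical `make_temp_path` helper; error handling and the AC store are omitted:

```rust
use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
use nativelink_error::Error;
use nativelink_store::fast_slow_store::FastSlowStore;
use nativelink_store::filesystem_store::FilesystemStore;
use nativelink_store::memory_store::MemoryStore;
use nativelink_util::store_trait::Store;

// Condensed sketch of the setup_stores wiring above; `make_temp_path` stands
// in for the tests' temp-directory helper.
async fn example_fast_slow(make_temp_path: &dyn Fn(&str) -> String) -> Result<(), Error> {
    let fast_config = FilesystemSpec {
        content_path: make_temp_path("content_path"),
        temp_path: make_temp_path("temp_path"),
        eviction_policy: None,
        ..Default::default()
    };
    let slow_config = MemorySpec::default();

    // Build the concrete stores from borrows of the specs first...
    let fast_store = FilesystemStore::new(&fast_config).await?;
    let slow_store = MemoryStore::new(&slow_config);

    // ...then move the specs into the wrapper spec, as the hunk above does.
    let _cas_store = FastSlowStore::new(
        &FastSlowSpec {
            fast: StoreSpec::filesystem(fast_config),
            slow: StoreSpec::memory(slow_config),
        },
        Store::new(fast_store),
        Store::new(slow_store),
    );
    Ok(())
}
```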