From ed26a2aeb6811979928b3dd6ebd4b325d9ef6328 Mon Sep 17 00:00:00 2001
From: Sergii Mikhtoniuk
Date: Mon, 18 Mar 2024 18:30:34 -0700
Subject: [PATCH] Add in-mem cache of dataset IDs and aliases for S3 repo

---
 CHANGELOG.md                                  |   5 +
 .../core/src/repos/dataset_repository_s3.rs   | 153 ++++++++++++++----
 .../object_repository_caching_local_fs.rs     |   4 +-
 3 files changed, 125 insertions(+), 37 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bdb3ce069..c1109e120 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+### Added
+- Implementation of `ObjectRepository` that can cache small objects on the local file system (e.g. to avoid too many calls to S3 repo)
+- Optional `S3RegistryCache` component that can cache the list of datasets under an S3 repo to avoid very expensive bucket prefix listing calls
+
 ## [0.166.1] - 2024-03-14
 ### Fixed
 - Allow OData adapter to skip fields with unsupported data types instead of crashing
diff --git a/src/infra/core/src/repos/dataset_repository_s3.rs b/src/infra/core/src/repos/dataset_repository_s3.rs
index b98e5fb31..e93efa5d9 100644
--- a/src/infra/core/src/repos/dataset_repository_s3.rs
+++ b/src/infra/core/src/repos/dataset_repository_s3.rs
@@ -11,11 +11,13 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use chrono::{DateTime, Utc};
 use dill::*;
 use event_bus::EventBus;
 use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DEFAULT_ACCOUNT_NAME};
 use kamu_core::*;
 use opendatafabric::*;
+use tokio::sync::Mutex;
 use url::Url;
 
 use crate::utils::s3_context::S3Context;
@@ -30,6 +32,7 @@ pub struct DatasetRepositoryS3 {
     dependency_graph_service: Arc<dyn DependencyGraphService>,
     event_bus: Arc<EventBus>,
     multi_tenant: bool,
+    registry_cache: Option<Arc<S3RegistryCache>>,
     metadata_cache_local_fs_path: Option<Arc<PathBuf>>,
 }
 
@@ -37,6 +40,15 @@ pub struct DatasetRepositoryS3 {
 #[component(pub)]
 impl DatasetRepositoryS3 {
+    /// # Arguments
+    ///
+    /// * `registry_cache` - when present in the catalog, enables an in-memory
+    ///   cache of the dataset IDs and aliases present in the repository,
+    ///   avoiding expensive bucket scanning
+    ///
+    /// * `metadata_cache_local_fs_path` - when specified, enables a local FS
+    ///   cache of metadata blocks, dramatically reducing the number of
+    ///   requests to S3
     pub fn new(
         s3_context: S3Context,
         current_account_subject: Arc<CurrentAccountSubject>,
@@ -44,6 +56,7 @@ impl DatasetRepositoryS3 {
         dependency_graph_service: Arc<dyn DependencyGraphService>,
         event_bus: Arc<EventBus>,
         multi_tenant: bool,
+        registry_cache: Option<Arc<S3RegistryCache>>,
         metadata_cache_local_fs_path: Option<Arc<PathBuf>>,
     ) -> Self {
         Self {
@@ -53,11 +66,12 @@ impl DatasetRepositoryS3 {
             dependency_graph_service,
             event_bus,
             multi_tenant,
+            registry_cache,
             metadata_cache_local_fs_path,
         }
     }
 
-    fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Result<Arc<dyn Dataset>, InternalError> {
+    fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Arc<dyn Dataset> {
         let s3_context = self
             .s3_context
             .sub_context(&format!("{}/", &dataset_id.as_multibase()));
@@ -70,7 +84,7 @@ impl DatasetRepositoryS3 {
         // TODO: Consider switching DatasetImpl to dynamic dispatch to simplify
         // configurability
         if let Some(metadata_cache_local_fs_path) = &self.metadata_cache_local_fs_path {
-            Ok(Arc::new(DatasetImpl::new(
+            Arc::new(DatasetImpl::new(
                 self.event_bus.clone(),
                 MetadataChainImpl::new(
                     MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new(
@@ -109,9 +123,9 @@ impl DatasetRepositoryS3 {
                     bucket.clone(),
                     format!("{key_prefix}info/"),
                 )),
-            )))
+            ))
         } else {
-            Ok(Arc::new(DatasetImpl::new(
+            Arc::new(DatasetImpl::new(
                 self.event_bus.clone(),
                 MetadataChainImpl::new(
                     MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new(
@@ -147,7 +161,7 @@ impl DatasetRepositoryS3 {
                     bucket.clone(),
                     format!("{key_prefix}info/"),
                 )),
-            )))
+            ))
         }
     }
 
@@ -171,7 +185,7 @@ impl DatasetRepositoryS3 {
     async fn save_dataset_alias(
         &self,
         dataset: &dyn Dataset,
-        dataset_alias: DatasetAlias,
+        dataset_alias: &DatasetAlias,
     ) -> Result<(), InternalError> {
         dataset
             .as_info_repo()
@@ -182,26 +196,53 @@ impl DatasetRepositoryS3 {
         Ok(())
     }
 
+    #[tracing::instrument(level = "info", skip_all)]
+    async fn list_datasets_in_s3(&self) -> Result<Vec<DatasetHandle>, InternalError> {
+        let mut res = Vec::new();
+
+        let folders_common_prefixes = self.s3_context.bucket_list_folders().await?;
+
+        for prefix in folders_common_prefixes {
+            let mut prefix = prefix.prefix.unwrap();
+            while prefix.ends_with('/') {
+                prefix.pop();
+            }
+
+            if let Ok(id) = DatasetID::from_multibase_string(&prefix) {
+                let dataset = self.get_dataset_impl(&id);
+                let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
+                res.push(DatasetHandle::new(id, dataset_alias));
+            }
+        }
+
+        Ok(res)
+    }
+
+    async fn list_datasets_maybe_cached(&self) -> Result<Vec<DatasetHandle>, InternalError> {
+        if let Some(cache) = &self.registry_cache {
+            let mut cache = cache.state.lock().await;
+
+            // Init cache
+            if cache.last_updated == DateTime::UNIX_EPOCH {
+                tracing::debug!("Initializing dataset registry cache");
+                cache.datasets = self.list_datasets_in_s3().await?;
+                cache.last_updated = Utc::now();
+            }
+
+            Ok(cache.datasets.clone())
+        } else {
+            self.list_datasets_in_s3().await
+        }
+    }
+
     fn stream_datasets_if<'s>(
         &'s self,
         alias_filter: impl Fn(&DatasetAlias) -> bool + Send + 's,
     ) -> DatasetHandleStream<'s> {
         Box::pin(async_stream::try_stream! {
-            let folders_common_prefixes = self.s3_context.bucket_list_folders().await?;
-            for prefix in folders_common_prefixes {
-                let mut prefix = prefix.prefix.unwrap();
-                while prefix.ends_with('/') {
-                    prefix.pop();
-                }
-
-                if let Ok(id) = DatasetID::from_multibase_string(&prefix) {
-                    let dataset = self.get_dataset_impl(&id)?;
-                    let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
-                    if alias_filter(&dataset_alias) {
-                        let hdl = DatasetHandle::new(id, dataset_alias);
-                        yield hdl;
-                    }
-
+            for hdl in self.list_datasets_maybe_cached().await? {
+                if alias_filter(&hdl.alias) {
+                    yield hdl;
                 }
             }
         })
@@ -269,7 +310,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
                     .bucket_path_exists(id.as_multibase().to_stack_string().as_str())
                     .await?
                {
-                    let dataset = self.get_dataset_impl(id)?;
+                    let dataset = self.get_dataset_impl(id);
                     let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
                     Ok(DatasetHandle::new(id.clone(), dataset_alias))
                 } else {
@@ -305,7 +346,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
         dataset_ref: &DatasetRef,
     ) -> Result<Arc<dyn Dataset>, GetDatasetError> {
         let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?;
-        let dataset = self.get_dataset_impl(&dataset_handle.id)?;
+        let dataset = self.get_dataset_impl(&dataset_handle.id);
         Ok(dataset)
     }
 
@@ -361,7 +402,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
         // It's okay to create a new dataset by this point
 
         let dataset_id = seed_block.event.dataset_id.clone();
-        let dataset = self.get_dataset_impl(&dataset_id)?;
+        let dataset = self.get_dataset_impl(&dataset_id);
 
         // There are three possibilities at this point:
         // - Dataset did not exist before - continue normally
@@ -387,11 +428,17 @@ impl DatasetRepository for DatasetRepositoryS3 {
         };
 
         let normalized_alias = self.normalize_alias(dataset_alias);
-        self.save_dataset_alias(dataset.as_ref(), normalized_alias)
+        self.save_dataset_alias(dataset.as_ref(), &normalized_alias)
             .await?;
 
         let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone());
 
+        // Update cache if enabled
+        if let Some(cache) = &self.registry_cache {
+            let mut cache = cache.state.lock().await;
+            cache.datasets.push(dataset_handle.clone());
+        }
+
         self.event_bus
             .dispatch_event(events::DatasetEventCreated {
                 dataset_id: dataset_handle.id.clone(),
@@ -424,31 +471,37 @@ impl DatasetRepository for DatasetRepositoryS3 {
         dataset_ref: &DatasetRef,
         new_name: &DatasetName,
     ) -> Result<(), RenameDatasetError> {
-        let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?;
+        let old_handle = self.resolve_dataset_ref(dataset_ref).await?;
 
-        let dataset = self.get_dataset_impl(&dataset_handle.id)?;
+        let dataset = self.get_dataset_impl(&old_handle.id);
 
-        let new_alias =
-            DatasetAlias::new(dataset_handle.alias.account_name.clone(), new_name.clone());
+        let new_alias = DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone());
 
         // Check against possible name collisions
         match self.resolve_dataset_ref(&new_alias.as_local_ref()).await {
             Ok(_) => Err(RenameDatasetError::NameCollision(NameCollisionError {
-                alias: DatasetAlias::new(
-                    dataset_handle.alias.account_name.clone(),
-                    new_name.clone(),
-                ),
+                alias: DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone()),
             })),
             Err(GetDatasetError::Internal(e)) => Err(RenameDatasetError::Internal(e)),
             Err(GetDatasetError::NotFound(_)) => Ok(()),
         }?;
 
         self.dataset_action_authorizer
-            .check_action_allowed(&dataset_handle, DatasetAction::Write)
+            .check_action_allowed(&old_handle, DatasetAction::Write)
             .await?;
 
         // It's safe to rename dataset
-        self.save_dataset_alias(dataset.as_ref(), new_alias).await?;
+        self.save_dataset_alias(dataset.as_ref(), &new_alias)
+            .await?;
+
+        // Update cache if enabled
+        if let Some(cache) = &self.registry_cache {
+            let mut cache = cache.state.lock().await;
+            cache.datasets.retain(|h| h.id != old_handle.id);
+            cache
+                .datasets
+                .push(DatasetHandle::new(old_handle.id, new_alias));
+        }
 
         Ok(())
     }
@@ -494,6 +547,12 @@ impl DatasetRepository for DatasetRepositoryS3 {
             .await
             .map_err(DeleteDatasetError::Internal)?;
 
+        // Update cache if enabled
+        if let Some(cache) = &self.registry_cache {
+            let mut cache = cache.state.lock().await;
+            cache.datasets.retain(|h| h.id != dataset_handle.id);
+        }
+
         self.event_bus
             .dispatch_event(events::DatasetEventDeleted {
                 dataset_id: dataset_handle.id,
@@ -505,3 +564,27 @@ impl DatasetRepository for DatasetRepositoryS3 {
     }
 }
 /////////////////////////////////////////////////////////////////////////////////////////
+
+pub struct S3RegistryCache {
+    state: Arc<Mutex<State>>,
+}
+
+struct State {
+    datasets: Vec<DatasetHandle>,
+    last_updated: DateTime<Utc>,
+}
+
+#[component(pub)]
+#[dill::scope(Singleton)]
+impl S3RegistryCache {
+    pub fn new() -> Self {
+        Self {
+            state: Arc::new(Mutex::new(State {
+                datasets: Vec::new(),
+                last_updated: DateTime::UNIX_EPOCH,
+            })),
+        }
+    }
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////
diff --git a/src/infra/core/src/repos/object_repository_caching_local_fs.rs b/src/infra/core/src/repos/object_repository_caching_local_fs.rs
index f0af94cda..a159aceea 100644
--- a/src/infra/core/src/repos/object_repository_caching_local_fs.rs
+++ b/src/infra/core/src/repos/object_repository_caching_local_fs.rs
@@ -23,7 +23,7 @@ use crate::utils::s3_context::AsyncReadObj;
 /////////////////////////////////////////////////////////////////////////////////////////
 
 /// A read-through and (partially) a write-through caching layer for
-/// [ObjectRepository] using a local file system.
+/// [`ObjectRepository`] using a local file system.
 ///
 /// Currently caches objects forever, so a cache directory cleanup has to be
 /// handled separately.
@@ -140,7 +140,7 @@ where
     ) -> Result<InsertResult, InsertError> {
         let res = self.wrapped.insert_bytes(data, options).await?;
         let cache_path = self.cache_path(&res.hash);
-        std::fs::write(&cache_path, data).int_err()?;
+        std::fs::write(cache_path, data).int_err()?;
         Ok(res)
     }
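
The core of this change is the lazy-initialized, mutex-guarded registry cache that `list_datasets_maybe_cached` consults before falling back to a full bucket prefix listing. The standalone sketch below illustrates that pattern outside of the kamu codebase; `Handle`, `RegistryCache`, and `expensive_listing` are hypothetical stand-ins for `DatasetHandle`, `S3RegistryCache`, and the S3 prefix scan, and the `tokio`/`chrono` dependencies are assumed.

    // A minimal sketch of the lazy-initialized registry cache pattern.
    // `Handle`, `RegistryCache`, and `expensive_listing` are hypothetical
    // stand-ins for `DatasetHandle`, `S3RegistryCache`, and the S3 scan.

    use std::sync::Arc;

    use chrono::{DateTime, Utc};
    use tokio::sync::Mutex;

    #[derive(Clone, Debug)]
    struct Handle {
        id: String,
        alias: String,
    }

    struct State {
        datasets: Vec<Handle>,
        // UNIX_EPOCH doubles as the "never populated" sentinel, as in the patch
        last_updated: DateTime<Utc>,
    }

    struct RegistryCache {
        state: Arc<Mutex<State>>,
    }

    impl RegistryCache {
        fn new() -> Self {
            Self {
                state: Arc::new(Mutex::new(State {
                    datasets: Vec::new(),
                    last_updated: DateTime::UNIX_EPOCH,
                })),
            }
        }
    }

    // Stand-in for the expensive `bucket_list_folders` scan over S3
    async fn expensive_listing() -> Vec<Handle> {
        vec![Handle {
            id: "did:odf:example".to_string(),
            alias: "my-dataset".to_string(),
        }]
    }

    // Mirrors `list_datasets_maybe_cached`: list once, then serve from memory
    async fn list_datasets_maybe_cached(cache: &RegistryCache) -> Vec<Handle> {
        let mut state = cache.state.lock().await;

        if state.last_updated == DateTime::UNIX_EPOCH {
            state.datasets = expensive_listing().await;
            state.last_updated = Utc::now();
        }

        state.datasets.clone()
    }

    #[tokio::main]
    async fn main() {
        let cache = RegistryCache::new();
        let first = list_datasets_maybe_cached(&cache).await; // scans "S3"
        let second = list_datasets_maybe_cached(&cache).await; // served from memory
        assert_eq!(first.len(), second.len());
        println!("{second:?}");
    }

As in the patch, the cache is populated at most once per process and is then kept consistent by the create, rename, and delete paths mutating it in place, so no TTL or explicit invalidation is used beyond a process restart.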