Add in-mem cache of dataset IDs and aliases for S3 repo
sergiimk committed Mar 19, 2024
1 parent 8b37172 commit ed26a2a
Showing 3 changed files with 125 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
### Added
- Implementation of `ObjectRepository` that can cache small objects on the local file system (e.g. to avoid too many calls to the S3 repo)
- Optional `S3RegistryCache` component that can cache the list of datasets under an S3 repo to avoid very expensive bucket prefix listing calls

## [0.166.1] - 2024-03-14
### Fixed
- Allow OData adapter to skip fields with unsupported data types instead of crashing
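The first entry above describes a read-through/write-through cache for content-addressed objects, backed by the local file system. Below is a rough sketch of that pattern; the `ObjectStore` trait, `CachingStore`, and `InMemStore` are hypothetical simplifications rather than kamu's actual async `ObjectRepository` API. Reads try the cache directory first and fall back to the wrapped store, while writes go to both:

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical, simplified stand-in for a content-addressed object store
/// (kamu's real `ObjectRepository` trait is async and richer than this).
trait ObjectStore {
    fn get(&self, hash: &str) -> io::Result<Vec<u8>>;
    fn put(&mut self, hash: &str, data: &[u8]) -> io::Result<()>;
}

/// Read-through / write-through wrapper: reads try the local cache directory
/// first and fall back to the wrapped store; writes go to both.
struct CachingStore<S: ObjectStore> {
    wrapped: S,
    cache_dir: PathBuf,
}

impl<S: ObjectStore> CachingStore<S> {
    fn new(wrapped: S, cache_dir: PathBuf) -> io::Result<Self> {
        fs::create_dir_all(&cache_dir)?;
        Ok(Self { wrapped, cache_dir })
    }

    fn cache_path(&self, hash: &str) -> PathBuf {
        // Objects are keyed by their hash, so the hash works as a file name
        self.cache_dir.join(hash)
    }
}

impl<S: ObjectStore> ObjectStore for CachingStore<S> {
    fn get(&self, hash: &str) -> io::Result<Vec<u8>> {
        let path = self.cache_path(hash);
        if path.exists() {
            return fs::read(path); // cache hit: no round-trip to the slow store
        }
        let data = self.wrapped.get(hash)?; // cache miss: read through
        fs::write(path, &data)?;
        Ok(data)
    }

    fn put(&mut self, hash: &str, data: &[u8]) -> io::Result<()> {
        self.wrapped.put(hash, data)?;
        fs::write(self.cache_path(hash), data) // keep the cache warm on insert
    }
}

/// Trivial in-memory "remote" store standing in for S3.
struct InMemStore(HashMap<String, Vec<u8>>);

impl ObjectStore for InMemStore {
    fn get(&self, hash: &str) -> io::Result<Vec<u8>> {
        self.0
            .get(hash)
            .cloned()
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "object not found"))
    }

    fn put(&mut self, hash: &str, data: &[u8]) -> io::Result<()> {
        self.0.insert(hash.to_string(), data.to_vec());
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut store =
        CachingStore::new(InMemStore(HashMap::new()), PathBuf::from("/tmp/obj-cache"))?;
    store.put("abc123", b"hello")?;
    assert_eq!(store.get("abc123")?, b"hello".to_vec()); // served from the local cache file
    Ok(())
}
```

The caching layer touched in the last file of this diff is async and deals with typed hashes and insert options, but it follows the same shape: a miss is paid for once, and every later read of the same object is a local file read instead of an S3 request.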
153 changes: 118 additions & 35 deletions src/infra/core/src/repos/dataset_repository_s3.rs
@@ -11,11 +11,13 @@ use std::path::PathBuf;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dill::*;
use event_bus::EventBus;
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer, DEFAULT_ACCOUNT_NAME};
use kamu_core::*;
use opendatafabric::*;
use tokio::sync::Mutex;
use url::Url;

use crate::utils::s3_context::S3Context;
@@ -30,20 +32,31 @@ pub struct DatasetRepositoryS3 {
dependency_graph_service: Arc<dyn DependencyGraphService>,
event_bus: Arc<EventBus>,
multi_tenant: bool,
registry_cache: Option<Arc<S3RegistryCache>>,
metadata_cache_local_fs_path: Option<Arc<PathBuf>>,
}

/////////////////////////////////////////////////////////////////////////////////////////

#[component(pub)]
impl DatasetRepositoryS3 {
/// # Arguments
///
/// * `registry_cache` - when present in the catalog, enables an in-memory
/// cache of the dataset IDs and aliases present in the repository,
/// avoiding expensive bucket scanning
///
/// * `metadata_cache_local_fs_path` - when specified, enables a local FS
/// cache of metadata blocks, dramatically reducing the number of
/// requests made to S3
pub fn new(
s3_context: S3Context,
current_account_subject: Arc<CurrentAccountSubject>,
dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
dependency_graph_service: Arc<dyn DependencyGraphService>,
event_bus: Arc<EventBus>,
multi_tenant: bool,
registry_cache: Option<Arc<S3RegistryCache>>,
metadata_cache_local_fs_path: Option<Arc<PathBuf>>,
) -> Self {
Self {
@@ -53,11 +66,12 @@ impl DatasetRepositoryS3 {
dependency_graph_service,
event_bus,
multi_tenant,
registry_cache,
metadata_cache_local_fs_path,
}
}

fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Result<Arc<dyn Dataset>, InternalError> {
fn get_dataset_impl(&self, dataset_id: &DatasetID) -> Arc<dyn Dataset> {
let s3_context = self
.s3_context
.sub_context(&format!("{}/", &dataset_id.as_multibase()));
@@ -70,7 +84,7 @@ impl DatasetRepositoryS3 {
// TODO: Consider switching DatasetImpl to dynamic dispatch to simplify
// configurability
if let Some(metadata_cache_local_fs_path) = &self.metadata_cache_local_fs_path {
Ok(Arc::new(DatasetImpl::new(
Arc::new(DatasetImpl::new(
self.event_bus.clone(),
MetadataChainImpl::new(
MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new(
@@ -109,9 +123,9 @@
bucket.clone(),
format!("{key_prefix}info/"),
)),
)))
))
} else {
Ok(Arc::new(DatasetImpl::new(
Arc::new(DatasetImpl::new(
self.event_bus.clone(),
MetadataChainImpl::new(
MetadataBlockRepositoryCachingInMem::new(MetadataBlockRepositoryImpl::new(
@@ -147,7 +161,7 @@
bucket.clone(),
format!("{key_prefix}info/"),
)),
)))
))
}
}

@@ -171,7 +185,7 @@
async fn save_dataset_alias(
&self,
dataset: &dyn Dataset,
dataset_alias: DatasetAlias,
dataset_alias: &DatasetAlias,
) -> Result<(), InternalError> {
dataset
.as_info_repo()
@@ -182,26 +196,53 @@
Ok(())
}

#[tracing::instrument(level = "info", skip_all)]
async fn list_datasets_in_s3(&self) -> Result<Vec<DatasetHandle>, InternalError> {
let mut res = Vec::new();

let folders_common_prefixes = self.s3_context.bucket_list_folders().await?;

for prefix in folders_common_prefixes {
let mut prefix = prefix.prefix.unwrap();
while prefix.ends_with('/') {
prefix.pop();
}

if let Ok(id) = DatasetID::from_multibase_string(&prefix) {
let dataset = self.get_dataset_impl(&id);
let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
res.push(DatasetHandle::new(id, dataset_alias));
}
}

Ok(res)
}

async fn list_datasets_maybe_cached(&self) -> Result<Vec<DatasetHandle>, InternalError> {
if let Some(cache) = &self.registry_cache {
let mut cache = cache.state.lock().await;

// Init cache
if cache.last_updated == DateTime::UNIX_EPOCH {
tracing::debug!("Initializing dataset registry cache");
cache.datasets = self.list_datasets_in_s3().await?;
cache.last_updated = Utc::now();
}

Ok(cache.datasets.clone())
} else {
self.list_datasets_in_s3().await
}
}

fn stream_datasets_if<'s>(
&'s self,
alias_filter: impl Fn(&DatasetAlias) -> bool + Send + 's,
) -> DatasetHandleStream<'s> {
Box::pin(async_stream::try_stream! {
let folders_common_prefixes = self.s3_context.bucket_list_folders().await?;
for prefix in folders_common_prefixes {
let mut prefix = prefix.prefix.unwrap();
while prefix.ends_with('/') {
prefix.pop();
}

if let Ok(id) = DatasetID::from_multibase_string(&prefix) {
let dataset = self.get_dataset_impl(&id)?;
let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
if alias_filter(&dataset_alias) {
let hdl = DatasetHandle::new(id, dataset_alias);
yield hdl;
}

for hdl in self.list_datasets_maybe_cached().await? {
if alias_filter(&hdl.alias) {
yield hdl;
}
}
})
@@ -269,7 +310,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
.bucket_path_exists(id.as_multibase().to_stack_string().as_str())
.await?
{
let dataset = self.get_dataset_impl(id)?;
let dataset = self.get_dataset_impl(id);
let dataset_alias = self.resolve_dataset_alias(dataset.as_ref()).await?;
Ok(DatasetHandle::new(id.clone(), dataset_alias))
} else {
@@ -305,7 +346,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
dataset_ref: &DatasetRef,
) -> Result<Arc<dyn Dataset>, GetDatasetError> {
let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?;
let dataset = self.get_dataset_impl(&dataset_handle.id)?;
let dataset = self.get_dataset_impl(&dataset_handle.id);
Ok(dataset)
}

@@ -361,7 +402,7 @@ impl DatasetRepository for DatasetRepositoryS3 {
// It's okay to create a new dataset by this point

let dataset_id = seed_block.event.dataset_id.clone();
let dataset = self.get_dataset_impl(&dataset_id)?;
let dataset = self.get_dataset_impl(&dataset_id);

// There are three possibilities at this point:
// - Dataset did not exist before - continue normally
@@ -387,11 +428,17 @@
};

let normalized_alias = self.normalize_alias(dataset_alias);
self.save_dataset_alias(dataset.as_ref(), normalized_alias)
self.save_dataset_alias(dataset.as_ref(), &normalized_alias)
.await?;

let dataset_handle = DatasetHandle::new(dataset_id, dataset_alias.clone());

// Update cache if enabled
if let Some(cache) = &self.registry_cache {
let mut cache = cache.state.lock().await;
cache.datasets.push(dataset_handle.clone());
}

self.event_bus
.dispatch_event(events::DatasetEventCreated {
dataset_id: dataset_handle.id.clone(),
@@ -424,31 +471,37 @@
dataset_ref: &DatasetRef,
new_name: &DatasetName,
) -> Result<(), RenameDatasetError> {
let dataset_handle = self.resolve_dataset_ref(dataset_ref).await?;
let old_handle = self.resolve_dataset_ref(dataset_ref).await?;

let dataset = self.get_dataset_impl(&dataset_handle.id)?;
let dataset = self.get_dataset_impl(&old_handle.id);

let new_alias =
DatasetAlias::new(dataset_handle.alias.account_name.clone(), new_name.clone());
let new_alias = DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone());

// Check against possible name collisions
match self.resolve_dataset_ref(&new_alias.as_local_ref()).await {
Ok(_) => Err(RenameDatasetError::NameCollision(NameCollisionError {
alias: DatasetAlias::new(
dataset_handle.alias.account_name.clone(),
new_name.clone(),
),
alias: DatasetAlias::new(old_handle.alias.account_name.clone(), new_name.clone()),
})),
Err(GetDatasetError::Internal(e)) => Err(RenameDatasetError::Internal(e)),
Err(GetDatasetError::NotFound(_)) => Ok(()),
}?;

self.dataset_action_authorizer
.check_action_allowed(&dataset_handle, DatasetAction::Write)
.check_action_allowed(&old_handle, DatasetAction::Write)
.await?;

// It's safe to rename dataset
self.save_dataset_alias(dataset.as_ref(), new_alias).await?;
self.save_dataset_alias(dataset.as_ref(), &new_alias)
.await?;

// Update cache if enabled
if let Some(cache) = &self.registry_cache {
let mut cache = cache.state.lock().await;
cache.datasets.retain(|h| h.id != old_handle.id);
cache
.datasets
.push(DatasetHandle::new(old_handle.id, new_alias));
}

Ok(())
}
@@ -494,6 +547,12 @@ impl DatasetRepository for DatasetRepositoryS3 {
.await
.map_err(DeleteDatasetError::Internal)?;

// Update cache if enabled
if let Some(cache) = &self.registry_cache {
let mut cache = cache.state.lock().await;
cache.datasets.retain(|h| h.id != dataset_handle.id);
}

self.event_bus
.dispatch_event(events::DatasetEventDeleted {
dataset_id: dataset_handle.id,
@@ -505,3 +564,27 @@ impl DatasetRepository for DatasetRepositoryS3 {
}

/////////////////////////////////////////////////////////////////////////////////////////

pub struct S3RegistryCache {
state: Arc<Mutex<State>>,
}

struct State {
datasets: Vec<DatasetHandle>,
last_updated: DateTime<Utc>,
}

#[component(pub)]
#[dill::scope(Singleton)]
impl S3RegistryCache {
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(State {
datasets: Vec::new(),
last_updated: DateTime::UNIX_EPOCH,
})),
}
}
}

/////////////////////////////////////////////////////////////////////////////////////////
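`S3RegistryCache`, defined just above, is registered with `#[dill::scope(Singleton)]`, so every `DatasetRepositoryS3` resolved from the catalog shares one copy of its state. `list_datasets_maybe_cached` populates that state lazily on first use, with the `UNIX_EPOCH` timestamp serving as the "never populated" marker, and the create/rename/delete paths patch the cached list in place rather than re-scanning the bucket. The sketch below shows the same shape in a self-contained form; the names are illustrative (plain `String` entries and a stubbed listing closure stand in for kamu's dataset handles and the S3 prefix scan):

```rust
use chrono::{DateTime, Utc};
use tokio::sync::Mutex;

// In the real component the state sits behind an `Arc` and the struct is
// registered as a dill `Singleton`, so all repository instances in the
// process share it; a bare `Mutex` is enough for this sketch.
struct RegistryCache {
    state: Mutex<State>,
}

struct State {
    entries: Vec<String>,
    last_updated: DateTime<Utc>,
}

impl RegistryCache {
    fn new() -> Self {
        Self {
            state: Mutex::new(State {
                entries: Vec::new(),
                // UNIX_EPOCH doubles as the "never populated" marker
                last_updated: DateTime::UNIX_EPOCH,
            }),
        }
    }

    /// Returns the cached listing, populating it on first use via the
    /// expensive `list_from_source` closure (a stand-in for the S3 bucket
    /// prefix scan).
    async fn list_or_init<F, Fut>(&self, list_from_source: F) -> Vec<String>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Vec<String>>,
    {
        let mut state = self.state.lock().await;
        if state.last_updated == DateTime::UNIX_EPOCH {
            state.entries = list_from_source().await; // one-time expensive scan
            state.last_updated = Utc::now();
        }
        state.entries.clone()
    }

    /// Mutations patch the cached view in place instead of invalidating it.
    async fn on_created(&self, entry: String) {
        self.state.lock().await.entries.push(entry);
    }

    async fn on_deleted(&self, entry: &str) {
        self.state.lock().await.entries.retain(|e| e != entry);
    }
}

#[tokio::main]
async fn main() {
    let cache = RegistryCache::new();

    // First call performs the expensive listing (stubbed here)
    let first = cache
        .list_or_init(|| async { vec!["alice/orders".to_string()] })
        .await;
    assert_eq!(first, vec!["alice/orders"]);

    // Later calls are served from memory; the closure's result is unused
    let second = cache.list_or_init(|| async { vec![] }).await;
    assert_eq!(second, vec!["alice/orders"]);

    // Create/rename/delete keep the cache coherent without a re-scan
    cache.on_created("alice/shipments".to_string()).await;
    cache.on_deleted("alice/orders").await;
    let third = cache.list_or_init(|| async { vec![] }).await;
    assert_eq!(third, vec!["alice/shipments"]);
}
```

Note that, as written, the listing is populated exactly once per process: datasets added to the bucket by other writers will not become visible until a restart, while changes made through this process are reflected immediately via the in-place updates.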
4 changes: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ use crate::utils::s3_context::AsyncReadObj;
/////////////////////////////////////////////////////////////////////////////////////////

/// A read-through and (partially) a write-through caching layer for
/// [ObjectRepository] using a local file system.
/// [`ObjectRepository`] using a local file system.
///
/// Currently caches objects forever, so a cache directory cleanup has to be
/// handled separately.
@@ -140,7 +140,7 @@
) -> Result<InsertResult, InsertError> {
let res = self.wrapped.insert_bytes(data, options).await?;
let cache_path = self.cache_path(&res.hash);
std::fs::write(&cache_path, data).int_err()?;
std::fs::write(cache_path, data).int_err()?;
Ok(res)
}

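The caching layer's doc comment notes that objects are cached forever and that cleaning up the cache directory has to be handled separately. A hypothetical companion cleanup job (not part of this commit) could evict entries by modification time, which is safe for a content-addressed cache because anything evicted can simply be re-fetched from S3 on the next read:

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::{Duration, SystemTime};

/// Removes cached objects whose files have not been modified for `max_age`.
/// Safe for a content-addressed cache: evicted objects are re-fetched from
/// the wrapped (S3) repository on demand.
fn evict_older_than(cache_dir: &Path, max_age: Duration) -> io::Result<()> {
    let cutoff = SystemTime::now() - max_age;
    for entry in fs::read_dir(cache_dir)? {
        let entry = entry?;
        let meta = entry.metadata()?;
        if meta.is_file() && meta.modified()? < cutoff {
            fs::remove_file(entry.path())?;
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = Path::new("/tmp/obj-cache");
    fs::create_dir_all(dir)?; // ensure the directory exists for the demo
    evict_older_than(dir, Duration::from_secs(7 * 24 * 3600))
}
```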
