Fix duplicate dataset creation in fs mode (#849)
rmn-boiko authored Sep 26, 2024
1 parent 687a963 commit 6ac1496
Showing 10 changed files with 340 additions and 129 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,10 @@ Recommendation: for ease of reading, use the following order:
- Fixed
-->

## [Unreleased]
### Fixed
- Dataset creation with a unique alias but an existing ID in FS dataset storage mode

## [0.204.2] - 2024-09-26
### Fixed
- `kamu init`: fixed regression in case of using `exists_ok` flag
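The new [Unreleased] entry covers the following scenario: a dataset is created, and then a second create_dataset call reuses a seed block carrying the same dataset ID under a different alias. Before this fix the local FS repository silently created a duplicate; now the second call is rejected. A minimal sketch, mirroring the shared test added further down in this commit (the repository setup is omitted and assumed; MetadataFactory, DatasetAlias, and the error types are the ones used in those tests):

// Sketch only: `repo` is some DatasetRepository + DatasetRepositoryWriter
// implementation backed by local FS storage (harness setup omitted).
let seed_block =
    MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build())
        .build_typed();

// First creation succeeds.
let alias_foo = DatasetAlias::new(None, DatasetName::new_unchecked("foo"));
repo.create_dataset(&alias_foo, seed_block.clone()).await.unwrap();

// Reusing the same seed block (and therefore the same dataset ID) under a new
// alias used to produce a duplicate dataset; it now fails with NameCollision.
let alias_bar = DatasetAlias::new(None, DatasetName::new_unchecked("bar"));
let result = repo.create_dataset(&alias_bar, seed_block).await;
assert_matches!(result.err(), Some(CreateDatasetError::NameCollision(_)));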
71 changes: 49 additions & 22 deletions src/adapter/http/src/smart_protocol/axum_server_push_protocol.rs
@@ -79,14 +79,12 @@ impl AxumServerPushProtocolInstance {
tracing::debug!("Push process success");
}
Err(ref e @ PushServerError::Internal(ref int_err)) => {
if let Err(write_err) = axum_write_close_payload::<DatasetPushResponse>(
&mut self.socket,
Err(DatasetPushRequestError::Internal(TransferInternalError {
phase: int_err.phase.clone(),
error_message: "Internal error".to_string(),
})),
)
.await
let payload = Err(DatasetPushRequestError::Internal(TransferInternalError {
phase: int_err.phase.clone(),
error_message: "Internal error".to_string(),
}));
if let Err(write_err) =
axum_write_close_payload::<DatasetPushResponse>(&mut self.socket, payload).await
{
tracing::error!(
error = ?write_err,
@@ -102,14 +100,13 @@ impl AxumServerPushProtocolInstance {
}
Err(ref _e @ PushServerError::ReadFailed(ref err)) => {
if let ReadMessageError::IncompatibleVersion = err.read_error {
if let Err(write_err) = axum_write_close_payload::<DatasetPushResponse>(
&mut self.socket,
Err(DatasetPushRequestError::Internal(TransferInternalError {
phase: TransferPhase::Push(PushPhase::InitialRequest),
error_message: "Incompatible version.".to_string(),
})),
)
.await
let payload = Err(DatasetPushRequestError::Internal(TransferInternalError {
phase: TransferPhase::Push(PushPhase::InitialRequest),
error_message: "Incompatible version.".to_string(),
}));
if let Err(write_err) =
axum_write_close_payload::<DatasetPushResponse>(&mut self.socket, payload)
.await
{
tracing::error!(
error = ?write_err,
@@ -120,15 +117,42 @@
}
}
Err(ref _e @ PushServerError::RefCollision(ref err)) => {
let payload = DatasetPushObjectsTransferResponse::Err(
DatasetPushObjectsTransferError::RefCollision(
DatasetPushObjectsTransferRefCollisionError {
dataset_id: err.id.clone(),
},
),
);
if let Err(write_err) = axum_write_payload::<DatasetPushObjectsTransferResponse>(
&mut self.socket,
DatasetPushObjectsTransferResponse::Err(
DatasetPushObjectsTransferError::RefCollision(
DatasetPushObjectsTransferRefCollisionError {
dataset_id: err.id.clone(),
},
),
payload,
)
.await
{
tracing::error!(
error = ?write_err,
error_msg = %write_err,
"Failed to send error to client with error",
);
};
tracing::error!(
error = ?err,
error_msg = %err,
"Push process aborted with error",
);
}
Err(ref _e @ PushServerError::NameCollision(ref err)) => {
let payload = DatasetPushObjectsTransferResponse::Err(
DatasetPushObjectsTransferError::NameCollision(
DatasetPushObjectsTransferNameCollisionError {
dataset_alias: err.alias.clone(),
},
),
);
if let Err(write_err) = axum_write_payload::<DatasetPushObjectsTransferResponse>(
&mut self.socket,
payload,
)
.await
{
@@ -219,6 +243,9 @@ impl AxumServerPushProtocolInstance {
id: err.id.clone(),
}));
}
Err(ref _e @ CreateDatasetError::NameCollision(ref err)) => {
return Err(PushServerError::NameCollision(err.clone()));
}
Err(e) => {
return Err(PushServerError::Internal(PhaseInternalError {
phase: TransferPhase::Push(PushPhase::ObjectsUploadProgress),
8 changes: 7 additions & 1 deletion src/adapter/http/src/smart_protocol/errors.rs
@@ -10,7 +10,7 @@
use std::fmt::{self, Display};

use internal_error::{BoxedError, InternalError};
use kamu_core::{InvalidIntervalError, RefCASError, RefCollisionError};
use kamu_core::{InvalidIntervalError, NameCollisionError, RefCASError, RefCollisionError};
use thiserror::Error;

use super::phases::*;
@@ -172,6 +172,9 @@ pub enum PushServerError {
#[error(transparent)]
RefCollision(RefCollisionError),

#[error(transparent)]
NameCollision(NameCollisionError),

#[error(transparent)]
Internal(PhaseInternalError),
}
@@ -192,6 +195,9 @@ pub enum PushClientError {
#[error(transparent)]
RefCollision(RefCollisionError),

#[error(transparent)]
NameCollision(NameCollisionError),

#[error(transparent)]
Internal(
#[from]
11 changes: 10 additions & 1 deletion src/adapter/http/src/smart_protocol/messages.rs
@@ -9,7 +9,7 @@

use chrono::{DateTime, Utc};
use kamu_core::DatasetVisibility;
use opendatafabric::{DatasetID, Multihash};
use opendatafabric::{DatasetAlias, DatasetID, Multihash};
use serde::{Deserialize, Serialize};
use url::Url;

@@ -193,6 +193,7 @@ pub struct DatasetPushObjectsTransferAccepted {
pub enum DatasetPushObjectsTransferError {
Internal(TransferInternalError),
RefCollision(DatasetPushObjectsTransferRefCollisionError),
NameCollision(DatasetPushObjectsTransferNameCollisionError),
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -205,6 +206,14 @@ pub struct DatasetPushObjectsTransferRefCollisionError {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Dataset with such alias already exists
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct DatasetPushObjectsTransferNameCollisionError {
pub dataset_alias: DatasetAlias,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Push object transfer strategy
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct PushObjectTransferStrategy {
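The new NameCollision variant lets the server report the conflicting alias back to the client as part of the push objects transfer response. A rough round-trip sketch of the new message struct (serde_json is used here only to illustrate the shape; the fragment above does not specify the protocol's actual transport encoding):

use opendatafabric::{DatasetAlias, DatasetName};

let err = DatasetPushObjectsTransferNameCollisionError {
    dataset_alias: DatasetAlias::new(None, DatasetName::new_unchecked("foo")),
};
// Serialize and deserialize to show the alias survives the round trip.
let json = serde_json::to_string(&err).unwrap();
let back: DatasetPushObjectsTransferNameCollisionError = serde_json::from_str(&json).unwrap();
assert_eq!(err, back);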
5 changes: 5 additions & 0 deletions src/adapter/http/src/smart_protocol/ws_tungstenite_client.rs
@@ -327,6 +327,11 @@ impl WsSmartTransferProtocolClient {
DatasetPushObjectsTransferError::RefCollision(err) => {
PushClientError::RefCollision(RefCollisionError { id: err.dataset_id })
}
DatasetPushObjectsTransferError::NameCollision(err) => {
PushClientError::NameCollision(NameCollisionError {
alias: err.dataset_alias,
})
}
DatasetPushObjectsTransferError::Internal(err) => {
PushClientError::Internal(InternalError::new(Box::new(
ClientInternalError::new(err.error_message.as_str(), err.phase),
11 changes: 10 additions & 1 deletion src/infra/core/src/repos/dataset_repository_local_fs.rs
@@ -185,7 +185,16 @@ impl DatasetRepositoryWriter for DatasetRepositoryLocalFs {
.await
{
Ok(existing_handle) => Ok(Some(existing_handle)),
Err(GetDatasetError::NotFound(_)) => Ok(None),
// TODO: temporary fix, remove it in favor of
// https://github.com/kamu-data/kamu-cli/issues/342
Err(GetDatasetError::NotFound(_)) => match self
.resolve_dataset_ref(&(seed_block.event.dataset_id.clone().into()))
.await
{
Ok(existing_handle) => Ok(Some(existing_handle)),
Err(GetDatasetError::NotFound(_)) => Ok(None),
Err(GetDatasetError::Internal(e)) => Err(e),
},
Err(GetDatasetError::Internal(e)) => Err(e),
}?;

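Condensed, the creation path now checks for an existing dataset in two steps: first by the requested alias and, if that misses, by the dataset ID taken from the seed block, so a dataset that already exists under a different alias is detected before anything is written. A simplified sketch of that control flow (the wrapper function itself is hypothetical; resolve_dataset_ref, the error types, and the DatasetID-to-DatasetRef conversion are the ones visible in the diff above):

use internal_error::InternalError;
use kamu_core::{DatasetRepository, GetDatasetError};
use opendatafabric::{DatasetAlias, DatasetHandle, DatasetID};

// Hypothetical helper condensing the two-step lookup shown in the diff.
async fn find_existing_handle(
    repo: &impl DatasetRepository,
    alias: &DatasetAlias,
    dataset_id: &DatasetID,
) -> Result<Option<DatasetHandle>, InternalError> {
    match repo.resolve_dataset_ref(&alias.as_local_ref()).await {
        // A dataset with the same alias already exists
        Ok(handle) => Ok(Some(handle)),
        Err(GetDatasetError::NotFound(_)) => {
            // The alias is new, but the seed's DatasetID may still point at an
            // existing dataset (temporary fallback until issue #342).
            match repo.resolve_dataset_ref(&dataset_id.clone().into()).await {
                Ok(handle) => Ok(Some(handle)),
                Err(GetDatasetError::NotFound(_)) => Ok(None),
                Err(GetDatasetError::Internal(e)) => Err(e),
            }
        }
        Err(GetDatasetError::Internal(e)) => Err(e),
    }
}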
2 changes: 1 addition & 1 deletion src/infra/core/src/utils/ipfs_wrapper.rs
@@ -11,7 +11,7 @@ use std::path::{Path, PathBuf};

use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};

#[derive(Default)]
#[derive(Default, Clone)]
pub struct IpfsClient {
ipfs_path: Option<PathBuf>,
default_allow_offline: bool,
@@ -248,3 +248,31 @@ async fn test_create_and_get_case_insensetive_dataset_multi_tenant() {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[tokio::test]
async fn test_create_multiple_datasets_with_same_id() {
let tempdir = tempfile::tempdir().unwrap();
let harness = LocalFsRepoHarness::create(&tempdir, false);

test_dataset_repository_shared::test_create_multiple_datasets_with_same_id(
harness.dataset_repo.as_ref(),
None,
)
.await;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[tokio::test]
async fn test_create_multiple_datasets_with_same_id_multi_tenant() {
let tempdir = tempfile::tempdir().unwrap();
let harness = LocalFsRepoHarness::create(&tempdir, true);

test_dataset_repository_shared::test_create_multiple_datasets_with_same_id(
harness.dataset_repo.as_ref(),
Some(DEFAULT_ACCOUNT_NAME.clone()),
)
.await;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
45 changes: 45 additions & 0 deletions src/infra/core/tests/tests/repos/test_dataset_repository_shared.rs
@@ -669,3 +669,48 @@ async fn check_expected_datasets(
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

pub async fn test_create_multiple_datasets_with_same_id<
TDatasetRepository: DatasetRepository + DatasetRepositoryWriter,
>(
repo: &TDatasetRepository,
account_name: Option<AccountName>,
) {
let dataset_alias = DatasetAlias::new(account_name.clone(), DatasetName::new_unchecked("foo"));

assert_matches!(
repo.find_dataset_by_ref(&dataset_alias.as_local_ref())
.await
.err()
.unwrap(),
GetDatasetError::NotFound(_)
);
let seed_block =
MetadataFactory::metadata_block(MetadataFactory::seed(DatasetKind::Root).build())
.build_typed();

let create_result = repo
.create_dataset(&dataset_alias, seed_block.clone())
.await
.unwrap();

assert_eq!(create_result.dataset_handle.alias, dataset_alias);

// We should see the dataset
assert!(repo
.find_dataset_by_ref(&dataset_alias.as_local_ref())
.await
.is_ok());

let dataset_alias = DatasetAlias::new(account_name, DatasetName::new_unchecked("bar"));

// Now test id collision with different alias
let create_result = repo.create_dataset(&dataset_alias, seed_block).await;

assert_matches!(
create_result.err(),
Some(CreateDatasetError::NameCollision(_))
);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
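The shared test above is wired into both the single-tenant and multi-tenant local FS suites shown earlier; locally it can be exercised with a standard test-name filter, e.g. `cargo test -p kamu test_create_multiple_datasets_with_same_id` (the package name for src/infra/core is an assumption here).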