Skip to content

Commit

Permalink
Separated watermarks setting from PullService
Browse files Browse the repository at this point in the history
  • Loading branch information
zaychenko-sergei committed Oct 17, 2024
1 parent ae14733 commit 54522c4
Show file tree
Hide file tree
Showing 15 changed files with 383 additions and 216 deletions.
4 changes: 2 additions & 2 deletions src/adapter/graphql/src/mutations/dataset_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ impl DatasetMut {
.execute(&self.dataset_handle, watermark)
.await
{
Ok(domain::PullResult::UpToDate(_)) => {
Ok(domain::SetWatermarkResult::UpToDate) => {
Ok(SetWatermarkResult::UpToDate(SetWatermarkUpToDate {
_dummy: String::new(),
}))
}
Ok(domain::PullResult::Updated { new_head, .. }) => {
Ok(domain::SetWatermarkResult::Updated { new_head, .. }) => {
Ok(SetWatermarkResult::Updated(SetWatermarkUpdated {
new_head: new_head.into(),
}))
Expand Down
2 changes: 2 additions & 0 deletions src/app/cli/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ pub fn configure_base_catalog(

b.add::<PushServiceImpl>();

b.add::<WatermarkServiceImpl>();

b.add::<ResetServiceImpl>();

b.add::<ProvenanceServiceImpl>();
Expand Down
4 changes: 2 additions & 2 deletions src/app/cli/src/commands/set_watermark_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ impl Command for SetWatermarkCommand {
.execute(&dataset_handle, watermark.into())
.await
{
Ok(PullResult::UpToDate(_)) => {
Ok(SetWatermarkResult::UpToDate) => {
eprintln!("{}", console::style("Watermark was up-to-date").yellow());
Ok(())
}
Ok(PullResult::Updated { new_head, .. }) => {
Ok(SetWatermarkResult::Updated { new_head, .. }) => {
eprintln!(
"{}",
console::style(format!(
Expand Down
2 changes: 2 additions & 0 deletions src/domain/core/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod server_url_config;
pub mod sync_service;
pub mod transform_service;
pub mod verification_service;
pub mod watermark_service;

pub use compaction_service::*;
pub use dataset_changes_service::*;
Expand All @@ -55,3 +56,4 @@ pub use server_url_config::*;
pub use sync_service::*;
pub use transform_service::*;
pub use verification_service::*;
pub use watermark_service::*;
9 changes: 0 additions & 9 deletions src/domain/core/src/services/pull_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use std::sync::Arc;

use ::serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
use internal_error::InternalError;
use opendatafabric::*;
use thiserror::Error;
Expand Down Expand Up @@ -51,13 +50,6 @@ pub trait PullService: Send + Sync {
options: PullMultiOptions,
listener: Option<Arc<dyn PullMultiListener>>,
) -> Result<Vec<PullResponse>, InternalError>;

/// Manually advances the watermark of a root dataset
async fn set_watermark(
&self,
dataset: Arc<dyn Dataset>,
watermark: DateTime<Utc>,
) -> Result<PullResult, SetWatermarkError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -186,7 +178,6 @@ pub enum PullResultUpToDate {
PushIngest(PushInsgestResultUpToDate),
Transform,
Sync,
SetWatermark,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
Expand Down
41 changes: 41 additions & 0 deletions src/domain/core/src/services/watermark_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Kamu Data, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use chrono::{DateTime, Utc};
use opendatafabric::Multihash;

use super::SetWatermarkError;
use crate::Dataset;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[async_trait::async_trait]
pub trait WatermarkService: Send + Sync {
/// Manually advances the watermark of a root dataset
async fn set_watermark(
&self,
dataset: Arc<dyn Dataset>,
new_watermark: DateTime<Utc>,
) -> Result<SetWatermarkResult, SetWatermarkError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Debug)]
pub enum SetWatermarkResult {
UpToDate,
Updated {
old_head: Option<Multihash>,
new_head: Multihash,
},
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
4 changes: 2 additions & 2 deletions src/domain/core/src/use_cases/set_watermark_use_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use chrono::{DateTime, Utc};
use opendatafabric::DatasetHandle;

use crate::{PullResult, SetWatermarkError};
use crate::{SetWatermarkError, SetWatermarkResult};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Expand All @@ -20,7 +20,7 @@ pub trait SetWatermarkUseCase: Send + Sync {
&self,
dataset_handle: &DatasetHandle,
new_watermark: DateTime<Utc>,
) -> Result<PullResult, SetWatermarkError>;
) -> Result<SetWatermarkResult, SetWatermarkError>;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ impl From<ts::TaskResult> for FlowResult {
ts::TaskResult::UpdateDatasetResult(task_update_result) => {
match task_update_result.pull_result {
PullResult::UpToDate(up_to_date_result) => match up_to_date_result {
PullResultUpToDate::Sync
| PullResultUpToDate::Transform
| PullResultUpToDate::SetWatermark => Self::Empty,
PullResultUpToDate::Sync | PullResultUpToDate::Transform => Self::Empty,
PullResultUpToDate::PollingIngest(result) => Self::DatasetUpdate(
FlowResultDatasetUpdate::UpToDate(FlowResultDatasetUpdateUpToDate {
uncacheable: result.uncacheable,
Expand Down
2 changes: 2 additions & 0 deletions src/infra/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mod sync_request_builder;
mod sync_service_impl;
mod transform_service_impl;
mod verification_service_impl;
mod watermark_service_impl;

pub use compaction_service_impl::*;
pub use dataset_changes_service_impl::*;
Expand Down Expand Up @@ -76,3 +77,4 @@ pub use sync_service_impl::*;
pub use transform_service_impl::*;
pub use use_cases::*;
pub use verification_service_impl::*;
pub use watermark_service_impl::*;
84 changes: 10 additions & 74 deletions src/infra/core/src/pull_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use chrono::prelude::*;
use dill::*;
use internal_error::{ErrorIntoInternal, InternalError, ResultIntoInternal};
use internal_error::{InternalError, ResultIntoInternal};
use kamu_accounts::CurrentAccountSubject;
use kamu_core::*;
use kamu_ingest_datafusion::DataWriterDataFusion;
use opendatafabric::*;
use time_source::SystemTimeSource;
use url::Url;

pub struct PullServiceImpl {
Expand All @@ -27,7 +24,6 @@ pub struct PullServiceImpl {
ingest_svc: Arc<dyn PollingIngestService>,
transform_svc: Arc<dyn TransformService>,
sync_svc: Arc<dyn SyncService>,
system_time_source: Arc<dyn SystemTimeSource>,
current_account_subject: Arc<CurrentAccountSubject>,
}

Expand All @@ -40,7 +36,6 @@ impl PullServiceImpl {
ingest_svc: Arc<dyn PollingIngestService>,
transform_svc: Arc<dyn TransformService>,
sync_svc: Arc<dyn SyncService>,
system_time_source: Arc<dyn SystemTimeSource>,
current_account_subject: Arc<CurrentAccountSubject>,
) -> Self {
Self {
Expand All @@ -49,7 +44,6 @@ impl PullServiceImpl {
ingest_svc,
transform_svc,
sync_svc,
system_time_source,
current_account_subject,
}
}
Expand Down Expand Up @@ -98,18 +92,18 @@ impl PullServiceImpl {
tracing::debug!(?request, "Entering node");

// Resolve local dataset if it exists
let local_handle = if let Some(local_ref) = &request.local_ref {
let local_handle = self
let maybe_local_handle = if let Some(local_ref) = &request.local_ref {
let maybe_local_handle = self
.dataset_repo
.try_resolve_dataset_handle_by_ref(local_ref)
.await?;
if local_handle.is_none() && request.remote_ref.is_none() {
if maybe_local_handle.is_none() && request.remote_ref.is_none() {
// Dataset does not exist locally nor remote ref was provided
return Err(PullError::NotFound(DatasetNotFoundError {
dataset_ref: local_ref.clone(),
}));
}
local_handle
maybe_local_handle
} else if let Some(remote_ref) = &request.remote_ref {
self.try_inverse_lookup_dataset_by_pull_alias(remote_ref)
.await?
Expand All @@ -120,7 +114,7 @@ impl PullServiceImpl {
// Resolve the name of a local dataset if it exists
// or a name to create dataset with if syncing from remote and creation is
// allowed
let local_alias = if let Some(hdl) = &local_handle {
let local_alias = if let Some(hdl) = &maybe_local_handle {
// Target exists
hdl.alias.clone()
} else if let Some(local_ref) = &request.local_ref {
Expand Down Expand Up @@ -160,7 +154,7 @@ impl PullServiceImpl {
}
};

if local_handle.is_none() && !options.sync_options.create_if_not_exists {
if maybe_local_handle.is_none() && !options.sync_options.create_if_not_exists {
return Err(PullError::InvalidOperation(
"Dataset does not exist and auto-create is switched off".to_owned(),
));
Expand All @@ -178,7 +172,7 @@ impl PullServiceImpl {
// Resolve remote alias, if any
let remote_ref = if let Some(remote_ref) = &request.remote_ref {
Ok(Some(remote_ref.clone()))
} else if let Some(hdl) = &local_handle {
} else if let Some(hdl) = &maybe_local_handle {
self.resolve_pull_alias(hdl).await
} else {
Ok(None)
Expand All @@ -189,14 +183,14 @@ impl PullServiceImpl {
PullItem {
original_request: None, // May be set below
depth: 0,
local_ref: local_handle
local_ref: maybe_local_handle
.map(Into::into)
.unwrap_or(local_alias.clone().into()),
remote_ref,
}
} else {
// Pulling an existing local root or derivative dataset
let local_handle = local_handle.unwrap();
let local_handle = maybe_local_handle.unwrap();

let summary = self
.dataset_repo
Expand Down Expand Up @@ -640,64 +634,6 @@ impl PullService for PullServiceImpl {

Ok(results)
}

async fn set_watermark(
&self,
dataset: Arc<dyn Dataset>,
new_watermark: DateTime<Utc>,
) -> Result<PullResult, SetWatermarkError> {
let aliases = match self
.remote_alias_reg
.get_remote_aliases(dataset.clone())
.await
{
Ok(v) => Ok(v),
Err(GetAliasesError::Internal(e)) => Err(SetWatermarkError::Internal(e)),
}?;

if !aliases.is_empty(RemoteAliasKind::Pull) {
return Err(SetWatermarkError::IsRemote);
}

let summary = dataset
.get_summary(GetSummaryOpts::default())
.await
.int_err()?;

if summary.kind != DatasetKind::Root {
return Err(SetWatermarkError::IsDerivative);
}

let mut writer =
DataWriterDataFusion::builder(dataset, datafusion::prelude::SessionContext::new())
.with_metadata_state_scanned(None)
.await
.int_err()?
.build();

match writer
.write_watermark(
new_watermark,
WriteWatermarkOpts {
system_time: self.system_time_source.now(),
new_source_state: None,
},
)
.await
{
Ok(res) => Ok(PullResult::Updated {
old_head: Some(res.old_head),
new_head: res.new_head,
}),
Err(
WriteWatermarkError::EmptyCommit(_)
| WriteWatermarkError::CommitError(CommitError::MetadataAppendError(
AppendError::InvalidBlock(AppendValidationError::WatermarkIsNotMonotonic),
)),
) => Ok(PullResult::UpToDate(PullResultUpToDate::SetWatermark)),
Err(e) => Err(e.int_err().into()),
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down
18 changes: 12 additions & 6 deletions src/infra/core/src/use_cases/set_watermark_use_case_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,33 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use dill::{component, interface};
use kamu_core::auth::{DatasetAction, DatasetActionAuthorizer};
use kamu_core::{DatasetRegistry, PullResult, PullService, SetWatermarkError, SetWatermarkUseCase};
use kamu_core::{
DatasetRegistry,
SetWatermarkError,
SetWatermarkResult,
SetWatermarkUseCase,
WatermarkService,
};
use opendatafabric::DatasetHandle;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[component(pub)]
#[interface(dyn SetWatermarkUseCase)]
pub struct SetWatermarkUseCaseImpl {
pull_service: Arc<dyn PullService>,
watermark_service: Arc<dyn WatermarkService>,
dataset_registry: Arc<dyn DatasetRegistry>,
dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
}

impl SetWatermarkUseCaseImpl {
pub fn new(
pull_service: Arc<dyn PullService>,
watermark_service: Arc<dyn WatermarkService>,
dataset_registry: Arc<dyn DatasetRegistry>,
dataset_action_authorizer: Arc<dyn DatasetActionAuthorizer>,
) -> Self {
Self {
pull_service,
watermark_service,
dataset_registry,
dataset_action_authorizer,
}
Expand All @@ -45,7 +51,7 @@ impl SetWatermarkUseCase for SetWatermarkUseCaseImpl {
&self,
dataset_handle: &DatasetHandle,
new_watermark: DateTime<Utc>,
) -> Result<PullResult, SetWatermarkError> {
) -> Result<SetWatermarkResult, SetWatermarkError> {
// Permission check
self.dataset_action_authorizer
.check_action_allowed(dataset_handle, DatasetAction::Write)
Expand All @@ -55,7 +61,7 @@ impl SetWatermarkUseCase for SetWatermarkUseCaseImpl {
let dataset = self.dataset_registry.get_dataset_by_handle(dataset_handle);

// Actual action
self.pull_service
self.watermark_service
.set_watermark(dataset, new_watermark)
.await
}
Expand Down
Loading

0 comments on commit 54522c4

Please sign in to comment.