Skip to content

Commit

Permalink
Side effect retry
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Aug 28, 2024
1 parent c803edf commit 6b6a217
Show file tree
Hide file tree
Showing 22 changed files with 886 additions and 170 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ This library follows [Semantic Versioning](https://semver.org/).

The compatibility with Restate is described in the following table:

| Restate Server\sdk-shared-core | 0.0/0.1 |
|--------------------------------|---------|
| 1.0 ||
| Restate Server\sdk-shared-core | 0.0.x | 0.1.x |
|--------------------------------|-------|-------|
| 1.0 |||
| 1.1 |||

## Development

Expand Down
55 changes: 48 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
mod headers;
mod request_identity;
mod retries;
mod service_protocol;
mod vm;

use bytes::Bytes;
use std::borrow::Cow;
use std::time::Duration;

pub use crate::retries::RetryPolicy;
pub use headers::HeaderMap;
pub use request_identity::*;
pub use vm::CoreVM;
Expand All @@ -29,9 +32,9 @@ pub struct SuspendedError;
#[derive(Debug, Clone, thiserror::Error)]
#[error("VM Error [{code}]: {message}. Description: {description}")]
pub struct VMError {
pub code: u16,
pub message: Cow<'static, str>,
pub description: Cow<'static, str>,
code: u16,
message: Cow<'static, str>,
description: Cow<'static, str>,
}

impl VMError {
Expand All @@ -42,6 +45,18 @@ impl VMError {
description: Default::default(),
}
}

pub fn code(&self) -> u16 {
self.code
}

pub fn message(&self) -> &str {
&self.message
}

pub fn description(&self) -> &str {
&self.description
}
}

#[derive(Debug, Clone, thiserror::Error)]
Expand Down Expand Up @@ -100,10 +115,28 @@ pub struct Failure {
pub message: String,
}

#[derive(Debug, Default)]
pub struct EntryRetryInfo {
/// Number of retries that happened so far for this entry.
pub retry_count: u32,
/// Time spent in the current retry loop.
pub retry_loop_duration: Duration,
}

#[derive(Debug)]
pub enum RunEnterResult {
Executed(NonEmptyValue),
NotExecuted,
NotExecuted(EntryRetryInfo),
}

#[derive(Debug, Clone)]
pub enum RunExitResult {
Success(Bytes),
TerminalFailure(Failure),
RetryableFailure {
attempt_duration: Duration,
failure: Failure,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -142,7 +175,12 @@ pub trait VM: Sized {

// --- Errors

fn notify_error(&mut self, message: Cow<'static, str>, description: Cow<'static, str>);
fn notify_error(
&mut self,
message: Cow<'static, str>,
description: Cow<'static, str>,
next_retry_delay: Option<Duration>,
);

// --- Output stream

Expand Down Expand Up @@ -198,7 +236,11 @@ pub trait VM: Sized {

fn sys_run_enter(&mut self, name: String) -> VMResult<RunEnterResult>;

fn sys_run_exit(&mut self, value: NonEmptyValue) -> VMResult<AsyncResultHandle>;
fn sys_run_exit(
&mut self,
value: RunExitResult,
retry_policy: RetryPolicy,
) -> VMResult<AsyncResultHandle>;

fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;

Expand Down Expand Up @@ -246,6 +288,5 @@ pub trait VM: Sized {
// }
// io.close()

mod headers;
#[cfg(test)]
mod tests;
197 changes: 197 additions & 0 deletions src/retries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use crate::EntryRetryInfo;
use std::cmp;
use std::time::Duration;

/// This struct represents the policy to execute retries.
#[derive(Debug, Clone, Default)]
pub enum RetryPolicy {
/// # Infinite
///
/// Infinite retry strategy.
#[default]
Infinite,
/// # None
///
/// No retry strategy, fail on first failure.
None,
/// # Fixed delay
///
/// Retry with a fixed delay strategy.
FixedDelay {
/// # Interval
///
/// Interval between retries.
interval: Duration,

/// # Max attempts
///
/// Gives up retrying when either this number of attempts is reached,
/// or `max_duration` (if set) is reached first.
/// Infinite retries if this field and `max_duration` are unset.
max_attempts: Option<u32>,

/// # Max duration
///
/// Gives up retrying when either the retry loop lasted for this given max duration,
/// or `max_attempts` (if set) is reached first.
/// Infinite retries if this field and `max_attempts` are unset.
max_duration: Option<Duration>,
},
/// # Exponential
///
/// Retry with an exponential strategy. The next retry is computed as `min(last_retry_interval * factor, max_interval)`.
Exponential {
/// # Initial Interval
///
/// Initial interval for the first retry attempt.
initial_interval: Duration,

/// # Factor
///
/// The factor to use to compute the next retry attempt. This value should be higher than 1.0
factor: f32,

/// # Max interval
///
/// Maximum interval between retries.
max_interval: Option<Duration>,

/// # Max attempts
///
/// Gives up retrying when either this number of attempts is reached,
/// or `max_duration` (if set) is reached first.
/// Infinite retries if this field and `max_duration` are unset.
max_attempts: Option<u32>,

/// # Max duration
///
/// Gives up retrying when either the retry loop lasted for this given max duration,
/// or `max_attempts` (if set) is reached first.
/// Infinite retries if this field and `max_attempts` are unset.
max_duration: Option<Duration>,
},
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) enum NextRetry {
Retry(Option<Duration>),
DoNotRetry,
}

impl RetryPolicy {
pub fn fixed_delay(
interval: Duration,
max_attempts: Option<u32>,
max_duration: Option<Duration>,
) -> Self {
Self::FixedDelay {
interval,
max_attempts,
max_duration,
}
}

pub fn exponential(
initial_interval: Duration,
factor: f32,
max_attempts: Option<u32>,
max_interval: Option<Duration>,
max_duration: Option<Duration>,
) -> Self {
Self::Exponential {
initial_interval,
factor,
max_attempts,
max_interval,
max_duration,
}
}

pub(crate) fn next_retry(&self, retry_info: EntryRetryInfo) -> NextRetry {
match self {
RetryPolicy::Infinite => NextRetry::Retry(None),
RetryPolicy::None => NextRetry::DoNotRetry,
RetryPolicy::FixedDelay {
interval,
max_attempts,
max_duration,
} => {
if max_attempts.is_some_and(|max_attempts| max_attempts <= retry_info.retry_count)
|| max_duration
.is_some_and(|max_duration| max_duration <= retry_info.retry_loop_duration)
{
// Reached either max_attempts or max_duration bound
return NextRetry::DoNotRetry;
}

// No bound reached, we need to retry
NextRetry::Retry(Some(*interval))
}
RetryPolicy::Exponential {
initial_interval,
factor,
max_interval,
max_attempts,
max_duration,
} => {
if max_attempts.is_some_and(|max_attempts| max_attempts <= retry_info.retry_count)
|| max_duration
.is_some_and(|max_duration| max_duration <= retry_info.retry_loop_duration)
{
// Reached either max_attempts or max_duration bound
return NextRetry::DoNotRetry;
}

NextRetry::Retry(Some(cmp::min(
max_interval.unwrap_or(Duration::MAX),
initial_interval.mul_f32(factor.powi((retry_info.retry_count - 1) as i32)),
)))
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_exponential_policy() {
let policy = RetryPolicy::Exponential {
initial_interval: Duration::from_millis(100),
factor: 2.0,
max_interval: Some(Duration::from_millis(500)),
max_attempts: None,
max_duration: Some(Duration::from_secs(10)),
};

assert_eq!(
policy.next_retry(EntryRetryInfo {
retry_count: 2,
retry_loop_duration: Duration::from_secs(1)
}),
NextRetry::Retry(Some(Duration::from_millis(100).mul_f32(2.0)))
);
assert_eq!(
policy.next_retry(EntryRetryInfo {
retry_count: 3,
retry_loop_duration: Duration::from_secs(1)
}),
NextRetry::Retry(Some(Duration::from_millis(100).mul_f32(4.0)))
);
assert_eq!(
policy.next_retry(EntryRetryInfo {
retry_count: 4,
retry_loop_duration: Duration::from_secs(1)
}),
NextRetry::Retry(Some(Duration::from_millis(500)))
);
assert_eq!(
policy.next_retry(EntryRetryInfo {
retry_count: 4,
retry_loop_duration: Duration::from_secs(10)
}),
NextRetry::DoNotRetry
);
}
}
20 changes: 12 additions & 8 deletions src/service_protocol/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ impl Encoder {
pub fn new(service_protocol_version: Version) -> Self {
assert_eq!(
service_protocol_version,
Version::V1,
"Encoder only supports service protocol version V1"
Version::latest(),
"Encoder only supports service protocol version {:?}",
Version::latest()
);
Self {}
}
Expand Down Expand Up @@ -106,8 +107,9 @@ impl Decoder {
pub fn new(service_protocol_version: Version) -> Self {
assert_eq!(
service_protocol_version,
Version::V1,
"Decoder only supports service protocol version V1"
Version::latest(),
"Decoder only supports service protocol version {:?}",
Version::latest()
);
Self {
buf: SegmentedBuf::new(),
Expand Down Expand Up @@ -183,8 +185,8 @@ mod tests {

#[test]
fn fill_decoder_with_several_messages() {
let encoder = Encoder::new(Version::V1);
let mut decoder = Decoder::new(Version::V1);
let encoder = Encoder::new(Version::latest());
let mut decoder = Decoder::new(Version::latest());

let expected_msg_0 = messages::StartMessage {
id: "key".into(),
Expand All @@ -193,6 +195,8 @@ mod tests {
state_map: vec![],
partial_state: true,
key: "key".to_string(),
retry_count_since_last_stored_entry: 0,
duration_since_last_stored_entry: 0,
};

let expected_msg_1 = messages::InputEntryMessage {
Expand Down Expand Up @@ -256,8 +260,8 @@ mod tests {
}

fn partial_decoding_test(split_index: usize) {
let encoder = Encoder::new(Version::V1);
let mut decoder = Decoder::new(Version::V1);
let encoder = Encoder::new(Version::latest());
let mut decoder = Decoder::new(Version::latest());

let expected_msg = messages::InputEntryMessage {
value: Bytes::from_static("input".as_bytes()),
Expand Down
Loading

0 comments on commit 6b6a217

Please sign in to comment.