Skip to content

Commit

Permalink
Implement GetInvocationId and CancelInvocation entries (#10)
Browse files Browse the repository at this point in the history
* Squashed 'service-protocol/' changes from 0d6b476..5129eca

5129eca Add idempotency_key to CallEntryMessage & OneWayCallEntryMessage. (#97)
65560bf Add CancelInvocation and GetCallInvocationId entries. (#96)

git-subtree-dir: service-protocol
git-subtree-split: 5129eca343a214665237cd69b5e6b5d1842d1cab

* Implement `sys_get_call_invocation_id` and `sys_cancel_invocation`

Refactor header macro

New protocol

* Add idempotency key

* Add version checks
  • Loading branch information
slinkydeveloper authored Oct 1, 2024
1 parent 5851db7 commit 40b2e10
Show file tree
Hide file tree
Showing 15 changed files with 636 additions and 241 deletions.
41 changes: 40 additions & 1 deletion service-protocol/dev/restate/service/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ enum ServiceProtocolVersion {
// Added
// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2;
// Added
// * New entry to cancel invocations: CancelInvocationEntryMessage
// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
// * New field to set idempotency key for Call entries
V3 = 3;
}

// --- Core frames ---
Expand Down Expand Up @@ -313,6 +318,8 @@ message CallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 5;

string idempotency_key = 6;

oneof result {
bytes value = 14;
Failure failure = 15;
Expand Down Expand Up @@ -342,6 +349,8 @@ message OneWayCallEntryMessage {
// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
string key = 6;

string idempotency_key = 7;

// Entry name
string name = 12;
}
Expand Down Expand Up @@ -383,13 +392,43 @@ message CompleteAwakeableEntryMessage {
message RunEntryMessage {
oneof result {
bytes value = 14;
dev.restate.service.protocol.Failure failure = 15;
Failure failure = 15;
};

// Entry name
string name = 12;
}

// Completable: No
// Fallible: Yes
// Type: 0x0C00 + 6
message CancelInvocationEntryMessage {
oneof target {
// Target invocation id to cancel
string invocation_id = 1;
// Target index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 2;
}

// Entry name
string name = 12;
}

// Completable: Yes
// Fallible: Yes
// Type: 0x0C00 + 7
message GetCallInvocationIdEntryMessage {
// Index of the call/one way call journal entry in this journal.
uint32 call_entry_index = 1;

oneof result {
string value = 14;
Failure failure = 15;
};

string name = 12;
}

// --- Nested messages

// This failure object carries user visible errors,
Expand Down
38 changes: 20 additions & 18 deletions service-protocol/service-invocation-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,24 +330,26 @@ used for observability purposes by Restate observability tools.
The following tables describe the currently available journal entries. For more details, check the protobuf message
descriptions in [`protocol.proto`](dev/restate/service/protocol.proto).

| Message | Type | Completable | Fallible | Description |
| ------------------------------- | -------- | ----------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `InputEntryMessage` | `0x0400` | No | No | Carries the invocation input message(s) of the invocation. |
| `GetStateEntryMessage` | `0x0800` | Yes | No | Get the value of a service instance state key. |
| `GetStateKeysEntryMessage` | `0x0804` | Yes | No | Get all the known state keys for this service instance. Note: the completion value for this message is a protobuf of type `GetStateKeysEntryMessage.StateKeys`. |
| `SleepEntryMessage` | `0x0C00` | Yes | No | Initiate a timer that completes after the given time. |
| `CallEntryMessage` | `0x0C01` | Yes | Yes | Invoke another Restate service. |
| `AwakeableEntryMessage` | `0x0C03` | Yes | No | Arbitrary result container which can be completed from another service, given a specific id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OneWayCallEntryMessage` | `0x0C02` | No | Yes | Invoke another Restate service at the given time, without waiting for the response. |
| `CompleteAwakeableEntryMessage` | `0x0C04` | No | Yes | Complete an `Awakeable`, given its id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OutputEntryMessage` | `0x0401` | No | No | Carries the invocation output message(s) or terminal failure of the invocation. |
| `SetStateEntryMessage` | `0x0800` | No | No | Set the value of a service instance state key. |
| `ClearStateEntryMessage` | `0x0801` | No | No | Clear the value of a service instance state key. |
| `ClearAllStateEntryMessage` | `0x0802` | No | No | Clear all the values of the service instance state. |
| `RunEntryMessage` | `0x0C05` | No | No | Run non-deterministic user provided code and persist the result. |
| `GetPromiseEntryMessage` | `0x0808` | Yes | No | Get or wait the value of the given promise. If the value is not present yet, this entry will block waiting for the value. |
| `PeekPromiseEntryMessage` | `0x0809` | Yes | No | Get the value of the given promise. If the value is not present, this entry completes immediately with empty completion. |
| `CompletePromiseEntryMessage` | `0x080A` | Yes | No | Complete the given promise. If the promise was completed already, this entry completes with a failure. |
| Message | Type | Completable | Fallible | Description |
|-----------------------------------|----------|-------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `InputEntryMessage` | `0x0400` | No | No | Carries the invocation input message(s) of the invocation. |
| `GetStateEntryMessage` | `0x0800` | Yes | No | Get the value of a service instance state key. |
| `GetStateKeysEntryMessage` | `0x0804` | Yes | No | Get all the known state keys for this service instance. Note: the completion value for this message is a protobuf of type `GetStateKeysEntryMessage.StateKeys`. |
| `SleepEntryMessage` | `0x0C00` | Yes | No | Initiate a timer that completes after the given time. |
| `CallEntryMessage` | `0x0C01` | Yes | Yes | Invoke another Restate service. |
| `AwakeableEntryMessage` | `0x0C03` | Yes | No | Arbitrary result container which can be completed from another service, given a specific id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OneWayCallEntryMessage` | `0x0C02` | No | Yes | Invoke another Restate service at the given time, without waiting for the response. |
| `CompleteAwakeableEntryMessage` | `0x0C04` | No | Yes | Complete an `Awakeable`, given its id. See [Awakeable identifier](#awakeable-identifier) for more details. |
| `OutputEntryMessage` | `0x0401` | No | No | Carries the invocation output message(s) or terminal failure of the invocation. |
| `SetStateEntryMessage` | `0x0800` | No | No | Set the value of a service instance state key. |
| `ClearStateEntryMessage` | `0x0801` | No | No | Clear the value of a service instance state key. |
| `ClearAllStateEntryMessage` | `0x0802` | No | No | Clear all the values of the service instance state. |
| `RunEntryMessage` | `0x0C05` | No | No | Run non-deterministic user provided code and persist the result. |
| `GetPromiseEntryMessage` | `0x0808` | Yes | No | Get or wait the value of the given promise. If the value is not present yet, this entry will block waiting for the value. |
| `PeekPromiseEntryMessage` | `0x0809` | Yes | No | Get the value of the given promise. If the value is not present, this entry completes immediately with empty completion. |
| `CompletePromiseEntryMessage` | `0x080A` | Yes | No | Complete the given promise. If the promise was completed already, this entry completes with a failure. |
| `CancelInvocationEntryMessage` | `0x0C06` | No | Yes | Cancel the target invocation id or the target journal entry. |
| `GetCallInvocationIdEntryMessage` | `0x0C07` | Yes | Yes | Get the invocation id of a previously created call/one way call. |

#### Awakeable identifier

Expand Down
40 changes: 39 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub struct Target {
pub service: String,
pub handler: String,
pub key: Option<String>,
pub idempotency_key: Option<String>,
}

#[derive(Debug, Hash, Clone, Copy, Eq, PartialEq)]
Expand All @@ -139,6 +140,21 @@ impl From<AsyncResultHandle> for u32 {
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SendHandle(u32);

impl From<u32> for SendHandle {
fn from(value: u32) -> Self {
SendHandle(value)
}
}

impl From<SendHandle> for u32 {
fn from(value: SendHandle) -> Self {
value.0
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum Value {
/// a void/None/undefined success
Expand All @@ -147,6 +163,8 @@ pub enum Value {
Failure(TerminalFailure),
/// Only returned for get_state_keys
StateKeys(Vec<String>),
/// Only returned for get_call_invocation_id
InvocationId(String),
CombinatorResult(Vec<AsyncResultHandle>),
}

Expand Down Expand Up @@ -196,6 +214,19 @@ impl From<NonEmptyValue> for Value {
}
}

#[derive(Debug, Eq, PartialEq)]
pub enum GetInvocationIdTarget {
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum CancelInvocationTarget {
InvocationId(String),
CallEntry(AsyncResultHandle),
SendEntry(SendHandle),
}

#[derive(Debug, Eq, PartialEq)]
pub enum TakeOutputResult {
Buffer(Bytes),
Expand Down Expand Up @@ -274,7 +305,7 @@ pub trait VM: Sized {
target: Target,
input: Bytes,
execution_time_since_unix_epoch: Option<Duration>,
) -> VMResult<()>;
) -> VMResult<SendHandle>;

fn sys_awakeable(&mut self) -> VMResult<(String, AsyncResultHandle)>;

Expand All @@ -298,6 +329,13 @@ pub trait VM: Sized {
retry_policy: RetryPolicy,
) -> VMResult<AsyncResultHandle>;

fn sys_get_call_invocation_id(
&mut self,
call: GetInvocationIdTarget,
) -> VMResult<AsyncResultHandle>;

fn sys_cancel_invocation(&mut self, target: CancelInvocationTarget) -> VMResult<()>;

fn sys_write_output(&mut self, value: NonEmptyValue) -> VMResult<()>;

fn sys_end(&mut self) -> VMResult<()>;
Expand Down
57 changes: 57 additions & 0 deletions src/service_protocol/generated/dev.restate.service.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ pub struct CallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "5")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "6")]
pub idempotency_key: ::prost::alloc::string::String,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -399,6 +401,8 @@ pub struct OneWayCallEntryMessage {
/// If this invocation has a key associated (e.g. for objects and workflows), then this key is filled in. Empty otherwise.
#[prost(string, tag = "6")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "7")]
pub idempotency_key: ::prost::alloc::string::String,
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
Expand Down Expand Up @@ -471,6 +475,52 @@ pub mod run_entry_message {
Failure(super::Failure),
}
}
/// Completable: No
/// Fallible: Yes
/// Type: 0x0C00 + 6
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CancelInvocationEntryMessage {
/// Entry name
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "cancel_invocation_entry_message::Target", tags = "1, 2")]
pub target: ::core::option::Option<cancel_invocation_entry_message::Target>,
}
/// Nested message and enum types in `CancelInvocationEntryMessage`.
pub mod cancel_invocation_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Target {
/// Target invocation id to cancel
#[prost(string, tag = "1")]
InvocationId(::prost::alloc::string::String),
/// Target index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "2")]
CallEntryIndex(u32),
}
}
/// Completable: Yes
/// Fallible: Yes
/// Type: 0x0C00 + 7
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetCallInvocationIdEntryMessage {
/// Index of the call/one way call journal entry in this journal.
#[prost(uint32, tag = "1")]
pub call_entry_index: u32,
#[prost(string, tag = "12")]
pub name: ::prost::alloc::string::String,
#[prost(oneof = "get_call_invocation_id_entry_message::Result", tags = "14, 15")]
pub result: ::core::option::Option<get_call_invocation_id_entry_message::Result>,
}
/// Nested message and enum types in `GetCallInvocationIdEntryMessage`.
pub mod get_call_invocation_id_entry_message {
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Result {
#[prost(string, tag = "14")]
Value(::prost::alloc::string::String),
#[prost(message, tag = "15")]
Failure(super::Failure),
}
}
/// This failure object carries user visible errors,
/// e.g. invocation failure return value or failure result of an InvokeEntryMessage.
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -501,6 +551,11 @@ pub enum ServiceProtocolVersion {
/// Added
/// * Entry retry mechanism: ErrorMessage.next_retry_delay, StartMessage.retry_count_since_last_stored_entry and StartMessage.duration_since_last_stored_entry
V2 = 2,
/// Added
/// * New entry to cancel invocations: CancelInvocationEntryMessage
/// * New entry to retrieve the invocation id: GetCallInvocationIdEntryMessage
/// * New field to set idempotency key for Call entries
V3 = 3,
}
impl ServiceProtocolVersion {
/// String value of the enum field names used in the ProtoBuf definition.
Expand All @@ -512,6 +567,7 @@ impl ServiceProtocolVersion {
Self::Unspecified => "SERVICE_PROTOCOL_VERSION_UNSPECIFIED",
Self::V1 => "V1",
Self::V2 => "V2",
Self::V3 => "V3",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
Expand All @@ -520,6 +576,7 @@ impl ServiceProtocolVersion {
"SERVICE_PROTOCOL_VERSION_UNSPECIFIED" => Some(Self::Unspecified),
"V1" => Some(Self::V1),
"V2" => Some(Self::V2),
"V3" => Some(Self::V3),
_ => None,
}
}
Expand Down
Loading

0 comments on commit 40b2e10

Please sign in to comment.