Skip to content

Commit

Permalink
A bunch of additional tests + Fixed an issue with the state machine h…
Browse files Browse the repository at this point in the history
…anging in case the input is closed, but we're still waiting for entries to replay
  • Loading branch information
slinkydeveloper committed Jul 17, 2024
1 parent 456ca9c commit 747cd01
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 3 deletions.
96 changes: 96 additions & 0 deletions src/tests/async_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,102 @@ fn greeter_target() -> Target {
}
}

mod notify_await_point {
use super::*;

use test_log::test;

#[test]
fn await_twice_the_same_handle() {
let mut output = VMTestCase::new(Version::V1)
.input(StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
known_entries: 1,
state_map: vec![],
partial_state: false,
key: "".to_string(),
})
.input(InputEntryMessage {
headers: vec![],
value: Bytes::from_static(b"my-data"),
..InputEntryMessage::default()
})
.run_without_closing_input(|vm, _| {
vm.sys_input().unwrap();

let (_, h) = vm.sys_awakeable().unwrap();

vm.notify_await_point(h);
vm.notify_await_point(h);

vm.notify_input_closed();
});

assert_eq!(
output.next_decoded::<AwakeableEntryMessage>().unwrap(),
AwakeableEntryMessage::default()
);
assert_eq!(
output.next_decoded::<SuspensionMessage>().unwrap(),
SuspensionMessage {
entry_indexes: vec![1],
}
);
assert_eq!(output.next(), None);
}

#[test]
fn await_two_handles_at_same_time() {
let mut output = VMTestCase::new(Version::V1)
.input(StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
known_entries: 1,
state_map: vec![],
partial_state: false,
key: "".to_string(),
})
.input(InputEntryMessage {
headers: vec![],
value: Bytes::from_static(b"my-data"),
..InputEntryMessage::default()
})
.run_without_closing_input(|vm, _| {
vm.sys_input().unwrap();

let (_, h1) = vm.sys_awakeable().unwrap();
let (_, h2) = vm.sys_awakeable().unwrap();

vm.notify_await_point(h1);
// This should transition the state machine to error
vm.notify_await_point(h2);

vm.notify_input_closed();
});

assert_eq!(
output.next_decoded::<AwakeableEntryMessage>().unwrap(),
AwakeableEntryMessage::default()
);
assert_eq!(
output.next_decoded::<AwakeableEntryMessage>().unwrap(),
AwakeableEntryMessage::default()
);
assert_that!(
output.next_decoded::<ErrorMessage>().unwrap(),
error_message_as_vm_error(
vm::errors::AwaitingTwoAsyncResultError {
previous: 1,
current: 2,
}
.into()
)
);
assert_eq!(output.next(), None);
}
}

mod reverse_await_order {
use super::*;

Expand Down
125 changes: 125 additions & 0 deletions src/tests/failures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use super::*;

use crate::service_protocol::messages::{
ErrorMessage, GetStateEntryMessage, InputEntryMessage, OneWayCallEntryMessage, StartMessage,
};
use std::fmt;
use test_log::test;

#[test]
fn got_closed_stream_before_end_of_replay() {
let mut vm = CoreVM::mock_init(Version::V1);
let encoder = Encoder::new(Version::V1);

vm.notify_input(
encoder
.encode(&StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
// 2 expected entries!
known_entries: 2,
..Default::default()
})
.to_vec(),
);
vm.notify_input(encoder.encode(&InputEntryMessage::default()).to_vec());

// Now notify input closed
vm.notify_input_closed();

// Try to check if input is ready, this should fail
assert_that!(
vm.is_ready_to_execute(),
err(eq_vm_error(vm::errors::INPUT_CLOSED_WHILE_WAITING_ENTRIES))
);

let mut output = OutputIterator::collect_vm(&mut vm);
assert_that!(
output.next_decoded::<ErrorMessage>().unwrap(),
error_message_as_vm_error(vm::errors::INPUT_CLOSED_WHILE_WAITING_ENTRIES)
);
assert_eq!(output.next(), None);
}

#[test]
fn get_state_entry_mismatch() {
test_entry_mismatch(
GetStateEntryMessage {
key: Bytes::from_static(b"my-key"),
..Default::default()
},
GetStateEntryMessage {
key: Bytes::from_static(b"another-key"),
..Default::default()
},
|vm| vm.sys_get_state("another-key".to_owned()),
);
}

#[test]
fn one_way_call_entry_mismatch() {
test_entry_mismatch(
OneWayCallEntryMessage {
service_name: "greeter".to_owned(),
handler_name: "greet".to_owned(),
key: "my-key".to_owned(),
parameter: Bytes::from_static(b"123"),
..Default::default()
},
OneWayCallEntryMessage {
service_name: "greeter".to_owned(),
handler_name: "greet".to_owned(),
key: "my-key".to_owned(),
parameter: Bytes::from_static(b"456"),
..Default::default()
},
|vm| {
vm.sys_send(
Target {
service: "greeter".to_owned(),
handler: "greet".to_owned(),
key: Some("my-key".to_owned()),
},
b"456".to_vec(),
None,
)
},
);
}

fn test_entry_mismatch<M: WriteableRestateMessage + Clone, T: fmt::Debug>(
expected: M,
actual: M,
user_code: impl FnOnce(&mut CoreVM) -> Result<T, VMError>,
) {
let mut output = VMTestCase::new(Version::V1)
.input(StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
known_entries: 2,
partial_state: true,
..Default::default()
})
.input(InputEntryMessage {
headers: vec![],
value: Bytes::from_static(b"my-data"),
..InputEntryMessage::default()
})
.input(expected.clone())
.run(|vm| {
vm.sys_input().unwrap();

assert_that!(
user_code(vm),
err(eq_vm_error(
vm::errors::EntryMismatchError::new(expected.clone(), actual.clone(),).into()
))
);
});

assert_that!(
output.next_decoded::<ErrorMessage>().unwrap(),
error_message_as_vm_error(vm::errors::EntryMismatchError::new(expected, actual,).into())
);
assert_eq!(output.next(), None);
}
46 changes: 46 additions & 0 deletions src/tests/input_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,49 @@ fn echo() {
);
assert_eq!(output.next(), None);
}

#[test]
fn headers() {
let mut output = VMTestCase::new(Version::V1)
.input(StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
known_entries: 1,
..Default::default()
})
.input(InputEntryMessage {
headers: vec![service_protocol::messages::Header {
key: "x-my-header".to_owned(),
value: "my-value".to_owned(),
}],
value: Bytes::from_static(b"other-value"),
..InputEntryMessage::default()
})
.run(|vm| {
let_assert!(Input { headers, .. } = vm.sys_input().unwrap());

assert_that!(
headers,
elements_are![eq(Header {
key: Cow::Borrowed("x-my-header"),
value: Cow::Borrowed("my-value"),
})]
);

vm.sys_write_output(NonEmptyValue::Success(vec![])).unwrap();
vm.sys_end().unwrap();
});

assert_eq!(
output.next_decoded::<OutputEntryMessage>().unwrap(),
OutputEntryMessage {
result: Some(output_entry_message::Result::Value(Bytes::default())),
..OutputEntryMessage::default()
}
);
assert_eq!(
output.next_decoded::<EndMessage>().unwrap(),
EndMessage::default()
);
assert_eq!(output.next(), None);
}
2 changes: 2 additions & 0 deletions src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod async_result;
mod failures;
mod get_state;
mod input_output;
mod promise;
Expand All @@ -16,6 +17,7 @@ use crate::service_protocol::messages::{
use crate::service_protocol::{Decoder, Encoder, RawMessage, Version};
use bytes::Bytes;
use googletest::prelude::*;
use std::result::Result;
use test_log::test;

// --- Test infra
Expand Down
34 changes: 32 additions & 2 deletions src/tests/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use crate::service_protocol::messages::{
use assert2::let_assert;
use test_log::test;

/// Old tests

#[test]
fn run_guard() {
let mut output = VMTestCase::new(Version::V1)
Expand Down Expand Up @@ -42,6 +40,38 @@ fn run_guard() {
assert_eq!(output.next(), None);
}

#[test]
fn exit_without_enter() {
let mut output = VMTestCase::new(Version::V1)
.input(StartMessage {
id: Bytes::from_static(b"123"),
debug_id: "123".to_string(),
known_entries: 1,
state_map: vec![],
partial_state: false,
key: "".to_string(),
})
.input(InputEntryMessage {
headers: vec![],
value: Bytes::from_static(b"my-data"),
..InputEntryMessage::default()
})
.run(|vm| {
vm.sys_input().unwrap();

assert_that!(
vm.sys_run_exit(NonEmptyValue::Success(vec![1, 2, 3])),
err(eq_vm_error(vm::errors::INVOKED_RUN_EXIT_WITHOUT_ENTER))
);
});

assert_that!(
output.next_decoded::<ErrorMessage>().unwrap(),
error_message_as_vm_error(vm::errors::INVOKED_RUN_EXIT_WITHOUT_ENTER)
);
assert_eq!(output.next(), None);
}

#[test]
fn enter_then_exit_then_suspend() {
let mut output = VMTestCase::new(Version::V1)
Expand Down
5 changes: 5 additions & 0 deletions src/vm/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ pub const INVOKED_RUN_EXIT_WITHOUT_ENTER: VMError = VMError::new_const(
"Invoked sys_run_exit without invoking sys_run_enter before",
);

pub const INPUT_CLOSED_WHILE_WAITING_ENTRIES: VMError = VMError::new_const(
codes::PROTOCOL_VIOLATION,
"The input was closed while still waiting to receive all the `known_entries`",
);

// Other errors

#[derive(Debug, Clone, thiserror::Error)]
Expand Down
7 changes: 6 additions & 1 deletion src/vm/transitions/async_results.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::vm::context::Context;
use crate::vm::errors::{AwaitingTwoAsyncResultError, UnexpectedStateError};
use crate::vm::errors::{
AwaitingTwoAsyncResultError, UnexpectedStateError, INPUT_CLOSED_WHILE_WAITING_ENTRIES,
};
use crate::vm::transitions::{HitSuspensionPoint, Transition, TransitionAndReturn};
use crate::vm::State;
use crate::{SuspendedError, VMError, Value};
Expand All @@ -21,6 +23,9 @@ impl Transition<Context, NotifyInputClosed> for State {
} if !async_results.has_ready_result(await_point) => {
self.transition(context, HitSuspensionPoint(await_point))
}
State::WaitingStart | State::WaitingReplayEntries { .. } => {
Err(INPUT_CLOSED_WHILE_WAITING_ENTRIES)
}
_ => Ok(self),
}
}
Expand Down

0 comments on commit 747cd01

Please sign in to comment.