-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integration test for balances and (non)retryable messages #2505
Changes from 7 commits
31c691d
2fd5428
777c977
557b209
e2f3d7b
90ead90
0f582aa
7819fee
d2abe43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ use fuel_core_client::client::{ | |
PaginationRequest, | ||
}, | ||
types::{ | ||
CoinType, | ||
RelayedTransactionStatus as ClientRelayedTransactionStatus, | ||
TransactionStatus, | ||
}, | ||
|
@@ -72,9 +73,15 @@ use std::{ | |
SocketAddr, | ||
}, | ||
sync::Arc, | ||
time::Duration, | ||
}; | ||
use tokio::sync::oneshot::Sender; | ||
|
||
enum MessageKind { | ||
Retryable { nonce: u64, amount: u64 }, | ||
NonRetryable { nonce: u64, amount: u64 }, | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn relayer_can_download_logs() { | ||
let mut config = Config::local_node(); | ||
|
@@ -477,3 +484,233 @@ async fn handle( | |
|
||
Ok(Response::new(Body::from(r))) | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn balances_and_coins_to_spend_never_return_retryable_messages() { | ||
let mut rng = StdRng::seed_from_u64(1234); | ||
let mut config = Config::local_node(); | ||
config.relayer = Some(relayer::Config::default()); | ||
let relayer_config = config.relayer.as_mut().expect("Expected relayer config"); | ||
let eth_node = MockMiddleware::default(); | ||
let contract_address = relayer_config.eth_v2_listening_contracts[0]; | ||
const TIMEOUT: Duration = Duration::from_secs(1); | ||
|
||
// Large enough to get all messages, but not to trigger the "query is too complex" error. | ||
const UNLIMITED_QUERY_RESULTS: i32 = 100; | ||
|
||
// Given | ||
|
||
// setup a retryable and non-retryable message | ||
let secret_key: SecretKey = SecretKey::random(&mut rng); | ||
let public_key = secret_key.public_key(); | ||
let recipient = Input::owner(&public_key); | ||
|
||
const RETRYABLE_AMOUNT: u64 = 99; | ||
const RETRYABLE_NONCE: u64 = 0; | ||
const NON_RETRYABLE_AMOUNT: u64 = 100; | ||
const NON_RETRYABLE_NONCE: u64 = 1; | ||
let messages = vec![ | ||
MessageKind::Retryable { | ||
nonce: RETRYABLE_NONCE, | ||
amount: RETRYABLE_AMOUNT, | ||
}, | ||
MessageKind::NonRetryable { | ||
nonce: NON_RETRYABLE_NONCE, | ||
amount: NON_RETRYABLE_AMOUNT, | ||
}, | ||
]; | ||
let logs: Vec<_> = setup_messages(&messages, &recipient, &contract_address); | ||
|
||
eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]); | ||
// Setup the eth node with a block high enough that there | ||
// will be some finalized blocks. | ||
eth_node.update_data(|data| data.best_block.number = Some(200.into())); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do we have a constant defined somewhere for the L1 finalization period? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. None that I know of. |
||
let eth_node = Arc::new(eth_node); | ||
let eth_node_handle = spawn_eth_node(eth_node).await; | ||
|
||
relayer_config.relayer = Some(vec![format!("http://{}", eth_node_handle.address) | ||
.as_str() | ||
.try_into() | ||
.unwrap()]); | ||
|
||
config.utxo_validation = true; | ||
|
||
// setup fuel node with mocked eth url | ||
let db = Database::in_memory(); | ||
|
||
let srv = FuelService::from_database(db.clone(), config) | ||
.await | ||
.unwrap(); | ||
|
||
let client = FuelClient::from(srv.bound_address); | ||
let base_asset_id = client | ||
.consensus_parameters(0) | ||
.await | ||
.unwrap() | ||
.unwrap() | ||
.base_asset_id() | ||
.clone(); | ||
|
||
// When | ||
|
||
// wait for relayer to catch up to eth node | ||
srv.await_relayer_synced().await.unwrap(); | ||
// Wait for the block producer to create a block that targets the latest da height. | ||
srv.shared | ||
.poa_adapter | ||
.manually_produce_blocks( | ||
None, | ||
Mode::Blocks { | ||
number_of_blocks: 1, | ||
}, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
// Balances are processed in the off-chain worker, so we need to wait for it | ||
// to process the messages before we can assert the balances. | ||
let result = tokio::time::timeout(TIMEOUT, async { | ||
loop { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a better way to ensure that off chain worker has processed the events? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. Ideally we should be able to register and listen for events from the tracing module. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you submit transaction and subscribe for its status, you only receive notification when block is processed by the off-chain worker |
||
let query = client | ||
.balances( | ||
&recipient, | ||
PaginationRequest { | ||
cursor: None, | ||
results: UNLIMITED_QUERY_RESULTS, | ||
direction: PageDirection::Forward, | ||
}, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
if !query.results.is_empty() { | ||
break; | ||
} | ||
} | ||
}) | ||
.await; | ||
if let Err(_) = result { | ||
panic!("Off-chain worker didn't process balances within timeout") | ||
} | ||
// Then | ||
|
||
// Expect two messages to be available | ||
let query = client | ||
.messages( | ||
None, | ||
PaginationRequest { | ||
cursor: None, | ||
results: UNLIMITED_QUERY_RESULTS, | ||
direction: PageDirection::Forward, | ||
}, | ||
) | ||
.await | ||
.unwrap(); | ||
assert_eq!(query.results.len(), 2); | ||
let total_amount = query.results.iter().map(|m| m.amount).sum::<u64>(); | ||
assert_eq!(total_amount, NON_RETRYABLE_AMOUNT + RETRYABLE_AMOUNT); | ||
|
||
// Expect only the non-retryable message balance to be returned via "balance" | ||
let query = client | ||
.balance(&recipient, Some(&base_asset_id)) | ||
.await | ||
.unwrap(); | ||
assert_eq!(query, NON_RETRYABLE_AMOUNT); | ||
|
||
// Expect only the non-retryable message balance to be returned via "balances" | ||
let query = client | ||
.balances( | ||
&recipient, | ||
PaginationRequest { | ||
cursor: None, | ||
results: UNLIMITED_QUERY_RESULTS, | ||
direction: PageDirection::Forward, | ||
}, | ||
) | ||
.await | ||
.unwrap(); | ||
assert_eq!(query.results.len(), 1); | ||
let total_amount = query | ||
.results | ||
.iter() | ||
.map(|m| { | ||
assert_eq!(m.asset_id, base_asset_id); | ||
m.amount | ||
}) | ||
.sum::<u128>(); | ||
assert_eq!(total_amount, NON_RETRYABLE_AMOUNT as u128); | ||
|
||
// Expect only the non-retryable message balance to be returned via "coins to spend" | ||
let query = client | ||
.coins_to_spend( | ||
&recipient, | ||
vec![(base_asset_id, NON_RETRYABLE_AMOUNT, None)], | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
let message_coins: Vec<_> = query | ||
.iter() | ||
.flatten() | ||
.map(|m| { | ||
let CoinType::MessageCoin(m) = m else { | ||
panic!("should have message coin") | ||
}; | ||
m | ||
}) | ||
.collect(); | ||
assert_eq!(message_coins.len(), 1); | ||
assert_eq!(message_coins[0].amount, NON_RETRYABLE_AMOUNT); | ||
assert_eq!(message_coins[0].nonce, NON_RETRYABLE_NONCE.into()); | ||
|
||
// Expect no messages when querying more than the available non-retryable amount | ||
let query = client | ||
.coins_to_spend( | ||
&recipient, | ||
vec![(base_asset_id, NON_RETRYABLE_AMOUNT + 1, None)], | ||
None, | ||
) | ||
.await | ||
.unwrap_err(); | ||
assert_eq!( | ||
query.to_string(), | ||
"Response errors; not enough coins to fit the target" | ||
); | ||
|
||
srv.send_stop_signal_and_await_shutdown().await.unwrap(); | ||
eth_node_handle.shutdown.send(()).unwrap(); | ||
} | ||
|
||
fn setup_messages( | ||
messages: &[MessageKind], | ||
recipient: &Address, | ||
contract_address: &Bytes20, | ||
) -> Vec<Log> { | ||
const SENDER: Address = Address::zeroed(); | ||
|
||
messages | ||
.iter() | ||
.map(|m| match m { | ||
MessageKind::Retryable { nonce, amount } => make_message_event( | ||
Nonce::from(*nonce), | ||
5, | ||
*contract_address, | ||
Some(SENDER.into()), | ||
Some((*recipient).into()), | ||
Some(*amount), | ||
Some(vec![1]), | ||
0, | ||
), | ||
MessageKind::NonRetryable { nonce, amount } => make_message_event( | ||
Nonce::from(*nonce), | ||
5, | ||
*contract_address, | ||
Some(SENDER.into()), | ||
Some((*recipient).into()), | ||
Some(*amount), | ||
None, | ||
0, | ||
), | ||
}) | ||
.collect() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of curiosity, what's expected to happen if you assign the same nonce to both messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If recipient is also the same one message will overwritten by the other in DB. See here.
A lack of explicit checks suggest that we assume that such message will never come from the relayer.
IIUC, we don't have any special handling if two or more messages have the same nonce but different recipients.