Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test for balances and (non)retryable messages #2505

Merged
36 changes: 21 additions & 15 deletions tests/tests/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use fuel_core_types::{
},
};

const RETRYABLE: &[u8] = &[1];
const NON_RETRYABLE: &[u8] = &[];

#[tokio::test]
async fn balance() {
let owner = Address::default();
Expand All @@ -55,18 +58,22 @@ async fn balance() {
..coin_generator.generate()
})
.collect(),
messages: vec![(owner, 60), (owner, 90)]
.into_iter()
.enumerate()
.map(|(nonce, (owner, amount))| MessageConfig {
sender: owner,
recipient: owner,
nonce: (nonce as u64).into(),
amount,
data: vec![],
da_height: DaBlockHeight::from(0usize),
})
.collect(),
messages: vec![
(owner, 60, NON_RETRYABLE),
(owner, 90, NON_RETRYABLE),
(owner, 200000, RETRYABLE),
]
.into_iter()
.enumerate()
.map(|(nonce, (owner, amount, data))| MessageConfig {
sender: owner,
recipient: owner,
nonce: (nonce as u64).into(),
amount,
data: data.to_vec(),
da_height: DaBlockHeight::from(0usize),
})
.collect(),
..Default::default()
};
let config = Config::local_node_with_state_config(state_config);
Expand Down Expand Up @@ -129,6 +136,8 @@ async fn balance() {
client.submit_and_await_commit(&tx).await.unwrap();

let balance = client.balance(&owner, Some(&asset_id)).await.unwrap();

// Note that the big (200000) message, which is RETRYABLE is not included in the balance
assert_eq!(balance, 449);
}

Expand All @@ -137,9 +146,6 @@ async fn balance_messages_only() {
let owner = Address::default();
let asset_id = AssetId::BASE;

const RETRYABLE: &[u8] = &[1];
const NON_RETRYABLE: &[u8] = &[];

// setup config
let state_config = StateConfig {
contracts: vec![],
Expand Down
237 changes: 237 additions & 0 deletions tests/tests/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use fuel_core_client::client::{
PaginationRequest,
},
types::{
CoinType,
RelayedTransactionStatus as ClientRelayedTransactionStatus,
TransactionStatus,
},
Expand Down Expand Up @@ -72,9 +73,15 @@ use std::{
SocketAddr,
},
sync::Arc,
time::Duration,
};
use tokio::sync::oneshot::Sender;

enum MessageKind {
Retryable { nonce: u64, amount: u64 },
NonRetryable { nonce: u64, amount: u64 },
}

#[tokio::test(flavor = "multi_thread")]
async fn relayer_can_download_logs() {
let mut config = Config::local_node();
Expand Down Expand Up @@ -477,3 +484,233 @@ async fn handle(

Ok(Response::new(Body::from(r)))
}

#[tokio::test(flavor = "multi_thread")]
async fn balances_and_coins_to_spend_never_return_retryable_messages() {
let mut rng = StdRng::seed_from_u64(1234);
let mut config = Config::local_node();
config.relayer = Some(relayer::Config::default());
let relayer_config = config.relayer.as_mut().expect("Expected relayer config");
let eth_node = MockMiddleware::default();
let contract_address = relayer_config.eth_v2_listening_contracts[0];
const TIMEOUT: Duration = Duration::from_secs(1);

// Large enough to get all messages, but not to trigger the "query is too complex" error.
const UNLIMITED_QUERY_RESULTS: i32 = 100;

// Given

// setup a retryable and non-retryable message
let secret_key: SecretKey = SecretKey::random(&mut rng);
let public_key = secret_key.public_key();
let recipient = Input::owner(&public_key);

const RETRYABLE_AMOUNT: u64 = 99;
const RETRYABLE_NONCE: u64 = 0;
const NON_RETRYABLE_AMOUNT: u64 = 100;
const NON_RETRYABLE_NONCE: u64 = 1;
let messages = vec![
MessageKind::Retryable {
nonce: RETRYABLE_NONCE,
amount: RETRYABLE_AMOUNT,
},
MessageKind::NonRetryable {
nonce: NON_RETRYABLE_NONCE,
amount: NON_RETRYABLE_AMOUNT,
},
];
Comment on lines +512 to +521
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, what's expected to happen if you assign the same nonce to both messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If recipient is also the same one message will overwritten by the other in DB. See here.

A lack of explicit checks suggest that we assume that such message will never come from the relayer.

IIUC, we don't have any special handling if two or more messages have the same nonce but different recipients.

let logs: Vec<_> = setup_messages(&messages, &recipient, &contract_address);

eth_node.update_data(|data| data.logs_batch = vec![logs.clone()]);
// Setup the eth node with a block high enough that there
// will be some finalized blocks.
eth_node.update_data(|data| data.best_block.number = Some(200.into()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we have a constant defined somewhere for the L1 finalization period?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None that I know of.

let eth_node = Arc::new(eth_node);
let eth_node_handle = spawn_eth_node(eth_node).await;

relayer_config.relayer = Some(vec![format!("http://{}", eth_node_handle.address)
.as_str()
.try_into()
.unwrap()]);

config.utxo_validation = true;

// setup fuel node with mocked eth url
let db = Database::in_memory();

let srv = FuelService::from_database(db.clone(), config)
.await
.unwrap();

let client = FuelClient::from(srv.bound_address);
let base_asset_id = client
.consensus_parameters(0)
.await
.unwrap()
.unwrap()
.base_asset_id()
.clone();

// When

// wait for relayer to catch up to eth node
srv.await_relayer_synced().await.unwrap();
// Wait for the block producer to create a block that targets the latest da height.
srv.shared
.poa_adapter
.manually_produce_blocks(
None,
Mode::Blocks {
number_of_blocks: 1,
},
)
.await
.unwrap();

// Balances are processed in the off-chain worker, so we need to wait for it
// to process the messages before we can assert the balances.
let result = tokio::time::timeout(TIMEOUT, async {
loop {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to ensure that off chain worker has processed the events?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Ideally we should be able to register and listen for events from the tracing module.
To me it looks like waiting for the balances to be available is a viable alternative

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you submit transaction and subscribe for its status, you only receive notification when block is processed by the off-chain worker

let query = client
.balances(
&recipient,
PaginationRequest {
cursor: None,
results: UNLIMITED_QUERY_RESULTS,
direction: PageDirection::Forward,
},
)
.await
.unwrap();

if !query.results.is_empty() {
break;
}
}
})
.await;
if let Err(_) = result {
panic!("Off-chain worker didn't process balances within timeout")
}
// Then

// Expect two messages to be available
let query = client
.messages(
None,
PaginationRequest {
cursor: None,
results: UNLIMITED_QUERY_RESULTS,
direction: PageDirection::Forward,
},
)
.await
.unwrap();
assert_eq!(query.results.len(), 2);
let total_amount = query.results.iter().map(|m| m.amount).sum::<u64>();
assert_eq!(total_amount, NON_RETRYABLE_AMOUNT + RETRYABLE_AMOUNT);

// Expect only the non-retryable message balance to be returned via "balance"
let query = client
.balance(&recipient, Some(&base_asset_id))
.await
.unwrap();
assert_eq!(query, NON_RETRYABLE_AMOUNT);

// Expect only the non-retryable message balance to be returned via "balances"
let query = client
.balances(
&recipient,
PaginationRequest {
cursor: None,
results: UNLIMITED_QUERY_RESULTS,
direction: PageDirection::Forward,
},
)
.await
.unwrap();
assert_eq!(query.results.len(), 1);
let total_amount = query
.results
.iter()
.map(|m| {
assert_eq!(m.asset_id, base_asset_id);
m.amount
})
.sum::<u128>();
assert_eq!(total_amount, NON_RETRYABLE_AMOUNT as u128);

// Expect only the non-retryable message balance to be returned via "coins to spend"
let query = client
.coins_to_spend(
&recipient,
vec![(base_asset_id, NON_RETRYABLE_AMOUNT, None)],
None,
)
.await
.unwrap();
let message_coins: Vec<_> = query
.iter()
.flatten()
.map(|m| {
let CoinType::MessageCoin(m) = m else {
panic!("should have message coin")
};
m
})
.collect();
assert_eq!(message_coins.len(), 1);
assert_eq!(message_coins[0].amount, NON_RETRYABLE_AMOUNT);
assert_eq!(message_coins[0].nonce, NON_RETRYABLE_NONCE.into());

// Expect no messages when querying more than the available non-retryable amount
let query = client
.coins_to_spend(
&recipient,
vec![(base_asset_id, NON_RETRYABLE_AMOUNT + 1, None)],
None,
)
.await
.unwrap_err();
assert_eq!(
query.to_string(),
"Response errors; not enough coins to fit the target"
);

srv.send_stop_signal_and_await_shutdown().await.unwrap();
eth_node_handle.shutdown.send(()).unwrap();
}

fn setup_messages(
messages: &[MessageKind],
recipient: &Address,
contract_address: &Bytes20,
) -> Vec<Log> {
const SENDER: Address = Address::zeroed();

messages
.iter()
.map(|m| match m {
MessageKind::Retryable { nonce, amount } => make_message_event(
Nonce::from(*nonce),
5,
*contract_address,
Some(SENDER.into()),
Some((*recipient).into()),
Some(*amount),
Some(vec![1]),
0,
),
MessageKind::NonRetryable { nonce, amount } => make_message_event(
Nonce::from(*nonce),
5,
*contract_address,
Some(SENDER.into()),
Some((*recipient).into()),
Some(*amount),
None,
0,
),
})
.collect()
}
Loading