Skip to content

Commit

Permalink
fix the rpc tx queue
Browse files Browse the repository at this point in the history
  • Loading branch information
oxarbitrage committed Dec 18, 2024
1 parent 8d694bc commit 01dde31
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 61 deletions.
22 changes: 4 additions & 18 deletions zebra-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,10 @@ pub struct Config {

/// The number of threads used to process RPC requests and responses.
///
/// Zebra's RPC server has a separate thread pool and a `tokio` executor for each thread.
/// State queries are run concurrently using the shared thread pool controlled by
/// the [`SyncSection.parallel_cpu_threads`](https://docs.rs/zebrad/latest/zebrad/components/sync/struct.Config.html#structfield.parallel_cpu_threads) config.
///
/// If the number of threads is not configured or zero, Zebra uses the number of logical cores.
/// If the number of logical cores can't be detected, Zebra uses one thread.
///
/// Set to `1` to run all RPC queries on a single thread, and detect RPC port conflicts from
/// multiple Zebra or `zcashd` instances.
///
/// For details, see [the `jsonrpc_http_server` documentation](https://docs.rs/jsonrpc-http-server/latest/jsonrpc_http_server/struct.ServerBuilder.html#method.threads).
///
/// ## Warning
///
/// The default config uses multiple threads, which disables RPC port conflict detection.
/// This can allow multiple Zebra instances to share the same RPC port.
///
/// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent.
/// This field is deprecated and could be removed in a future release.
/// We keep it just for backward compatibility but it actually do nothing.
/// It was something configurable when the RPC server was based in the jsonrpc-core crate,
/// not anymore since we migrated to jsonrpsee.
pub parallel_cpu_threads: usize,

/// Test-only option that makes Zebra say it is at the chain tip,
Expand Down
23 changes: 11 additions & 12 deletions zebra-rpc/src/methods/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use super::super::{
};

proptest! {
/*
/// Test that when sending a raw transaction, it is received by the mempool service.
#[test]
fn mempool_receives_raw_tx(transaction in any::<Transaction>(), network in any::<Network>()) {
Expand All @@ -50,7 +49,7 @@ proptest! {

let transaction_hex = hex::encode(&transaction_bytes);

let send_task = rpc.send_raw_transaction(transaction_hex);
let send_task = tokio::spawn(async move { rpc.send_raw_transaction(transaction_hex).await });

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.into()]);
Expand All @@ -65,7 +64,7 @@ proptest! {

state.expect_no_requests().await?;

let result = send_task.await;
let result = send_task.await.expect("send_raw_transaction should not panic");

prop_assert_eq!(result, Ok(hash));

Expand All @@ -92,7 +91,9 @@ proptest! {
let transaction_bytes = transaction.zcash_serialize_to_vec()?;
let transaction_hex = hex::encode(&transaction_bytes);

let send_task = rpc.send_raw_transaction(transaction_hex.clone());
let _rpc = rpc.clone();
let _transaction_hex = transaction_hex.clone();
let send_task = tokio::spawn(async move { _rpc.send_raw_transaction(_transaction_hex).await });

let unmined_transaction = UnminedTx::from(transaction);
let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);
Expand All @@ -104,11 +105,11 @@ proptest! {

state.expect_no_requests().await?;

let result = send_task.await;
let result = send_task.await.expect("send_raw_transaction should not panic");

check_err_code(result, ErrorCode::ServerError(-1))?;

let send_task = rpc.send_raw_transaction(transaction_hex);
let send_task = tokio::spawn(async move { rpc.send_raw_transaction(transaction_hex.clone()).await });

let expected_request = mempool::Request::Queue(vec![unmined_transaction.clone().into()]);

Expand All @@ -119,7 +120,7 @@ proptest! {
.await?
.respond(Ok::<_, BoxError>(mempool::Response::Queued(vec![Ok(rsp_rx)])));

let result = send_task.await;
let result = send_task.await.expect("send_raw_transaction should not panic");

check_err_code(result, ErrorCode::ServerError(-25))?;

Expand All @@ -129,7 +130,6 @@ proptest! {
Ok(())
})?;
}
*/

/// Test that when the mempool rejects a transaction the caller receives an error.
#[test]
Expand Down Expand Up @@ -591,7 +591,6 @@ proptest! {
})?;
}

/*
/// Test the queue functionality using `send_raw_transaction`
#[test]
fn rpc_queue_main_loop(tx in any::<Transaction>(), network in any::<Network>()) {
Expand All @@ -606,7 +605,7 @@ proptest! {
let transaction_hash = tx.hash();
let tx_bytes = tx.zcash_serialize_to_vec()?;
let tx_hex = hex::encode(&tx_bytes);
let send_task = rpc.send_raw_transaction(tx_hex);
let send_task = tokio::task::spawn(async move { rpc.send_raw_transaction(tx_hex).await });

let tx_unmined = UnminedTx::from(tx);
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
Expand Down Expand Up @@ -681,10 +680,11 @@ proptest! {
runtime.block_on(async move {
let mut transactions_hash_set = HashSet::new();
for tx in txs.clone() {
let rpc_clone = rpc.clone();
// send a transaction
let tx_bytes = tx.zcash_serialize_to_vec()?;
let tx_hex = hex::encode(&tx_bytes);
let send_task = rpc.send_raw_transaction(tx_hex);
let send_task = tokio::task::spawn(async move { rpc_clone.send_raw_transaction(tx_hex).await });

let tx_unmined = UnminedTx::from(tx.clone());
let expected_request = mempool::Request::Queue(vec![tx_unmined.clone().into()]);
Expand Down Expand Up @@ -753,7 +753,6 @@ proptest! {
Ok(())
})?;
}
*/
}

#[derive(Clone, Copy, Debug, Error)]
Expand Down
10 changes: 4 additions & 6 deletions zebra-rpc/src/methods/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,6 @@ async fn rpc_getblock_parse_error() {
assert!(rpc_tx_queue_task_result.is_none());
}

/*
#[tokio::test(flavor = "multi_thread")]
async fn rpc_getblock_missing_error() {
let _init_guard = zebra_test::init();
Expand All @@ -497,17 +496,17 @@ async fn rpc_getblock_missing_error() {

// Make sure Zebra returns the correct error code `-8` for missing blocks
// https://github.com/zcash/lightwalletd/blob/v0.4.16/common/common.go#L287-L290
let block_future = rpc.get_block("0".to_string(), Some(0u8));
let block_future = tokio::spawn(async move { rpc.get_block("0".to_string(), Some(0u8)).await });

// Make the mock service respond with no block
let response_handler = state
.expect_request(zebra_state::ReadRequest::Block(Height(0).into()))
.await;
response_handler.respond(zebra_state::ReadResponse::Block(None));

let block_response = block_future.await;
let block_response = block_response
.expect_err("unexpected success from missing block state response");
let block_response = block_future.await.expect("block future should not panic");
let block_response =
block_response.expect_err("unexpected success from missing block state response");
assert_eq!(block_response.code(), ErrorCode::ServerError(-8).code());

// Now check the error string the way `lightwalletd` checks it
Expand All @@ -529,7 +528,6 @@ async fn rpc_getblock_missing_error() {
let rpc_tx_queue_task_result = rpc_tx_queue_task_handle.now_or_never();
assert!(rpc_tx_queue_task_result.is_none());
}
*/

#[tokio::test(flavor = "multi_thread")]
async fn rpc_getblockheader() {
Expand Down
3 changes: 3 additions & 0 deletions zebra-rpc/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ impl Runner {
};

self.queue.insert(tx.clone());
if self.receiver.is_empty() {
break;
}
}

// skip some work if stored tip height is the same as the one arriving
Expand Down
6 changes: 3 additions & 3 deletions zebra-rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl RpcServer {
address_book: AddressBook,
latest_chain_tip: Tip,
network: Network,
) -> Result<ServerTask, tower::BoxError>
) -> Result<(ServerTask, JoinHandle<()>), tower::BoxError>
where
VersionString: ToString + Clone + Send + 'static,
UserAgentString: ToString + Clone + Send + 'static,
Expand Down Expand Up @@ -169,7 +169,7 @@ impl RpcServer {
);

// Initialize the rpc methods with the zebra version
let (rpc_impl, _rpc_tx_queue_task_handle) = RpcImpl::new(
let (rpc_impl, rpc_tx_queue_task_handle) = RpcImpl::new(
build_version.clone(),
user_agent,
network.clone(),
Expand Down Expand Up @@ -223,7 +223,7 @@ impl RpcServer {
server_instance.start(rpc_module).stopped().await;
Ok(())
});
Ok(server_task)
Ok((server_task, rpc_tx_queue_task_handle))
}

/// Shut down this RPC server, blocking the current thread.
Expand Down
2 changes: 1 addition & 1 deletion zebra-rpc/src/server/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn rpc_spawn_unallocated_port(do_shutdown: bool) {
block_verifier_router.expect_no_requests().await;

if do_shutdown {
rpc_server_task_handle.abort();
rpc_server_task_handle.0.abort();
}
}

Expand Down
54 changes: 33 additions & 21 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,27 +243,31 @@ impl StartCmd {
}

// Launch RPC server
let rpc_task_handle = if let Some(listen_addr) = config.rpc.listen_addr {
info!("spawning RPC server");
info!("Trying to open RPC endpoint at {}...", listen_addr,);
let rpc_task_handle = RpcServer::spawn(
config.rpc.clone(),
config.mining.clone(),
build_version(),
user_agent(),
mempool.clone(),
read_only_state_service.clone(),
block_verifier_router.clone(),
sync_status.clone(),
address_book.clone(),
latest_chain_tip.clone(),
config.network.network.clone(),
);
rpc_task_handle.await.unwrap()
} else {
warn!("configure an listen_addr to start the RPC server");
tokio::spawn(std::future::pending().in_current_span())
};
let (rpc_task_handle, mut rpc_tx_queue_task_handle) =
if let Some(listen_addr) = config.rpc.listen_addr {
info!("spawning RPC server");
info!("Trying to open RPC endpoint at {}...", listen_addr,);
let rpc_task_handle = RpcServer::spawn(
config.rpc.clone(),
config.mining.clone(),
build_version(),
user_agent(),
mempool.clone(),
read_only_state_service.clone(),
block_verifier_router.clone(),
sync_status.clone(),
address_book.clone(),
latest_chain_tip.clone(),
config.network.network.clone(),
);
rpc_task_handle.await.unwrap()
} else {
warn!("configure an listen_addr to start the RPC server");
(
tokio::spawn(std::future::pending().in_current_span()),
tokio::spawn(std::future::pending().in_current_span()),
)
};

// TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if
// any related unit tests sometimes crash with memory errors
Expand Down Expand Up @@ -438,6 +442,13 @@ impl StartCmd {
Ok(())
}

rpc_tx_queue_result = &mut rpc_tx_queue_task_handle => {
rpc_tx_queue_result
.expect("unexpected panic in the rpc transaction queue task");
info!("rpc transaction queue task exited");
Ok(())
}

indexer_rpc_join_result = &mut indexer_rpc_task_handle => {
let indexer_rpc_server_result = indexer_rpc_join_result
.expect("unexpected panic in the indexer task");
Expand Down Expand Up @@ -521,6 +532,7 @@ impl StartCmd {

// ongoing tasks
rpc_task_handle.abort();
rpc_tx_queue_task_handle.abort();
syncer_task_handle.abort();
block_gossip_task_handle.abort();
mempool_crawler_task_handle.abort();
Expand Down

0 comments on commit 01dde31

Please sign in to comment.