From ac4ff2e4e2e90bc76cb8ea7142e5d1423c0f766e Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Tue, 26 Nov 2024 17:29:19 -0300 Subject: [PATCH] readd the rpc middleware --- zebra-rpc/src/server.rs | 17 ++- .../src/server/rpc_call_compatibility.rs | 126 ++++++------------ 2 files changed, 57 insertions(+), 86 deletions(-) diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index daa6c264e84..bddf12d5826 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -11,7 +11,9 @@ use std::{fmt, panic}; use cookie::Cookie; +use jsonrpsee::server::middleware::rpc::RpcServiceBuilder; use jsonrpsee::server::{Server, ServerHandle}; + use tokio::task::JoinHandle; use tower::Service; use tracing::*; @@ -25,7 +27,10 @@ use zebra_node_services::mempool; use crate::{ config::Config, methods::{RpcImpl, RpcServer as _}, - server::http_request_compatibility::HttpRequestMiddlewareLayer, + server::{ + http_request_compatibility::HttpRequestMiddlewareLayer, + rpc_call_compatibility::FixRpcResponseMiddleware, + }, }; #[cfg(feature = "getblocktemplate-rpcs")] @@ -33,6 +38,7 @@ use crate::methods::{GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer}; pub mod cookie; pub mod http_request_compatibility; +pub mod rpc_call_compatibility; #[cfg(test)] mod tests; @@ -112,7 +118,6 @@ impl RpcServer { latest_chain_tip: Tip, network: Network, ) -> Result - //Option<(ServerHandle, JoinHandle<()>, Option)> where VersionString: ToString + Clone + Send + 'static, UserAgentString: ToString + Clone + Send + 'static, @@ -147,8 +152,6 @@ impl RpcServer { SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { - //if let Some(listen_addr) = config.listen_addr { - let listen_addr = config .listen_addr .expect("caller should make sure listen_addr is set"); @@ -191,9 +194,15 @@ impl RpcServer { }; let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer); + + let rpc_middleware = RpcServiceBuilder::new() + .rpc_logger(1024) + .layer_fn(FixRpcResponseMiddleware::new); + let server_instance = Server::builder() .http_only() .set_http_middleware(http_middleware) + .set_rpc_middleware(rpc_middleware) .build(listen_addr) .await .expect("Unable to start RPC server"); diff --git a/zebra-rpc/src/server/rpc_call_compatibility.rs b/zebra-rpc/src/server/rpc_call_compatibility.rs index c3974ac3cf8..fe6d18af54a 100644 --- a/zebra-rpc/src/server/rpc_call_compatibility.rs +++ b/zebra-rpc/src/server/rpc_call_compatibility.rs @@ -3,16 +3,13 @@ //! These fixes are applied at the JSON-RPC call level, //! after the RPC request is parsed and split into calls. -use std::future::Future; +use jsonrpsee::server::middleware::rpc::layer::ResponseFuture; +use jsonrpsee::server::middleware::rpc::RpcService; +use jsonrpsee::server::middleware::rpc::RpcServiceT; +use jsonrpsee::MethodResponse; +use jsonrpsee_types::ErrorObject; -use futures::future::{Either, FutureExt}; -use jsonrpc_core::{ - middleware::Middleware, - types::{Call, Failure, Output, Response}, - BoxFuture, ErrorCode, Metadata, MethodCall, Notification, -}; - -use crate::constants::{INVALID_PARAMETERS_ERROR_CODE, MAX_PARAMS_LOG_LENGTH}; +use crate::constants::INVALID_PARAMETERS_ERROR_CODE; /// JSON-RPC [`Middleware`] with compatibility workarounds. /// @@ -21,88 +18,53 @@ use crate::constants::{INVALID_PARAMETERS_ERROR_CODE, MAX_PARAMS_LOG_LENGTH}; /// ## Make RPC framework response codes match `zcashd` /// /// [`jsonrpc_core`] returns specific error codes while parsing requests: -/// +/// /// /// But these codes are different from `zcashd`, and some RPC clients rely on the exact code. -/// -/// ## Read-Only Functionality -/// -/// This middleware also logs unrecognized RPC requests. -pub struct FixRpcResponseMiddleware; - -impl Middleware for FixRpcResponseMiddleware { - type Future = BoxFuture>; - type CallFuture = BoxFuture>; - - fn on_call( - &self, - call: Call, - meta: M, - next: Next, - ) -> Either - where - Next: Fn(Call, M) -> NextFuture + Send + Sync, - NextFuture: Future> + Send + 'static, - { - Either::Left( - next(call.clone(), meta) - .map(|mut output| { - Self::fix_error_codes(&mut output); - output - }) - .inspect(|output| Self::log_if_error(output, call)) - .boxed(), - ) - } +/// Specifically, the [`INVALID_PARAMETERS_ERROR_CODE`] is different: +/// +pub struct FixRpcResponseMiddleware { + service: RpcService, } impl FixRpcResponseMiddleware { - /// Replace [`jsonrpc_core`] server error codes in `output` with the `zcashd` equivalents. - fn fix_error_codes(output: &mut Option) { - if let Some(Output::Failure(Failure { ref mut error, .. })) = output { - if matches!(error.code, ErrorCode::InvalidParams) { - let original_code = error.code.clone(); - - error.code = INVALID_PARAMETERS_ERROR_CODE; - tracing::debug!("Replacing RPC error: {original_code:?} with {error}"); - } - } + /// Create a new `FixRpcResponseMiddleware` with the given `service`. + pub fn new(service: RpcService) -> Self { + Self { service } } +} - /// Obtain a description string for a received request. - /// - /// Prints out only the method name and the received parameters. - fn call_description(call: &Call) -> String { - match call { - Call::MethodCall(MethodCall { method, params, .. }) => { - let mut params = format!("{params:?}"); - if params.len() >= MAX_PARAMS_LOG_LENGTH { - params.truncate(MAX_PARAMS_LOG_LENGTH); - params.push_str("..."); - } +impl<'a> RpcServiceT<'a> for FixRpcResponseMiddleware { + type Future = ResponseFuture>; - format!(r#"method = {method:?}, params = {params}"#) - } - Call::Notification(Notification { method, params, .. }) => { - let mut params = format!("{params:?}"); - if params.len() >= MAX_PARAMS_LOG_LENGTH { - params.truncate(MAX_PARAMS_LOG_LENGTH); - params.push_str("..."); - } + fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future { + let service = self.service.clone(); + ResponseFuture::future(Box::pin(async move { + let response = service.call(request).await; + if response.is_error() { + let original_error_code = response + .as_error_code() + .expect("response should have an error code"); + if original_error_code == jsonrpsee_types::ErrorCode::InvalidParams.code() { + let new_error_code = INVALID_PARAMETERS_ERROR_CODE.code(); + tracing::debug!( + "Replacing RPC error: {original_error_code} with {new_error_code}" + ); + let json: serde_json::Value = + serde_json::from_str(response.into_parts().0.as_str()) + .expect("response string should be valid json"); + let id = json["id"] + .as_str() + .expect("response json should have an id") + .to_string(); - format!(r#"notification = {method:?}, params = {params}"#) + return MethodResponse::error( + jsonrpsee_types::Id::Str(id.into()), + ErrorObject::borrowed(new_error_code, "Invalid params", None), + ); + } } - Call::Invalid { .. } => "invalid request".to_owned(), - } - } - - /// Check RPC output and log any errors. - // - // TODO: do we want to ignore ErrorCode::ServerError(_), or log it at debug? - fn log_if_error(output: &Option, call: Call) { - if let Some(Output::Failure(Failure { error, .. })) = output { - let call_description = Self::call_description(&call); - tracing::info!("RPC error: {error} in call: {call_description}"); - } + response + })) } }