Skip to content

Commit

Permalink
readd the rpc middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
oxarbitrage committed Nov 26, 2024
1 parent 1ae99d3 commit ac4ff2e
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 86 deletions.
17 changes: 13 additions & 4 deletions zebra-rpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use std::{fmt, panic};

use cookie::Cookie;

use jsonrpsee::server::middleware::rpc::RpcServiceBuilder;
use jsonrpsee::server::{Server, ServerHandle};

use tokio::task::JoinHandle;
use tower::Service;
use tracing::*;
Expand All @@ -25,14 +27,18 @@ use zebra_node_services::mempool;
use crate::{
config::Config,
methods::{RpcImpl, RpcServer as _},
server::http_request_compatibility::HttpRequestMiddlewareLayer,
server::{
http_request_compatibility::HttpRequestMiddlewareLayer,
rpc_call_compatibility::FixRpcResponseMiddleware,
},
};

#[cfg(feature = "getblocktemplate-rpcs")]
use crate::methods::{GetBlockTemplateRpcImpl, GetBlockTemplateRpcServer};

pub mod cookie;
pub mod http_request_compatibility;
pub mod rpc_call_compatibility;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -112,7 +118,6 @@ impl RpcServer {
latest_chain_tip: Tip,
network: Network,
) -> Result<ServerTask, tower::BoxError>
//Option<(ServerHandle, JoinHandle<()>, Option<Self>)>
where
VersionString: ToString + Clone + Send + 'static,
UserAgentString: ToString + Clone + Send + 'static,
Expand Down Expand Up @@ -147,8 +152,6 @@ impl RpcServer {
SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static,
AddressBook: AddressBookPeers + Clone + Send + Sync + 'static,
{
//if let Some(listen_addr) = config.listen_addr {

let listen_addr = config
.listen_addr
.expect("caller should make sure listen_addr is set");
Expand Down Expand Up @@ -191,9 +194,15 @@ impl RpcServer {
};

let http_middleware = tower::ServiceBuilder::new().layer(http_middleware_layer);

let rpc_middleware = RpcServiceBuilder::new()
.rpc_logger(1024)
.layer_fn(FixRpcResponseMiddleware::new);

let server_instance = Server::builder()
.http_only()
.set_http_middleware(http_middleware)
.set_rpc_middleware(rpc_middleware)
.build(listen_addr)
.await
.expect("Unable to start RPC server");
Expand Down
126 changes: 44 additions & 82 deletions zebra-rpc/src/server/rpc_call_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
//! These fixes are applied at the JSON-RPC call level,
//! after the RPC request is parsed and split into calls.
use std::future::Future;
use jsonrpsee::server::middleware::rpc::layer::ResponseFuture;
use jsonrpsee::server::middleware::rpc::RpcService;
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::MethodResponse;
use jsonrpsee_types::ErrorObject;

use futures::future::{Either, FutureExt};
use jsonrpc_core::{
middleware::Middleware,
types::{Call, Failure, Output, Response},
BoxFuture, ErrorCode, Metadata, MethodCall, Notification,
};

use crate::constants::{INVALID_PARAMETERS_ERROR_CODE, MAX_PARAMS_LOG_LENGTH};
use crate::constants::INVALID_PARAMETERS_ERROR_CODE;

/// JSON-RPC [`Middleware`] with compatibility workarounds.
///
Expand All @@ -21,88 +18,53 @@ use crate::constants::{INVALID_PARAMETERS_ERROR_CODE, MAX_PARAMS_LOG_LENGTH};
/// ## Make RPC framework response codes match `zcashd`
///
/// [`jsonrpc_core`] returns specific error codes while parsing requests:
/// <https://docs.rs/jsonrpc-core/18.0.0/jsonrpc_core/types/error/enum.ErrorCode.html#variants>
/// <https://docs.rs/jsonrpsee-types/latest/jsonrpsee_types/error/enum.ErrorCode.html>
///
/// But these codes are different from `zcashd`, and some RPC clients rely on the exact code.
///
/// ## Read-Only Functionality
///
/// This middleware also logs unrecognized RPC requests.
pub struct FixRpcResponseMiddleware;

impl<M: Metadata> Middleware<M> for FixRpcResponseMiddleware {
type Future = BoxFuture<Option<Response>>;
type CallFuture = BoxFuture<Option<Output>>;

fn on_call<Next, NextFuture>(
&self,
call: Call,
meta: M,
next: Next,
) -> Either<Self::CallFuture, NextFuture>
where
Next: Fn(Call, M) -> NextFuture + Send + Sync,
NextFuture: Future<Output = Option<Output>> + Send + 'static,
{
Either::Left(
next(call.clone(), meta)
.map(|mut output| {
Self::fix_error_codes(&mut output);
output
})
.inspect(|output| Self::log_if_error(output, call))
.boxed(),
)
}
/// Specifically, the [`INVALID_PARAMETERS_ERROR_CODE`] is different:
/// <https://docs.rs/jsonrpsee-types/latest/jsonrpsee_types/error/constant.INVALID_PARAMS_CODE.html>
pub struct FixRpcResponseMiddleware {
service: RpcService,
}

impl FixRpcResponseMiddleware {
/// Replace [`jsonrpc_core`] server error codes in `output` with the `zcashd` equivalents.
fn fix_error_codes(output: &mut Option<Output>) {
if let Some(Output::Failure(Failure { ref mut error, .. })) = output {
if matches!(error.code, ErrorCode::InvalidParams) {
let original_code = error.code.clone();

error.code = INVALID_PARAMETERS_ERROR_CODE;
tracing::debug!("Replacing RPC error: {original_code:?} with {error}");
}
}
/// Create a new `FixRpcResponseMiddleware` with the given `service`.
pub fn new(service: RpcService) -> Self {
Self { service }
}
}

/// Obtain a description string for a received request.
///
/// Prints out only the method name and the received parameters.
fn call_description(call: &Call) -> String {
match call {
Call::MethodCall(MethodCall { method, params, .. }) => {
let mut params = format!("{params:?}");
if params.len() >= MAX_PARAMS_LOG_LENGTH {
params.truncate(MAX_PARAMS_LOG_LENGTH);
params.push_str("...");
}
impl<'a> RpcServiceT<'a> for FixRpcResponseMiddleware {
type Future = ResponseFuture<futures::future::BoxFuture<'a, jsonrpsee::MethodResponse>>;

format!(r#"method = {method:?}, params = {params}"#)
}
Call::Notification(Notification { method, params, .. }) => {
let mut params = format!("{params:?}");
if params.len() >= MAX_PARAMS_LOG_LENGTH {
params.truncate(MAX_PARAMS_LOG_LENGTH);
params.push_str("...");
}
fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future {
let service = self.service.clone();
ResponseFuture::future(Box::pin(async move {
let response = service.call(request).await;
if response.is_error() {
let original_error_code = response
.as_error_code()
.expect("response should have an error code");
if original_error_code == jsonrpsee_types::ErrorCode::InvalidParams.code() {
let new_error_code = INVALID_PARAMETERS_ERROR_CODE.code();
tracing::debug!(
"Replacing RPC error: {original_error_code} with {new_error_code}"
);
let json: serde_json::Value =
serde_json::from_str(response.into_parts().0.as_str())
.expect("response string should be valid json");
let id = json["id"]
.as_str()
.expect("response json should have an id")
.to_string();

format!(r#"notification = {method:?}, params = {params}"#)
return MethodResponse::error(
jsonrpsee_types::Id::Str(id.into()),
ErrorObject::borrowed(new_error_code, "Invalid params", None),
);
}
}
Call::Invalid { .. } => "invalid request".to_owned(),
}
}

/// Check RPC output and log any errors.
//
// TODO: do we want to ignore ErrorCode::ServerError(_), or log it at debug?
fn log_if_error(output: &Option<Output>, call: Call) {
if let Some(Output::Failure(Failure { error, .. })) = output {
let call_description = Self::call_description(&call);
tracing::info!("RPC error: {error} in call: {call_description}");
}
response
}))
}
}

0 comments on commit ac4ff2e

Please sign in to comment.