Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pkhry committed Sep 4, 2024
1 parent c271500 commit b17c74f
Showing 1 changed file with 44 additions and 40 deletions.
84 changes: 44 additions & 40 deletions subxt/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,20 +361,13 @@ mod test {
type RpcResult<T> = Result<T, RpcError>;
type Item = RpcResult<String>;

struct StorageEntry {
struct MockDataTable {
items: HashMap<Vec<u8>, VecDeque<Item>>,
}

trait Storage {
fn new() -> Self;
fn push<I: Serialize>(&mut self, key: Vec<u8>, item: RpcResult<I>);

fn pop(&mut self, key: Vec<u8>) -> Item;
}

impl Storage for StorageEntry {
impl MockDataTable {
fn new() -> Self {
StorageEntry {
MockDataTable {
items: HashMap::new(),
}
}
Expand All @@ -393,13 +386,33 @@ mod test {
}
}

struct Data<R: Storage> {
request: R,
subscription: mpsc::Receiver<RpcResult<Vec<Item>>>,
struct Subscription {
sender: mpsc::Sender<RpcResult<Vec<Item>>>,
receiver: mpsc::Receiver<RpcResult<Vec<Item>>>,
}

impl Subscription {
fn new() -> Self {
let (sender, receiver) = mpsc::channel(32);
Self { sender, receiver }
}

async fn read(&mut self) -> RpcResult<Vec<Item>> {
self.receiver.recv().await.unwrap()
}

async fn write(&self, items: RpcResult<Vec<Item>>) {
self.sender.send(items).await.unwrap()
}
}

struct Data {
request: MockDataTable,
subscription: Subscription,
}

struct MockRpcClientStorage {
data: Arc<Mutex<Data<StorageEntry>>>,
data: Arc<Mutex<Data>>,
}

impl RpcClientT for MockRpcClientStorage {
Expand All @@ -412,16 +425,10 @@ mod test {
match method {
"state_getStorage" => {
let mut data = self.data.lock().await;
let key = match serde_json::to_value(params.unwrap()).unwrap() {
serde_json::Value::Array(arr) => match &arr[0] {
serde_json::Value::String(s) => s.clone(),
_ => todo!("not expected"),
},
_ => todo!("not expected"),
};
let key = key.strip_prefix("0x").unwrap().as_bytes().to_vec();
let key = hex::decode(key).unwrap();
let value = data.request.pop(key);
let params = params.map(|p| p.get().to_string());
let rpc_params = jsonrpsee::types::Params::new(params.as_deref());
let key: sp_core::Bytes = rpc_params.sequence().next().unwrap();
let value = data.request.pop(key.0);
value.map(|v| serde_json::value::RawValue::from_string(v).unwrap())
}
"chain_getBlockHash" => {
Expand All @@ -443,7 +450,7 @@ mod test {
Box::pin(async {
let mut data = self.data.lock().await;
let values: RpcResult<Vec<RpcResult<Box<RawValue>>>> =
data.subscription.recv().await.unwrap().map(|v| {
data.subscription.read().await.map(|v| {
v.into_iter()
.map(|v| {
v.map(|v| serde_json::value::RawValue::from_string(v).unwrap())
Expand Down Expand Up @@ -504,7 +511,7 @@ mod test {
let mock_data: Vec<(String, String)> = (1..=3)
.map(|num| (format!("ID{}", num), format!("Data{}", num)))
.collect();
let mut entries = StorageEntry::new();
let mut entries = MockDataTable::new();
entries.push(mock_data[0].0.clone().into(), bytes(&mock_data[0].1));
entries.push::<()>(
mock_data[1].0.clone().into(),
Expand All @@ -519,10 +526,9 @@ mod test {
);
entries.push(mock_data[2].0.clone().into(), bytes(&mock_data[2].1));

let (_rx, tx) = tokio::sync::mpsc::channel(32);
let entries = Data {
request: entries,
subscription: tx,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
Expand Down Expand Up @@ -552,18 +558,17 @@ mod test {
async fn storage_fetch_value() {
// Setup
let mock_data = ("ID1".to_owned(), "Data1".to_owned());
let mut entries = StorageEntry::new();
let mut entries = MockDataTable::new();
entries.push::<()>(
mock_data.0.clone().into(),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
);
entries.push(mock_data.0.clone().into(), bytes(&mock_data.1));
let (_rx, tx) = tokio::sync::mpsc::channel(32);
let entries = Data {
request: entries,
subscription: tx,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
Expand Down Expand Up @@ -597,7 +602,7 @@ mod test {
async fn simple_fetch() {
let hash = crate::utils::H256::random();
// Setup
let mut entries = StorageEntry::new();
let mut entries = MockDataTable::new();
entries.push::<()>(
"chain_getBlockHash".into(),
Err(RpcError::DisconnectedWillReconnect(
Expand All @@ -606,10 +611,9 @@ mod test {
);
entries.push("chain_getBlockHash".into(), Ok(Some(hash)));

let (_rx, tx) = tokio::sync::mpsc::channel(32);
let entries = Data {
request: entries,
subscription: tx,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
Expand Down Expand Up @@ -646,7 +650,7 @@ mod test {
/// ```
async fn stream_simple() {
// Setup
let (rx, tx) = tokio::sync::mpsc::channel(32);
let subscription = Subscription::new();

let sub = vec![
Ok(runtime_version(0)),
Expand All @@ -659,7 +663,7 @@ mod test {
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();

rx.send(Ok(sub)).await.unwrap();
subscription.write(Ok(sub)).await;

let sub = vec![
Err(RpcError::DisconnectedWillReconnect(
Expand All @@ -672,7 +676,7 @@ mod test {
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();

rx.send(Ok(sub)).await.unwrap();
subscription.write(Ok(sub)).await;

let sub = vec![
Ok(runtime_version(4)),
Expand All @@ -682,14 +686,14 @@ mod test {
.into_iter()
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();
rx.send(Ok(sub)).await.unwrap();
subscription.write(Ok(sub)).await;

// dont need them here
let entries = StorageEntry::new();
let entries = MockDataTable::new();

let entries = Data {
request: entries,
subscription: tx,
subscription: subscription,
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
Expand Down

0 comments on commit b17c74f

Please sign in to comment.