Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pkhry committed Sep 4, 2024
1 parent c870583 commit a97bcbe
Showing 1 changed file with 133 additions and 116 deletions.
249 changes: 133 additions & 116 deletions subxt/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,17 @@ mod test {
items: HashMap::new(),
}
}

fn from_vec<'a, T: Serialize, I: IntoIterator<Item = (&'a str, RpcResult<T>)>>(
item: I,
) -> Self {
let mut data = Self::new();
for (key, item) in item.into_iter() {
data.push(key.into(), item);
}
data
}

fn push<I: Serialize>(&mut self, key: Vec<u8>, item: RpcResult<I>) {
let item = item.map(|x| serde_json::to_string(&x).unwrap());
match self.items.entry(key) {
Expand All @@ -397,6 +408,25 @@ mod test {
Self { sender, receiver }
}

async fn from_vec<
T: Serialize,
S: IntoIterator<Item = RpcResult<Vec<RpcResult<T>>>>,
>(
items: S,
) -> Self {
let sub = Self::new();
for i in items {
let i: RpcResult<Vec<Item>> = i.map(|items| {
items
.into_iter()
.map(|item| item.map(|i| serde_json::to_string(&i).unwrap()))
.collect()
});
sub.write(i).await
}
sub
}

async fn read(&mut self) -> RpcResult<Vec<Item>> {
self.receiver.recv().await.unwrap()
}
Expand Down Expand Up @@ -498,46 +528,61 @@ mod test {
Ok(Some(Bytes(str.into())))
}

fn storage_response(key: &str, value: &str) -> StorageResponse {
fn storage_response<K: Into<Vec<u8>>, V: Into<Vec<u8>>>(key: K, value: V) -> StorageResponse
where
Vec<u8>: From<K>,
{
StorageResponse {
key: key.into(),
value: value.into(),
}
}

async fn build_mock_client<
'a,
T: Serialize,
D: IntoIterator<Item = (&'a str, RpcResult<T>)>,
S: IntoIterator<Item = RpcResult<Vec<RpcResult<T>>>>,
>(
table_data: D,
subscription_data: S,
) -> RpcClient {
let data = Data {
request: MockDataTable::from_vec(table_data),
subscription: Subscription::from_vec(subscription_data).await,
};
RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(data)),
})
}

#[tokio::test]
async fn storage_fetch_values() {
// Setup
let mock_data: Vec<(String, String)> = (1..=3)
.map(|num| (format!("ID{}", num), format!("Data{}", num)))
.collect();
let mut entries = MockDataTable::new();
entries.push(mock_data[0].0.clone().into(), bytes(&mock_data[0].1));
entries.push::<()>(
mock_data[1].0.clone().into(),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
);
entries.push(mock_data[1].0.clone().into(), bytes(&mock_data[1].1));
entries.push::<()>(
mock_data[2].0.clone().into(),
Err(RpcError::RequestRejected("Reconnecting".to_string())),
);
entries.push(mock_data[2].0.clone().into(), bytes(&mock_data[2].1));

let entries = Data {
request: entries,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
});

let mock_data = vec![
("ID1", bytes("Data1")),
(
"ID2",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID2", bytes("Data2")),
(
"ID3",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID3", bytes("Data3")),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);

// Test
let response = backend
.storage_fetch_values(
mock_data.iter().map(|x| x.0.clone().into()).collect(),
["ID1".into(), "ID2".into(), "ID3".into()].into(),
crate::utils::H256::random(),
)
.await
Expand All @@ -547,41 +592,39 @@ mod test {
.map(|x| x.unwrap())
.collect::<Vec<StorageResponse>>()
.await;
let expected: Vec<_> = mock_data
.iter()
.map(|(key, value)| storage_response(key, value))
.collect();

let expected = vec![
storage_response("ID1", "Data1"),
storage_response("ID2", "Data2"),
storage_response("ID3", "Data3"),
];

assert_eq!(expected, response)
}

#[tokio::test]
async fn storage_fetch_value() {
// Setup
let mock_data = ("ID1".to_owned(), "Data1".to_owned());
let mut entries = MockDataTable::new();
entries.push::<()>(
mock_data.0.clone().into(),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
);
entries.push(mock_data.0.clone().into(), bytes(&mock_data.1));
let entries = Data {
request: entries,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
});

let mock_data = [
(
"ID1",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID1", bytes("Data1")),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;

// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend
.storage_fetch_value(mock_data.0.clone().into(), crate::utils::H256::random())
.storage_fetch_value("ID1".into(), crate::utils::H256::random())
.await
.unwrap();

let response = response.unwrap();
assert_eq!(mock_data.1, String::from_utf8(response).unwrap())
assert_eq!("Data1".to_owned(), String::from_utf8(response).unwrap())
}

#[tokio::test]
Expand All @@ -601,24 +644,20 @@ mod test {
/// ```
async fn simple_fetch() {
let hash = crate::utils::H256::random();
// Setup
let mut entries = MockDataTable::new();
entries.push::<()>(
"chain_getBlockHash".into(),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
);
entries.push("chain_getBlockHash".into(), Ok(Some(hash)));

let entries = Data {
request: entries,
subscription: Subscription::new(),
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
});

// Setup
let mock_data = vec![
(
"chain_getBlockHash".into(),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("chain_getBlockHash".into(), Ok(Some(hash))),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;

// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend.genesis_hash().await.unwrap();

Expand Down Expand Up @@ -650,54 +689,31 @@ mod test {
/// ```
async fn stream_simple() {
// Setup
let subscription = Subscription::new();

let sub = vec![
Ok(runtime_version(0)),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(1)),
]
.into_iter()
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();

subscription.write(Ok(sub)).await;

let sub = vec![
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(2)),
Ok(runtime_version(3)),
]
.into_iter()
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();

subscription.write(Ok(sub)).await;

let sub = vec![
Ok(runtime_version(4)),
Ok(runtime_version(5)),
Err(RpcError::RequestRejected("Reconnecting".to_string())),
]
.into_iter()
.map(|x| x.map(|x| serde_json::to_string(&x).unwrap()))
.collect();
subscription.write(Ok(sub)).await;

// dont need them here
let entries = MockDataTable::new();

let entries = Data {
request: entries,
subscription,
};
let rpc_client = RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(entries)),
});

let mock_subscription_data = vec![
Ok(vec![
Ok(runtime_version(0)),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(1)),
]),
Ok(vec![
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(2)),
Ok(runtime_version(3)),
]),
Ok(vec![
Ok(runtime_version(4)),
Ok(runtime_version(5)),
Err(RpcError::RequestRejected("Reconnecting".to_string())),
]),
];
let rpc_client = build_mock_client(vec![], mock_subscription_data).await;

// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);

let mut results = backend.stream_runtime_version().await.unwrap();
Expand All @@ -706,6 +722,7 @@ mod test {
Ok(client_runtime_version(4)),
Ok(client_runtime_version(5)),
]);

while let Some(res) = results.next().await {
if res.is_ok() {
assert_eq!(expected.pop_front().unwrap().unwrap(), res.unwrap())
Expand Down

0 comments on commit a97bcbe

Please sign in to comment.