Skip to content

Commit

Permalink
feat: fetch all methods
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimirvolek committed Dec 4, 2023
1 parent 03ac4c3 commit 6f326d7
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ homepage = "https://blockfrost.io"
[dependencies]
blockfrost-openapi = { version = "0.0.3" }
futures = "0.3.17"
reqwest = { version = "0.11.4", features = ["multipart"] }
reqwest = { version = "0.11.4", features = ["multipart", "json"] }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
paste = "1.0"
Expand Down
14 changes: 10 additions & 4 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub(super) mod endpoints;

use crate::{
request::send_get_request, url::Url, utils::build_header_map,
utils::create_client_with_project_id, BlockFrostSettings, BlockfrostError, Pagination,
request::{fetch_all_pages, send_get_request},
url::Url,
utils::build_header_map,
utils::create_client_with_project_id,
BlockFrostSettings, BlockfrostError, Pagination,
};
use reqwest::ClientBuilder;

Expand Down Expand Up @@ -61,6 +63,10 @@ impl BlockfrostAPI {
{
let url = Url::from_paginated_endpoint(self.base_url.as_str(), url_endpoint, pagination)?;

send_get_request(&self.client, url, self.settings.retry_settings).await
if pagination.fetch_all {
fetch_all_pages(&self.client, url, self.settings.retry_settings, pagination).await

Check failure on line 67 in src/api/mod.rs

View workflow job for this annotation

GitHub Actions / Lints

mismatched types

Check failure on line 67 in src/api/mod.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

mismatched types

Check failure on line 67 in src/api/mod.rs

View workflow job for this annotation

GitHub Actions / Check

mismatched types
} else {
send_get_request(&self.client, url, self.settings.retry_settings).await
}
}
}
59 changes: 55 additions & 4 deletions src/request.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use crate::{json_error, process_error_response, reqwest_error, BlockfrostError, RetrySettings};
use crate::{
json_error, process_error_response, reqwest_error, url::Url, BlockfrostError, Pagination,
RetrySettings,
};
use futures::future;
use reqwest::{Client, RequestBuilder, Response, StatusCode};
use serde_json::from_str as json_from;
use std::{future::Future, thread};
use serde::de::DeserializeOwned;
use serde_json::from_str;
use std::{error::Error, future::Future, thread};

// Used only for simple and common GET requests.
// Functions that require extra logic may not call this.
Expand All @@ -24,7 +29,7 @@ where
return Err(process_error_response(&text, status, &url));
}

json_from::<T>(&text).map_err(|reason| json_error(url, text, reason))
from_str::<T>(&text).map_err(|reason| json_error(url, text, reason))
}
}

Expand Down Expand Up @@ -67,3 +72,49 @@ fn clone_request(request: &RequestBuilder) -> RequestBuilder {
// .try_clone() will always succeed.
request.try_clone().unwrap()
}

pub(crate) async fn fetch_all_pages<T: DeserializeOwned + Send + Sync>(
client: &Client,
url: String,
retry_settings: RetrySettings,

Check failure on line 79 in src/request.rs

View workflow job for this annotation

GitHub Actions / Lints

unused variable: `retry_settings`

Check warning on line 79 in src/request.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

unused variable: `retry_settings`

Check warning on line 79 in src/request.rs

View workflow job for this annotation

GitHub Actions / Check

unused variable: `retry_settings`
pagination: Pagination,
) -> Result<Vec<T>, Box<dyn Error>> {
const CONCURRENT_REQUESTS: usize = 10;
const BATCH_SIZE: usize = 10;

let mut page_start: usize = 1;
let mut is_end = false;
let mut result = Vec::new();

while !is_end {
let batch = Url::generate_batch(url.as_str(), BATCH_SIZE, page_start, pagination)?;

println!("batch {:?}", batch);

let bodies: Vec<Result<Vec<T>, Box<dyn Error>>> =
future::join_all(batch.into_iter().map(|url| {
let client = client.clone();
async move {
let resp = client.get(url).send().await?;
resp.json::<Vec<T>>().await.map_err(|e| e.into())
}
}))
.await;

for b in bodies {
match b {
Ok(data) => {
if data.len() < pagination.count {
is_end = true;
}
result.extend(data);
},
Err(e) => eprintln!("Got an error: {}", e),
}
}

page_start += BATCH_SIZE;
}

Ok(result)
}
9 changes: 9 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,12 @@ impl Default for Pagination {
}
}
}

impl Pagination {
pub fn order_to_string(&self) -> String {
match self.order {
Order::Asc => "asc".to_string(),
Order::Desc => "desc".to_string(),
}
}
}
44 changes: 42 additions & 2 deletions src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ impl Url {
pagination: Pagination,
) -> Result<String, Box<dyn Error>> {
let mut url = Self::create_base_url(base_url, endpoint_url)?;

let mut query_pairs = form_urlencoded::Serializer::new(String::new());

query_pairs.append_pair("page", pagination.page.to_string().as_str());
query_pairs.append_pair("count", pagination.count.to_string().as_str());
query_pairs.append_pair("count", pagination.count.to_string().as_str());
query_pairs.append_pair("order", pagination.order_to_string().as_str());

let query = query_pairs.finish();

Expand All @@ -32,6 +31,32 @@ impl Url {
Ok(url.to_string())
}

pub fn generate_batch(
url: &str,
batch_size: usize,
start: usize,
pagination: Pagination,
) -> Result<Vec<String>, Box<dyn Error>> {
let mut result = Vec::new();
let mut url = UrlI::parse(url)?;

for page in start..batch_size {
let mut query_pairs = form_urlencoded::Serializer::new(String::new());

query_pairs.append_pair("page", page.to_string().as_str());
query_pairs.append_pair("count", pagination.count.to_string().as_str());
query_pairs.append_pair("order", pagination.order_to_string().as_str());

let query = query_pairs.finish();

url.set_query(Some(&query));

result.push(url.to_string());
}

Ok(result)
}

pub fn get_base_url_from_project_id(project_id: &str) -> String {
match project_id {
id if id.starts_with("mainnet") => CARDANO_MAINNET_URL,
Expand All @@ -53,3 +78,18 @@ impl Url {
Ok(url.join(endpoint)?)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_from_endpoint() {
let base_url = "http://example.com";
let endpoint_url = "api/data";
let expected_url = "http://example.com/api/data";

let result = Url::from_endpoint(base_url, endpoint_url).unwrap();
assert_eq!(result, expected_url);
}
}

0 comments on commit 6f326d7

Please sign in to comment.