Skip to content

Commit

Permalink
Merge branch 'master' into split-keygen-into-crates-and-bin
Browse files Browse the repository at this point in the history
  • Loading branch information
Voxelot authored Oct 20, 2023
2 parents 779776a + 9402068 commit 315c86c
Show file tree
Hide file tree
Showing 21 changed files with 95 additions and 77 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Description of the upcoming release here.
### Added

- [#1426](https://github.com/FuelLabs/fuel-core/pull/1426) Split keygen into a crate and a binary
- [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): Add a new `--api-request-timeout` argument to control TTL for GraphQL requests.
- [#1419](https://github.com/FuelLabs/fuel-core/pull/1419): Add additional "sanity" benchmarks for arithmetic op code instructions.
- [#1411](https://github.com/FuelLabs/fuel-core/pull/1411): Added WASM and `no_std` compatibility
- [#1371](https://github.com/FuelLabs/fuel-core/pull/1371): Add new client function for querying the `MessageStatus` for a specific message (by `Nonce`)
Expand Down Expand Up @@ -51,6 +52,7 @@ Description of the upcoming release here.
- [#1395](https://github.com/FuelLabs/fuel-core/pull/1395): Add DependentCost benchmarks for `k256`, `s256` and `mcpi` instructions.

#### Breaking
- [#1432](https://github.com/FuelLabs/fuel-core/pull/1432): All subscriptions and requests have a TTL now. So each subscription lifecycle is limited in time. If the subscription is closed because of TTL, it means that you subscribed after your transaction had been dropped by the network.
- [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The recipient is a `ContractId` instead of `Address`. The block producer should deploy its contract to receive the transaction fee. The collected fee is zero until the recipient contract is set.
- [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The `Mint` transaction is reworked with new fields to support the account-base model. It affects serialization and deserialization of the transaction and also affects GraphQL schema.
- [#1407](https://github.com/FuelLabs/fuel-core/pull/1407): The `Mint` transaction is the last transaction in the block instead of the first.
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ pub struct Command {
#[clap(long = "query-log-threshold-time", default_value = "2s", env)]
pub query_log_threshold_time: humantime::Duration,

/// Timeout before drop the request.
#[clap(long = "api-request-timeout", default_value = "30m", env)]
pub api_request_timeout: humantime::Duration,

#[clap(flatten)]
pub profiling: profiling::ProfilingArgs,
}
Expand Down Expand Up @@ -240,6 +244,7 @@ impl Command {
min_connected_reserved_peers,
time_until_synced,
query_log_threshold_time,
api_request_timeout,
profiling: _,
} = self;

Expand Down Expand Up @@ -297,6 +302,7 @@ impl Command {

let config = Config {
addr,
api_request_timeout: api_request_timeout.into(),
max_database_cache_size,
database_path,
database_type,
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ tempfile = { workspace = true, optional = true }
thiserror = "1.0"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }
tower-http = { version = "0.3", features = ["set-header", "trace"] }
tower-http = { version = "0.3", features = ["set-header", "trace", "timeout"] }
tracing = { workspace = true }
uuid = { version = "1.1", features = ["v4"] }

Expand Down
7 changes: 2 additions & 5 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -172,7 +169,7 @@ pub trait TxPoolPort: Send + Sync {
fn tx_update_subscribe(
&self,
tx_id: TxId,
) -> BoxFuture<'_, BoxStream<TxStatusMessage>>;
) -> anyhow::Result<BoxStream<TxStatusMessage>>;
}

#[async_trait]
Expand Down
10 changes: 7 additions & 3 deletions crates/fuel-core/src/graphql_api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use std::{
use tokio_stream::StreamExt;
use tower_http::{
set_header::SetResponseHeaderLayer,
timeout::TimeoutLayer,
trace::TraceLayer,
};

Expand Down Expand Up @@ -155,15 +156,17 @@ impl RunnableTask for Task {
}
}

// Need a separate Data Object for each Query endpoint, cannot be avoided
// Need a seperate Data Object for each Query endpoint, cannot be avoided
#[allow(clippy::too_many_arguments)]
pub fn new_service(
config: Config,
schema: CoreSchemaBuilder,
database: Database,
txpool: TxPool,
producer: BlockProducer,
consensus_module: ConsensusModule,
_log_threshold_ms: Duration,
log_threshold_ms: Duration,
request_timeout: Duration,
) -> anyhow::Result<Service> {
let network_addr = config.addr;

Expand All @@ -174,7 +177,7 @@ pub fn new_service(
.data(producer)
.data(consensus_module)
.extension(async_graphql::extensions::Tracing)
.extension(MetricsExtension::new(_log_threshold_ms))
.extension(MetricsExtension::new(log_threshold_ms))
.finish();

let router = Router::new()
Expand All @@ -188,6 +191,7 @@ pub fn new_service(
.route("/health", get(health))
.layer(Extension(schema))
.layer(TraceLayer::new_for_http())
.layer(TimeoutLayer::new(request_timeout))
.layer(SetResponseHeaderLayer::<_>::overriding(
ACCESS_CONTROL_ALLOW_ORIGIN,
HeaderValue::from_static("*"),
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/query/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ where
}

#[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))]
pub(crate) async fn transaction_status_change<'a, State>(
pub(crate) fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, TxStatusMessage>,
transaction_id: Bytes32,
Expand Down
1 change: 0 additions & 1 deletion crates/fuel-core/src/query/subscriptions/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ fn test_tsc_inner(

let stream = futures::stream::iter(stream).boxed();
super::transaction_status_change(mock_state, stream, txn_id(0))
.await
.collect::<Vec<_>>()
.await
})
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,13 @@ impl TxStatusSubscription {
&self,
ctx: &Context<'a>,
#[graphql(desc = "The ID of the transaction")] id: TransactionId,
) -> impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a {
) -> anyhow::Result<impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a>
{
let txpool = ctx.data_unchecked::<TxPool>();
let db = ctx.data_unchecked::<Database>();
let rx = txpool.tx_update_subscribe(id.into()).await;
let rx = txpool.tx_update_subscribe(id.into())?;

transaction_status_change(
Ok(transaction_status_change(
move |id| match db.tx_status(&id) {
Ok(status) => Ok(Some(status)),
Err(StorageError::NotFound(_, _)) => {
Expand All @@ -322,8 +323,7 @@ impl TxStatusSubscription {
rx,
id.into(),
)
.await
.map_err(async_graphql::Error::from)
.map_err(async_graphql::Error::from))
}

/// Submits transaction to the `TxPool` and await either confirmation or failure.
Expand All @@ -338,7 +338,7 @@ impl TxStatusSubscription {
let config = ctx.data_unchecked::<Config>();
let tx = FuelTx::from_bytes(&tx.0)?;
let tx_id = tx.id(&config.consensus_parameters.chain_id);
let subscription = txpool.tx_update_subscribe(tx_id).await;
let subscription = txpool.tx_update_subscribe(tx_id)?;

let _: Vec<_> = txpool
.insert(vec![Arc::new(tx)])
Expand Down
12 changes: 6 additions & 6 deletions crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ use crate::{
service::adapters::TxPoolAdapter,
};
use async_trait::async_trait;
use fuel_core_services::stream::{
BoxFuture,
BoxStream,
};
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -233,8 +230,11 @@ impl TxPoolPort for TxPoolAdapter {
self.service.insert(txs).await
}

fn tx_update_subscribe(&self, id: TxId) -> BoxFuture<BoxStream<TxStatusMessage>> {
Box::pin(self.service.tx_update_subscribe(id))
fn tx_update_subscribe(
&self,
id: TxId,
) -> anyhow::Result<BoxStream<TxStatusMessage>> {
self.service.tx_update_subscribe(id)
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub use fuel_core_poa::Trigger;
#[derive(Clone, Debug)]
pub struct Config {
pub addr: SocketAddr,
pub api_request_timeout: Duration,
pub max_database_cache_size: usize,
pub database_path: PathBuf,
pub database_type: DbType,
Expand Down Expand Up @@ -77,6 +78,7 @@ impl Config {

Self {
addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 0),
api_request_timeout: Duration::from_secs(60),
// Set the cache for tests = 10MB
max_database_cache_size: 10 * 1024 * 1024,
database_path: Default::default(),
Expand Down
16 changes: 7 additions & 9 deletions crates/fuel-core/src/service/query.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Queries we can run directly on `FuelService`.
use std::sync::Arc;

use fuel_core_types::{
Expand Down Expand Up @@ -44,7 +43,7 @@ impl FuelService {
tx: Transaction,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>>> {
let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id);
let stream = self.transaction_status_change(id).await;
let stream = self.transaction_status_change(id)?;
self.submit(tx).await?;
Ok(stream)
}
Expand All @@ -55,7 +54,7 @@ impl FuelService {
tx: Transaction,
) -> anyhow::Result<TransactionStatus> {
let id = tx.id(&self.shared.config.chain_conf.consensus_parameters.chain_id);
let stream = self.transaction_status_change(id).await.filter(|status| {
let stream = self.transaction_status_change(id)?.filter(|status| {
futures::future::ready(!matches!(status, Ok(TransactionStatus::Submitted(_))))
});
futures::pin_mut!(stream);
Expand All @@ -67,21 +66,20 @@ impl FuelService {
}

/// Return a stream of status changes for a transaction.
pub async fn transaction_status_change(
pub fn transaction_status_change(
&self,
id: Bytes32,
) -> impl Stream<Item = anyhow::Result<TransactionStatus>> {
) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>>> {
let txpool = self.shared.txpool.clone();
let db = self.shared.database.clone();
let rx = Box::pin(txpool.tx_update_subscribe(id).await);
transaction_status_change(
let rx = txpool.tx_update_subscribe(id)?;
Ok(transaction_status_change(
move |id| match db.get_tx_status(&id)? {
Some(status) => Ok(Some(status)),
None => Ok(txpool.find_one(id).map(Into::into)),
},
rx,
id,
)
.await
))
}
}
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub fn init_sub_services(
Box::new(producer_adapter),
Box::new(poa_adapter),
config.query_log_threshold_time,
config.api_request_timeout,
)?;

let shared = SharedState {
Expand Down
25 changes: 18 additions & 7 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ use fuel_core_types::{
tai64::Tai64,
};

use anyhow::anyhow;
use parking_lot::Mutex as ParkingMutex;
use std::sync::Arc;
use std::{
sync::Arc,
time::Duration,
};
use tokio::{
sync::broadcast,
time::MissedTickBehavior,
Expand All @@ -76,9 +80,9 @@ pub struct TxStatusChange {
}

impl TxStatusChange {
pub fn new(capacity: usize) -> Self {
pub fn new(capacity: usize, ttl: Duration) -> Self {
let (new_tx_notification_sender, _) = broadcast::channel(capacity);
let update_sender = UpdateSender::new(capacity);
let update_sender = UpdateSender::new(capacity, ttl);
Self {
new_tx_notification_sender,
update_sender,
Expand Down Expand Up @@ -326,11 +330,11 @@ where
self.tx_status_sender.new_tx_notification_sender.subscribe()
}

pub async fn tx_update_subscribe(&self, tx_id: Bytes32) -> TxStatusStream {
pub fn tx_update_subscribe(&self, tx_id: Bytes32) -> anyhow::Result<TxStatusStream> {
self.tx_status_sender
.update_sender
.subscribe::<MpscChannel>(tx_id)
.await
.try_subscribe::<MpscChannel>(tx_id)
.ok_or(anyhow!("Maximum number of subscriptions reached"))
}
}

Expand Down Expand Up @@ -450,7 +454,14 @@ where
gossiped_tx_stream,
committed_block_stream,
shared: SharedState {
tx_status_sender: TxStatusChange::new(number_of_active_subscription),
tx_status_sender: TxStatusChange::new(
number_of_active_subscription,
// The connection should be closed automatically after the `SqueezedOut` event.
// But because of slow/malicious consumers, the subscriber can still be occupied.
// We allow the subscriber to receive the event produced by TxPool's TTL.
// But we still want to drop subscribers after `2 * TxPool_TTL`.
2 * config.transaction_ttl,
),
txpool,
p2p,
consensus_params,
Expand Down
4 changes: 2 additions & 2 deletions crates/services/txpool/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ async fn simple_insert_removal_subscription() {
let mut tx1_subscribe_updates = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();
let mut tx2_subscribe_updates = service
.shared
.tx_update_subscribe(tx2.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![tx1.clone(), tx2.clone()]).await;

Expand Down
6 changes: 3 additions & 3 deletions crates/services/txpool/src/service/tests_p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn can_insert_from_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&Default::default()))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down Expand Up @@ -66,7 +66,7 @@ async fn insert_from_local_broadcasts_to_p2p() {
let mut subscribe_update = service
.shared
.tx_update_subscribe(tx1.cached_id().unwrap())
.await;
.unwrap();

let out = service.shared.insert(vec![Arc::new(tx1.clone())]).await;

Expand Down Expand Up @@ -115,7 +115,7 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() {
let mut receiver = service
.shared
.tx_update_subscribe(tx1.id(&Default::default()))
.await;
.unwrap();

service.start_and_await().await.unwrap();

Expand Down
Loading

0 comments on commit 315c86c

Please sign in to comment.