Skip to content

Commit

Permalink
transaction submission logic
Browse files Browse the repository at this point in the history
  • Loading branch information
itamarreif committed Nov 7, 2024
1 parent 047c33f commit de15c44
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 153 deletions.
3 changes: 3 additions & 0 deletions crates/astria-auctioneer/src/auction/allocation_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ impl FirstPrice {
}
}

/// Submit a bundle with a bid.
///
/// Returns `true` if the bid is accepted as the highest bid.
pub(crate) fn bid(&mut self, bundle: Bundle) -> bool {
if bundle.bid() > self.highest_bid.as_ref().map_or(0, |b| b.bid()) {
self.highest_bid = Some(bundle);
Expand Down
33 changes: 22 additions & 11 deletions crates/astria-auctioneer/src/auction/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Manager {
pub(crate) fn new_auction(&mut self, auction_id: Id) {
let (handle, auction) = super::Builder {
metrics: self.metrics,
shutdown_token: self.shutdown_token.clone(),
shutdown_token: self.shutdown_token.child_token(),
sequencer_grpc_client: self.sequencer_grpc_client.clone(),
sequencer_abci_client: self.sequencer_abci_client.clone(),
latency_margin: self.latency_margin,
Expand All @@ -141,29 +141,40 @@ impl Manager {

pub(crate) fn abort_auction(&mut self, auction_id: Id) -> eyre::Result<()> {
// TODO: this should return an option in case the auction returned before being aborted
self.auction_handles
let handle = self
.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.abort()
.wrap_err("failed to abort auction")
.ok_or_eyre("unable to get handle for the given auction")?;

handle.abort().expect("should only abort once per auction");
Ok(())
}

#[instrument(skip(self))]
pub(crate) fn start_timer(&mut self, auction_id: Id) -> eyre::Result<()> {
self.auction_handles
let handle = self
.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.ok_or_eyre("unable to get handle for the given auction")?;

handle
.start_timer()
.wrap_err("failed to start timer")
.expect("should only start timer once per auction");

Ok(())
}

#[instrument(skip(self))]
pub(crate) fn start_processing_bids(&mut self, auction_id: Id) -> eyre::Result<()> {
self.auction_handles
let handle = self
.auction_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.ok_or_eyre("unable to get handle for the given auction")?;

handle
.start_processing_bids()
.wrap_err("failed to start processing bids")
.expect("should only start processing bids once per auction");
Ok(())
}

pub(crate) fn try_send_bundle(&mut self, auction_id: Id, bundle: Bundle) -> eyre::Result<()> {
Expand Down
209 changes: 139 additions & 70 deletions crates/astria-auctioneer/src/auction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@ use astria_eyre::eyre::{
self,
eyre,
Context,
ContextCompat,
OptionExt as _,
};
pub(crate) use builder::Builder;
use sequencer_client::Address;
use sequencer_client::{
tendermint_rpc::endpoint::broadcast::tx_sync,
Address,
SequencerClientExt,
};
use telemetry::display::base64;
use tokio::{
select,
Expand All @@ -31,13 +36,13 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
info,
instrument,
warn,
Instrument,
Span,
};
use tryhard::backoff_strategies::ExponentialBackoff;

use crate::{
bundle::Bundle,
Expand Down Expand Up @@ -66,7 +71,14 @@ pub(crate) use manager::Manager;

mod allocation_rule;

enum Command {
StartProcessingBids,
StartTimer,
Abort,
}

pub(crate) struct Handle {
commands_tx: mpsc::Sender<Command>,
start_processing_bids_tx: Option<oneshot::Sender<()>>,
start_timer_tx: Option<oneshot::Sender<()>>,
abort_tx: Option<oneshot::Sender<()>>,
Expand All @@ -78,7 +90,7 @@ impl Handle {
let _ = self
.abort_tx
.take()
.expect("should only send reorg signal to a given auction once");
.ok_or_eyre("should only send reorg signal to a given auction once")?;

Ok(())
}
Expand All @@ -87,7 +99,7 @@ impl Handle {
let _ = self
.start_processing_bids_tx
.take()
.expect("should only send executed signal to a given auction once")
.ok_or_eyre("should only send executed signal to a given auction once")?
.send(());
Ok(())
}
Expand All @@ -96,7 +108,7 @@ impl Handle {
let _ = self
.start_timer_tx
.take()
.expect("should only send block commitment signal to a given auction once")
.ok_or_eyre("should only send block commitment signal to a given auction once")?
.send(());

Ok(())
Expand All @@ -112,7 +124,7 @@ impl Handle {
}

// TODO: should this be the same object as the auction?
struct Auction {
pub(crate) struct Auction {
#[allow(dead_code)]
metrics: &'static Metrics,
shutdown_token: CancellationToken,
Expand Down Expand Up @@ -176,108 +188,105 @@ impl Auction {

signal = &mut self.start_processing_bids_rx, if !auction_is_open => {
if let Err(e) = signal {
break Err(eyre!("exec signal channel closed")).wrap_err(e);
break Err(e).wrap_err("exec signal channel closed");
}
// set auction to open so it starts collecting bids
auction_is_open = true;
}

signal = &mut self.start_timer_rx, if auction_is_open => {
if let Err(e) = signal {
break Err(eyre!("commit signal channel closed")).wrap_err(e);
break Err(e).wrap_err("commit signal channel closed");
}
// set the timer
latency_margin_timer = Some(tokio::time::sleep(self.latency_margin));

// TODO: also want to fetch the pending nonce here (we wait for commit because we want the pending nonce from after the commit)
nonce_fetch = Some(tokio::task::spawn(async {
// TODO: fetch the pending nonce using the sequencer client with tryhard
Ok(0)
let client = self.sequencer_grpc_client.clone();
let address = self.sequencer_key.address().clone();

// we wait for commit because we want the pending nonce from after the commit
// TODO: fix lifetime issue with passing metrics here?
nonce_fetch = Some(tokio::task::spawn(async move {
get_pending_nonce(client, address).await
}));
}

Some(bundle) = self.new_bundles_rx.recv(), if auction_is_open => {
if allocation_rule.bid(bundle.clone()) {
info!(auction.id = %base64(self.auction_id), bundle.bid = %bundle.bid(), "new highest bid")
info!(
auction.id = %base64(self.auction_id),
bundle.bid = %bundle.bid(),
"received new highest bid"
);
} else {
debug!(
auction.id = %base64(self.auction_id),
bundle.bid = %bundle.bid(),
"received bid lower than current highest bid, discarding"
);
}
}

}
};

// await the nonce fetch result
// TODO: separate the rest of this to a different object, e.g. AuctionResult?
// TODO: flatten this or get rid of the option somehow?
// await the nonce fetch result
let nonce = nonce_fetch
.expect("should have received commit to exit the bid loop")
.expect(
"should have received commit and fetched pending nonce before exiting the auction \
loop",
)
.await
.wrap_err("task failed")?
.wrap_err("get_pending_nonce task failed")?
.wrap_err("failed to fetch nonce")?;

// handle auction result
// serialize, sign and submit to the sequencer
let transaction_body = auction_result
.wrap_err("")?
.wrap_err("auction failed unexpectedly")?
.ok_or_eyre("auction ended with no winning bid")?
.into_transaction_body(nonce, self.rollup_id, self.fee_asset_denomination.clone());
.into_transaction_body(
nonce,
self.rollup_id,
self.fee_asset_denomination.clone(),
self.sequencer_chain_id,
);

let transaction = transaction_body.sign(self.sequencer_key.signing_key());

let submission_result = select! {
biased;

// TODO: should this be Ok(())?
() = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal")),

// submit the transaction to the sequencer
result = self.submit_transaction(transaction) => {
// TODO: handle submission failure better?
result
// TODO: should this be Ok(())? or Ok("received shutdown signal")?
() = self.shutdown_token.cancelled() => Err(eyre!("received shutdown signal during auction result submission")),

result = submit_transaction(self.sequencer_abci_client.clone(), transaction, self.metrics) => {
// TODO: how to handle submission failure better?
match result {
Ok(resp) => {
// TODO: handle failed submission instead of just logging the result
info!(auction.id = %base64(self.auction_id), auction.result = %resp.log, "auction result submitted to sequencer");
Ok(())
},
Err(e) => {
error!(auction.id = %base64(self.auction_id), err = %e, "failed to submit auction result to sequencer");
Err(e).wrap_err("failed to submit auction result to sequencer")
},
}
}
};

submission_result
}

#[instrument(skip_all, fields(auction.id = %base64(self.auction_id), %address, err))]
async fn get_pending_nonce(&self, address: Address) -> eyre::Result<u32> {
let span = tracing::Span::current();
let retry_cfg = make_retry_cfg("get pending nonce".into(), span);
let client = self.sequencer_grpc_client.clone();

let nonce = tryhard::retry_fn(|| {
let mut client = client.clone();
let address = address.clone();

async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
}
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to get pending nonce")?
.into_inner()
.inner;

Ok(nonce)
}

async fn submit_transaction(&self, _transaction: Transaction) -> eyre::Result<()> {
unimplemented!()
}
}

fn make_retry_cfg(
msg: String,
span: Span,
) -> tryhard::RetryFutureConfig<
ExponentialBackoff,
impl Fn(u32, Option<Duration>, &tonic::Status) -> futures::future::Ready<()>,
> {
tryhard::RetryFutureConfig::new(1024)
#[instrument(skip_all, fields(%address, err))]
async fn get_pending_nonce(
client: SequencerServiceClient<tonic::transport::Channel>,
address: Address,
) -> eyre::Result<u32> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2))
.on_retry(
Expand All @@ -290,9 +299,69 @@ fn make_retry_cfg(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to {msg} failed; retrying after backoff",
"attempt to get pending nonce failed; retrying after backoff",
);
futures::future::ready(())
},
);

let nonce = tryhard::retry_fn(|| {
let mut client = client.clone();
let address = address.clone();

async move {
client
.get_pending_nonce(GetPendingNonceRequest {
address: Some(address.into_raw()),
})
.await
}
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to get pending nonce")?
.into_inner()
.inner;

Ok(nonce)
}

async fn submit_transaction(
client: sequencer_client::HttpClient,
transaction: Transaction,
_metrics: &'static Metrics,
) -> eyre::Result<tx_sync::Response> {
let span = tracing::Span::current();
let retry_cfg = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(2))
.on_retry(
move |attempt: u32,
next_delay: Option<Duration>,
error: &sequencer_client::extension_trait::Error| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
parent: &span,
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to submit transaction failed; retrying after backoff",
);
futures::future::ready(())
},
)
);

tryhard::retry_fn(|| {
let client = client.clone();
let transaction = transaction.clone();

async move { client.submit_transaction_sync(transaction).await }
})
.with_config(retry_cfg)
.in_current_span()
.await
.wrap_err("failed to submit transaction")
}
Loading

0 comments on commit de15c44

Please sign in to comment.