Skip to content

Commit

Permalink
add auction manager and clean up optimistic executor, remove
Browse files Browse the repository at this point in the history
unncessarily bundle handle
  • Loading branch information
itamarreif committed Nov 1, 2024
1 parent 31646e1 commit dafa9f2
Show file tree
Hide file tree
Showing 14 changed files with 458 additions and 268 deletions.
22 changes: 22 additions & 0 deletions crates/astria-auctioneer/src/auction/allocation_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use super::Bundle;

pub(super) struct FirstPrice {
highest_bid: Option<Bundle>,
}

impl FirstPrice {
pub(super) fn new() -> Self {
Self {
highest_bid: None,
}
}

pub(crate) fn bid(&mut self, _bid: Bundle) -> bool {
// save the bid if its higher than self.highest_bid
unimplemented!()
}

pub(crate) fn highest_bid(self) -> Option<Bundle> {
self.highest_bid
}
}
17 changes: 2 additions & 15 deletions crates/astria-auctioneer/src/auction/bid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,7 @@ impl Bundle {
unimplemented!()
}

pub(crate) fn into_bid(self) -> Bid {
Bid::from_bundle(self)
}
}

#[derive(Debug, Clone)]
pub(crate) struct Bid {}

impl Bid {
fn from_bundle(_bundle: Bundle) -> Self {
unimplemented!()
}

fn into_bundle(self) -> Bundle {
unimplemented!()
pub(crate) fn bid(&self) -> u64 {
self.fee
}
}
26 changes: 13 additions & 13 deletions crates/astria-auctioneer/src/auction/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio::sync::{
use tokio_util::sync::CancellationToken;

use super::{
Auction,
BundlesHandle,
Driver,
Id,
OptimisticExecutionHandle,
};
Expand All @@ -31,7 +31,7 @@ pub(crate) struct Builder {
}

impl Builder {
pub(crate) fn build(self) -> eyre::Result<(Driver, OptimisticExecutionHandle, BundlesHandle)> {
pub(crate) fn build(self) -> (Auction, OptimisticExecutionHandle, BundlesHandle) {
let Self {
metrics,
shutdown_token,
Expand All @@ -47,29 +47,29 @@ impl Builder {
// TODO: get the capacity from config or something instead of using a magic number
let (new_bids_tx, new_bids_rx) = mpsc::channel(16);

let driver = Driver {
let driver = Auction {
metrics,
shutdown_token,
sequencer_grpc_endpoint,
sequencer_abci_endpoint,
executed_block_rx,
block_commitment_rx,
reorg_rx,
new_bids_rx,
start_processing_bids_rx: executed_block_rx,
start_timer_rx: block_commitment_rx,
abort_rx: reorg_rx,
new_bundles_rx: new_bids_rx,
auction_id,
latency_margin,
};

Ok((
(
driver,
OptimisticExecutionHandle {
executed_block_tx: Some(executed_block_tx),
block_commitment_tx: Some(block_commitment_tx),
reorg_tx: Some(reorg_tx),
start_processing_bids_tx: Some(executed_block_tx),
start_timer_tx: Some(block_commitment_tx),
abort_tx: Some(reorg_tx),
},
BundlesHandle {
new_bids_tx,
new_bundles_tx: new_bids_tx,
},
))
)
}
}
134 changes: 134 additions & 0 deletions crates/astria-auctioneer/src/auction/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::HashMap;

use astria_core::generated::bundle::v1alpha1::Bundle;
use astria_eyre::eyre::{
self,
OptionExt as _,
WrapErr as _,
};
use tokio_util::{
sync::CancellationToken,
task::JoinMap,
};
use tracing::instrument;

use super::{
BundlesHandle,
Id,
OptimisticExecutionHandle,
};
use crate::flatten_result;

pub(crate) struct Builder {
pub(crate) metrics: &'static crate::Metrics,
pub(crate) shutdown_token: CancellationToken,
pub(crate) sequencer_grpc_endpoint: String,
pub(crate) sequencer_abci_endpoint: String,
pub(crate) latency_margin: std::time::Duration,
}

impl Builder {
pub(crate) fn build(self) -> Manager {
let Self {
metrics,
shutdown_token,
sequencer_grpc_endpoint,
sequencer_abci_endpoint,
latency_margin,
} = self;

(Manager {
metrics,
shutdown_token,
sequencer_grpc_endpoint,
sequencer_abci_endpoint,
latency_margin,
running_auctions: JoinMap::new(),
execution_handles: HashMap::new(),
bundle_handles: HashMap::new(),
})
}
}

pub(crate) struct Manager {
metrics: &'static crate::Metrics,
shutdown_token: CancellationToken,
sequencer_grpc_endpoint: String,
sequencer_abci_endpoint: String,
latency_margin: std::time::Duration,
running_auctions: JoinMap<Id, eyre::Result<()>>,
execution_handles: HashMap<Id, OptimisticExecutionHandle>,
bundle_handles: HashMap<Id, BundlesHandle>,
// TODO: hold the bundle stream here
}

impl Manager {
#[instrument(skip(self))]
pub(crate) fn new_auction(&mut self, auction_id: Id) {
let (auction, optimistic_execution_handle, bundles_handle) = super::Builder {
metrics: self.metrics,
shutdown_token: self.shutdown_token.clone(),
sequencer_grpc_endpoint: self.sequencer_grpc_endpoint.clone(),
sequencer_abci_endpoint: self.sequencer_abci_endpoint.clone(),
latency_margin: self.latency_margin,
auction_id,
}
.build();

// spawn and save handle
self.running_auctions.spawn(auction_id, auction.run());
self.execution_handles
.insert(auction_id, optimistic_execution_handle);
self.bundle_handles.insert(auction_id, bundles_handle);
}

pub(crate) fn abort_auction(&mut self, auction_id: Id) -> eyre::Result<()> {
// TODO: this should return an option in case the auction returned before being aborted
self.execution_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.abort()
.wrap_err("failed to abort auction")
}

#[instrument(skip(self))]
pub(crate) fn start_timer(&mut self, auction_id: Id) -> eyre::Result<()> {
self.execution_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.start_timer()
.wrap_err("failed to start timer")
}

#[instrument(skip(self))]
pub(crate) fn start_processing_bids(&mut self, auction_id: Id) -> eyre::Result<()> {
self.execution_handles
.get_mut(&auction_id)
.ok_or_eyre("unable to get handle for the given auction")?
.start_processing_bids()
.wrap_err("failed to start processing bids")
}

pub(crate) fn try_send_bundle(&mut self, _bundle: Bundle) -> eyre::Result<()> {
unimplemented!()
// try to get the handle for the appropriate auction
// try send into that auction
}

pub(crate) async fn join_next(&mut self) -> Option<(Id, eyre::Result<()>)> {
if let Some((auction_id, result)) = self.running_auctions.join_next().await {
// TODO: get rid of these excepts, maybe by consolidating the join handle and auction
// handles
self.execution_handles
.remove(&auction_id)
.expect("unable to get handle for the given auction");
self.bundle_handles
.remove(&auction_id)
.expect("unable to get handle for the given auction");

Some((auction_id, flatten_result(result)))
} else {
None
}
}
}
Loading

0 comments on commit dafa9f2

Please sign in to comment.