Skip to content

Commit

Permalink
Add new backfill queues (#102)
Browse files Browse the repository at this point in the history
This moves the queues that the acc forwarder and txn forwarder use to be backfill queues. This allows us to backfill without disturbing ingestion of new data coming in. This can be quite important considering the need for latest data for proofs to work. It aligns well with the changes introduced via out-of-order processing, allowing us to introduce past history while still maintaining proof status.

We have been running this in our infrastructure:

It allows us to ingest a tree with 1mn/2mn transactions (which can take a little time) while staying on the tip with all other trees, because they are not competing for queue space. This means that the "on tip queue" never has more than a few seconds' worth of transactions in it, while the backfill queue can have a lot more and can be used to fill in trees that have been missed in ingestion.
  • Loading branch information
linuskendall authored Sep 19, 2023
1 parent 16e898b commit 600a711
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 165 deletions.
186 changes: 89 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

27 changes: 2 additions & 25 deletions das_api/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions das_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ schemars = "0.8.6"
schemars_derive = "0.8.6"
open-rpc-derive = { version = "0.0.4"}
open-rpc-schema = { version = "0.0.4"}
blockbuster = { git = "https://github.com/metaplex-foundation/blockbuster.git", rev = "e39c5bf2" }
blockbuster = { git = "https://github.com/metaplex-foundation/blockbuster.git", rev = "552aba6a" }
anchor-lang = { version = "0.26.0"}
mpl-token-metadata = { version = "1.8.3", features = ["no-entrypoint", "serde-feature"] }
mpl-candy-machine-core = { version = "0.1.4", features = ["no-entrypoint"] }
mpl-bubblegum = { git = "https://github.com/metaplex-foundation/mpl-bubblegum.git", rev = "3cb3976d", features = ["no-entrypoint"] }
mpl-candy-guard = { version="0.3.0", features = ["no-entrypoint"] }
mpl-candy-guard = { version="0.3.0", features = ["no-entrypoint"] }
2 changes: 1 addition & 1 deletion digital_asset_types/src/dao/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(ambiguous_glob_reexports)]
mod full_asset;
mod generated;
pub mod scopes;
pub use full_asset::*;
#[allow(ambiguous_glob_reexports)]
pub use generated::*;

use self::sea_orm_active_enums::{
Expand Down
2 changes: 1 addition & 1 deletion nft_ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tokio-postgres = "0.7.7"
serde = "1.0.136"
bs58 = "0.4.0"
reqwest = "0.11.11"
plerkle_messenger = { version = "1.5.2", features = ['redis'] }
plerkle_messenger = { git = "https://github.com/metaplex-foundation/digital-asset-validator-plugin.git", rev = "35a8801c", features = ['redis'] }
plerkle_serialization = { version = "1.5.2" }
flatbuffers = "23.1.21"
lazy_static = "1.4.0"
Expand Down
21 changes: 11 additions & 10 deletions nft_ingester/src/account_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
use chrono::Utc;
use log::{debug, error};
use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData, ACCOUNT_STREAM};
use plerkle_messenger::{ConsumptionType, Messenger, MessengerConfig, RecvData};
use plerkle_serialization::root_as_account_info;
use sqlx::{Pool, Postgres};
use tokio::{
Expand All @@ -21,19 +21,20 @@ pub fn account_worker<T: Messenger>(
bg_task_sender: UnboundedSender<TaskData>,
ack_channel: UnboundedSender<(&'static str, String)>,
consumption_type: ConsumptionType,
stream_key: &'static str,
) -> JoinHandle<()> {
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
loop {
let e = msg.recv(ACCOUNT_STREAM, consumption_type.clone()).await;
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
match e {
Ok(data) => {
let len = data.len();
for item in data {
tasks.spawn(handle_account(Arc::clone(&manager), item));
tasks.spawn(handle_account(Arc::clone(&manager), item, stream_key));
}
if len > 0 {
debug!("Processed {} accounts", len);
Expand All @@ -42,18 +43,18 @@ pub fn account_worker<T: Messenger>(
Err(e) => {
error!("Error receiving from account stream: {}", e);
metric! {
statsd_count!("ingester.stream.receive_error", 1, "stream" => ACCOUNT_STREAM);
statsd_count!("ingester.stream.receive_error", 1, "stream" => stream_key);
}
}
}
while let Some(res) = tasks.join_next().await {
if let Ok(id) = res {
if let Some(id) = id {
let send = ack_channel.send((ACCOUNT_STREAM, id));
let send = ack_channel.send((stream_key, id));
if let Err(err) = send {
metric! {
error!("Account stream ack error: {}", err);
statsd_count!("ingester.stream.ack_error", 1, "stream" => ACCOUNT_STREAM);
statsd_count!("ingester.stream.ack_error", 1, "stream" => stream_key);
}
}
}
Expand All @@ -64,7 +65,7 @@ pub fn account_worker<T: Messenger>(
})
}

async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData) -> Option<String> {
async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
let id = item.id;
let mut ret_id = None;
let data = item.data;
Expand All @@ -78,13 +79,13 @@ async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData) -> Opt
let str_program_id =
bs58::encode(account_update.owner().unwrap().0.as_slice()).into_string();
metric! {
statsd_count!("ingester.seen", 1, "owner" => &str_program_id, "stream" => ACCOUNT_STREAM);
statsd_count!("ingester.seen", 1, "owner" => &str_program_id, "stream" => stream_key);
let seen_at = Utc::now();
statsd_time!(
"ingester.bus_ingest_time",
(seen_at.timestamp_millis() - account_update.seen_at()) as u64,
"owner" => &str_program_id,
"stream" => ACCOUNT_STREAM
"stream" => stream_key
);
}
let mut account = None;
Expand All @@ -95,7 +96,7 @@ async fn handle_account(manager: Arc<ProgramTransformer>, item: RecvData) -> Opt
let res = manager.handle_account_update(account_update).await;
let should_ack = capture_result(
id.clone(),
ACCOUNT_STREAM,
stream_key,
("owner", &str_program_id),
item.tries,
res,
Expand Down
8 changes: 4 additions & 4 deletions nft_ingester/src/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use digital_asset_types::dao::backfill_items;
use flatbuffers::FlatBufferBuilder;
use futures::{stream::FuturesUnordered, StreamExt};
use log::{debug, error, info};
use plerkle_messenger::{Messenger, TRANSACTION_STREAM};
use plerkle_messenger::{Messenger, TRANSACTION_BACKFILL_STREAM};
use plerkle_serialization::serializer::seralize_encoded_transaction_with_status;

use sea_orm::{
Expand Down Expand Up @@ -257,9 +257,9 @@ impl<'a, T: Messenger> Backfiller<'a, T> {

// Instantiate messenger.
let mut messenger = T::new(config.get_messneger_client_config()).await.unwrap();
messenger.add_stream(TRANSACTION_STREAM).await.unwrap();
messenger.add_stream(TRANSACTION_BACKFILL_STREAM).await.unwrap();
messenger
.set_buffer_size(TRANSACTION_STREAM, 10_000_000)
.set_buffer_size(TRANSACTION_BACKFILL_STREAM, 10_000_000)
.await;

Self {
Expand Down Expand Up @@ -968,7 +968,7 @@ impl<'a, T: Messenger> Backfiller<'a, T> {
};
let builder = seralize_encoded_transaction_with_status(builder, tx_wrap)?;
self.messenger
.send(TRANSACTION_STREAM, builder.finished_data())
.send(TRANSACTION_BACKFILL_STREAM, builder.finished_data())
.await?;
}
drop(block_ref);
Expand Down
47 changes: 46 additions & 1 deletion nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, TRANSACTION_STREAM,
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};
Expand Down Expand Up @@ -102,18 +102,35 @@ pub async fn main() -> Result<(), IngesterError> {
config.messenger_config.clone(),
ACCOUNT_STREAM,
)?;
let mut timer_backfiller_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_BACKFILL_STREAM,
)?;
let mut timer_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_STREAM,
)?;
let mut timer_backfiller_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_BACKFILL_STREAM,
)?;


if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}

// Stream Consumers Setup -------------------------------------
if role == IngesterRole::Ingester || role == IngesterRole::All {
Expand All @@ -130,6 +147,20 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
ACCOUNT_STREAM,
);

let _account_backfill = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_BACKFILL_STREAM,
);
}
for i in 0..config.get_transaction_stream_worker_count() {
Expand All @@ -143,6 +174,20 @@ pub async fn main() -> Result<(), IngesterError> {
} else {
ConsumptionType::New
},
TRANSACTION_STREAM,
);

let _txn_backfill = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
TRANSACTION_BACKFILL_STREAM,
);
}
}
Expand Down
23 changes: 12 additions & 11 deletions nft_ingester/src/transaction_notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use cadence_macros::{is_global_default_set, statsd_count, statsd_time};
use chrono::Utc;
use log::{debug, error};
use plerkle_messenger::{
ConsumptionType, Messenger, MessengerConfig, RecvData, TRANSACTION_STREAM,
ConsumptionType, Messenger, MessengerConfig, RecvData,
};
use plerkle_serialization::root_as_transaction_info;

Expand All @@ -24,19 +24,20 @@ pub fn transaction_worker<T: Messenger>(
bg_task_sender: UnboundedSender<TaskData>,
ack_channel: UnboundedSender<(&'static str, String)>,
consumption_type: ConsumptionType,
stream_key: &'static str,
) -> JoinHandle<()> {
tokio::spawn(async move {
let source = T::new(config).await;
if let Ok(mut msg) = source {
let manager = Arc::new(ProgramTransformer::new(pool, bg_task_sender));
loop {
let e = msg.recv(TRANSACTION_STREAM, consumption_type.clone()).await;
let e = msg.recv(stream_key, consumption_type.clone()).await;
let mut tasks = JoinSet::new();
match e {
Ok(data) => {
let len = data.len();
for item in data {
tasks.spawn(handle_transaction(Arc::clone(&manager), item));
tasks.spawn(handle_transaction(Arc::clone(&manager), item, stream_key));
}
if len > 0 {
debug!("Processed {} txns", len);
Expand All @@ -45,18 +46,18 @@ pub fn transaction_worker<T: Messenger>(
Err(e) => {
error!("Error receiving from txn stream: {}", e);
metric! {
statsd_count!("ingester.stream.receive_error", 1, "stream" => TRANSACTION_STREAM);
statsd_count!("ingester.stream.receive_error", 1, "stream" => stream_key);
}
}
}
while let Some(res) = tasks.join_next().await {
if let Ok(id) = res {
if let Some(id) = id {
let send = ack_channel.send((TRANSACTION_STREAM, id));
let send = ack_channel.send((stream_key, id));
if let Err(err) = send {
metric! {
error!("Txn stream ack error: {}", err);
statsd_count!("ingester.stream.ack_error", 1, "stream" => TRANSACTION_STREAM);
statsd_count!("ingester.stream.ack_error", 1, "stream" => stream_key);
}
}
}
Expand All @@ -67,11 +68,11 @@ pub fn transaction_worker<T: Messenger>(
})
}

async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData) -> Option<String> {
async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData, stream_key: &'static str) -> Option<String> {
let mut ret_id = None;
if item.tries > 0 {
metric! {
statsd_count!("ingester.stream_redelivery", 1, "stream" => TRANSACTION_STREAM);
statsd_count!("ingester.stream_redelivery", 1, "stream" => stream_key);
}
}
let id = item.id.to_string();
Expand All @@ -80,22 +81,22 @@ async fn handle_transaction(manager: Arc<ProgramTransformer>, item: RecvData) ->
let signature = tx.signature().unwrap_or("NO SIG");
debug!("Received transaction: {}", signature);
metric! {
statsd_count!("ingester.seen", 1, "stream" => TRANSACTION_STREAM);
statsd_count!("ingester.seen", 1, "stream" => stream_key);
}
let seen_at = Utc::now();
metric! {
statsd_time!(
"ingester.bus_ingest_time",
(seen_at.timestamp_millis() - tx.seen_at()) as u64,
"stream" => TRANSACTION_STREAM
"stream" => stream_key
);
}

let begin = Instant::now();
let res = manager.handle_transaction(&tx).await;
let should_ack = capture_result(
id.clone(),
TRANSACTION_STREAM,
stream_key,
("txn", "txn"),
item.tries,
res,
Expand Down
4 changes: 2 additions & 2 deletions tools/acc_forwarder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ futures = "0.3.28"
lazy_static = "1.4.0"
log = "0.4.17"
mpl-token-metadata = "1.8.3"
plerkle_messenger = { version = "1.5.2", features = ["redis"] }
plerkle_serialization = "1.5.2"
plerkle_serialization = { version = "1.5.2" }
plerkle_messenger = { git = "https://github.com/metaplex-foundation/digital-asset-validator-plugin.git", rev = "35a8801c", features = ['redis'] }
prometheus = "0.13.3"
reqwest = { version = "0.11", features = ["json"] }
serde_json = "1.0.81"
Expand Down
Loading

0 comments on commit 600a711

Please sign in to comment.