Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): introduce FilterIter #1614

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/bitcoind_rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ bdk_chain = { path = "../chain" }
default = ["std"]
std = ["bitcoin/std", "bdk_core/std"]
serde = ["bitcoin/serde", "bdk_core/serde"]

[[example]]
name = "filter_iter"
required-features = ["std"]
106 changes: 106 additions & 0 deletions crates/bitcoind_rpc/examples/filter_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#![allow(clippy::print_stdout)]
use std::time::Instant;

use anyhow::Context;
use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter};
use bdk_chain::bitcoin::{constants::genesis_block, secp256k1::Secp256k1, Network};
use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
use bdk_chain::local_chain::LocalChain;
use bdk_chain::miniscript::Descriptor;
use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, SpkIterator};
use bdk_testenv::anyhow;

// This example shows how BDK chain and tx-graph structures are updated using compact
// filters syncing. Assumes a connection can be made to a bitcoin node via environment
// variables `RPC_URL` and `RPC_COOKIE`.

// Usage: `cargo run -p bdk_bitcoind_rpc --example filter_iter`

const EXTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/0/*)#uswl2jj7";
const INTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/1/*)#dyt7h8zx";
const SPK_COUNT: u32 = 25;
const NETWORK: Network = Network::Signet;

const START_HEIGHT: u32 = 170_000;
const START_HASH: &str = "00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d";

fn main() -> anyhow::Result<()> {
// Setup receiving chain and graph structures.
let secp = Secp256k1::new();
let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?;
let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?;
let (mut chain, _) = LocalChain::from_genesis_hash(genesis_block(NETWORK).block_hash());
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, KeychainTxOutIndex<&str>>::new({
let mut index = KeychainTxOutIndex::default();
index.insert_descriptor("external", descriptor.clone())?;
index.insert_descriptor("internal", change_descriptor.clone())?;
index
});

// Assume a minimum birthday height
let block = BlockId {
height: START_HEIGHT,
hash: START_HASH.parse()?,
};
let _ = chain.insert_block(block)?;

// Configure RPC client
let url = std::env::var("RPC_URL").context("must set RPC_URL")?;
let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?;
let rpc_client =
bitcoincore_rpc::Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?;

// Initialize block emitter
let cp = chain.tip();
let start_height = cp.height();
let mut emitter = FilterIter::new_with_checkpoint(&rpc_client, cp);
for (_, desc) in graph.index.keychains() {
let spks = SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, spk)| spk);
emitter.add_spks(spks);
}

let start = Instant::now();

// Sync
if let Some(tip) = emitter.get_tip()? {
let blocks_to_scan = tip.height - start_height;

for event in emitter.by_ref() {
let event = event?;
let curr = event.height();
// apply relevant blocks
if let Event::Block(EventInner { height, ref block }) = event {
let _ = graph.apply_block_relevant(block, height);
println!("Matched block {}", curr);
}
if curr % 1000 == 0 {
let progress = (curr - start_height) as f32 / blocks_to_scan as f32;
println!("[{:.2}%]", progress * 100.0);
}
}
// update chain
if let Some(cp) = emitter.chain_update() {
let _ = chain.apply_update(cp)?;
}
}

println!("\ntook: {}s", start.elapsed().as_secs());
println!("Local tip: {}", chain.tip().height());
let unspent: Vec<_> = graph
.graph()
.filter_chain_unspents(
&chain,
chain.tip().block_id(),
graph.index.outpoints().clone(),
)
.collect();
if !unspent.is_empty() {
println!("\nUnspent");
for (index, utxo) in unspent {
// (k, index) | value | outpoint |
println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint);
}
}

Ok(())
}
277 changes: 277 additions & 0 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
//! Compact block filters sync over RPC. For more details refer to [BIP157][0].
//!
//! This module is home to [`FilterIter`], a structure that returns bitcoin blocks by matching
//! a list of script pubkeys against a [BIP158][1] [`BlockFilter`].
//!
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki

use bdk_core::collections::BTreeMap;
use core::fmt;

use bdk_core::bitcoin;
use bdk_core::{BlockId, CheckPoint};
use bitcoin::{
bip158::{self, BlockFilter},
Block, BlockHash, ScriptBuf,
};
use bitcoincore_rpc;
use bitcoincore_rpc::RpcApi;

/// Block height
type Height = u32;

/// Type that generates block [`Event`]s by matching a list of script pubkeys against a
/// [`BlockFilter`].
#[derive(Debug)]
pub struct FilterIter<'c, C> {
// RPC client
client: &'c C,
// SPK inventory
spks: Vec<ScriptBuf>,
// local cp
cp: Option<CheckPoint>,
// blocks map
blocks: BTreeMap<Height, BlockHash>,
// next filter
next_filter: Option<NextFilter>,
// best height counter
height: Height,
// stop height
stop: Height,
}

impl<'c, C: RpcApi> FilterIter<'c, C> {
/// Construct [`FilterIter`] from a given `client` and start `height`.
pub fn new_with_height(client: &'c C, height: u32) -> Self {
Self {
client,
spks: vec![],
cp: None,
blocks: BTreeMap::new(),
next_filter: None,
height,
stop: 0,
}
}

/// Construct [`FilterIter`] from a given `client` and [`CheckPoint`].
pub fn new_with_checkpoint(client: &'c C, cp: CheckPoint) -> Self {
let mut filter_iter = Self::new_with_height(client, cp.height());
filter_iter.cp = Some(cp);
filter_iter
}

/// Extends `self` with an iterator of spks.
pub fn add_spks(&mut self, spks: impl IntoIterator<Item = ScriptBuf>) {
self.spks.extend(spks)
}

/// Add spk to the list of spks to scan with.
pub fn add_spk(&mut self, spk: ScriptBuf) {
self.spks.push(spk);
}

/// Get the next filter and increment the current best height.
///
/// Returns `Ok(None)` when the stop height is exceeded.
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
if self.height > self.stop {
return Ok(None);
}
let height = self.height;
let hash = match self.blocks.get(&height) {
Some(h) => *h,
None => self.client.get_block_hash(height as u64)?,
};
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);
self.height += 1;
Ok(Some((BlockId { height, hash }, filter)))
}

/// Get the remote tip.
///
/// Returns `None` if the remote height is not strictly greater than the height of this
/// [`FilterIter`].
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
let tip_hash = self.client.get_best_block_hash()?;
let mut header = self.client.get_block_header_info(&tip_hash)?;
let tip_height = header.height as u32;
if self.height >= tip_height {
// nothing to do
return Ok(None);
}
self.blocks.insert(tip_height, tip_hash);

// if we have a checkpoint we use a lookback of ten blocks
// to ensure consistency of the local chain
if let Some(cp) = self.cp.as_ref() {
// adjust start height to point of agreement + 1
let base = self.find_base_with(cp.clone())?;
self.height = base.height + 1;

for _ in 0..9 {
let hash = match header.previous_block_hash {
Some(hash) => hash,
None => break,
};
header = self.client.get_block_header_info(&hash)?;
let height = header.height as u32;
if height < self.height {
break;
}
self.blocks.insert(height, hash);
}
}

self.stop = tip_height;

// get the first filter
self.next_filter = self.next_filter()?;

Ok(Some(BlockId {
height: tip_height,
hash: tip_hash,
}))
}
}

/// Alias for a compact filter and associated block id.
type NextFilter = (BlockId, BlockFilter);

/// Event inner type
#[derive(Debug, Clone)]
pub struct EventInner {
/// Height
pub height: Height,
/// Block
pub block: Block,
}

/// Kind of event produced by [`FilterIter`].
#[derive(Debug, Clone)]
pub enum Event {
/// Block
Block(EventInner),
/// No match
NoMatch(Height),
}

impl Event {
/// Whether this event contains a matching block.
pub fn is_match(&self) -> bool {
matches!(self, Event::Block(_))
}

/// Get the height of this event.
pub fn height(&self) -> Height {
match self {
Self::Block(EventInner { height, .. }) => *height,
Self::NoMatch(h) => *h,
}
}
}

impl<C: RpcApi> Iterator for FilterIter<'_, C> {
type Item = Result<Event, Error>;

fn next(&mut self) -> Option<Self::Item> {
let (block, filter) = self.next_filter.clone()?;

(|| -> Result<_, Error> {
// if the next filter matches any of our watched spks, get the block
// and return it, inserting relevant block ids along the way
let height = block.height;
let hash = block.hash;

let result = if self.spks.is_empty() {
Err(Error::NoScripts)
} else if filter
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
self.blocks.insert(height, hash);
let inner = EventInner { height, block };
Ok(Some(Event::Block(inner)))
} else {
Ok(Some(Event::NoMatch(height)))
};

self.next_filter = self.next_filter()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt next_iter field was redundant, as it is a result of the state of other fields already kept by FilterIter, and not something accessed by the other struct methods besides next from Iterator. If you agree, I came up with the following:

diff --git a/crates/bitcoind_rpc/src/bip158.rs b/crates/bitcoind_rpc/src/bip158.rs
index 2c5bc114..5419716b 100644
--- a/crates/bitcoind_rpc/src/bip158.rs
+++ b/crates/bitcoind_rpc/src/bip158.rs
@@ -33,8 +33,6 @@ pub struct FilterIter<'c, C> {
     cp: Option<CheckPoint>,
     // blocks map
     blocks: BTreeMap<Height, BlockHash>,
-    // next filter
-    next_filter: Option<NextFilter>,
     // best height counter
     height: Height,
     // stop height
@@ -49,7 +47,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
             spks: vec![],
             cp: None,
             blocks: BTreeMap::new(),
-            next_filter: None,
             height,
             stop: 0,
         }
@@ -127,9 +124,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
 
         self.stop = tip_height;
 
-        // get the first filter
-        self.next_filter = self.next_filter()?;
-
         Ok(Some(BlockId {
             height: tip_height,
             hash: tip_hash,
@@ -177,31 +171,27 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
     type Item = Result<Event, Error>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let (block, filter) = self.next_filter.clone()?;
-
         (|| -> Result<_, Error> {
             // if the next filter matches any of our watched spks, get the block
             // and return it, inserting relevant block ids along the way
-            let height = block.height;
-            let hash = block.hash;
-
-            let result = if self.spks.is_empty() {
-                Err(Error::NoScripts)
-            } else if filter
-                .match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
-                .map_err(Error::Bip158)?
-            {
-                let block = self.client.get_block(&hash)?;
-                self.blocks.insert(height, hash);
-                let inner = EventInner { height, block };
-                Ok(Some(Event::Block(inner)))
-            } else {
-                Ok(Some(Event::NoMatch(height)))
-            };
-
-            self.next_filter = self.next_filter()?;
-
-            result
+            self.next_filter()?.map_or(Ok(None), |(block, filter)| {
+                let height = block.height;
+                let hash = block.hash;
+
+                if self.spks.is_empty() {
+                    Err(Error::NoScripts)
+                } else if filter
+                    .match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
+                    .map_err(Error::Bip158)?
+                {
+                    let block = self.client.get_block(&hash)?;
+                    self.blocks.insert(height, hash);
+                    let inner = EventInner { height, block };
+                    Ok(Some(Event::Block(inner)))
+                } else {
+                    Ok(Some(Event::NoMatch(height)))
+                }
+            })
         })()
         .transpose()
     }


result
})()
.transpose()
}
}

impl<C: RpcApi> FilterIter<'_, C> {
/// Returns the point of agreement between `self` and the given `cp`.
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
loop {
let height = cp.height();
let fetched_hash = match self.blocks.get(&height) {
Some(hash) => *hash,
None if height == 0 => cp.hash(),
_ => self.client.get_block_hash(height as _)?,
};
if cp.hash() == fetched_hash {
// ensure this block also exists in self
self.blocks.insert(height, cp.hash());
return Ok(cp.block_id());
}
// remember conflicts
self.blocks.insert(height, fetched_hash);
cp = cp.prev().expect("must break before genesis");
}
}

/// Returns a chain update from the newly scanned blocks.
///
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
pub fn chain_update(&mut self) -> Option<CheckPoint> {
if self.cp.is_none() || self.blocks.is_empty() {
return None;
}

// note: to connect with the local chain we must guarantee that `self.blocks.first()`
// is also the point of agreement with `self.cp`.
Some(
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
.expect("blocks must be in order"),
)
}
}

/// Errors that may occur during a compact filters sync.
#[derive(Debug)]
pub enum Error {
/// bitcoin bip158 error
Bip158(bip158::Error),
/// attempted to scan blocks without any script pubkeys
NoScripts,
/// `bitcoincore_rpc` error
Rpc(bitcoincore_rpc::Error),
}

impl From<bitcoincore_rpc::Error> for Error {
fn from(e: bitcoincore_rpc::Error) -> Self {
Self::Rpc(e)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Bip158(e) => e.fmt(f),
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
Self::Rpc(e) => e.fmt(f),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for Error {}
Loading
Loading