Skip to content

Commit

Permalink
bundle: add support for retrieving bundles from a block engine
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Jan 8, 2025
1 parent 866a216 commit d5d4fa0
Show file tree
Hide file tree
Showing 69 changed files with 2,994 additions and 354 deletions.
7 changes: 7 additions & 0 deletions book/api/metrics-generated.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@
| quic_​pkt_​no_​key_​handshake | `counter` | Number of packets that failed decryption due to missing key. (handshake) |
| quic_​pkt_​no_​key_​app | `counter` | Number of packets that failed decryption due to missing key. (app data) |

## Bundle Tile
| Metric | Type | Description |
|--------|------|-------------|
| bundle_​transaction_​received | `counter` | Total count of transactions received, including transactions within bundles |
| bundle_​packet_​received | `counter` | Total count of packets received |
| bundle_​bundle_​received | `counter` | Total count of bundles received |

## Verify Tile
| Metric | Type | Description |
|--------|------|-------------|
Expand Down
47 changes: 40 additions & 7 deletions book/api/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,37 @@ first connect by the `summary.tiles` message.

:::

### block_engine
Block engines are providers of additional transactions to the validator,
which are configurable by the operator. The validator may not be
configured to use any block engines, in which case no update will be
provided. For now, at most one block engine can be configured, and the
name and url will not change during the lifetime of the validator.

#### `block_engine.update`
| frequency | type | example |
|-----------------|---------------|-------- |
| *Once* + *Live* | `BlockEngine` | below |

::: details Example

```json
{
"name": "jito",
"url": "https://mainnet.block-engine.jito.wtf",
"status": "connected"
}
```

:::

**`BlockEngine`**
| Field | Type | Description |
|------------|----------|-------------|
| name | `string` | A short, descriptive name for the block engine |
| url | `string` | An HTTP URL for the block engine which the validator client connects to |
| status | `string` | One of `disconnected`, `connecting`, or `connected` indicating the state of the connection to the block engine |

### epoch
Information about an epoch. Epochs are never modified once they have
been determined, so the topic only publishes a continuous stream of new
Expand Down Expand Up @@ -976,7 +1007,8 @@ are skipped on the currently active fork.
"retained": 0,
"quic": 28159,
"udp": 14323,
"gossip": 4659
"gossip": 4646,
"block_engine": 13
},
"out": {
"net_overrun": 0,
Expand Down Expand Up @@ -1077,12 +1109,13 @@ are skipped on the currently active fork.
| out | `TxnWaterfallOut` | Transactions sent out of the waterfall |

**`TxnWaterfallIn`**
| Field | Type | Description |
|----------|----------|-------------|
| retained | `number` | Transactions were received during or prior to an earlier leader slot, but weren't executed and were retained inside the validator to potentially be included in a later slot |
| quic | `number` | A QUIC transaction was received. The stream does not have to successfully complete |
| udp | `number` | A non-QUIC UDP transaction was received |
| gossip | `number` | A gossipped vote transaction was received from a gossip peer |
| Field | Type | Description |
|--------------|----------|-------------|
| retained | `number` | Transactions were received during or prior to an earlier leader slot, but weren't executed and were retained inside the validator to potentially be included in a later slot |
| quic | `number` | A QUIC transaction was received. The stream does not have to successfully complete |
| udp | `number` | A non-QUIC UDP transaction was received |
| gossip | `number` | A gossipped vote transaction was received from a gossip peer |
| block_engine | `number` | A transaction received from a block engine, for example Jito. The transaction might or might not have been part of a bundle |

**`TxnWaterfallOut`**
| Field | Type | Description |
Expand Down
33 changes: 33 additions & 0 deletions plugin/bundle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "firedancer-plugin-bundle"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["staticlib"]

[dependencies]
tonic = { version = "0.12.2", features = ["tls-roots", "tls", "tls-webpki-roots"] }
prost = "0.13.3"
prost-types = "0.13.3"
log = "0.4.22"
tokio = "1.40.0"
tokio-stream = "0.1"
futures = "0.3.30"
chrono = "0.4.38"
thiserror = "1.0.64"
bs58 = "0.5.1"

[build-dependencies]
tonic-build = "0.12.2"
protobuf-src = "2.1.0"
prost-types = "0.13.3"

[dev-dependencies]
env_logger = "0.11.5"
ed25519-dalek = "2.1.1"

[profile.release-with-debug]
inherits = "release"
debug = true
split-debuginfo = "packed"
38 changes: 38 additions & 0 deletions plugin/bundle/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use tonic_build::configure;

fn main() -> Result<(), std::io::Error> {
const PROTOC_ENVAR: &str = "PROTOC";
if std::env::var(PROTOC_ENVAR).is_err() {
#[cfg(not(windows))]
std::env::set_var(PROTOC_ENVAR, protobuf_src::protoc());
}

let proto_base_path = std::path::PathBuf::from("protos");
let proto_files = [
"auth.proto",
"block_engine.proto",
"bundle.proto",
"packet.proto",
"relayer.proto",
"shared.proto",
];
let mut protos = Vec::new();
for proto_file in &proto_files {
let proto = proto_base_path.join(proto_file);
println!("cargo:rerun-if-changed={}", proto.display());
protos.push(proto);
}

configure()
.build_client(true)
.build_server(false)
.type_attribute(
"TransactionErrorType",
"#[cfg_attr(test, derive(enum_iterator::Sequence))]",
)
.type_attribute(
"InstructionErrorType",
"#[cfg_attr(test, derive(enum_iterator::Sequence))]",
)
.compile(&protos, &[proto_base_path])
}
76 changes: 76 additions & 0 deletions plugin/bundle/protos/auth.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
syntax = "proto3";

package auth;

import "google/protobuf/timestamp.proto";

enum Role {
RELAYER = 0;
SEARCHER = 1;
VALIDATOR = 2;
SHREDSTREAM_SUBSCRIBER = 3;
}

message GenerateAuthChallengeRequest {
/// Role the client is attempting to generate tokens for.
Role role = 1;

/// Client's 32 byte pubkey.
bytes pubkey = 2;
}

message GenerateAuthChallengeResponse {
string challenge = 1;
}

message GenerateAuthTokensRequest {
/// The pre-signed challenge.
string challenge = 1;

/// The signing keypair's corresponding 32 byte pubkey.
bytes client_pubkey = 2;

/// The 64 byte signature of the challenge signed by the client's private key. The private key must correspond to
// the pubkey passed in the [GenerateAuthChallenge] method. The client is expected to sign the challenge token
// prepended with their pubkey. For example sign(pubkey, challenge).
bytes signed_challenge = 3;
}

message Token {
/// The token.
string value = 1;

/// When the token will expire.
google.protobuf.Timestamp expires_at_utc = 2;
}

message GenerateAuthTokensResponse {
/// The token granting access to resources.
Token access_token = 1;

/// The token used to refresh the access_token. This has a longer TTL than the access_token.
Token refresh_token = 2;
}

message RefreshAccessTokenRequest {
/// Non-expired refresh token obtained from the [GenerateAuthTokens] method.
string refresh_token = 1;
}

message RefreshAccessTokenResponse {
/// Fresh access_token.
Token access_token = 1;
}

/// This service is responsible for issuing auth tokens to clients for API access.
service AuthService {
/// Returns a challenge, client is expected to sign this challenge with an appropriate keypair in order to obtain access tokens.
rpc GenerateAuthChallenge(GenerateAuthChallengeRequest) returns (GenerateAuthChallengeResponse) {}

/// Provides the client with the initial pair of auth tokens for API access.
rpc GenerateAuthTokens(GenerateAuthTokensRequest) returns (GenerateAuthTokensResponse) {}

/// Call this method with a non-expired refresh token to obtain a new access token.
rpc RefreshAccessToken(RefreshAccessTokenRequest) returns (RefreshAccessTokenResponse) {}
}

98 changes: 98 additions & 0 deletions plugin/bundle/protos/block_engine.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
syntax = "proto3";

import "packet.proto";
import "shared.proto";
import "bundle.proto";

package block_engine;

message SubscribePacketsRequest {}
message SubscribePacketsResponse {
shared.Header header = 1;
packet.PacketBatch batch = 2;
}

message SubscribeBundlesRequest {}
message SubscribeBundlesResponse {
repeated bundle.BundleUuid bundles = 1;
}

message BlockBuilderFeeInfoRequest {}
message BlockBuilderFeeInfoResponse {
string pubkey = 1;

// commission (0-100)
uint64 commission = 2;
}

message AccountsOfInterest {
// use * for all accounts
repeated string accounts = 1;
}

message AccountsOfInterestRequest {}
message AccountsOfInterestUpdate {
repeated string accounts = 1;
}

message ProgramsOfInterestRequest {}
message ProgramsOfInterestUpdate {
repeated string programs = 1;
}

// A series of packets with an expiration attached to them.
// The header contains a timestamp for when this packet was generated.
// The expiry is how long the packet batches have before they expire and are forwarded to the validator.
// This provides a more censorship resistant method to MEV than block engines receiving packets directly.
message ExpiringPacketBatch {
shared.Header header = 1;
packet.PacketBatch batch = 2;
uint32 expiry_ms = 3;
}

// Packets and heartbeats are sent over the same stream.
// ExpiringPacketBatches have an expiration attached to them so the block engine can track
// how long it has until the relayer forwards the packets to the validator.
// Heartbeats contain a timestamp from the system and is used as a simple and naive time-sync mechanism
// so the block engine has some idea on how far their clocks are apart.
message PacketBatchUpdate {
oneof msg {
ExpiringPacketBatch batches = 1;
shared.Heartbeat heartbeat = 2;
}
}

message StartExpiringPacketStreamResponse {
shared.Heartbeat heartbeat = 1;
}

/// Validators can connect to Block Engines to receive packets and bundles.
service BlockEngineValidator {
/// Validators can subscribe to the block engine to receive a stream of packets
rpc SubscribePackets (SubscribePacketsRequest) returns (stream SubscribePacketsResponse) {}

/// Validators can subscribe to the block engine to receive a stream of simulated and profitable bundles
rpc SubscribeBundles (SubscribeBundlesRequest) returns (stream SubscribeBundlesResponse) {}

// Block builders can optionally collect fees. This returns fee information if a block builder wants to
// collect one.
rpc GetBlockBuilderFeeInfo (BlockBuilderFeeInfoRequest) returns (BlockBuilderFeeInfoResponse) {}
}

/// Relayers can forward packets to Block Engines.
/// Block Engines provide an AccountsOfInterest field to only send transactions that are of interest.
service BlockEngineRelayer {
/// The block engine feeds accounts of interest (AOI) updates to the relayer periodically.
/// For all transactions the relayer receives, it forwards transactions to the block engine which write-lock
/// any of the accounts in the AOI.
rpc SubscribeAccountsOfInterest (AccountsOfInterestRequest) returns (stream AccountsOfInterestUpdate) {}

rpc SubscribeProgramsOfInterest (ProgramsOfInterestRequest) returns (stream ProgramsOfInterestUpdate) {}

// Validators can subscribe to packets from the relayer and receive a multiplexed signal that contains a mixture
// of packets and heartbeats.
// NOTE: This is a bi-directional stream due to a bug with how Envoy handles half closed client-side streams.
// The issue is being tracked here: https://github.com/envoyproxy/envoy/issues/22748. In the meantime, the
// server will stream heartbeats to clients at some reasonable cadence.
rpc StartExpiringPacketStream (stream PacketBatchUpdate) returns (stream StartExpiringPacketStreamResponse) {}
}
Loading

0 comments on commit d5d4fa0

Please sign in to comment.