Skip to content

Commit

Permalink
Merge branch 'staging' into 999-fix-batcher-sends-proof-even-if-eip71…
Browse files Browse the repository at this point in the history
…2-signature-contents-are-incompatible
  • Loading branch information
uri-99 committed Sep 24, 2024
2 parents a7b92e0 + 07e6f34 commit 90ee57c
Show file tree
Hide file tree
Showing 31 changed files with 7,704 additions and 2,626 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ jobs:
run: make build_halo2_ipa_linux
- name: Build Merkle Tree bindings
run: make build_merkle_tree_linux
- name: Build Old Merkle Tree bindings
run: make build_merkle_tree_linux_old
- name: Build operator
run: go build operator/cmd/main.go
- name: Build aggregator
Expand Down
26 changes: 25 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.5.2
OPERATOR_VERSION=v0.7.2

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand Down Expand Up @@ -540,15 +540,29 @@ build_merkle_tree_macos:
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree/lib/libmerkle_tree.dylib
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a

build_merkle_tree_macos_old:
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.dylib operator/merkle_tree_old/lib/libmerkle_tree.dylib
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a

build_merkle_tree_linux:
@cd operator/merkle_tree/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree/lib/libmerkle_tree.so
@cp operator/merkle_tree/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree/lib/libmerkle_tree.a

build_merkle_tree_linux_old:
@cd operator/merkle_tree_old/lib && cargo build $(RELEASE_FLAG)
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.so operator/merkle_tree_old/lib/libmerkle_tree.so
@cp operator/merkle_tree_old/lib/target/$(TARGET_REL_PATH)/libmerkle_tree.a operator/merkle_tree_old/lib/libmerkle_tree.a

test_merkle_tree_rust_ffi:
@echo "Testing Merkle Tree Rust FFI source code..."
@cd operator/merkle_tree/lib && RUST_MIN_STACK=83886080 cargo t --release

test_merkle_tree_rust_ffi_old:
@echo "Testing Old Merkle Tree Rust FFI source code..."
@cd operator/merkle_tree_old/lib && RUST_MIN_STACK=83886080 cargo t --release

test_merkle_tree_go_bindings_macos: build_merkle_tree_macos
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree/... -v
Expand All @@ -557,6 +571,14 @@ test_merkle_tree_go_bindings_linux: build_merkle_tree_linux
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree/... -v

test_merkle_tree_old_go_bindings_macos: build_merkle_tree_macos_old
@echo "Testing Old Merkle Tree Go bindings..."
go test ./operator/merkle_tree_old/... -v

test_merkle_tree_go_bindings_linux_old: build_merkle_tree_linux_old
@echo "Testing Merkle Tree Go bindings..."
go test ./operator/merkle_tree_old/... -v

__HALO2_KZG_FFI__: ##
build_halo2_kzg_macos:
@cd operator/halo2kzg/lib && cargo build $(RELEASE_FLAG)
Expand Down Expand Up @@ -631,6 +653,7 @@ build_all_ffi_macos: ## Build all FFIs for macOS
@$(MAKE) build_sp1_macos
@$(MAKE) build_risc_zero_macos
@$(MAKE) build_merkle_tree_macos
@$(MAKE) build_merkle_tree_macos_old
@$(MAKE) build_halo2_ipa_macos
@$(MAKE) build_halo2_kzg_macos
@echo "All macOS FFIs built successfully."
Expand All @@ -640,6 +663,7 @@ build_all_ffi_linux: ## Build all FFIs for Linux
@$(MAKE) build_sp1_linux
@$(MAKE) build_risc_zero_linux
@$(MAKE) build_merkle_tree_linux
@$(MAKE) build_merkle_tree_linux_old
@$(MAKE) build_halo2_ipa_linux
@$(MAKE) build_halo2_kzg_linux
@echo "All Linux FFIs built successfully."
Expand Down
5 changes: 1 addition & 4 deletions aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ func aggregatorMain(ctx *cli.Context) error {
}()

err = aggregator.Start(context.Background())
if err != nil {
return err
}

return nil
return err
}
5 changes: 1 addition & 4 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ func (agg *Aggregator) ServeOperators() error {
agg.AggregatorConfig.Aggregator.ServerIpPortAddress)

err = http.ListenAndServe(agg.AggregatorConfig.Aggregator.ServerIpPortAddress, nil)
if err != nil {
return err
}

return nil
return err
}

// Aggregator Methods
Expand Down
2 changes: 1 addition & 1 deletion aggregator/internal/pkg/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
func (agg *Aggregator) subscribeToNewTasks() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasks(agg.NewBatchChan)
agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
Expand Down
153 changes: 50 additions & 103 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use config::NonPayingConfig;
use dotenv::dotenv;
use ethers::contract::ContractError;
use ethers::signers::Signer;
use priority_queue::PriorityQueue;
use serde::Serialize;

use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -35,7 +34,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, RwLock};
use tokio_tungstenite::tungstenite::{Error, Message};
use tokio_tungstenite::WebSocketStream;
use types::batch_queue::{BatchQueue, BatchQueueEntry, BatchQueueEntryPriority};
use types::batch_queue::{self, BatchQueue, BatchQueueEntry, BatchQueueEntryPriority};
use types::errors::{BatcherError, BatcherSendError};

use crate::config::{ConfigFromYaml, ContractDeploymentOutput};
Expand All @@ -52,10 +51,11 @@ mod zk_utils;

const AGGREGATOR_GAS_COST: u128 = 400_000;
const BATCHER_SUBMISSION_BASE_GAS_COST: u128 = 125_000;
const ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF: u128 = 13_000;
const CONSTANT_GAS_COST: u128 = ((AGGREGATOR_GAS_COST * DEFAULT_AGGREGATOR_FEE_MULTIPLIER)
/ DEFAULT_AGGREGATOR_FEE_DIVIDER)
+ BATCHER_SUBMISSION_BASE_GAS_COST;
pub(crate) const ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF: u128 = 13_000;
pub(crate) const CONSTANT_GAS_COST: u128 =
((AGGREGATOR_GAS_COST * DEFAULT_AGGREGATOR_FEE_MULTIPLIER) / DEFAULT_AGGREGATOR_FEE_DIVIDER)
+ BATCHER_SUBMISSION_BASE_GAS_COST;

const DEFAULT_MAX_FEE_PER_PROOF: u128 = ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000_000; // gas_price = 100 Gwei = 0.0000001 ether (high gas price)
const MIN_FEE_PER_PROOF: u128 = ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000; // gas_price = 0.1 Gwei = 0.0000000001 ether (low gas price)
const RESPOND_TO_TASK_FEE_LIMIT_MULTIPLIER: u128 = 5; // to set the respondToTaskFeeLimit variable higher than fee_for_aggregator
Expand Down Expand Up @@ -731,14 +731,20 @@ impl Batcher {

// close old sink and replace with new one
{
let mut old_sink = replacement_entry.messaging_sink.write().await;
if let Err(e) = old_sink.close().await {
// we dont want to exit here, just log the error
warn!("Error closing sink: {:?}", e);
}
if let Some(messaging_sink) = replacement_entry.messaging_sink {
let mut old_sink = messaging_sink.write().await;
if let Err(e) = old_sink.close().await {
// we dont want to exit here, just log the error
warn!("Error closing sink: {:?}", e);
}
} else {
warn!(
"Old websocket sink was empty. This should only happen in testing environments"
)
};
}
replacement_entry.messaging_sink = ws_conn_sink.clone();

replacement_entry.messaging_sink = Some(ws_conn_sink.clone());
if let Some(msg) = batch_state.validate_and_increment_max_fee(replacement_entry) {
warn!("Invalid max fee");
send_message(ws_conn_sink.clone(), msg).await;
Expand Down Expand Up @@ -841,97 +847,27 @@ impl Batcher {
// Set the batch posting flag to true
*batch_posting = true;

let mut batch_queue_copy = batch_state.batch_queue.clone();

match self.try_build_batch(&mut batch_queue_copy, gas_price) {
Some(finalized_batch) => {
let batch_queue_copy = batch_state.batch_queue.clone();
match batch_queue::try_build_batch(batch_queue_copy, gas_price, self.max_batch_size) {
Ok((resulting_batch_queue, finalized_batch)) => {
// Set the batch queue to batch queue copy
batch_state.batch_queue = batch_queue_copy;
batch_state.batch_queue = resulting_batch_queue;
batch_state.update_user_proofs_in_batch_and_min_fee();

Some(finalized_batch)
}
None => {
Err(BatcherError::BatchCostTooHigh) => {
// We cant post a batch since users are not willing to pay the needed fee, wait for more proofs
info!("No working batch found. Waiting for more proofs...");
*batch_posting = false;
None
}
}
}

/// Tries to build a batch from the current batch queue.
/// The function iterates over the batch queue and tries to build a batch that satisfies the gas price
/// and the max_fee set by the users.
/// If a working batch is found, the function tries to make it as big as possible by adding more proofs,
/// until a user is not willing to pay the required fee.
/// The extra check is that the batch size does not surpass the maximum batch size.
/// Note that the batch queue is sorted descending by the max_fee set by the users.
/// We use a copy of the batch queue because we might not find a working batch,
/// and we want to keep the original batch queue intact.
/// Returns Some(working_batch) if found, None otherwise.
fn try_build_batch(
&self,
batch_queue_copy: &mut PriorityQueue<BatchQueueEntry, BatchQueueEntryPriority>,
gas_price: U256,
) -> Option<Vec<BatchQueueEntry>> {
let mut finalized_batch = vec![];
let mut finalized_batch_size = 2; // at most two extra bytes for cbor encoding array markers
let mut finalized_batch_works = false;

while let Some((entry, _)) = batch_queue_copy.peek() {
let serialized_vd_size =
match cbor_serialize(&entry.nonced_verification_data.verification_data) {
Ok(val) => val.len(),
Err(e) => {
warn!("Serialization error: {:?}", e);
break;
}
};

if finalized_batch_size + serialized_vd_size > self.max_batch_size {
break;
}

let num_proofs = finalized_batch.len() + 1;

let gas_per_proof = (CONSTANT_GAS_COST
+ ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * num_proofs as u128)
/ num_proofs as u128;

let fee_per_proof = U256::from(gas_per_proof) * gas_price;

debug!(
"Validating that batch submission fee {} is less than max fee {} for sender {}",
fee_per_proof, entry.nonced_verification_data.max_fee, entry.sender,
);

// it is sufficient to check this max fee because it will be the lowest since its sorted
if fee_per_proof < entry.nonced_verification_data.max_fee && num_proofs >= 2 {
finalized_batch_works = true;
} else if finalized_batch_works {
// Can not add latest element since it is not willing to pay the corresponding fee
// Could potentially still find another working solution later with more elements,
// maybe we can explore all lengths in a future version
// or do the reverse from this, try with whole batch,
// then with whole batch minus last element, etc
break;
// FIXME: We should refactor this code and instead of returning None, return an error.
// See issue https://github.com/yetanotherco/aligned_layer/issues/1046.
Err(e) => {
error!("Unexpected error: {:?}", e);
*batch_posting = false;
None
}

// Either max fee is insufficient but we have not found a working solution yet,
// or we can keep adding to a working batch,
// Either way we need to keep iterating
finalized_batch_size += serialized_vd_size;

// We can unwrap here because we have already peeked to check there is a value
let (entry, _) = batch_queue_copy.pop().unwrap();
finalized_batch.push(entry);
}

if finalized_batch_works {
Some(finalized_batch)
} else {
None
}
}

Expand Down Expand Up @@ -993,13 +929,17 @@ impl Batcher {
)
.await
{
for entry in finalized_batch.iter() {
let merkle_root = hex::encode(batch_merkle_tree.root);
send_message(
entry.messaging_sink.clone(),
ResponseMessage::CreateNewTaskError(merkle_root),
)
.await
for entry in finalized_batch.into_iter() {
if let Some(ws_sink) = entry.messaging_sink {
let merkle_root = hex::encode(batch_merkle_tree.root);
send_message(
ws_sink.clone(),
ResponseMessage::CreateNewTaskError(merkle_root),
)
.await
} else {
warn!("Websocket sink was found empty. This should only happen in tests");
}
}

self.flush_queue_and_clear_nonce_cache().await;
Expand All @@ -1015,7 +955,11 @@ impl Batcher {
let mut batch_state = self.batch_state.lock().await;

for (entry, _) in batch_state.batch_queue.iter() {
send_message(entry.messaging_sink.clone(), ResponseMessage::BatchReset).await;
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
send_message(ws_sink.clone(), ResponseMessage::BatchReset).await;
} else {
warn!("Websocket sink was found empty. This should only happen in tests");
}
}

batch_state.batch_queue.clear();
Expand Down Expand Up @@ -1371,8 +1315,11 @@ async fn send_batch_inclusion_data_responses(
let serialized_response = cbor_serialize(&response)
.map_err(|e| BatcherError::SerializationError(e.to_string()))?;

let sending_result = entry
.messaging_sink
let Some(ws_sink) = entry.messaging_sink.as_ref() else {
return Err(BatcherError::WsSinkEmpty);
};

let sending_result = ws_sink
.write()
.await
.send(Message::binary(serialized_response))
Expand Down
Loading

0 comments on commit 90ee57c

Please sign in to comment.