Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.6.0 #947

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
eadece7
fix: add again old NewBatch event
uri-99 Aug 26, 2024
f27fd9a
chore: make bindings and redeploy contracts
uri-99 Aug 26, 2024
e015126
fix: go bindings
uri-99 Aug 26, 2024
d1a325f
feat: add RespondToTaskOld
uri-99 Aug 26, 2024
3ca02bc
refactor: change _old for old, to V2 for new
uri-99 Aug 26, 2024
ca7fd53
feat: add sendAggregatedResponseV2 to aggregator
uri-99 Aug 26, 2024
679ed22
feat: add processNewBatchV2 to avs_subscriber
uri-99 Aug 26, 2024
feab74c
feat: add SignedTaskResponseV2
uri-99 Aug 26, 2024
9fe3900
feat: add ProcessNewBatchLogV2
uri-99 Aug 26, 2024
019f7cc
feat: subscribeToNewTasksV2, SubscribeToNewTasksV2, getLatestTaskFrom…
uri-99 Aug 26, 2024
60b1ccd
feat: aggregator: sendAggregatedResponseV2, AddNewTaskV2
uri-99 Aug 26, 2024
0038673
feat: subscriber SubscribeToNewTasksV2
uri-99 Aug 26, 2024
e1088d3
feat: server ProcessOperatorSignedTaskResponseV2
uri-99 Aug 26, 2024
318ef36
feat: operator NewTaskCreatedChanV2, and discerning ProcessNewBatchLogV2
uri-99 Aug 26, 2024
48d7cc8
feat: rpc_client SendSignedTaskResponseToAggregatorV2
uri-99 Aug 26, 2024
99b503f
feat: aggregator v1 v2 discerning
uri-99 Aug 26, 2024
5a5f4f8
fixes: fix
uri-99 Aug 26, 2024
6fa127a
remove: agg subscribe to v2 in different goroutine, now subscribes bo…
uri-99 Aug 27, 2024
549a2ba
feat: agg subscribes to both success
uri-99 Aug 27, 2024
943d7a1
feat: add if block.number in alignedservicemanager.sol
uri-99 Aug 27, 2024
79a6a33
feat: reading of blockNumber insteadof B2 bool
uri-99 Aug 27, 2024
642169d
feat: if block num in aggregator
uri-99 Aug 27, 2024
87ac992
fix: lower block num
uri-99 Aug 27, 2024
23947ee
fix: if conditions and switchBlockNumber
uri-99 Aug 27, 2024
06e86d6
refactor: comment on respondToTask easier to use
uri-99 Aug 27, 2024
39bbb6d
chore: update OPERATOR_VERSION to v0.5.0
JuArce Aug 27, 2024
2aa3500
chore: set switch block at 2_268_375
JuArce Aug 27, 2024
5e12786
chore: set switch block at 2_268_375 in service manager
JuArce Aug 27, 2024
2cda24f
chore: update config operator template
JuArce Aug 27, 2024
b5ff94b
fix: use correct types
JuArce Aug 27, 2024
f2fac00
chore: update OPERATOR VERSION
JuArce Aug 27, 2024
6908110
fix: sdk version for v060 (#944)
uri-99 Sep 9, 2024
dd52e7c
fix: public inputs sdk integration (#945)
NicolasRampoldi Sep 9, 2024
bdda0d2
fix: apply zkquiz fixes from v052 to v060 (#946)
uri-99 Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ OS := $(shell uname -s)
CONFIG_FILE?=config-files/config.yaml
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.4.1
OPERATOR_VERSION=v0.5.2

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand Down
2 changes: 1 addition & 1 deletion aggregator/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func aggregatorMain(ctx *cli.Context) error {
return err
}

// Listen for new task created in the ServiceManager contract in a separate goroutine
// Listen for new task created in the ServiceManager contract in a separate goroutine, both V1 and V2 subscriptions:
go func() {
listenErr := aggregator.SubscribeToNewTasks()
if listenErr != nil {
Expand Down
121 changes: 107 additions & 14 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type BatchData struct {
type Aggregator struct {
AggregatorConfig *config.AggregatorConfig
NewBatchChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch
NewBatchChanV2 chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2
avsReader *chainio.AvsReader
avsSubscriber *chainio.AvsSubscriber
avsWriter *chainio.AvsWriter
Expand Down Expand Up @@ -92,6 +93,7 @@ type Aggregator struct {

func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error) {
newBatchChan := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatch)
newBatchChanV2 := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
if err != nil {
Expand Down Expand Up @@ -162,6 +164,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
avsSubscriber: avsSubscriber,
avsWriter: avsWriter,
NewBatchChan: newBatchChan,
NewBatchChanV2: newBatchChanV2,

batchesIdentifierHashByIdx: batchesIdentifierHashByIdx,
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
Expand Down Expand Up @@ -207,14 +210,16 @@ func (agg *Aggregator) Start(ctx context.Context) error {
case blsAggServiceResp := <-agg.blsAggregationService.GetResponseChannel():
agg.logger.Info("Received response from BLS aggregation service",
"taskIndex", blsAggServiceResp.TaskIndex)

go agg.handleBlsAggServiceResponse(blsAggServiceResp)
}
}
}

const MaxSentTxRetries = 5

var switchBlockNumber = uint64(2_268_375) // 2_268_375 is the block at sep 3th 15:00

func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
if blsAggServiceResp.Err != nil {
agg.taskMutex.Lock()
Expand Down Expand Up @@ -276,19 +281,37 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.logger.Info("Sending aggregated response onchain", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

current_task_block := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
for i := 0; i < MaxSentTxRetries; i++ {
_, err = agg.sendAggregatedResponse(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
if current_task_block < switchBlockNumber {
agg.logger.Info("agg if V1")
_, err = agg.sendAggregatedResponse(batchData.BatchMerkleRoot, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)

} else {
agg.logger.Info("agg if V2")
_, err = agg.sendAggregatedResponseV2(batchData.BatchMerkleRoot, batchData.SenderAddress, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

return
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)
}

agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
Expand All @@ -301,7 +324,32 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

// / Sends response to contract and waits for transaction receipt
// / Returns error if it fails to send tx or receipt is not found
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
agg.walletMutex.Lock()
agg.logger.Infof("- Locked Wallet Resources: Sending aggregated response for batch",
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))

txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchMerkleRoot[:]), err)
return nil, err
}

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchMerkleRoot[:]))

receipt, err := utils.WaitForTransactionReceipt(
agg.AggregatorConfig.BaseConfig.EthRpcClient, context.Background(), *txHash)
if err != nil {
return nil, err
}

agg.metrics.IncAggregatedResponses()

return receipt, nil
}
func (agg *Aggregator) sendAggregatedResponseV2(batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*gethtypes.Receipt, error) {
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

Expand All @@ -311,7 +359,7 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
"senderAddress", hex.EncodeToString(senderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

txHash, err := agg.avsWriter.SendAggregatedResponse(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
txHash, err := agg.avsWriter.SendAggregatedResponseV2(batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
Expand All @@ -332,8 +380,53 @@ func (agg *Aggregator) sendAggregatedResponse(batchMerkleRoot [32]byte, senderAd
return receipt, nil
}

func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task",
"Batch merkle root", "0x"+hex.EncodeToString(batchMerkleRoot[:]))

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Adding new task")

// --- UPDATE BATCH - INDEX CACHES ---
batchIndex := agg.nextBatchIndex
if _, ok := agg.batchesIdxByIdentifierHash[batchMerkleRoot]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}

// This shouldn't happen, since both maps are updated together
if _, ok := agg.batchesIdentifierHashByIdx[batchIndex]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", batchMerkleRoot)
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}

agg.batchesIdxByIdentifierHash[batchMerkleRoot] = batchIndex
agg.batchCreatedBlockByIdx[batchIndex] = uint64(taskCreatedBlock)
agg.batchesIdentifierHashByIdx[batchIndex] = batchMerkleRoot
agg.batchDataByIdentifierHash[batchMerkleRoot] = BatchData{
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: [20]byte{},
}
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)
// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}

agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchIdentifierHash (actually batchMerkleRoot)", "0x"+hex.EncodeToString(batchMerkleRoot[:]))
}
func (agg *Aggregator) AddNewTaskV2(batchMerkleRoot [32]byte, senderAddress [20]byte, taskCreatedBlock uint32) {
batchIdentifier := append(batchMerkleRoot[:], senderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

Expand Down
91 changes: 91 additions & 0 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,97 @@ func (agg *Aggregator) ServeOperators() error {
// - 0: Success
// - 1: Error
func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))

taskIndex := uint32(0)
ok := false

for i := 0; i < waitForEventRetries; i++ {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
taskIndex, ok = agg.batchesIdxByIdentifierHash[signedTaskResponse.BatchMerkleRoot]
if !ok {
agg.taskMutex.Unlock()
agg.logger.Info("- Unlocked Resources: Task not found in the internal map")
time.Sleep(waitForEventSleepSeconds)
} else {
break
}
}

if !ok {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
*reply = 1
return nil
}

// Note: we already have lock here
agg.logger.Debug("- Checking if operator already responded")
batchResponses, ok := agg.operatorRespondedBatch[taskIndex]
if !ok {
batchResponses = make(map[eigentypes.Bytes32]struct{})
agg.operatorRespondedBatch[taskIndex] = batchResponses
}

if _, ok := batchResponses[signedTaskResponse.OperatorId]; ok {
*reply = 0
agg.logger.Warn("Operator already responded, ignoring",
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]),
"taskIndex", taskIndex, "batchMerkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]))

agg.taskMutex.Unlock()
return nil
}

batchResponses[signedTaskResponse.OperatorId] = struct{}{}

// Don't wait infinitely if it can't answer
// Create a context with a timeout of 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // Ensure the cancel function is called to release resources

// Create a channel to signal when the task is done
done := make(chan struct{})

agg.logger.Info("Starting bls signature process")
go func() {
err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)

if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
// remove operator from the list of operators that responded
// so that it can try again
delete(batchResponses, signedTaskResponse.OperatorId)
} else {
agg.logger.Info("BLS process succeeded")
}

close(done)
}()

*reply = 1
// Wait for either the context to be done or the task to complete
select {
case <-ctx.Done():
// The context's deadline was exceeded or it was canceled
agg.logger.Info("Bls process timed out, operator signature will be lost. Batch may not reach quorum")
case <-done:
// The task completed successfully
agg.logger.Info("Bls context finished correctly")
*reply = 0
}

agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
agg.taskMutex.Unlock()

return nil
}
func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *types.SignedTaskResponseV2, reply *uint8) error {
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
"BatchMerkleRoot", "0x"+hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
"SenderAddress", "0x"+hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
Expand Down
33 changes: 31 additions & 2 deletions aggregator/internal/pkg/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,39 @@ func (agg *Aggregator) SubscribeToNewTasks() error {
if err != nil {
return err
}
err = agg.subscribeToNewTasksV2()
if err != nil {
return err
}

var switchBlockNumber = uint32(2_268_375) // 2_268_375 is the block at sep 3th 15:00

for {
select {
case err := <-agg.taskSubscriber:
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to subscribe to new tasks", "err", err)

// TODO not sure if this is the best way, but no way to calculate blocknumber from here
err = agg.subscribeToNewTasks()
errV2 := agg.subscribeToNewTasksV2()
if err != nil {
return err
}
if errV2 != nil {
return err
}
case newBatch := <-agg.NewBatchChan:
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.SenderAddress, newBatch.TaskCreatedBlock)
if newBatch.TaskCreatedBlock < switchBlockNumber {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V1")
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
}
case newBatchV2 := <-agg.NewBatchChanV2:
if newBatchV2.TaskCreatedBlock >= switchBlockNumber {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task, V2")
agg.AddNewTaskV2(newBatchV2.BatchMerkleRoot, newBatchV2.SenderAddress, newBatchV2.TaskCreatedBlock)
}
}
}

}

func (agg *Aggregator) subscribeToNewTasks() error {
Expand All @@ -33,3 +51,14 @@ func (agg *Aggregator) subscribeToNewTasks() error {

return err
}
func (agg *Aggregator) subscribeToNewTasksV2() error {
var err error

agg.taskSubscriber, err = agg.avsSubscriber.SubscribeToNewTasksV2(agg.NewBatchChanV2)

if err != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", err)
}

return err
}
2 changes: 1 addition & 1 deletion batcher/aligned-sdk/abi/AlignedLayerServiceManager.json

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions batcher/aligned-sdk/src/communication/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ pub async fn await_batch_verification(
aligned_verification_data: &AlignedVerificationData,
rpc_url: &str,
chain: Chain,
payment_service_addr: &str,
) -> Result<(), errors::SubmitError> {
for _ in 0..RETRIES {
if is_proof_verified(aligned_verification_data, chain.clone(), rpc_url)
.await
.is_ok_and(|r| r)
if is_proof_verified(
aligned_verification_data,
chain.clone(),
rpc_url,
payment_service_addr,
)
.await
.is_ok_and(|r| r)
{
return Ok(());
}
Expand Down
Loading