Skip to content

Commit

Permalink
Merge branch 'main' into 323-feat-add-github-action-to-upload-batcher…
Browse files Browse the repository at this point in the history
…-client-binary-in-each-release
  • Loading branch information
NicolasRampoldi authored Jun 6, 2024
2 parents ef03fac + 7ed3f3e commit 5ab23c6
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 99 deletions.
122 changes: 59 additions & 63 deletions aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pkg

import (
"context"
"encoding/hex"
"sync"
"time"

Expand All @@ -28,10 +29,7 @@ const QUORUM_NUMBER = byte(0)
const QUORUM_THRESHOLD = byte(67)

// Aggregator stores TaskResponse for a task here
type TaskResponsesWithStatus struct {
taskResponses []types.SignedTaskResponse
submittedToEthereum bool
}
type TaskResponses = []types.SignedTaskResponse

type Aggregator struct {
AggregatorConfig *config.AggregatorConfig
Expand All @@ -46,26 +44,23 @@ type Aggregator struct {
// Since our ID is not an idx, we build this cache
// Note: In case of a reboot, this doesn't need to be loaded,
// and can start from zero
batchesRootByIdx map[uint32][32]byte
batchesRootByIdxMutex *sync.Mutex
batchesRootByIdx map[uint32][32]byte

// This is the counterpart,
// to use when we have the batch but not the index
// Note: In case of a reboot, this doesn't need to be loaded,
// and can start from zero
batchesIdxByRoot map[[32]byte]uint32
batchesIdxByRootMutex *sync.Mutex
batchesIdxByRoot map[[32]byte]uint32

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
nextBatchIndex uint32
nextBatchIndexMutex *sync.Mutex
nextBatchIndex uint32

// Mutex to protect batchesRootByIdx, batchesIdxByRoot and nextBatchIndex
taskMutex *sync.Mutex

OperatorTaskResponses map[[32]byte]*TaskResponsesWithStatus
// Mutex to protect the taskResponses map
batchesResponseMutex *sync.Mutex
logger logging.Logger
logger logging.Logger

metricsReg *prometheus.Registry
metrics *metrics.Metrics
Expand All @@ -92,8 +87,6 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesRootByIdx := make(map[uint32][32]byte)
batchesIdxByRoot := make(map[[32]byte]uint32)

operatorTaskResponses := make(map[[32]byte]*TaskResponsesWithStatus, 0)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
EthWsUrl: aggregatorConfig.BaseConfig.EthWsUrl,
Expand Down Expand Up @@ -129,17 +122,11 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
avsWriter: avsWriter,
NewBatchChan: newBatchChan,

batchesRootByIdx: batchesRootByIdx,
batchesRootByIdxMutex: &sync.Mutex{},

batchesIdxByRoot: batchesIdxByRoot,
batchesIdxByRootMutex: &sync.Mutex{},

nextBatchIndex: nextBatchIndex,
nextBatchIndexMutex: &sync.Mutex{},
batchesRootByIdx: batchesRootByIdx,
batchesIdxByRoot: batchesIdxByRoot,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},

OperatorTaskResponses: operatorTaskResponses,
batchesResponseMutex: &sync.Mutex{},
blsAggregationService: blsAggregationService,
logger: logger,
metricsReg: reg,
Expand Down Expand Up @@ -173,13 +160,16 @@ func (agg *Aggregator) Start(ctx context.Context) error {
case err := <-metricsErrChan:
agg.logger.Fatal("Metrics server failed", "err", err)
case blsAggServiceResp := <-agg.blsAggregationService.GetResponseChannel():
agg.logger.Info("Received response from BLS aggregation service", "blsAggServiceResp", blsAggServiceResp)
agg.logger.Info("Received response from BLS aggregation service",
"taskIndex", blsAggServiceResp.TaskIndex)
agg.sendAggregatedResponseToContract(blsAggServiceResp)
agg.metrics.IncAggregatedResponses()
}
}
}

// MaxSentTxRetries is the upper bound on how many times
// sendAggregatedResponseToContract calls avsWriter.SendAggregatedResponse
// for one aggregated response. The loop returns on the first nil error and
// sleeps 2 seconds after each failed attempt; once all attempts fail, an
// error is logged and the batch is reported as lost.
const MaxSentTxRetries = 5

func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg.BlsAggregationServiceResponse) {
if blsAggServiceResp.Err != nil {
agg.logger.Error("BlsAggregationServiceResponse contains an error", "err", blsAggServiceResp.Err)
Expand All @@ -206,70 +196,76 @@ func (agg *Aggregator) sendAggregatedResponseToContract(blsAggServiceResp blsagg
NonSignerStakeIndices: blsAggServiceResp.NonSignerStakeIndices,
}

agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Fetching merkle root")
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching merkle root")
agg.taskMutex.Unlock()

agg.logger.Info("Threshold reached. Sending aggregated response onchain.",
"taskIndex", blsAggServiceResp.TaskIndex,
)
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))

agg.batchesRootByIdxMutex.Lock()
batchMerkleRoot := agg.batchesRootByIdx[blsAggServiceResp.TaskIndex]
agg.batchesRootByIdxMutex.Unlock()
var err error

_, err := agg.avsWriter.SendAggregatedResponse(context.Background(), batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
agg.logger.Error("Aggregator failed to respond to task", "err", err)
for i := 0; i < MaxSentTxRetries; i++ {
_, err = agg.avsWriter.SendAggregatedResponse(context.Background(), batchMerkleRoot, nonSignerStakesAndSignature)
if err == nil {
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))

return
}

// Sleep for a bit before retrying
time.Sleep(2 * time.Second)
}

agg.logger.Error("Aggregator failed to respond to task, this batch will be lost",
"err", err,
"taskIndex", blsAggServiceResp.TaskIndex,
"merkleRoot", hex.EncodeToString(batchMerkleRoot[:]))
}


// AddNewTask records a new batch under the current nextBatchIndex in both
// lookup maps (batchesIdxByRoot and batchesRootByIdx), advances
// nextBatchIndex, and asks the BLS aggregation service to initialize a task
// for that index via InitializeNewTask.
//
// If the merkle root or the index is already present in the maps, a warning
// is logged and the function returns without initializing a task.
//
// NOTE(review): this listing comes from a rendered commit diff, so lines from
// before and after the change appear interleaved (e.g. both the per-map
// mutexes and taskMutex are used below, and batchIndex is assigned twice).
// Confirm every statement against the real file before relying on it.
func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, taskCreatedBlock uint32) {
agg.AggregatorConfig.BaseConfig.Logger.Info("Adding new task", "Batch merkle root", batchMerkleRoot)

agg.nextBatchIndexMutex.Lock()
batchIndex := agg.nextBatchIndex
agg.nextBatchIndexMutex.Unlock()
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Adding new task")

// --- UPDATE BATCH - INDEX CACHES ---

agg.batchesRootByIdxMutex.Lock()
if _, ok := agg.batchesRootByIdx[batchIndex]; ok {
batchIndex := agg.nextBatchIndex
// Duplicate merkle root: this batch was already registered, so skip it.
if _, ok := agg.batchesIdxByRoot[batchMerkleRoot]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
agg.batchesRootByIdxMutex.Unlock()
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}
agg.batchesRootByIdx[batchIndex] = batchMerkleRoot
agg.batchesRootByIdxMutex.Unlock()

agg.batchesIdxByRootMutex.Lock()
// This shouldn't happen, since both maps are updated together
if _, ok := agg.batchesIdxByRoot[batchMerkleRoot]; ok {
if _, ok := agg.batchesRootByIdx[batchIndex]; ok {
agg.logger.Warn("Batch already exists", "batchIndex", batchIndex, "batchRoot", batchMerkleRoot)
agg.batchesRootByIdxMutex.Unlock()
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
return
}
agg.batchesIdxByRoot[batchMerkleRoot] = batchIndex
agg.batchesIdxByRootMutex.Unlock()

// --- UPDATE TASK RESPONSES ---

agg.batchesResponseMutex.Lock()
agg.OperatorTaskResponses[batchMerkleRoot] = &TaskResponsesWithStatus{
taskResponses: make([]types.SignedTaskResponse, 0),
submittedToEthereum: false,
}
agg.batchesResponseMutex.Unlock()
// Both directions of the root <-> index mapping are written together, and
// the index is advanced for the next batch.
agg.batchesIdxByRoot[batchMerkleRoot] = batchIndex
agg.batchesRootByIdx[batchIndex] = batchMerkleRoot
agg.nextBatchIndex += 1

quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)}
quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)}

// NOTE(review): the trailing 100*time.Second is presumably how long the BLS
// service waits for signatures before expiring the task — verify against
// InitializeNewTask's documentation.
err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, 100*time.Second)

// --- INCREASE BATCH INDEX ---

agg.nextBatchIndexMutex.Lock()
agg.nextBatchIndex = agg.nextBatchIndex + 1
agg.nextBatchIndexMutex.Unlock()

// FIXME(marian): When this errors, should we retry initializing new task? Logging fatal for now.
// NOTE(review): this fatal log runs before taskMutex.Unlock() below;
// presumably Fatalf terminates the process, so the lock state would not
// matter — TODO confirm the logger's Fatalf semantics.
if err != nil {
agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err)
}

agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Adding new task")
agg.logger.Info("New task added", "batchIndex", batchIndex, "batchMerkleRoot", batchMerkleRoot)
}
72 changes: 48 additions & 24 deletions aggregator/internal/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package pkg

import (
"context"
"encoding/hex"
"fmt"
"net/http"
"net/rpc"
"time"

"github.com/yetanotherco/aligned_layer/core/types"
)
Expand Down Expand Up @@ -43,35 +45,57 @@ func (agg *Aggregator) ServeOperators() error {
// - 0: Success
// - 1: Error
func (agg *Aggregator) ProcessOperatorSignedTaskResponse(signedTaskResponse *types.SignedTaskResponse, reply *uint8) error {
// NOTE(review): this listing comes from a rendered commit diff, so lines
// from before and after the change appear interleaved (e.g. the
// OperatorTaskResponses bookkeeping and the direct ProcessNewSignature call
// below sit next to the newer goroutine-based version). Confirm every
// statement against the real file before relying on it.

agg.AggregatorConfig.BaseConfig.Logger.Info("New task response", "taskResponse", signedTaskResponse)

if _, ok := agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot]; !ok {
agg.AggregatorConfig.BaseConfig.Logger.Info("New task response",
"merkleRoot", hex.EncodeToString(signedTaskResponse.BatchMerkleRoot[:]),
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))

// Look up the batch index for this merkle root while holding taskMutex.
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
taskIndex, ok := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
if !ok {
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources")
agg.taskMutex.Unlock()
// NOTE(review): %d is applied to BatchMerkleRoot, which is sliced and
// hex-encoded above, so this prints a list of decimal bytes rather than
// the hex form used in the log lines. *reply is not set on this path.
return fmt.Errorf("task with batch merkle root %d does not exist", signedTaskResponse.BatchMerkleRoot)
}

agg.batchesResponseMutex.Lock()
taskResponses := agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot]
taskResponses.taskResponses = append(
agg.OperatorTaskResponses[signedTaskResponse.BatchMerkleRoot].taskResponses,
*signedTaskResponse)
agg.batchesResponseMutex.Unlock()

agg.batchesIdxByRootMutex.Lock()
taskIndex := agg.batchesIdxByRoot[signedTaskResponse.BatchMerkleRoot]
agg.batchesIdxByRootMutex.Unlock()

err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)
if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
*reply = 1
return err
// Don't wait infinitely if it can't answer
// Create a context with a timeout of 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() // Ensure the cancel function is called to release resources

// Create a channel to signal when the task is done
done := make(chan struct{})

agg.logger.Info("Starting bls signature process")
go func() {
// NOTE(review): the call below receives context.Background(), not ctx, so
// the 5-second timeout only limits how long this function waits; it is not
// propagated to ProcessNewSignature, and this goroutine keeps running
// after a timeout until that call returns.
err := agg.blsAggregationService.ProcessNewSignature(
context.Background(), taskIndex, signedTaskResponse.BatchMerkleRoot,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)

if err != nil {
agg.logger.Warnf("BLS aggregation service error: %s", err)
} else {
agg.logger.Info("BLS process succeeded")
}

// done is closed on both the error and the success path.
close(done)
}()

// Default to the error code; it is only cleared when the goroutine finishes
// before the timeout.
*reply = 1
// Wait for either the context to be done or the task to complete
// NOTE(review): taskMutex is still locked during this wait; it is released
// only after the select.
select {
case <-ctx.Done():
// The context's deadline was exceeded or it was canceled
agg.logger.Info("Bls process timed out, operator signature will be lost. Batch may not reach quorum")
case <-done:
// The goroutine finished before the timeout. This branch is taken whether
// or not ProcessNewSignature returned an error (the error is only logged
// inside the goroutine), so reply is set to 0 in both cases.
agg.logger.Info("Bls context finished correctly")
*reply = 0
}

*reply = 0
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Task response processing finished")
agg.taskMutex.Unlock()

return nil
}
Expand Down
5 changes: 2 additions & 3 deletions aggregator/internal/pkg/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

const (
MaxRetries = 20
RetryInterval = 10 * time.Second
MaxRetries = 100
RetryInterval = 1 * time.Second
)

func (agg *Aggregator) SubscribeToNewTasks() error {
Expand All @@ -32,7 +32,6 @@ func (agg *Aggregator) subscribeToNewTasks() error {
for {
select {
case err := <-agg.taskSubscriber.Err():
agg.AggregatorConfig.BaseConfig.Logger.Error("Error in subscription", "err", err)
return err
case newBatch := <-agg.NewBatchChan:
agg.AddNewTask(newBatch.BatchMerkleRoot, newBatch.TaskCreatedBlock)
Expand Down
9 changes: 0 additions & 9 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
txOpts.NoSend = true // simulate the transaction
tx, err := w.AvsContractBindings.ServiceManager.RespondToTask(&txOpts, batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
return nil, err
}

Expand All @@ -99,7 +98,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
txOpts.GasLimit = tx.Gas() * 110 / 100 // Add 10% to the gas limit
tx, err = w.AvsContractBindings.ServiceManager.RespondToTask(&txOpts, batchMerkleRoot, nonSignerStakesAndSignature)
if err != nil {
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
return nil, err
}

Expand All @@ -108,13 +106,6 @@ func (w *AvsWriter) SendAggregatedResponse(ctx context.Context, batchMerkleRoot
return nil, err
}

taskRespondedEvent, err := w.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerFilterer.ParseBatchVerified(*receipt.Logs[0])
if err != nil {
return nil, err
}

// FIXME(marian): Dummy log to check integration with the contract
w.logger.Infof("TASK RESPONDED EVENT: %+v", taskRespondedEvent)
return receipt, nil
}

Expand Down

0 comments on commit 5ab23c6

Please sign in to comment.